Persistence¶
Pluggable operational data persistence: protocol, configuration, SQLite backend, and Postgres backend.
Protocol¶
protocol
¶
PersistenceBackend protocol -- lifecycle + repository access.
Application code depends on this protocol for storage lifecycle management. Repository protocols provide entity-level access.
PersistenceBackend
¶
Bases: Protocol
Lifecycle management for operational data storage.
Concrete backends implement this protocol to provide connection management, health monitoring, schema migrations, and access to entity-specific repositories.
Attributes:
| Name | Type | Description |
|---|---|---|
is_connected |
bool
|
Whether the backend has an active connection. |
backend_name |
NotBlankStr
|
Human-readable backend identifier. |
tasks |
TaskRepository
|
Repository for Task persistence. |
cost_records |
CostRecordRepository
|
Repository for CostRecord persistence. |
messages |
MessageRepository
|
Repository for Message persistence. |
lifecycle_events |
LifecycleEventRepository
|
Repository for AgentLifecycleEvent persistence. |
task_metrics |
TaskMetricRepository
|
Repository for TaskMetricRecord persistence. |
collaboration_metrics |
CollaborationMetricRepository
|
Repository for CollaborationMetricRecord persistence. |
parked_contexts |
ParkedContextRepository
|
Repository for ParkedContext persistence. |
audit_entries |
AuditRepository
|
Repository for AuditEntry persistence. |
users |
UserRepository
|
Repository for User persistence. |
api_keys |
ApiKeyRepository
|
Repository for ApiKey persistence. |
checkpoints |
CheckpointRepository
|
Repository for Checkpoint persistence. |
heartbeats |
HeartbeatRepository
|
Repository for Heartbeat persistence. |
agent_states |
AgentStateRepository
|
Repository for AgentRuntimeState persistence. |
settings |
SettingsRepository
|
Repository for namespaced settings persistence. |
artifacts |
ArtifactRepository
|
Repository for Artifact persistence. |
projects |
ProjectRepository
|
Repository for Project persistence. |
custom_presets |
PersonalityPresetRepository
|
Repository for custom personality preset persistence. |
workflow_definitions |
WorkflowDefinitionRepository
|
Repository for workflow definition persistence. |
workflow_executions |
WorkflowExecutionRepository
|
Repository for workflow execution persistence. |
workflow_versions |
VersionRepository[WorkflowDefinition]
|
Repository for workflow definition version snapshot persistence. |
identity_versions |
VersionRepository[AgentIdentity]
|
Repository for AgentIdentity version snapshot persistence. |
evaluation_config_versions |
VersionRepository[EvaluationConfig]
|
Repository for EvaluationConfig version snapshot persistence. |
budget_config_versions |
VersionRepository[BudgetConfig]
|
Repository for BudgetConfig version snapshot persistence. |
company_versions |
VersionRepository[Company]
|
Repository for Company version snapshot persistence. |
role_versions |
VersionRepository[Role]
|
Repository for Role version snapshot persistence. |
decision_records |
DecisionRepository
|
Repository for DecisionRecord persistence (auditable approval-gate decisions drop-box). |
risk_overrides |
RiskOverrideRepository
|
Repository for RiskTierOverride persistence. |
ssrf_violations |
SsrfViolationRepository
|
Repository for SsrfViolation persistence. |
circuit_breaker_state |
CircuitBreakerStateRepository
|
Repository for circuit breaker state persistence. |
connections |
ConnectionRepository
|
Repository for external service connection persistence. |
connection_secrets |
ConnectionSecretRepository
|
Repository for encrypted connection secret persistence. |
oauth_states |
OAuthStateRepository
|
Repository for transient OAuth authorization state persistence. |
webhook_receipts |
WebhookReceiptRepository
|
Repository for webhook receipt log persistence. |
idempotency_keys |
IdempotencyRepository
|
Repository for persistent idempotency keys -- atomic claim/complete/fail primitive shared by webhook receivers, the backup endpoint, and any other retry-prone surface that needs cross-restart deduplication. |
training_plans |
TrainingPlanRepository
|
Repository for training plan persistence. |
training_results |
TrainingResultRepository
|
Repository for training result persistence. |
custom_rules |
CustomRuleRepository
|
Repository for custom signal rule persistence. |
kind
property
¶
Return the backend's discriminator string.
One of "sqlite" or "postgres". Used by call sites that
need to pick a backend-specific helper (e.g. backup handler
factories) without isinstance checks. The Literal type
means mypy rejects an implementation that returns any other
string.
collaboration_metrics
property
¶
Repository for CollaborationMetricRecord persistence.
provider_audit_events
property
¶
Repository for the provider mutation audit log.
preset_overrides
property
¶
Repository for operator-authored provider preset overrides.
decision_records
property
¶
Repository for DecisionRecord persistence (decisions drop-box).
workflow_definitions
property
¶
Repository for workflow definition persistence.
workflow_versions
property
¶
Repository for workflow definition version snapshot persistence.
identity_versions
property
¶
Repository for AgentIdentity version snapshot persistence.
evaluation_config_versions
property
¶
Repository for EvaluationConfig version snapshot persistence.
budget_config_versions
property
¶
Repository for BudgetConfig version snapshot persistence.
circuit_breaker_state
property
¶
Repository for circuit breaker state persistence.
ceremony_scheduler_state
property
¶
Repository for ceremony scheduler per-sprint state snapshots.
meeting_cooldown
property
¶
Repository for meeting cooldown last-triggered timestamps.
tracked_containers
property
¶
Repository for Docker sandbox tracked-container records.
connection_secrets
property
¶
Repository for encrypted connection secret persistence.
principle_overrides
property
¶
Repository for rollback-restored principle overrides.
project_cost_aggregates
property
¶
Repository for durable per-project cost aggregates.
fine_tune_checkpoints
property
¶
Repository for fine-tune checkpoint persistence.
Implementations that do not support fine-tuning MUST raise
NotImplementedError with a descriptive message so callers
do not silently receive an unusable repo.
fine_tune_runs
property
¶
Repository for fine-tune pipeline run persistence.
Same availability semantics as :attr:fine_tune_checkpoints.
connect
async
¶
Establish connection to the storage backend.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If the connection cannot be established. |
disconnect
async
¶
health_check
async
¶
Check whether the backend is healthy and responsive.
Returns:
| Type | Description |
|---|---|
bool
|
|
migrate
async
¶
get_db
¶
Return the underlying database connection.
Returns:
| Type | Description |
|---|---|
Any
|
The raw database connection object (backend-specific). |
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not yet connected. |
write_context
¶
Async context manager around mutating SQL on this backend.
The mutual-exclusion guarantee is backend-specific:
- SQLite acquires a shared in-process write lock so that
multi-statement transactions on the single
aiosqlite.Connectioncannot interleave at the statement level. Concurrent writers on the same backend instance serialize. - Postgres yields immediately: each repository operation checks out an independent connection from the async pool, so writers are already isolated at the database level. The method exists on this backend only to keep the cross-backend interface uniform; it does not provide mutual exclusion beyond what the pool already gives.
Use it in repository write paths so the same code path works on both backends::
async with backend.write_context():
await db.execute(...)
await db.commit()
Each call returns a fresh context manager. On SQLite, the
underlying lock primitive is shared across calls so concurrent
callers serialize. On Postgres, there is no shared primitive.
Repositories are wired with this method (as a callable) at
backend construction; callers that already hold a
PersistenceBackend reference can use it directly for
cross-repo transactional boundaries.
Callers must not rely on write_context for distributed
mutual exclusion or cross-backend serializability.
Source code in src/synthorg/persistence/protocol.py
build_lockouts
¶
Construct a lockout repository for this backend.
Method-based rather than property because :class:LockoutRepository
needs the operator's AuthConfig (threshold, window, duration)
which is app-layer config, not persistence-layer. Callers supply
the config at startup; the returned repo shares this backend's
connection / pool.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If the backend is not connected. |
Source code in src/synthorg/persistence/protocol.py
build_escalations
¶
Construct an escalation queue repository for this backend.
Method-based rather than property because Postgres escalations
accept an optional NOTIFY channel name -- cross-instance notify
config lives on the escalation subsystem, not on persistence.
notify_channel is ignored by the SQLite implementation.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If the backend is not connected. |
Source code in src/synthorg/persistence/protocol.py
build_ontology_versioning
¶
Construct the ontology versioning service bound to this backend.
Returns a versioning service wired to the backend's active DB
handle. SQLite implementations bind the service to their
aiosqlite.Connection; Postgres implementations bind to their
AsyncConnectionPool.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If the backend is not connected. |
Source code in src/synthorg/persistence/protocol.py
get_setting
async
¶
Retrieve a setting value by key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
NotBlankStr
|
Setting key. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
The setting value, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/protocol.py
set_setting
async
¶
Store a setting value.
Upserts -- creates or updates the key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
NotBlankStr
|
Setting key. |
required |
value
|
str
|
Setting value. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/protocol.py
Config¶
config
¶
Persistence configuration models.
Frozen Pydantic models for persistence backend selection and backend-specific settings.
SQLiteConfig
pydantic-model
¶
Bases: BaseModel
SQLite-specific persistence configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
path |
NotBlankStr
|
Database file path. Use |
wal_mode |
bool
|
Whether to enable WAL journal mode for concurrent read performance. |
journal_size_limit |
int
|
Maximum WAL journal size in bytes (default 64 MB). |
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
path(NotBlankStr) -
wal_mode(bool) -
journal_size_limit(int)
Validators:
-
_reject_traversal
PostgresConfig
pydantic-model
¶
Bases: BaseModel
Postgres-specific persistence configuration.
Credentials are carried as SecretStr so they are redacted from
logs, repr output, and Pydantic serialization unless
explicitly unwrapped via get_secret_value().
Attributes:
| Name | Type | Description |
|---|---|---|
host |
NotBlankStr
|
Database host. |
port |
int
|
Database port (default 5432). |
database |
NotBlankStr
|
Database name. |
username |
NotBlankStr
|
Database username. |
password |
SecretStr
|
Database password (redacted in logs). |
ssl_mode |
PostgresSslMode
|
libpq SSL mode. Default |
pool_min_size |
int
|
Minimum pooled connections (warmed on connect). |
pool_max_size |
int
|
Maximum pooled connections; must be
|
pool_timeout_seconds |
float
|
Seconds to wait for a pool checkout before raising. |
application_name |
NotBlankStr
|
libpq |
statement_timeout_ms |
int
|
Postgres |
connect_timeout_seconds |
float
|
Seconds to wait for an initial connection attempt before raising. |
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
host(NotBlankStr) -
port(int) -
database(NotBlankStr) -
username(NotBlankStr) -
password(SecretStr) -
ssl_mode(PostgresSslMode) -
pool_min_size(int) -
pool_max_size(int) -
pool_timeout_seconds(float) -
application_name(NotBlankStr) -
statement_timeout_ms(int) -
connect_timeout_seconds(float) -
enable_timescaledb(bool) -
cost_records_chunk_interval(NotBlankStr) -
audit_entries_chunk_interval(NotBlankStr)
Validators:
-
_validate_chunk_interval→cost_records_chunk_interval,audit_entries_chunk_interval -
_validate_pool_sizes
application_name
pydantic-field
¶
libpq application_name session parameter
statement_timeout_ms
pydantic-field
¶
Postgres statement_timeout session param in ms (0 disables)
connect_timeout_seconds
pydantic-field
¶
Initial connection timeout in seconds
enable_timescaledb
pydantic-field
¶
Enable TimescaleDB hypertable conversion for append-only time-series tables (cost_records, audit_entries). Uses Apache-2.0 licensed hypertable features only; retention policies and compression are Timescale-License features and are not used. Requires the timescaledb extension on the Postgres server. Not supported on managed Postgres providers (AWS RDS, Cloud SQL, Azure Postgres).
cost_records_chunk_interval
pydantic-field
¶
Hypertable chunk interval for cost_records. Ignored when enable_timescaledb is False.
audit_entries_chunk_interval
pydantic-field
¶
Hypertable chunk interval for audit_entries. Ignored when enable_timescaledb is False.
PersistenceConfig
pydantic-model
¶
Bases: BaseModel
Top-level persistence configuration.
Attributes:
| Name | Type | Description |
|---|---|---|
backend |
NotBlankStr
|
Backend name. One of |
sqlite |
SQLiteConfig
|
SQLite-specific settings (used when
|
postgres |
PostgresConfig | None
|
Postgres-specific settings (required when
|
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
backend(NotBlankStr) -
sqlite(SQLiteConfig) -
postgres(PostgresConfig | None)
Validators:
-
_validate_backend_name
Repositories¶
Repository protocols are split by domain so each lives alongside its typed-ID and helper imports.
agent_state_protocol
¶
AgentState repository protocol.
AgentStateRepository
¶
Bases: IdKeyedRepository['AgentRuntimeState', NotBlankStr], Protocol
CRUD + query interface for agent runtime state persistence.
Composes :class:IdKeyedRepository (ADR-0001). Bespoke per D7:
:meth:get_active filters non-idle agents and orders by
last_activity_at DESC, which the generic list_items cannot
express and which dashboard live views poll on the hot path.
save
async
¶
Upsert an agent runtime state by agent_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
AgentRuntimeState
|
The agent runtime state to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/agent_state_protocol.py
get
async
¶
Retrieve an agent runtime state by agent ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The agent identifier. |
required |
Returns:
| Type | Description |
|---|---|
AgentRuntimeState | None
|
The agent state, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/agent_state_protocol.py
list_items
async
¶
List all agent runtime states in agent_id order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[AgentRuntimeState, ...]
|
Agent states in ascending |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/agent_state_protocol.py
get_active
async
¶
Retrieve a bounded page of non-idle agent states.
Returns states where status != 'idle', ordered by
last_activity_at descending then agent_id ascending
(the stable secondary key makes paging deterministic when
activity timestamps tie). Callers that need every active
state drain via
:func:synthorg.persistence._shared.collect_all.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[AgentRuntimeState, ...]
|
A page of active agent states. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/agent_state_protocol.py
delete
async
¶
Delete an agent runtime state by agent ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The agent identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/agent_state_protocol.py
artifact_protocol
¶
Artifact repository protocol.
ArtifactFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for ArtifactRepository.query (ADR-0001).
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
task_id(NotBlankStr | None) -
created_by(NotBlankStr | None) -
artifact_type(ArtifactType | None)
ArtifactRepository
¶
Bases: IdKeyedRepository[Artifact, NotBlankStr], FilteredQueryRepository[Artifact, ArtifactFilterSpec], Protocol
CRUD + query interface for Artifact persistence.
Composes :class:IdKeyedRepository + :class:FilteredQueryRepository
(ADR-0001).
The single bespoke method :meth:save_returning_outcome is retained
(D7 bespoke-method policy) because callers need the insert/update
outcome to attach the correct API_ARTIFACT_CREATED /
API_ARTIFACT_UPDATED audit event without a TOCTOU get + save
race. This avoids the rare but real bug where concurrent writers both
observe "missing" and both report API_ARTIFACT_CREATED.
save
async
¶
Persist an artifact (insert or update by id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
Artifact
|
The artifact to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
save_returning_outcome
async
¶
Persist an artifact atomically and return the insert/update outcome.
This is a D7 bespoke method retained for the audit-event correctness
invariant: callers need to know whether the write was an insert or
update so they can emit the corresponding API_ARTIFACT_CREATED or
API_ARTIFACT_UPDATED event. The outcome is computed atomically
with the write (SQLite: INSERT ... ON CONFLICT(id) DO NOTHING
rowcount; Postgres: xmax = 0 AS created) to avoid the TOCTOU
window of a separate get() probe.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
artifact
|
Artifact
|
The artifact to persist. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
it updated an existing row in place. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/artifact_protocol.py
get
async
¶
Retrieve an artifact by its ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The artifact identifier. |
required |
Returns:
| Type | Description |
|---|---|
Artifact | None
|
The artifact, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/artifact_protocol.py
list_items
async
¶
List all artifacts with pagination.
Results are ordered by artifact ID ascending to ensure deterministic pagination across backends.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Artifact, ...]
|
Artifacts ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/artifact_protocol.py
query
async
¶
List artifacts matching the filter spec (paginated).
Results are ordered by artifact ID ascending to ensure deterministic pagination across backends.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
ArtifactFilterSpec
|
Carries optional filters for task_id, created_by, artifact_type. |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Artifact, ...]
|
Matching artifacts ordered by ID, as a tuple. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
QueryError
|
If |
Source code in src/synthorg/persistence/artifact_protocol.py
count
async
¶
Count artifacts matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
ArtifactFilterSpec
|
Carries optional filters. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Total number of matching artifacts. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/artifact_protocol.py
delete
async
¶
Delete an artifact by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The artifact identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/artifact_protocol.py
audit_protocol
¶
Audit repository protocol.
AuditFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for AuditRepository.query.
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
agent_id(NotBlankStr | None) -
action_type(NotBlankStr | None) -
verdict(AuditVerdictStr | None) -
risk_level(ApprovalRiskLevel | None) -
since(AwareDatetime | None) -
until(AwareDatetime | None)
Validators:
-
_validate_window
AuditRepository
¶
Bases: AppendOnlyRepository['AuditEntry', AuditFilterSpec], Protocol
Append-only persistence + query interface for AuditEntry.
Composes :class:AppendOnlyRepository. Audit entries are immutable
records of security evaluations. No update operations are provided
to preserve audit integrity.
The single delete-style operation is :meth:purge_before, the
retention sweeper used to enforce the operator-configurable
security.audit_retention_days window. This is a deliberate
exception to the append-only rule; see :meth:purge_before
for the retention-vs-forensic tradeoff.
append
async
¶
Persist an audit entry (append-only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entry
|
AuditEntry
|
The audit entry to persist. |
required |
Raises:
| Type | Description |
|---|---|
DuplicateRecordError
|
If an entry with the same ID exists. |
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/audit_protocol.py
query
async
¶
Return audit entries matching the filter spec (paginated).
Results are ordered by timestamp descending (newest first).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
AuditFilterSpec
|
Audit filter specification with optional filters. |
required |
limit
|
int
|
Maximum number of entries to return (must be >= 1). |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Number of entries to skip (for pagination). |
0
|
Returns:
| Type | Description |
|---|---|
tuple[AuditEntry, ...]
|
Matching audit entries as a tuple. |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails, limit < 1, or until is earlier than since in the filter spec. |
Source code in src/synthorg/persistence/audit_protocol.py
purge_before
async
¶
Delete audit entries older than cutoff (CFG-1 audit).
This is the one exception to the append-only rule: it powers
the retention sweeper which enforces the operator-configurable
security.audit_retention_days window. Rows are removed
permanently; the retention-vs-forensic tradeoff is decided at
the retention-window level, not per row.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cutoff
|
AwareDatetime
|
Entries strictly older than this UTC timestamp are deleted. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of rows removed. |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/audit_protocol.py
checkpoint_protocol
¶
Checkpoint and heartbeat repository protocols.
CheckpointFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for CheckpointRepository.query.
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
execution_id(NotBlankStr | None) -
task_id(NotBlankStr | None)
CheckpointRepository
¶
Bases: AppendOnlyRepository['Checkpoint', CheckpointFilterSpec], Protocol
Append-only persistence interface for checkpoint rows.
Composes :class:AppendOnlyRepository.
get_latestreturns the single newest row byturn_numberunder a filter; the genericquerycannot express theLIMIT 1 ORDER BY turn_number DESCshape efficiently.delete_by_executionis a batch delete keyed onexecution_id; the genericpurge_before(threshold)removes only by timestamp, not by execution scope.
append
async
¶
Persist a checkpoint row (append-only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
checkpoint
|
Checkpoint
|
The checkpoint to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
query
async
¶
Return checkpoints matching the filter, newest first.
purge_before
async
¶
Delete checkpoints with saved_at < threshold.
threshold must be timezone-aware UTC; naive datetimes are
rejected at the boundary so purge cut-offs do not depend on the
caller's local-time assumption.
Source code in src/synthorg/persistence/checkpoint_protocol.py
get_latest
async
¶
Retrieve the latest checkpoint by turn_number.
At least one filter (execution_id or task_id) is required.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
execution_id
|
NotBlankStr | None
|
Filter by execution identifier. |
None
|
task_id
|
NotBlankStr | None
|
Filter by task identifier. |
None
|
Returns:
| Type | Description |
|---|---|
Checkpoint | None
|
The checkpoint with the highest turn_number, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
ValueError
|
If neither filter is provided. |
Source code in src/synthorg/persistence/checkpoint_protocol.py
delete_by_execution
async
¶
Delete all checkpoints for an execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
execution_id
|
NotBlankStr
|
The execution identifier. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of checkpoints deleted. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/checkpoint_protocol.py
HeartbeatRepository
¶
Bases: Protocol
CRUD interface for heartbeat persistence.
Heartbeats are a "singleton per execution" (one row per
execution_id) but the dominant access pattern is
:meth:get_stale (range query over last_heartbeat_at), which
is not expressible in the generic categories. The save/get/delete
surface looks superficially like :class:IdKeyedRepository, but
composing that protocol would require list_items pagination
that no caller needs while still leaving get_stale outside the
generic surface. A fully bespoke protocol is simpler than splitting
awareness across two surfaces.
save
async
¶
Persist a heartbeat (upsert by execution_id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
heartbeat
|
Heartbeat
|
The heartbeat to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/checkpoint_protocol.py
get
async
¶
Retrieve a heartbeat by execution ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
execution_id
|
NotBlankStr
|
The execution identifier. |
required |
Returns:
| Type | Description |
|---|---|
Heartbeat | None
|
The heartbeat, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/checkpoint_protocol.py
get_stale
async
¶
Retrieve a bounded page of heartbeats older than the threshold.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
threshold
|
AwareDatetime
|
Heartbeats with |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
Heartbeat
|
A page of stale heartbeats ordered by |
...
|
then |
tuple[Heartbeat, ...]
|
deterministic paging). Callers needing every stale |
tuple[Heartbeat, ...]
|
heartbeat drain via |
tuple[Heartbeat, ...]
|
func: |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/checkpoint_protocol.py
delete
async
¶
Delete a heartbeat by execution ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
execution_id
|
NotBlankStr
|
The execution identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/checkpoint_protocol.py
connection_protocol
¶
Repository protocols for integration persistence.
Defines CRUD interfaces for connections, encrypted secret blobs, OAuth authorization states, and webhook receipts.
ConnectionFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for ConnectionRepository.query (ADR-0001).
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
connection_type(ConnectionType | None)
ConnectionRepository
¶
Bases: IdKeyedRepository[Connection, NotBlankStr], FilteredQueryRepository[Connection, ConnectionFilterSpec], Protocol
CRUD + query interface for Connection persistence.
Composes :class:IdKeyedRepository + :class:FilteredQueryRepository
(ADR-0001). Entity is keyed by name field.
save
async
¶
get
async
¶
list_items
async
¶
List all connections with pagination.
Sorted by name ascending.
query
async
¶
List connections matching the filter spec.
Sorted by name ascending.
Source code in src/synthorg/persistence/connection_protocol.py
count
async
¶
delete
async
¶
Delete a connection by name.
Returns:
| Type | Description |
|---|---|
bool
|
|
ConnectionSecretRepository
¶
OAuthStateRepository
¶
Bases: IdKeyedRepository[OAuthState, NotBlankStr], Protocol
CRUD for transient OAuth authorization states.
Composes :class:IdKeyedRepository (ADR-0001). Entity is keyed by
state_token field. Bespoke per ADR-0001 D7: :meth:mark_consumed
(compare-and-set for idempotency) and :meth:cleanup_expired (TTL-based
garbage collection).
save
async
¶
get
async
¶
list_items
async
¶
delete
async
¶
mark_consumed
async
¶
Mark a state token as consumed by a successful callback.
Stamps consumed_at and records connection_name so a
redelivered callback (provider retry, browser back-button,
CDN replay) returns the original connection name without
re-exchanging the authorization code. The compare-and-set is
atomic at the row level; second and subsequent calls observe
the existing consumed_at and return False.
Implementations MUST stamp both consumed_at and
connection_name_returned in a single atomic UPDATE.
:class:OAuthState validates the two fields are always set
together (see _validate_consumed_pair); a partial write
would let a redelivered callback observe consumed_at set
with no connection_name_returned to return.
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
already consumed); |
bool
|
already consumed. |
Raises:
| Type | Description |
|---|---|
QueryError
|
On database errors. |
Source code in src/synthorg/persistence/connection_protocol.py
cleanup_expired
async
¶
Delete all expired states.
Also reaps consumed-but-stale rows older than
retention_seconds so the idempotency table does not grow
unbounded.
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted rows. |
Source code in src/synthorg/persistence/connection_protocol.py
WebhookReceiptRepository
¶
Bases: IdKeyedRepository[WebhookReceipt, NotBlankStr], Protocol
CRUD for webhook receipt log entries.
Composes :class:IdKeyedRepository (ADR-0001). Entity is keyed by
receipt_id field. Bespoke per ADR-0001 D7: :meth:update_status
and :meth:update_status_if_current (lifecycle updates), :meth:get_by_connection
(alternate-key query), and :meth:cleanup_old_for_connection (retention
policy).
save
async
¶
get
async
¶
Fetch a single receipt by ID, or None when absent.
Used by the retry endpoint to look up a failed receipt before re-publishing its captured payload to the bus.
Source code in src/synthorg/persistence/connection_protocol.py
list_items
async
¶
List all webhook receipts with pagination.
delete
async
¶
Delete a webhook receipt by ID.
Returns:
| Type | Description |
|---|---|
bool
|
|
update_status
async
¶
Update the receipt's lifecycle fields.
Returns True when the row existed and was updated, False
when no row matched the ID. Callers can use the boolean to
distinguish "not found" from a successful no-op without raising.
Source code in src/synthorg/persistence/connection_protocol.py
update_status_if_current
async
¶
Compare-and-set variant of update_status.
Atomically updates the row only when its current status column
equals expected_status. Returns True on a successful
transition, False when the row is missing OR the row's
current status differs from expected_status (lost the race).
The retry endpoint uses this to close the TOCTOU window where
two concurrent operator-triggered retries could both load the
same receipt, both transition it to retrying, and both
republish the captured payload.
Source code in src/synthorg/persistence/connection_protocol.py
get_by_connection
async
¶
List receipts for a connection, newest first.
limit <= 0 returns (); offset skips that many rows
before slicing the limit window.
Source code in src/synthorg/persistence/connection_protocol.py
cleanup_old_for_connection
async
¶
Delete receipts for connection_name older than retention_days.
retention_days <= 0 is a no-op so callers cannot accidentally
truncate a connection's log via misconfiguration.
Returns:
| Type | Description |
|---|---|
int
|
Number of deleted rows. |
Source code in src/synthorg/persistence/connection_protocol.py
cost_record_protocol
¶
CostRecord repository protocol.
CostRecordFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for CostRecordRepository.query (ADR-0001).
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
agent_id(NotBlankStr | None) -
task_id(NotBlankStr | None)
CostRecordRepository
¶
Bases: AppendOnlyRepository['CostRecord', CostRecordFilterSpec], Protocol
Append-only persistence + query/aggregation for CostRecord.
Composes :class:AppendOnlyRepository (ADR-0001). Bespoke per D7:
aggregatesums total cost with a mixed-currency rejection invariant; the genericquerycannot express aggregation.
append
async
¶
Persist a cost record (append-only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
CostRecord
|
The cost record to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
query
async
¶
Query cost records with optional filters and pagination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
CostRecordFilterSpec
|
Carries optional agent_id and task_id filters. |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before applying limit. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[CostRecord, ...]
|
Matching cost records as a tuple, ordered newest-first. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/cost_record_protocol.py
purge_before
async
¶
Delete cost records with timestamp before threshold (retention).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
threshold
|
datetime
|
Records older than this are deleted. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of rows removed. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/cost_record_protocol.py
aggregate
async
¶
Sum total cost, optionally filtered by agent and/or task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
NotBlankStr | None
|
Filter by agent identifier. |
None
|
task_id
|
NotBlankStr | None
|
Filter by task identifier. |
None
|
Returns:
| Type | Description |
|---|---|
float
|
Total cost in the configured currency. |
Raises:
| Type | Description |
|---|---|
MixedCurrencyAggregationError
|
If the matched cost records
span more than one currency. Aggregation is rejected
rather than silently summing across currencies; the
controller maps this to HTTP 409. Filter by
|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/cost_record_protocol.py
decision_protocol
¶
Decision records repository protocol.
DecisionRole
module-attribute
¶
Valid role filters for DecisionRepository.query.
DecisionFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for DecisionRepository.query (ADR-0001).
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
task_id(NotBlankStr | None) -
agent_id(NotBlankStr | None) -
role(DecisionRole | None)
DecisionRepository
¶
Bases: AppendOnlyRepository['DecisionRecord', DecisionFilterSpec], Protocol
Append-only persistence + query interface for DecisionRecord.
Decision records are immutable audit entries of review gate decisions. No update or delete operations are provided to preserve audit integrity.
Composes :class:AppendOnlyRepository (ADR-0001). Bespoke per D7:
append_with_next_versionatomically computes version via SQL subquery to eliminate the TOCTOU race under concurrent reviewers.get(record_id)is kept becauseAppendOnlyRepositoryhas no per-record retrieval (append-only logs are typically queried only via filtered scans); callers need direct ID-based lookups.list_by_taskandlist_by_agentare kept because they serve different consumers with different sort orders:list_by_taskreturns oldest-first (chronological), whilelist_by_agentreturns newest-first (cursor-pagination stable under concurrent appends).
append_with_next_version
async
¶
append_with_next_version(
*,
record_id,
task_id,
approval_id,
executing_agent_id,
reviewer_agent_id,
decision,
reason,
criteria_snapshot,
recorded_at,
metadata=None,
)
Atomically append a decision record computing version in SQL.
Computes version = COALESCE(MAX(version), 0) + 1 for the
given task_id inside a single INSERT statement (atomic
under aiosqlite's per-statement serialization), eliminating the
TOCTOU race that a list_by_task + len(...) + 1 pattern
would create under concurrent reviewers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
record_id
|
NotBlankStr
|
Unique record identifier (UUID recommended). |
required |
task_id
|
NotBlankStr
|
Task that was reviewed. |
required |
approval_id
|
NotBlankStr | None
|
Associated |
required |
executing_agent_id
|
NotBlankStr
|
Agent that performed the work. |
required |
reviewer_agent_id
|
NotBlankStr
|
Agent or human that reviewed. |
required |
decision
|
DecisionOutcome
|
Outcome of the review. |
required |
reason
|
str | None
|
Optional rationale. |
required |
criteria_snapshot
|
tuple[NotBlankStr, ...]
|
Acceptance criteria at decision time. |
required |
recorded_at
|
AwareDatetime
|
Decision timestamp (must be timezone-aware).
Normalized to UTC before storage so records read back
via |
required |
metadata
|
dict[str, object] | None
|
Forward-compatible metadata. Defaults to |
None
|
Returns:
| Type | Description |
|---|---|
DecisionRecord
|
The persisted |
DecisionRecord
|
|
Raises:
| Type | Description |
|---|---|
DuplicateRecordError
|
If a record with |
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/decision_protocol.py
append
async
¶
Append a decision record via precomputed version.
Normally callers use append_with_next_version which
atomically computes the version in SQL. This method is provided
for completeness of the :class:AppendOnlyRepository interface
when the version is already known (rare).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
DecisionRecord
|
The decision record to persist. |
required |
Raises:
| Type | Description |
|---|---|
DuplicateRecordError
|
If a record with the same ID exists. |
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/decision_protocol.py
query
async
¶
Query decision records with optional filters and pagination.
Results are ordered by timestamp and ID. When both agent_id
and role are specified, returns records where the agent
acted in that role (query(DecisionFilterSpec(agent_id="a1",
role="reviewer")) returns decisions reviewed by agent "a1").
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
DecisionFilterSpec
|
Carries optional task_id, agent_id, and role filters. All filters are optional and combined with AND. |
required |
limit
|
int
|
Maximum rows to return (>= 1). |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip (>= 0). |
0
|
Returns:
| Type | Description |
|---|---|
DecisionRecord
|
Matching decision records as a tuple. When |
...
|
specified, results are oldest-first (ascending recorded_at). |
tuple[DecisionRecord, ...]
|
When |
tuple[DecisionRecord, ...]
|
|
tuple[DecisionRecord, ...]
|
recorded_at). Mixed filters default to task-oriented ordering |
tuple[DecisionRecord, ...]
|
(oldest-first). |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails or pagination args are out of range. |
Source code in src/synthorg/persistence/decision_protocol.py
get
async
¶
Retrieve a decision record by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
record_id
|
NotBlankStr
|
The record identifier. |
required |
Returns:
| Type | Description |
|---|---|
DecisionRecord | None
|
The record, or |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/decision_protocol.py
list_by_task
async
¶
List decision records for a task (paginated, oldest first).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
task_id
|
NotBlankStr
|
The task identifier. |
required |
limit
|
int
|
Maximum rows to return (>= 1). |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip (>= 0). |
0
|
Returns:
| Type | Description |
|---|---|
tuple[DecisionRecord, ...]
|
Matching records as a tuple (oldest first). |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails or pagination args are out of range. |
Source code in src/synthorg/persistence/decision_protocol.py
list_by_agent
async
¶
List decision records by agent role (paginated, newest first).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
NotBlankStr
|
The agent identifier. |
required |
role
|
DecisionRole
|
Either |
required |
limit
|
int
|
Maximum rows to return (>= 1). |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip (>= 0). |
0
|
Returns:
| Type | Description |
|---|---|
DecisionRecord
|
Matching records as a tuple, ordered by |
...
|
|
tuple[DecisionRecord, ...]
|
stable under concurrent inserts. |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails, pagination args are
out of range, or |
Source code in src/synthorg/persistence/decision_protocol.py
purge_before
async
¶
Delete decision records older than threshold (retention).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
threshold
|
datetime
|
Records older than this are deleted. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of rows removed. |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/decision_protocol.py
message_protocol
¶
Message repository protocol.
MessageFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for MessageRepository.query.
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
channel(NotBlankStr | None)
MessageRepository
¶
Bases: AppendOnlyRepository[Message, MessageFilterSpec], Protocol
Write + history query interface for Message persistence.
Composes :class:AppendOnlyRepository.
get_historyreturns newest-first within one channel with the project's canonical limit default; it is the dashboard hot path and the existing controller calls already pass a channel name positionally.deletesupports per-message moderation / redaction; the genericpurge_before(threshold)is the retention sweeper.
append
async
¶
Persist a message (append-only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
Message
|
The message to persist. |
required |
Raises:
| Type | Description |
|---|---|
DuplicateRecordError
|
If a message with the same ID exists. |
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/message_protocol.py
query
async
¶
Return messages matching the filter spec, newest first.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
MessageFilterSpec
|
Carries optional |
required |
limit
|
int
|
Maximum number of messages to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Message, ...]
|
Messages ordered by timestamp descending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/message_protocol.py
purge_before
async
¶
Delete messages with timestamp < threshold.
Returns:
| Type | Description |
|---|---|
int
|
Number of rows removed. |
get_history
async
¶
Retrieve message history for a channel (dashboard hot path).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
NotBlankStr
|
Channel name to query. |
required |
limit
|
int
|
Maximum number of messages to return (newest first). |
DEFAULT_LIST_LIMIT
|
Returns:
| Type | Description |
|---|---|
tuple[Message, ...]
|
Messages ordered by timestamp descending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/message_protocol.py
get_by_id
async
¶
Fetch a single message by (channel, id).
messages.id is the primary key (globally unique), so the
lookup is an indexed point read; channel is an additional
scoping predicate so a caller cannot read a message off a
channel it did not address. Replaces the prior
get_history full-channel scan in
:meth:MessageService.get_message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel
|
NotBlankStr
|
Channel the message must belong to. |
required |
message_id
|
NotBlankStr
|
The unique message identifier. |
required |
Returns:
| Type | Description |
|---|---|
Message | None
|
The matching :class: |
Message | None
|
with that id exists on that channel. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/message_protocol.py
delete
async
¶
Delete a message by id (moderation / redaction).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_id
|
NotBlankStr
|
The unique message identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
exist. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/message_protocol.py
parked_context_protocol
¶
ParkedContext repository protocol.
ParkedContextRepository
¶
Bases: IdKeyedRepository[ParkedContext, NotBlankStr], Protocol
CRUD interface for parked agent execution contexts.
Composes :class:IdKeyedRepository (ADR-0001). Bespoke per D7:
:meth:get_by_approval is a unique-key lookup (each approval has
at most one parked context) and :meth:get_by_agent returns rows
ordered parked_at DESC, both of which are simpler/cheaper than
routing through a FilteredQueryRepository.query call.
save
async
¶
Persist a parked context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
ParkedContext
|
The parked context to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
get
async
¶
Retrieve a parked context by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The parked context identifier. |
required |
Returns:
| Type | Description |
|---|---|
ParkedContext | None
|
The parked context, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/parked_context_protocol.py
list_items
async
¶
List parked contexts in id order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[ParkedContext, ...]
|
Parked contexts in ascending id order. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/parked_context_protocol.py
get_by_approval
async
¶
Retrieve a parked context by approval ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
approval_id
|
NotBlankStr
|
The approval item identifier. |
required |
Returns:
| Type | Description |
|---|---|
ParkedContext | None
|
The parked context, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/parked_context_protocol.py
get_by_agent
async
¶
Retrieve a bounded page of parked contexts for an agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent_id
|
NotBlankStr
|
The agent identifier. |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
ParkedContext
|
A page of parked contexts for the agent, ordered by |
...
|
|
tuple[ParkedContext, ...]
|
key for deterministic paging). Callers that need every |
tuple[ParkedContext, ...]
|
parked context drain via |
tuple[ParkedContext, ...]
|
func: |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/parked_context_protocol.py
delete
async
¶
Delete a parked context by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The parked context identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/parked_context_protocol.py
project_protocol
¶
Project repository protocol.
ProjectFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for ProjectRepository.query.
All fields are optional; an empty spec matches all projects.
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
status(ProjectStatus | None) -
lead(NotBlankStr | None)
ProjectRepository
¶
Bases: IdKeyedRepository[Project, NotBlankStr], FilteredQueryRepository[Project, ProjectFilterSpec], Protocol
CRUD + query interface for Project persistence.
The mutation surface is split into atomic create/update
methods so the service layer can attach the correct
API_PROJECT_CREATED / API_PROJECT_UPDATED audit event
without a TOCTOU get + save race. save remains as an
upsert convenience for callers that genuinely need
"persist regardless of prior state" semantics (migration / import
paths); production CRUD must go through the explicit pair.
Composes :class:IdKeyedRepository + :class:FilteredQueryRepository.
create and update are atomic lifecycle transitions that the
generic save (upsert) cannot distinguish; they preserve separate
create-vs-update audit semantics that the service layer depends on.
create
async
¶
Insert a new project, failing if the id already exists.
Atomic insert-only operation paired with :meth:update to
preserve distinct create vs. update audit events. See :meth:save
for the upsert convenience when lifecycle is unknown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
project
|
Project
|
The project to insert. |
required |
Raises:
| Type | Description |
|---|---|
DuplicateRecordError
|
A project with the same id is already persisted. |
QueryError
|
If the database operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
update
async
¶
Update an existing project, failing if no row matches.
Atomic update-only operation paired with :meth:create to
preserve distinct create vs. update audit events. See :meth:save
for the upsert convenience when lifecycle is unknown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
project
|
Project
|
The project to update. |
required |
Raises:
| Type | Description |
|---|---|
RecordNotFoundError
|
No project with this id exists. |
QueryError
|
If the database operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
save
async
¶
Persist a project via upsert (insert or update).
Used for migration / import paths that legitimately do not
know whether the row exists. Production CRUD endpoints must
use :meth:create / :meth:update so the API audit event
reflects the actual lifecycle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
Project
|
The project to persist. |
required |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the database operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
get
async
¶
Retrieve a project by its ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The project identifier. |
required |
Returns:
| Type | Description |
|---|---|
Project | None
|
The project, or |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
list_items
async
¶
List all projects in ID order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum projects to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Project, ...]
|
Projects in ascending ID order, capped at limit rows. |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
query
async
¶
List projects matching the filter spec.
Results are ordered by project ID ascending to ensure deterministic pagination across backends.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
ProjectFilterSpec
|
Carries optional |
required |
limit
|
int
|
Maximum projects to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Project, ...]
|
Matching projects ordered by ID, capped at limit rows. |
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
count
async
¶
delete
async
¶
Delete a project by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The project identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
QueryError
|
If the operation fails. |
Source code in src/synthorg/persistence/project_protocol.py
settings_protocol
¶
Settings repository protocol.
SettingRowKey
module-attribute
¶
SettingRowKey = tuple[NotBlankStr, NotBlankStr]
Composite primary key: (namespace, key).
SettingRow
pydantic-model
¶
Bases: BaseModel
Persistent settings record.
Attributes:
| Name | Type | Description |
|---|---|---|
namespace |
NotBlankStr
|
Setting namespace (part of composite primary key). |
key |
NotBlankStr
|
Setting key within the namespace (part of composite primary key). |
value |
str
|
Setting value as a string. |
updated_at |
str
|
ISO 8601 timestamp of the last update. |
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
namespace(NotBlankStr) -
key(NotBlankStr) -
value(str) -
updated_at(str)
SettingsRepository
¶
Bases: IdKeyedRepository[SettingRow, SettingRowKey], Protocol
CRUD interface for namespaced settings persistence.
Composes :class:IdKeyedRepository (ADR-0001) with composite key
(namespace, key) per D8. Bespoke per D7: :meth:get_namespace,
:meth:set_many, :meth:delete_namespace, and
:meth:delete_namespace_returning_keys encode atomic multi-row and
namespace-scoped operations that the generic surface cannot express.
save
async
¶
Persist a setting (upsert by composite key).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
SettingRow
|
The setting to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
get
async
¶
Retrieve a setting by composite key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
SettingRowKey
|
|
required |
Returns:
| Type | Description |
|---|---|
SettingRow | None
|
The setting, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
list_items
async
¶
List settings across all namespaces (paginated).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip from the head of the ordering. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[SettingRow, ...]
|
Paginated settings ordered by |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
delete
async
¶
Delete a setting by composite key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
SettingRowKey
|
|
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
get_namespace
async
¶
Retrieve all settings in a namespace (bespoke per ADR-0001 D7).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
namespace
|
NotBlankStr
|
Setting namespace. |
required |
Returns:
| Type | Description |
|---|---|
tuple[SettingRow, ...]
|
All settings in the namespace, sorted by key ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
set_if_unchanged
async
¶
Upsert a setting with optional compare-and-swap (bespoke per D7).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
SettingRow
|
The setting to upsert. |
required |
expected_updated_at
|
str | None
|
When provided, enforces atomic CAS -- the
row is only updated if the current |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
was not met. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
set_many
async
¶
Atomically upsert multiple settings (bespoke per ADR-0001 D7).
Each element of items is a SettingRow instance.
expected_updated_at_map optionally supplies a
compare-and-swap expected version per (namespace, key)
composite key; keys absent from the map are upserted
unconditionally. Pass an empty string "" in the map for
first-write CAS semantics (the row must not exist yet).
The whole operation is atomic: if any CAS check fails, the transaction rolls back and no rows are modified.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
items
|
Sequence[SettingRow]
|
Settings to upsert. |
required |
expected_updated_at_map
|
Mapping[SettingRowKey, str] | None
|
Optional CAS version map keyed by
composite |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
check failed; callers should re-read versions and retry |
bool
|
if they need to recover. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
On DB-level failures (not CAS misses). |
Source code in src/synthorg/persistence/settings_protocol.py
delete_namespace
async
¶
Delete all settings in a namespace (bespoke per ADR-0001 D7).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
namespace
|
NotBlankStr
|
Setting namespace. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of settings deleted. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
delete_namespace_returning_keys
async
¶
Atomically delete namespace, returning deleted keys (bespoke per D7).
Equivalent to :meth:delete_namespace but returns the keys
whose rows were actually removed in a single transaction;
callers (notably :class:SettingsService.delete_namespace)
rely on this to scope per-key change-publish notifications to
the subset that genuinely changed, without a TOCTOU
get_namespace + delete_namespace race.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
namespace
|
NotBlankStr
|
Setting namespace. |
required |
Returns:
| Type | Description |
|---|---|
NotBlankStr
|
Tuple of keys (within namespace) whose override row |
...
|
was removed by this call, in implementation-defined order. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/settings_protocol.py
task_protocol
¶
Task repository protocol.
TaskFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for TaskRepository.query (ADR-0001).
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
status(TaskStatus | None) -
assigned_to(NotBlankStr | None) -
project(NotBlankStr | None)
TaskRepository
¶
Bases: IdKeyedRepository[Task, NotBlankStr], FilteredQueryRepository[Task, TaskFilterSpec], Protocol
CRUD + query interface for Task persistence.
Composes :class:IdKeyedRepository + :class:FilteredQueryRepository
(ADR-0001).
save
async
¶
Persist a task (insert or update by id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
Task
|
The task to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
get
async
¶
Retrieve a task by its ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The task identifier. |
required |
Returns:
| Type | Description |
|---|---|
Task | None
|
The task, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/task_protocol.py
list_items
async
¶
List tasks with pagination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Task, ...]
|
Tasks ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/task_protocol.py
query
async
¶
List tasks matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
TaskFilterSpec
|
Carries optional filters for status, assigned_to, project. |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[Task, ...]
|
Matching tasks ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/task_protocol.py
count
async
¶
Count tasks matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
TaskFilterSpec
|
Carries optional filters. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Total number of matching tasks. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/task_protocol.py
delete
async
¶
Delete a task by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The task identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/task_protocol.py
user_protocol
¶
User and ApiKey repository protocols.
Co-located because every API key belongs to a user (FK) and the two repositories share the auth admin surface.
UserFilterSpec
pydantic-model
¶
ApiKeyFilterSpec
pydantic-model
¶
Bases: BaseModel
Filter spec for ApiKeyRepository.query.
Config:
frozen:Trueextra:forbidallow_inf_nan:False
Fields:
-
user_id(NotBlankStr | None) -
revoked_only(bool)
UserRepository
¶
Bases: IdKeyedRepository[User, NotBlankStr], FilteredQueryRepository[User, UserFilterSpec], Protocol
CRUD + query interface for User persistence.
Composes :class:IdKeyedRepository + :class:FilteredQueryRepository.
Bespoke methods:
- get_by_username: alternate-key lookup on indexed username column
- count_by_role: domain invariant (callers need role-count aggregate)
- list_after_id: keyset cursor pagination for the dashboard
save
async
¶
Persist a user (insert or update by id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
User
|
The user to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
get
async
¶
Retrieve a user by its ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The user identifier. |
required |
Returns:
| Type | Description |
|---|---|
User | None
|
The user, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
get_by_username
async
¶
Retrieve a user by username (D7: alternate-key performance).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
username
|
NotBlankStr
|
The login username. |
required |
Returns:
| Type | Description |
|---|---|
User | None
|
The user, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
list_items
async
¶
List human users (excludes the system user) with pagination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum users to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[User, ...]
|
Human users ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
list_after_id
async
¶
Keyset page of human users with id > after_id.
Pushes the cursor predicate into the SQL layer so dashboard pagination stays O(page) regardless of table size, instead of loading a capped prefix and filtering in memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
after_id
|
NotBlankStr | None
|
Exclusive lower bound on |
None
|
limit
|
int
|
Maximum users to return (ascending |
DEFAULT_PAGE_SIZE
|
Returns:
| Type | Description |
|---|---|
User
|
Up to limit human users with |
...
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
query
async
¶
List users matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
UserFilterSpec
|
Carries optional filter for role. |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[User, ...]
|
Matching users ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
count
async
¶
Count users matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
UserFilterSpec
|
Carries optional filter for role. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Total number of matching users. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
count_by_role
async
¶
Count users with a specific role (D7: domain invariant).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
role
|
HumanRole
|
The role to filter by. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Number of users with the given role. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
delete
async
¶
Delete a user by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The user identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
ApiKeyRepository
¶
Bases: IdKeyedRepository[ApiKey, NotBlankStr], FilteredQueryRepository[ApiKey, ApiKeyFilterSpec], Protocol
CRUD + query interface for API key persistence.
Composes :class:IdKeyedRepository + :class:FilteredQueryRepository
(ADR-0001). Bespoke method kept per D7:
- get_by_hash: alternate-key lookup on indexed key_hash column
save
async
¶
Persist an API key (insert or update by id).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity
|
ApiKey
|
The API key to persist. |
required |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
get
async
¶
Retrieve an API key by its ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The key identifier. |
required |
Returns:
| Type | Description |
|---|---|
ApiKey | None
|
The API key, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
get_by_hash
async
¶
Retrieve an API key by its hash (D7: alternate-key performance).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key_hash
|
NotBlankStr
|
HMAC-SHA256 hex digest. |
required |
Returns:
| Type | Description |
|---|---|
ApiKey | None
|
The API key, or |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
list_items
async
¶
List API keys with pagination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum keys to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[ApiKey, ...]
|
API keys ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
query
async
¶
List API keys matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
ApiKeyFilterSpec
|
Carries optional filters for user_id and revoked_only. |
required |
limit
|
int
|
Maximum rows to return. |
DEFAULT_PAGE_SIZE
|
offset
|
int
|
Rows to skip before the window. |
0
|
Returns:
| Type | Description |
|---|---|
tuple[ApiKey, ...]
|
Matching API keys ordered by id ascending. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
count
async
¶
Count API keys matching the filter spec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filter_spec
|
ApiKeyFilterSpec
|
Carries optional filters. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Total number of matching API keys. |
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
delete
async
¶
Delete an API key by ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entity_id
|
NotBlankStr
|
The key identifier. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Raises:
| Type | Description |
|---|---|
PersistenceError
|
If the operation fails. |
Source code in src/synthorg/persistence/user_protocol.py
Factory¶
factory
¶
Factory for creating persistence backends from configuration.
Each company gets its own PersistenceBackend instance, which maps
to its own database. This enables multi-tenancy: one database per
company, selectable via the PersistenceConfig embedded in each
company's RootConfig.
default_registry
¶
create_backend
¶
Create a persistence backend from configuration.
Factory function that maps config.backend to the correct
concrete backend class via :class:PersistenceBackendRegistry.
Each call returns a new, disconnected backend instance -- the
caller is responsible for calling connect() and migrate().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
PersistenceConfig
|
Persistence configuration (includes backend selection and backend-specific settings). |
required |
Returns:
| Type | Description |
|---|---|
PersistenceBackend
|
A new, disconnected backend instance. |
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If the backend name is not registered, the optional dependency is missing, or backend-specific configuration is absent. |
Example::
config = PersistenceConfig(
backend="sqlite",
sqlite=SQLiteConfig(path="data/company-a.db"),
)
backend = create_backend(config)
await backend.connect()
await backend.migrate()
Source code in src/synthorg/persistence/factory.py
Errors¶
persistence_errors
¶
Persistence error hierarchy.
All persistence-related errors inherit from :class:PersistenceError so
callers can catch the entire family with a single except clause.
PersistenceError itself is rooted in :class:DomainError so the API
layer's centralised RFC 9457 dispatch picks up every subtype; the
existing handle_record_not_found / handle_persistence_integrity_error
/ handle_duplicate_record / handle_persistence_error handlers
remain registered so persistence-layer 4xx responses keep their fixed
public messages ("Resource not found", etc.) instead of leaking
record identifiers from str(exc).
Each concrete exception carries an is_retryable class attribute
mirroring the provider-layer convention in
:mod:synthorg.providers.errors. Callers that implement bounded
retry/backoff (e.g. a repository middleware) can branch on this flag
without string-matching the driver exception. Default: False.
Transient I/O failures override to True.
PersistenceError
¶
PersistenceConnectionError
¶
Bases: PersistenceError
Raised when a backend connection cannot be established or is lost.
Network drops, pool exhaustion, and connect timeouts are transient by default -- callers can retry with backoff.
Source code in src/synthorg/core/domain_errors.py
MigrationError
¶
Bases: PersistenceError
Raised when a database migration fails.
Non-retryable: a failed migration indicates schema drift or a logic bug, not a transient condition.
Source code in src/synthorg/core/domain_errors.py
RecordNotFoundError
¶
Bases: PersistenceError
Raised when a requested record does not exist.
Used by ArtifactStorageBackend.retrieve() when no content
exists for the given artifact ID. Repository get() methods
return None on miss instead of raising.
Source code in src/synthorg/core/domain_errors.py
DuplicateRecordError
¶
Bases: PersistenceError
Raised when inserting a record that already exists.
Source code in src/synthorg/core/domain_errors.py
QueryError
¶
Bases: PersistenceError
Raised when a query fails due to invalid parameters or backend issues.
Transient by default: connection drops and deadlocks during a
query surface here and are safe to retry. Deterministic failures
(bad SQL, invalid params) use :class:ConstraintViolationError
or :class:PersistenceVersionConflictError which override to
non-retryable.
Source code in src/synthorg/core/domain_errors.py
ConstraintViolationError
¶
Bases: QueryError
Raised when a DB constraint (unique, check, trigger) is violated.
Carries a constraint attribute that identifies the violated
constraint by its DB-side name (for Postgres) or by a stable
token parsed from the error message (for SQLite). Callers can
check this attribute to map the violation to a domain error
without parsing error strings.
Non-retryable: constraint violations are deterministic for a given input and will not succeed on a bare retry.
Blank constraint (empty / whitespace-only) is normalised to the
sentinel "<unknown>" rather than raising. Raising
:class:ValueError from __init__ would bypass downstream
except PersistenceError handlers; the sentinel keeps the
construction inside the persistence-error family so callers that
branch on constraint see a known token they can detect.
Source code in src/synthorg/core/persistence_errors.py
PersistenceVersionConflictError
¶
Bases: QueryError
Raised when an optimistic concurrency version check fails.
Non-retryable at this layer: the caller must re-read, re-apply its intended change, and resubmit with the fresh version. A blind retry would just lose the racing write.
The API layer translates this to
:class:synthorg.core.domain_errors.VersionConflictError (the
HTTP-aware sibling) so that controllers raise / catch consistently
with other 409 paths.
Source code in src/synthorg/core/domain_errors.py
MalformedRowError
¶
Bases: QueryError
Raised when a persisted row cannot be deserialized into its model.
JSON decode failures, validation errors, and missing-key errors on rows already committed to the database are deterministic data-integrity problems, not transient query failures. Retrying the same read returns the same corrupt row -- it just burns the budget and obscures the underlying integrity issue.
Non-retryable: callers must investigate the source row, not retry.
Source code in src/synthorg/core/domain_errors.py
ArtifactTooLargeError
¶
Bases: PersistenceError
Raised when a single artifact exceeds the maximum allowed size.
Source code in src/synthorg/core/domain_errors.py
ArtifactStorageFullError
¶
Bases: PersistenceError
Raised when total artifact storage exceeds capacity.
Source code in src/synthorg/core/domain_errors.py
SQLite Backend¶
backend
¶
SQLite persistence backend implementation.
SQLitePersistenceBackend
¶
Bases: _BackendRepositoryAccessors
SQLite implementation of the PersistenceBackend protocol.
Uses a single aiosqlite.Connection with WAL mode enabled by
default for file-based databases (in-memory databases do not
support WAL). Configurable via SQLiteConfig.wal_mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
SQLiteConfig
|
SQLite-specific configuration. |
required |
Source code in src/synthorg/persistence/sqlite/backend.py
config
property
¶
Public read-only view of the backend's config.
Exposed so callers that need backend-specific details (the
backup-handler factory walks the path; tests assert against
the resolved sqlite path) do not have to reach for the
private _config attribute.
connect
async
¶
Open the SQLite database and configure WAL mode.
Source code in src/synthorg/persistence/sqlite/backend.py
get_db
¶
Return the shared database connection.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not yet connected. |
Source code in src/synthorg/persistence/sqlite/backend.py
write_context
async
¶
Acquire the shared write lock for the lifetime of the block.
Multi-statement transactions on the single aiosqlite.Connection
must serialize so a sibling repo's INSERT cannot interleave
between this repo's INSERT and COMMIT. See
PersistenceBackend.write_context for the cross-backend
contract.
Source code in src/synthorg/persistence/sqlite/backend.py
disconnect
async
¶
Close the database connection.
Source code in src/synthorg/persistence/sqlite/backend.py
health_check
async
¶
Check database connectivity.
Source code in src/synthorg/persistence/sqlite/backend.py
migrate
async
¶
Apply pending schema migrations via yoyo-migrations.
On failure the backend's repositories are reset so callers cannot reuse a half-initialised state machine (mirrors the postgres backend's pool-close-on-failure behaviour).
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not connected. |
MigrationError
|
If migration application fails. |
Source code in src/synthorg/persistence/sqlite/backend.py
build_lockouts
¶
Return the cached lockout repository (built once per connection).
The lockout repo maintains a process-local in-memory cache
(_locked) on the auth hot path. Returning a fresh instance
on every call would reset that cache and silently "unlock"
every user. The cache is cleared on disconnect via
_clear_state. The backend's write_context is passed
through so lockout transactions serialize with other
repositories writing to the same aiosqlite connection.
Source code in src/synthorg/persistence/sqlite/backend.py
build_escalations
¶
Construct an escalation queue repository.
notify_channel is ignored by SQLite (no cross-instance
NOTIFY/LISTEN). The backend's write_context is passed
through so escalation transactions serialize with other
repositories writing to the same aiosqlite connection.
Source code in src/synthorg/persistence/sqlite/backend.py
build_ontology_versioning
¶
Construct the ontology versioning service bound to this backend.
Source code in src/synthorg/persistence/sqlite/backend.py
get_setting
async
¶
Retrieve a setting value by key from the _system namespace.
Delegates to self.settings (the SettingsRepository).
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not connected. |
Source code in src/synthorg/persistence/sqlite/backend.py
set_setting
async
¶
Store a setting value (upsert) in the _system namespace.
Delegates to self.settings (the SettingsRepository).
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not connected. |
Source code in src/synthorg/persistence/sqlite/backend.py
repositories
¶
SQLite repository implementations for Task, CostRecord, and Message.
HR-related repositories (LifecycleEvent, TaskMetric, CollaborationMetric)
are in hr_repositories.py within this package.
SQLiteTaskRepository
¶
SQLite implementation of the TaskRepository protocol.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
db
|
Connection
|
An open aiosqlite connection. |
required |
write_context
|
WriteContext
|
Async context manager that serializes writes on
the shared connection. Supplied by
|
required |
Source code in src/synthorg/persistence/sqlite/repositories.py
save
async
¶
Persist a task (upsert semantics).
Source code in src/synthorg/persistence/sqlite/repositories.py
get
async
¶
Retrieve a task by its ID.
Source code in src/synthorg/persistence/sqlite/repositories.py
list_items
async
¶
List tasks with pagination (no filters).
Ordering is deterministic on the primary key id so paginated
callers see stable windows.
Source code in src/synthorg/persistence/sqlite/repositories.py
query
async
¶
Query tasks matching the filter spec.
Ordering is deterministic on the primary key id so paginated
callers see stable windows.
Source code in src/synthorg/persistence/sqlite/repositories.py
count
async
¶
Count tasks matching the given filter spec.
Source code in src/synthorg/persistence/sqlite/repositories.py
delete
async
¶
Delete a task by ID.
Source code in src/synthorg/persistence/sqlite/repositories.py
SQLiteCostRecordRepository
¶
SQLite implementation of the CostRecordRepository protocol.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
db
|
Connection
|
An open aiosqlite connection. |
required |
write_context
|
WriteContext
|
Async context manager that serializes writes on
the shared connection. Supplied by
|
required |
Source code in src/synthorg/persistence/sqlite/repositories.py
append
async
¶
Persist a cost record (append-only per AppendOnlyRepository).
Source code in src/synthorg/persistence/sqlite/repositories.py
query
async
¶
Query cost records matching filter spec with pagination.
Source code in src/synthorg/persistence/sqlite/repositories.py
aggregate
async
¶
Sum total cost, optionally filtered by agent and/or task.
Raises :class:MixedCurrencyAggregationError when the matched rows
span multiple currencies. The distinct-currency probe and the
SUM run in a single aggregating query (COUNT(DISTINCT)
+ GROUP_CONCAT(DISTINCT) + SUM) so the two observations
share one snapshot and a concurrent insert cannot change the
result between them.
Source code in src/synthorg/persistence/sqlite/repositories.py
475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 | |
purge_before
async
¶
Delete cost records with timestamp before threshold (retention).
threshold must be timezone-aware: a naive value compared
against UTC-formatted stored timestamps would silently delete
the wrong window.
Source code in src/synthorg/persistence/sqlite/repositories.py
SQLiteMessageRepository
¶
SQLite implementation of the MessageRepository protocol.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
db
|
Connection
|
An open aiosqlite connection. |
required |
write_context
|
WriteContext
|
Async context manager that serializes writes on
the shared connection. Supplied by
|
required |
Source code in src/synthorg/persistence/sqlite/repositories.py
append
async
¶
Persist a message (append-only per AppendOnlyRepository).
Source code in src/synthorg/persistence/sqlite/repositories.py
get_history
async
¶
Retrieve message history for a channel, newest first.
Source code in src/synthorg/persistence/sqlite/repositories.py
get_by_id
async
¶
Fetch one message by (channel, id) via the PK point read.
The id predicate alone resolves the row (it is the primary
key); the extra channel predicate is a deliberate scoping
guard so a caller holding only a message id cannot read a
message outside the channel it asked for.
Source code in src/synthorg/persistence/sqlite/repositories.py
query
async
¶
Return messages matching the filter spec, newest first.
Source code in src/synthorg/persistence/sqlite/repositories.py
purge_before
async
¶
Delete messages with timestamp < threshold (retention).
threshold must be timezone-aware: a naive value compared
against UTC-formatted stored timestamps would silently delete
the wrong window.
Source code in src/synthorg/persistence/sqlite/repositories.py
delete
async
¶
Delete a single message by id (bespoke per ADR D7, moderation).
Returns True when a row was removed, False when the id
did not exist. Concurrent writes are serialized through the
shared backend write context. The audit-grade mutation log is
emitted by :class:MessageService.delete_message; the
repository never logs mutations itself (persistence-boundary
rule, see docs/reference/persistence-boundary.md).
Source code in src/synthorg/persistence/sqlite/repositories.py
Postgres Backend¶
backend
¶
Postgres persistence backend implementation.
Implements the PersistenceBackend protocol on top of psycopg 3 and
psycopg_pool.AsyncConnectionPool. Repositories are instantiated
per-backend on connect() and receive the shared pool; each pool
checkout is an independent transaction, so this backend's
write_context is a no-op rather than the in-process lock SQLite
acquires to serialize writes across its single connection.
The schema uses native Postgres types (JSONB, TIMESTAMPTZ, BIGINT,
BOOLEAN) -- see src/synthorg/persistence/postgres/schema.sql. At
the Python level, the protocol surface is identical to the SQLite
backend: callers get Pydantic models back either way.
PostgresPersistenceBackend
¶
Bases: PostgresConnectionMixin, PostgresMigrationMixin
Postgres implementation of the PersistenceBackend protocol.
Uses a psycopg_pool.AsyncConnectionPool for connection
management. Each repository method acquires a connection from the
pool for the duration of its critical section, so writes are
isolated per-connection transaction. There is no shared write
lock -- unlike SQLite, Postgres per-connection transactions do not
share a single in-process connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
PostgresConfig
|
Postgres-specific configuration. |
required |
Source code in src/synthorg/persistence/postgres/backend.py
config
property
¶
Public read-only view of the backend's Postgres config.
Exposed so callers needing the connection details (the
backup-handler factory) do not have to reach for the
private _config attribute.
collaboration_metrics
property
¶
Repository for CollaborationMetricRecord persistence.
provider_audit_events
property
¶
Repository for the provider mutation audit log.
preset_overrides
property
¶
Repository for operator-authored provider preset overrides.
workflow_definitions
property
¶
Repository for workflow definition persistence.
workflow_versions
property
¶
Repository for workflow definition version persistence.
identity_versions
property
¶
Repository for AgentIdentity version snapshot persistence.
evaluation_config_versions
property
¶
Repository for EvaluationConfig version snapshot persistence.
budget_config_versions
property
¶
Repository for BudgetConfig version snapshot persistence.
circuit_breaker_state
property
¶
Repository for circuit breaker state persistence.
ceremony_scheduler_state
property
¶
Repository for ceremony scheduler per-sprint state snapshots.
meeting_cooldown
property
¶
Repository for meeting cooldown last-triggered timestamps.
tracked_containers
property
¶
Repository for Docker sandbox tracked-container records.
project_cost_aggregates
property
¶
Repository for durable project cost aggregates.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not connected. |
fine_tune_checkpoints
property
¶
Repository for fine-tune checkpoint persistence.
connection_secrets
property
¶
Repository for encrypted connection secret persistence.
principle_overrides
property
¶
Repository for rollback-restored principle overrides.
get_db
¶
Return the shared connection pool.
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not yet connected. |
Source code in src/synthorg/persistence/postgres/backend.py
write_context
async
¶
No-op for Postgres.
Each repository checks out its own connection from the async
pool; transactions on different connections cannot interleave
at the statement level. Implementing the protocol method as a
no-op keeps the cross-backend interface honest and lets
callers write async with backend.write_context() without
backend-specific branching.
Source code in src/synthorg/persistence/postgres/backend.py
build_lockouts
¶
Construct a lockout repository using this backend's pool.
build_escalations
¶
Construct an escalation queue repository on the shared pool.
notify_channel enables cross-instance pg_notify publishing
when the escalation subsystem has enabled it.
Source code in src/synthorg/persistence/postgres/backend.py
build_ontology_versioning
¶
Construct the ontology versioning service bound to this backend.
Source code in src/synthorg/persistence/postgres/backend.py
get_setting
async
¶
Retrieve a setting value by key from the _system namespace.
Delegates to self.settings (the SettingsRepository).
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not connected or settings repository is not yet ported. |
Source code in src/synthorg/persistence/postgres/backend.py
set_setting
async
¶
Store a setting value (upsert) in the _system namespace.
Delegates to self.settings (the SettingsRepository).
Raises:
| Type | Description |
|---|---|
PersistenceConnectionError
|
If not connected or settings repository is not yet ported. |
Source code in src/synthorg/persistence/postgres/backend.py
repositories
¶
Postgres repository implementations for Task, CostRecord, and Message.
HR-related repositories (LifecycleEvent, TaskMetric, CollaborationMetric)
are in hr_repositories.py within this package.
PostgresTaskRepository
¶
Postgres implementation of the TaskRepository protocol.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool
|
AsyncConnectionPool
|
An open psycopg_pool.AsyncConnectionPool. |
required |
Source code in src/synthorg/persistence/postgres/repositories.py
save
async
¶
Persist a task (upsert semantics).
Source code in src/synthorg/persistence/postgres/repositories.py
get
async
¶
Retrieve a task by its ID.
Source code in src/synthorg/persistence/postgres/repositories.py
list_items
async
¶
List tasks with pagination (no filters).
Ordering is deterministic on the primary key id so paginated
callers see stable windows.
Raises:
| Type | Description |
|---|---|
QueryError
|
If the query fails or pagination is out of range. |
Source code in src/synthorg/persistence/postgres/repositories.py
query
async
¶
Query tasks matching the filter spec.
Ordering is deterministic on the primary key id so paginated
callers see stable windows.
Raises:
| Type | Description |
|---|---|
QueryError
|
If the query fails or pagination is out of range. |
Source code in src/synthorg/persistence/postgres/repositories.py
count
async
¶
Count tasks matching the given filter spec.
Source code in src/synthorg/persistence/postgres/repositories.py
delete
async
¶
Delete a task by ID.
Source code in src/synthorg/persistence/postgres/repositories.py
PostgresCostRecordRepository
¶
Postgres implementation of the CostRecordRepository protocol.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool
|
AsyncConnectionPool
|
An open psycopg_pool.AsyncConnectionPool. |
required |
Source code in src/synthorg/persistence/postgres/repositories.py
append
async
¶
Persist a cost record (append-only per AppendOnlyRepository).
Source code in src/synthorg/persistence/postgres/repositories.py
query
async
¶
Query cost records matching filter spec with pagination.
Raises:
| Type | Description |
|---|---|
QueryError
|
If the query fails or pagination is out of range. |
Source code in src/synthorg/persistence/postgres/repositories.py
aggregate
async
¶
Sum total cost, optionally filtered by agent and/or task.
Raises :class:MixedCurrencyAggregationError when the matched rows
span multiple currencies. The distinct-currency probe and the
SUM run in a single aggregating query
(COUNT(DISTINCT) + STRING_AGG(DISTINCT) + SUM) so the
two observations share one snapshot and a concurrent commit
cannot change the result between them.
Source code in src/synthorg/persistence/postgres/repositories.py
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 | |
purge_before
async
¶
Delete cost records with timestamp before threshold (retention).
Source code in src/synthorg/persistence/postgres/repositories.py
PostgresMessageRepository
¶
Postgres implementation of the MessageRepository protocol.
content is stored as TEXT containing a JSON-serialized parts
array (same as SQLite, for protocol compatibility). metadata
and attachments use native JSONB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool
|
AsyncConnectionPool
|
An open psycopg_pool.AsyncConnectionPool. |
required |
Source code in src/synthorg/persistence/postgres/repositories.py
append
async
¶
Persist a message (append-only per AppendOnlyRepository).
Source code in src/synthorg/persistence/postgres/repositories.py
get_history
async
¶
Retrieve message history for a channel, newest first.
Source code in src/synthorg/persistence/postgres/repositories.py
get_by_id
async
¶
Fetch one message by (channel, id) via the PK point read.
The id predicate alone resolves the row (it is the primary
key); the extra channel predicate is a deliberate scoping
guard so a caller holding only a message id cannot read a
message outside the channel it asked for.
Source code in src/synthorg/persistence/postgres/repositories.py
query
async
¶
Return messages matching the filter spec, newest first.
Raises:
| Type | Description |
|---|---|
QueryError
|
If the query fails or pagination is out of range. |
Source code in src/synthorg/persistence/postgres/repositories.py
purge_before
async
¶
Delete messages with timestamp < threshold (retention).
Source code in src/synthorg/persistence/postgres/repositories.py
delete
async
¶
Delete a single message by id (bespoke per ADR D7, moderation).
Returns True when a row was removed, False when the id
did not exist. The audit-grade mutation log is emitted by
:class:MessageService.delete_message; the repository never
logs mutations itself (persistence-boundary rule, see
docs/reference/persistence-boundary.md).