Skip to content

Persistence

Pluggable operational data persistence: protocol, configuration, SQLite backend, and Postgres backend.

Protocol

protocol

PersistenceBackend protocol -- lifecycle + repository access.

Application code depends on this protocol for storage lifecycle management. Repository protocols provide entity-level access.

PersistenceBackend

Bases: Protocol

Lifecycle management for operational data storage.

Concrete backends implement this protocol to provide connection management, health monitoring, schema migrations, and access to entity-specific repositories.

Attributes:

Name Type Description
is_connected bool

Whether the backend has an active connection.

backend_name NotBlankStr

Human-readable backend identifier.

tasks TaskRepository

Repository for Task persistence.

cost_records CostRecordRepository

Repository for CostRecord persistence.

messages MessageRepository

Repository for Message persistence.

lifecycle_events LifecycleEventRepository

Repository for AgentLifecycleEvent persistence.

task_metrics TaskMetricRepository

Repository for TaskMetricRecord persistence.

collaboration_metrics CollaborationMetricRepository

Repository for CollaborationMetricRecord persistence.

parked_contexts ParkedContextRepository

Repository for ParkedContext persistence.

audit_entries AuditRepository

Repository for AuditEntry persistence.

users UserRepository

Repository for User persistence.

api_keys ApiKeyRepository

Repository for ApiKey persistence.

checkpoints CheckpointRepository

Repository for Checkpoint persistence.

heartbeats HeartbeatRepository

Repository for Heartbeat persistence.

agent_states AgentStateRepository

Repository for AgentRuntimeState persistence.

settings SettingsRepository

Repository for namespaced settings persistence.

artifacts ArtifactRepository

Repository for Artifact persistence.

projects ProjectRepository

Repository for Project persistence.

custom_presets PersonalityPresetRepository

Repository for custom personality preset persistence.

workflow_definitions WorkflowDefinitionRepository

Repository for workflow definition persistence.

workflow_executions WorkflowExecutionRepository

Repository for workflow execution persistence.

workflow_versions VersionRepository[WorkflowDefinition]

Repository for workflow definition version snapshot persistence.

identity_versions VersionRepository[AgentIdentity]

Repository for AgentIdentity version snapshot persistence.

evaluation_config_versions VersionRepository[EvaluationConfig]

Repository for EvaluationConfig version snapshot persistence.

budget_config_versions VersionRepository[BudgetConfig]

Repository for BudgetConfig version snapshot persistence.

company_versions VersionRepository[Company]

Repository for Company version snapshot persistence.

role_versions VersionRepository[Role]

Repository for Role version snapshot persistence.

decision_records DecisionRepository

Repository for DecisionRecord persistence (auditable approval-gate decisions drop-box).

risk_overrides RiskOverrideRepository

Repository for RiskTierOverride persistence.

ssrf_violations SsrfViolationRepository

Repository for SsrfViolation persistence.

circuit_breaker_state CircuitBreakerStateRepository

Repository for circuit breaker state persistence.

connections ConnectionRepository

Repository for external service connection persistence.

connection_secrets ConnectionSecretRepository

Repository for encrypted connection secret persistence.

oauth_states OAuthStateRepository

Repository for transient OAuth authorization state persistence.

webhook_receipts WebhookReceiptRepository

Repository for webhook receipt log persistence.

idempotency_keys IdempotencyRepository

Repository for persistent idempotency keys -- atomic claim/complete/fail primitive shared by webhook receivers, the backup endpoint, and any other retry-prone surface that needs cross-restart deduplication.

training_plans TrainingPlanRepository

Repository for training plan persistence.

training_results TrainingResultRepository

Repository for training result persistence.

custom_rules CustomRuleRepository

Repository for custom signal rule persistence.

kind property

kind

Return the backend's discriminator string.

One of "sqlite" or "postgres". Used by call sites that need to pick a backend-specific helper (e.g. backup handler factories) without isinstance checks. The Literal type means mypy rejects an implementation that returns any other string.

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier (e.g. "sqlite").

tasks property

tasks

Repository for Task persistence.

cost_records property

cost_records

Repository for CostRecord persistence.

messages property

messages

Repository for Message persistence.

lifecycle_events property

lifecycle_events

Repository for AgentLifecycleEvent persistence.

task_metrics property

task_metrics

Repository for TaskMetricRecord persistence.

collaboration_metrics property

collaboration_metrics

Repository for CollaborationMetricRecord persistence.

parked_contexts property

parked_contexts

Repository for ParkedContext persistence.

audit_entries property

audit_entries

Repository for AuditEntry persistence.

provider_audit_events property

provider_audit_events

Repository for the provider mutation audit log.

preset_overrides property

preset_overrides

Repository for operator-authored provider preset overrides.

decision_records property

decision_records

Repository for DecisionRecord persistence (decisions drop-box).

users property

users

Repository for User persistence.

api_keys property

api_keys

Repository for ApiKey persistence.

checkpoints property

checkpoints

Repository for Checkpoint persistence.

heartbeats property

heartbeats

Repository for Heartbeat persistence.

agent_states property

agent_states

Repository for AgentRuntimeState persistence.

settings property

settings

Repository for namespaced settings persistence.

artifacts property

artifacts

Repository for Artifact persistence.

projects property

projects

Repository for Project persistence.

custom_presets property

custom_presets

Repository for custom personality preset persistence.

workflow_definitions property

workflow_definitions

Repository for workflow definition persistence.

workflow_executions property

workflow_executions

Repository for workflow execution persistence.

subworkflows property

subworkflows

Repository for versioned subworkflow persistence.

workflow_versions property

workflow_versions

Repository for workflow definition version snapshot persistence.

identity_versions property

identity_versions

Repository for AgentIdentity version snapshot persistence.

evaluation_config_versions property

evaluation_config_versions

Repository for EvaluationConfig version snapshot persistence.

budget_config_versions property

budget_config_versions

Repository for BudgetConfig version snapshot persistence.

company_versions property

company_versions

Repository for Company version snapshot persistence.

role_versions property

role_versions

Repository for Role version snapshot persistence.

risk_overrides property

risk_overrides

Repository for risk tier override persistence.

ssrf_violations property

ssrf_violations

Repository for SSRF violation record persistence.

circuit_breaker_state property

circuit_breaker_state

Repository for circuit breaker state persistence.

ceremony_scheduler_state property

ceremony_scheduler_state

Repository for ceremony scheduler per-sprint state snapshots.

meeting_cooldown property

meeting_cooldown

Repository for meeting cooldown last-triggered timestamps.

tracked_containers property

tracked_containers

Repository for Docker sandbox tracked-container records.

connections property

connections

Repository for external service connection persistence.

connection_secrets property

connection_secrets

Repository for encrypted connection secret persistence.

oauth_states property

oauth_states

Repository for transient OAuth state persistence.

webhook_receipts property

webhook_receipts

Repository for webhook receipt log persistence.

idempotency_keys property

idempotency_keys

Repository for persistent idempotency keys.

seen_claims property

seen_claims

Repository for worker TaskClaim dedup persistence.

principle_overrides property

principle_overrides

Repository for rollback-restored principle overrides.

training_plans property

training_plans

Repository for training plan persistence.

training_results property

training_results

Repository for training result persistence.

custom_rules property

custom_rules

Repository for custom signal rule persistence.

sessions property

sessions

Repository for hybrid session state (durable + in-memory cache).

refresh_tokens property

refresh_tokens

Repository for single-use refresh-token rotation.

mcp_installations property

mcp_installations

Repository for MCP catalog installation records.

org_facts property

org_facts

Repository for organizational fact persistence (MVCC).

ontology_entities property

ontology_entities

Repository for ontology entity definitions.

ontology_drift property

ontology_drift

Repository for ontology drift reports.

project_cost_aggregates property

project_cost_aggregates

Repository for durable per-project cost aggregates.

fine_tune_checkpoints property

fine_tune_checkpoints

Repository for fine-tune checkpoint persistence.

Implementations that do not support fine-tuning MUST raise NotImplementedError with a descriptive message so callers do not silently receive an unusable repo.

fine_tune_runs property

fine_tune_runs

Repository for fine-tune pipeline run persistence.

Same availability semantics as :attr:fine_tune_checkpoints.

connect async

connect()

Establish connection to the storage backend.

Raises:

Type Description
PersistenceConnectionError

If the connection cannot be established.

Source code in src/synthorg/persistence/protocol.py
async def connect(self) -> None:
    """Establish connection to the storage backend.

    Raises:
        PersistenceConnectionError: If the connection cannot be
            established.
    """
    ...

disconnect async

disconnect()

Close the storage backend connection.

Safe to call even if not connected.

Source code in src/synthorg/persistence/protocol.py
async def disconnect(self) -> None:
    """Close the storage backend connection.

    Safe to call even if not connected.
    """
    ...

health_check async

health_check()

Check whether the backend is healthy and responsive.

Returns:

Type Description
bool

True if the backend is reachable and operational.

Source code in src/synthorg/persistence/protocol.py
async def health_check(self) -> bool:
    """Check whether the backend is healthy and responsive.

    Returns:
        ``True`` if the backend is reachable and operational.
    """
    ...

migrate async

migrate()

Run pending schema migrations.

Raises:

Type Description
MigrationError

If a migration fails.

Source code in src/synthorg/persistence/protocol.py
async def migrate(self) -> None:
    """Run pending schema migrations.

    Raises:
        MigrationError: If a migration fails.
    """
    ...

get_db

get_db()

Return the underlying database connection.

Returns:

Type Description
Any

The raw database connection object (backend-specific).

Raises:

Type Description
PersistenceConnectionError

If not yet connected.

Source code in src/synthorg/persistence/protocol.py
def get_db(self) -> Any:
    """Return the underlying database connection.

    Returns:
        The raw database connection object (backend-specific).

    Raises:
        PersistenceConnectionError: If not yet connected.
    """
    ...

write_context

write_context()

Async context manager around mutating SQL on this backend.

The mutual-exclusion guarantee is backend-specific:

  • SQLite acquires a shared in-process write lock so that multi-statement transactions on the single aiosqlite.Connection cannot interleave at the statement level. Concurrent writers on the same backend instance serialize.
  • Postgres yields immediately: each repository operation checks out an independent connection from the async pool, so writers are already isolated at the database level. The method exists on this backend only to keep the cross-backend interface uniform; it does not provide mutual exclusion beyond what the pool already gives.

Use it in repository write paths so the same code path works on both backends::

async with backend.write_context():
    await db.execute(...)
    await db.commit()

Each call returns a fresh context manager. On SQLite, the underlying lock primitive is shared across calls so concurrent callers serialize. On Postgres, there is no shared primitive. Repositories are wired with this method (as a callable) at backend construction; callers that already hold a PersistenceBackend reference can use it directly for cross-repo transactional boundaries.

Callers must not rely on write_context for distributed mutual exclusion or cross-backend serializability.

Source code in src/synthorg/persistence/protocol.py
def write_context(self) -> AbstractAsyncContextManager[None]:
    """Async context manager around mutating SQL on this backend.

    The mutual-exclusion guarantee is backend-specific:

    - **SQLite** acquires a shared in-process write lock so that
      multi-statement transactions on the single
      ``aiosqlite.Connection`` cannot interleave at the statement
      level. Concurrent writers on the same backend instance
      serialize.
    - **Postgres** yields immediately: each repository operation
      checks out an independent connection from the async pool, so
      writers are already isolated at the database level. The
      method exists on this backend only to keep the cross-backend
      interface uniform; it does not provide mutual exclusion
      beyond what the pool already gives.

    Use it in repository write paths so the same code path works
    on both backends::

        async with backend.write_context():
            await db.execute(...)
            await db.commit()

    Each call returns a fresh context manager. On SQLite, the
    underlying lock primitive is shared across calls so concurrent
    callers serialize. On Postgres, there is no shared primitive.
    Repositories are wired with this method (as a callable) at
    backend construction; callers that already hold a
    ``PersistenceBackend`` reference can use it directly for
    cross-repo transactional boundaries.

    Callers must not rely on ``write_context`` for distributed
    mutual exclusion or cross-backend serializability.
    """
    ...

build_lockouts

build_lockouts(auth_config)

Construct a lockout repository for this backend.

Method-based rather than property because :class:LockoutRepository needs the operator's AuthConfig (threshold, window, duration) which is app-layer config, not persistence-layer. Callers supply the config at startup; the returned repo shares this backend's connection / pool.

Raises:

Type Description
PersistenceConnectionError

If the backend is not connected.

Source code in src/synthorg/persistence/protocol.py
def build_lockouts(self, auth_config: AuthConfig) -> LockoutRepository:
    """Construct a lockout repository for this backend.

    Method-based rather than property because :class:`LockoutRepository`
    needs the operator's ``AuthConfig`` (threshold, window, duration)
    which is app-layer config, not persistence-layer.  Callers supply
    the config at startup; the returned repo shares this backend's
    connection / pool.

    Raises:
        PersistenceConnectionError: If the backend is not connected.
    """
    ...

build_escalations

build_escalations(*, notify_channel=None)

Construct an escalation queue repository for this backend.

Method-based rather than property because Postgres escalations accept an optional NOTIFY channel name -- cross-instance notify config lives on the escalation subsystem, not on persistence. notify_channel is ignored by the SQLite implementation.

Raises:

Type Description
PersistenceConnectionError

If the backend is not connected.

Source code in src/synthorg/persistence/protocol.py
def build_escalations(
    self,
    *,
    notify_channel: str | None = None,
) -> EscalationQueueRepository:
    """Construct an escalation queue repository for this backend.

    Method-based rather than property because Postgres escalations
    accept an optional NOTIFY channel name -- cross-instance notify
    config lives on the escalation subsystem, not on persistence.
    ``notify_channel`` is ignored by the SQLite implementation.

    Raises:
        PersistenceConnectionError: If the backend is not connected.
    """
    ...

build_ontology_versioning

build_ontology_versioning()

Construct the ontology versioning service bound to this backend.

Returns a versioning service wired to the backend's active DB handle. SQLite implementations bind the service to their aiosqlite.Connection; Postgres implementations bind to their AsyncConnectionPool.

Raises:

Type Description
PersistenceConnectionError

If the backend is not connected.

Source code in src/synthorg/persistence/protocol.py
def build_ontology_versioning(
    self,
) -> VersioningService[EntityDefinition]:
    """Construct the ontology versioning service bound to this backend.

    Returns a versioning service wired to the backend's active DB
    handle.  SQLite implementations bind the service to their
    ``aiosqlite.Connection``; Postgres implementations bind to their
    ``AsyncConnectionPool``.

    Raises:
        PersistenceConnectionError: If the backend is not connected.
    """
    ...

get_setting async

get_setting(key)

Retrieve a setting value by key.

Parameters:

Name Type Description Default
key NotBlankStr

Setting key.

required

Returns:

Type Description
str | None

The setting value, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/protocol.py
async def get_setting(self, key: NotBlankStr) -> str | None:
    """Retrieve a setting value by key.

    Args:
        key: Setting key.

    Returns:
        The setting value, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

set_setting async

set_setting(key, value)

Store a setting value.

Upserts -- creates or updates the key.

Parameters:

Name Type Description Default
key NotBlankStr

Setting key.

required
value str

Setting value.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/protocol.py
async def set_setting(self, key: NotBlankStr, value: str) -> None:
    """Store a setting value.

    Upserts -- creates or updates the key.

    Args:
        key: Setting key.
        value: Setting value.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

Config

config

Persistence configuration models.

Frozen Pydantic models for persistence backend selection and backend-specific settings.

SQLiteConfig pydantic-model

Bases: BaseModel

SQLite-specific persistence configuration.

Attributes:

Name Type Description
path NotBlankStr

Database file path. Use ":memory:" for in-memory databases (useful for testing).

wal_mode bool

Whether to enable WAL journal mode for concurrent read performance.

journal_size_limit int

Maximum WAL journal size in bytes (default 64 MB).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _reject_traversal

path pydantic-field

path = 'synthorg.db'

Database file path

wal_mode pydantic-field

wal_mode = True

Enable WAL journal mode

journal_size_limit pydantic-field

journal_size_limit = 67108864

Maximum WAL journal size in bytes

PostgresConfig pydantic-model

Bases: BaseModel

Postgres-specific persistence configuration.

Credentials are carried as SecretStr so they are redacted from logs, repr output, and Pydantic serialization unless explicitly unwrapped via get_secret_value().

Attributes:

Name Type Description
host NotBlankStr

Database host.

port int

Database port (default 5432).

database NotBlankStr

Database name.

username NotBlankStr

Database username.

password SecretStr

Database password (redacted in logs).

ssl_mode PostgresSslMode

libpq SSL mode. Default "require" refuses plaintext connections; production deployments with managed certificates should use "verify-full".

pool_min_size int

Minimum pooled connections (warmed on connect).

pool_max_size int

Maximum pooled connections; must be >= pool_min_size.

pool_timeout_seconds float

Seconds to wait for a pool checkout before raising.

application_name NotBlankStr

libpq application_name session parameter (appears in pg_stat_activity).

statement_timeout_ms int

Postgres statement_timeout session parameter; 0 disables. Default 30 seconds matches the pytest global timeout.

connect_timeout_seconds float

Seconds to wait for an initial connection attempt before raising.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

host pydantic-field

host = 'localhost'

Database host

port pydantic-field

port = 5432

Database port

database pydantic-field

database

Database name

username pydantic-field

username

Database username

password pydantic-field

password

Database password

ssl_mode pydantic-field

ssl_mode = 'require'

libpq SSL mode

pool_min_size pydantic-field

pool_min_size = 1

Minimum pooled connections

pool_max_size pydantic-field

pool_max_size = 10

Maximum pooled connections

pool_timeout_seconds pydantic-field

pool_timeout_seconds = 30.0

Pool checkout timeout in seconds

application_name pydantic-field

application_name = 'synthorg'

libpq application_name session parameter

statement_timeout_ms pydantic-field

statement_timeout_ms = 30000

Postgres statement_timeout session param in ms (0 disables)

connect_timeout_seconds pydantic-field

connect_timeout_seconds = 10.0

Initial connection timeout in seconds

enable_timescaledb pydantic-field

enable_timescaledb = False

Enable TimescaleDB hypertable conversion for append-only time-series tables (cost_records, audit_entries). Uses Apache-2.0 licensed hypertable features only; retention policies and compression are Timescale-License features and are not used. Requires the timescaledb extension on the Postgres server. Not supported on managed Postgres providers (AWS RDS, Cloud SQL, Azure Postgres).

cost_records_chunk_interval pydantic-field

cost_records_chunk_interval = '1 day'

Hypertable chunk interval for cost_records. Ignored when enable_timescaledb is False.

audit_entries_chunk_interval pydantic-field

audit_entries_chunk_interval = '1 day'

Hypertable chunk interval for audit_entries. Ignored when enable_timescaledb is False.

PersistenceConfig pydantic-model

Bases: BaseModel

Top-level persistence configuration.

Attributes:

Name Type Description
backend NotBlankStr

Backend name. One of "sqlite" or "postgres".

sqlite SQLiteConfig

SQLite-specific settings (used when backend="sqlite").

postgres PostgresConfig | None

Postgres-specific settings (required when backend="postgres").

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_backend_name

backend pydantic-field

backend = 'sqlite'

Persistence backend name

sqlite pydantic-field

sqlite

SQLite-specific settings

postgres pydantic-field

postgres = None

Postgres-specific settings (required when backend=postgres)

Repositories

Repository protocols are split by domain so each lives alongside its typed-ID and helper imports.

agent_state_protocol

AgentState repository protocol.

AgentStateRepository

Bases: IdKeyedRepository['AgentRuntimeState', NotBlankStr], Protocol

CRUD + query interface for agent runtime state persistence.

Composes :class:IdKeyedRepository (ADR-0001). Bespoke per D7: :meth:get_active filters non-idle agents and orders by last_activity_at DESC, which the generic list_items cannot express and which dashboard live views poll on the hot path.

save async

save(entity)

Upsert an agent runtime state by agent_id.

Parameters:

Name Type Description Default
entity AgentRuntimeState

The agent runtime state to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/agent_state_protocol.py
async def save(self, entity: AgentRuntimeState) -> None:
    """Upsert an agent runtime state by ``agent_id``.

    Args:
        entity: The agent runtime state to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve an agent runtime state by agent ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
AgentRuntimeState | None

The agent state, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/agent_state_protocol.py
async def get(self, entity_id: NotBlankStr) -> AgentRuntimeState | None:
    """Retrieve an agent runtime state by agent ID.

    Args:
        entity_id: The agent identifier.

    Returns:
        The agent state, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List all agent runtime states in agent_id order.

Parameters:

Name Type Description Default
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[AgentRuntimeState, ...]

Agent states in ascending agent_id order.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/agent_state_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[AgentRuntimeState, ...]:
    """List all agent runtime states in ``agent_id`` order.

    Args:
        limit: Maximum rows to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        Agent states in ascending ``agent_id`` order.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_active async

get_active(*, limit=DEFAULT_PAGE_SIZE, offset=0)

Retrieve a bounded page of non-idle agent states.

Returns states where status != 'idle', ordered by last_activity_at descending then agent_id ascending (the stable secondary key makes paging deterministic when activity timestamps tie). Callers that need every active state drain via :func:synthorg.persistence._shared.collect_all.

Parameters:

Name Type Description Default
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[AgentRuntimeState, ...]

A page of active agent states.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/agent_state_protocol.py
async def get_active(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[AgentRuntimeState, ...]:
    """Retrieve a bounded page of non-idle agent states.

    Returns states where ``status != 'idle'``, ordered by
    ``last_activity_at`` descending then ``agent_id`` ascending
    (the stable secondary key makes paging deterministic when
    activity timestamps tie). Callers that need every active
    state drain via
    :func:`synthorg.persistence._shared.collect_all`.

    Args:
        limit: Maximum rows to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        A page of active agent states.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete an agent runtime state by agent ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/agent_state_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete an agent runtime state by agent ID.

    Args:
        entity_id: The agent identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

artifact_protocol

Artifact repository protocol.

ArtifactFilterSpec pydantic-model

Bases: BaseModel

Filter spec for ArtifactRepository.query (ADR-0001).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

task_id pydantic-field

task_id = None

Filter by originating task ID

created_by pydantic-field

created_by = None

Filter by creator agent ID

artifact_type pydantic-field

artifact_type = None

Filter by artifact type

ArtifactRepository

Bases: IdKeyedRepository[Artifact, NotBlankStr], FilteredQueryRepository[Artifact, ArtifactFilterSpec], Protocol

CRUD + query interface for Artifact persistence.

Composes :class:IdKeyedRepository + :class:FilteredQueryRepository (ADR-0001).

The single bespoke method :meth:save_returning_outcome is retained (D7 bespoke-method policy) because callers need the insert/update outcome to attach the correct API_ARTIFACT_CREATED / API_ARTIFACT_UPDATED audit event without a TOCTOU get + save race. This avoids the rare but real bug where concurrent writers both observe "missing" and both report API_ARTIFACT_CREATED.

save async

save(entity)

Persist an artifact (insert or update by id).

Parameters:

Name Type Description Default
entity Artifact

The artifact to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_protocol.py
async def save(self, entity: Artifact) -> None:
    """Persist an artifact (insert or update by id).

    Args:
        entity: The artifact to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

save_returning_outcome async

save_returning_outcome(artifact)

Persist an artifact atomically and return the insert/update outcome.

This is a D7 bespoke method retained for the audit-event correctness invariant: callers need to know whether the write was an insert or update so they can emit the corresponding API_ARTIFACT_CREATED or API_ARTIFACT_UPDATED event. The outcome is computed atomically with the write (SQLite: INSERT ... ON CONFLICT(id) DO NOTHING rowcount; Postgres: xmax = 0 AS created) to avoid the TOCTOU window of a separate get() probe.

Parameters:

Name Type Description Default
artifact Artifact

The artifact to persist.

required

Returns:

Type Description
bool

True when this call inserted a new row, False when

bool

it updated an existing row in place.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_protocol.py
async def save_returning_outcome(self, artifact: Artifact) -> bool:
    """Persist an artifact atomically and return the insert/update outcome.

    This is a D7 bespoke method retained for the audit-event correctness
    invariant: callers need to know whether the write was an insert or
    update so they can emit the corresponding ``API_ARTIFACT_CREATED`` or
    ``API_ARTIFACT_UPDATED`` event. The outcome is computed atomically
    with the write (SQLite: ``INSERT ... ON CONFLICT(id) DO NOTHING``
    rowcount; Postgres: ``xmax = 0 AS created``) to avoid the TOCTOU
    window of a separate ``get()`` probe.

    Args:
        artifact: The artifact to persist.

    Returns:
        ``True`` when this call inserted a new row, ``False`` when
        it updated an existing row in place.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve an artifact by its ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The artifact identifier.

required

Returns:

Type Description
Artifact | None

The artifact, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_protocol.py
async def get(self, entity_id: NotBlankStr) -> Artifact | None:
    """Retrieve an artifact by its ID.

    Args:
        entity_id: The artifact identifier.

    Returns:
        The artifact, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List all artifacts with pagination.

Results are ordered by artifact ID ascending to ensure deterministic pagination across backends.

Parameters:

Name Type Description Default
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[Artifact, ...]

Artifacts ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Artifact, ...]:
    """List all artifacts with pagination.

    Results are ordered by artifact ID ascending to ensure
    deterministic pagination across backends.

    Args:
        limit: Maximum rows to return.
        offset: Rows to skip before the window.

    Returns:
        Artifacts ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List artifacts matching the filter spec (paginated).

Results are ordered by artifact ID ascending to ensure deterministic pagination across backends.

Parameters:

Name Type Description Default
filter_spec ArtifactFilterSpec

Carries optional filters for task_id, created_by, artifact_type.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[Artifact, ...]

Matching artifacts ordered by ID, as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

QueryError

If limit < 1 or offset < 0.

Source code in src/synthorg/persistence/artifact_protocol.py
async def query(
    self,
    filter_spec: ArtifactFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Artifact, ...]:
    """List artifacts matching the filter spec (paginated).

    Results are ordered by artifact ID ascending to ensure
    deterministic pagination across backends.

    Args:
        filter_spec: Carries optional filters for task_id, created_by,
            artifact_type.
        limit: Maximum rows to return.
        offset: Rows to skip before the window.

    Returns:
        Matching artifacts ordered by ID, as a tuple.

    Raises:
        PersistenceError: If the operation fails.
        QueryError: If ``limit < 1`` or ``offset < 0``.
    """
    ...

count async

count(filter_spec)

Count artifacts matching the filter spec.

Parameters:

Name Type Description Default
filter_spec ArtifactFilterSpec

Carries optional filters.

required

Returns:

Type Description
int

Total number of matching artifacts.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_protocol.py
async def count(self, filter_spec: ArtifactFilterSpec) -> int:
    """Count artifacts matching the filter spec.

    Args:
        filter_spec: Carries optional filters.

    Returns:
        Total number of matching artifacts.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete an artifact by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The artifact identifier.

required

Returns:

Type Description
bool

True if the artifact was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete an artifact by ID.

    Args:
        entity_id: The artifact identifier.

    Returns:
        ``True`` if the artifact was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

audit_protocol

Audit repository protocol.

AuditFilterSpec pydantic-model

Bases: BaseModel

Filter spec for AuditRepository.query.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_window

agent_id pydantic-field

agent_id = None

Filter by agent identifier

action_type pydantic-field

action_type = None

Filter by action type string

verdict pydantic-field

verdict = None

Filter by verdict (allow/deny/escalate/output_scan)

risk_level pydantic-field

risk_level = None

Filter by risk level

since pydantic-field

since = None

Only return entries at or after this timestamp

until pydantic-field

until = None

Only return entries at or before this timestamp

AuditRepository

Bases: AppendOnlyRepository['AuditEntry', AuditFilterSpec], Protocol

Append-only persistence + query interface for AuditEntry.

Composes :class:AppendOnlyRepository. Audit entries are immutable records of security evaluations. No update operations are provided to preserve audit integrity.

The single delete-style operation is :meth:purge_before, the retention sweeper used to enforce the operator-configurable security.audit_retention_days window. This is a deliberate exception to the append-only rule; see :meth:purge_before for the retention-vs-forensic tradeoff.

append async

append(entry)

Persist an audit entry (append-only).

Parameters:

Name Type Description Default
entry AuditEntry

The audit entry to persist.

required

Raises:

Type Description
DuplicateRecordError

If an entry with the same ID exists.

QueryError

If the operation fails.

Source code in src/synthorg/persistence/audit_protocol.py
async def append(self, entry: AuditEntry) -> None:
    """Persist an audit entry (append-only).

    Args:
        entry: The audit entry to persist.

    Raises:
        DuplicateRecordError: If an entry with the same ID exists.
        QueryError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Return audit entries matching the filter spec (paginated).

Results are ordered by timestamp descending (newest first).

Parameters:

Name Type Description Default
filter_spec AuditFilterSpec

Audit filter specification with optional filters.

required
limit int

Maximum number of entries to return (must be >= 1).

DEFAULT_PAGE_SIZE
offset int

Number of entries to skip (for pagination).

0

Returns:

Type Description
tuple[AuditEntry, ...]

Matching audit entries as a tuple.

Raises:

Type Description
QueryError

If the operation fails, limit < 1, or until is earlier than since in the filter spec.

Source code in src/synthorg/persistence/audit_protocol.py
async def query(
    self,
    filter_spec: AuditFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[AuditEntry, ...]:
    """Return audit entries matching the filter spec (paginated).

    Results are ordered by timestamp descending (newest first).

    Args:
        filter_spec: Audit filter specification with optional filters.
        limit: Maximum number of entries to return (must be >= 1).
        offset: Number of entries to skip (for pagination).

    Returns:
        Matching audit entries as a tuple.

    Raises:
        QueryError: If the operation fails, *limit* < 1, or
            *until* is earlier than *since* in the filter spec.
    """
    ...

purge_before async

purge_before(cutoff)

Delete audit entries older than cutoff (CFG-1 audit).

This is the one exception to the append-only rule: it powers the retention sweeper which enforces the operator-configurable security.audit_retention_days window. Rows are removed permanently; the retention-vs-forensic tradeoff is decided at the retention-window level, not per row.

Parameters:

Name Type Description Default
cutoff AwareDatetime

Entries strictly older than this UTC timestamp are deleted.

required

Returns:

Type Description
int

Number of rows removed.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/audit_protocol.py
async def purge_before(self, cutoff: AwareDatetime) -> int:
    """Delete audit entries older than *cutoff* (CFG-1 audit).

    This is the one exception to the append-only rule: it powers
    the retention sweeper which enforces the operator-configurable
    ``security.audit_retention_days`` window. Rows are removed
    permanently; the retention-vs-forensic tradeoff is decided at
    the retention-window level, not per row.

    Args:
        cutoff: Entries strictly older than this UTC timestamp
            are deleted.

    Returns:
        Number of rows removed.

    Raises:
        QueryError: If the operation fails.
    """
    ...

checkpoint_protocol

Checkpoint and heartbeat repository protocols.

CheckpointFilterSpec pydantic-model

Bases: BaseModel

Filter spec for CheckpointRepository.query.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

execution_id pydantic-field

execution_id = None

Filter to a single execution

task_id pydantic-field

task_id = None

Filter to a single task

CheckpointRepository

Bases: AppendOnlyRepository['Checkpoint', CheckpointFilterSpec], Protocol

Append-only persistence interface for checkpoint rows.

Composes :class:AppendOnlyRepository.

  • get_latest returns the single newest row by turn_number under a filter; the generic query cannot express the LIMIT 1 ORDER BY turn_number DESC shape efficiently.
  • delete_by_execution is a batch delete keyed on execution_id; the generic purge_before(threshold) removes only by timestamp, not by execution scope.

append async

append(checkpoint)

Persist a checkpoint row (append-only).

Parameters:

Name Type Description Default
checkpoint Checkpoint

The checkpoint to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def append(self, checkpoint: Checkpoint) -> None:
    """Persist a checkpoint row (append-only).

    Args:
        checkpoint: The checkpoint to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Return checkpoints matching the filter, newest first.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def query(
    self,
    filter_spec: CheckpointFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Checkpoint, ...]:
    """Return checkpoints matching the filter, newest first."""
    ...

purge_before async

purge_before(threshold)

Delete checkpoints with saved_at < threshold.

threshold must be timezone-aware UTC; naive datetimes are rejected at the boundary so purge cut-offs do not depend on the caller's local-time assumption.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def purge_before(self, threshold: AwareDatetime) -> int:
    """Delete checkpoints with ``saved_at < threshold``.

    ``threshold`` must be timezone-aware UTC; naive datetimes are
    rejected at the boundary so purge cut-offs do not depend on the
    caller's local-time assumption.
    """
    ...

get_latest async

get_latest(*, execution_id=None, task_id=None)

Retrieve the latest checkpoint by turn_number.

At least one filter (execution_id or task_id) is required.

Parameters:

Name Type Description Default
execution_id NotBlankStr | None

Filter by execution identifier.

None
task_id NotBlankStr | None

Filter by task identifier.

None

Returns:

Type Description
Checkpoint | None

The checkpoint with the highest turn_number, or None.

Raises:

Type Description
PersistenceError

If the operation fails.

ValueError

If neither filter is provided.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def get_latest(
    self,
    *,
    execution_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
) -> Checkpoint | None:
    """Retrieve the latest checkpoint by turn_number.

    At least one filter (``execution_id`` or ``task_id``) is required.

    Args:
        execution_id: Filter by execution identifier.
        task_id: Filter by task identifier.

    Returns:
        The checkpoint with the highest turn_number, or ``None``.

    Raises:
        PersistenceError: If the operation fails.
        ValueError: If neither filter is provided.
    """
    ...

delete_by_execution async

delete_by_execution(execution_id)

Delete all checkpoints for an execution.

Parameters:

Name Type Description Default
execution_id NotBlankStr

The execution identifier.

required

Returns:

Type Description
int

Number of checkpoints deleted.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def delete_by_execution(self, execution_id: NotBlankStr) -> int:
    """Delete all checkpoints for an execution.

    Args:
        execution_id: The execution identifier.

    Returns:
        Number of checkpoints deleted.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

HeartbeatRepository

Bases: Protocol

CRUD interface for heartbeat persistence.

Heartbeats are a "singleton per execution" (one row per execution_id) but the dominant access pattern is :meth:get_stale (range query over last_heartbeat_at), which is not expressible in the generic categories. The save/get/delete surface looks superficially like :class:IdKeyedRepository, but composing that protocol would require list_items pagination that no caller needs while still leaving get_stale outside the generic surface. A fully bespoke protocol is simpler than splitting awareness across two surfaces.

save async

save(heartbeat)

Persist a heartbeat (upsert by execution_id).

Parameters:

Name Type Description Default
heartbeat Heartbeat

The heartbeat to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def save(self, heartbeat: Heartbeat) -> None:
    """Persist a heartbeat (upsert by execution_id).

    Args:
        heartbeat: The heartbeat to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(execution_id)

Retrieve a heartbeat by execution ID.

Parameters:

Name Type Description Default
execution_id NotBlankStr

The execution identifier.

required

Returns:

Type Description
Heartbeat | None

The heartbeat, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def get(self, execution_id: NotBlankStr) -> Heartbeat | None:
    """Retrieve a heartbeat by execution ID.

    Args:
        execution_id: The execution identifier.

    Returns:
        The heartbeat, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_stale async

get_stale(threshold, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Retrieve a bounded page of heartbeats older than the threshold.

Parameters:

Name Type Description Default
threshold AwareDatetime

Heartbeats with last_heartbeat_at before this timestamp are considered stale.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
Heartbeat

A page of stale heartbeats ordered by last_heartbeat_at

...

then execution_id (stable secondary key for

tuple[Heartbeat, ...]

deterministic paging). Callers needing every stale

tuple[Heartbeat, ...]

heartbeat drain via

tuple[Heartbeat, ...]

func:synthorg.persistence._shared.collect_all.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def get_stale(
    self,
    threshold: AwareDatetime,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Heartbeat, ...]:
    """Retrieve a bounded page of heartbeats older than the threshold.

    Args:
        threshold: Heartbeats with ``last_heartbeat_at`` before
            this timestamp are considered stale.
        limit: Maximum rows to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        A page of stale heartbeats ordered by ``last_heartbeat_at``
        then ``execution_id`` (stable secondary key for
        deterministic paging). Callers needing every stale
        heartbeat drain via
        :func:`synthorg.persistence._shared.collect_all`.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(execution_id)

Delete a heartbeat by execution ID.

Parameters:

Name Type Description Default
execution_id NotBlankStr

The execution identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/checkpoint_protocol.py
async def delete(self, execution_id: NotBlankStr) -> bool:
    """Delete a heartbeat by execution ID.

    Args:
        execution_id: The execution identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

connection_protocol

Repository protocols for integration persistence.

Defines CRUD interfaces for connections, encrypted secret blobs, OAuth authorization states, and webhook receipts.

ConnectionFilterSpec pydantic-model

Bases: BaseModel

Filter spec for ConnectionRepository.query (ADR-0001).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

connection_type pydantic-field

connection_type = None

Filter by connection type

ConnectionRepository

Bases: IdKeyedRepository[Connection, NotBlankStr], FilteredQueryRepository[Connection, ConnectionFilterSpec], Protocol

CRUD + query interface for Connection persistence.

Composes :class:IdKeyedRepository + :class:FilteredQueryRepository (ADR-0001). Entity is keyed by name field.

save async

save(entity)

Persist a connection (insert or upsert by name).

Source code in src/synthorg/persistence/connection_protocol.py
async def save(self, entity: Connection) -> None:
    """Persist a connection (insert or upsert by name)."""
    ...

get async

get(entity_id)

Retrieve a connection by name.

Source code in src/synthorg/persistence/connection_protocol.py
async def get(self, entity_id: NotBlankStr) -> Connection | None:
    """Retrieve a connection by name."""
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List all connections with pagination.

Sorted by name ascending.

Source code in src/synthorg/persistence/connection_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Connection, ...]:
    """List all connections with pagination.

    Sorted by ``name`` ascending.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List connections matching the filter spec.

Sorted by name ascending.

Source code in src/synthorg/persistence/connection_protocol.py
async def query(
    self,
    filter_spec: ConnectionFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Connection, ...]:
    """List connections matching the filter spec.

    Sorted by ``name`` ascending.
    """
    ...

count async

count(filter_spec)

Count connections matching the filter spec.

Source code in src/synthorg/persistence/connection_protocol.py
async def count(self, filter_spec: ConnectionFilterSpec) -> int:
    """Count connections matching the filter spec."""
    ...

delete async

delete(entity_id)

Delete a connection by name.

Returns:

Type Description
bool

True if the connection existed and was deleted.

Source code in src/synthorg/persistence/connection_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a connection by name.

    Returns:
        ``True`` if the connection existed and was deleted.
    """
    ...

ConnectionSecretRepository

Bases: Protocol

Low-level CRUD for encrypted secret blobs.

Used by EncryptedSqliteSecretBackend; other backends manage their own storage.

store async

store(secret_id, encrypted_value, key_version)

Persist an encrypted secret.

Source code in src/synthorg/persistence/connection_protocol.py
async def store(
    self,
    secret_id: NotBlankStr,
    encrypted_value: bytes,
    key_version: int,
) -> None:
    """Persist an encrypted secret."""
    ...

retrieve async

retrieve(secret_id)

Retrieve an encrypted secret blob.

Source code in src/synthorg/persistence/connection_protocol.py
async def retrieve(self, secret_id: NotBlankStr) -> bytes | None:
    """Retrieve an encrypted secret blob."""
    ...

delete async

delete(secret_id)

Delete an encrypted secret.

Source code in src/synthorg/persistence/connection_protocol.py
async def delete(self, secret_id: NotBlankStr) -> bool:
    """Delete an encrypted secret."""
    ...

OAuthStateRepository

Bases: IdKeyedRepository[OAuthState, NotBlankStr], Protocol

CRUD for transient OAuth authorization states.

Composes :class:IdKeyedRepository (ADR-0001). Entity is keyed by state_token field. Bespoke per ADR-0001 D7: :meth:mark_consumed (compare-and-set for idempotency) and :meth:cleanup_expired (TTL-based garbage collection).

save async

save(entity)

Persist an OAuth state.

Source code in src/synthorg/persistence/connection_protocol.py
async def save(self, entity: OAuthState) -> None:
    """Persist an OAuth state."""
    ...

get async

get(entity_id)

Retrieve by state token.

Source code in src/synthorg/persistence/connection_protocol.py
async def get(self, entity_id: NotBlankStr) -> OAuthState | None:
    """Retrieve by state token."""
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List all OAuth states with pagination.

Source code in src/synthorg/persistence/connection_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[OAuthState, ...]:
    """List all OAuth states with pagination."""
    ...

delete async

delete(entity_id)

Delete a state token (consumed or expired).

Source code in src/synthorg/persistence/connection_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a state token (consumed or expired)."""
    ...

mark_consumed async

mark_consumed(state_token, *, connection_name, consumed_at)

Mark a state token as consumed by a successful callback.

Stamps consumed_at and records connection_name so a redelivered callback (provider retry, browser back-button, CDN replay) returns the original connection name without re-exchanging the authorization code. The compare-and-set is atomic at the row level; second and subsequent calls observe the existing consumed_at and return False.

Implementations MUST stamp both consumed_at and connection_name_returned in a single atomic UPDATE. :class:OAuthState validates the two fields are always set together (see _validate_consumed_pair); a partial write would let a redelivered callback observe consumed_at set with no connection_name_returned to return.

Returns:

Type Description
bool

True if a row was updated (state existed and was not

bool

already consumed); False if the row was missing or

bool

already consumed.

Raises:

Type Description
QueryError

On database errors.

Source code in src/synthorg/persistence/connection_protocol.py
async def mark_consumed(
    self,
    state_token: NotBlankStr,
    *,
    connection_name: NotBlankStr,
    consumed_at: datetime,
) -> bool:
    """Mark a state token as consumed by a successful callback.

    Stamps ``consumed_at`` and records ``connection_name`` so a
    redelivered callback (provider retry, browser back-button,
    CDN replay) returns the original connection name without
    re-exchanging the authorization code. The compare-and-set is
    atomic at the row level; second and subsequent calls observe
    the existing ``consumed_at`` and return ``False``.

    Implementations MUST stamp both ``consumed_at`` and
    ``connection_name_returned`` in a single atomic UPDATE.
    :class:`OAuthState` validates the two fields are always set
    together (see ``_validate_consumed_pair``); a partial write
    would let a redelivered callback observe ``consumed_at`` set
    with no ``connection_name_returned`` to return.

    Returns:
        ``True`` if a row was updated (state existed and was not
        already consumed); ``False`` if the row was missing or
        already consumed.

    Raises:
        QueryError: On database errors.
    """
    ...

cleanup_expired async

cleanup_expired(retention_seconds)

Delete all expired states.

Also reaps consumed-but-stale rows older than retention_seconds so the idempotency table does not grow unbounded.

Returns:

Type Description
int

Number of deleted rows.

Source code in src/synthorg/persistence/connection_protocol.py
async def cleanup_expired(self, retention_seconds: float) -> int:
    """Delete all expired states.

    Also reaps consumed-but-stale rows older than
    ``retention_seconds`` so the idempotency table does not grow
    unbounded.

    Returns:
        Number of deleted rows.
    """
    ...

WebhookReceiptRepository

Bases: IdKeyedRepository[WebhookReceipt, NotBlankStr], Protocol

CRUD for webhook receipt log entries.

Composes :class:IdKeyedRepository (ADR-0001). Entity is keyed by receipt_id field. Bespoke per ADR-0001 D7: :meth:update_status and :meth:update_status_if_current (lifecycle updates), :meth:get_by_connection (alternate-key query), and :meth:cleanup_old_for_connection (retention policy).

save async

save(entity)

Persist a webhook receipt.

Source code in src/synthorg/persistence/connection_protocol.py
async def save(self, entity: WebhookReceipt) -> None:
    """Persist a webhook receipt."""
    ...

get async

get(entity_id)

Fetch a single receipt by ID, or None when absent.

Used by the retry endpoint to look up a failed receipt before re-publishing its captured payload to the bus.

Source code in src/synthorg/persistence/connection_protocol.py
async def get(self, entity_id: NotBlankStr) -> WebhookReceipt | None:
    """Fetch a single receipt by ID, or ``None`` when absent.

    Used by the retry endpoint to look up a failed receipt before
    re-publishing its captured payload to the bus.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List all webhook receipts with pagination.

Source code in src/synthorg/persistence/connection_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[WebhookReceipt, ...]:
    """List all webhook receipts with pagination."""
    ...

delete async

delete(entity_id)

Delete a webhook receipt by ID.

Returns:

Type Description
bool

True if the receipt existed and was deleted.

Source code in src/synthorg/persistence/connection_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a webhook receipt by ID.

    Returns:
        ``True`` if the receipt existed and was deleted.
    """
    ...

update_status async

update_status(receipt_id, *, status, processed_at, error)

Update the receipt's lifecycle fields.

Returns True when the row existed and was updated, False when no row matched the ID. Callers can use the boolean to distinguish "not found" from a successful no-op without raising.

Source code in src/synthorg/persistence/connection_protocol.py
async def update_status(
    self,
    receipt_id: NotBlankStr,
    *,
    status: str,
    processed_at: datetime | None,
    error: str | None,
) -> bool:
    """Update the receipt's lifecycle fields.

    Returns ``True`` when the row existed and was updated, ``False``
    when no row matched the ID. Callers can use the boolean to
    distinguish "not found" from a successful no-op without raising.
    """
    ...

update_status_if_current async

update_status_if_current(receipt_id, *, expected_status, status, processed_at, error)

Compare-and-set variant of update_status.

Atomically updates the row only when its current status column equals expected_status. Returns True on a successful transition, False when the row is missing OR the row's current status differs from expected_status (lost the race). The retry endpoint uses this to close the TOCTOU window where two concurrent operator-triggered retries could both load the same receipt, both transition it to retrying, and both republish the captured payload.

Source code in src/synthorg/persistence/connection_protocol.py
async def update_status_if_current(
    self,
    receipt_id: NotBlankStr,
    *,
    expected_status: str,
    status: str,
    processed_at: datetime | None,
    error: str | None,
) -> bool:
    """Compare-and-set variant of ``update_status``.

    Atomically updates the row only when its current ``status`` column
    equals ``expected_status``. Returns ``True`` on a successful
    transition, ``False`` when the row is missing OR the row's
    current status differs from ``expected_status`` (lost the race).
    The retry endpoint uses this to close the TOCTOU window where
    two concurrent operator-triggered retries could both load the
    same receipt, both transition it to ``retrying``, and both
    republish the captured payload.
    """
    ...

get_by_connection async

get_by_connection(connection_name, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List receipts for a connection, newest first.

limit <= 0 returns (); offset skips that many rows before slicing the limit window.

Source code in src/synthorg/persistence/connection_protocol.py
async def get_by_connection(
    self,
    connection_name: NotBlankStr,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[WebhookReceipt, ...]:
    """List receipts for a connection, newest first.

    ``limit <= 0`` returns ``()``; ``offset`` skips that many rows
    before slicing the limit window.
    """
    ...

cleanup_old_for_connection async

cleanup_old_for_connection(connection_name, retention_days)

Delete receipts for connection_name older than retention_days.

retention_days <= 0 is a no-op so callers cannot accidentally truncate a connection's log via misconfiguration.

Returns:

Type Description
int

Number of deleted rows.

Source code in src/synthorg/persistence/connection_protocol.py
async def cleanup_old_for_connection(
    self,
    connection_name: NotBlankStr,
    retention_days: int,
) -> int:
    """Delete receipts for *connection_name* older than *retention_days*.

    ``retention_days <= 0`` is a no-op so callers cannot accidentally
    truncate a connection's log via misconfiguration.

    Returns:
        Number of deleted rows.
    """
    ...

cost_record_protocol

CostRecord repository protocol.

CostRecordFilterSpec pydantic-model

Bases: BaseModel

Filter spec for CostRecordRepository.query (ADR-0001).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id = None

Filter by agent identifier

task_id pydantic-field

task_id = None

Filter by task identifier

CostRecordRepository

Bases: AppendOnlyRepository['CostRecord', CostRecordFilterSpec], Protocol

Append-only persistence + query/aggregation for CostRecord.

Composes :class:AppendOnlyRepository (ADR-0001). Bespoke per D7:

  • aggregate sums total cost with a mixed-currency rejection invariant; the generic query cannot express aggregation.

append async

append(event)

Persist a cost record (append-only).

Parameters:

Name Type Description Default
event CostRecord

The cost record to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/cost_record_protocol.py
async def append(self, event: CostRecord) -> None:
    """Persist a cost record (append-only).

    Args:
        event: The cost record to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Query cost records with optional filters and pagination.

Parameters:

Name Type Description Default
filter_spec CostRecordFilterSpec

Carries optional agent_id and task_id filters.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before applying limit.

0

Returns:

Type Description
tuple[CostRecord, ...]

Matching cost records as a tuple, ordered newest-first.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/cost_record_protocol.py
async def query(
    self,
    filter_spec: CostRecordFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[CostRecord, ...]:
    """Query cost records with optional filters and pagination.

    Args:
        filter_spec: Carries optional agent_id and task_id filters.
        limit: Maximum rows to return.
        offset: Rows to skip before applying limit.

    Returns:
        Matching cost records as a tuple, ordered newest-first.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

purge_before async

purge_before(threshold)

Delete cost records with timestamp before threshold (retention).

Parameters:

Name Type Description Default
threshold datetime

Records older than this are deleted.

required

Returns:

Type Description
int

Number of rows removed.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/cost_record_protocol.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete cost records with timestamp before threshold (retention).

    Args:
        threshold: Records older than this are deleted.

    Returns:
        Number of rows removed.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

aggregate async

aggregate(*, agent_id=None, task_id=None)

Sum total cost, optionally filtered by agent and/or task.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
task_id NotBlankStr | None

Filter by task identifier.

None

Returns:

Type Description
float

Total cost in the configured currency.

Raises:

Type Description
MixedCurrencyAggregationError

If the matched cost records span more than one currency. Aggregation is rejected rather than silently summing across currencies; the controller maps this to HTTP 409. Filter by agent_id/task_id (or by date window in caller code) to scope the aggregation to a single currency.

PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/cost_record_protocol.py
async def aggregate(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
) -> float:
    """Sum total cost, optionally filtered by agent and/or task.

    Args:
        agent_id: Filter by agent identifier.
        task_id: Filter by task identifier.

    Returns:
        Total cost in the configured currency.

    Raises:
        MixedCurrencyAggregationError: If the matched cost records
            span more than one currency.  Aggregation is rejected
            rather than silently summing across currencies; the
            controller maps this to HTTP 409.  Filter by
            ``agent_id``/``task_id`` (or by date window in caller
            code) to scope the aggregation to a single currency.
        PersistenceError: If the operation fails.
    """
    ...

decision_protocol

Decision records repository protocol.

DecisionRole module-attribute

DecisionRole = Literal['executor', 'reviewer']

Valid role filters for DecisionRepository.query.

DecisionFilterSpec pydantic-model

Bases: BaseModel

Filter spec for DecisionRepository.query (ADR-0001).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

task_id pydantic-field

task_id = None

Filter by task identifier

agent_id pydantic-field

agent_id = None

Filter by agent identifier

role pydantic-field

role = None

Filter by agent role ('executor' or 'reviewer')

DecisionRepository

Bases: AppendOnlyRepository['DecisionRecord', DecisionFilterSpec], Protocol

Append-only persistence + query interface for DecisionRecord.

Decision records are immutable audit entries of review gate decisions. No update or delete operations are provided to preserve audit integrity.

Composes :class:AppendOnlyRepository (ADR-0001). Bespoke per D7:

  • append_with_next_version atomically computes version via SQL subquery to eliminate the TOCTOU race under concurrent reviewers.
  • get(record_id) is kept because AppendOnlyRepository has no per-record retrieval (append-only logs are typically queried only via filtered scans); callers need direct ID-based lookups.
  • list_by_task and list_by_agent are kept because they serve different consumers with different sort orders: list_by_task returns oldest-first (chronological), while list_by_agent returns newest-first (cursor-pagination stable under concurrent appends).

append_with_next_version async

append_with_next_version(
    *,
    record_id,
    task_id,
    approval_id,
    executing_agent_id,
    reviewer_agent_id,
    decision,
    reason,
    criteria_snapshot,
    recorded_at,
    metadata=None,
)

Atomically append a decision record computing version in SQL.

Computes version = COALESCE(MAX(version), 0) + 1 for the given task_id inside a single INSERT statement (atomic under aiosqlite's per-statement serialization), eliminating the TOCTOU race that a list_by_task + len(...) + 1 pattern would create under concurrent reviewers.

Parameters:

Name Type Description Default
record_id NotBlankStr

Unique record identifier (UUID recommended).

required
task_id NotBlankStr

Task that was reviewed.

required
approval_id NotBlankStr | None

Associated ApprovalItem identifier, or None.

required
executing_agent_id NotBlankStr

Agent that performed the work.

required
reviewer_agent_id NotBlankStr

Agent or human that reviewed.

required
decision DecisionOutcome

Outcome of the review.

required
reason str | None

Optional rationale.

required
criteria_snapshot tuple[NotBlankStr, ...]

Acceptance criteria at decision time.

required
recorded_at AwareDatetime

Decision timestamp (must be timezone-aware). Normalized to UTC before storage so records read back via get / list_by_task / list_by_agent will always carry UTC timestamps.

required
metadata dict[str, object] | None

Forward-compatible metadata. Defaults to {} when not supplied; callers that do not attach metadata do not have to pass an empty dict.

None

Returns:

Type Description
DecisionRecord

The persisted DecisionRecord with the server-assigned

DecisionRecord

version.

Raises:

Type Description
DuplicateRecordError

If a record with record_id already exists, or a concurrent writer won the UNIQUE(task_id, version) race.

QueryError

If the operation fails.

Source code in src/synthorg/persistence/decision_protocol.py
async def append_with_next_version(  # noqa: PLR0913
    self,
    *,
    record_id: NotBlankStr,
    task_id: NotBlankStr,
    approval_id: NotBlankStr | None,
    executing_agent_id: NotBlankStr,
    reviewer_agent_id: NotBlankStr,
    decision: DecisionOutcome,
    reason: str | None,
    criteria_snapshot: tuple[NotBlankStr, ...],
    recorded_at: AwareDatetime,
    metadata: dict[str, object] | None = None,
) -> DecisionRecord:
    """Atomically append a decision record computing version in SQL.

    Computes ``version = COALESCE(MAX(version), 0) + 1`` for the
    given ``task_id`` inside a single ``INSERT`` statement (atomic
    under aiosqlite's per-statement serialization), eliminating the
    TOCTOU race that a ``list_by_task`` + ``len(...) + 1`` pattern
    would create under concurrent reviewers.

    Args:
        record_id: Unique record identifier (UUID recommended).
        task_id: Task that was reviewed.
        approval_id: Associated ``ApprovalItem`` identifier, or ``None``.
        executing_agent_id: Agent that performed the work.
        reviewer_agent_id: Agent or human that reviewed.
        decision: Outcome of the review.
        reason: Optional rationale.
        criteria_snapshot: Acceptance criteria at decision time.
        recorded_at: Decision timestamp (must be timezone-aware).
            Normalized to UTC before storage so records read back
            via ``get`` / ``list_by_task`` / ``list_by_agent`` will
            always carry UTC timestamps.
        metadata: Forward-compatible metadata.  Defaults to ``{}``
            when not supplied; callers that do not attach
            metadata do not have to pass an empty dict.

    Returns:
        The persisted ``DecisionRecord`` with the server-assigned
        ``version``.

    Raises:
        DuplicateRecordError: If a record with ``record_id`` already
            exists, or a concurrent writer won the
            ``UNIQUE(task_id, version)`` race.
        QueryError: If the operation fails.
    """
    ...

append async

append(event)

Append a decision record via precomputed version.

Normally callers use append_with_next_version which atomically computes the version in SQL. This method is provided for completeness of the :class:AppendOnlyRepository interface when the version is already known (rare).

Parameters:

Name Type Description Default
event DecisionRecord

The decision record to persist.

required

Raises:

Type Description
DuplicateRecordError

If a record with the same ID exists.

QueryError

If the operation fails.

Source code in src/synthorg/persistence/decision_protocol.py
async def append(self, event: DecisionRecord) -> None:
    """Append a decision record via precomputed version.

    Normally callers use ``append_with_next_version`` which
    atomically computes the version in SQL. This method is provided
    for completeness of the :class:`AppendOnlyRepository` interface
    when the version is already known (rare).

    Args:
        event: The decision record to persist.

    Raises:
        DuplicateRecordError: If a record with the same ID exists.
        QueryError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Query decision records with optional filters and pagination.

Results are ordered by timestamp and ID. When both agent_id and role are specified, returns records where the agent acted in that role (query(DecisionFilterSpec(agent_id="a1", role="reviewer")) returns decisions reviewed by agent "a1").

Parameters:

Name Type Description Default
filter_spec DecisionFilterSpec

Carries optional task_id, agent_id, and role filters. All filters are optional and combined with AND.

required
limit int

Maximum rows to return (>= 1).

DEFAULT_PAGE_SIZE
offset int

Rows to skip (>= 0).

0

Returns:

Type Description
DecisionRecord

Matching decision records as a tuple. When task_id is

...

specified, results are oldest-first (ascending recorded_at).

tuple[DecisionRecord, ...]

When agent_id / role are specified without

tuple[DecisionRecord, ...]

task_id, results are newest-first (descending

tuple[DecisionRecord, ...]

recorded_at). Mixed filters default to task-oriented ordering

tuple[DecisionRecord, ...]

(oldest-first).

Raises:

Type Description
QueryError

If the operation fails or pagination args are out of range.

Source code in src/synthorg/persistence/decision_protocol.py
async def query(
    self,
    filter_spec: DecisionFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[DecisionRecord, ...]:
    """Query decision records with optional filters and pagination.

    Results are ordered by timestamp and ID. When both ``agent_id``
    and ``role`` are specified, returns records where the agent
    acted in that role (``query(DecisionFilterSpec(agent_id="a1",
    role="reviewer"))`` returns decisions reviewed by agent "a1").

    Args:
        filter_spec: Carries optional task_id, agent_id, and role
            filters. All filters are optional and combined with AND.
        limit: Maximum rows to return (>= 1).
        offset: Rows to skip (>= 0).

    Returns:
        Matching decision records as a tuple. When ``task_id`` is
        specified, results are oldest-first (ascending recorded_at).
        When ``agent_id`` / ``role`` are specified without
        ``task_id``, results are newest-first (descending
        recorded_at). Mixed filters default to task-oriented ordering
        (oldest-first).

    Raises:
        QueryError: If the operation fails or pagination args are
            out of range.
    """
    ...

get async

get(record_id)

Retrieve a decision record by ID.

Parameters:

Name Type Description Default
record_id NotBlankStr

The record identifier.

required

Returns:

Type Description
DecisionRecord | None

The record, or None if not found.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/decision_protocol.py
async def get(self, record_id: NotBlankStr) -> DecisionRecord | None:
    """Retrieve a decision record by ID.

    Args:
        record_id: The record identifier.

    Returns:
        The record, or ``None`` if not found.

    Raises:
        QueryError: If the operation fails.
    """
    ...

list_by_task async

list_by_task(task_id, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List decision records for a task (paginated, oldest first).

Parameters:

Name Type Description Default
task_id NotBlankStr

The task identifier.

required
limit int

Maximum rows to return (>= 1).

DEFAULT_PAGE_SIZE
offset int

Rows to skip (>= 0).

0

Returns:

Type Description
tuple[DecisionRecord, ...]

Matching records as a tuple (oldest first).

Raises:

Type Description
QueryError

If the operation fails or pagination args are out of range.

Source code in src/synthorg/persistence/decision_protocol.py
async def list_by_task(
    self,
    task_id: NotBlankStr,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[DecisionRecord, ...]:
    """List decision records for a task (paginated, oldest first).

    Args:
        task_id: The task identifier.
        limit: Maximum rows to return (>= 1).
        offset: Rows to skip (>= 0).

    Returns:
        Matching records as a tuple (oldest first).

    Raises:
        QueryError: If the operation fails or pagination args are
            out of range.
    """
    ...

list_by_agent async

list_by_agent(agent_id, *, role, limit=DEFAULT_PAGE_SIZE, offset=0)

List decision records by agent role (paginated, newest first).

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
role DecisionRole

Either "executor" or "reviewer".

required
limit int

Maximum rows to return (>= 1).

DEFAULT_PAGE_SIZE
offset int

Rows to skip (>= 0).

0

Returns:

Type Description
DecisionRecord

Matching records as a tuple, ordered by

...

(recorded_at DESC, id DESC) so cursor pagination is

tuple[DecisionRecord, ...]

stable under concurrent inserts.

Raises:

Type Description
QueryError

If the operation fails, pagination args are out of range, or role is not a recognised value.

Source code in src/synthorg/persistence/decision_protocol.py
async def list_by_agent(
    self,
    agent_id: NotBlankStr,
    *,
    role: DecisionRole,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[DecisionRecord, ...]:
    """List decision records by agent role (paginated, newest first).

    Args:
        agent_id: The agent identifier.
        role: Either ``"executor"`` or ``"reviewer"``.
        limit: Maximum rows to return (>= 1).
        offset: Rows to skip (>= 0).

    Returns:
        Matching records as a tuple, ordered by
        ``(recorded_at DESC, id DESC)`` so cursor pagination is
        stable under concurrent inserts.

    Raises:
        QueryError: If the operation fails, pagination args are
            out of range, or ``role`` is not a recognised value.
    """
    ...

purge_before async

purge_before(threshold)

Delete decision records older than threshold (retention).

Parameters:

Name Type Description Default
threshold datetime

Records older than this are deleted.

required

Returns:

Type Description
int

Number of rows removed.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/decision_protocol.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete decision records older than threshold (retention).

    Args:
        threshold: Records older than this are deleted.

    Returns:
        Number of rows removed.

    Raises:
        QueryError: If the operation fails.
    """
    ...

message_protocol

Message repository protocol.

MessageFilterSpec pydantic-model

Bases: BaseModel

Filter spec for MessageRepository.query.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

channel pydantic-field

channel = None

Filter to messages on this channel

MessageRepository

Bases: AppendOnlyRepository[Message, MessageFilterSpec], Protocol

Write + history query interface for Message persistence.

Composes :class:AppendOnlyRepository.

  • get_history returns newest-first within one channel with the project's canonical limit default; it is the dashboard hot path and the existing controller calls already pass a channel name positionally.
  • delete supports per-message moderation / redaction; the generic purge_before(threshold) is the retention sweeper.

append async

append(message)

Persist a message (append-only).

Parameters:

Name Type Description Default
message Message

The message to persist.

required

Raises:

Type Description
DuplicateRecordError

If a message with the same ID exists.

PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/message_protocol.py
async def append(self, message: Message) -> None:
    """Persist a message (append-only).

    Args:
        message: The message to persist.

    Raises:
        DuplicateRecordError: If a message with the same ID exists.
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Return messages matching the filter spec, newest first.

Parameters:

Name Type Description Default
filter_spec MessageFilterSpec

Carries optional channel filter.

required
limit int

Maximum number of messages to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[Message, ...]

Messages ordered by timestamp descending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/message_protocol.py
async def query(
    self,
    filter_spec: MessageFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Message, ...]:
    """Return messages matching the filter spec, newest first.

    Args:
        filter_spec: Carries optional ``channel`` filter.
        limit: Maximum number of messages to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        Messages ordered by timestamp descending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

purge_before async

purge_before(threshold)

Delete messages with timestamp < threshold.

Returns:

Type Description
int

Number of rows removed.

Source code in src/synthorg/persistence/message_protocol.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete messages with ``timestamp < threshold``.

    Returns:
        Number of rows removed.
    """
    ...

get_history async

get_history(channel, *, limit=DEFAULT_LIST_LIMIT)

Retrieve message history for a channel (dashboard hot path).

Parameters:

Name Type Description Default
channel NotBlankStr

Channel name to query.

required
limit int

Maximum number of messages to return (newest first).

DEFAULT_LIST_LIMIT

Returns:

Type Description
tuple[Message, ...]

Messages ordered by timestamp descending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/message_protocol.py
async def get_history(
    self,
    channel: NotBlankStr,
    *,
    limit: int = DEFAULT_LIST_LIMIT,
) -> tuple[Message, ...]:
    """Retrieve message history for a channel (dashboard hot path).

    Args:
        channel: Channel name to query.
        limit: Maximum number of messages to return (newest first).

    Returns:
        Messages ordered by timestamp descending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_id async

get_by_id(channel, message_id)

Fetch a single message by (channel, id).

messages.id is the primary key (globally unique), so the lookup is an indexed point read; channel is an additional scoping predicate so a caller cannot read a message off a channel it did not address. Replaces the prior get_history full-channel scan in :meth:MessageService.get_message.

Parameters:

Name Type Description Default
channel NotBlankStr

Channel the message must belong to.

required
message_id NotBlankStr

The unique message identifier.

required

Returns:

Type Description
Message | None

The matching :class:Message, or None when no message

Message | None

with that id exists on that channel.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/message_protocol.py
async def get_by_id(
    self,
    channel: NotBlankStr,
    message_id: NotBlankStr,
) -> Message | None:
    """Fetch a single message by ``(channel, id)``.

    ``messages.id`` is the primary key (globally unique), so the
    lookup is an indexed point read; ``channel`` is an additional
    scoping predicate so a caller cannot read a message off a
    channel it did not address. Replaces the prior
    ``get_history`` full-channel scan in
    :meth:`MessageService.get_message`.

    Args:
        channel: Channel the message must belong to.
        message_id: The unique message identifier.

    Returns:
        The matching :class:`Message`, or ``None`` when no message
        with that id exists on that channel.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(message_id)

Delete a message by id (moderation / redaction).

Parameters:

Name Type Description Default
message_id NotBlankStr

The unique message identifier.

required

Returns:

Type Description
bool

True if a row was deleted, False if the id did not

bool

exist.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/message_protocol.py
async def delete(self, message_id: NotBlankStr) -> bool:
    """Delete a message by id (moderation / redaction).

    Args:
        message_id: The unique message identifier.

    Returns:
        ``True`` if a row was deleted, ``False`` if the id did not
        exist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

parked_context_protocol

ParkedContext repository protocol.

ParkedContextRepository

Bases: IdKeyedRepository[ParkedContext, NotBlankStr], Protocol

CRUD interface for parked agent execution contexts.

Composes :class:IdKeyedRepository (ADR-0001). Bespoke per D7: :meth:get_by_approval is a unique-key lookup (each approval has at most one parked context) and :meth:get_by_agent returns rows ordered parked_at DESC, both of which are simpler/cheaper than routing through a FilteredQueryRepository.query call.

save async

save(entity)

Persist a parked context.

Parameters:

Name Type Description Default
entity ParkedContext

The parked context to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/parked_context_protocol.py
async def save(self, entity: ParkedContext) -> None:
    """Persist a parked context.

    Args:
        entity: The parked context to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve a parked context by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The parked context identifier.

required

Returns:

Type Description
ParkedContext | None

The parked context, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/parked_context_protocol.py
async def get(self, entity_id: NotBlankStr) -> ParkedContext | None:
    """Retrieve a parked context by ID.

    Args:
        entity_id: The parked context identifier.

    Returns:
        The parked context, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List parked contexts in id order.

Parameters:

Name Type Description Default
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[ParkedContext, ...]

Parked contexts in ascending id order.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/parked_context_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[ParkedContext, ...]:
    """List parked contexts in id order.

    Args:
        limit: Maximum rows to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        Parked contexts in ascending id order.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_approval async

get_by_approval(approval_id)

Retrieve a parked context by approval ID.

Parameters:

Name Type Description Default
approval_id NotBlankStr

The approval item identifier.

required

Returns:

Type Description
ParkedContext | None

The parked context, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/parked_context_protocol.py
async def get_by_approval(self, approval_id: NotBlankStr) -> ParkedContext | None:
    """Retrieve a parked context by approval ID.

    Args:
        approval_id: The approval item identifier.

    Returns:
        The parked context, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_agent async

get_by_agent(agent_id, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Retrieve a bounded page of parked contexts for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
ParkedContext

A page of parked contexts for the agent, ordered by

...

parked_at DESC then id ascending (stable secondary

tuple[ParkedContext, ...]

key for deterministic paging). Callers that need every

tuple[ParkedContext, ...]

parked context drain via

tuple[ParkedContext, ...]

func:synthorg.persistence._shared.collect_all.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/parked_context_protocol.py
async def get_by_agent(
    self,
    agent_id: NotBlankStr,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[ParkedContext, ...]:
    """Retrieve a bounded page of parked contexts for an agent.

    Args:
        agent_id: The agent identifier.
        limit: Maximum rows to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        A page of parked contexts for the agent, ordered by
        ``parked_at`` DESC then ``id`` ascending (stable secondary
        key for deterministic paging). Callers that need every
        parked context drain via
        :func:`synthorg.persistence._shared.collect_all`.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete a parked context by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The parked context identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/parked_context_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a parked context by ID.

    Args:
        entity_id: The parked context identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

project_protocol

Project repository protocol.

ProjectFilterSpec pydantic-model

Bases: BaseModel

Filter spec for ProjectRepository.query.

All fields are optional; an empty spec matches all projects.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

ProjectRepository

Bases: IdKeyedRepository[Project, NotBlankStr], FilteredQueryRepository[Project, ProjectFilterSpec], Protocol

CRUD + query interface for Project persistence.

The mutation surface is split into atomic create/update methods so the service layer can attach the correct API_PROJECT_CREATED / API_PROJECT_UPDATED audit event without a TOCTOU get + save race. save remains as an upsert convenience for callers that genuinely need "persist regardless of prior state" semantics (migration / import paths); production CRUD must go through the explicit pair.

Composes :class:IdKeyedRepository + :class:FilteredQueryRepository. create and update are atomic lifecycle transitions that the generic save (upsert) cannot distinguish; they preserve separate create-vs-update audit semantics that the service layer depends on.

create async

create(project)

Insert a new project, failing if the id already exists.

Atomic insert-only operation paired with :meth:update to preserve distinct create vs. update audit events. See :meth:save for the upsert convenience when lifecycle is unknown.

Parameters:

Name Type Description Default
project Project

The project to insert.

required

Raises:

Type Description
DuplicateRecordError

A project with the same id is already persisted.

QueryError

If the database operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def create(self, project: Project) -> None:
    """Insert a new project, failing if the id already exists.

    Atomic insert-only operation paired with :meth:`update` to
    preserve distinct create vs. update audit events. See :meth:`save`
    for the upsert convenience when lifecycle is unknown.

    Args:
        project: The project to insert.

    Raises:
        DuplicateRecordError: A project with the same id is
            already persisted.
        QueryError: If the database operation fails.
    """
    ...

update async

update(project)

Update an existing project, failing if no row matches.

Atomic update-only operation paired with :meth:create to preserve distinct create vs. update audit events. See :meth:save for the upsert convenience when lifecycle is unknown.

Parameters:

Name Type Description Default
project Project

The project to update. project.id selects the row.

required

Raises:

Type Description
RecordNotFoundError

No project with this id exists.

QueryError

If the database operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def update(self, project: Project) -> None:
    """Update an existing project, failing if no row matches.

    Atomic update-only operation paired with :meth:`create` to
    preserve distinct create vs. update audit events. See :meth:`save`
    for the upsert convenience when lifecycle is unknown.

    Args:
        project: The project to update.  ``project.id`` selects
            the row.

    Raises:
        RecordNotFoundError: No project with this id exists.
        QueryError: If the database operation fails.
    """
    ...

save async

save(entity)

Persist a project via upsert (insert or update).

Used for migration / import paths that legitimately do not know whether the row exists. Production CRUD endpoints must use :meth:create / :meth:update so the API audit event reflects the actual lifecycle.

Parameters:

Name Type Description Default
entity Project

The project to persist.

required

Raises:

Type Description
QueryError

If the database operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def save(self, entity: Project) -> None:
    """Persist a project via upsert (insert or update).

    Used for migration / import paths that legitimately do not
    know whether the row exists.  Production CRUD endpoints must
    use :meth:`create` / :meth:`update` so the API audit event
    reflects the actual lifecycle.

    Args:
        entity: The project to persist.

    Raises:
        QueryError: If the database operation fails.
    """
    ...

get async

get(entity_id)

Retrieve a project by its ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The project identifier.

required

Returns:

Type Description
Project | None

The project, or None if not found.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def get(self, entity_id: NotBlankStr) -> Project | None:
    """Retrieve a project by its ID.

    Args:
        entity_id: The project identifier.

    Returns:
        The project, or ``None`` if not found.

    Raises:
        QueryError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List all projects in ID order.

Parameters:

Name Type Description Default
limit int

Maximum projects to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[Project, ...]

Projects in ascending ID order, capped at limit rows.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Project, ...]:
    """List all projects in ID order.

    Args:
        limit: Maximum projects to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        Projects in ascending ID order, capped at *limit* rows.

    Raises:
        QueryError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List projects matching the filter spec.

Results are ordered by project ID ascending to ensure deterministic pagination across backends.

Parameters:

Name Type Description Default
filter_spec ProjectFilterSpec

Carries optional status and lead filters.

required
limit int

Maximum projects to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[Project, ...]

Matching projects ordered by ID, capped at limit rows.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def query(
    self,
    filter_spec: ProjectFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Project, ...]:
    """List projects matching the filter spec.

    Results are ordered by project ID ascending to ensure
    deterministic pagination across backends.

    Args:
        filter_spec: Carries optional ``status`` and ``lead`` filters.
        limit: Maximum projects to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        Matching projects ordered by ID, capped at *limit* rows.

    Raises:
        QueryError: If the operation fails.
    """
    ...

count async

count(filter_spec)

Count projects matching the filter spec.

Source code in src/synthorg/persistence/project_protocol.py
async def count(self, filter_spec: ProjectFilterSpec) -> int:
    """Count projects matching the filter spec."""
    ...

delete async

delete(entity_id)

Delete a project by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The project identifier.

required

Returns:

Type Description
bool

True if the project was deleted, False if not found.

Raises:

Type Description
QueryError

If the operation fails.

Source code in src/synthorg/persistence/project_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a project by ID.

    Args:
        entity_id: The project identifier.

    Returns:
        ``True`` if the project was deleted, ``False`` if not found.

    Raises:
        QueryError: If the operation fails.
    """
    ...

settings_protocol

Settings repository protocol.

SettingRowKey module-attribute

SettingRowKey = tuple[NotBlankStr, NotBlankStr]

Composite primary key: (namespace, key).

SettingRow pydantic-model

Bases: BaseModel

Persistent settings record.

Attributes:

Name Type Description
namespace NotBlankStr

Setting namespace (part of composite primary key).

key NotBlankStr

Setting key within the namespace (part of composite primary key).

value str

Setting value as a string.

updated_at str

ISO 8601 timestamp of the last update.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

namespace pydantic-field

namespace

Setting namespace

key pydantic-field

key

Setting key

value pydantic-field

value

Setting value

updated_at pydantic-field

updated_at

ISO 8601 timestamp

SettingsRepository

Bases: IdKeyedRepository[SettingRow, SettingRowKey], Protocol

CRUD interface for namespaced settings persistence.

Composes :class:IdKeyedRepository (ADR-0001) with composite key (namespace, key) per D8. Bespoke per D7: :meth:get_namespace, :meth:set_many, :meth:delete_namespace, and :meth:delete_namespace_returning_keys encode atomic multi-row and namespace-scoped operations that the generic surface cannot express.

save async

save(entity)

Persist a setting (upsert by composite key).

Parameters:

Name Type Description Default
entity SettingRow

The setting to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def save(self, entity: SettingRow) -> None:
    """Persist a setting (upsert by composite key).

    Args:
        entity: The setting to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve a setting by composite key.

Parameters:

Name Type Description Default
entity_id SettingRowKey

(namespace, key) tuple.

required

Returns:

Type Description
SettingRow | None

The setting, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def get(self, entity_id: SettingRowKey) -> SettingRow | None:
    """Retrieve a setting by composite key.

    Args:
        entity_id: ``(namespace, key)`` tuple.

    Returns:
        The setting, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List settings across all namespaces (paginated).

Parameters:

Name Type Description Default
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip from the head of the ordering.

0

Returns:

Type Description
tuple[SettingRow, ...]

Paginated settings ordered by (namespace, key) ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[SettingRow, ...]:
    """List settings across all namespaces (paginated).

    Args:
        limit: Maximum rows to return.
        offset: Rows to skip from the head of the ordering.

    Returns:
        Paginated settings ordered by ``(namespace, key)`` ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete a setting by composite key.

Parameters:

Name Type Description Default
entity_id SettingRowKey

(namespace, key) tuple.

required

Returns:

Type Description
bool

True if a setting was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def delete(self, entity_id: SettingRowKey) -> bool:
    """Delete a setting by composite key.

    Args:
        entity_id: ``(namespace, key)`` tuple.

    Returns:
        ``True`` if a setting was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_namespace async

get_namespace(namespace)

Retrieve all settings in a namespace (bespoke per ADR-0001 D7).

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required

Returns:

Type Description
tuple[SettingRow, ...]

All settings in the namespace, sorted by key ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def get_namespace(
    self,
    namespace: NotBlankStr,
) -> tuple[SettingRow, ...]:
    """Retrieve all settings in a namespace (bespoke per ADR-0001 D7).

    Args:
        namespace: Setting namespace.

    Returns:
        All settings in the namespace, sorted by key ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

set_if_unchanged async

set_if_unchanged(entity, expected_updated_at=None)

Upsert a setting with optional compare-and-swap (bespoke per D7).

Parameters:

Name Type Description Default
entity SettingRow

The setting to upsert.

required
expected_updated_at str | None

When provided, enforces atomic CAS -- the row is only updated if the current updated_at matches. Empty string "" signals "only insert if no row exists".

None

Returns:

Type Description
bool

True if the write succeeded, False if the CAS condition

bool

was not met.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def set_if_unchanged(
    self,
    entity: SettingRow,
    expected_updated_at: str | None = None,
) -> bool:
    """Upsert a setting with optional compare-and-swap (bespoke per D7).

    Args:
        entity: The setting to upsert.
        expected_updated_at: When provided, enforces atomic CAS -- the
            row is only updated if the current ``updated_at`` matches.
            Empty string ``""`` signals "only insert if no row exists".

    Returns:
        ``True`` if the write succeeded, ``False`` if the CAS condition
        was not met.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

set_many async

set_many(items, *, expected_updated_at_map=None)

Atomically upsert multiple settings (bespoke per ADR-0001 D7).

Each element of items is a SettingRow instance. expected_updated_at_map optionally supplies a compare-and-swap expected version per (namespace, key) composite key; keys absent from the map are upserted unconditionally. Pass an empty string "" in the map for first-write CAS semantics (the row must not exist yet).

The whole operation is atomic: if any CAS check fails, the transaction rolls back and no rows are modified.

Parameters:

Name Type Description Default
items Sequence[SettingRow]

Settings to upsert.

required
expected_updated_at_map Mapping[SettingRowKey, str] | None

Optional CAS version map keyed by composite (namespace, key).

None

Returns:

Type Description
bool

True if every write succeeded. False if any CAS

bool

check failed; callers should re-read versions and retry

bool

if they need to recover.

Raises:

Type Description
PersistenceError

On DB-level failures (not CAS misses).

Source code in src/synthorg/persistence/settings_protocol.py
async def set_many(
    self,
    items: Sequence[SettingRow],
    *,
    expected_updated_at_map: (Mapping[SettingRowKey, str] | None) = None,
) -> bool:
    """Atomically upsert multiple settings (bespoke per ADR-0001 D7).

    Each element of ``items`` is a ``SettingRow`` instance.
    ``expected_updated_at_map`` optionally supplies a
    compare-and-swap expected version per ``(namespace, key)``
    composite key; keys absent from the map are upserted
    unconditionally. Pass an empty string ``""`` in the map for
    first-write CAS semantics (the row must not exist yet).

    The whole operation is atomic: if any CAS check fails, the
    transaction rolls back and no rows are modified.

    Args:
        items: Settings to upsert.
        expected_updated_at_map: Optional CAS version map keyed by
            composite ``(namespace, key)``.

    Returns:
        ``True`` if every write succeeded. ``False`` if any CAS
        check failed; callers should re-read versions and retry
        if they need to recover.

    Raises:
        PersistenceError: On DB-level failures (not CAS misses).
    """
    ...

delete_namespace async

delete_namespace(namespace)

Delete all settings in a namespace (bespoke per ADR-0001 D7).

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required

Returns:

Type Description
int

Number of settings deleted.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def delete_namespace(self, namespace: NotBlankStr) -> int:
    """Delete all settings in a namespace (bespoke per ADR-0001 D7).

    Args:
        namespace: Setting namespace.

    Returns:
        Number of settings deleted.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete_namespace_returning_keys async

delete_namespace_returning_keys(namespace)

Atomically delete namespace, returning deleted keys (bespoke per D7).

Equivalent to :meth:delete_namespace but returns the keys whose rows were actually removed in a single transaction; callers (notably :class:SettingsService.delete_namespace) rely on this to scope per-key change-publish notifications to the subset that genuinely changed, without a TOCTOU get_namespace + delete_namespace race.

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required

Returns:

Type Description
NotBlankStr

Tuple of keys (within namespace) whose override row

...

was removed by this call, in implementation-defined order.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/settings_protocol.py
async def delete_namespace_returning_keys(
    self,
    namespace: NotBlankStr,
) -> tuple[NotBlankStr, ...]:
    """Atomically delete namespace, returning deleted keys (bespoke per D7).

    Equivalent to :meth:`delete_namespace` but returns the keys
    whose rows were actually removed in a single transaction;
    callers (notably :class:`SettingsService.delete_namespace`)
    rely on this to scope per-key change-publish notifications to
    the subset that genuinely changed, without a TOCTOU
    ``get_namespace`` + ``delete_namespace`` race.

    Args:
        namespace: Setting namespace.

    Returns:
        Tuple of keys (within *namespace*) whose override row
        was removed by this call, in implementation-defined order.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

task_protocol

Task repository protocol.

TaskFilterSpec pydantic-model

Bases: BaseModel

Filter spec for TaskRepository.query (ADR-0001).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

status pydantic-field

status = None

Filter by task status

assigned_to pydantic-field

assigned_to = None

Filter by assignee agent ID

project pydantic-field

project = None

Filter by project ID

TaskRepository

Bases: IdKeyedRepository[Task, NotBlankStr], FilteredQueryRepository[Task, TaskFilterSpec], Protocol

CRUD + query interface for Task persistence.

Composes :class:IdKeyedRepository + :class:FilteredQueryRepository (ADR-0001).

save async

save(entity)

Persist a task (insert or update by id).

Parameters:

Name Type Description Default
entity Task

The task to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/task_protocol.py
async def save(self, entity: Task) -> None:
    """Persist a task (insert or update by id).

    Args:
        entity: The task to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve a task by its ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The task identifier.

required

Returns:

Type Description
Task | None

The task, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/task_protocol.py
async def get(self, entity_id: NotBlankStr) -> Task | None:
    """Retrieve a task by its ID.

    Args:
        entity_id: The task identifier.

    Returns:
        The task, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List tasks with pagination.

Parameters:

Name Type Description Default
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[Task, ...]

Tasks ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/task_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Task, ...]:
    """List tasks with pagination.

    Args:
        limit: Maximum rows to return.
        offset: Rows to skip before the window.

    Returns:
        Tasks ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List tasks matching the filter spec.

Parameters:

Name Type Description Default
filter_spec TaskFilterSpec

Carries optional filters for status, assigned_to, project.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[Task, ...]

Matching tasks ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/task_protocol.py
async def query(
    self,
    filter_spec: TaskFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Task, ...]:
    """List tasks matching the filter spec.

    Args:
        filter_spec: Carries optional filters for status, assigned_to, project.
        limit: Maximum rows to return.
        offset: Rows to skip before the window.

    Returns:
        Matching tasks ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

count async

count(filter_spec)

Count tasks matching the filter spec.

Parameters:

Name Type Description Default
filter_spec TaskFilterSpec

Carries optional filters.

required

Returns:

Type Description
int

Total number of matching tasks.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/task_protocol.py
async def count(self, filter_spec: TaskFilterSpec) -> int:
    """Count tasks matching the filter spec.

    Args:
        filter_spec: Carries optional filters.

    Returns:
        Total number of matching tasks.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete a task by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The task identifier.

required

Returns:

Type Description
bool

True if the task was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/task_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a task by ID.

    Args:
        entity_id: The task identifier.

    Returns:
        ``True`` if the task was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

user_protocol

User and ApiKey repository protocols.

Co-located because every API key belongs to a user (FK) and the two repositories share the auth admin surface.

UserFilterSpec pydantic-model

Bases: BaseModel

Filter spec for UserRepository.query.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

role pydantic-field

role = None

Filter by user role

ApiKeyFilterSpec pydantic-model

Bases: BaseModel

Filter spec for ApiKeyRepository.query.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

user_id pydantic-field

user_id = None

Filter by owner user ID

revoked_only pydantic-field

revoked_only = False

If True, return only revoked keys; if False, return all keys

UserRepository

Bases: IdKeyedRepository[User, NotBlankStr], FilteredQueryRepository[User, UserFilterSpec], Protocol

CRUD + query interface for User persistence.

Composes :class:IdKeyedRepository + :class:FilteredQueryRepository. Bespoke methods: - get_by_username: alternate-key lookup on indexed username column - count_by_role: domain invariant (callers need role-count aggregate) - list_after_id: keyset cursor pagination for the dashboard

save async

save(entity)

Persist a user (insert or update by id).

Parameters:

Name Type Description Default
entity User

The user to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def save(self, entity: User) -> None:
    """Persist a user (insert or update by id).

    Args:
        entity: The user to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve a user by its ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The user identifier.

required

Returns:

Type Description
User | None

The user, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def get(self, entity_id: NotBlankStr) -> User | None:
    """Retrieve a user by its ID.

    Args:
        entity_id: The user identifier.

    Returns:
        The user, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_username async

get_by_username(username)

Retrieve a user by username (D7: alternate-key performance).

Parameters:

Name Type Description Default
username NotBlankStr

The login username.

required

Returns:

Type Description
User | None

The user, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def get_by_username(self, username: NotBlankStr) -> User | None:
    """Retrieve a user by username (D7: alternate-key performance).

    Args:
        username: The login username.

    Returns:
        The user, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List human users (excludes the system user) with pagination.

Parameters:

Name Type Description Default
limit int

Maximum users to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[User, ...]

Human users ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[User, ...]:
    """List human users (excludes the system user) with pagination.

    Args:
        limit: Maximum users to return.
        offset: Rows to skip before the window.

    Returns:
        Human users ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_after_id async

list_after_id(*, after_id=None, limit=DEFAULT_PAGE_SIZE)

Keyset page of human users with id > after_id.

Pushes the cursor predicate into the SQL layer so dashboard pagination stays O(page) regardless of table size, instead of loading a capped prefix and filtering in memory.

Parameters:

Name Type Description Default
after_id NotBlankStr | None

Exclusive lower bound on id; None returns the first page.

None
limit int

Maximum users to return (ascending id order).

DEFAULT_PAGE_SIZE

Returns:

Type Description
User

Up to limit human users with id strictly greater than

...

after_id, excluding the system user.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def list_after_id(
    self,
    *,
    after_id: NotBlankStr | None = None,
    limit: int = DEFAULT_PAGE_SIZE,
) -> tuple[User, ...]:
    """Keyset page of human users with ``id > after_id``.

    Pushes the cursor predicate into the SQL layer so dashboard
    pagination stays O(page) regardless of table size, instead of
    loading a capped prefix and filtering in memory.

    Args:
        after_id: Exclusive lower bound on ``id``; ``None`` returns
            the first page.
        limit: Maximum users to return (ascending ``id`` order).

    Returns:
        Up to *limit* human users with ``id`` strictly greater than
        ``after_id``, excluding the system user.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List users matching the filter spec.

Parameters:

Name Type Description Default
filter_spec UserFilterSpec

Carries optional filter for role.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[User, ...]

Matching users ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def query(
    self,
    filter_spec: UserFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[User, ...]:
    """List users matching the filter spec.

    Args:
        filter_spec: Carries optional filter for role.
        limit: Maximum rows to return.
        offset: Rows to skip before the window.

    Returns:
        Matching users ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

count async

count(filter_spec)

Count users matching the filter spec.

Parameters:

Name Type Description Default
filter_spec UserFilterSpec

Carries optional filter for role.

required

Returns:

Type Description
int

Total number of matching users.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def count(self, filter_spec: UserFilterSpec) -> int:
    """Count users matching the filter spec.

    Args:
        filter_spec: Carries optional filter for role.

    Returns:
        Total number of matching users.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

count_by_role async

count_by_role(role)

Count users with a specific role (D7: domain invariant).

Parameters:

Name Type Description Default
role HumanRole

The role to filter by.

required

Returns:

Type Description
int

Number of users with the given role.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def count_by_role(self, role: HumanRole) -> int:
    """Count users with a specific role (D7: domain invariant).

    Args:
        role: The role to filter by.

    Returns:
        Number of users with the given role.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete a user by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The user identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete a user by ID.

    Args:
        entity_id: The user identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

ApiKeyRepository

Bases: IdKeyedRepository[ApiKey, NotBlankStr], FilteredQueryRepository[ApiKey, ApiKeyFilterSpec], Protocol

CRUD + query interface for API key persistence.

Composes :class:IdKeyedRepository + :class:FilteredQueryRepository (ADR-0001). Bespoke method kept per D7: - get_by_hash: alternate-key lookup on indexed key_hash column

save async

save(entity)

Persist an API key (insert or update by id).

Parameters:

Name Type Description Default
entity ApiKey

The API key to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def save(self, entity: ApiKey) -> None:
    """Persist an API key (insert or update by id).

    Args:
        entity: The API key to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(entity_id)

Retrieve an API key by its ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The key identifier.

required

Returns:

Type Description
ApiKey | None

The API key, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def get(self, entity_id: NotBlankStr) -> ApiKey | None:
    """Retrieve an API key by its ID.

    Args:
        entity_id: The key identifier.

    Returns:
        The API key, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_hash async

get_by_hash(key_hash)

Retrieve an API key by its hash (D7: alternate-key performance).

Parameters:

Name Type Description Default
key_hash NotBlankStr

HMAC-SHA256 hex digest.

required

Returns:

Type Description
ApiKey | None

The API key, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def get_by_hash(self, key_hash: NotBlankStr) -> ApiKey | None:
    """Retrieve an API key by its hash (D7: alternate-key performance).

    Args:
        key_hash: HMAC-SHA256 hex digest.

    Returns:
        The API key, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List API keys with pagination.

Parameters:

Name Type Description Default
limit int

Maximum keys to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[ApiKey, ...]

API keys ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[ApiKey, ...]:
    """List API keys with pagination.

    Args:
        limit: Maximum keys to return.
        offset: Rows to skip before the window.

    Returns:
        API keys ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

List API keys matching the filter spec.

Parameters:

Name Type Description Default
filter_spec ApiKeyFilterSpec

Carries optional filters for user_id and revoked_only.

required
limit int

Maximum rows to return.

DEFAULT_PAGE_SIZE
offset int

Rows to skip before the window.

0

Returns:

Type Description
tuple[ApiKey, ...]

Matching API keys ordered by id ascending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def query(
    self,
    filter_spec: ApiKeyFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[ApiKey, ...]:
    """List API keys matching the filter spec.

    Args:
        filter_spec: Carries optional filters for user_id and revoked_only.
        limit: Maximum rows to return.
        offset: Rows to skip before the window.

    Returns:
        Matching API keys ordered by id ascending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

count async

count(filter_spec)

Count API keys matching the filter spec.

Parameters:

Name Type Description Default
filter_spec ApiKeyFilterSpec

Carries optional filters.

required

Returns:

Type Description
int

Total number of matching API keys.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def count(self, filter_spec: ApiKeyFilterSpec) -> int:
    """Count API keys matching the filter spec.

    Args:
        filter_spec: Carries optional filters.

    Returns:
        Total number of matching API keys.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(entity_id)

Delete an API key by ID.

Parameters:

Name Type Description Default
entity_id NotBlankStr

The key identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/user_protocol.py
async def delete(self, entity_id: NotBlankStr) -> bool:
    """Delete an API key by ID.

    Args:
        entity_id: The key identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

Factory

factory

Factory for creating persistence backends from configuration.

Each company gets its own PersistenceBackend instance, which maps to its own database. This enables multi-tenancy: one database per company, selectable via the PersistenceConfig embedded in each company's RootConfig.

default_registry

default_registry()

Return the module-level registry containing the built-in backends.

Source code in src/synthorg/persistence/factory.py
def default_registry() -> PersistenceBackendRegistry:
    """Return the module-level registry containing the built-in backends."""
    return _REGISTRY

create_backend

create_backend(config)

Create a persistence backend from configuration.

Factory function that maps config.backend to the correct concrete backend class via :class:PersistenceBackendRegistry. Each call returns a new, disconnected backend instance -- the caller is responsible for calling connect() and migrate().

Parameters:

Name Type Description Default
config PersistenceConfig

Persistence configuration (includes backend selection and backend-specific settings).

required

Returns:

Type Description
PersistenceBackend

A new, disconnected backend instance.

Raises:

Type Description
PersistenceConnectionError

If the backend name is not registered, the optional dependency is missing, or backend-specific configuration is absent.

Example::

config = PersistenceConfig(
    backend="sqlite",
    sqlite=SQLiteConfig(path="data/company-a.db"),
)
backend = create_backend(config)
await backend.connect()
await backend.migrate()
Source code in src/synthorg/persistence/factory.py
def create_backend(config: PersistenceConfig) -> PersistenceBackend:
    """Create a persistence backend from configuration.

    Factory function that maps ``config.backend`` to the correct
    concrete backend class via :class:`PersistenceBackendRegistry`.
    Each call returns a new, disconnected backend instance -- the
    caller is responsible for calling ``connect()`` and ``migrate()``.

    Args:
        config: Persistence configuration (includes backend selection
            and backend-specific settings).

    Returns:
        A new, disconnected backend instance.

    Raises:
        PersistenceConnectionError: If the backend name is not
            registered, the optional dependency is missing, or
            backend-specific configuration is absent.

    Example::

        config = PersistenceConfig(
            backend="sqlite",
            sqlite=SQLiteConfig(path="data/company-a.db"),
        )
        backend = create_backend(config)
        await backend.connect()
        await backend.migrate()
    """
    try:
        return _REGISTRY.build(config)
    except StrategyFactoryNotFoundError as exc:
        msg = f"Unknown persistence backend: {config.backend!r}"
        logger.warning(
            PERSISTENCE_BACKEND_UNKNOWN,
            backend=config.backend,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise PersistenceConnectionError(msg) from exc

Errors

persistence_errors

Persistence error hierarchy.

All persistence-related errors inherit from :class:PersistenceError so callers can catch the entire family with a single except clause. PersistenceError itself is rooted in :class:DomainError so the API layer's centralised RFC 9457 dispatch picks up every subtype; the existing handle_record_not_found / handle_persistence_integrity_error / handle_duplicate_record / handle_persistence_error handlers remain registered so persistence-layer 4xx responses keep their fixed public messages ("Resource not found", etc.) instead of leaking record identifiers from str(exc).

Each concrete exception carries an is_retryable class attribute mirroring the provider-layer convention in :mod:synthorg.providers.errors. Callers that implement bounded retry/backoff (e.g. a repository middleware) can branch on this flag without string-matching the driver exception. Default: False. Transient I/O failures override to True.

PersistenceError

PersistenceError(message=None)

Bases: DomainError

Base exception for all persistence operations.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PersistenceConnectionError

PersistenceConnectionError(message=None)

Bases: PersistenceError

Raised when a backend connection cannot be established or is lost.

Network drops, pool exhaustion, and connect timeouts are transient by default -- callers can retry with backoff.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MigrationError

MigrationError(message=None)

Bases: PersistenceError

Raised when a database migration fails.

Non-retryable: a failed migration indicates schema drift or a logic bug, not a transient condition.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

RecordNotFoundError

RecordNotFoundError(message=None)

Bases: PersistenceError

Raised when a requested record does not exist.

Used by ArtifactStorageBackend.retrieve() when no content exists for the given artifact ID. Repository get() methods return None on miss instead of raising.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

DuplicateRecordError

DuplicateRecordError(message=None)

Bases: PersistenceError

Raised when inserting a record that already exists.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

QueryError

QueryError(message=None)

Bases: PersistenceError

Raised when a query fails due to invalid parameters or backend issues.

Transient by default: connection drops and deadlocks during a query surface here and are safe to retry. Deterministic failures (bad SQL, invalid params) use :class:ConstraintViolationError or :class:PersistenceVersionConflictError which override to non-retryable.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

ConstraintViolationError

ConstraintViolationError(message, *, constraint)

Bases: QueryError

Raised when a DB constraint (unique, check, trigger) is violated.

Carries a constraint attribute that identifies the violated constraint by its DB-side name (for Postgres) or by a stable token parsed from the error message (for SQLite). Callers can check this attribute to map the violation to a domain error without parsing error strings.

Non-retryable: constraint violations are deterministic for a given input and will not succeed on a bare retry.

Blank constraint (empty / whitespace-only) is normalised to the sentinel "<unknown>" rather than raising. Raising :class:ValueError from __init__ would bypass downstream except PersistenceError handlers; the sentinel keeps the construction inside the persistence-error family so callers that branch on constraint see a known token they can detect.

Source code in src/synthorg/core/persistence_errors.py
def __init__(self, message: str, *, constraint: str) -> None:
    super().__init__(message)
    stripped = constraint.strip()
    self.constraint: str = stripped or self.UNKNOWN_CONSTRAINT

PersistenceVersionConflictError

PersistenceVersionConflictError(message=None)

Bases: QueryError

Raised when an optimistic concurrency version check fails.

Non-retryable at this layer: the caller must re-read, re-apply its intended change, and resubmit with the fresh version. A blind retry would just lose the racing write.

The API layer translates this to :class:synthorg.core.domain_errors.VersionConflictError (the HTTP-aware sibling) so that controllers raise / catch consistently with other 409 paths.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MalformedRowError

MalformedRowError(message=None)

Bases: QueryError

Raised when a persisted row cannot be deserialized into its model.

JSON decode failures, validation errors, and missing-key errors on rows already committed to the database are deterministic data-integrity problems, not transient query failures. Retrying the same read returns the same corrupt row -- it just burns the budget and obscures the underlying integrity issue.

Non-retryable: callers must investigate the source row, not retry.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

ArtifactTooLargeError

ArtifactTooLargeError(message=None)

Bases: PersistenceError

Raised when a single artifact exceeds the maximum allowed size.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

ArtifactStorageFullError

ArtifactStorageFullError(message=None)

Bases: PersistenceError

Raised when total artifact storage exceeds capacity.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

SQLite Backend

backend

SQLite persistence backend implementation.

SQLitePersistenceBackend

SQLitePersistenceBackend(config)

Bases: _BackendRepositoryAccessors

SQLite implementation of the PersistenceBackend protocol.

Uses a single aiosqlite.Connection with WAL mode enabled by default for file-based databases (in-memory databases do not support WAL). Configurable via SQLiteConfig.wal_mode.

Parameters:

Name Type Description Default
config SQLiteConfig

SQLite-specific configuration.

required
Source code in src/synthorg/persistence/sqlite/backend.py
def __init__(self, config: SQLiteConfig) -> None:  # noqa: PLR0915 -- repo registry setup intentionally enumerates every attribute
    self._config = config
    self._lifecycle_lock = asyncio.Lock()
    # Serializes multi-statement transactions on the single
    # aiosqlite connection. Exposed to repos via ``write_context``.
    self._write_lock = asyncio.Lock()
    self._db: aiosqlite.Connection | None = None
    self._artifacts: SQLiteArtifactRepository | None = None
    self._projects: SQLiteProjectRepository | None = None
    self._tasks: SQLiteTaskRepository | None = None
    self._cost_records: SQLiteCostRecordRepository | None = None
    self._messages: SQLiteMessageRepository | None = None
    self._lifecycle_events: SQLiteLifecycleEventRepository | None = None
    self._task_metrics: SQLiteTaskMetricRepository | None = None
    self._collaboration_metrics: SQLiteCollaborationMetricRepository | None = None
    self._parked_contexts: SQLiteParkedContextRepository | None = None
    self._audit_entries: SQLiteAuditRepository | None = None
    self._provider_audit_events: SQLiteProviderAuditRepo | None = None
    self._preset_overrides: SQLitePresetOverrideRepo | None = None
    self._users: SQLiteUserRepository | None = None
    self._api_keys: SQLiteApiKeyRepository | None = None
    self._checkpoints: SQLiteCheckpointRepository | None = None
    self._heartbeats: SQLiteHeartbeatRepository | None = None
    self._agent_states: SQLiteAgentStateRepository | None = None
    self._settings: SQLiteSettingsRepository | None = None
    self._custom_presets: SQLitePersonalityPresetRepository | None = None
    self._workflow_definitions: SQLiteWorkflowDefinitionRepository | None = None
    self._workflow_executions: SQLiteWorkflowExecutionRepository | None = None
    self._subworkflows: SQLiteSubworkflowRepository | None = None
    self._workflow_versions: VersionRepository[WorkflowDefinition] | None = None
    self._identity_versions: VersionRepository[AgentIdentity] | None = None
    self._evaluation_config_versions: VersionRepository[EvaluationConfig] | None = (
        None
    )
    self._budget_config_versions: VersionRepository[BudgetConfig] | None = None
    self._company_versions: VersionRepository[Company] | None = None
    self._role_versions: VersionRepository[Role] | None = None
    self._decision_records: SQLiteDecisionRepository | None = None
    self._risk_overrides: SQLiteRiskOverrideRepository | None = None
    self._ssrf_violations: SQLiteSsrfViolationRepository | None = None
    self._circuit_breaker_state: SQLiteCircuitBreakerStateRepository | None = None
    self._ceremony_scheduler_state: (
        SQLiteCeremonySchedulerStateRepository | None
    ) = None
    self._meeting_cooldown: SQLiteMeetingCooldownRepository | None = None
    self._tracked_containers: SQLiteTrackedContainerRepository | None = None
    self._project_cost_aggregates: SQLiteProjectCostAggregateRepository | None = (
        None
    )
    self._fine_tune_checkpoints: SQLiteFineTuneCheckpointRepository | None = None
    self._fine_tune_runs: SQLiteFineTuneRunRepository | None = None
    self._training_plans: SQLiteTrainingPlanRepository | None = None
    self._training_results: SQLiteTrainingResultRepository | None = None
    self._custom_rules: SQLiteCustomRuleRepository | None = None
    self._sessions: SQLiteSessionRepository | None = None
    self._refresh_tokens: SQLiteRefreshTokenRepository | None = None
    self._idempotency_keys: SQLiteIdempotencyRepository | None = None
    self._seen_claims: SQLiteSeenClaimsRepository | None = None
    self._principle_overrides: SQLitePrincipleOverrideRepository | None = None
    self._mcp_installations: SQLiteMcpInstallationRepository | None = None
    self._org_facts: SQLiteOrgFactRepository | None = None
    self._ontology_entities: SQLiteOntologyEntityRepository | None = None
    self._ontology_drift: SQLiteOntologyDriftReportRepository | None = None
    # Cached lockout repository -- in-memory cache must survive
    # across ``build_lockouts`` calls, otherwise ``is_locked`` is
    # always False on a freshly-built instance.
    self._lockouts: SQLiteLockoutRepository | None = None
    self._connections: SQLiteConnectionRepository | None = None
    self._connection_secrets: SQLiteConnectionSecretRepository | None = None
    self._oauth_states: SQLiteOAuthStateRepository | None = None
    self._webhook_receipts: SQLiteWebhookReceiptRepository | None = None

kind property

kind

Return the backend discriminator ("sqlite").

config property

config

Public read-only view of the backend's config.

Exposed so callers that need backend-specific details (the backup-handler factory walks the path; tests assert against the resolved sqlite path) do not have to reach for the private _config attribute.

is_connected property

is_connected

Whether the backend has an active connection.

connect async

connect()

Open the SQLite database and configure WAL mode.

Source code in src/synthorg/persistence/sqlite/backend.py
async def connect(self) -> None:
    """Open the SQLite database and configure WAL mode."""
    async with self._lifecycle_lock:
        if self._db is not None:
            logger.debug(PERSISTENCE_BACKEND_ALREADY_CONNECTED)
            return

        logger.info(
            PERSISTENCE_BACKEND_CONNECTING,
            path=self._config.path,
        )
        try:
            self._db = await aiosqlite.connect(self._config.path)
            self._db.row_factory = aiosqlite.Row

            # Enable foreign key enforcement (off by default in SQLite).
            await self._db.execute("PRAGMA foreign_keys = ON")

            if self._config.wal_mode:
                await self._configure_wal()

            self._create_repositories()
        except (sqlite3.Error, OSError) as exc:
            await self._cleanup_failed_connect(exc)

        logger.info(
            PERSISTENCE_BACKEND_CONNECTED,
            path=self._config.path,
        )

get_db

get_db()

Return the shared database connection.

Raises:

Type Description
PersistenceConnectionError

If not yet connected.

Source code in src/synthorg/persistence/sqlite/backend.py
def get_db(self) -> aiosqlite.Connection:
    """Return the shared database connection.

    Raises:
        PersistenceConnectionError: If not yet connected.
    """
    if self._db is None:
        msg = "Database not connected"
        raise PersistenceConnectionError(msg)
    return self._db

write_context async

write_context()

Acquire the shared write lock for the lifetime of the block.

Multi-statement transactions on the single aiosqlite.Connection must serialize so a sibling repo's INSERT cannot interleave between this repo's INSERT and COMMIT. See PersistenceBackend.write_context for the cross-backend contract.

Source code in src/synthorg/persistence/sqlite/backend.py
@asynccontextmanager
async def write_context(self) -> AsyncIterator[None]:
    """Acquire the shared write lock for the lifetime of the block.

    Multi-statement transactions on the single ``aiosqlite.Connection``
    must serialize so a sibling repo's INSERT cannot interleave
    between this repo's INSERT and COMMIT. See
    ``PersistenceBackend.write_context`` for the cross-backend
    contract.
    """
    async with self._write_lock:
        yield

disconnect async

disconnect()

Close the database connection.

Source code in src/synthorg/persistence/sqlite/backend.py
async def disconnect(self) -> None:
    """Close the database connection."""
    async with self._lifecycle_lock:
        if self._db is None:
            return

        logger.info(PERSISTENCE_BACKEND_DISCONNECTING, path=self._config.path)
        try:
            await self._db.close()
            logger.info(
                PERSISTENCE_BACKEND_DISCONNECTED,
                path=self._config.path,
            )
        except (sqlite3.Error, OSError) as exc:
            logger.warning(
                PERSISTENCE_BACKEND_DISCONNECT_ERROR,
                path=self._config.path,
                error=safe_error_description(exc),
                error_type=type(exc).__name__,
            )
        finally:
            self._clear_state()

health_check async

health_check()

Check database connectivity.

Source code in src/synthorg/persistence/sqlite/backend.py
async def health_check(self) -> bool:
    """Check database connectivity."""
    if self._db is None:
        return False
    try:
        cursor = await self._db.execute("SELECT 1")
        row = await cursor.fetchone()
        healthy = row is not None
    except (sqlite3.Error, aiosqlite.Error) as exc:
        logger.warning(
            PERSISTENCE_BACKEND_HEALTH_CHECK,
            healthy=False,
            error=safe_error_description(exc),
            error_type=type(exc).__name__,
        )
        return False
    logger.debug(PERSISTENCE_BACKEND_HEALTH_CHECK, healthy=healthy)
    return healthy

migrate async

migrate()

Apply pending schema migrations via yoyo-migrations.

On failure the backend's repositories are reset so callers cannot reuse a half-initialised state machine (mirrors the postgres backend's pool-close-on-failure behaviour).

Raises:

Type Description
PersistenceConnectionError

If not connected.

MigrationError

If migration application fails.

Source code in src/synthorg/persistence/sqlite/backend.py
async def migrate(self) -> None:
    """Apply pending schema migrations via yoyo-migrations.

    On failure the backend's repositories are reset so callers
    cannot reuse a half-initialised state machine (mirrors the
    postgres backend's pool-close-on-failure behaviour).

    Raises:
        PersistenceConnectionError: If not connected.
        MigrationError: If migration application fails.
    """
    async with self._lifecycle_lock:
        if self._db is None:
            msg = "Cannot migrate: not connected"
            logger.warning(PERSISTENCE_BACKEND_NOT_CONNECTED, error=msg)
            raise PersistenceConnectionError(msg)
        db_url = migrations.to_sqlite_url(self._config.path)
        try:
            await migrations.migrate_apply(db_url)
        except BaseException:
            db = self._db
            if db is not None:
                try:
                    await db.close()
                except (sqlite3.Error, aiosqlite.Error, OSError) as cleanup_exc:
                    logger.warning(
                        PERSISTENCE_BACKEND_DISCONNECT_ERROR,
                        path=self._config.path,
                        error_type=type(cleanup_exc).__name__,
                        error=safe_error_description(cleanup_exc),
                        context="cleanup_after_migration_failure",
                    )
            self._clear_state()
            raise

build_lockouts

build_lockouts(auth_config)

Return the cached lockout repository (built once per connection).

The lockout repo maintains a process-local in-memory cache (_locked) on the auth hot path. Returning a fresh instance on every call would reset that cache and silently "unlock" every user. The cache is cleared on disconnect via _clear_state. The backend's write_context is passed through so lockout transactions serialize with other repositories writing to the same aiosqlite connection.

Source code in src/synthorg/persistence/sqlite/backend.py
def build_lockouts(self, auth_config: AuthConfig) -> LockoutRepository:
    """Return the cached lockout repository (built once per connection).

    The lockout repo maintains a process-local in-memory cache
    (``_locked``) on the auth hot path.  Returning a fresh instance
    on every call would reset that cache and silently "unlock"
    every user.  The cache is cleared on ``disconnect`` via
    ``_clear_state``.  The backend's ``write_context`` is passed
    through so lockout transactions serialize with other
    repositories writing to the same aiosqlite connection.
    """
    if self._lockouts is None:
        self._lockouts = SQLiteLockoutRepository(
            self.get_db(),
            auth_config,
            write_context=self.write_context,
        )
    return self._lockouts

build_escalations

build_escalations(*, notify_channel=None)

Construct an escalation queue repository.

notify_channel is ignored by SQLite (no cross-instance NOTIFY/LISTEN). The backend's write_context is passed through so escalation transactions serialize with other repositories writing to the same aiosqlite connection.

Source code in src/synthorg/persistence/sqlite/backend.py
def build_escalations(
    self,
    *,
    notify_channel: str | None = None,  # noqa: ARG002
) -> EscalationQueueRepository:
    """Construct an escalation queue repository.

    ``notify_channel`` is ignored by SQLite (no cross-instance
    NOTIFY/LISTEN). The backend's ``write_context`` is passed
    through so escalation transactions serialize with other
    repositories writing to the same aiosqlite connection.
    """
    from synthorg.persistence.sqlite.escalation_repo import (  # noqa: PLC0415
        SQLiteEscalationRepository,
    )

    db = self.get_db()
    return SQLiteEscalationRepository(db, write_context=self.write_context)

build_ontology_versioning

build_ontology_versioning()

Construct the ontology versioning service bound to this backend.

Source code in src/synthorg/persistence/sqlite/backend.py
def build_ontology_versioning(
    self,
) -> VersioningService[EntityDefinition]:
    """Construct the ontology versioning service bound to this backend."""
    from synthorg.persistence.sqlite.ontology_versioning import (  # noqa: PLC0415
        create_ontology_versioning,
    )

    return create_ontology_versioning(
        self.get_db(),
        write_context=self.write_context,
    )

get_setting async

get_setting(key)

Retrieve a setting value by key from the _system namespace.

Delegates to self.settings (the SettingsRepository).

Raises:

Type Description
PersistenceConnectionError

If not connected.

Source code in src/synthorg/persistence/sqlite/backend.py
async def get_setting(self, key: NotBlankStr) -> str | None:
    """Retrieve a setting value by key from the ``_system`` namespace.

    Delegates to ``self.settings`` (the ``SettingsRepository``).

    Raises:
        PersistenceConnectionError: If not connected.
    """
    result = await self.settings.get((NotBlankStr("_system"), key))
    return result.value if result is not None else None

set_setting async

set_setting(key, value)

Store a setting value (upsert) in the _system namespace.

Delegates to self.settings (the SettingsRepository).

Raises:

Type Description
PersistenceConnectionError

If not connected.

Source code in src/synthorg/persistence/sqlite/backend.py
async def set_setting(self, key: NotBlankStr, value: str) -> None:
    """Store a setting value (upsert) in the ``_system`` namespace.

    Delegates to ``self.settings`` (the ``SettingsRepository``).

    Raises:
        PersistenceConnectionError: If not connected.
    """
    from synthorg.persistence.settings_protocol import SettingRow  # noqa: PLC0415

    updated_at = format_iso_utc(datetime.now(UTC))
    await self.settings.save(
        SettingRow(
            namespace=NotBlankStr("_system"),
            key=key,
            value=value,
            updated_at=updated_at,
        ),
    )

repositories

SQLite repository implementations for Task, CostRecord, and Message.

HR-related repositories (LifecycleEvent, TaskMetric, CollaborationMetric) are in hr_repositories.py within this package.

SQLiteTaskRepository

SQLiteTaskRepository(db, *, write_context)

SQLite implementation of the TaskRepository protocol.

Parameters:

Name Type Description Default
db Connection

An open aiosqlite connection.

required
write_context WriteContext

Async context manager that serializes writes on the shared connection. Supplied by SQLitePersistenceBackend.write_context in production; tests can pass tests._shared.persistence.make_private_write_context() for standalone construction.

required
Source code in src/synthorg/persistence/sqlite/repositories.py
def __init__(
    self,
    db: aiosqlite.Connection,
    *,
    write_context: WriteContext,
) -> None:
    self._db = db
    self._write_context = write_context

save async

save(task)

Persist a task (upsert semantics).

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def save(self, task: Task) -> None:
        """Persist a task (upsert semantics)."""
        async with self._write_context():
            try:
                params = task.model_dump(mode="json")
                # Tuple fields must be stored as JSON strings.
                params["reviewers"] = _json_list(task.reviewers)
                params["dependencies"] = _json_list(task.dependencies)
                params["artifacts_expected"] = _json_list(task.artifacts_expected)
                params["acceptance_criteria"] = _json_list(
                    task.acceptance_criteria,
                )
                params["delegation_chain"] = _json_list(task.delegation_chain)

                await self._db.execute(
                    """\
INSERT INTO tasks (
    id, title, description, type, priority, project, created_by,
    assigned_to, status, estimated_complexity, budget_limit, deadline,
    max_retries, parent_task_id, task_structure, coordination_topology,
    reviewers, dependencies, artifacts_expected, acceptance_criteria,
    delegation_chain
) VALUES (
    :id, :title, :description, :type, :priority, :project, :created_by,
    :assigned_to, :status, :estimated_complexity, :budget_limit, :deadline,
    :max_retries, :parent_task_id, :task_structure, :coordination_topology,
    :reviewers, :dependencies, :artifacts_expected, :acceptance_criteria,
    :delegation_chain
)
ON CONFLICT(id) DO UPDATE SET
    title=excluded.title,
    description=excluded.description,
    type=excluded.type,
    priority=excluded.priority,
    project=excluded.project,
    created_by=excluded.created_by,
    assigned_to=excluded.assigned_to,
    status=excluded.status,
    estimated_complexity=excluded.estimated_complexity,
    budget_limit=excluded.budget_limit,
    deadline=excluded.deadline,
    max_retries=excluded.max_retries,
    parent_task_id=excluded.parent_task_id,
    task_structure=excluded.task_structure,
    coordination_topology=excluded.coordination_topology,
    reviewers=excluded.reviewers,
    dependencies=excluded.dependencies,
    artifacts_expected=excluded.artifacts_expected,
    acceptance_criteria=excluded.acceptance_criteria,
    delegation_chain=excluded.delegation_chain
""",
                    params,
                )
                await self._db.commit()
            except (sqlite3.Error, aiosqlite.Error) as exc:
                msg = f"Failed to save task {task.id!r}"
                logger.warning(
                    PERSISTENCE_TASK_SAVE_FAILED,
                    task_id=task.id,
                    error_type=type(exc).__name__,
                    error=safe_error_description(exc),
                )
                raise QueryError(msg) from exc

get async

get(task_id)

Retrieve a task by its ID.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def get(self, task_id: str) -> Task | None:
    """Retrieve a task by its ID."""
    try:
        cursor = await self._db.execute(
            f"SELECT {self._TASK_COLUMNS} FROM tasks WHERE id = ?",  # noqa: S608
            (task_id,),
        )
        row = await cursor.fetchone()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = f"Failed to fetch task {task_id!r}"
        logger.warning(
            PERSISTENCE_TASK_FETCH_FAILED,
            task_id=task_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        logger.debug(PERSISTENCE_TASK_FETCHED, task_id=task_id, found=False)
        return None
    logger.debug(PERSISTENCE_TASK_FETCHED, task_id=task_id, found=True)
    return self._row_to_task(row)

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List tasks with pagination (no filters).

Ordering is deterministic on the primary key id so paginated callers see stable windows.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Task, ...]:
    """List tasks with pagination (no filters).

    Ordering is deterministic on the primary key ``id`` so paginated
    callers see stable windows.
    """
    limit = validate_pagination_args(
        limit, offset, event=PERSISTENCE_TASK_LIST_FAILED
    )
    query = (
        f"SELECT {self._TASK_COLUMNS} FROM tasks ORDER BY id ASC LIMIT ? OFFSET ?"  # noqa: S608
    )
    params: list[object] = [limit, offset]

    try:
        cursor = await self._db.execute(query, params)
        rows = await cursor.fetchall()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = "Failed to list tasks"
        logger.warning(
            PERSISTENCE_TASK_LIST_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    tasks = tuple(self._row_to_task(row) for row in rows)
    logger.debug(PERSISTENCE_TASK_LISTED, count=len(tasks))
    return tasks

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Query tasks matching the filter spec.

Ordering is deterministic on the primary key id so paginated callers see stable windows.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def query(
    self,
    filter_spec: TaskFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Task, ...]:
    """Query tasks matching the filter spec.

    Ordering is deterministic on the primary key ``id`` so paginated
    callers see stable windows.
    """
    limit = validate_pagination_args(
        limit, offset, event=PERSISTENCE_TASK_LIST_FAILED
    )
    clauses: list[str] = []
    params: list[object] = []
    if filter_spec.status is not None:
        clauses.append("status = ?")
        params.append(filter_spec.status.value)
    if filter_spec.assigned_to is not None:
        clauses.append("assigned_to = ?")
        params.append(filter_spec.assigned_to)
    if filter_spec.project is not None:
        clauses.append("project = ?")
        params.append(filter_spec.project)

    query = f"SELECT {self._TASK_COLUMNS} FROM tasks"  # noqa: S608
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    query += " ORDER BY id ASC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    try:
        cursor = await self._db.execute(query, params)
        rows = await cursor.fetchall()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = "Failed to list tasks"
        logger.warning(
            PERSISTENCE_TASK_LIST_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    tasks = tuple(self._row_to_task(row) for row in rows)
    logger.debug(PERSISTENCE_TASK_LISTED, count=len(tasks))
    return tasks

count async

count(filter_spec)

Count tasks matching the given filter spec.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def count(self, filter_spec: TaskFilterSpec) -> int:
    """Count tasks matching the given filter spec."""
    clauses: list[str] = []
    params: list[object] = []
    if filter_spec.status is not None:
        clauses.append("status = ?")
        params.append(filter_spec.status.value)
    if filter_spec.assigned_to is not None:
        clauses.append("assigned_to = ?")
        params.append(filter_spec.assigned_to)
    if filter_spec.project is not None:
        clauses.append("project = ?")
        params.append(filter_spec.project)

    query = "SELECT COUNT(*) FROM tasks"
    if clauses:
        query += " WHERE " + " AND ".join(clauses)

    try:
        cursor = await self._db.execute(query, params)
        row = await cursor.fetchone()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = "Failed to count tasks"
        logger.warning(
            PERSISTENCE_TASK_COUNT_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    total = int(row[0]) if row is not None else 0
    logger.debug(PERSISTENCE_TASK_COUNTED, count=total)
    return total

delete async

delete(task_id)

Delete a task by ID.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def delete(self, task_id: str) -> bool:
    """Delete a task by ID."""
    async with self._write_context():
        try:
            cursor = await self._db.execute(
                "DELETE FROM tasks WHERE id = ?", (task_id,)
            )
            await self._db.commit()
            deleted = cursor.rowcount > 0
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to delete task {task_id!r}"
            logger.warning(
                PERSISTENCE_TASK_DELETE_FAILED,
                task_id=task_id,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
    return deleted

SQLiteCostRecordRepository

SQLiteCostRecordRepository(db, *, write_context)

SQLite implementation of the CostRecordRepository protocol.

Parameters:

Name Type Description Default
db Connection

An open aiosqlite connection.

required
write_context WriteContext

Async context manager that serializes writes on the shared connection. Supplied by SQLitePersistenceBackend.write_context in production; tests can pass tests._shared.persistence.make_private_write_context() for standalone construction.

required
Source code in src/synthorg/persistence/sqlite/repositories.py
def __init__(
    self,
    db: aiosqlite.Connection,
    *,
    write_context: WriteContext,
) -> None:
    self._db = db
    self._write_context = write_context

append async

append(event)

Persist a cost record (append-only per AppendOnlyRepository).

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def append(self, event: CostRecord) -> None:
        """Persist a cost record (append-only per AppendOnlyRepository)."""
        async with self._write_context():
            try:
                data = event.model_dump(mode="json")
                # Store a UTC-normalised ISO string so the string
                # comparison in ``purge_before`` (which formats its
                # threshold the same way) is correct regardless of the
                # caller's original offset.
                data["timestamp"] = format_iso_utc(
                    normalize_utc(event.timestamp),
                )
                await self._db.execute(
                    """\
INSERT INTO cost_records (
    agent_id, task_id, provider, model, input_tokens,
    output_tokens, cost, currency, timestamp, call_category
) VALUES (
    :agent_id, :task_id, :provider, :model, :input_tokens,
    :output_tokens, :cost, :currency, :timestamp, :call_category
)""",
                    data,
                )
                await self._db.commit()
            except (sqlite3.Error, aiosqlite.Error) as exc:
                msg = f"Failed to save cost record for agent {event.agent_id!r}"
                logger.warning(
                    PERSISTENCE_COST_RECORD_SAVE_FAILED,
                    agent_id=event.agent_id,
                    task_id=event.task_id,
                    error_type=type(exc).__name__,
                    error=safe_error_description(exc),
                )
                raise QueryError(msg) from exc

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Query cost records matching filter spec with pagination.

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def query(
        self,
        filter_spec: CostRecordFilterSpec,
        *,
        limit: int = DEFAULT_PAGE_SIZE,
        offset: int = 0,
    ) -> tuple[CostRecord, ...]:
        """Query cost records matching filter spec with pagination."""
        limit = validate_pagination_args(
            limit, offset, event=PERSISTENCE_COST_RECORD_QUERY_FAILED
        )
        clauses: list[str] = []
        params: list[object] = []
        if filter_spec.agent_id is not None:
            clauses.append("agent_id = ?")
            params.append(filter_spec.agent_id)
        if filter_spec.task_id is not None:
            clauses.append("task_id = ?")
            params.append(filter_spec.task_id)

        sql = """\
SELECT agent_id, task_id, provider, model, input_tokens,
       output_tokens, cost, currency, timestamp, call_category
FROM cost_records"""
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)
        sql += " ORDER BY timestamp DESC, agent_id ASC, rowid ASC"
        sql += " LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        try:
            cursor = await self._db.execute(sql, params)
            rows = await cursor.fetchall()
            records = tuple(CostRecord.model_validate(dict(row)) for row in rows)
        except (
            sqlite3.Error,
            aiosqlite.Error,
            json.JSONDecodeError,
            ValidationError,
        ) as exc:
            msg = "Failed to query cost records"
            logger.warning(
                PERSISTENCE_COST_RECORD_QUERY_FAILED,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        logger.debug(PERSISTENCE_COST_RECORD_QUERIED, count=len(records))
        return records

aggregate async

aggregate(*, agent_id=None, task_id=None)

Sum total cost, optionally filtered by agent and/or task.

Raises :class:MixedCurrencyAggregationError when the matched rows span multiple currencies. The distinct-currency probe and the SUM run in a single aggregating query (COUNT(DISTINCT) + GROUP_CONCAT(DISTINCT) + SUM) so the two observations share one snapshot and a concurrent insert cannot change the result between them.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def aggregate(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
) -> float:
    """Sum total cost, optionally filtered by agent and/or task.

    Raises :class:`MixedCurrencyAggregationError` when the matched rows
    span multiple currencies.  The distinct-currency probe and the
    ``SUM`` run in a **single** aggregating query (``COUNT(DISTINCT)``
    + ``GROUP_CONCAT(DISTINCT)`` + ``SUM``) so the two observations
    share one snapshot and a concurrent insert cannot change the
    result between them.
    """
    try:
        conditions: list[str] = []
        params: list[str] = []
        if agent_id is not None:
            conditions.append("agent_id = ?")
            params.append(agent_id)
        if task_id is not None:
            conditions.append("task_id = ?")
            params.append(task_id)
        where_clause = (" WHERE " + " AND ".join(conditions)) if conditions else ""

        # where_clause is built from fixed column names only; user
        # values go through bound parameters.
        agg_select = (
            "SELECT "
            "COUNT(DISTINCT currency) AS distinct_count, "
            "GROUP_CONCAT(DISTINCT currency) AS currencies, "
            "COALESCE(SUM(cost), 0.0) AS total_cost "
            "FROM cost_records"
        )
        agg_sql = f"{agg_select}{where_clause}"
        cursor = await self._db.execute(agg_sql, tuple(params))
        row = await cursor.fetchone()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = "Failed to aggregate cost records"
        logger.warning(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        msg = "aggregate query returned no rows"
        logger.error(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise QueryError(msg)
    distinct_count = safe_int(row[0], default=0)
    currencies_csv = row[1]
    total = safe_float(row[2], default=0.0)
    if distinct_count > 1:
        distinct = frozenset(parse_comma_list(currencies_csv))
        logger.error(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            task_id=task_id,
            currencies=sorted(distinct),
            error="mixed-currency aggregation rejected",
        )
        mixed_msg = "Cannot aggregate costs across mixed currencies"
        raise MixedCurrencyAggregationError(
            mixed_msg,
            currencies=distinct,
            agent_id=agent_id,
            task_id=task_id,
        )
    logger.debug(
        PERSISTENCE_COST_RECORD_AGGREGATED,
        agent_id=agent_id,
        total_cost=total,
    )
    return total

purge_before async

purge_before(threshold)

Delete cost records with timestamp before threshold (retention).

threshold must be timezone-aware: a naive value compared against UTC-formatted stored timestamps would silently delete the wrong window.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete cost records with timestamp before threshold (retention).

    ``threshold`` must be timezone-aware: a naive value compared
    against UTC-formatted stored timestamps would silently delete
    the wrong window.
    """
    if threshold.tzinfo is None:
        msg = f"threshold must be timezone-aware, got naive {threshold!r}"
        logger.warning(
            PERSISTENCE_COST_RECORD_QUERY_FAILED,
            error="naive_threshold",
            error_type="ValueError",
        )
        raise QueryError(msg)
    aware_threshold = normalize_utc(threshold)
    async with self._write_context():
        try:
            cursor = await self._db.execute(
                "DELETE FROM cost_records WHERE timestamp < ?",
                (format_iso_utc(aware_threshold),),
            )
            await self._db.commit()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = "Failed to purge cost records by threshold"
            logger.warning(
                PERSISTENCE_COST_RECORD_QUERY_FAILED,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        return cursor.rowcount

SQLiteMessageRepository

SQLiteMessageRepository(db, *, write_context)

SQLite implementation of the MessageRepository protocol.

Parameters:

Name Type Description Default
db Connection

An open aiosqlite connection.

required
write_context WriteContext

Async context manager that serializes writes on the shared connection. Supplied by SQLitePersistenceBackend.write_context in production; tests can pass tests._shared.persistence.make_private_write_context() for standalone construction.

required
Source code in src/synthorg/persistence/sqlite/repositories.py
def __init__(
    self,
    db: aiosqlite.Connection,
    *,
    write_context: WriteContext,
) -> None:
    self._db = db
    self._write_context = write_context

append async

append(message)

Persist a message (append-only per AppendOnlyRepository).

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def append(self, message: Message) -> None:
        """Persist a message (append-only per AppendOnlyRepository)."""
        data = message.model_dump(mode="json")
        msg_id = str(message.id)

        async with self._write_context():
            try:
                await self._db.execute(
                    """\
INSERT INTO messages (
    id, timestamp, sender, "to", type, priority,
    channel, content, attachments, metadata
) VALUES (
    :id, :timestamp, :sender, :to, :type, :priority,
    :channel, :content, :attachments, :metadata
)""",
                    {
                        "id": msg_id,
                        # UTC-normalised ISO so ``purge_before`` /
                        # ``get_history`` ordering compare correctly
                        # regardless of the caller's original offset.
                        "timestamp": format_iso_utc(
                            normalize_utc(message.timestamp),
                        ),
                        "sender": data["sender"],
                        "to": data["to"],
                        "type": data["type"],
                        "priority": data["priority"],
                        "channel": data["channel"],
                        "content": json.dumps(data["parts"]),
                        "attachments": json.dumps(data.get("attachments", [])),
                        "metadata": json.dumps(data["metadata"]),
                    },
                )
                await self._db.commit()
            except sqlite3.IntegrityError as exc:
                await self._safe_rollback(msg_id)
                if is_unique_constraint_error(exc):
                    err_msg = f"Message {msg_id} already exists"
                    logger.warning(PERSISTENCE_MESSAGE_DUPLICATE, message_id=msg_id)
                    raise DuplicateRecordError(err_msg) from exc
                # Other integrity errors (NOT NULL, different UNIQUE).
                msg = f"Failed to save message {msg_id!r}"
                logger.warning(
                    PERSISTENCE_MESSAGE_SAVE_FAILED,
                    message_id=msg_id,
                    error_type=type(exc).__name__,
                    error=safe_error_description(exc),
                )
                raise QueryError(msg) from exc
            except (sqlite3.Error, aiosqlite.Error) as exc:
                await self._safe_rollback(msg_id)
                msg = f"Failed to save message {msg_id!r}"
                logger.warning(
                    PERSISTENCE_MESSAGE_SAVE_FAILED,
                    message_id=msg_id,
                    error_type=type(exc).__name__,
                    error=safe_error_description(exc),
                )
                raise QueryError(msg) from exc

get_history async

get_history(channel, *, limit=DEFAULT_LIST_LIMIT)

Retrieve message history for a channel, newest first.

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def get_history(
        self,
        channel: str,
        *,
        limit: int = DEFAULT_LIST_LIMIT,
    ) -> tuple[Message, ...]:
        """Retrieve message history for a channel, newest first."""
        if limit is not None and limit < 1:
            msg = f"limit must be a positive integer, got {limit}"
            raise QueryError(msg)
        sql = """\
SELECT id, timestamp, sender, "to", type, priority,
       channel, content, attachments, metadata
FROM messages
WHERE channel = ?
ORDER BY timestamp DESC"""
        params: list[object] = [channel]
        if limit is not None:
            sql += " LIMIT ?"
            params.append(limit)

        try:
            cursor = await self._db.execute(sql, params)
            rows = await cursor.fetchall()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to fetch message history for channel {channel!r}"
            logger.warning(
                PERSISTENCE_MESSAGE_HISTORY_FAILED,
                channel=channel,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        messages = tuple(self._row_to_message(row) for row in rows)
        logger.debug(
            PERSISTENCE_MESSAGE_HISTORY_FETCHED,
            channel=channel,
            count=len(messages),
        )
        return messages

get_by_id async

get_by_id(channel, message_id)

Fetch one message by (channel, id) via the PK point read.

The id predicate alone resolves the row (it is the primary key); the extra channel predicate is a deliberate scoping guard so a caller holding only a message id cannot read a message outside the channel it asked for.

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def get_by_id(
        self,
        channel: str,
        message_id: str,
    ) -> Message | None:
        """Fetch one message by ``(channel, id)`` via the PK point read.

        The ``id`` predicate alone resolves the row (it is the primary
        key); the extra ``channel`` predicate is a deliberate scoping
        guard so a caller holding only a message id cannot read a
        message outside the channel it asked for.
        """
        sql = """\
SELECT id, timestamp, sender, "to", type, priority,
       channel, content, attachments, metadata
FROM messages
WHERE id = ? AND channel = ?"""
        try:
            cursor = await self._db.execute(sql, [message_id, channel])
            row = await cursor.fetchone()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to fetch message {message_id!r}"
            logger.warning(
                PERSISTENCE_MESSAGE_FETCH_FAILED,
                channel=channel,
                message_id=message_id,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        if row is None:
            return None
        message = self._row_to_message(row)
        logger.debug(
            PERSISTENCE_MESSAGE_FETCHED,
            channel=channel,
            message_id=message_id,
        )
        return message

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Return messages matching the filter spec, newest first.

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def query(
        self,
        filter_spec: MessageFilterSpec,
        *,
        limit: int = DEFAULT_PAGE_SIZE,
        offset: int = 0,
    ) -> tuple[Message, ...]:
        """Return messages matching the filter spec, newest first."""
        limit = validate_pagination_args(
            limit, offset, event=PERSISTENCE_MESSAGE_HISTORY_FAILED
        )
        sql = """\
SELECT id, timestamp, sender, "to", type, priority,
       channel, content, attachments, metadata
FROM messages"""
        params: list[object] = []
        if filter_spec.channel is not None:
            sql += " WHERE channel = ?"
            params.append(filter_spec.channel)
        sql += " ORDER BY timestamp DESC, id ASC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        try:
            cursor = await self._db.execute(sql, params)
            rows = await cursor.fetchall()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = "Failed to query messages"
            logger.warning(
                PERSISTENCE_MESSAGE_HISTORY_FAILED,
                channel=filter_spec.channel,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        return tuple(self._row_to_message(row) for row in rows)

purge_before async

purge_before(threshold)

Delete messages with timestamp < threshold (retention).

threshold must be timezone-aware: a naive value compared against UTC-formatted stored timestamps would silently delete the wrong window.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete messages with ``timestamp < threshold`` (retention).

    ``threshold`` must be timezone-aware: a naive value compared
    against UTC-formatted stored timestamps would silently delete
    the wrong window.
    """
    if threshold.tzinfo is None:
        msg = f"threshold must be timezone-aware, got naive {threshold!r}"
        logger.warning(
            PERSISTENCE_MESSAGE_DELETE_FAILED,
            error="naive_threshold",
            error_type="ValueError",
        )
        raise QueryError(msg)
    aware_threshold = normalize_utc(threshold)
    async with self._write_context():
        try:
            cursor = await self._db.execute(
                "DELETE FROM messages WHERE timestamp < ?",
                (format_iso_utc(aware_threshold),),
            )
            await self._db.commit()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = "Failed to purge messages by threshold"
            logger.warning(
                PERSISTENCE_MESSAGE_DELETE_FAILED,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        return cursor.rowcount

delete async

delete(message_id)

Delete a single message by id (bespoke per ADR D7, moderation).

Returns True when a row was removed, False when the id did not exist. Concurrent writes are serialized through the shared backend write context. The audit-grade mutation log is emitted by :class:MessageService.delete_message; the repository never logs mutations itself (persistence-boundary rule, see docs/reference/persistence-boundary.md).

Source code in src/synthorg/persistence/sqlite/repositories.py
async def delete(self, message_id: NotBlankStr) -> bool:
    """Delete a single message by id (bespoke per ADR D7, moderation).

    Returns ``True`` when a row was removed, ``False`` when the id
    did not exist. Concurrent writes are serialized through the
    shared backend write context. The audit-grade mutation log is
    emitted by :class:`MessageService.delete_message`; the
    repository never logs mutations itself (persistence-boundary
    rule, see ``docs/reference/persistence-boundary.md``).
    """
    async with self._write_context():
        try:
            cursor = await self._db.execute(
                "DELETE FROM messages WHERE id = ?",
                (message_id,),
            )
            await self._db.commit()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to delete message {message_id!r}"
            logger.warning(
                PERSISTENCE_MESSAGE_DELETE_FAILED,
                message_id=message_id,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            raise QueryError(msg) from exc
        return cursor.rowcount > 0

Postgres Backend

backend

Postgres persistence backend implementation.

Implements the PersistenceBackend protocol on top of psycopg 3 and psycopg_pool.AsyncConnectionPool. Repositories are instantiated per-backend on connect() and receive the shared pool; each pool checkout is an independent transaction, so this backend's write_context is a no-op rather than the in-process lock SQLite acquires to serialize writes across its single connection.

The schema uses native Postgres types (JSONB, TIMESTAMPTZ, BIGINT, BOOLEAN) -- see src/synthorg/persistence/postgres/schema.sql. At the Python level, the protocol surface is identical to the SQLite backend: callers get Pydantic models back either way.

PostgresPersistenceBackend

PostgresPersistenceBackend(config)

Bases: PostgresConnectionMixin, PostgresMigrationMixin

Postgres implementation of the PersistenceBackend protocol.

Uses a psycopg_pool.AsyncConnectionPool for connection management. Each repository method acquires a connection from the pool for the duration of its critical section, so writes are isolated per-connection transaction. There is no shared write lock -- unlike SQLite, Postgres per-connection transactions do not share a single in-process connection.

Parameters:

Name Type Description Default
config PostgresConfig

Postgres-specific configuration.

required
Source code in src/synthorg/persistence/postgres/backend.py
def __init__(self, config: PostgresConfig) -> None:  # noqa: PLR0915 -- repo registry setup intentionally enumerates every attribute
    self._config = config
    self._lifecycle_lock = asyncio.Lock()
    self._pool: AsyncConnectionPool | None = None
    # Repository attributes -- instantiated lazily on connect.
    self._artifacts: ArtifactRepository | None = None
    self._projects: ProjectRepository | None = None
    self._tasks: TaskRepository | None = None
    self._cost_records: CostRecordRepository | None = None
    self._messages: MessageRepository | None = None
    self._lifecycle_events: LifecycleEventRepository | None = None
    self._task_metrics: TaskMetricRepository | None = None
    self._collaboration_metrics: CollaborationMetricRepository | None = None
    self._parked_contexts: ParkedContextRepository | None = None
    self._audit_entries: AuditRepository | None = None
    self._provider_audit_events: PostgresProviderAuditRepo | None = None
    self._preset_overrides: PostgresPresetOverrideRepo | None = None
    self._users: UserRepository | None = None
    self._api_keys: ApiKeyRepository | None = None
    self._checkpoints: CheckpointRepository | None = None
    self._heartbeats: HeartbeatRepository | None = None
    self._agent_states: AgentStateRepository | None = None
    self._settings: SettingsRepository | None = None
    self._custom_presets: PersonalityPresetRepository | None = None
    self._workflow_definitions: WorkflowDefinitionRepository | None = None
    self._workflow_executions: WorkflowExecutionRepository | None = None
    self._subworkflows: SubworkflowRepository | None = None
    self._workflow_versions: VersionRepository[WorkflowDefinition] | None = None
    self._identity_versions: VersionRepository[AgentIdentity] | None = None
    self._evaluation_config_versions: VersionRepository[EvaluationConfig] | None = (
        None
    )
    self._budget_config_versions: VersionRepository[BudgetConfig] | None = None
    self._company_versions: VersionRepository[Company] | None = None
    self._role_versions: VersionRepository[Role] | None = None
    self._decision_records: DecisionRepository | None = None
    self._risk_overrides: RiskOverrideRepository | None = None
    self._ssrf_violations: SsrfViolationRepository | None = None
    self._circuit_breaker_state: CircuitBreakerStateRepository | None = None
    self._ceremony_scheduler_state: CeremonySchedulerStateRepository | None = None
    self._meeting_cooldown: MeetingCooldownRepository | None = None
    self._tracked_containers: TrackedContainerRepository | None = None
    self._training_plans: PostgresTrainingPlanRepository | None = None
    self._training_results: PostgresTrainingResultRepository | None = None
    self._sessions: PostgresSessionRepository | None = None
    self._refresh_tokens: PostgresRefreshTokenRepository | None = None
    self._idempotency_keys: PostgresIdempotencyRepository | None = None
    self._seen_claims: PostgresSeenClaimsRepository | None = None
    self._principle_overrides: PostgresPrincipleOverrideRepository | None = None
    self._mcp_installations: PostgresMcpInstallationRepository | None = None
    self._custom_rules: PostgresCustomRuleRepository | None = None
    self._org_facts: PostgresOrgFactRepository | None = None
    self._ontology_entities: PostgresOntologyEntityRepository | None = None
    self._ontology_drift: PostgresOntologyDriftReportRepository | None = None
    self._connections: PostgresConnectionRepository | None = None
    self._connection_secrets: PostgresConnectionSecretRepository | None = None
    self._oauth_states: PostgresOAuthStateRepository | None = None
    self._webhook_receipts: PostgresWebhookReceiptRepository | None = None
    self._project_cost_aggregates: PostgresProjectCostAggregateRepository | None = (
        None
    )
    self._fine_tune_runs: PostgresFineTuneRunRepository | None = None
    self._fine_tune_checkpoints: PostgresFineTuneCheckpointRepository | None = None

is_connected property

is_connected

Whether the backend has an open pool.

backend_name property

backend_name

Human-readable backend identifier.

kind property

kind

Return the backend discriminator ("postgres").

config property

config

Public read-only view of the backend's Postgres config.

Exposed so callers needing the connection details (the backup-handler factory) do not have to reach for the private _config attribute.

tasks property

tasks

Repository for Task persistence.

cost_records property

cost_records

Repository for CostRecord persistence.

messages property

messages

Repository for Message persistence.

lifecycle_events property

lifecycle_events

Repository for AgentLifecycleEvent persistence.

task_metrics property

task_metrics

Repository for TaskMetricRecord persistence.

collaboration_metrics property

collaboration_metrics

Repository for CollaborationMetricRecord persistence.

parked_contexts property

parked_contexts

Repository for ParkedContext persistence.

audit_entries property

audit_entries

Repository for AuditEntry persistence.

provider_audit_events property

provider_audit_events

Repository for the provider mutation audit log.

preset_overrides property

preset_overrides

Repository for operator-authored provider preset overrides.

decision_records property

decision_records

Repository for DecisionRecord persistence.

users property

users

Repository for User persistence.

api_keys property

api_keys

Repository for ApiKey persistence.

checkpoints property

checkpoints

Repository for Checkpoint persistence.

heartbeats property

heartbeats

Repository for Heartbeat persistence.

agent_states property

agent_states

Repository for AgentRuntimeState persistence.

settings property

settings

Repository for namespaced settings persistence.

artifacts property

artifacts

Repository for Artifact persistence.

projects property

projects

Repository for Project persistence.

custom_presets property

custom_presets

Repository for custom personality preset persistence.

workflow_definitions property

workflow_definitions

Repository for workflow definition persistence.

workflow_executions property

workflow_executions

Repository for workflow execution persistence.

subworkflows property

subworkflows

Repository for subworkflow registry persistence.

workflow_versions property

workflow_versions

Repository for workflow definition version persistence.

identity_versions property

identity_versions

Repository for AgentIdentity version snapshot persistence.

evaluation_config_versions property

evaluation_config_versions

Repository for EvaluationConfig version snapshot persistence.

budget_config_versions property

budget_config_versions

Repository for BudgetConfig version snapshot persistence.

company_versions property

company_versions

Repository for Company version snapshot persistence.

role_versions property

role_versions

Repository for Role version snapshot persistence.

risk_overrides property

risk_overrides

Repository for risk tier override persistence.

ssrf_violations property

ssrf_violations

Repository for SSRF violation record persistence.

circuit_breaker_state property

circuit_breaker_state

Repository for circuit breaker state persistence.

ceremony_scheduler_state property

ceremony_scheduler_state

Repository for ceremony scheduler per-sprint state snapshots.

meeting_cooldown property

meeting_cooldown

Repository for meeting cooldown last-triggered timestamps.

tracked_containers property

tracked_containers

Repository for Docker sandbox tracked-container records.

project_cost_aggregates property

project_cost_aggregates

Repository for durable project cost aggregates.

Raises:

Type Description
PersistenceConnectionError

If not connected.

fine_tune_checkpoints property

fine_tune_checkpoints

Repository for fine-tune checkpoint persistence.

fine_tune_runs property

fine_tune_runs

Repository for fine-tune pipeline runs.

connections property

connections

Repository for external service connection persistence.

connection_secrets property

connection_secrets

Repository for encrypted connection secret persistence.

oauth_states property

oauth_states

Repository for transient OAuth state persistence.

webhook_receipts property

webhook_receipts

Repository for webhook receipt log persistence.

training_plans property

training_plans

Repository for training plan persistence.

training_results property

training_results

Repository for training result persistence.

custom_rules property

custom_rules

Repository for custom signal rule persistence.

sessions property

sessions

Repository for hybrid session state (durable + in-memory cache).

refresh_tokens property

refresh_tokens

Repository for single-use refresh-token rotation.

idempotency_keys property

idempotency_keys

Repository for persistent idempotency keys.

seen_claims property

seen_claims

Repository for worker TaskClaim dedup persistence.

principle_overrides property

principle_overrides

Repository for rollback-restored principle overrides.

mcp_installations property

mcp_installations

Repository for MCP catalog installations.

org_facts property

org_facts

Repository for organizational fact persistence (MVCC).

ontology_entities property

ontology_entities

Repository for ontology entity definitions.

ontology_drift property

ontology_drift

Repository for ontology drift reports.

get_db

get_db()

Return the shared connection pool.

Raises:

Type Description
PersistenceConnectionError

If not yet connected.

Source code in src/synthorg/persistence/postgres/backend.py
def get_db(self) -> AsyncConnectionPool:
    """Return the shared connection pool.

    Raises:
        PersistenceConnectionError: If not yet connected.
    """
    if self._pool is None:
        msg = "Postgres backend not connected"
        logger.warning(PERSISTENCE_BACKEND_NOT_CONNECTED, error=msg)
        raise PersistenceConnectionError(msg)
    return self._pool

write_context async

write_context()

No-op for Postgres.

Each repository checks out its own connection from the async pool; transactions on different connections cannot interleave at the statement level. Implementing the protocol method as a no-op keeps the cross-backend interface honest and lets callers write async with backend.write_context() without backend-specific branching.

Source code in src/synthorg/persistence/postgres/backend.py
@asynccontextmanager
async def write_context(self) -> AsyncIterator[None]:
    """No-op for Postgres.

    Each repository checks out its own connection from the async
    pool; transactions on different connections cannot interleave
    at the statement level. Implementing the protocol method as a
    no-op keeps the cross-backend interface honest and lets
    callers write ``async with backend.write_context()`` without
    backend-specific branching.
    """
    yield

build_lockouts

build_lockouts(auth_config)

Construct a lockout repository using this backend's pool.

Source code in src/synthorg/persistence/postgres/backend.py
def build_lockouts(self, auth_config: AuthConfig) -> LockoutRepository:
    """Construct a lockout repository using this backend's pool."""
    pool = self.get_db()
    return PostgresLockoutRepository(pool, auth_config)

build_escalations

build_escalations(*, notify_channel=None)

Construct an escalation queue repository on the shared pool.

notify_channel enables cross-instance pg_notify publishing when the escalation subsystem has enabled it.

Source code in src/synthorg/persistence/postgres/backend.py
def build_escalations(
    self,
    *,
    notify_channel: str | None = None,
) -> EscalationQueueRepository:
    """Construct an escalation queue repository on the shared pool.

    ``notify_channel`` enables cross-instance pg_notify publishing
    when the escalation subsystem has enabled it.
    """
    from synthorg.persistence.postgres.escalation_repo import (  # noqa: PLC0415
        PostgresEscalationRepository,
    )

    pool = self.get_db()
    return PostgresEscalationRepository(pool, notify_channel=notify_channel)

build_ontology_versioning

build_ontology_versioning()

Construct the ontology versioning service bound to this backend.

Source code in src/synthorg/persistence/postgres/backend.py
def build_ontology_versioning(
    self,
) -> VersioningService[EntityDefinition]:
    """Construct the ontology versioning service bound to this backend."""
    from synthorg.persistence.postgres.ontology_versioning import (  # noqa: PLC0415
        create_postgres_ontology_versioning,
    )

    return create_postgres_ontology_versioning(self.get_db())

get_setting async

get_setting(key)

Retrieve a setting value by key from the _system namespace.

Delegates to self.settings (the SettingsRepository).

Raises:

Type Description
PersistenceConnectionError

If not connected or settings repository is not yet ported.

Source code in src/synthorg/persistence/postgres/backend.py
async def get_setting(self, key: NotBlankStr) -> str | None:
    """Retrieve a setting value by key from the ``_system`` namespace.

    Delegates to ``self.settings`` (the ``SettingsRepository``).

    Raises:
        PersistenceConnectionError: If not connected or settings
            repository is not yet ported.
    """
    entity = await self.settings.get((NotBlankStr("_system"), key))
    return entity.value if entity is not None else None

set_setting async

set_setting(key, value)

Store a setting value (upsert) in the _system namespace.

Delegates to self.settings (the SettingsRepository).

Raises:

Type Description
PersistenceConnectionError

If not connected or settings repository is not yet ported.

Source code in src/synthorg/persistence/postgres/backend.py
async def set_setting(self, key: NotBlankStr, value: str) -> None:
    """Store a setting value (upsert) in the ``_system`` namespace.

    Delegates to ``self.settings`` (the ``SettingsRepository``).

    Raises:
        PersistenceConnectionError: If not connected or settings
            repository is not yet ported.
    """
    updated_at = datetime.now(UTC)
    entity = SettingRow(
        namespace=NotBlankStr("_system"),
        key=key,
        value=value,
        updated_at=format_iso_utc(updated_at),
    )
    await self.settings.save(entity)

repositories

Postgres repository implementations for Task, CostRecord, and Message.

HR-related repositories (LifecycleEvent, TaskMetric, CollaborationMetric) are in hr_repositories.py within this package.

PostgresTaskRepository

PostgresTaskRepository(pool)

Postgres implementation of the TaskRepository protocol.

Parameters:

Name Type Description Default
pool AsyncConnectionPool

An open psycopg_pool.AsyncConnectionPool.

required
Source code in src/synthorg/persistence/postgres/repositories.py
def __init__(self, pool: AsyncConnectionPool) -> None:
    self._pool = pool

save async

save(task)

Persist a task (upsert semantics).

Source code in src/synthorg/persistence/postgres/repositories.py
async def save(self, task: Task) -> None:
    """Persist a task (upsert semantics)."""
    params = _task_params(task)
    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(
                """
                INSERT INTO tasks (
                    id, title, description, type, priority, project, created_by,
                    assigned_to, status, estimated_complexity, budget_limit,
                    deadline, max_retries, parent_task_id, task_structure,
                    coordination_topology, reviewers, dependencies,
                    artifacts_expected, acceptance_criteria, delegation_chain
                ) VALUES (
                    %(id)s, %(title)s, %(description)s, %(type)s, %(priority)s,
                    %(project)s, %(created_by)s, %(assigned_to)s, %(status)s,
                    %(estimated_complexity)s, %(budget_limit)s, %(deadline)s,
                    %(max_retries)s, %(parent_task_id)s, %(task_structure)s,
                    %(coordination_topology)s, %(reviewers)s, %(dependencies)s,
                    %(artifacts_expected)s, %(acceptance_criteria)s,
                    %(delegation_chain)s
                )
                ON CONFLICT(id) DO UPDATE SET
                    title=EXCLUDED.title,
                    description=EXCLUDED.description,
                    type=EXCLUDED.type,
                    priority=EXCLUDED.priority,
                    project=EXCLUDED.project,
                    created_by=EXCLUDED.created_by,
                    assigned_to=EXCLUDED.assigned_to,
                    status=EXCLUDED.status,
                    estimated_complexity=EXCLUDED.estimated_complexity,
                    budget_limit=EXCLUDED.budget_limit,
                    deadline=EXCLUDED.deadline,
                    max_retries=EXCLUDED.max_retries,
                    parent_task_id=EXCLUDED.parent_task_id,
                    task_structure=EXCLUDED.task_structure,
                    coordination_topology=EXCLUDED.coordination_topology,
                    reviewers=EXCLUDED.reviewers,
                    dependencies=EXCLUDED.dependencies,
                    artifacts_expected=EXCLUDED.artifacts_expected,
                    acceptance_criteria=EXCLUDED.acceptance_criteria,
                    delegation_chain=EXCLUDED.delegation_chain
                """,
                params,
            )
            await conn.commit()
    except psycopg.Error as exc:
        msg = f"Failed to save task {task.id!r}"
        logger.warning(
            PERSISTENCE_TASK_SAVE_FAILED,
            task_id=task.id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc

get async

get(task_id)

Retrieve a task by its ID.

Source code in src/synthorg/persistence/postgres/repositories.py
async def get(self, task_id: str) -> Task | None:
    """Retrieve a task by its ID."""
    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(
                f"SELECT {self._TASK_COLUMNS} FROM tasks WHERE id = %s",  # noqa: S608
                (task_id,),
            )
            row = await cur.fetchone()
    except psycopg.Error as exc:
        msg = f"Failed to fetch task {task_id!r}"
        logger.warning(
            PERSISTENCE_TASK_FETCH_FAILED,
            task_id=task_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        logger.debug(PERSISTENCE_TASK_FETCHED, task_id=task_id, found=False)
        return None
    logger.debug(PERSISTENCE_TASK_FETCHED, task_id=task_id, found=True)
    return self._row_to_task(row)

list_items async

list_items(*, limit=DEFAULT_PAGE_SIZE, offset=0)

List tasks with pagination (no filters).

Ordering is deterministic on the primary key id so paginated callers see stable windows.

Raises:

Type Description
QueryError

If the query fails or pagination is out of range.

Source code in src/synthorg/persistence/postgres/repositories.py
async def list_items(
    self,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Task, ...]:
    """List tasks with pagination (no filters).

    Ordering is deterministic on the primary key ``id`` so paginated
    callers see stable windows.

    Raises:
        QueryError: If the query fails or pagination is out of range.
    """
    limit = validate_pagination_args(
        limit, offset, event=PERSISTENCE_TASK_LIST_FAILED
    )
    query = (
        f"SELECT {self._TASK_COLUMNS} FROM tasks ORDER BY id ASC LIMIT %s OFFSET %s"  # noqa: S608
    )
    params: list[object] = [limit, offset]

    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(query, params)
            rows = await cur.fetchall()
    except psycopg.Error as exc:
        msg = "Failed to list tasks"
        logger.warning(
            PERSISTENCE_TASK_LIST_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    tasks = tuple(self._row_to_task(row) for row in rows)
    logger.debug(PERSISTENCE_TASK_LISTED, count=len(tasks))
    return tasks

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Query tasks matching the filter spec.

Ordering is deterministic on the primary key id so paginated callers see stable windows.

Raises:

Type Description
QueryError

If the query fails or pagination is out of range.

Source code in src/synthorg/persistence/postgres/repositories.py
async def query(
    self,
    filter_spec: TaskFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Task, ...]:
    """Query tasks matching the filter spec.

    Ordering is deterministic on the primary key ``id`` so paginated
    callers see stable windows.

    Raises:
        QueryError: If the query fails or pagination is out of range.
    """
    limit = validate_pagination_args(
        limit, offset, event=PERSISTENCE_TASK_LIST_FAILED
    )
    clauses: list[str] = []
    params: list[object] = []
    if filter_spec.status is not None:
        clauses.append("status = %s")
        params.append(filter_spec.status.value)
    if filter_spec.assigned_to is not None:
        clauses.append("assigned_to = %s")
        params.append(filter_spec.assigned_to)
    if filter_spec.project is not None:
        clauses.append("project = %s")
        params.append(filter_spec.project)

    query = f"SELECT {self._TASK_COLUMNS} FROM tasks"  # noqa: S608
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    query += " ORDER BY id ASC LIMIT %s OFFSET %s"
    params.extend([limit, offset])

    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(query, params)
            rows = await cur.fetchall()
    except psycopg.Error as exc:
        msg = "Failed to list tasks"
        logger.warning(
            PERSISTENCE_TASK_LIST_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    tasks = tuple(self._row_to_task(row) for row in rows)
    logger.debug(PERSISTENCE_TASK_LISTED, count=len(tasks))
    return tasks

count async

count(filter_spec)

Count tasks matching the given filter spec.

Source code in src/synthorg/persistence/postgres/repositories.py
async def count(self, filter_spec: TaskFilterSpec) -> int:
    """Count tasks matching the given filter spec."""
    clauses: list[str] = []
    params: list[object] = []
    if filter_spec.status is not None:
        clauses.append("status = %s")
        params.append(filter_spec.status.value)
    if filter_spec.assigned_to is not None:
        clauses.append("assigned_to = %s")
        params.append(filter_spec.assigned_to)
    if filter_spec.project is not None:
        clauses.append("project = %s")
        params.append(filter_spec.project)

    query = "SELECT COUNT(*) AS c FROM tasks"
    if clauses:
        query += " WHERE " + " AND ".join(clauses)

    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(query, params)
            row = await cur.fetchone()
    except psycopg.Error as exc:
        msg = "Failed to count tasks"
        logger.warning(
            PERSISTENCE_TASK_COUNT_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    total = int(row["c"]) if row is not None else 0
    logger.debug(PERSISTENCE_TASK_COUNTED, count=total)
    return total

delete async

delete(task_id)

Delete a task by ID.

Source code in src/synthorg/persistence/postgres/repositories.py
async def delete(self, task_id: str) -> bool:
    """Delete a task by ID."""
    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute("DELETE FROM tasks WHERE id = %s", (task_id,))
            deleted = cur.rowcount > 0
            await conn.commit()
    except psycopg.Error as exc:
        msg = f"Failed to delete task {task_id!r}"
        logger.warning(
            PERSISTENCE_TASK_DELETE_FAILED,
            task_id=task_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    return deleted

PostgresCostRecordRepository

PostgresCostRecordRepository(pool)

Postgres implementation of the CostRecordRepository protocol.

Parameters:

Name Type Description Default
pool AsyncConnectionPool

An open psycopg_pool.AsyncConnectionPool.

required
Source code in src/synthorg/persistence/postgres/repositories.py
def __init__(self, pool: AsyncConnectionPool) -> None:
    self._pool = pool

append async

append(event)

Persist a cost record (append-only per AppendOnlyRepository).

Source code in src/synthorg/persistence/postgres/repositories.py
async def append(self, event: CostRecord) -> None:
    """Persist a cost record (append-only per AppendOnlyRepository)."""
    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(
                """
                INSERT INTO cost_records (
                    agent_id, task_id, provider, model, input_tokens,
                    output_tokens, cost, currency, timestamp,
                    call_category
                ) VALUES (
                    %(agent_id)s, %(task_id)s, %(provider)s, %(model)s,
                    %(input_tokens)s, %(output_tokens)s, %(cost)s,
                    %(currency)s, %(timestamp)s, %(call_category)s
                )
                """,
                {
                    "agent_id": event.agent_id,
                    "task_id": event.task_id,
                    "provider": event.provider,
                    "model": event.model,
                    "input_tokens": event.input_tokens,
                    "output_tokens": event.output_tokens,
                    "cost": event.cost,
                    "currency": event.currency,
                    "timestamp": event.timestamp,
                    "call_category": event.call_category,
                },
            )
            await conn.commit()
    except psycopg.Error as exc:
        msg = f"Failed to save cost record for agent {event.agent_id!r}"
        logger.warning(
            PERSISTENCE_COST_RECORD_SAVE_FAILED,
            agent_id=event.agent_id,
            task_id=event.task_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Query cost records matching filter spec with pagination.

Raises:

Type Description
QueryError

If the query fails or pagination is out of range.

Source code in src/synthorg/persistence/postgres/repositories.py
async def query(
    self,
    filter_spec: CostRecordFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[CostRecord, ...]:
    """Query cost records matching filter spec with pagination.

    Raises:
        QueryError: If the query fails or pagination is out of range.
    """
    limit = validate_pagination_args(
        limit, offset, event=PERSISTENCE_COST_RECORD_QUERY_FAILED
    )
    clauses: list[str] = []
    params: list[object] = []
    if filter_spec.agent_id is not None:
        clauses.append("agent_id = %s")
        params.append(filter_spec.agent_id)
    if filter_spec.task_id is not None:
        clauses.append("task_id = %s")
        params.append(filter_spec.task_id)

    sql = (
        "SELECT agent_id, task_id, provider, model, input_tokens, "
        "output_tokens, cost, currency, timestamp, call_category "
        "FROM cost_records"
    )
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY timestamp DESC, agent_id ASC, rowid ASC"
    sql += " LIMIT %s OFFSET %s"
    params.extend([limit, offset])

    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(sql, params)
            rows = await cur.fetchall()
    except psycopg.Error as exc:
        msg = "Failed to query cost records"
        logger.warning(
            PERSISTENCE_COST_RECORD_QUERY_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    try:
        records = tuple(CostRecord.model_validate(row) for row in rows)
    except ValidationError as exc:
        msg = "Failed to deserialize cost records"
        logger.warning(
            PERSISTENCE_COST_RECORD_QUERY_FAILED,
            error_type="ValidationError",
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    logger.debug(PERSISTENCE_COST_RECORD_QUERIED, count=len(records))
    return records

aggregate async

aggregate(*, agent_id=None, task_id=None)

Sum total cost, optionally filtered by agent and/or task.

Raises :class:MixedCurrencyAggregationError when the matched rows span multiple currencies. The distinct-currency probe and the SUM run in a single aggregating query (COUNT(DISTINCT) + STRING_AGG(DISTINCT) + SUM) so the two observations share one snapshot and a concurrent commit cannot change the result between them.

Source code in src/synthorg/persistence/postgres/repositories.py
async def aggregate(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
) -> float:
    """Sum total cost, optionally filtered by agent and/or task.

    Raises :class:`MixedCurrencyAggregationError` when the matched rows
    span multiple currencies.  The distinct-currency probe and the
    ``SUM`` run in a **single** aggregating query
    (``COUNT(DISTINCT)`` + ``STRING_AGG(DISTINCT)`` + ``SUM``) so the
    two observations share one snapshot and a concurrent commit
    cannot change the result between them.
    """
    conditions: list[str] = []
    params: list[str] = []
    if agent_id is not None:
        conditions.append("agent_id = %s")
        params.append(agent_id)
    if task_id is not None:
        conditions.append("task_id = %s")
        params.append(task_id)
    where_clause = (" WHERE " + " AND ".join(conditions)) if conditions else ""
    # where_clause is built from fixed column names only; user values
    # go through bound %s parameters.
    agg_select = (
        "SELECT "
        "COUNT(DISTINCT currency) AS distinct_count, "
        "STRING_AGG(DISTINCT currency, ',') AS currencies, "
        "COALESCE(SUM(cost), 0.0) AS total_cost "
        "FROM cost_records"
    )
    agg_sql = f"{agg_select}{where_clause}"

    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(agg_sql, params)
            row = await cur.fetchone()
    except psycopg.Error as exc:
        msg = "Failed to aggregate cost records"
        logger.warning(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        msg = "aggregate query returned no rows"
        logger.error(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise QueryError(msg)
    distinct_count = safe_int(row[0], default=0)
    currencies_csv = row[1]
    total = safe_float(row[2], default=0.0)
    if distinct_count > 1:
        distinct = frozenset(parse_comma_list(currencies_csv))
        logger.error(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            task_id=task_id,
            currencies=sorted(distinct),
            error="mixed-currency aggregation rejected",
        )
        mixed_msg = "Cannot aggregate costs across mixed currencies"
        raise MixedCurrencyAggregationError(
            mixed_msg,
            currencies=distinct,
            agent_id=agent_id,
            task_id=task_id,
        )
    logger.debug(
        PERSISTENCE_COST_RECORD_AGGREGATED,
        agent_id=agent_id,
        total_cost=total,
    )
    return total

purge_before async

purge_before(threshold)

Delete cost records with timestamp before threshold (retention).

Source code in src/synthorg/persistence/postgres/repositories.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete cost records with timestamp before threshold (retention)."""
    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(
                "DELETE FROM cost_records WHERE timestamp < %s",
                (normalize_utc(threshold),),
            )
            deleted_count = cur.rowcount
            await conn.commit()
    except psycopg.Error as exc:
        msg = "Failed to purge cost records by threshold"
        logger.warning(
            PERSISTENCE_COST_RECORD_QUERY_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    return deleted_count

PostgresMessageRepository

PostgresMessageRepository(pool)

Postgres implementation of the MessageRepository protocol.

content is stored as TEXT containing a JSON-serialized parts array (same as SQLite, for protocol compatibility). metadata and attachments use native JSONB.

Parameters:

Name Type Description Default
pool AsyncConnectionPool

An open psycopg_pool.AsyncConnectionPool.

required
Source code in src/synthorg/persistence/postgres/repositories.py
def __init__(self, pool: AsyncConnectionPool) -> None:
    self._pool = pool

append async

append(message)

Persist a message (append-only per AppendOnlyRepository).

Source code in src/synthorg/persistence/postgres/repositories.py
async def append(self, message: Message) -> None:
    """Persist a message (append-only per AppendOnlyRepository)."""
    data = message.model_dump(mode="json")
    msg_id = str(message.id)

    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(
                """
                INSERT INTO messages (
                    id, timestamp, sender, "to", type, priority,
                    channel, content, attachments, metadata
                ) VALUES (
                    %(id)s, %(timestamp)s, %(sender)s, %(to)s, %(type)s,
                    %(priority)s, %(channel)s, %(content)s, %(attachments)s,
                    %(metadata)s
                )
                """,
                {
                    "id": msg_id,
                    "timestamp": message.timestamp,
                    "sender": data["sender"],
                    "to": data["to"],
                    "type": data["type"],
                    "priority": data["priority"],
                    "channel": data["channel"],
                    "content": json.dumps(data["parts"]),
                    "attachments": Jsonb(data.get("attachments", [])),
                    "metadata": Jsonb(data["metadata"]),
                },
            )
            await conn.commit()
    except psycopg.errors.UniqueViolation as exc:
        err_msg = f"Message {msg_id} already exists"
        logger.warning(PERSISTENCE_MESSAGE_DUPLICATE, message_id=msg_id)
        raise DuplicateRecordError(err_msg) from exc
    except psycopg.Error as exc:
        msg = f"Failed to save message {msg_id!r}"
        logger.warning(
            PERSISTENCE_MESSAGE_SAVE_FAILED,
            message_id=msg_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc

get_history async

get_history(channel, *, limit=DEFAULT_LIST_LIMIT)

Retrieve message history for a channel, newest first.

Source code in src/synthorg/persistence/postgres/repositories.py
async def get_history(
    self,
    channel: str,
    *,
    limit: int = DEFAULT_LIST_LIMIT,
) -> tuple[Message, ...]:
    """Retrieve message history for a channel, newest first."""
    if limit is not None and (
        not isinstance(limit, int) or isinstance(limit, bool) or limit < 1
    ):
        msg = f"limit must be a positive integer, got {limit!r}"
        logger.warning(
            PERSISTENCE_MESSAGE_HISTORY_FAILED,
            channel=channel,
            error=msg,
        )
        raise QueryError(msg)
    sql = (
        'SELECT id, timestamp, sender, "to", type, priority, '
        "channel, content, attachments, metadata "
        "FROM messages "
        "WHERE channel = %s "
        "ORDER BY timestamp DESC"
    )
    params: list[object] = [channel]
    if limit is not None:
        sql += " LIMIT %s"
        params.append(limit)

    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(sql, params)
            rows = await cur.fetchall()
    except psycopg.Error as exc:
        msg = f"Failed to fetch message history for channel {channel!r}"
        logger.warning(
            PERSISTENCE_MESSAGE_HISTORY_FAILED,
            channel=channel,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    messages = tuple(self._row_to_message(row) for row in rows)
    logger.debug(
        PERSISTENCE_MESSAGE_HISTORY_FETCHED,
        channel=channel,
        count=len(messages),
    )
    return messages

get_by_id async

get_by_id(channel, message_id)

Fetch one message by (channel, id) via the PK point read.

The id predicate alone resolves the row (it is the primary key); the extra channel predicate is a deliberate scoping guard so a caller holding only a message id cannot read a message outside the channel it asked for.

Source code in src/synthorg/persistence/postgres/repositories.py
async def get_by_id(
    self,
    channel: str,
    message_id: str,
) -> Message | None:
    """Fetch one message by ``(channel, id)`` via the PK point read.

    The ``id`` predicate alone resolves the row (it is the primary
    key); the extra ``channel`` predicate is a deliberate scoping
    guard so a caller holding only a message id cannot read a
    message outside the channel it asked for.
    """
    sql = (
        'SELECT id, timestamp, sender, "to", type, priority, '
        "channel, content, attachments, metadata "
        "FROM messages "
        "WHERE id = %s AND channel = %s"
    )
    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(sql, [message_id, channel])
            row = await cur.fetchone()
    except psycopg.Error as exc:
        msg = f"Failed to fetch message {message_id!r}"
        logger.warning(
            PERSISTENCE_MESSAGE_FETCH_FAILED,
            channel=channel,
            message_id=message_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        return None
    message = self._row_to_message(row)
    logger.debug(
        PERSISTENCE_MESSAGE_FETCHED,
        channel=channel,
        message_id=message_id,
    )
    return message

query async

query(filter_spec, *, limit=DEFAULT_PAGE_SIZE, offset=0)

Return messages matching the filter spec, newest first.

Raises:

Type Description
QueryError

If the query fails or pagination is out of range.

Source code in src/synthorg/persistence/postgres/repositories.py
async def query(
    self,
    filter_spec: MessageFilterSpec,
    *,
    limit: int = DEFAULT_PAGE_SIZE,
    offset: int = 0,
) -> tuple[Message, ...]:
    """Return messages matching the filter spec, newest first.

    Raises:
        QueryError: If the query fails or pagination is out of range.
    """
    limit = validate_pagination_args(
        limit, offset, event=PERSISTENCE_MESSAGE_HISTORY_FAILED
    )
    sql = (
        'SELECT id, timestamp, sender, "to", type, priority, '
        "channel, content, attachments, metadata "
        "FROM messages"
    )
    params: list[object] = []
    if filter_spec.channel is not None:
        sql += " WHERE channel = %s"
        params.append(filter_spec.channel)
    sql += " ORDER BY timestamp DESC, id ASC LIMIT %s OFFSET %s"
    params.extend([limit, offset])
    try:
        async with (
            self._pool.connection() as conn,
            conn.cursor(row_factory=dict_row) as cur,
        ):
            await cur.execute(sql, params)
            rows = await cur.fetchall()
    except psycopg.Error as exc:
        msg = "Failed to query messages"
        logger.warning(
            PERSISTENCE_MESSAGE_HISTORY_FAILED,
            channel=filter_spec.channel,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    return tuple(self._row_to_message(row) for row in rows)

purge_before async

purge_before(threshold)

Delete messages with timestamp < threshold (retention).

Source code in src/synthorg/persistence/postgres/repositories.py
async def purge_before(self, threshold: datetime) -> int:
    """Delete messages with ``timestamp < threshold`` (retention)."""
    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(
                "DELETE FROM messages WHERE timestamp < %s",
                (normalize_utc(threshold),),
            )
            rowcount = cur.rowcount
            await conn.commit()
    except psycopg.Error as exc:
        msg = "Failed to purge messages by threshold"
        logger.warning(
            PERSISTENCE_MESSAGE_DELETE_FAILED,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    return rowcount

delete async

delete(message_id)

Delete a single message by id (bespoke per ADR D7, moderation).

Returns True when a row was removed, False when the id did not exist. The audit-grade mutation log is emitted by :class:MessageService.delete_message; the repository never logs mutations itself (persistence-boundary rule, see docs/reference/persistence-boundary.md).

Source code in src/synthorg/persistence/postgres/repositories.py
async def delete(self, message_id: NotBlankStr) -> bool:
    """Delete a single message by id (bespoke per ADR D7, moderation).

    Returns ``True`` when a row was removed, ``False`` when the id
    did not exist. The audit-grade mutation log is emitted by
    :class:`MessageService.delete_message`; the repository never
    logs mutations itself (persistence-boundary rule, see
    ``docs/reference/persistence-boundary.md``).
    """
    try:
        async with self._pool.connection() as conn, conn.cursor() as cur:
            await cur.execute(
                "DELETE FROM messages WHERE id = %s",
                (message_id,),
            )
            await conn.commit()
            deleted = cur.rowcount > 0
    except psycopg.Error as exc:
        msg = f"Failed to delete message {message_id!r}"
        logger.warning(
            PERSISTENCE_MESSAGE_DELETE_FAILED,
            message_id=message_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise QueryError(msg) from exc
    return deleted