Persistence

Pluggable operational data persistence -- protocol, configuration, and SQLite backend.

Protocol

protocol

PersistenceBackend protocol -- lifecycle + repository access.

Application code depends on this protocol for storage lifecycle management. Repository protocols provide entity-level access.

PersistenceBackend

Bases: Protocol

Lifecycle management for operational data storage.

Concrete backends implement this protocol to provide connection management, health monitoring, schema migrations, and access to entity-specific repositories.

Attributes:

Name Type Description
is_connected bool

Whether the backend has an active connection.

backend_name NotBlankStr

Human-readable backend identifier.

tasks TaskRepository

Repository for Task persistence.

cost_records CostRecordRepository

Repository for CostRecord persistence.

messages MessageRepository

Repository for Message persistence.

lifecycle_events LifecycleEventRepository

Repository for AgentLifecycleEvent persistence.

task_metrics TaskMetricRepository

Repository for TaskMetricRecord persistence.

collaboration_metrics CollaborationMetricRepository

Repository for CollaborationMetricRecord persistence.

parked_contexts ParkedContextRepository

Repository for ParkedContext persistence.

audit_entries AuditRepository

Repository for AuditEntry persistence.

users UserRepository

Repository for User persistence.

api_keys ApiKeyRepository

Repository for ApiKey persistence.

checkpoints CheckpointRepository

Repository for Checkpoint persistence.

heartbeats HeartbeatRepository

Repository for Heartbeat persistence.

agent_states AgentStateRepository

Repository for AgentRuntimeState persistence.

settings SettingsRepository

Repository for namespaced settings persistence.

artifacts ArtifactRepository

Repository for Artifact persistence.

projects ProjectRepository

Repository for Project persistence.

custom_presets PersonalityPresetRepository

Repository for custom personality preset persistence.

workflow_definitions WorkflowDefinitionRepository

Repository for workflow definition persistence.

workflow_executions WorkflowExecutionRepository

Repository for workflow execution persistence.

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier (e.g. "sqlite").

tasks property

tasks

Repository for Task persistence.

cost_records property

cost_records

Repository for CostRecord persistence.

messages property

messages

Repository for Message persistence.

lifecycle_events property

lifecycle_events

Repository for AgentLifecycleEvent persistence.

task_metrics property

task_metrics

Repository for TaskMetricRecord persistence.

collaboration_metrics property

collaboration_metrics

Repository for CollaborationMetricRecord persistence.

parked_contexts property

parked_contexts

Repository for ParkedContext persistence.

audit_entries property

audit_entries

Repository for AuditEntry persistence.

users property

users

Repository for User persistence.

api_keys property

api_keys

Repository for ApiKey persistence.

checkpoints property

checkpoints

Repository for Checkpoint persistence.

heartbeats property

heartbeats

Repository for Heartbeat persistence.

agent_states property

agent_states

Repository for AgentRuntimeState persistence.

settings property

settings

Repository for namespaced settings persistence.

artifacts property

artifacts

Repository for Artifact persistence.

projects property

projects

Repository for Project persistence.

custom_presets property

custom_presets

Repository for custom personality preset persistence.

workflow_definitions property

workflow_definitions

Repository for workflow definition persistence.

workflow_executions property

workflow_executions

Repository for workflow execution persistence.

connect async

connect()

Establish connection to the storage backend.

Raises:

Type Description
PersistenceConnectionError

If the connection cannot be established.

Source code in src/synthorg/persistence/protocol.py
async def connect(self) -> None:
    """Establish connection to the storage backend.

    Raises:
        PersistenceConnectionError: If the connection cannot be
            established.
    """
    ...

disconnect async

disconnect()

Close the storage backend connection.

Safe to call even if not connected.

Source code in src/synthorg/persistence/protocol.py
async def disconnect(self) -> None:
    """Close the storage backend connection.

    Safe to call even if not connected.
    """
    ...

health_check async

health_check()

Check whether the backend is healthy and responsive.

Returns:

Type Description
bool

True if the backend is reachable and operational.

Source code in src/synthorg/persistence/protocol.py
async def health_check(self) -> bool:
    """Check whether the backend is healthy and responsive.

    Returns:
        ``True`` if the backend is reachable and operational.
    """
    ...

migrate async

migrate()

Run pending schema migrations.

Raises:

Type Description
MigrationError

If a migration fails.

Source code in src/synthorg/persistence/protocol.py
async def migrate(self) -> None:
    """Run pending schema migrations.

    Raises:
        MigrationError: If a migration fails.
    """
    ...
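
The lifecycle methods above suggest a call order of connect, migrate, health check, and finally disconnect. The following is a minimal sketch of that order, not the library's own startup code: `LifecycleBackend`, `InMemoryBackend`, and `run_with_backend` are hypothetical names introduced for illustration, and only the lifecycle subset of the protocol is modelled.

```python
import asyncio
from typing import Protocol


class LifecycleBackend(Protocol):
    """Lifecycle subset of PersistenceBackend (illustrative only)."""

    @property
    def is_connected(self) -> bool: ...

    async def connect(self) -> None: ...

    async def disconnect(self) -> None: ...

    async def health_check(self) -> bool: ...

    async def migrate(self) -> None: ...


class InMemoryBackend:
    """Hypothetical backend that records calls instead of touching storage."""

    def __init__(self) -> None:
        self._connected = False
        self.calls: list[str] = []

    @property
    def is_connected(self) -> bool:
        return self._connected

    async def connect(self) -> None:
        self._connected = True
        self.calls.append("connect")

    async def disconnect(self) -> None:
        # Safe to call even if not connected.
        self._connected = False
        self.calls.append("disconnect")

    async def health_check(self) -> bool:
        self.calls.append("health_check")
        return self._connected

    async def migrate(self) -> None:
        self.calls.append("migrate")


async def run_with_backend(backend: LifecycleBackend) -> bool:
    """Connect, migrate, probe health, and always disconnect."""
    await backend.connect()
    try:
        await backend.migrate()
        return await backend.health_check()
    finally:
        await backend.disconnect()


backend = InMemoryBackend()
healthy = asyncio.run(run_with_backend(backend))
```

Because the protocol is structural, `InMemoryBackend` satisfies `LifecycleBackend` without inheriting from it. The `finally` block relies on `disconnect()` being safe to call in any state.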

get_db

get_db()

Return the underlying database connection.

Returns:

Type Description
Any

The raw database connection object (backend-specific).

Raises:

Type Description
PersistenceConnectionError

If not yet connected.

Source code in src/synthorg/persistence/protocol.py
def get_db(self) -> Any:
    """Return the underlying database connection.

    Returns:
        The raw database connection object (backend-specific).

    Raises:
        PersistenceConnectionError: If not yet connected.
    """
    ...

get_setting async

get_setting(key)

Retrieve a setting value by key.

Parameters:

Name Type Description Default
key NotBlankStr

Setting key.

required

Returns:

Type Description
str | None

The setting value, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/protocol.py
async def get_setting(self, key: NotBlankStr) -> str | None:
    """Retrieve a setting value by key.

    Args:
        key: Setting key.

    Returns:
        The setting value, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

set_setting async

set_setting(key, value)

Store a setting value.

Upserts -- creates or updates the key.

Parameters:

Name Type Description Default
key NotBlankStr

Setting key.

required
value str

Setting value.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/protocol.py
async def set_setting(self, key: NotBlankStr, value: str) -> None:
    """Store a setting value.

    Upserts -- creates or updates the key.

    Args:
        key: Setting key.
        value: Setting value.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

Config

config

Persistence configuration models.

Frozen Pydantic models for persistence backend selection and backend-specific settings.

SQLiteConfig pydantic-model

Bases: BaseModel

SQLite-specific persistence configuration.

Attributes:

Name Type Description
path NotBlankStr

Database file path. Use ":memory:" for in-memory databases (useful for testing).

wal_mode bool

Whether to enable WAL journal mode for concurrent read performance.

journal_size_limit int

Maximum WAL journal size in bytes (default 67108864, i.e. 64 MiB).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • path (NotBlankStr)
  • wal_mode (bool)
  • journal_size_limit (int)

Validators:

  • _reject_traversal

path pydantic-field

path = 'synthorg.db'

Database file path

wal_mode pydantic-field

wal_mode = True

Enable WAL journal mode

journal_size_limit pydantic-field

journal_size_limit = 67108864

Maximum WAL journal size in bytes
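
How the SQLite backend applies `wal_mode` and `journal_size_limit` is not shown on this page. One plausible mapping, sketched here as an assumption, is to the SQLite pragmas `journal_mode` and `journal_size_limit`. The constants mirror the documented defaults; the temporary directory is only there so the example runs anywhere.

```python
import os
import sqlite3
import tempfile

# Values mirror the documented SQLiteConfig defaults.
WAL_MODE = True
JOURNAL_SIZE_LIMIT = 67_108_864  # 64 * 1024 * 1024 bytes

# WAL needs a real file; ":memory:" databases report journal mode "memory".
db_dir = tempfile.mkdtemp()
db_path = os.path.join(db_dir, "synthorg.db")

conn = sqlite3.connect(db_path)
if WAL_MODE:
    # The pragma returns the journal mode actually in effect.
    journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
else:
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]

# The pragma returns the limit now in force, in bytes.
size_limit = conn.execute(
    f"PRAGMA journal_size_limit={JOURNAL_SIZE_LIMIT}"
).fetchone()[0]
conn.close()
```

Reading back the pragma results, rather than assuming they took effect, is a cheap way to detect a filesystem that cannot support WAL.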

PersistenceConfig pydantic-model

Bases: BaseModel

Top-level persistence configuration.

Attributes:

Name Type Description
backend NotBlankStr

Backend name -- currently only "sqlite" is implemented.

sqlite SQLiteConfig

SQLite-specific settings (used when backend="sqlite").

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • backend (NotBlankStr)
  • sqlite (SQLiteConfig)

Validators:

  • _validate_backend_name

backend pydantic-field

backend = 'sqlite'

Persistence backend name

sqlite pydantic-field

sqlite

SQLite-specific settings

Repositories

repositories

Repository protocols for operational data persistence.

Each entity type has its own protocol so that application code depends only on abstract interfaces, never on a concrete backend.

CollaborationMetricRepository

Bases: Protocol

Append-only persistence + query for CollaborationMetricRecord.

save async

save(record)

Persist a collaboration metric record.

Parameters:

Name Type Description Default
record CollaborationMetricRecord

The collaboration metric record to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/hr/persistence_protocol.py
async def save(self, record: CollaborationMetricRecord) -> None:
    """Persist a collaboration metric record.

    Args:
        record: The collaboration metric record to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(*, agent_id=None, since=None)

Query collaboration metric records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
since AwareDatetime | None

Include records after this time.

None

Returns:

Type Description
tuple[CollaborationMetricRecord, ...]

Matching collaboration metric records.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/hr/persistence_protocol.py
async def query(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    since: AwareDatetime | None = None,
) -> tuple[CollaborationMetricRecord, ...]:
    """Query collaboration metric records with optional filters.

    Args:
        agent_id: Filter by agent identifier.
        since: Include records after this time.

    Returns:
        Matching collaboration metric records.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

LifecycleEventRepository

Bases: Protocol

CRUD + query interface for AgentLifecycleEvent persistence.

save async

save(event)

Persist a lifecycle event.

Parameters:

Name Type Description Default
event AgentLifecycleEvent

The lifecycle event to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/hr/persistence_protocol.py
async def save(self, event: AgentLifecycleEvent) -> None:
    """Persist a lifecycle event.

    Args:
        event: The lifecycle event to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_events async

list_events(*, agent_id=None, event_type=None, since=None, limit=None)

List lifecycle events with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
event_type LifecycleEventType | None

Filter by event type.

None
since AwareDatetime | None

Filter events after this timestamp.

None
limit int | None

Maximum number of events to return. None for all.

None

Returns:

Type Description
tuple[AgentLifecycleEvent, ...]

Matching lifecycle events.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/hr/persistence_protocol.py
async def list_events(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    event_type: LifecycleEventType | None = None,
    since: AwareDatetime | None = None,
    limit: int | None = None,
) -> tuple[AgentLifecycleEvent, ...]:
    """List lifecycle events with optional filters.

    Args:
        agent_id: Filter by agent identifier.
        event_type: Filter by event type.
        since: Filter events after this timestamp.
        limit: Maximum number of events to return. ``None`` for all.

    Returns:
        Matching lifecycle events.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

TaskMetricRepository

Bases: Protocol

Append-only persistence + query for TaskMetricRecord.

save async

save(record)

Persist a task metric record.

Parameters:

Name Type Description Default
record TaskMetricRecord

The task metric record to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/hr/persistence_protocol.py
async def save(self, record: TaskMetricRecord) -> None:
    """Persist a task metric record.

    Args:
        record: The task metric record to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(*, agent_id=None, since=None, until=None)

Query task metric records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
since AwareDatetime | None

Include records after this time.

None
until AwareDatetime | None

Include records before this time.

None

Returns:

Type Description
tuple[TaskMetricRecord, ...]

Matching task metric records.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/hr/persistence_protocol.py
async def query(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    since: AwareDatetime | None = None,
    until: AwareDatetime | None = None,
) -> tuple[TaskMetricRecord, ...]:
    """Query task metric records with optional filters.

    Args:
        agent_id: Filter by agent identifier.
        since: Include records after this time.
        until: Include records before this time.

    Returns:
        Matching task metric records.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

TaskRepository

Bases: Protocol

CRUD + query interface for Task persistence.

save async

save(task)

Persist a task (insert or update).

Parameters:

Name Type Description Default
task Task

The task to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, task: Task) -> None:
    """Persist a task (insert or update).

    Args:
        task: The task to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(task_id)

Retrieve a task by its ID.

Parameters:

Name Type Description Default
task_id NotBlankStr

The task identifier.

required

Returns:

Type Description
Task | None

The task, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(self, task_id: NotBlankStr) -> Task | None:
    """Retrieve a task by its ID.

    Args:
        task_id: The task identifier.

    Returns:
        The task, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_tasks async

list_tasks(*, status=None, assigned_to=None, project=None)

List tasks with optional filters.

Parameters:

Name Type Description Default
status TaskStatus | None

Filter by task status.

None
assigned_to NotBlankStr | None

Filter by assignee agent ID.

None
project NotBlankStr | None

Filter by project ID.

None

Returns:

Type Description
tuple[Task, ...]

Matching tasks as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def list_tasks(
    self,
    *,
    status: TaskStatus | None = None,
    assigned_to: NotBlankStr | None = None,
    project: NotBlankStr | None = None,
) -> tuple[Task, ...]:
    """List tasks with optional filters.

    Args:
        status: Filter by task status.
        assigned_to: Filter by assignee agent ID.
        project: Filter by project ID.

    Returns:
        Matching tasks as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(task_id)

Delete a task by ID.

Parameters:

Name Type Description Default
task_id NotBlankStr

The task identifier.

required

Returns:

Type Description
bool

True if the task was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(self, task_id: NotBlankStr) -> bool:
    """Delete a task by ID.

    Args:
        task_id: The task identifier.

    Returns:
        ``True`` if the task was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

CostRecordRepository

Bases: Protocol

Append-only persistence + query/aggregation for CostRecord.

save async

save(record)

Persist a cost record (append-only).

Parameters:

Name Type Description Default
record CostRecord

The cost record to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, record: CostRecord) -> None:
    """Persist a cost record (append-only).

    Args:
        record: The cost record to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

query async

query(*, agent_id=None, task_id=None)

Query cost records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
task_id NotBlankStr | None

Filter by task identifier.

None

Returns:

Type Description
tuple[CostRecord, ...]

Matching cost records as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def query(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
) -> tuple[CostRecord, ...]:
    """Query cost records with optional filters.

    Args:
        agent_id: Filter by agent identifier.
        task_id: Filter by task identifier.

    Returns:
        Matching cost records as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

aggregate async

aggregate(*, agent_id=None, task_id=None)

Sum total cost_usd, optionally filtered by agent and/or task.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
task_id NotBlankStr | None

Filter by task identifier.

None

Returns:

Type Description
float

Total cost in USD (base currency).

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def aggregate(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
) -> float:
    """Sum total cost_usd, optionally filtered by agent and/or task.

    Args:
        agent_id: Filter by agent identifier.
        task_id: Filter by task identifier.

    Returns:
        Total cost in USD (base currency).

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

MessageRepository

Bases: Protocol

Write + history query interface for Message persistence.

save async

save(message)

Persist a message.

Parameters:

Name Type Description Default
message Message

The message to persist.

required

Raises:

Type Description
DuplicateRecordError

If a message with the same ID exists.

PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, message: Message) -> None:
    """Persist a message.

    Args:
        message: The message to persist.

    Raises:
        DuplicateRecordError: If a message with the same ID exists.
        PersistenceError: If the operation fails.
    """
    ...

get_history async

get_history(channel, *, limit=None)

Retrieve message history for a channel.

Parameters:

Name Type Description Default
channel NotBlankStr

Channel name to query.

required
limit int | None

Maximum number of messages to return (newest first).

None

Returns:

Type Description
tuple[Message, ...]

Messages ordered by timestamp descending.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_history(
    self,
    channel: NotBlankStr,
    *,
    limit: int | None = None,
) -> tuple[Message, ...]:
    """Retrieve message history for a channel.

    Args:
        channel: Channel name to query.
        limit: Maximum number of messages to return (newest first).

    Returns:
        Messages ordered by timestamp descending.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

ParkedContextRepository

Bases: Protocol

CRUD interface for parked agent execution contexts.

save async

save(context)

Persist a parked context.

Parameters:

Name Type Description Default
context ParkedContext

The parked context to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, context: ParkedContext) -> None:
    """Persist a parked context.

    Args:
        context: The parked context to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(parked_id)

Retrieve a parked context by ID.

Parameters:

Name Type Description Default
parked_id NotBlankStr

The parked context identifier.

required

Returns:

Type Description
ParkedContext | None

The parked context, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(self, parked_id: NotBlankStr) -> ParkedContext | None:
    """Retrieve a parked context by ID.

    Args:
        parked_id: The parked context identifier.

    Returns:
        The parked context, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_approval async

get_by_approval(approval_id)

Retrieve a parked context by approval ID.

Parameters:

Name Type Description Default
approval_id NotBlankStr

The approval item identifier.

required

Returns:

Type Description
ParkedContext | None

The parked context, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_by_approval(self, approval_id: NotBlankStr) -> ParkedContext | None:
    """Retrieve a parked context by approval ID.

    Args:
        approval_id: The approval item identifier.

    Returns:
        The parked context, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_agent async

get_by_agent(agent_id)

Retrieve all parked contexts for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
tuple[ParkedContext, ...]

Parked contexts for the agent.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_by_agent(self, agent_id: NotBlankStr) -> tuple[ParkedContext, ...]:
    """Retrieve all parked contexts for an agent.

    Args:
        agent_id: The agent identifier.

    Returns:
        Parked contexts for the agent.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(parked_id)

Delete a parked context by ID.

Parameters:

Name Type Description Default
parked_id NotBlankStr

The parked context identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(self, parked_id: NotBlankStr) -> bool:
    """Delete a parked context by ID.

    Args:
        parked_id: The parked context identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

AuditRepository

Bases: Protocol

Append-only persistence + query interface for AuditEntry.

Audit entries are immutable records of security evaluations. To preserve audit integrity, the protocol provides no update or delete operations.

save async

save(entry)

Persist an audit entry (append-only).

Parameters:

Name Type Description Default
entry AuditEntry

The audit entry to persist.

required

Raises:

Type Description
DuplicateRecordError

If an entry with the same ID exists.

QueryError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, entry: AuditEntry) -> None:
    """Persist an audit entry (append-only).

    Args:
        entry: The audit entry to persist.

    Raises:
        DuplicateRecordError: If an entry with the same ID exists.
        QueryError: If the operation fails.
    """
    ...

query async

query(
    *,
    agent_id=None,
    action_type=None,
    verdict=None,
    risk_level=None,
    since=None,
    until=None,
    limit=100,
)

Query audit entries with optional filters.

Filters are AND-combined. Results are ordered by timestamp descending (newest first).

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent identifier.

None
action_type str | None

Filter by action type string.

None
verdict AuditVerdictStr | None

Filter by verdict string.

None
risk_level ApprovalRiskLevel | None

Filter by risk level.

None
since AwareDatetime | None

Only return entries at or after this timestamp.

None
until AwareDatetime | None

Only return entries at or before this timestamp.

None
limit int

Maximum number of entries to return (must be >= 1).

100

Returns:

Type Description
tuple[AuditEntry, ...]

Matching audit entries as a tuple.

Raises:

Type Description
QueryError

If the operation fails, limit < 1, or until is earlier than since.

Source code in src/synthorg/persistence/repositories.py
async def query(  # noqa: PLR0913
    self,
    *,
    agent_id: NotBlankStr | None = None,
    action_type: str | None = None,
    verdict: AuditVerdictStr | None = None,
    risk_level: ApprovalRiskLevel | None = None,
    since: AwareDatetime | None = None,
    until: AwareDatetime | None = None,
    limit: int = 100,
) -> tuple[AuditEntry, ...]:
    """Query audit entries with optional filters.

    Filters are AND-combined. Results are ordered by timestamp
    descending (newest first).

    Args:
        agent_id: Filter by agent identifier.
        action_type: Filter by action type string.
        verdict: Filter by verdict string.
        risk_level: Filter by risk level.
        since: Only return entries at or after this timestamp.
        until: Only return entries at or before this timestamp.
        limit: Maximum number of entries to return (must be >= 1).

    Returns:
        Matching audit entries as a tuple.

    Raises:
        QueryError: If the operation fails, *limit* < 1, or
            *until* is earlier than *since*.
    """
    ...

UserRepository

Bases: Protocol

CRUD interface for User persistence.

save async

save(user)

Persist a user (insert or update).

Parameters:

Name Type Description Default
user User

The user to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, user: User) -> None:
    """Persist a user (insert or update).

    Args:
        user: The user to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(user_id)

Retrieve a user by ID.

Parameters:

Name Type Description Default
user_id NotBlankStr

The user identifier.

required

Returns:

Type Description
User | None

The user, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(self, user_id: NotBlankStr) -> User | None:
    """Retrieve a user by ID.

    Args:
        user_id: The user identifier.

    Returns:
        The user, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_username async

get_by_username(username)

Retrieve a user by username.

Parameters:

Name Type Description Default
username NotBlankStr

The login username.

required

Returns:

Type Description
User | None

The user, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_by_username(self, username: NotBlankStr) -> User | None:
    """Retrieve a user by username.

    Args:
        username: The login username.

    Returns:
        The user, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_users async

list_users()

List all human users (excludes the system user).

Returns:

Type Description
tuple[User, ...]

Human users as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def list_users(self) -> tuple[User, ...]:
    """List all human users (excludes the system user).

    Returns:
        Human users as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

count async

count()

Count the number of human users (excludes the system user).

Returns:

Type Description
int

Human user count.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def count(self) -> int:
    """Count the number of human users (excludes the system user).

    Returns:
        Human user count.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

count_by_role async

count_by_role(role)

Count users with a specific role.

Parameters:

Name Type Description Default
role HumanRole

The role to filter by.

required

Returns:

Type Description
int

Number of users with the given role.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def count_by_role(self, role: HumanRole) -> int:
    """Count users with a specific role.

    Args:
        role: The role to filter by.

    Returns:
        Number of users with the given role.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(user_id)

Delete a user by ID.

Parameters:

Name Type Description Default
user_id NotBlankStr

The user identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(self, user_id: NotBlankStr) -> bool:
    """Delete a user by ID.

    Args:
        user_id: The user identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

ApiKeyRepository

Bases: Protocol

CRUD interface for API key persistence.

save async

save(key)

Persist an API key.

Parameters:

Name Type Description Default
key ApiKey

The API key to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, key: ApiKey) -> None:
    """Persist an API key.

    Args:
        key: The API key to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(key_id)

Retrieve an API key by ID.

Parameters:

Name Type Description Default
key_id NotBlankStr

The key identifier.

required

Returns:

Type Description
ApiKey | None

The API key, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(self, key_id: NotBlankStr) -> ApiKey | None:
    """Retrieve an API key by ID.

    Args:
        key_id: The key identifier.

    Returns:
        The API key, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_by_hash async

get_by_hash(key_hash)

Retrieve an API key by its hash.

Parameters:

Name Type Description Default
key_hash NotBlankStr

HMAC-SHA256 hex digest.

required

Returns:

Type Description
ApiKey | None

The API key, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_by_hash(self, key_hash: NotBlankStr) -> ApiKey | None:
    """Retrieve an API key by its hash.

    Args:
        key_hash: HMAC-SHA256 hex digest.

    Returns:
        The API key, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...
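
The key_hash parameter is documented as an HMAC-SHA256 hex digest. As a minimal sketch of how a caller might derive such a digest with the Python standard library: the helper name hash_api_key, the secret, and the raw key value below are assumptions for illustration and are not part of the documented API.

```python
import hashlib
import hmac


def hash_api_key(raw_key: str, secret: bytes) -> str:
    """Return the HMAC-SHA256 hex digest of a raw key (hypothetical helper)."""
    return hmac.new(secret, raw_key.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical values; a real deployment would load the secret from configuration.
digest = hash_api_key("example-raw-key", b"server-side-secret")
# `digest` is the kind of value a caller would pass as `key_hash`.
```

Storing and looking up only the digest means the raw key never has to be persisted.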

list_by_user async

list_by_user(user_id)

List API keys belonging to a user.

Parameters:

Name Type Description Default
user_id NotBlankStr

The owner user ID.

required

Returns:

Type Description
tuple[ApiKey, ...]

API keys for the user.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def list_by_user(self, user_id: NotBlankStr) -> tuple[ApiKey, ...]:
    """List API keys belonging to a user.

    Args:
        user_id: The owner user ID.

    Returns:
        API keys for the user.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(key_id)

Delete an API key by ID.

Parameters:

Name Type Description Default
key_id NotBlankStr

The key identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(self, key_id: NotBlankStr) -> bool:
    """Delete an API key by ID.

    Args:
        key_id: The key identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

CheckpointRepository

Bases: Protocol

CRUD interface for checkpoint persistence.

save async

save(checkpoint)

Persist a checkpoint (insert or replace by ID).

Parameters:

Name Type Description Default
checkpoint Checkpoint

The checkpoint to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, checkpoint: Checkpoint) -> None:
    """Persist a checkpoint (insert or replace by ID).

    Args:
        checkpoint: The checkpoint to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_latest async

get_latest(*, execution_id=None, task_id=None)

Retrieve the latest checkpoint by turn_number.

At least one filter (execution_id or task_id) is required.

Parameters:

Name Type Description Default
execution_id NotBlankStr | None

Filter by execution identifier.

None
task_id NotBlankStr | None

Filter by task identifier.

None

Returns:

Type Description
Checkpoint | None

The checkpoint with the highest turn_number, or None.

Raises:

Type Description
PersistenceError

If the operation fails.

ValueError

If neither filter is provided.

Source code in src/synthorg/persistence/repositories.py
async def get_latest(
    self,
    *,
    execution_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
) -> Checkpoint | None:
    """Retrieve the latest checkpoint by turn_number.

    At least one filter (``execution_id`` or ``task_id``) is required.

    Args:
        execution_id: Filter by execution identifier.
        task_id: Filter by task identifier.

    Returns:
        The checkpoint with the highest turn_number, or ``None``.

    Raises:
        PersistenceError: If the operation fails.
        ValueError: If neither filter is provided.
    """
    ...

delete_by_execution async

delete_by_execution(execution_id)

Delete all checkpoints for an execution.

Parameters:

Name Type Description Default
execution_id NotBlankStr

The execution identifier.

required

Returns:

Type Description
int

Number of checkpoints deleted.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete_by_execution(self, execution_id: NotBlankStr) -> int:
    """Delete all checkpoints for an execution.

    Args:
        execution_id: The execution identifier.

    Returns:
        Number of checkpoints deleted.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

HeartbeatRepository

Bases: Protocol

CRUD interface for heartbeat persistence.

save async

save(heartbeat)

Persist a heartbeat (upsert by execution_id).

Parameters:

Name Type Description Default
heartbeat Heartbeat

The heartbeat to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, heartbeat: Heartbeat) -> None:
    """Persist a heartbeat (upsert by execution_id).

    Args:
        heartbeat: The heartbeat to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(execution_id)

Retrieve a heartbeat by execution ID.

Parameters:

Name Type Description Default
execution_id NotBlankStr

The execution identifier.

required

Returns:

Type Description
Heartbeat | None

The heartbeat, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(self, execution_id: NotBlankStr) -> Heartbeat | None:
    """Retrieve a heartbeat by execution ID.

    Args:
        execution_id: The execution identifier.

    Returns:
        The heartbeat, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_stale async

get_stale(threshold)

Retrieve heartbeats older than the threshold.

Parameters:

Name Type Description Default
threshold AwareDatetime

Heartbeats with last_heartbeat_at before this timestamp are considered stale.

required

Returns:

Type Description
tuple[Heartbeat, ...]

Stale heartbeats as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_stale(
    self,
    threshold: AwareDatetime,
) -> tuple[Heartbeat, ...]:
    """Retrieve heartbeats older than the threshold.

    Args:
        threshold: Heartbeats with ``last_heartbeat_at`` before
            this timestamp are considered stale.

    Returns:
        Stale heartbeats as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...
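
A caller typically derives the threshold from the current time minus a timeout. The sketch below shows that calculation and the documented comparison (last_heartbeat_at strictly before the threshold) against an in-memory list. DemoHeartbeat, the 60-second timeout, and the helper name are assumptions for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class DemoHeartbeat:
    """Hypothetical stand-in for Heartbeat with only the fields used here."""

    execution_id: str
    last_heartbeat_at: datetime


def stale_heartbeats(
    heartbeats: list[DemoHeartbeat],
    threshold: datetime,
) -> tuple[DemoHeartbeat, ...]:
    """Return heartbeats whose last_heartbeat_at is before the threshold."""
    if threshold.utcoffset() is None:
        msg = "threshold must be timezone-aware"
        raise ValueError(msg)
    return tuple(hb for hb in heartbeats if hb.last_heartbeat_at < threshold)


# A fixed "now" keeps the example deterministic.
now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
beats = [
    DemoHeartbeat("exec-1", now - timedelta(seconds=10)),
    DemoHeartbeat("exec-2", now - timedelta(minutes=5)),
]
threshold = now - timedelta(seconds=60)  # assumed 60-second timeout
stale = stale_heartbeats(beats, threshold)
```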

delete async

delete(execution_id)

Delete a heartbeat by execution ID.

Parameters:

Name Type Description Default
execution_id NotBlankStr

The execution identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(self, execution_id: NotBlankStr) -> bool:
    """Delete a heartbeat by execution ID.

    Args:
        execution_id: The execution identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

AgentStateRepository

Bases: Protocol

CRUD + query interface for agent runtime state persistence.

Provides a lightweight per-agent registry of execution state for dashboard queries, graceful shutdown discovery, and cross-restart recovery.

save async

save(state)

Upsert an agent runtime state by agent_id.

Parameters:

Name Type Description Default
state AgentRuntimeState

The agent runtime state to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def save(self, state: AgentRuntimeState) -> None:
    """Upsert an agent runtime state by ``agent_id``.

    Args:
        state: The agent runtime state to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(agent_id)

Retrieve an agent runtime state by agent ID.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
AgentRuntimeState | None

The agent state, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(self, agent_id: NotBlankStr) -> AgentRuntimeState | None:
    """Retrieve an agent runtime state by agent ID.

    Args:
        agent_id: The agent identifier.

    Returns:
        The agent state, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_active async

get_active()

Retrieve all non-idle agent states.

Returns states where status != 'idle', ordered by last_activity_at descending (most recent first).

Returns:

Type Description
tuple[AgentRuntimeState, ...]

Active agent states as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_active(self) -> tuple[AgentRuntimeState, ...]:
    """Retrieve all non-idle agent states.

    Returns states where ``status != 'idle'``, ordered by
    ``last_activity_at`` descending (most recent first).

    Returns:
        Active agent states as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...
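
The filter and ordering described above can be sketched in memory as follows. DemoAgentState is a hypothetical stand-in, and the status strings other than 'idle' are made up for the example.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class DemoAgentState:
    """Hypothetical stand-in for AgentRuntimeState with only the fields used here."""

    agent_id: str
    status: str
    last_activity_at: datetime


def active_states(states: list[DemoAgentState]) -> tuple[DemoAgentState, ...]:
    """Keep non-idle states, most recent activity first."""
    active = [s for s in states if s.status != "idle"]
    active.sort(key=lambda s: s.last_activity_at, reverse=True)
    return tuple(active)


states = [
    DemoAgentState("agent-a", "executing", datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc)),
    DemoAgentState("agent-b", "idle", datetime(2025, 1, 1, 11, 0, tzinfo=timezone.utc)),
    DemoAgentState("agent-c", "paused", datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc)),
]
active = active_states(states)
```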

delete async

delete(agent_id)

Delete an agent runtime state by agent ID.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(self, agent_id: NotBlankStr) -> bool:
    """Delete an agent runtime state by agent ID.

    Args:
        agent_id: The agent identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

SettingsRepository

Bases: Protocol

CRUD interface for namespaced settings persistence.

Settings are stored as (namespace, key) composite-keyed string values with updated_at timestamps.

get async

get(namespace, key)

Retrieve a setting value and its timestamp.

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required
key NotBlankStr

Setting key within the namespace.

required

Returns:

Type Description
tuple[str, str] | None

(value, updated_at) tuple, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get(
    self,
    namespace: NotBlankStr,
    key: NotBlankStr,
) -> tuple[str, str] | None:
    """Retrieve a setting value and its timestamp.

    Args:
        namespace: Setting namespace.
        key: Setting key within the namespace.

    Returns:
        ``(value, updated_at)`` tuple, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_namespace async

get_namespace(namespace)

Retrieve all settings in a namespace.

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required

Returns:

Type Description
tuple[tuple[str, str, str], ...]

Tuple of (key, value, updated_at) tuples, sorted by key.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_namespace(
    self,
    namespace: NotBlankStr,
) -> tuple[tuple[str, str, str], ...]:
    """Retrieve all settings in a namespace.

    Args:
        namespace: Setting namespace.

    Returns:
        Tuple of ``(key, value, updated_at)`` tuples, sorted by key.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get_all async

get_all()

Retrieve all settings across all namespaces.

Returns:

Type Description
tuple[tuple[str, str, str, str], ...]

Tuple of (namespace, key, value, updated_at) tuples, sorted by namespace then key.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def get_all(self) -> tuple[tuple[str, str, str, str], ...]:
    """Retrieve all settings across all namespaces.

    Returns:
        Tuple of ``(namespace, key, value, updated_at)`` tuples,
        sorted by namespace then key.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

set async

set(namespace, key, value, updated_at, *, expected_updated_at=None)

Upsert a setting value.

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required
key NotBlankStr

Setting key within the namespace.

required
value str

Setting value as a string.

required
updated_at str

ISO 8601 timestamp of the change.

required
expected_updated_at str | None

When provided, the row is only updated if the current updated_at matches (atomic compare-and-swap).

None

Returns:

Type Description
bool

True if the write succeeded.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def set(
    self,
    namespace: NotBlankStr,
    key: NotBlankStr,
    value: str,
    updated_at: str,
    *,
    expected_updated_at: str | None = None,
) -> bool:
    """Upsert a setting value.

    Args:
        namespace: Setting namespace.
        key: Setting key within the namespace.
        value: Setting value as a string.
        updated_at: ISO 8601 timestamp of the change.
        expected_updated_at: When provided, the row is only
            updated if the current ``updated_at`` matches
            (atomic compare-and-swap).

    Returns:
        ``True`` if the write succeeded.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(namespace, key)

Delete a setting.

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required
key NotBlankStr

Setting key within the namespace.

required

Returns:

Type Description
bool

True if the setting was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete(
    self,
    namespace: NotBlankStr,
    key: NotBlankStr,
) -> bool:
    """Delete a setting.

    Args:
        namespace: Setting namespace.
        key: Setting key within the namespace.

    Returns:
        ``True`` if the setting was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete_namespace async

delete_namespace(namespace)

Delete all settings in a namespace.

Parameters:

Name Type Description Default
namespace NotBlankStr

Setting namespace.

required

Returns:

Type Description
int

Number of settings deleted.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/repositories.py
async def delete_namespace(self, namespace: NotBlankStr) -> int:
    """Delete all settings in a namespace.

    Args:
        namespace: Setting namespace.

    Returns:
        Number of settings deleted.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

ArtifactRepository

Bases: Protocol

CRUD + query interface for Artifact persistence.

save async

save(artifact)

Persist an artifact (insert or update).

Parameters:

Name Type Description Default
artifact Artifact

The artifact to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def save(self, artifact: Artifact) -> None:
    """Persist an artifact (insert or update).

    Args:
        artifact: The artifact to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(artifact_id)

Retrieve an artifact by its ID.

Parameters:

Name Type Description Default
artifact_id NotBlankStr

The artifact identifier.

required

Returns:

Type Description
Artifact | None

The artifact, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def get(self, artifact_id: NotBlankStr) -> Artifact | None:
    """Retrieve an artifact by its ID.

    Args:
        artifact_id: The artifact identifier.

    Returns:
        The artifact, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_artifacts async

list_artifacts(*, task_id=None, created_by=None, artifact_type=None)

List artifacts with optional filters.

Results are ordered by artifact ID ascending to ensure deterministic pagination across backends.

Parameters:

Name Type Description Default
task_id NotBlankStr | None

Filter by originating task ID.

None
created_by NotBlankStr | None

Filter by creator agent ID.

None
artifact_type ArtifactType | None

Filter by artifact type.

None

Returns:

Type Description
tuple[Artifact, ...]

Matching artifacts ordered by ID, as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def list_artifacts(
    self,
    *,
    task_id: NotBlankStr | None = None,
    created_by: NotBlankStr | None = None,
    artifact_type: ArtifactType | None = None,
) -> tuple[Artifact, ...]:
    """List artifacts with optional filters.

    Results are ordered by artifact ID ascending to ensure
    deterministic pagination across backends.

    Args:
        task_id: Filter by originating task ID.
        created_by: Filter by creator agent ID.
        artifact_type: Filter by artifact type.

    Returns:
        Matching artifacts ordered by ID, as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(artifact_id)

Delete an artifact by ID.

Parameters:

Name Type Description Default
artifact_id NotBlankStr

The artifact identifier.

required

Returns:

Type Description
bool

True if the artifact was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def delete(self, artifact_id: NotBlankStr) -> bool:
    """Delete an artifact by ID.

    Args:
        artifact_id: The artifact identifier.

    Returns:
        ``True`` if the artifact was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

ProjectRepository

Bases: Protocol

CRUD + query interface for Project persistence.

save async

save(project)

Persist a project (insert or update).

Parameters:

Name Type Description Default
project Project

The project to persist.

required

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def save(self, project: Project) -> None:
    """Persist a project (insert or update).

    Args:
        project: The project to persist.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

get async

get(project_id)

Retrieve a project by its ID.

Parameters:

Name Type Description Default
project_id NotBlankStr

The project identifier.

required

Returns:

Type Description
Project | None

The project, or None if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def get(self, project_id: NotBlankStr) -> Project | None:
    """Retrieve a project by its ID.

    Args:
        project_id: The project identifier.

    Returns:
        The project, or ``None`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

list_projects async

list_projects(*, status=None, lead=None)

List projects with optional filters.

Results are ordered by project ID ascending to ensure deterministic pagination across backends.

Parameters:

Name Type Description Default
status ProjectStatus | None

Filter by project status.

None
lead NotBlankStr | None

Filter by project lead agent ID.

None

Returns:

Type Description
tuple[Project, ...]

Matching projects ordered by ID, as a tuple.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def list_projects(
    self,
    *,
    status: ProjectStatus | None = None,
    lead: NotBlankStr | None = None,
) -> tuple[Project, ...]:
    """List projects with optional filters.

    Results are ordered by project ID ascending to ensure
    deterministic pagination across backends.

    Args:
        status: Filter by project status.
        lead: Filter by project lead agent ID.

    Returns:
        Matching projects ordered by ID, as a tuple.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

delete async

delete(project_id)

Delete a project by ID.

Parameters:

Name Type Description Default
project_id NotBlankStr

The project identifier.

required

Returns:

Type Description
bool

True if the project was deleted, False if not found.

Raises:

Type Description
PersistenceError

If the operation fails.

Source code in src/synthorg/persistence/artifact_project_repos.py
async def delete(self, project_id: NotBlankStr) -> bool:
    """Delete a project by ID.

    Args:
        project_id: The project identifier.

    Returns:
        ``True`` if the project was deleted, ``False`` if not found.

    Raises:
        PersistenceError: If the operation fails.
    """
    ...

Factory

factory

Factory for creating persistence backends from configuration.

Each company gets its own PersistenceBackend instance, which maps to its own database. This enables multi-tenancy: one database per company, selectable via the PersistenceConfig embedded in each company's RootConfig.

create_backend

create_backend(config)

Create a persistence backend from configuration.

Factory function that maps config.backend to the correct concrete backend class. Each call returns a new, disconnected backend instance -- the caller is responsible for calling connect() and migrate().

Parameters:

Name Type Description Default
config PersistenceConfig

Persistence configuration (includes backend selection and backend-specific settings).

required

Returns:

Type Description
PersistenceBackend

A new, disconnected backend instance.

Raises:

Type Description
PersistenceConnectionError

If the backend name is not recognized.

Example:

config = PersistenceConfig(
    backend="sqlite",
    sqlite=SQLiteConfig(path="data/company-a.db"),
)
backend = create_backend(config)
await backend.connect()
await backend.migrate()

Source code in src/synthorg/persistence/factory.py
def create_backend(config: PersistenceConfig) -> PersistenceBackend:
    """Create a persistence backend from configuration.

    Factory function that maps ``config.backend`` to the correct
    concrete backend class.  Each call returns a new, disconnected
    backend instance -- the caller is responsible for calling
    ``connect()`` and ``migrate()``.

    Args:
        config: Persistence configuration (includes backend selection
            and backend-specific settings).

    Returns:
        A new, disconnected backend instance.

    Raises:
        PersistenceConnectionError: If the backend name is not
            recognized.

    Example::

        config = PersistenceConfig(
            backend="sqlite",
            sqlite=SQLiteConfig(path="data/company-a.db"),
        )
        backend = create_backend(config)
        await backend.connect()
        await backend.migrate()
    """
    if config.backend == "sqlite":
        backend: PersistenceBackend = SQLitePersistenceBackend(config.sqlite)
        logger.debug(
            PERSISTENCE_BACKEND_CREATED,
            backend="sqlite",
            path=config.sqlite.path,
        )
        return backend
    msg = f"Unknown persistence backend: {config.backend!r}"
    logger.error(PERSISTENCE_BACKEND_UNKNOWN, backend=config.backend)
    raise PersistenceConnectionError(msg)
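
Because the factory returns a disconnected instance, callers own the full lifecycle. One possible pattern is to wrap connect(), migrate(), and disconnect() in an async context manager so the connection is always released. The sketch below uses a hypothetical DemoBackend stand-in so it runs on its own; open_backend is likewise an illustrative helper, not part of the documented API.

```python
import asyncio
from contextlib import asynccontextmanager


class DemoBackend:
    """Hypothetical stand-in exposing the lifecycle methods named above."""

    def __init__(self) -> None:
        self.is_connected = False
        self.calls: list[str] = []

    async def connect(self) -> None:
        self.is_connected = True
        self.calls.append("connect")

    async def migrate(self) -> None:
        self.calls.append("migrate")

    async def disconnect(self) -> None:
        self.is_connected = False
        self.calls.append("disconnect")


@asynccontextmanager
async def open_backend(backend):
    """Connect and migrate on entry; always disconnect on exit."""
    await backend.connect()
    try:
        await backend.migrate()
        yield backend
    finally:
        await backend.disconnect()


async def main() -> DemoBackend:
    backend = DemoBackend()  # stands in for create_backend(config)
    async with open_backend(backend) as live:
        assert live.is_connected
    return backend


backend = asyncio.run(main())
```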

Errors

errors

Persistence error hierarchy.

All persistence-related errors inherit from PersistenceError so callers can catch the entire family with a single except clause.

PersistenceError

Bases: Exception

Base exception for all persistence operations.

PersistenceConnectionError

Bases: PersistenceError

Raised when a backend connection cannot be established or is lost.

MigrationError

Bases: PersistenceError

Raised when a database migration fails.

RecordNotFoundError

Bases: PersistenceError

Raised when a requested record does not exist.

Used by ArtifactStorageBackend.retrieve() when no content exists for the given artifact ID. Repository get() methods return None on miss instead of raising.

DuplicateRecordError

Bases: PersistenceError

Raised when inserting a record that already exists.

QueryError

Bases: PersistenceError

Raised when a query fails due to invalid parameters or backend issues.

VersionConflictError

Bases: QueryError

Raised when an optimistic concurrency version check fails.

ArtifactTooLargeError

Bases: PersistenceError

Raised when a single artifact exceeds the maximum allowed size.

ArtifactStorageFullError

Bases: PersistenceError

Raised when total artifact storage exceeds capacity.
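
Since every error derives from PersistenceError, a caller can handle specific cases first and fall back to the base class. The sketch below redefines a few of the documented classes locally so that it runs standalone (real code would import them from the errors module); the classify helper and its return labels are hypothetical.

```python
# Local mirrors of part of the documented hierarchy, for illustration only.
class PersistenceError(Exception):
    """Base exception for all persistence operations."""


class QueryError(PersistenceError):
    """Query failed due to invalid parameters or backend issues."""


class VersionConflictError(QueryError):
    """Optimistic concurrency version check failed."""


class DuplicateRecordError(PersistenceError):
    """Inserted record already exists."""


def classify(exc: Exception) -> str:
    """Map an exception to a handling strategy (hypothetical labels)."""
    try:
        raise exc
    except VersionConflictError:
        return "retry"  # most specific handler first
    except PersistenceError:
        return "fail"  # catches the rest of the family
    except Exception:
        return "unexpected"


outcomes = [
    classify(VersionConflictError("stale version")),
    classify(DuplicateRecordError("already exists")),
    classify(KeyError("not a persistence error")),
]
```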

SQLite Backend

backend

SQLite persistence backend implementation.

SQLitePersistenceBackend

SQLitePersistenceBackend(config)

SQLite implementation of the PersistenceBackend protocol.

Uses a single aiosqlite.Connection. WAL mode is enabled by default for file-based databases (in-memory databases do not support WAL) and is controlled by SQLiteConfig.wal_mode.

Parameters:

Name Type Description Default
config SQLiteConfig

SQLite-specific configuration.

required

Source code in src/synthorg/persistence/sqlite/backend.py
def __init__(self, config: SQLiteConfig) -> None:
    self._config = config
    self._lifecycle_lock = asyncio.Lock()
    self._db: aiosqlite.Connection | None = None
    self._artifacts: SQLiteArtifactRepository | None = None
    self._projects: SQLiteProjectRepository | None = None
    self._tasks: SQLiteTaskRepository | None = None
    self._cost_records: SQLiteCostRecordRepository | None = None
    self._messages: SQLiteMessageRepository | None = None
    self._lifecycle_events: SQLiteLifecycleEventRepository | None = None
    self._task_metrics: SQLiteTaskMetricRepository | None = None
    self._collaboration_metrics: SQLiteCollaborationMetricRepository | None = None
    self._parked_contexts: SQLiteParkedContextRepository | None = None
    self._audit_entries: SQLiteAuditRepository | None = None
    self._users: SQLiteUserRepository | None = None
    self._api_keys: SQLiteApiKeyRepository | None = None
    self._checkpoints: SQLiteCheckpointRepository | None = None
    self._heartbeats: SQLiteHeartbeatRepository | None = None
    self._agent_states: SQLiteAgentStateRepository | None = None
    self._settings: SQLiteSettingsRepository | None = None
    self._custom_presets: SQLitePersonalityPresetRepository | None = None
    self._workflow_definitions: SQLiteWorkflowDefinitionRepository | None = None
    self._workflow_executions: SQLiteWorkflowExecutionRepository | None = None

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier.

tasks property

tasks

Repository for Task persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

cost_records property

cost_records

Repository for CostRecord persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

messages property

messages

Repository for Message persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

lifecycle_events property

lifecycle_events

Repository for AgentLifecycleEvent persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

task_metrics property

task_metrics

Repository for TaskMetricRecord persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

collaboration_metrics property

collaboration_metrics

Repository for CollaborationMetricRecord persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

parked_contexts property

parked_contexts

Repository for ParkedContext persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

audit_entries property

audit_entries

Repository for AuditEntry persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

users property

users

Repository for User persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

api_keys property

api_keys

Repository for ApiKey persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

checkpoints property

checkpoints

Repository for Checkpoint persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

heartbeats property

heartbeats

Repository for Heartbeat persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

agent_states property

agent_states

Repository for AgentRuntimeState persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

settings property

settings

Repository for namespaced settings persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

artifacts property

artifacts

Repository for Artifact persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

projects property

projects

Repository for Project persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

custom_presets property

custom_presets

Repository for custom personality preset persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

workflow_definitions property

workflow_definitions

Repository for workflow definition persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

workflow_executions property

workflow_executions

Repository for workflow execution persistence.

Raises:

Type Description
PersistenceConnectionError

If not connected.

connect async

connect()

Open the SQLite database and configure WAL mode.

Source code in src/synthorg/persistence/sqlite/backend.py
async def connect(self) -> None:
    """Open the SQLite database and configure WAL mode."""
    async with self._lifecycle_lock:
        if self._db is not None:
            logger.debug(PERSISTENCE_BACKEND_ALREADY_CONNECTED)
            return

        logger.info(
            PERSISTENCE_BACKEND_CONNECTING,
            path=self._config.path,
        )
        try:
            self._db = await aiosqlite.connect(self._config.path)
            self._db.row_factory = aiosqlite.Row

            # Enable foreign key enforcement (off by default in SQLite).
            await self._db.execute("PRAGMA foreign_keys = ON")

            if self._config.wal_mode:
                await self._configure_wal()

            self._create_repositories()
        except (sqlite3.Error, OSError) as exc:
            await self._cleanup_failed_connect(exc)

        logger.info(
            PERSISTENCE_BACKEND_CONNECTED,
            path=self._config.path,
        )
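
The connection setup above can be approximated with the standard-library sqlite3 module. This is a minimal sketch, not the backend's implementation: open_database and demo are hypothetical names, and it assumes that _configure_wal() (not shown in this section) at least issues PRAGMA journal_mode = WAL.

```python
import os
import sqlite3
import tempfile


def open_database(path: str, *, wal_mode: bool = True) -> sqlite3.Connection:
    """Open a SQLite file, enable foreign keys, and optionally switch to WAL."""
    conn = sqlite3.connect(path)
    try:
        conn.row_factory = sqlite3.Row
        # Foreign key enforcement is off by default and is set per connection.
        conn.execute("PRAGMA foreign_keys = ON")
        if wal_mode:
            # Assumption: the real _configure_wal() may set further pragmas.
            conn.execute("PRAGMA journal_mode = WAL")
    except sqlite3.Error:
        # Do not leak a half-configured handle when setup fails.
        conn.close()
        raise
    return conn


def demo() -> tuple[int, str]:
    """Return (foreign_keys flag, journal mode) for a fresh temporary database."""
    with tempfile.TemporaryDirectory() as tmp:
        conn = open_database(os.path.join(tmp, "demo.db"))
        try:
            fk = conn.execute("PRAGMA foreign_keys").fetchone()[0]
            mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
        finally:
            conn.close()
    return fk, mode
```

WAL mode applies to file-backed databases, which is why the sketch uses a temporary file rather than an in-memory database.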

get_db

get_db()

Return the shared database connection.

Raises:

Type Description
PersistenceConnectionError

If not yet connected.

Source code in src/synthorg/persistence/sqlite/backend.py
def get_db(self) -> aiosqlite.Connection:
    """Return the shared database connection.

    Raises:
        PersistenceConnectionError: If not yet connected.
    """
    if self._db is None:
        msg = "Database not connected"
        raise PersistenceConnectionError(msg)
    return self._db

disconnect async

disconnect()

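Close the database connection. Does nothing if no connection is open. Errors raised while closing are logged as warnings rather than propagated, and internal state is cleared in either case.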

Source code in src/synthorg/persistence/sqlite/backend.py
async def disconnect(self) -> None:
    """Close the database connection."""
    async with self._lifecycle_lock:
        if self._db is None:
            return

        logger.info(PERSISTENCE_BACKEND_DISCONNECTING, path=self._config.path)
        try:
            await self._db.close()
            logger.info(
                PERSISTENCE_BACKEND_DISCONNECTED,
                path=self._config.path,
            )
        except (sqlite3.Error, OSError) as exc:
            logger.warning(
                PERSISTENCE_BACKEND_DISCONNECT_ERROR,
                path=self._config.path,
                error=str(exc),
                error_type=type(exc).__name__,
            )
        finally:
            self._clear_state()

health_check async

health_check()

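Check database connectivity by running SELECT 1. Returns False when no connection is open or when the query fails; database errors are logged, not raised.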

Source code in src/synthorg/persistence/sqlite/backend.py
async def health_check(self) -> bool:
    """Check database connectivity."""
    if self._db is None:
        return False
    try:
        cursor = await self._db.execute("SELECT 1")
        row = await cursor.fetchone()
        healthy = row is not None
    except (sqlite3.Error, aiosqlite.Error) as exc:
        logger.warning(
            PERSISTENCE_BACKEND_HEALTH_CHECK,
            healthy=False,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        return False
    logger.debug(PERSISTENCE_BACKEND_HEALTH_CHECK, healthy=healthy)
    return healthy
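
The probe pattern above can be tried with the standard-library sqlite3 module. The sketch below is synchronous and uses hypothetical names (is_healthy, demo); it shows only the idea that a failed or impossible round trip is reported as False instead of raising.

```python
import sqlite3
from typing import Optional


def is_healthy(conn: Optional[sqlite3.Connection]) -> bool:
    """Return True only if a trivial query round-trips successfully."""
    if conn is None:
        return False
    try:
        row = conn.execute("SELECT 1").fetchone()
    except sqlite3.Error:
        # A closed or broken connection is reported as unhealthy, not raised.
        return False
    return row is not None


def demo() -> tuple[bool, bool, bool]:
    """Probe an open connection, a closed one, and a missing one."""
    conn = sqlite3.connect(":memory:")
    while_open = is_healthy(conn)
    conn.close()
    return while_open, is_healthy(conn), is_healthy(None)
```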

migrate async

migrate()

Apply the database schema.

Raises:

Type Description
PersistenceConnectionError

If not connected.

MigrationError

If schema application fails.

Source code in src/synthorg/persistence/sqlite/backend.py
async def migrate(self) -> None:
    """Apply the database schema.

    Raises:
        PersistenceConnectionError: If not connected.
        MigrationError: If schema application fails.
    """
    if self._db is None:
        msg = "Cannot migrate: not connected"
        logger.warning(PERSISTENCE_BACKEND_NOT_CONNECTED, error=msg)
        raise PersistenceConnectionError(msg)
    await apply_schema(self._db)

get_setting async

get_setting(key)

Retrieve a setting value by key from the _system namespace.

Delegates to self.settings (the SettingsRepository).

Raises:

Type Description
PersistenceConnectionError

If not connected.

Source code in src/synthorg/persistence/sqlite/backend.py
async def get_setting(self, key: NotBlankStr) -> str | None:
    """Retrieve a setting value by key from the ``_system`` namespace.

    Delegates to ``self.settings`` (the ``SettingsRepository``).

    Raises:
        PersistenceConnectionError: If not connected.
    """
    result = await self.settings.get(NotBlankStr("_system"), key)
    return result[0] if result is not None else None

set_setting async

set_setting(key, value)

Store a setting value (upsert) in the _system namespace.

Delegates to self.settings (the SettingsRepository).

Raises:

Type Description
PersistenceConnectionError

If not connected.

Source code in src/synthorg/persistence/sqlite/backend.py
async def set_setting(self, key: NotBlankStr, value: str) -> None:
    """Store a setting value (upsert) in the ``_system`` namespace.

    Delegates to ``self.settings`` (the ``SettingsRepository``).

    Raises:
        PersistenceConnectionError: If not connected.
    """
    updated_at = datetime.now(UTC).isoformat()
    await self.settings.set(
        NotBlankStr("_system"),
        key,
        value,
        updated_at,
    )
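
As a rough illustration of the get_setting / set_setting pair, the sketch below stores values in a single namespaced table using the standard-library sqlite3 module. The table layout, the schema_version key, and the helper names are assumptions made for the example; the real schema comes from apply_schema() and the real queries live in the SettingsRepository.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

SYSTEM_NAMESPACE = "_system"


def create_settings_table(conn: sqlite3.Connection) -> None:
    # Hypothetical schema, chosen only so the upsert below has a conflict target.
    conn.execute(
        "CREATE TABLE settings ("
        " namespace TEXT NOT NULL, key TEXT NOT NULL,"
        " value TEXT NOT NULL, updated_at TEXT NOT NULL,"
        " PRIMARY KEY (namespace, key))"
    )


def set_setting(conn: sqlite3.Connection, key: str, value: str) -> None:
    """Upsert a value in the _system namespace with a UTC timestamp."""
    updated_at = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "INSERT INTO settings (namespace, key, value, updated_at)"
        " VALUES (?, ?, ?, ?)"
        " ON CONFLICT(namespace, key) DO UPDATE SET"
        " value = excluded.value, updated_at = excluded.updated_at",
        (SYSTEM_NAMESPACE, key, value, updated_at),
    )
    conn.commit()


def get_setting(conn: sqlite3.Connection, key: str) -> Optional[str]:
    """Return the stored value, or None when the key is absent."""
    row = conn.execute(
        "SELECT value, updated_at FROM settings WHERE namespace = ? AND key = ?",
        (SYSTEM_NAMESPACE, key),
    ).fetchone()
    # Like the backend, keep only the value and drop the timestamp.
    return row[0] if row is not None else None


def demo() -> tuple[Optional[str], Optional[str], Optional[str]]:
    conn = sqlite3.connect(":memory:")
    create_settings_table(conn)
    missing = get_setting(conn, "schema_version")
    set_setting(conn, "schema_version", "1")
    first = get_setting(conn, "schema_version")
    set_setting(conn, "schema_version", "2")  # second write overwrites the first
    second = get_setting(conn, "schema_version")
    conn.close()
    return missing, first, second
```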

repositories

SQLite repository implementations for Task, CostRecord, and Message.

HR-related repositories (LifecycleEvent, TaskMetric, CollaborationMetric) are in hr_repositories.py within this package.

SQLiteTaskRepository

SQLiteTaskRepository(db)

SQLite implementation of the TaskRepository protocol.

Parameters:

Name Type Description Default
db Connection

An open aiosqlite connection.

required
Source code in src/synthorg/persistence/sqlite/repositories.py
def __init__(self, db: aiosqlite.Connection) -> None:
    self._db = db

save async

save(task)

Persist a task (upsert semantics).

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def save(self, task: Task) -> None:
        """Persist a task (upsert semantics)."""
        try:
            params = task.model_dump(mode="json")
            # Tuple fields must be stored as JSON strings.
            params["reviewers"] = _json_list(task.reviewers)
            params["dependencies"] = _json_list(task.dependencies)
            params["artifacts_expected"] = _json_list(task.artifacts_expected)
            params["acceptance_criteria"] = _json_list(
                task.acceptance_criteria,
            )
            params["delegation_chain"] = _json_list(task.delegation_chain)

            await self._db.execute(
                """\
INSERT INTO tasks (
    id, title, description, type, priority, project, created_by,
    assigned_to, status, estimated_complexity, budget_limit, deadline,
    max_retries, parent_task_id, task_structure, coordination_topology,
    reviewers, dependencies, artifacts_expected, acceptance_criteria,
    delegation_chain
) VALUES (
    :id, :title, :description, :type, :priority, :project, :created_by,
    :assigned_to, :status, :estimated_complexity, :budget_limit, :deadline,
    :max_retries, :parent_task_id, :task_structure, :coordination_topology,
    :reviewers, :dependencies, :artifacts_expected, :acceptance_criteria,
    :delegation_chain
)
ON CONFLICT(id) DO UPDATE SET
    title=excluded.title,
    description=excluded.description,
    type=excluded.type,
    priority=excluded.priority,
    project=excluded.project,
    created_by=excluded.created_by,
    assigned_to=excluded.assigned_to,
    status=excluded.status,
    estimated_complexity=excluded.estimated_complexity,
    budget_limit=excluded.budget_limit,
    deadline=excluded.deadline,
    max_retries=excluded.max_retries,
    parent_task_id=excluded.parent_task_id,
    task_structure=excluded.task_structure,
    coordination_topology=excluded.coordination_topology,
    reviewers=excluded.reviewers,
    dependencies=excluded.dependencies,
    artifacts_expected=excluded.artifacts_expected,
    acceptance_criteria=excluded.acceptance_criteria,
    delegation_chain=excluded.delegation_chain
""",
                params,
            )
            await self._db.commit()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to save task {task.id!r}"
            logger.exception(
                PERSISTENCE_TASK_SAVE_FAILED, task_id=task.id, error=str(exc)
            )
            raise QueryError(msg) from exc
        logger.debug(PERSISTENCE_TASK_SAVED, task_id=task.id)
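
The upsert pattern used by save can be seen in isolation on a reduced table. The sketch below assumes a three-column tasks table and a json_list helper standing in for _json_list (whose implementation is not shown here); it uses the standard-library sqlite3 module and needs an SQLite build with ON CONFLICT support (3.24 or later).

```python
import json
import sqlite3


def json_list(values: tuple[str, ...]) -> str:
    """Serialise a tuple of strings to a JSON array (stand-in for _json_list)."""
    return json.dumps(list(values))


def save_task(
    conn: sqlite3.Connection,
    task_id: str,
    title: str,
    reviewers: tuple[str, ...],
) -> None:
    """Insert the task, or overwrite the existing row with the same id."""
    conn.execute(
        """\
INSERT INTO tasks (id, title, reviewers)
VALUES (:id, :title, :reviewers)
ON CONFLICT(id) DO UPDATE SET
    title=excluded.title,
    reviewers=excluded.reviewers
""",
        {"id": task_id, "title": title, "reviewers": json_list(reviewers)},
    )
    conn.commit()


def demo() -> list[tuple[str, str, str]]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id TEXT PRIMARY KEY, title TEXT, reviewers TEXT)"
    )
    save_task(conn, "t-1", "Draft", ("alice",))
    save_task(conn, "t-1", "Final", ("alice", "bob"))  # same id: updates in place
    rows = conn.execute("SELECT id, title, reviewers FROM tasks").fetchall()
    conn.close()
    return rows
```

Saving twice with the same id leaves a single row holding the values from the second call.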

get async

get(task_id)

Retrieve a task by its ID. Returns None if no task with that ID exists.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def get(self, task_id: str) -> Task | None:
    """Retrieve a task by its ID."""
    try:
        cursor = await self._db.execute(
            f"SELECT {self._TASK_COLUMNS} FROM tasks WHERE id = ?",  # noqa: S608
            (task_id,),
        )
        row = await cursor.fetchone()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = f"Failed to fetch task {task_id!r}"
        logger.exception(
            PERSISTENCE_TASK_FETCH_FAILED,
            task_id=task_id,
            error=str(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        logger.debug(PERSISTENCE_TASK_FETCHED, task_id=task_id, found=False)
        return None
    logger.debug(PERSISTENCE_TASK_FETCHED, task_id=task_id, found=True)
    return self._row_to_task(row)

list_tasks async

list_tasks(*, status=None, assigned_to=None, project=None)

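List tasks, optionally filtered by status, assignee, and project. Filters that are given are combined with AND; with no filters, every task is returned.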

Source code in src/synthorg/persistence/sqlite/repositories.py
async def list_tasks(
    self,
    *,
    status: TaskStatus | None = None,
    assigned_to: str | None = None,
    project: str | None = None,
) -> tuple[Task, ...]:
    """List tasks with optional filters."""
    clauses: list[str] = []
    params: list[str] = []
    if status is not None:
        clauses.append("status = ?")
        params.append(status.value)
    if assigned_to is not None:
        clauses.append("assigned_to = ?")
        params.append(assigned_to)
    if project is not None:
        clauses.append("project = ?")
        params.append(project)

    query = f"SELECT {self._TASK_COLUMNS} FROM tasks"  # noqa: S608
    if clauses:
        query += " WHERE " + " AND ".join(clauses)

    try:
        cursor = await self._db.execute(query, params)
        rows = await cursor.fetchall()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = "Failed to list tasks"
        logger.exception(PERSISTENCE_TASK_LIST_FAILED, error=str(exc))
        raise QueryError(msg) from exc
    tasks = tuple(self._row_to_task(row) for row in rows)
    logger.debug(PERSISTENCE_TASK_LISTED, count=len(tasks))
    return tasks
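
The filter handling above reduces to building a WHERE clause from whichever arguments were supplied. The following sketch isolates that step as a pure function. The name build_task_query and the shortened column list are illustrative; the real method selects _TASK_COLUMNS and takes a TaskStatus rather than a plain string.

```python
from typing import Optional


def build_task_query(
    *,
    status: Optional[str] = None,
    assigned_to: Optional[str] = None,
    project: Optional[str] = None,
) -> tuple[str, list[str]]:
    """Return (sql, params) with one bound placeholder per supplied filter."""
    clauses: list[str] = []
    params: list[str] = []
    filters = (
        ("status", status),
        ("assigned_to", assigned_to),
        ("project", project),
    )
    for column, value in filters:
        if value is not None:
            # Column names are fixed literals; only the values are bound.
            clauses.append(f"{column} = ?")
            params.append(value)
    query = "SELECT id, title FROM tasks"
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    return query, params
```

Because user input only ever reaches the params list, the interpolated SQL text is composed entirely of constants, which is presumably why the original suppresses the S608 lint warning.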

delete async

delete(task_id)

Delete a task by ID. Returns True if a row was deleted and False if no task with that ID existed.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def delete(self, task_id: str) -> bool:
    """Delete a task by ID."""
    try:
        cursor = await self._db.execute(
            "DELETE FROM tasks WHERE id = ?", (task_id,)
        )
        await self._db.commit()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = f"Failed to delete task {task_id!r}"
        logger.exception(
            PERSISTENCE_TASK_DELETE_FAILED,
            task_id=task_id,
            error=str(exc),
        )
        raise QueryError(msg) from exc
    deleted = cursor.rowcount > 0
    logger.debug(PERSISTENCE_TASK_DELETED, task_id=task_id, deleted=deleted)
    return deleted
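
The boolean result comes from the cursor's rowcount after the DELETE. A minimal synchronous sketch with the standard-library sqlite3 module, using a hypothetical one-column table:

```python
import sqlite3


def delete_task(conn: sqlite3.Connection, task_id: str) -> bool:
    """Delete by id and report whether any row was actually removed."""
    cursor = conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
    conn.commit()
    return cursor.rowcount > 0


def demo() -> tuple[bool, bool]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY)")
    conn.execute("INSERT INTO tasks (id) VALUES ('t-1')")
    first = delete_task(conn, "t-1")   # row exists
    second = delete_task(conn, "t-1")  # already gone
    conn.close()
    return first, second
```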

SQLiteCostRecordRepository

SQLiteCostRecordRepository(db)

SQLite implementation of the CostRecordRepository protocol.

Parameters:

Name Type Description Default
db Connection

An open aiosqlite connection.

required
Source code in src/synthorg/persistence/sqlite/repositories.py
def __init__(self, db: aiosqlite.Connection) -> None:
    self._db = db

save async

save(record)

Persist a cost record (append-only).

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def save(self, record: CostRecord) -> None:
        """Persist a cost record (append-only)."""
        try:
            data = record.model_dump(mode="json")
            await self._db.execute(
                """\
INSERT INTO cost_records (
    agent_id, task_id, provider, model, input_tokens,
    output_tokens, cost_usd, timestamp, call_category
) VALUES (
    :agent_id, :task_id, :provider, :model, :input_tokens,
    :output_tokens, :cost_usd, :timestamp, :call_category
)""",
                data,
            )
            await self._db.commit()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to save cost record for agent {record.agent_id!r}"
            logger.exception(
                PERSISTENCE_COST_RECORD_SAVE_FAILED,
                agent_id=record.agent_id,
                task_id=record.task_id,
                error=str(exc),
            )
            raise QueryError(msg) from exc
        logger.debug(
            PERSISTENCE_COST_RECORD_SAVED,
            agent_id=record.agent_id,
            task_id=record.task_id,
        )

query async

query(*, agent_id=None, task_id=None)

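Query cost records, optionally filtered by agent and/or task. Filters that are given are combined with AND; with no filters, every record is returned.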

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def query(
        self,
        *,
        agent_id: str | None = None,
        task_id: str | None = None,
    ) -> tuple[CostRecord, ...]:
        """Query cost records with optional filters."""
        clauses: list[str] = []
        params: list[str] = []
        if agent_id is not None:
            clauses.append("agent_id = ?")
            params.append(agent_id)
        if task_id is not None:
            clauses.append("task_id = ?")
            params.append(task_id)

        sql = """\
SELECT agent_id, task_id, provider, model, input_tokens,
       output_tokens, cost_usd, timestamp, call_category
FROM cost_records"""
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)

        try:
            cursor = await self._db.execute(sql, params)
            rows = await cursor.fetchall()
            records = tuple(CostRecord.model_validate(dict(row)) for row in rows)
        except (
            sqlite3.Error,
            aiosqlite.Error,
            json.JSONDecodeError,
            ValidationError,
        ) as exc:
            msg = "Failed to query cost records"
            logger.exception(PERSISTENCE_COST_RECORD_QUERY_FAILED, error=str(exc))
            raise QueryError(msg) from exc
        logger.debug(PERSISTENCE_COST_RECORD_QUERIED, count=len(records))
        return records

aggregate async

aggregate(*, agent_id=None, task_id=None)

Sum total cost_usd, optionally filtered by agent and/or task. Returns 0.0 when no records match.

Source code in src/synthorg/persistence/sqlite/repositories.py
async def aggregate(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
) -> float:
    """Sum total cost_usd, optionally filtered by agent and/or task."""
    try:
        sql = "SELECT COALESCE(SUM(cost_usd), 0.0) FROM cost_records"
        conditions: list[str] = []
        params: list[str] = []
        if agent_id is not None:
            conditions.append("agent_id = ?")
            params.append(agent_id)
        if task_id is not None:
            conditions.append("task_id = ?")
            params.append(task_id)
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        cursor = await self._db.execute(sql, tuple(params))
        row = await cursor.fetchone()
    except (sqlite3.Error, aiosqlite.Error) as exc:
        msg = "Failed to aggregate cost records"
        logger.exception(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            error=str(exc),
        )
        raise QueryError(msg) from exc
    if row is None:
        msg = "aggregate query returned no rows"
        logger.error(
            PERSISTENCE_COST_RECORD_AGGREGATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise QueryError(msg)
    total = float(row[0])
    logger.debug(
        PERSISTENCE_COST_RECORD_AGGREGATED,
        agent_id=agent_id,
        total_usd=total,
    )
    return total
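
The COALESCE wrapper is what turns an empty result into 0.0: SUM over zero rows yields NULL in SQLite. The sketch below reproduces the query against a reduced, hypothetical cost_records table using the standard-library sqlite3 module; total_cost and demo are illustrative names.

```python
import sqlite3
from typing import Optional


def total_cost(
    conn: sqlite3.Connection,
    *,
    agent_id: Optional[str] = None,
    task_id: Optional[str] = None,
) -> float:
    """Sum cost_usd over the matching rows; 0.0 when nothing matches."""
    sql = "SELECT COALESCE(SUM(cost_usd), 0.0) FROM cost_records"
    conditions: list[str] = []
    params: list[str] = []
    if agent_id is not None:
        conditions.append("agent_id = ?")
        params.append(agent_id)
    if task_id is not None:
        conditions.append("task_id = ?")
        params.append(task_id)
    if conditions:
        sql += " WHERE " + " AND ".join(conditions)
    row = conn.execute(sql, tuple(params)).fetchone()
    return float(row[0])


def demo() -> tuple[float, float, float, float, float]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE cost_records (agent_id TEXT, task_id TEXT, cost_usd REAL)"
    )
    empty = total_cost(conn)
    conn.executemany(
        "INSERT INTO cost_records VALUES (?, ?, ?)",
        [("a1", "t1", 0.25), ("a1", "t2", 0.5), ("a2", "t1", 1.0)],
    )
    result = (
        empty,
        total_cost(conn),
        total_cost(conn, agent_id="a1"),
        total_cost(conn, agent_id="a1", task_id="t2"),
        total_cost(conn, agent_id="nobody"),
    )
    conn.close()
    return result
```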

SQLiteMessageRepository

SQLiteMessageRepository(db)

SQLite implementation of the MessageRepository protocol.

Parameters:

Name Type Description Default
db Connection

An open aiosqlite connection.

required
Source code in src/synthorg/persistence/sqlite/repositories.py
def __init__(self, db: aiosqlite.Connection) -> None:
    self._db = db

save async

save(message)

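Persist a message. Raises DuplicateRecordError if a message with the same ID already exists, and QueryError for any other database failure.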

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def save(self, message: Message) -> None:
        """Persist a message."""
        data = message.model_dump(mode="json")
        msg_id = str(message.id)

        try:
            await self._db.execute(
                """\
INSERT INTO messages (
    id, timestamp, sender, "to", type, priority,
    channel, content, attachments, metadata
) VALUES (
    :id, :timestamp, :sender, :to, :type, :priority,
    :channel, :content, :attachments, :metadata
)""",
                {
                    "id": msg_id,
                    "timestamp": data["timestamp"],
                    "sender": data["sender"],
                    "to": data["to"],
                    "type": data["type"],
                    "priority": data["priority"],
                    "channel": data["channel"],
                    "content": data["content"],
                    "attachments": json.dumps(data["attachments"]),
                    "metadata": json.dumps(data["metadata"]),
                },
            )
            await self._db.commit()
        except sqlite3.IntegrityError as exc:
            error_text = str(exc)
            is_duplicate_id = (
                "UNIQUE constraint failed: messages.id" in error_text
                or "PRIMARY KEY" in error_text
            )
            if is_duplicate_id:
                err_msg = f"Message {msg_id} already exists"
                logger.warning(PERSISTENCE_MESSAGE_DUPLICATE, message_id=msg_id)
                raise DuplicateRecordError(err_msg) from exc
            # Other integrity errors (NOT NULL, different UNIQUE).
            msg = f"Failed to save message {msg_id!r}"
            logger.exception(
                PERSISTENCE_MESSAGE_SAVE_FAILED,
                message_id=msg_id,
                error=error_text,
            )
            raise QueryError(msg) from exc
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to save message {msg_id!r}"
            logger.exception(
                PERSISTENCE_MESSAGE_SAVE_FAILED,
                message_id=msg_id,
                error=str(exc),
            )
            raise QueryError(msg) from exc
        logger.debug(PERSISTENCE_MESSAGE_SAVED, message_id=msg_id)
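
The error classification in save depends on the text of the IntegrityError that SQLite reports. The sketch below reproduces that branching on a two-column table with the standard-library sqlite3 module. QueryError and DuplicateRecordError are declared locally as stand-ins; how the library's own classes relate to each other is not shown in this section.

```python
import sqlite3
from typing import Optional


class QueryError(Exception):
    """Local stand-in for the library's QueryError."""


class DuplicateRecordError(Exception):
    """Local stand-in for the library's DuplicateRecordError."""


def insert_message(
    conn: sqlite3.Connection, msg_id: str, content: Optional[str]
) -> None:
    try:
        conn.execute(
            "INSERT INTO messages (id, content) VALUES (?, ?)",
            (msg_id, content),
        )
        conn.commit()
    except sqlite3.IntegrityError as exc:
        text = str(exc)
        if "UNIQUE constraint failed: messages.id" in text or "PRIMARY KEY" in text:
            raise DuplicateRecordError(f"Message {msg_id} already exists") from exc
        # Any other integrity violation (for example NOT NULL) is a query error.
        raise QueryError(f"Failed to save message {msg_id!r}") from exc


def classify(conn: sqlite3.Connection, msg_id: str, content: Optional[str]) -> str:
    """Report which branch an insert takes."""
    try:
        insert_message(conn, msg_id, content)
    except DuplicateRecordError:
        return "duplicate"
    except QueryError:
        return "query_error"
    return "saved"


def demo() -> list[str]:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE messages (id TEXT PRIMARY KEY, content TEXT NOT NULL)"
    )
    results = [
        classify(conn, "m-1", "hello"),  # first insert succeeds
        classify(conn, "m-1", "again"),  # same id: duplicate
        classify(conn, "m-2", None),     # NOT NULL violation: generic error
    ]
    conn.close()
    return results
```

Matching on message text is sensitive to SQLite's wording, so checking both the UNIQUE and the PRIMARY KEY phrasing, as the original does, is a reasonable hedge.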

get_history async

get_history(channel, *, limit=None)

Retrieve message history for a channel, newest first. If limit is given it must be a positive integer; otherwise QueryError is raised.

Source code in src/synthorg/persistence/sqlite/repositories.py
    async def get_history(
        self,
        channel: str,
        *,
        limit: int | None = None,
    ) -> tuple[Message, ...]:
        """Retrieve message history for a channel, newest first."""
        if limit is not None and limit < 1:
            msg = f"limit must be a positive integer, got {limit}"
            raise QueryError(msg)
        sql = """\
SELECT id, timestamp, sender, "to", type, priority,
       channel, content, attachments, metadata
FROM messages
WHERE channel = ?
ORDER BY timestamp DESC"""
        params: list[object] = [channel]
        if limit is not None:
            sql += " LIMIT ?"
            params.append(limit)

        try:
            cursor = await self._db.execute(sql, params)
            rows = await cursor.fetchall()
        except (sqlite3.Error, aiosqlite.Error) as exc:
            msg = f"Failed to fetch message history for channel {channel!r}"
            logger.exception(
                PERSISTENCE_MESSAGE_HISTORY_FAILED,
                channel=channel,
                error=str(exc),
            )
            raise QueryError(msg) from exc
        messages = tuple(self._row_to_message(row) for row in rows)
        logger.debug(
            PERSISTENCE_MESSAGE_HISTORY_FETCHED,
            channel=channel,
            count=len(messages),
        )
        return messages
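
A reduced version of the history query, written against the standard-library sqlite3 module. It assumes timestamps are stored as ISO 8601 strings in a single time zone, so that ordering by text matches chronological order. The table layout and helper names are hypothetical, and the sketch raises ValueError where the repository raises QueryError.

```python
import sqlite3
from typing import Optional


def get_history(
    conn: sqlite3.Connection,
    channel: str,
    *,
    limit: Optional[int] = None,
) -> list[str]:
    """Return message ids for a channel, newest first, optionally capped."""
    if limit is not None and limit < 1:
        # The repository raises QueryError here; ValueError keeps this standalone.
        raise ValueError(f"limit must be a positive integer, got {limit}")
    sql = "SELECT id FROM messages WHERE channel = ? ORDER BY timestamp DESC"
    params: list[object] = [channel]
    if limit is not None:
        sql += " LIMIT ?"
        params.append(limit)
    return [row[0] for row in conn.execute(sql, params)]


def make_db() -> sqlite3.Connection:
    """Build an in-memory database with three messages in one channel."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE messages (id TEXT PRIMARY KEY, timestamp TEXT, channel TEXT)"
    )
    conn.executemany(
        "INSERT INTO messages VALUES (?, ?, ?)",
        [
            ("m-1", "2024-01-01T09:00:00+00:00", "general"),
            ("m-2", "2024-01-01T10:00:00+00:00", "general"),
            ("m-3", "2024-01-02T08:00:00+00:00", "general"),
            ("m-4", "2024-01-03T08:00:00+00:00", "other"),
        ],
    )
    return conn
```

Calling get_history(make_db(), "general", limit=2) returns the two most recent ids in that channel and ignores the message in the other channel.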