# Operational Data Persistence
Agent memory is handled by the `MemoryBackend` protocol (Mem0 initially, with a custom stack planned; see the Decision Log). Operational data (tasks, cost records, messages, audit logs) is a separate concern managed by a pluggable `PersistenceBackend` protocol. Application code depends only on repository protocols; the storage engine is an implementation detail swappable via config.
See also: Memory and Persistence (agent memory), Memory Learning (procedural memory auto-generation + injection strategies), Shared Organizational Memory.
## Architecture

### Protocol Design
```python
@runtime_checkable
class PersistenceBackend(Protocol):
    """Lifecycle management for operational data storage."""

    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def health_check(self) -> bool: ...
    async def migrate(self) -> None: ...

    @property
    def is_connected(self) -> bool: ...

    @property
    def backend_name(self) -> NotBlankStr: ...

    @property
    def tasks(self) -> TaskRepository: ...

    @property
    def cost_records(self) -> CostRecordRepository: ...

    @property
    def messages(self) -> MessageRepository: ...

    # ... plus lifecycle_events, task_metrics, collaboration_metrics,
    # parked_contexts, audit_entries, users, api_keys, checkpoints,
    # heartbeats, agent_states, settings, artifacts, projects,
    # custom_presets
```
Each entity type has its own repository protocol:
```python
@runtime_checkable
class TaskRepository(Protocol):
    """CRUD + query interface for Task persistence."""

    async def save(self, task: Task) -> None: ...
    async def get(self, task_id: str) -> Task | None: ...
    async def list_tasks(
        self,
        *,
        status: TaskStatus | None = None,
        assigned_to: str | None = None,
        project: str | None = None,
    ) -> tuple[Task, ...]: ...
    async def delete(self, task_id: str) -> bool: ...


@runtime_checkable
class CostRecordRepository(Protocol):
    """CRUD + aggregation interface for CostRecord persistence."""

    async def save(self, record: CostRecord) -> None: ...
    async def query(
        self,
        *,
        agent_id: str | None = None,
        task_id: str | None = None,
    ) -> tuple[CostRecord, ...]: ...
    async def aggregate(self, *, agent_id: str | None = None) -> float: ...


@runtime_checkable
class MessageRepository(Protocol):
    """CRUD + query interface for Message persistence."""

    async def save(self, message: Message) -> None: ...
    async def get_history(
        self, channel: str, *, limit: int | None = None
    ) -> tuple[Message, ...]: ...
```
## Configuration

```yaml
persistence:
  backend: "sqlite"               # sqlite, postgres (mariadb future)
  sqlite:
    path: "/data/synthorg.db"     # database file path (mounted volume in Docker)
    wal_mode: true                # WAL for concurrent read performance
    journal_size_limit: 67108864  # 64 MB WAL journal limit
  postgres:                       # requires `synthorg[postgres]` extra
    host: "db.internal"
    port: 5432
    database: "synthorg"
    username: "synthorg_app"
    password: "${POSTGRES_PASSWORD}"  # SecretStr -- redacted from logs
    ssl_mode: "verify-full"       # see TLS note below
    pool_min_size: 1
    pool_max_size: 10
    pool_timeout_seconds: 30.0
    application_name: "synthorg"
    statement_timeout_ms: 30000   # server-side per-query limit
    connect_timeout_seconds: 10.0
  # mariadb:                      # future
  #   url: "mariadb://user:pass@host:3306/synthorg"
  #   pool_size: 10
```

TLS note: in production, use `ssl_mode: "verify-full"`, which authenticates the server's certificate chain and hostname. `"require"` only encrypts the transport without verifying the server's identity, leaving it vulnerable to man-in-the-middle attacks. Use `verify-ca` or `verify-full` in any deployment exposed to a network path you do not fully control.
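The SQLite options above map directly onto PRAGMAs issued at connection time. A minimal sketch using the standard `sqlite3` module (the `open_sqlite` helper is illustrative, not SynthOrg API; the pragma names are SQLite's own):

```python
import os
import sqlite3
import tempfile


def open_sqlite(path: str, wal_mode: bool = True,
                journal_size_limit: int = 67108864) -> sqlite3.Connection:
    """Open a connection and apply the config-driven pragmas."""
    conn = sqlite3.connect(path)
    if wal_mode:
        # WAL lets readers proceed concurrently with a single writer
        mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
        assert mode == "wal"  # journal_mode reports the mode actually set
    # cap the WAL file size after checkpoints (bytes)
    conn.execute(f"PRAGMA journal_size_limit={int(journal_size_limit)}")
    return conn


conn = open_sqlite(os.path.join(tempfile.mkdtemp(), "synthorg.db"))
```

WAL mode requires a file-backed database; `:memory:` databases silently stay in `memory` journal mode, which is why the sketch uses a temporary file.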
## Entities Persisted

| Entity | Source Module | Repository | Key Queries |
|---|---|---|---|
| `Task` | `core/task.py` | `TaskRepository` | by status, by assignee, by project |
| `CostRecord` | `budget/cost_record.py` | `CostRecordRepository` | by agent, by task, aggregations |
| `Message` | `communication/message.py` | `MessageRepository` | by channel |
| `AuditEntry` | `security/models.py` | `AuditRepository` | by agent, by action type, by verdict, by risk level, time range |
| `ParkedContext` | `security/timeout/parked_context.py` | `ParkedContextRepository` | by execution_id, by agent_id, by task_id |
| `AgentRuntimeState` | `engine/agent_state.py` | `AgentStateRepository` | by agent_id, active agents |
| `Setting` | `settings/models.py` | `SettingsRepository` | by namespace+key, by namespace, all |
| `Artifact` | `core/artifact.py` | `ArtifactRepository` | by task_id, by created_by, by artifact_type |
| `HandoffArtifact` | `engine/workflow/handoff.py` | (in-memory, per-execution frame) | Structured inter-stage handoff; `artifact_refs` resolve through `ArtifactRepository`. See Verification & Quality: Verification Stage |
| `Project` | `core/project.py` | `ProjectRepository` | by status, by lead |
| `DecisionRecord` | `engine/decisions.py` | `DecisionRepository` | by task_id (version ASC), by agent (role=executor or reviewer, recorded_at DESC) |
| Custom preset | `templates/preset_service.py` | `PersonalityPresetRepository` | by name |
## Schema Strategy

- Declared schema lives in `schema.sql`, applied via yoyo-migrations revision files under `persistence/<backend>/revisions/`.
- At startup, `PersistenceBackend.migrate()` calls `synthorg.persistence.migrations.migrate_apply()`, which wraps yoyo in-process.
- Yoyo tracks applied versions plus content hashes in its `_yoyo_migration` table (and a per-action audit log in `_yoyo_log`); no hand-rolled version tracking.
- Both persistence and ontology tables are consolidated into a single `schema.sql` (same database file).
- CI runs `scripts/check_schema_drift_revisions.py` on every PR (declared `schema.sql` vs accumulated revisions, per backend).
- Squashing: a single-baseline squash collapses every revision into one seed file derived from `schema.sql` (run rarely; documented in `docs/guides/persistence-migrations.md`).
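The apply-each-revision-once contract that `migrate_apply()` gets from yoyo can be illustrated with a simplified stand-in (the `_migrations` tracking table and revision ids here are hypothetical; the real wrapper additionally records content hashes and a `_yoyo_log` audit trail):

```python
import sqlite3

# Hypothetical revisions: (id, forward SQL). Real revisions live in
# persistence/<backend>/revisions/ and also carry rollback steps.
REVISIONS: list[tuple[str, str]] = [
    ("0001_create_tasks",
     "CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT)"),
]


def migrate_apply(conn: sqlite3.Connection) -> list[str]:
    """Apply each pending revision exactly once; return what was applied."""
    conn.execute("CREATE TABLE IF NOT EXISTS _migrations (id TEXT PRIMARY KEY)")
    done = {row[0] for row in conn.execute("SELECT id FROM _migrations")}
    applied: list[str] = []
    for rev_id, sql in REVISIONS:
        if rev_id not in done:
            conn.execute(sql)
            conn.execute("INSERT INTO _migrations (id) VALUES (?)", (rev_id,))
            applied.append(rev_id)
    conn.commit()
    return applied
```

Running it twice against the same database applies the revision on the first call and is a no-op on the second, which is exactly the idempotence `migrate()` relies on at every startup.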
## Key Principles

- **Application code never imports a concrete backend.** Only repository protocols are used, ensuring complete decoupling from the storage engine.
- **Adding a new backend requires no changes to consumers.** Implement `PersistenceBackend` plus all repository protocols; existing application code works unchanged.
- **Same entity models everywhere.** Repositories accept and return the existing frozen Pydantic models (`Task`, `CostRecord`, `Message`). No ORM models or data transfer objects.
- **Async throughout.** All repository methods are async, matching the framework's concurrency model.
## Multi-Tenancy
Each company gets its own database. The `PersistenceConfig` embedded in a company's `RootConfig` specifies the backend type and connection details (e.g., a unique SQLite file path or PostgreSQL database URL). The `create_backend(config)` factory returns an isolated `PersistenceBackend` instance per company; there is no shared state and no cross-company data leakage.
```python
# One database per company -- configured in each company's YAML
company_a_backend = create_backend(company_a_config.persistence)
company_b_backend = create_backend(company_b_config.persistence)

# Each backend has independent lifecycle: connect -> migrate -> use -> disconnect
```
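The lifecycle ordering can be sketched against a stub backend (the `StubBackend` class is illustrative, not the real factory output; it only records which lifecycle methods ran and in what order):

```python
import asyncio


class StubBackend:
    """Minimal stand-in that records lifecycle calls in order."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self.is_connected = False

    async def connect(self) -> None:
        self.calls.append("connect")
        self.is_connected = True

    async def migrate(self) -> None:
        self.calls.append("migrate")

    async def disconnect(self) -> None:
        self.calls.append("disconnect")
        self.is_connected = False


async def run_company(backend: StubBackend) -> None:
    await backend.connect()
    try:
        await backend.migrate()
        # ... serve requests through backend.tasks, backend.messages, etc.
    finally:
        await backend.disconnect()  # always runs, even if migrate/serve fails


backend = StubBackend()
asyncio.run(run_company(backend))
assert backend.calls == ["connect", "migrate", "disconnect"]
```

Wrapping the work in `try/finally` guarantees `disconnect()` runs even when migration or request handling raises, so one company's failure never leaks a held connection.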
**Planned**
Runtime backend switching (e.g., migrating a company from SQLite to PostgreSQL during operation) is a planned future capability. The protocol-based design already supports this: the engine would disconnect the current backend, connect a new one with different config, and migrate. Implementation details (data migration tooling, zero-downtime switchover, connection draining) are deferred to the PostgreSQL backend implementation.
## Optional Repository Capability Extensions
Some features are meaningful only for specific backends. The persistence layer exposes these
via optional, runtime-checkable Protocol extensions that concrete repositories may implement
without polluting the core PersistenceBackend protocol. SQLite-only or Postgres-only features
belong here.
Pattern:
- Define a new `@runtime_checkable` `Protocol` in `synthorg/persistence/<feature>_capability.py`.
- The Postgres repository implements the protocol by adding the required methods to its class; the SQLite repository simply does not implement them.
- Call sites use `isinstance(repo, MyCapability)` before invoking the extension methods, and either skip the feature or raise HTTP 422 (`ClientException(status_code=422, ...)`) when the active backend doesn't support it.
- All user-facing parameters (column names, path expressions) are validated against an allowlist or regex before being woven into SQL. Value-side parameters are always passed via psycopg placeholders, never interpolated.
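The capability check itself is plain structural typing. A sketch (the protocol name follows the reference implementation described next; both stub repositories are hypothetical):

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class JsonbQueryCapability(Protocol):
    async def query_jsonb_contains(
        self, column: str, fragment: dict[str, Any]
    ) -> tuple[Any, ...]: ...


class PostgresAuditRepo:  # hypothetical stub: implements the capability
    async def query_jsonb_contains(self, column, fragment):
        # real code would validate `column` against an allowlist, then
        # issue: ... WHERE {column} @> %s   (value via psycopg placeholder)
        return ()


class SqliteAuditRepo:  # hypothetical stub: no JSONB support
    pass


def jsonb_supported(repo: object) -> bool:
    # structural check: does the active repo implement the capability?
    return isinstance(repo, JsonbQueryCapability)


assert jsonb_supported(PostgresAuditRepo())
assert not jsonb_supported(SqliteAuditRepo())
```

A call site that finds `jsonb_supported(...)` false either degrades gracefully or surfaces HTTP 422, as described above.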
Reference implementation: `JsonbQueryCapability` in `src/synthorg/persistence/jsonb_capability.py` exposes `query_jsonb_contains` and `query_jsonb_key_exists` methods on `PostgresAuditRepository`. These use the GIN-indexed `@>` and `?` JSONB operators on `audit_entries.matched_rules`. The audit search endpoint checks capability via `isinstance` and returns HTTP 422 on SQLite.
This pattern keeps the base PersistenceBackend protocol clean while still letting specific
backends expose their unique strengths. Future capabilities (e.g. full-text search, vector
similarity, time-series window functions) should follow the same template.
## Database-Enforced Invariants
Critical invariants that cannot be violated under any deployment (including multi-instance Postgres clusters) are enforced by database constraint triggers rather than in-process application locks. The triggers are the sole source of truth; the application catches constraint violations and maps them to domain errors.
| Invariant | Enforcement mechanism | Postgres object | SQLite object |
|---|---|---|---|
| At most one CEO | Partial unique index | `idx_single_ceo` | `idx_single_ceo` |
| At least one CEO | AFTER UPDATE constraint trigger (deferrable) | `trg_enforce_ceo_minimum` | `enforce_ceo_minimum` (BEFORE UPDATE) |
| At least one owner | AFTER UPDATE constraint trigger (deferrable) | `trg_enforce_owner_minimum` | `enforce_owner_minimum` (BEFORE UPDATE) |
| Unique username | Column UNIQUE constraint | `users_username_key` | `users.username` |
When the repository's `save()` raises a driver exception, `_classify_postgres_user_error` (or `_classify_sqlite_user_error`) maps it to a stable constraint token, and the repository raises `ConstraintViolationError(constraint=<token>)`. The controller matches on the structural token rather than parsing DB error messages, so the mapping is stable across driver versions.
Service-layer mutations on the settings table (company, departments, agents) use compare-and-swap optimistic concurrency via the existing `updated_at` column and `SettingsRepository.set(expected_updated_at=...)`. Each mutation retries once on `VersionConflictError` and raises if the retry also fails. Retry attempts are logged via `API_CONCURRENCY_CONFLICT` for observability. This replaces the module-level `asyncio.Lock` instances that were only safe within a single process.
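The retry-once compare-and-swap loop can be sketched as follows (the `InMemorySettings` repository and `Setting` record are stand-ins; the real code goes through `SettingsRepository.set(expected_updated_at=...)` and emits `API_CONCURRENCY_CONFLICT` on each conflict):

```python
import asyncio
from dataclasses import dataclass


class VersionConflictError(Exception):
    pass


@dataclass
class Setting:
    value: str
    updated_at: float


class InMemorySettings:  # hypothetical stand-in for SettingsRepository
    def __init__(self) -> None:
        self._row = Setting(value="a", updated_at=1.0)

    async def get(self) -> Setting:
        return self._row

    async def set(self, value: str, *, expected_updated_at: float) -> None:
        # compare-and-swap: reject if someone updated the row since our read
        if self._row.updated_at != expected_updated_at:
            raise VersionConflictError()
        self._row = Setting(value=value, updated_at=self._row.updated_at + 1)


async def mutate(repo: InMemorySettings, value: str) -> None:
    for attempt in (1, 2):  # at most one retry on a lost race
        current = await repo.get()  # re-read picks up the winner's version
        try:
            await repo.set(value, expected_updated_at=current.updated_at)
            return
        except VersionConflictError:
            if attempt == 2:
                raise  # second conflict in a row: surface to the caller
            # real code logs API_CONCURRENCY_CONFLICT here before retrying


repo = InMemorySettings()
asyncio.run(mutate(repo, "b"))
assert repo._row.value == "b"
```

Because each retry re-reads `updated_at`, two racing writers serialize: the loser's first attempt fails the CAS, its retry sees the winner's version, and only a second consecutive conflict escapes to the caller.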