Memory

Persistent agent memory -- protocol, retrieval pipeline, shared org memory, consolidation, and archival.

Protocol

protocol

MemoryBackend protocol -- lifecycle + memory operations.

Application code depends on this protocol for agent memory storage and retrieval. Concrete backends implement this protocol to provide connection management, health monitoring, and memory CRUD operations.

MemoryBackend

Bases: Protocol

Structural interface for agent memory storage backends.

Concrete backends implement this protocol to provide per-agent memory storage, retrieval, and lifecycle management.

All CRUD operations (store, retrieve, get, delete, count) require a connected backend and raise MemoryConnectionError if called before connect().

Attributes:

Name Type Description
is_connected bool

Whether the backend has an active connection.

backend_name NotBlankStr

Human-readable backend identifier.

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier (e.g. "mem0").

connect async

connect()

Establish connection to the memory backend.

Raises:

Type Description
MemoryConnectionError

If the connection cannot be established.

Source code in src/synthorg/memory/protocol.py
async def connect(self) -> None:
    """Establish connection to the memory backend.

    Raises:
        MemoryConnectionError: If the connection cannot be
            established.
    """
    ...

disconnect async

disconnect()

Close the memory backend connection.

Safe to call even if not connected.

Source code in src/synthorg/memory/protocol.py
async def disconnect(self) -> None:
    """Close the memory backend connection.

    Safe to call even if not connected.
    """
    ...

health_check async

health_check()

Check whether the backend is healthy and responsive.

Returns:

Type Description
bool

True if the backend is reachable and operational, False if unreachable or unhealthy.

Note

Implementations should catch connection-level errors, log them at WARNING level with full exception context, and return False. The caught exception must never be silently discarded. Only raise for programming errors (e.g. backend not initialized).

Source code in src/synthorg/memory/protocol.py
async def health_check(self) -> bool:
    """Check whether the backend is healthy and responsive.

    Returns:
        ``True`` if the backend is reachable and operational,
        ``False`` if unreachable or unhealthy.

    Note:
        Implementations should catch connection-level errors,
        log them at WARNING level with full exception context,
        and return ``False``.  The caught exception must never be
        silently discarded.  Only raise for programming errors
        (e.g. backend not initialized).
    """
    ...
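The Note above prescribes a specific error-handling shape for health_check. The sketch below illustrates it with a hypothetical `ConnectionFailure` exception and `ping` callable standing in for whatever backend-specific probe a real implementation uses; only the catch/log/return pattern is taken from the protocol docs.

```python
import asyncio
import logging

logger = logging.getLogger("memory.health")

class ConnectionFailure(Exception):
    """Hypothetical stand-in for a backend-specific connection error."""

async def health_check_sketch(ping) -> bool:
    # `ping` is any async callable that raises on connection failure.
    try:
        await ping()
    except ConnectionFailure:
        # Log at WARNING with full exception context; the caught
        # exception is never silently discarded.
        logger.warning("memory backend unreachable", exc_info=True)
        return False
    return True

async def _ok() -> None:
    return None

async def _down() -> None:
    raise ConnectionFailure("connection refused")

healthy = asyncio.run(health_check_sketch(_ok))
unhealthy = asyncio.run(health_check_sketch(_down))
```

Programming errors (e.g. calling health_check before the backend is initialized) should still raise rather than return False.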

store async

store(agent_id, request)

Store a memory entry for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
request MemoryStoreRequest

Memory content and metadata.

required

Returns:

Type Description
NotBlankStr

The backend-assigned memory ID.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryStoreError

If the store operation fails.

Source code in src/synthorg/memory/protocol.py
async def store(
    self,
    agent_id: NotBlankStr,
    request: MemoryStoreRequest,
) -> NotBlankStr:
    """Store a memory entry for an agent.

    Args:
        agent_id: Owning agent identifier.
        request: Memory content and metadata.

    Returns:
        The backend-assigned memory ID.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryStoreError: If the store operation fails.
    """
    ...

retrieve async

retrieve(agent_id, query)

Retrieve memories for an agent, ordered by relevance.

When query.text is None, performs metadata-only filtering (no semantic search).

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
query MemoryQuery

Retrieval parameters.

required

Returns:

Type Description
tuple[MemoryEntry, ...]

Matching memory entries ordered by relevance.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryRetrievalError

If the retrieval fails.

Source code in src/synthorg/memory/protocol.py
async def retrieve(
    self,
    agent_id: NotBlankStr,
    query: MemoryQuery,
) -> tuple[MemoryEntry, ...]:
    """Retrieve memories for an agent, ordered by relevance.

    When ``query.text`` is ``None``, performs metadata-only
    filtering (no semantic search).

    Args:
        agent_id: Owning agent identifier.
        query: Retrieval parameters.

    Returns:
        Matching memory entries ordered by relevance.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryRetrievalError: If the retrieval fails.
    """
    ...

get async

get(agent_id, memory_id)

Get a specific memory entry by ID.

Returns None when the entry does not exist -- MemoryNotFoundError is never raised by this method.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
memory_id NotBlankStr

Memory identifier.

required

Returns:

Type Description
MemoryEntry | None

The memory entry, or None if not found.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryRetrievalError

If the backend query fails.

Source code in src/synthorg/memory/protocol.py
async def get(
    self,
    agent_id: NotBlankStr,
    memory_id: NotBlankStr,
) -> MemoryEntry | None:
    """Get a specific memory entry by ID.

    Returns ``None`` when the entry does not exist --
    ``MemoryNotFoundError`` is never raised by this method.

    Args:
        agent_id: Owning agent identifier.
        memory_id: Memory identifier.

    Returns:
        The memory entry, or ``None`` if not found.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryRetrievalError: If the backend query fails.
    """
    ...

delete async

delete(agent_id, memory_id)

Delete a specific memory entry.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
memory_id NotBlankStr

Memory identifier.

required

Returns:

Type Description
bool

True if the entry was deleted, False if not found.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryStoreError

If the delete operation fails.

Source code in src/synthorg/memory/protocol.py
async def delete(
    self,
    agent_id: NotBlankStr,
    memory_id: NotBlankStr,
) -> bool:
    """Delete a specific memory entry.

    Args:
        agent_id: Owning agent identifier.
        memory_id: Memory identifier.

    Returns:
        ``True`` if the entry was deleted, ``False`` if not found.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryStoreError: If the delete operation fails.
    """
    ...

count async

count(agent_id, *, category=None)

Count memory entries for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
category MemoryCategory | None

Optional category filter.

None

Returns:

Type Description
int

Number of matching entries.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryRetrievalError

If the count query fails.

Source code in src/synthorg/memory/protocol.py
async def count(
    self,
    agent_id: NotBlankStr,
    *,
    category: MemoryCategory | None = None,
) -> int:
    """Count memory entries for an agent.

    Args:
        agent_id: Owning agent identifier.
        category: Optional category filter.

    Returns:
        Number of matching entries.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryRetrievalError: If the count query fails.
    """
    ...

Config

config

Memory configuration models.

Frozen Pydantic models for company-wide memory backend selection and backend-specific settings.

MemoryStorageConfig pydantic-model

Bases: BaseModel

Storage-specific memory configuration.

Attributes:

Name Type Description
data_dir NotBlankStr

Directory path for memory data persistence.

vector_store NotBlankStr

Vector store backend name.

history_store NotBlankStr

History store backend name.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_store_names
  • _reject_traversal

data_dir pydantic-field

data_dir = '/data/memory'

Directory path for memory data persistence. Default targets a Docker volume mount -- override for local development.

vector_store pydantic-field

vector_store = 'qdrant'

Vector store backend name

history_store pydantic-field

history_store = 'sqlite'

History store backend name

MemoryOptionsConfig pydantic-model

Bases: BaseModel

Memory behaviour options.

Attributes:

Name Type Description
retention_days int | None

Days to retain memories (None = forever).

max_memories_per_agent int

Maximum memories per agent.

consolidation_interval ConsolidationInterval

How often to consolidate memories.

shared_knowledge_base bool

Whether shared knowledge is enabled.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

retention_days pydantic-field

retention_days = None

Days to retain memories (None = forever)

max_memories_per_agent pydantic-field

max_memories_per_agent = 10000

Maximum memories per agent

consolidation_interval pydantic-field

consolidation_interval = DAILY

How often to consolidate memories

shared_knowledge_base pydantic-field

shared_knowledge_base = True

Whether shared knowledge is enabled

EmbedderOverrideConfig pydantic-model

Bases: BaseModel

User-facing embedder override configuration.

Allows users to override the auto-selected embedding model via company YAML config, runtime settings, or template config. All fields are optional -- None means "use auto-selection".

Attributes:

Name Type Description
provider NotBlankStr | None

Embedding provider name override.

model NotBlankStr | None

Embedding model identifier override.

dims int | None

Embedding vector dimensions (required when model is set, since dimensions are model-dependent).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

Validators:

  • _model_requires_dims

provider pydantic-field

provider = None

Embedding provider name override

model pydantic-field

model = None

Embedding model identifier override

dims pydantic-field

dims = None

Embedding vector dimensions

CompanyMemoryConfig pydantic-model

Bases: BaseModel

Top-level company-wide memory configuration.

Attributes:

Name Type Description
backend NotBlankStr

Memory backend name (validated against _VALID_BACKENDS).

level MemoryLevel

Default memory persistence level.

storage MemoryStorageConfig

Storage-specific settings.

options MemoryOptionsConfig

Memory behaviour options.

retrieval MemoryRetrievalConfig

Memory retrieval pipeline settings.

consolidation ConsolidationConfig

Memory consolidation settings.

embedder EmbedderOverrideConfig | None

Optional embedder override (None = auto-select).

procedural ProceduralMemoryConfig

Procedural memory auto-generation settings.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_backend_name

backend pydantic-field

backend = 'mem0'

Memory backend name

level pydantic-field

level = SESSION

Default memory persistence level

storage pydantic-field

storage

Storage-specific settings

options pydantic-field

options

Memory behaviour options

retrieval pydantic-field

retrieval

Memory retrieval pipeline settings

consolidation pydantic-field

consolidation

Memory consolidation settings

embedder pydantic-field

embedder = None

Optional embedder override. When set, overrides auto-selection for provider, model, and/or dims.

procedural pydantic-field

procedural

Procedural memory auto-generation settings. Controls whether failure-driven skill proposals are generated, which model to use, and quality thresholds.

Models

models

Memory domain models.

Frozen Pydantic models for memory storage requests, entries, and queries. MemoryStoreRequest is what callers pass to store(); MemoryEntry is what comes back from retrieve().

MemoryMetadata pydantic-model

Bases: BaseModel

Metadata associated with a memory entry.

Attributes:

Name Type Description
source NotBlankStr | None

Origin of the memory (task ID, conversation, etc.).

confidence float

Confidence score for the memory (0.0 to 1.0).

tags tuple[NotBlankStr, ...]

Categorization tags for filtering.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _deduplicate_tags

source pydantic-field

source = None

Origin of the memory

confidence pydantic-field

confidence = 1.0

Confidence score

tags pydantic-field

tags = ()

Categorization tags

MemoryStoreRequest pydantic-model

Bases: BaseModel

Input to MemoryBackend.store().

The backend assigns id and created_at; callers should not fabricate them.

Attributes:

Name Type Description
category MemoryCategory

Memory type category.

content NotBlankStr

Memory content text.

metadata MemoryMetadata

Associated metadata.

expires_at AwareDatetime | None

Optional expiration timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

category pydantic-field

category

Memory type category

content pydantic-field

content

Memory content text

metadata pydantic-field

metadata

Associated metadata

expires_at pydantic-field

expires_at = None

Optional expiration timestamp

MemoryEntry pydantic-model

Bases: BaseModel

A memory entry returned from the backend.

Attributes:

Name Type Description
id NotBlankStr

Unique memory identifier (assigned by backend).

agent_id NotBlankStr

Owning agent identifier.

category MemoryCategory

Memory type category.

content NotBlankStr

Memory content text.

metadata MemoryMetadata

Associated metadata.

created_at AwareDatetime

Creation timestamp.

updated_at AwareDatetime | None

Last update timestamp.

expires_at AwareDatetime | None

Optional expiration timestamp.

relevance_score float | None

Relevance score set by backend on retrieval.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_timestamps

id pydantic-field

id

Unique memory identifier

agent_id pydantic-field

agent_id

Owning agent identifier

category pydantic-field

category

Memory type category

content pydantic-field

content

Memory content text

metadata pydantic-field

metadata

Associated metadata

created_at pydantic-field

created_at

Creation timestamp

updated_at pydantic-field

updated_at = None

Last update timestamp

expires_at pydantic-field

expires_at = None

Optional expiration timestamp

relevance_score pydantic-field

relevance_score = None

Relevance score set by backend on retrieval

MemoryQuery pydantic-model

Bases: BaseModel

Query parameters for MemoryBackend.retrieve().

When text is None, the backend performs metadata-only filtering (no semantic search).

Attributes:

Name Type Description
text NotBlankStr | None

Semantic search text (None for metadata-only).

categories frozenset[MemoryCategory] | None

Filter by memory categories.

tags tuple[NotBlankStr, ...]

Filter by tags (AND semantics).

min_relevance float

Minimum relevance score threshold.

limit int

Maximum number of results.

since AwareDatetime | None

Only memories created at or after this timestamp.

until AwareDatetime | None

Only memories created before this timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _deduplicate_tags
  • _validate_time_range

text pydantic-field

text = None

Semantic search text

categories pydantic-field

categories = None

Filter by memory categories

tags pydantic-field

tags = ()

Filter by tags (AND semantics)

min_relevance pydantic-field

min_relevance = 0.0

Minimum relevance score threshold

limit pydantic-field

limit = 10

Maximum number of results

since pydantic-field

since = None

Only memories created at or after this timestamp

until pydantic-field

until = None

Only memories created before this timestamp

Capabilities

capabilities

MemoryCapabilities protocol -- capability discovery.

Backends that implement MemoryCapabilities expose what features they support, enabling runtime capability checks.

MemoryCapabilities

Bases: Protocol

Capability discovery for memory backends.

Attributes:

Name Type Description
supported_categories frozenset[MemoryCategory]

Memory categories this backend supports.

supports_graph bool

Whether graph-based memory is available.

supports_temporal bool

Whether temporal tracking is available.

supports_vector_search bool

Whether vector/semantic search is available.

supports_shared_access bool

Whether cross-agent shared memory is available.

max_memories_per_agent int | None

Maximum memories per agent, or None for unlimited.

supported_categories property

supported_categories

Memory categories this backend supports.

supports_graph property

supports_graph

Whether graph-based memory is available.

supports_temporal property

supports_temporal

Whether temporal tracking is available.

supports_vector_search property

supports_vector_search

Whether vector/semantic search is available.

supports_shared_access property

supports_shared_access

Whether cross-agent shared memory is available.

max_memories_per_agent property

max_memories_per_agent

Maximum memories per agent, or None for unlimited.
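A typical use of capability discovery is gating an operation at runtime before dispatching to the backend. The sketch below uses a hypothetical keyword-only capabilities object and raises MemoryCapabilityError (defined in the errors section) when semantic search is unavailable; only the check-before-use pattern is implied by this page.

```python
class MemoryCapabilityError(Exception):
    """Raised when an unsupported operation is attempted for a backend."""

class KeywordOnlyCaps:
    """Hypothetical capabilities for a backend without vector search."""
    supports_vector_search = False
    supports_shared_access = True

def require_vector_search(caps) -> None:
    # Fail fast instead of sending a semantic query the backend
    # cannot serve.
    if not caps.supports_vector_search:
        raise MemoryCapabilityError("backend does not support semantic search")

denied = False
try:
    require_vector_search(KeywordOnlyCaps())
except MemoryCapabilityError:
    denied = True
```

Callers can also branch instead of raising, e.g. falling back to metadata-only retrieval (MemoryQuery with text=None) when vector search is unsupported.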

Errors

errors

Memory error hierarchy.

All memory-related errors inherit from MemoryError so callers can catch the entire family with a single except clause.

Note: this shadows the built-in MemoryError (which signals out-of-memory conditions in CPython). Within the synthorg namespace the domain-specific meaning is unambiguous; callers outside the package should import explicitly.

MemoryError

Bases: Exception

Base exception for all memory operations.

MemoryConnectionError

Bases: MemoryError

Raised when a backend connection cannot be established or is lost.

MemoryStoreError

Bases: MemoryError

Raised when a store operation fails.

MemoryRetrievalError

Bases: MemoryError

Raised when a retrieve or search operation fails.

MemoryNotFoundError

Bases: MemoryError

Raised when a specific memory ID is not found.

Note: The MemoryBackend.get() protocol method returns None for missing entries rather than raising this error. This exception is available for concrete backend implementations that need to signal "not found" in non-protocol internal methods or batch operations.

MemoryConfigError

Bases: MemoryError

Raised when memory configuration is invalid.

MemoryCapabilityError

Bases: MemoryError

Raised when an unsupported operation is attempted for a backend.
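The single-except ergonomics of this hierarchy can be shown directly. The class bodies below mirror the hierarchy documented above (note the deliberate shadowing of builtins.MemoryError); the degrade helper is an illustrative caller, not part of the package.

```python
class MemoryError(Exception):
    """Domain base for all memory operations (shadows the builtin)."""

class MemoryConnectionError(MemoryError): ...
class MemoryStoreError(MemoryError): ...
class MemoryRetrievalError(MemoryError): ...

def degrade(op):
    # One except clause catches the entire memory error family.
    try:
        op()
    except MemoryError:
        return "degraded"
    return "ok"

def failing_store():
    raise MemoryStoreError("disk full")

caught = degrade(failing_store)
passed = degrade(lambda: None)
```

Code that must distinguish the domain errors from a genuine out-of-memory condition should reference builtins.MemoryError explicitly, as the retriever does.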

Factory

factory

Factory for creating memory backends from configuration.

Each company gets its own MemoryBackend instance. The factory dispatches to concrete backend implementations based on config.backend.

create_memory_backend

create_memory_backend(config, *, embedder=None)

Create a memory backend from configuration.

Parameters:

Name Type Description Default
config CompanyMemoryConfig

Memory configuration (includes backend selection and backend-specific settings).

required
embedder Mem0EmbedderConfig | None

Backend-specific embedder configuration. Required for the "mem0" backend (must be a Mem0EmbedderConfig instance).

None

Returns:

Type Description
MemoryBackend

A new, disconnected backend instance. The caller must call connect() before use.

Raises:

Type Description
MemoryConfigError

If the backend is not recognized or required configuration is missing.

Source code in src/synthorg/memory/factory.py
def create_memory_backend(
    config: CompanyMemoryConfig,
    *,
    embedder: Mem0EmbedderConfig | None = None,
) -> MemoryBackend:
    """Create a memory backend from configuration.

    Args:
        config: Memory configuration (includes backend selection and
            backend-specific settings).
        embedder: Backend-specific embedder configuration.  Required
            for the ``"mem0"`` backend (must be a
            ``Mem0EmbedderConfig`` instance).

    Returns:
        A new, disconnected backend instance.  The caller must call
        ``connect()`` before use.

    Raises:
        MemoryConfigError: If the backend is not recognized or
            required configuration is missing.
    """
    if config.backend == "mem0":
        return _create_mem0_backend(config, embedder=embedder)
    # Defensive guard: config validation rejects unknown backends, so
    # this branch is unreachable under normal construction.  It exists
    # as a safety net for callers that bypass validation (e.g. via
    # model_construct).
    msg = f"Unknown memory backend: {config.backend!r}"
    logger.error(MEMORY_BACKEND_UNKNOWN, backend=config.backend)
    raise MemoryConfigError(msg)
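The dispatch shape of the factory reduces to a name check plus a defensive error. The sketch below keeps that shape with a string return standing in for the constructed backend; the real function returns a disconnected MemoryBackend and logs before raising.

```python
class MemoryConfigError(Exception):
    """Raised when memory configuration is invalid."""

def create_backend_sketch(backend_name: str) -> str:
    # Dispatch on the configured backend name.
    if backend_name == "mem0":
        return "mem0-backend"  # stand-in for the real backend instance
    # Defensive guard: normally unreachable, since config validation
    # rejects unknown backend names up front.
    raise MemoryConfigError(f"Unknown memory backend: {backend_name!r}")

created = create_backend_sketch("mem0")
rejected = False
try:
    create_backend_sketch("redis")
except MemoryConfigError:
    rejected = True
```

Because the returned backend is disconnected, a caller's first await is always connect(); skipping it makes every CRUD call raise MemoryConnectionError.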

Retriever

retriever

Context injection strategy -- pre-retrieves and injects memories.

Orchestrates the full retrieval pipeline: backend query → ranking → budget-fit → format. Implements MemoryInjectionStrategy protocol.

ContextInjectionStrategy

ContextInjectionStrategy(
    *, backend, config, shared_store=None, token_estimator=None, memory_filter=None
)

Context injection strategy -- pre-retrieves and injects memories.

Implements MemoryInjectionStrategy protocol. Orchestrates the full pipeline: retrieve → rank → budget-fit → format.

Initialise the context injection strategy.

Parameters:

Name Type Description Default
backend MemoryBackend

Memory backend for personal memories.

required
config MemoryRetrievalConfig

Retrieval pipeline configuration.

required
shared_store SharedKnowledgeStore | None

Optional shared knowledge store.

None
token_estimator TokenEstimator | None

Optional custom token estimator.

None
memory_filter MemoryFilterStrategy | None

Optional filter applied after ranking, before formatting. When None and config.non_inferable_only is True, a TagBasedMemoryFilter is auto-created. When None and non_inferable_only is False, all ranked memories are injected (backward-compatible).

None
Source code in src/synthorg/memory/retriever.py
def __init__(
    self,
    *,
    backend: MemoryBackend,
    config: MemoryRetrievalConfig,
    shared_store: SharedKnowledgeStore | None = None,
    token_estimator: TokenEstimator | None = None,
    memory_filter: MemoryFilterStrategy | None = None,
) -> None:
    """Initialise the context injection strategy.

    Args:
        backend: Memory backend for personal memories.
        config: Retrieval pipeline configuration.
        shared_store: Optional shared knowledge store.
        token_estimator: Optional custom token estimator.
        memory_filter: Optional filter applied after ranking,
            before formatting.  When ``None`` and
            ``config.non_inferable_only`` is ``True``, a
            ``TagBasedMemoryFilter`` is auto-created.  When ``None``
            and ``non_inferable_only`` is ``False``, all ranked
            memories are injected (backward-compatible).
    """
    self._backend = backend
    self._config = config
    self._shared_store = shared_store
    if memory_filter is None and config.non_inferable_only:
        memory_filter = TagBasedMemoryFilter()
    elif memory_filter is not None and config.non_inferable_only:
        logger.debug(
            MEMORY_FILTER_INIT,
            note="explicit memory_filter overrides non_inferable_only config",
            filter_strategy=getattr(memory_filter, "strategy_name", "unknown"),
        )
    self._memory_filter = memory_filter
    self._estimator = (
        token_estimator if token_estimator is not None else DefaultTokenEstimator()
    )
    logger.debug(
        MEMORY_RETRIEVAL_START,
        strategy="context_injection",
        backend=backend.backend_name
        if hasattr(backend, "backend_name")
        else type(backend).__qualname__,
        has_shared_store=shared_store is not None,
    )

strategy_name property

strategy_name

Human-readable strategy identifier.

Returns:

Type Description
str

"context_injection".

prepare_messages async

prepare_messages(agent_id, query_text, token_budget, *, categories=None)

Full pipeline: retrieve → rank → budget-fit → format.

Returns empty tuple on any failure (graceful degradation). Never raises domain memory errors to the caller. Re-raises builtins.MemoryError and RecursionError.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent requesting memories.

required
query_text NotBlankStr

Text for semantic retrieval.

required
token_budget int

Maximum tokens for memory content.

required
categories frozenset[MemoryCategory] | None

Optional category filter.

None

Returns:

Type Description
tuple[ChatMessage, ...]

Tuple of ChatMessage instances (may be empty).

Source code in src/synthorg/memory/retriever.py
async def prepare_messages(
    self,
    agent_id: NotBlankStr,
    query_text: NotBlankStr,
    token_budget: int,
    *,
    categories: frozenset[MemoryCategory] | None = None,
) -> tuple[ChatMessage, ...]:
    """Full pipeline: retrieve → rank → budget-fit → format.

    Returns empty tuple on any failure (graceful degradation).
    Never raises domain memory errors to the caller.
    Re-raises ``builtins.MemoryError`` and ``RecursionError``.

    Args:
        agent_id: The agent requesting memories.
        query_text: Text for semantic retrieval.
        token_budget: Maximum tokens for memory content.
        categories: Optional category filter.

    Returns:
        Tuple of ``ChatMessage`` instances (may be empty).
    """
    logger.info(
        MEMORY_RETRIEVAL_START,
        agent_id=agent_id,
        token_budget=token_budget,
    )

    if token_budget <= 0:
        logger.info(
            MEMORY_RETRIEVAL_SKIPPED,
            agent_id=agent_id,
            reason="non-positive token budget",
            token_budget=token_budget,
        )
        return ()

    try:
        return await self._execute_pipeline(
            agent_id=agent_id,
            query_text=query_text,
            token_budget=token_budget,
            categories=categories,
        )
    except builtins.MemoryError:
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
            error_type="system",
            exc_info=True,
        )
        raise
    except RecursionError:
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
            error_type="system",
            exc_info=True,
        )
        raise
    except memory_errors.MemoryError:
        logger.warning(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
            exc_info=True,
        )
        return ()
    except Exception as exc:
        # ExceptionGroup may wrap system-level errors that must
        # propagate -- inspect and re-raise them.
        if isinstance(exc, ExceptionGroup):
            system_errors = exc.subgroup(
                lambda e: isinstance(
                    e,
                    builtins.MemoryError | RecursionError,
                ),
            )
            if system_errors is not None:
                logger.error(
                    MEMORY_RETRIEVAL_DEGRADED,
                    source="pipeline",
                    agent_id=agent_id,
                    error_type="system_in_exception_group",
                    exc_info=True,
                )
                raise system_errors.exceptions[0] from exc
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
            error_type=type(exc).__qualname__,
            exc_info=True,
        )
        return ()

get_tool_definitions

get_tool_definitions()

Context injection provides no tools.

Returns:

Type Description
tuple[ToolDefinition, ...]

Empty tuple.

Source code in src/synthorg/memory/retriever.py
def get_tool_definitions(self) -> tuple[ToolDefinition, ...]:
    """Context injection provides no tools.

    Returns:
        Empty tuple.
    """
    return ()
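The error policy of prepare_messages (empty tuple on domain failure, system errors re-raised, non-positive budgets short-circuited) is worth seeing in miniature. This sketch omits the ExceptionGroup unwrapping and logging of the real method; `pipeline` is a hypothetical stand-in for _execute_pipeline.

```python
import asyncio

class MemoryRetrievalError(Exception):
    """Stand-in for the domain retrieval error."""

async def prepare_messages_sketch(pipeline, token_budget: int) -> tuple:
    # Non-positive budgets are skipped without touching the backend.
    if token_budget <= 0:
        return ()
    try:
        return await pipeline()
    except (MemoryError, RecursionError):
        # builtins.MemoryError / RecursionError always propagate.
        raise
    except MemoryRetrievalError:
        # Domain errors degrade gracefully to an empty injection.
        return ()

async def _broken():
    raise MemoryRetrievalError("backend timeout")

async def _working():
    return ("memory: user prefers dark mode",)

empty_budget = asyncio.run(prepare_messages_sketch(_working, 0))
degraded = asyncio.run(prepare_messages_sketch(_broken, 512))
messages = asyncio.run(prepare_messages_sketch(_working, 512))
```

Graceful degradation means an agent never fails a turn because memory retrieval failed; it simply runs without injected memories.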

Retrieval Config

retrieval_config

Memory retrieval pipeline configuration.

Frozen Pydantic config for the retrieval pipeline -- weights, thresholds, and strategy selection.

MemoryRetrievalConfig pydantic-model

Bases: BaseModel

Configuration for the memory retrieval and ranking pipeline.

Attributes:

Name Type Description
strategy InjectionStrategy

Which injection strategy to use.

relevance_weight float

Weight for backend relevance score (0.0-1.0).

recency_weight float

Weight for recency decay score (0.0-1.0).

recency_decay_rate float

Exponential decay rate per hour.

personal_boost float

Boost applied to personal over shared (0.0-1.0).

min_relevance float

Minimum combined (relevance + recency) score to include.

max_memories int

Maximum candidates to retrieve (1-100).

include_shared bool

Whether to query SharedKnowledgeStore.

default_relevance float

Score for entries missing relevance_score.

injection_point InjectionPoint

Message role for context injection.

non_inferable_only bool

When True, auto-creates a TagBasedMemoryFilter in ContextInjectionStrategy if no explicit filter is provided.

fusion_strategy FusionStrategy

Ranking fusion strategy -- LINEAR for single-source relevance+recency, RRF for multi-source ranked list merging.

rrf_k int

RRF smoothing constant (1-1000, only used with RRF strategy).

query_reformulation_enabled bool

Reserved for future query reformulation support; must remain False until implemented.

max_reformulation_rounds int

Reserved for future query reformulation support (1-5). Currently unused.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_weight_sum
  • _validate_rrf_k_strategy_consistency
  • _validate_reformulation_not_supported
  • _validate_personal_boost_rrf_consistency
  • _validate_supported_strategy

strategy pydantic-field

strategy = CONTEXT

Which injection strategy to use

relevance_weight pydantic-field

relevance_weight = 0.7

Weight for backend relevance score

recency_weight pydantic-field

recency_weight = 0.3

Weight for recency decay score

recency_decay_rate pydantic-field

recency_decay_rate = 0.01

Exponential decay rate per hour

personal_boost pydantic-field

personal_boost = 0.1

Boost applied to personal over shared memories

min_relevance pydantic-field

min_relevance = 0.3

Minimum combined (relevance + recency) score to include

max_memories pydantic-field

max_memories = 20

Maximum candidates to retrieve

include_shared pydantic-field

include_shared = True

Whether to query SharedKnowledgeStore

default_relevance pydantic-field

default_relevance = 0.5

Score for entries missing relevance_score

injection_point pydantic-field

injection_point = SYSTEM

Message role for context injection

non_inferable_only pydantic-field

non_inferable_only = False

When True, only inject memories tagged as non-inferable

fusion_strategy pydantic-field

fusion_strategy = LINEAR

Ranking fusion strategy: linear for single-source relevance+recency, rrf for multi-source ranked list merging

rrf_k pydantic-field

rrf_k = _DEFAULT_RRF_K

RRF smoothing constant k (only used with RRF strategy)

query_reformulation_enabled pydantic-field

query_reformulation_enabled = False

Reserved for future query reformulation support in the TOOL_BASED strategy. Not yet wired into the retrieval pipeline -- must remain False until implemented.

max_reformulation_rounds pydantic-field

max_reformulation_rounds = 2

Reserved for future query reformulation support (1-5). Currently unused until reformulation is wired.

Ranking

ranking

Memory ranking -- scoring and sorting functions.

All functions are deterministic given the same inputs; logging calls are their only side effect.

rank_memories scores entries via linear combination of relevance and recency (single-source). fuse_ranked_lists merges multiple pre-ranked lists via Reciprocal Rank Fusion (multi-source).

FusionStrategy

Bases: StrEnum

Ranking fusion strategy selection.

Attributes:

Name Type Description
LINEAR

Weighted linear combination of relevance and recency (default, for single-source scoring).

RRF

Reciprocal Rank Fusion for merging multiple ranked lists (for multi-source hybrid search).

ScoredMemory pydantic-model

Bases: BaseModel

Memory entry with computed ranking scores.

Attributes:

Name Type Description
entry MemoryEntry

The original memory entry.

relevance_score float

Relevance score after pipeline-specific transformations (0.0-1.0).

recency_score float

Exponential decay based on age (0.0-1.0). Always 0.0 for RRF-produced results.

combined_score float

Final ranking signal (0.0-1.0). For linear ranking this is a weighted combination; for RRF this is the normalized fusion score.

is_shared bool

Whether this came from SharedKnowledgeStore.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

entry pydantic-field

entry

The original memory entry

relevance_score pydantic-field

relevance_score

Relevance score (after boost)

recency_score pydantic-field

recency_score

Recency decay score

combined_score pydantic-field

combined_score

Weighted combination score

is_shared pydantic-field

is_shared = False

Whether from SharedKnowledgeStore

compute_recency_score

compute_recency_score(created_at, now, decay_rate)

Compute exponential recency decay score.

exp(-decay_rate * age_hours). Returns 1.0 for zero age, decays toward 0.0 over time. Future timestamps are clamped to 1.0.

Parameters:

Name Type Description Default
created_at datetime

When the memory was created.

required
now datetime

Current timestamp for age calculation.

required
decay_rate float

Exponential decay rate per hour.

required

Returns:

Type Description
float

Recency score between 0.0 and 1.0.

Source code in src/synthorg/memory/ranking.py
def compute_recency_score(
    created_at: datetime,
    now: datetime,
    decay_rate: float,
) -> float:
    """Compute exponential recency decay score.

    ``exp(-decay_rate * age_hours)``.  Returns 1.0 for zero age,
    decays toward 0.0 over time.  Future timestamps are clamped to
    1.0.

    Args:
        created_at: When the memory was created.
        now: Current timestamp for age calculation.
        decay_rate: Exponential decay rate per hour.

    Returns:
        Recency score between 0.0 and 1.0.
    """
    age_seconds = (now - created_at).total_seconds()
    if age_seconds <= 0:
        return 1.0
    age_hours = age_seconds / 3600.0
    return math.exp(-decay_rate * age_hours)
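The decay curve is easy to sanity-check with a standalone restatement of the formula (this sketch mirrors the documented behaviour; it is not an import of the library function):

```python
import math
from datetime import datetime, timedelta, timezone

def recency_score(created_at: datetime, now: datetime, decay_rate: float) -> float:
    # exp(-decay_rate * age_hours); zero and future ages clamp to 1.0
    age_seconds = (now - created_at).total_seconds()
    if age_seconds <= 0:
        return 1.0
    return math.exp(-decay_rate * (age_seconds / 3600.0))

now = datetime(2024, 1, 2, tzinfo=timezone.utc)
fresh = recency_score(now, now, decay_rate=0.01)
day_old = recency_score(now - timedelta(hours=24), now, decay_rate=0.01)
future = recency_score(now + timedelta(hours=5), now, decay_rate=0.01)
```

With the default decay rate of 0.01, a 24-hour-old memory scores exp(-0.24), roughly 0.787, so recency erodes over days rather than minutes.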

compute_combined_score

compute_combined_score(relevance, recency, relevance_weight, recency_weight)

Weighted linear combination of relevance and recency.

Parameters:

Name Type Description Default
relevance float

Relevance score (0.0-1.0).

required
recency float

Recency score (0.0-1.0).

required
relevance_weight float

Weight for relevance.

required
recency_weight float

Weight for recency.

required

Returns:

Type Description
float

Combined score clamped to [0.0, 1.0]. When relevance_weight + recency_weight == 1.0 and inputs are in [0.0, 1.0], the result is naturally bounded; the clamp guards against floating-point tolerance in the weight sum.

Source code in src/synthorg/memory/ranking.py
def compute_combined_score(
    relevance: float,
    recency: float,
    relevance_weight: float,
    recency_weight: float,
) -> float:
    """Weighted linear combination of relevance and recency.

    Args:
        relevance: Relevance score (0.0-1.0).
        recency: Recency score (0.0-1.0).
        relevance_weight: Weight for relevance.
        recency_weight: Weight for recency.

    Returns:
        Combined score clamped to [0.0, 1.0].  When
        ``relevance_weight + recency_weight == 1.0`` and inputs are
        in [0.0, 1.0], the result is naturally bounded; the clamp
        guards against floating-point tolerance in the weight sum.
    """
    return min(1.0, relevance_weight * relevance + recency_weight * recency)
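With the default weights (0.7 relevance, 0.3 recency) the combination is a plain weighted sum; the sketch below restates it for a quick worked example (not an import of the library function):

```python
def combined_score(relevance: float, recency: float,
                   relevance_weight: float = 0.7,
                   recency_weight: float = 0.3) -> float:
    # weighted sum; the min() clamp absorbs floating-point drift in the weights
    return min(1.0, relevance_weight * relevance + recency_weight * recency)

score = combined_score(0.8, 0.5)  # 0.7 * 0.8 + 0.3 * 0.5
```

A highly relevant but stale memory (0.8, 0.5) lands at roughly 0.71, comfortably above the default min_relevance of 0.3.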

rank_memories

rank_memories(entries, *, config, now, shared_entries=())

Score, merge, sort, filter, and truncate memory entries.

  1. Score personal entries (with personal_boost).
  2. Score shared entries (no boost).
  3. Merge both sets.
  4. Filter by min_relevance threshold on combined_score.
  5. Sort descending by combined_score.
  6. Truncate to max_memories.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Personal memory entries.

required
config MemoryRetrievalConfig

Retrieval pipeline configuration.

required
now datetime

Current timestamp for recency calculations.

required
shared_entries tuple[MemoryEntry, ...]

Shared memory entries (no personal boost).

()

Returns:

Type Description
tuple[ScoredMemory, ...]

Sorted and filtered tuple of ScoredMemory.

Source code in src/synthorg/memory/ranking.py
def rank_memories(
    entries: tuple[MemoryEntry, ...],
    *,
    config: MemoryRetrievalConfig,
    now: datetime,
    shared_entries: tuple[MemoryEntry, ...] = (),
) -> tuple[ScoredMemory, ...]:
    """Score, merge, sort, filter, and truncate memory entries.

    1. Score personal entries (with ``personal_boost``).
    2. Score shared entries (no boost).
    3. Merge both sets.
    4. Filter by ``min_relevance`` threshold on ``combined_score``.
    5. Sort descending by ``combined_score``.
    6. Truncate to ``max_memories``.

    Args:
        entries: Personal memory entries.
        config: Retrieval pipeline configuration.
        now: Current timestamp for recency calculations.
        shared_entries: Shared memory entries (no personal boost).

    Returns:
        Sorted and filtered tuple of ``ScoredMemory``.
    """
    scored = [
        _score_entry(entry, config=config, now=now, is_shared=False)
        for entry in entries
    ]
    scored.extend(
        _score_entry(entry, config=config, now=now, is_shared=True)
        for entry in shared_entries
    )

    filtered = [s for s in scored if s.combined_score >= config.min_relevance]
    filtered.sort(key=lambda s: s.combined_score, reverse=True)

    result = tuple(filtered[: config.max_memories])

    logger.debug(
        MEMORY_RANKING_COMPLETE,
        total_candidates=len(scored),
        after_filter=len(filtered),
        after_truncation=len(result),
        min_relevance=config.min_relevance,
        max_memories=config.max_memories,
    )

    return result
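The merge/filter/sort/truncate tail of the pipeline (steps 3-6) can be illustrated over plain floats, assuming scoring has already produced combined scores (hypothetical helper; the real function operates on MemoryEntry objects and a config model):

```python
def rank_scores(personal, shared, *, min_relevance=0.3, max_memories=20):
    # steps 3-6: merge both sets, drop below-threshold scores,
    # sort descending, truncate to the configured maximum
    merged = list(personal) + list(shared)
    kept = sorted((s for s in merged if s >= min_relevance), reverse=True)
    return tuple(kept[:max_memories])

top = rank_scores((0.9, 0.2, 0.5), (0.6,), max_memories=3)
```

The 0.2 entry falls below the threshold and is dropped before sorting, so truncation only ever discards the weakest survivors.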

fuse_ranked_lists

fuse_ranked_lists(ranked_lists, *, k=60, max_results=20)

Merge multiple pre-ranked lists via Reciprocal Rank Fusion.

RRF_score(doc) = sum(1 / (k + rank_i)) across all lists containing the document. Scores are min-max normalized to [0.0, 1.0].

For RRF output, only combined_score is the meaningful ranking signal. relevance_score preserves the entry's raw backend relevance (or 0.0 if absent); recency_score is 0.0.

When the same entry ID appears in multiple lists, the first MemoryEntry object encountered is retained.

Unlike rank_memories, this function does not apply a min_relevance threshold -- callers are responsible for post-filtering if needed.

Parameters:

Name Type Description Default
ranked_lists tuple[tuple[MemoryEntry, ...], ...]

Each inner tuple is a pre-sorted ranked list of memory entries (best first).

required
k int

RRF smoothing constant (default 60, must be >= 1). Smaller values amplify rank differences.

60
max_results int

Maximum entries to return (must be >= 1).

20

Returns:

Type Description
tuple[ScoredMemory, ...]

Sorted tuple of ScoredMemory by descending RRF score.

Raises:

Type Description
ValueError

If k < 1 or max_results < 1.

Source code in src/synthorg/memory/ranking.py
def fuse_ranked_lists(
    ranked_lists: tuple[tuple[MemoryEntry, ...], ...],
    *,
    k: int = 60,
    max_results: int = 20,
) -> tuple[ScoredMemory, ...]:
    """Merge multiple pre-ranked lists via Reciprocal Rank Fusion.

    ``RRF_score(doc) = sum(1 / (k + rank_i))`` across all lists
    containing the document.  Scores are min-max normalized to
    [0.0, 1.0].

    For RRF output, only ``combined_score`` is the meaningful
    ranking signal.  ``relevance_score`` preserves the entry's raw
    backend relevance (or 0.0 if absent); ``recency_score`` is 0.0.

    When the same entry ID appears in multiple lists, the first
    ``MemoryEntry`` object encountered is retained.

    Unlike ``rank_memories``, this function does **not** apply a
    ``min_relevance`` threshold -- callers are responsible for
    post-filtering if needed.

    Args:
        ranked_lists: Each inner tuple is a pre-sorted ranked list
            of memory entries (best first).
        k: RRF smoothing constant (default 60, must be >= 1).
            Smaller values amplify rank differences.
        max_results: Maximum entries to return (must be >= 1).

    Returns:
        Sorted tuple of ``ScoredMemory`` by descending RRF score.

    Raises:
        ValueError: If ``k < 1`` or ``max_results < 1``.
    """
    if k < 1:
        msg = f"k must be >= 1, got {k}"
        logger.warning(MEMORY_RRF_VALIDATION_FAILED, param="k", value=k)
        raise ValueError(msg)
    if max_results < 1:
        msg = f"max_results must be >= 1, got {max_results}"
        logger.warning(
            MEMORY_RRF_VALIDATION_FAILED,
            param="max_results",
            value=max_results,
        )
        raise ValueError(msg)

    scores, entries, duplicate_count = _accumulate_rrf_scores(ranked_lists, k)

    if not entries:
        logger.info(
            MEMORY_RRF_FUSION_COMPLETE,
            num_lists=len(ranked_lists),
            unique_entries=0,
            after_truncation=0,
            duplicate_ids_skipped=duplicate_count,
            k=k,
        )
        return ()

    normalized = _normalize_rrf_scores(scores)
    scored_list = _build_rrf_scored_memories(entries, normalized)
    scored_list.sort(key=lambda s: s.combined_score, reverse=True)
    result = tuple(scored_list[:max_results])

    logger.info(
        MEMORY_RRF_FUSION_COMPLETE,
        num_lists=len(ranked_lists),
        unique_entries=len(entries),
        after_truncation=len(result),
        duplicate_ids_skipped=duplicate_count,
        k=k,
    )

    return result
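The accumulation step can be sketched over bare entry IDs (this assumes 1-based ranks, the conventional RRF formulation; the real function additionally min-max normalizes scores and builds ScoredMemory objects):

```python
def rrf_order(ranked_lists, *, k=60, max_results=20):
    # RRF_score(doc) = sum(1 / (k + rank_i)) over lists containing doc
    scores: dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)[:max_results]

order = rrf_order((("a", "b"), ("b",)))
```

"b" appears in both lists (1/62 + 1/61) and outranks "a", which is ranked first but in only one list (1/61): consensus across sources beats a single high rank.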

Filter

filter

Memory filter strategies for non-inferable principle enforcement.

Filters scored memories before injection into agent prompts. The TagBasedMemoryFilter (initial D23 implementation) retains only memories tagged with "non-inferable"; the PassthroughMemoryFilter is a no-op for backward compatibility and testing.

Both satisfy the MemoryFilterStrategy runtime-checkable protocol.

MemoryFilterStrategy

Bases: Protocol

Protocol for filtering scored memories before prompt injection.

strategy_name property

strategy_name

Human-readable name of the filter strategy.

filter_for_injection

filter_for_injection(memories)

Filter memories suitable for injection.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Ranked scored memories from the retrieval pipeline.

required

Returns:

Type Description
tuple[ScoredMemory, ...]

Subset of memories that pass the filter.

Source code in src/synthorg/memory/filter.py
def filter_for_injection(
    self,
    memories: tuple[ScoredMemory, ...],
) -> tuple[ScoredMemory, ...]:
    """Filter memories suitable for injection.

    Args:
        memories: Ranked scored memories from the retrieval pipeline.

    Returns:
        Subset of memories that pass the filter.
    """
    ...

TagBasedMemoryFilter

TagBasedMemoryFilter(required_tag=NON_INFERABLE_TAG)

Filter that retains only memories with a required tag.

The default required tag is "non-inferable" per D23. Memories whose entry.metadata.tags do not contain the required tag are excluded from prompt injection.

Parameters:

Name Type Description Default
required_tag str

Tag that must be present for a memory to pass.

NON_INFERABLE_TAG
Source code in src/synthorg/memory/filter.py
def __init__(self, required_tag: str = NON_INFERABLE_TAG) -> None:
    if not isinstance(required_tag, str) or not required_tag.strip():
        msg = "required_tag must be a non-empty string"
        raise ValueError(msg)
    self._required_tag = required_tag.strip()
    logger.debug(
        MEMORY_FILTER_INIT,
        strategy=self.strategy_name,
        required_tag=required_tag,
    )

strategy_name property

strategy_name

Human-readable name of the filter strategy.

Returns:

Type Description
str

"tag_based".

filter_for_injection

filter_for_injection(memories)

Return only memories containing the required tag.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Ranked scored memories.

required

Returns:

Type Description
tuple[ScoredMemory, ...]

Filtered tuple with only tagged memories.

Source code in src/synthorg/memory/filter.py
def filter_for_injection(
    self,
    memories: tuple[ScoredMemory, ...],
) -> tuple[ScoredMemory, ...]:
    """Return only memories containing the required tag.

    Args:
        memories: Ranked scored memories.

    Returns:
        Filtered tuple with only tagged memories.
    """
    retained = tuple(
        m for m in memories if self._required_tag in m.entry.metadata.tags
    )

    logger.info(
        MEMORY_FILTER_APPLIED,
        strategy=self.strategy_name,
        candidates=len(memories),
        retained=len(retained),
        required_tag=self._required_tag,
    )

    return retained
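The tag check itself is a one-line membership test. A toy version over dicts (a hypothetical shape for illustration -- real entries carry their tags under entry.metadata.tags):

```python
def keep_tagged(memories, required_tag="non-inferable"):
    # retain only entries whose tag set contains the required tag
    return tuple(m for m in memories if required_tag in m["tags"])

kept = keep_tagged((
    {"id": "m1", "tags": frozenset({"non-inferable", "preference"})},
    {"id": "m2", "tags": frozenset({"project"})},
))
```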

PassthroughMemoryFilter

No-op filter that returns all memories unchanged.

Useful for backward compatibility and testing -- all memories pass through without filtering.

strategy_name property

strategy_name

Human-readable name of the filter strategy.

Returns:

Type Description
str

"passthrough".

filter_for_injection

filter_for_injection(memories)

Return all memories unchanged.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Ranked scored memories.

required

Returns:

Type Description
tuple[ScoredMemory, ...]

The input tuple unchanged.

Source code in src/synthorg/memory/filter.py
def filter_for_injection(
    self,
    memories: tuple[ScoredMemory, ...],
) -> tuple[ScoredMemory, ...]:
    """Return all memories unchanged.

    Args:
        memories: Ranked scored memories.

    Returns:
        The input tuple unchanged.
    """
    logger.info(
        MEMORY_FILTER_APPLIED,
        strategy=self.strategy_name,
        candidates=len(memories),
        retained=len(memories),
    )
    return memories

Formatter

formatter

Memory context formatter -- converts ranked memories to ChatMessages.

Handles token budget enforcement via greedy packing: iterates by rank, skips entries that exceed the remaining budget, and continues with smaller entries to maximize context within the token limit.

MEMORY_BLOCK_START module-attribute

MEMORY_BLOCK_START = '--- AGENT MEMORY ---'

Delimiter marking the start of memory context.

MEMORY_BLOCK_END module-attribute

MEMORY_BLOCK_END = '--- END MEMORY ---'

Delimiter marking the end of memory context.

format_memory_context

format_memory_context(memories, *, estimator, token_budget, injection_point=SYSTEM)

Format ranked memories into ChatMessage(s), respecting token budget.

Uses greedy packing: iterates memories by rank order and includes each one if it fits within the remaining budget.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Pre-ranked memories (highest score first).

required
estimator TokenEstimator

Token estimation implementation.

required
token_budget int

Maximum tokens for the memory block.

required
injection_point InjectionPoint

Role for the output message.

SYSTEM

Returns:

Type Description
tuple[ChatMessage, ...]

Tuple containing a single ChatMessage with formatted memories, or an empty tuple if no memories fit or input is empty.

Source code in src/synthorg/memory/formatter.py
def format_memory_context(
    memories: tuple[ScoredMemory, ...],
    *,
    estimator: TokenEstimator,
    token_budget: int,
    injection_point: InjectionPoint = InjectionPoint.SYSTEM,
) -> tuple[ChatMessage, ...]:
    """Format ranked memories into ChatMessage(s), respecting token budget.

    Uses greedy packing: iterates memories by rank order and includes
    each one if it fits within the remaining budget.

    Args:
        memories: Pre-ranked memories (highest score first).
        estimator: Token estimation implementation.
        token_budget: Maximum tokens for the memory block.
        injection_point: Role for the output message.

    Returns:
        Tuple containing a single ``ChatMessage`` with formatted
        memories, or empty tuple if no memories fit or input is empty.
    """
    if not memories or token_budget <= 0:
        return ()

    # Account for both newlines in the final block format:
    # "{START}\n{body}\n{END}"
    delimiter_text = f"{MEMORY_BLOCK_START}\n\n{MEMORY_BLOCK_END}"
    delimiter_tokens = estimator.estimate_tokens(delimiter_text)

    remaining = token_budget - delimiter_tokens
    if remaining <= 0:
        logger.debug(
            MEMORY_TOKEN_BUDGET_EXCEEDED,
            budget=token_budget,
            delimiter_tokens=delimiter_tokens,
            reason="budget exhausted by delimiters",
        )
        return ()

    # Greedy packing: iterate by rank, include memories that fit.
    # Entries too large for the remaining budget are skipped (not
    # stopping), allowing shorter lower-ranked entries to fill the
    # remaining space.
    included_lines: list[str] = []
    for memory in memories:
        line = _format_line(memory)
        line_tokens = estimator.estimate_tokens(line)
        # Account for the newline separator added by "\n".join()
        separator_cost = estimator.estimate_tokens("\n") if included_lines else 0
        if line_tokens + separator_cost <= remaining:
            included_lines.append(line)
            remaining -= line_tokens + separator_cost
        else:
            logger.debug(
                MEMORY_TOKEN_BUDGET_EXCEEDED,
                budget=token_budget,
                remaining=remaining,
                line_tokens=line_tokens,
                skipped_memory_id=memory.entry.id,
            )

    if not included_lines:
        logger.debug(
            MEMORY_TOKEN_BUDGET_EXCEEDED,
            budget=token_budget,
            total_candidates=len(memories),
            reason="no memories fit within budget",
        )
        return ()

    body = "\n".join(included_lines)
    block = f"{MEMORY_BLOCK_START}\n{body}\n{MEMORY_BLOCK_END}"

    try:
        role = _INJECTION_POINT_TO_ROLE[injection_point]
    except KeyError:
        msg = f"Unsupported injection point: {injection_point!r}"
        logger.warning(
            MEMORY_FORMAT_INVALID_INJECTION_POINT,
            injection_point=injection_point,
            reason=msg,
        )
        raise ValueError(msg) from None
    message = ChatMessage(role=role, content=block)

    logger.debug(
        MEMORY_FORMAT_COMPLETE,
        included_count=len(included_lines),
        total_candidates=len(memories),
        token_budget=token_budget,
        injection_point=injection_point.value,
    )

    return (message,)
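Greedy packing is worth seeing in miniature. The sketch below uses the heuristic len(text) // 4 estimator and plain strings (the real function formats ScoredMemory entries and wraps the result in delimiters and a ChatMessage):

```python
def estimate(text: str) -> int:
    # heuristic estimator: len // 4, at least 1 for non-empty text
    return 0 if not text else max(1, len(text) // 4)

def greedy_pack(lines, budget):
    # iterate by rank; skip lines that do not fit, keep trying smaller ones
    included, remaining = [], budget
    for line in lines:
        cost = estimate(line) + (estimate("\n") if included else 0)
        if cost <= remaining:
            included.append(line)
            remaining -= cost
    return included

packed = greedy_pack(["a" * 40, "b" * 400, "c" * 20], budget=17)
```

The oversized middle entry is skipped rather than stopping the loop, so the shorter, lower-ranked entry still makes it into the block.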

Injection

injection

Memory injection strategy protocol and supporting types.

Defines the pluggable MemoryInjectionStrategy protocol that controls how memories reach agents during execution. Three strategies are defined (context injection, tool-based, self-editing); this module provides the protocol and enums for all three. ContextInjectionStrategy (in synthorg.memory.retriever) and ToolBasedInjectionStrategy (in synthorg.memory.tool_retriever) are implemented; self-editing is future work.

TokenEstimator is a local structural protocol that avoids a memory -> engine import cycle (PromptTokenEstimator lives in engine/prompt.py). Any object with estimate_tokens(str) -> int satisfies it automatically.

InjectionStrategy

Bases: StrEnum

Which injection strategy to use for surfacing memories.

Attributes:

Name Type Description
CONTEXT

Pre-execution context injection (implemented).

TOOL_BASED

On-demand via agent tools (implemented).

SELF_EDITING

Structured read/write memory blocks (future).

InjectionPoint

Bases: StrEnum

Role of the injected memory message.

Attributes:

Name Type Description
SYSTEM

Memory injected as a SYSTEM message (default).

USER

Memory injected as a USER message.

TokenEstimator

Bases: Protocol

Token estimation protocol (avoids memory -> engine dependency).

Any object with estimate_tokens(str) -> int satisfies this protocol structurally.

estimate_tokens

estimate_tokens(text)

Estimate the number of tokens in text.

Implementations must return non-negative values.

Parameters:

Name Type Description Default
text str

The text to estimate tokens for.

required

Returns:

Type Description
int

Estimated token count (non-negative).

Source code in src/synthorg/memory/injection.py
def estimate_tokens(self, text: str) -> int:
    """Estimate the number of tokens in *text*.

    Implementations must return non-negative values.

    Args:
        text: The text to estimate tokens for.

    Returns:
        Estimated token count (non-negative).
    """
    ...
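Because the protocol is structural, no inheritance is needed: any class exposing the right method satisfies it. A sketch (the local protocol definition and @runtime_checkable decorator here are part of the sketch, not necessarily how the library declares it):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class TokenEstimator(Protocol):
    def estimate_tokens(self, text: str) -> int: ...

class WordCountEstimator:
    # satisfies the protocol structurally -- no inheritance required
    def estimate_tokens(self, text: str) -> int:
        return len(text.split())

est = WordCountEstimator()
ok = isinstance(est, TokenEstimator)
```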

DefaultTokenEstimator

Heuristic token estimator: len(text) // 4.

Suitable for rough budget enforcement when a model-specific tokenizer is unavailable.

estimate_tokens

estimate_tokens(text)

Estimate tokens as max(1, len(text) // 4) for non-empty text.

Returns 0 for empty text, at least 1 for any non-empty text.

Parameters:

Name Type Description Default
text str

The text to estimate tokens for.

required

Returns:

Type Description
int

Estimated token count (non-negative).

Source code in src/synthorg/memory/injection.py
def estimate_tokens(self, text: str) -> int:
    """Estimate tokens as ``max(1, len(text) // 4)`` for non-empty text.

    Returns 0 for empty text, at least 1 for any non-empty text.

    Args:
        text: The text to estimate tokens for.

    Returns:
        Estimated token count (non-negative).
    """
    if not text:
        return 0
    return max(1, len(text) // 4)

MemoryInjectionStrategy

Bases: Protocol

Pluggable strategy for making memories available to agents.

Implementations determine how memories reach the agent:

  • Context injection: pre-execution message injection.
  • Tool-based: on-demand retrieval via agent tools.
  • Self-editing: structured read/write memory blocks.

strategy_name property

strategy_name

Human-readable strategy identifier.

Returns:

Type Description
str

Strategy name string.

prepare_messages async

prepare_messages(agent_id, query_text, token_budget)

Return memory messages to inject into agent context.

Context injection returns ranked, formatted memories. Tool-based may return empty (tools handle retrieval). Self-editing returns the core memory block.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent requesting memories.

required
query_text NotBlankStr

Text to use for semantic retrieval.

required
token_budget int

Maximum tokens for memory content.

required

Returns:

Type Description
tuple[ChatMessage, ...]

Tuple of ChatMessage instances (may be empty).

Source code in src/synthorg/memory/injection.py
async def prepare_messages(
    self,
    agent_id: NotBlankStr,
    query_text: NotBlankStr,
    token_budget: int,
) -> tuple[ChatMessage, ...]:
    """Return memory messages to inject into agent context.

    Context injection returns ranked, formatted memories.
    Tool-based may return empty (tools handle retrieval).
    Self-editing returns the core memory block.

    Args:
        agent_id: The agent requesting memories.
        query_text: Text to use for semantic retrieval.
        token_budget: Maximum tokens for memory content.

    Returns:
        Tuple of ``ChatMessage`` instances (may be empty).
    """
    ...

get_tool_definitions

get_tool_definitions()

Return tool definitions this strategy provides.

Context injection returns (). Tool-based returns recall/search tools. Self-editing returns read/write tools.

Returns:

Type Description
tuple[ToolDefinition, ...]

Tuple of ToolDefinition instances.

Source code in src/synthorg/memory/injection.py
def get_tool_definitions(self) -> tuple[ToolDefinition, ...]:
    """Return tool definitions this strategy provides.

    Context injection returns ``()``.  Tool-based returns
    recall/search tools.  Self-editing returns read/write tools.

    Returns:
        Tuple of ``ToolDefinition`` instances.
    """
    ...

Org Memory

protocol

OrgMemoryBackend protocol -- lifecycle + org memory operations.

Application code depends on this protocol for shared organizational memory storage and retrieval. Concrete backends implement this protocol to provide company-wide knowledge management.

OrgMemoryBackend

Bases: Protocol

Structural interface for organizational memory backends.

Provides company-wide knowledge storage, retrieval, and lifecycle management. All operations require a connected backend.

Attributes:

Name Type Description
is_connected bool

Whether the backend has an active connection.

backend_name NotBlankStr

Human-readable backend identifier.

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier.

connect async

connect()

Establish connection to the org memory backend.

Raises:

Type Description
OrgMemoryConnectionError

If the connection fails.

Source code in src/synthorg/memory/org/protocol.py
async def connect(self) -> None:
    """Establish connection to the org memory backend.

    Raises:
        OrgMemoryConnectionError: If the connection fails.
    """
    ...

disconnect async

disconnect()

Close the org memory backend connection.

Safe to call even if not connected.

Source code in src/synthorg/memory/org/protocol.py
async def disconnect(self) -> None:
    """Close the org memory backend connection.

    Safe to call even if not connected.
    """
    ...

health_check async

health_check()

Check whether the backend is healthy and responsive.

Returns:

Type Description
bool

True if the backend is reachable and operational.

Source code in src/synthorg/memory/org/protocol.py
async def health_check(self) -> bool:
    """Check whether the backend is healthy and responsive.

    Returns:
        ``True`` if the backend is reachable and operational.
    """
    ...

query async

query(query)

Query organizational facts.

Parameters:

Name Type Description Default
query OrgMemoryQuery

Query parameters.

required

Returns:

Type Description
tuple[OrgFact, ...]

Matching facts ordered by relevance.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/protocol.py
async def query(self, query: OrgMemoryQuery) -> tuple[OrgFact, ...]:
    """Query organizational facts.

    Args:
        query: Query parameters.

    Returns:
        Matching facts ordered by relevance.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    ...

write async

write(request, *, author)

Write a new organizational fact.

Parameters:

Name Type Description Default
request OrgFactWriteRequest

Fact content and category.

required
author OrgFactAuthor

The author of the fact.

required

Returns:

Type Description
NotBlankStr

The assigned fact ID.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryAccessDeniedError

If write access is denied.

OrgMemoryWriteError

If the write operation fails.

Source code in src/synthorg/memory/org/protocol.py
async def write(
    self,
    request: OrgFactWriteRequest,
    *,
    author: OrgFactAuthor,
) -> NotBlankStr:
    """Write a new organizational fact.

    Args:
        request: Fact content and category.
        author: The author of the fact.

    Returns:
        The assigned fact ID.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryAccessDeniedError: If write access is denied.
        OrgMemoryWriteError: If the write operation fails.
    """
    ...

list_policies async

list_policies()

List all core policy facts.

Returns:

Type Description
tuple[OrgFact, ...]

Tuple of core policy facts.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

Source code in src/synthorg/memory/org/protocol.py
async def list_policies(self) -> tuple[OrgFact, ...]:
    """List all core policy facts.

    Returns:
        Tuple of core policy facts.

    Raises:
        OrgMemoryConnectionError: If not connected.
    """
    ...
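A minimal in-memory fake makes the lifecycle contract concrete (illustrative only: the real write takes an OrgFactWriteRequest plus an OrgFactAuthor, and raises OrgMemoryConnectionError rather than RuntimeError):

```python
import asyncio
import itertools

class InMemoryOrgBackend:
    """Toy stand-in for the OrgMemoryBackend protocol."""

    def __init__(self) -> None:
        self._facts: dict[str, tuple[str, str]] = {}
        self._ids = itertools.count(1)
        self.is_connected = False
        self.backend_name = "in_memory"

    async def connect(self) -> None:
        self.is_connected = True

    async def disconnect(self) -> None:
        self.is_connected = False

    async def health_check(self) -> bool:
        return self.is_connected

    async def write(self, content: str, category: str) -> str:
        if not self.is_connected:
            raise RuntimeError("not connected")  # stands in for OrgMemoryConnectionError
        fact_id = f"fact-{next(self._ids)}"
        self._facts[fact_id] = (content, category)
        return fact_id

async def demo() -> tuple[str, bool]:
    backend = InMemoryOrgBackend()
    await backend.connect()
    fact_id = await backend.write("ADRs live in docs/adr/", "convention")
    return fact_id, await backend.health_check()

fact_id, healthy = asyncio.run(demo())
```

Writing before connect() fails, matching the protocol's rule that every operation requires a connected backend.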

config

Org memory configuration models.

Frozen Pydantic models for organizational memory backend selection and behaviour settings.

ExtendedStoreConfig pydantic-model

Bases: BaseModel

Configuration for the extended org facts store.

Attributes:

Name Type Description
backend NotBlankStr

Store backend name (e.g. "sqlite").

max_retrieved_per_query int

Maximum facts to retrieve per query.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_backend_name

backend pydantic-field

backend = 'sqlite'

Store backend name

max_retrieved_per_query pydantic-field

max_retrieved_per_query = 5

Maximum facts to retrieve per query

OrgMemoryConfig pydantic-model

Bases: BaseModel

Top-level organizational memory configuration.

Attributes:

Name Type Description
backend NotBlankStr

Org memory backend name.

core_policies tuple[NotBlankStr, ...]

Core policy texts injected into system prompts.

extended_store ExtendedStoreConfig

Extended facts store configuration.

write_access WriteAccessConfig

Write access control configuration.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_backend_name

backend pydantic-field

backend = 'hybrid_prompt_retrieval'

Org memory backend name

core_policies pydantic-field

core_policies = ()

Core policy texts injected into system prompts

extended_store pydantic-field

extended_store

Extended facts store configuration

write_access pydantic-field

write_access

Write access control configuration

models

Org memory domain models.

Frozen Pydantic models for organizational facts -- shared company-wide knowledge such as policies, ADRs, procedures, and conventions.

OrgFactAuthor pydantic-model

Bases: BaseModel

Author of an organizational fact.

If is_human is True, agent_id must be None. If is_human is False, agent_id and seniority are required.

Attributes:

Name Type Description
agent_id NotBlankStr | None

Agent identifier (None for human authors).

seniority SeniorityLevel | None

Agent seniority level (None for human authors).

is_human bool

Whether the author is a human operator.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_author_consistency

agent_id pydantic-field

agent_id = None

Agent identifier (None for human authors)

seniority pydantic-field

seniority = None

Agent seniority level (None for human authors)

is_human pydantic-field

is_human = False

Whether the author is a human operator
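The consistency rule can be restated as a standalone check (hypothetical helper mirroring the documented _validate_author_consistency behaviour; the real validator runs inside the Pydantic model):

```python
def check_author(agent_id, seniority, *, is_human):
    # human authors: no agent_id; agent authors: both fields required
    if is_human:
        if agent_id is not None:
            raise ValueError("human authors must not carry an agent_id")
    elif agent_id is None or seniority is None:
        raise ValueError("agent authors require agent_id and seniority")
    return True

ok_human = check_author(None, None, is_human=True)
ok_agent = check_author("agent-7", "senior", is_human=False)
```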

OrgFact pydantic-model

Bases: BaseModel

An organizational fact -- a piece of shared company-wide knowledge.

Attributes:

Name Type Description
id NotBlankStr

Unique identifier for this fact.

content NotBlankStr

The fact content text.

category OrgFactCategory

Category classification.

author OrgFactAuthor

Who created this fact.

created_at AwareDatetime

Creation timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique fact identifier

content pydantic-field

content

Fact content text

category pydantic-field

category

Category classification

author pydantic-field

author

Who created this fact

created_at pydantic-field

created_at

Creation timestamp

OrgFactWriteRequest pydantic-model

Bases: BaseModel

Request to write a new organizational fact.

Attributes:

Name Type Description
content NotBlankStr

The fact content text.

category OrgFactCategory

Category classification.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

content pydantic-field

content

Fact content text

category pydantic-field

category

Category classification

OrgMemoryQuery pydantic-model

Bases: BaseModel

Query parameters for org memory retrieval.

Attributes:

Name Type Description
context NotBlankStr | None

Text search context (None for metadata-only).

categories frozenset[OrgFactCategory] | None

Filter by fact categories.

limit int

Maximum number of results.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

context pydantic-field

context = None

Text search context

categories pydantic-field

categories = None

Filter by fact categories

limit pydantic-field

limit = 5

Maximum number of results

store

Org fact store -- protocol and SQLite implementation.

Self-contained storage for organizational facts, separate from the operational persistence layer.

OrgFactStore

Bases: Protocol

Protocol for organizational fact persistence.

is_connected property

is_connected

Whether the store has an active connection.

connect async

connect()

Establish connection to the store.

Source code in src/synthorg/memory/org/store.py
async def connect(self) -> None:
    """Establish connection to the store."""
    ...

disconnect async

disconnect()

Close the store connection.

Source code in src/synthorg/memory/org/store.py
async def disconnect(self) -> None:
    """Close the store connection."""
    ...

save async

save(fact)

Save an organizational fact.

Parameters:

Name Type Description Default
fact OrgFact

The fact to persist.

required

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryWriteError

If the save fails.

Source code in src/synthorg/memory/org/store.py
async def save(self, fact: OrgFact) -> None:
    """Save an organizational fact.

    Args:
        fact: The fact to persist.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryWriteError: If the save fails.
    """
    ...

get async

get(fact_id)

Get a fact by ID.

Parameters:

Name Type Description Default
fact_id NotBlankStr

The fact identifier.

required

Returns:

Type Description
OrgFact | None

The fact, or None if not found.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/store.py
async def get(self, fact_id: NotBlankStr) -> OrgFact | None:
    """Get a fact by ID.

    Args:
        fact_id: The fact identifier.

    Returns:
        The fact, or ``None`` if not found.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    ...

query async

query(*, categories=None, text=None, limit=5)

Query facts by category and/or text.

Parameters:

Name Type Description Default
categories frozenset[OrgFactCategory] | None

Optional category filter.

None
text str | None

Optional text search (substring match).

None
limit int

Maximum results.

5

Returns:

Type Description
tuple[OrgFact, ...]

Matching facts.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/store.py
async def query(
    self,
    *,
    categories: frozenset[OrgFactCategory] | None = None,
    text: str | None = None,
    limit: int = 5,
) -> tuple[OrgFact, ...]:
    """Query facts by category and/or text.

    Args:
        categories: Optional category filter.
        text: Optional text search (substring match).
        limit: Maximum results.

    Returns:
        Matching facts.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    ...

list_by_category async

list_by_category(category)

List all facts in a category.

Parameters:

Name Type Description Default
category OrgFactCategory

The category to list.

required

Returns:

Type Description
tuple[OrgFact, ...]

All facts in the category.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/store.py
async def list_by_category(
    self,
    category: OrgFactCategory,
) -> tuple[OrgFact, ...]:
    """List all facts in a category.

    Args:
        category: The category to list.

    Returns:
        All facts in the category.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    ...

delete async

delete(fact_id)

Delete a fact by ID.

Parameters:

Name Type Description Default
fact_id NotBlankStr

Fact identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryWriteError

If the delete fails.

Source code in src/synthorg/memory/org/store.py
async def delete(self, fact_id: NotBlankStr) -> bool:
    """Delete a fact by ID.

    Args:
        fact_id: Fact identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryWriteError: If the delete fails.
    """
    ...

SQLiteOrgFactStore

SQLiteOrgFactStore(db_path)

SQLite-backed organizational fact store.

Uses a separate database from the operational persistence layer to keep institutional knowledge decoupled.

Parameters:

Name Type Description Default
db_path str

Path to the SQLite database file (or :memory:).

required

Raises:

Type Description
OrgMemoryConnectionError

If the path contains traversal.

Source code in src/synthorg/memory/org/store.py
def __init__(self, db_path: str) -> None:
    _reject_traversal(db_path)
    self._db_path = db_path
    self._db: aiosqlite.Connection | None = None
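The `_reject_traversal` helper called above is not shown on this page; a hypothetical sketch of such a check might look like the following (the function name, error type, and exact rules are assumptions, not the library's actual implementation):

```python
from pathlib import PurePosixPath

def reject_traversal(db_path: str) -> None:
    # Hypothetical sketch: reject relative-parent components in the
    # database path while still allowing the special ":memory:" value.
    if db_path == ":memory:":
        return
    if ".." in PurePosixPath(db_path).parts:
        raise ValueError(f"path traversal rejected: {db_path!r}")

reject_traversal(":memory:")      # allowed: in-memory database
reject_traversal("data/org.db")   # allowed: plain relative path
```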

is_connected property

is_connected

Whether the store has an active connection.

backend_name property

backend_name

Human-readable store identifier.

connect async

connect()

Open the SQLite database with WAL mode and ensure schema.

Raises:

Type Description
OrgMemoryConnectionError

If the connection fails.

Source code in src/synthorg/memory/org/store.py
async def connect(self) -> None:
    """Open the SQLite database with WAL mode and ensure schema.

    Raises:
        OrgMemoryConnectionError: If the connection fails.
    """
    if self._db is not None:
        return
    try:
        self._db = await aiosqlite.connect(self._db_path)
        self._db.row_factory = aiosqlite.Row
        if self._db_path != ":memory:":
            await self._db.execute("PRAGMA journal_mode=WAL")
        await self._ensure_schema()
    except (sqlite3.Error, OSError) as exc:
        if self._db is not None:
            try:
                await self._db.close()
            except (sqlite3.Error, OSError) as close_exc:
                logger.warning(
                    ORG_MEMORY_DISCONNECT_FAILED,
                    db_path=self._db_path,
                    reason="cleanup close during failed connect",
                    error=str(close_exc),
                    error_type=type(close_exc).__name__,
                )
        self._db = None
        msg = f"Failed to connect to org fact store: {exc}"
        logger.exception(
            ORG_MEMORY_CONNECT_FAILED,
            db_path=self._db_path,
            error=str(exc),
        )
        raise OrgMemoryConnectionError(msg) from exc
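The WAL pragma in `connect()` only takes effect for file-backed databases, which is why the code skips it for `:memory:`. A quick stdlib `sqlite3` check (independent of `aiosqlite`) illustrates the behavior:

```python
import os
import sqlite3
import tempfile

# WAL journaling requires a file on disk; an in-memory database
# would report "memory" instead.
path = os.path.join(tempfile.mkdtemp(), "org.db")
con = sqlite3.connect(path)
mode = con.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print(mode)  # wal
con.close()
```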

disconnect async

disconnect()

Close the database connection.

Source code in src/synthorg/memory/org/store.py
async def disconnect(self) -> None:
    """Close the database connection."""
    if self._db is None:
        return
    try:
        await self._db.close()
    except (sqlite3.Error, OSError) as exc:
        logger.warning(
            ORG_MEMORY_DISCONNECT_FAILED,
            db_path=self._db_path,
            error=str(exc),
            error_type=type(exc).__name__,
        )
    finally:
        self._db = None

save async

save(fact)

Persist a fact to the database.

Uses INSERT (not INSERT OR REPLACE) to preserve the append-only audit trail. Duplicate IDs raise OrgMemoryWriteError.

Parameters:

Name Type Description Default
fact OrgFact

The fact to save.

required

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryWriteError

If the save fails or the ID exists.

Source code in src/synthorg/memory/org/store.py
async def save(self, fact: OrgFact) -> None:
    """Persist a fact to the database.

    Uses ``INSERT`` (not ``INSERT OR REPLACE``) to preserve the
    append-only audit trail.  Duplicate IDs raise
    ``OrgMemoryWriteError``.

    Args:
        fact: The fact to save.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryWriteError: If the save fails or the ID exists.
    """
    db = self._require_connected()
    try:
        await db.execute(
            "INSERT INTO org_facts "
            "(id, content, category, author_agent_id, "
            "author_seniority, author_is_human, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (
                fact.id,
                fact.content,
                fact.category.value,
                fact.author.agent_id,
                fact.author.seniority.value if fact.author.seniority else None,
                int(fact.author.is_human),
                fact.created_at.isoformat(),
            ),
        )
        await db.commit()
    except sqlite3.Error as exc:
        logger.exception(
            ORG_MEMORY_WRITE_FAILED,
            fact_id=fact.id,
            error=str(exc),
        )
        msg = f"Failed to save org fact: {exc}"
        raise OrgMemoryWriteError(msg) from exc
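The append-only guarantee comes from using plain `INSERT` against a primary-keyed table: a duplicate ID fails instead of silently overwriting history. A stdlib `sqlite3` sketch of the same pattern (simplified schema, illustrative data):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE org_facts (id TEXT PRIMARY KEY, content TEXT)")
con.execute("INSERT INTO org_facts VALUES ('f1', 'original')")
try:
    # A plain INSERT refuses to overwrite an existing ID, so the
    # audit trail stays append-only.
    con.execute("INSERT INTO org_facts VALUES ('f1', 'tampered')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

content = con.execute("SELECT content FROM org_facts WHERE id = 'f1'").fetchone()[0]
print(duplicate_rejected, content)  # True original
```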

get async

get(fact_id)

Get a fact by its ID.

Parameters:

Name Type Description Default
fact_id NotBlankStr

Fact identifier.

required

Returns:

Type Description
OrgFact | None

The fact or None.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/store.py
async def get(self, fact_id: NotBlankStr) -> OrgFact | None:
    """Get a fact by its ID.

    Args:
        fact_id: Fact identifier.

    Returns:
        The fact or ``None``.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    db = self._require_connected()
    try:
        cursor = await db.execute(
            "SELECT * FROM org_facts WHERE id = ?",
            (fact_id,),
        )
        row = await cursor.fetchone()
    except sqlite3.Error as exc:
        logger.exception(
            ORG_MEMORY_QUERY_FAILED,
            fact_id=fact_id,
            error=str(exc),
        )
        msg = f"Failed to get org fact: {exc}"
        raise OrgMemoryQueryError(msg) from exc
    if row is None:
        return None
    return _row_to_org_fact(row)

query async

query(*, categories=None, text=None, limit=5)

Query facts by category and/or text content.

All dynamic values are bound as SQL query parameters. The WHERE clause is constructed from safe column/operator constants only -- no user input is interpolated into the SQL string.

Parameters:

Name Type Description Default
categories frozenset[OrgFactCategory] | None

Category filter.

None
text str | None

Text substring filter.

None
limit int

Maximum results.

5

Returns:

Type Description
tuple[OrgFact, ...]

Matching facts.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/store.py
async def query(
    self,
    *,
    categories: frozenset[OrgFactCategory] | None = None,
    text: str | None = None,
    limit: int = 5,
) -> tuple[OrgFact, ...]:
    """Query facts by category and/or text content.

    All dynamic values are passed as parameterized query parameters.
    The ``WHERE`` clause is constructed from safe column/operator
    constants only -- no user input is interpolated into SQL.

    Args:
        categories: Category filter.
        text: Text substring filter.
        limit: Maximum results.

    Returns:
        Matching facts.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    db = self._require_connected()
    clauses: list[str] = []
    params: list[str | int] = []

    if categories is not None and categories:
        placeholders = ",".join("?" for _ in categories)
        clauses.append(f"category IN ({placeholders})")
        params.extend(c.value for c in categories)

    escaped = ""
    if text is not None:
        escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        clauses.append("content LIKE ? ESCAPE '\\'")
        params.append(f"%{escaped}%")

    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    # When a text filter is active, rank by match position (earlier
    # = more relevant), then content length (shorter = more focused),
    # then recency.  Without text, fall back to pure recency.
    if text is not None:
        order = (
            "ORDER BY INSTR(LOWER(content), LOWER(?)) ASC, "
            "LENGTH(content) ASC, created_at DESC"
        )
        params.append(escaped)
    else:
        order = "ORDER BY created_at DESC"
    sql = f"SELECT * FROM org_facts{where} {order} LIMIT ?"  # noqa: S608
    params.append(limit)

    try:
        cursor = await db.execute(sql, params)
        rows = await cursor.fetchall()
    except sqlite3.Error as exc:
        logger.exception(
            ORG_MEMORY_QUERY_FAILED,
            error=str(exc),
        )
        msg = f"Failed to query org facts: {exc}"
        raise OrgMemoryQueryError(msg) from exc
    return tuple(_row_to_org_fact(row) for row in rows)
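The LIKE-wildcard escaping above matters when the search text itself contains `%`, `_`, or `\`. This stdlib `sqlite3` sketch (simplified schema, illustrative rows) reproduces the escaping and the length-based part of the ranking, showing that `100%` is matched literally rather than as a wildcard:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE org_facts (id TEXT, content TEXT, created_at TEXT)")
con.executemany(
    "INSERT INTO org_facts VALUES (?, ?, ?)",
    [
        ("1", "Use 100% coverage for core modules", "2024-01-01"),
        ("2", "No Friday deploys", "2024-01-02"),
        ("3", "100%_raw literal", "2024-01-03"),
    ],
)

text = "100%"
# Escape LIKE wildcards so the user's text is matched literally,
# mirroring the escaping in query() above.
escaped = text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
cur = con.execute(
    "SELECT id FROM org_facts WHERE content LIKE ? ESCAPE '\\' "
    "ORDER BY LENGTH(content) ASC LIMIT 5",
    (f"%{escaped}%",),
)
matched = [row[0] for row in cur]
print(matched)  # ['3', '1'] -- shorter (more focused) content ranks first
```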

list_by_category async

list_by_category(category)

List all facts in a category.

Parameters:

Name Type Description Default
category OrgFactCategory

The category to list.

required

Returns:

Type Description
tuple[OrgFact, ...]

All facts in the category.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/store.py
async def list_by_category(
    self,
    category: OrgFactCategory,
) -> tuple[OrgFact, ...]:
    """List all facts in a category.

    Args:
        category: The category to list.

    Returns:
        All facts in the category.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    db = self._require_connected()
    try:
        cursor = await db.execute(
            "SELECT * FROM org_facts WHERE category = ? ORDER BY created_at DESC",
            (category.value,),
        )
        rows = await cursor.fetchall()
    except sqlite3.Error as exc:
        logger.exception(
            ORG_MEMORY_QUERY_FAILED,
            category=category.value,
            error=str(exc),
        )
        msg = f"Failed to list org facts by category: {exc}"
        raise OrgMemoryQueryError(msg) from exc
    return tuple(_row_to_org_fact(row) for row in rows)

delete async

delete(fact_id)

Delete a fact by ID.

Parameters:

Name Type Description Default
fact_id NotBlankStr

Fact identifier.

required

Returns:

Type Description
bool

True if deleted, False if not found.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryWriteError

If the delete fails.

Source code in src/synthorg/memory/org/store.py
async def delete(self, fact_id: NotBlankStr) -> bool:
    """Delete a fact by ID.

    Args:
        fact_id: Fact identifier.

    Returns:
        ``True`` if deleted, ``False`` if not found.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryWriteError: If the delete fails.
    """
    db = self._require_connected()
    try:
        cursor = await db.execute(
            "DELETE FROM org_facts WHERE id = ?",
            (fact_id,),
        )
        await db.commit()
    except sqlite3.Error as exc:
        logger.exception(
            ORG_MEMORY_WRITE_FAILED,
            fact_id=fact_id,
            error=str(exc),
        )
        msg = f"Failed to delete org fact: {exc}"
        raise OrgMemoryWriteError(msg) from exc
    else:
        return cursor.rowcount > 0

access_control

Write access control for organizational memory.

Provides seniority-based and human-based write restriction models, configuration, and enforcement functions.

CategoryWriteRule pydantic-model

Bases: BaseModel

Write permission rule for a single fact category.

Attributes:

Name Type Description
allowed_seniority SeniorityLevel | None

Minimum seniority level for agent writes (None means only humans can write).

human_allowed bool

Whether human operators can write.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

allowed_seniority pydantic-field

allowed_seniority = None

Minimum seniority level for agent writes (None = human-only)

human_allowed pydantic-field

human_allowed = True

Whether human operators can write

WriteAccessConfig pydantic-model

Bases: BaseModel

Write access configuration for all fact categories.

Attributes:

Name Type Description
rules dict[OrgFactCategory, CategoryWriteRule]

Per-category write rules (read-only mapping).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _wrap_rules_readonly

rules pydantic-field

rules

Per-category write rules

check_write_access

check_write_access(config, category, author)

Check whether the given author may write to the given category.

Parameters:

Name Type Description Default
config WriteAccessConfig

Write access configuration.

required
category OrgFactCategory

Target fact category.

required
author OrgFactAuthor

The author attempting the write.

required

Returns:

Type Description
bool

True if write is permitted, False otherwise.

Source code in src/synthorg/memory/org/access_control.py
def check_write_access(
    config: WriteAccessConfig,
    category: OrgFactCategory,
    author: OrgFactAuthor,
) -> bool:
    """Check whether the given author may write to the given category.

    Args:
        config: Write access configuration.
        category: Target fact category.
        author: The author attempting the write.

    Returns:
        ``True`` if write is permitted, ``False`` otherwise.
    """
    # Fail closed: if a category has no explicit rule, deny all writes.
    rule = config.rules.get(
        category,
        CategoryWriteRule(allowed_seniority=None, human_allowed=False),
    )

    if author.is_human:
        return rule.human_allowed

    if rule.allowed_seniority is None:
        return False

    if author.seniority is None:
        return False

    return compare_seniority(author.seniority, rule.allowed_seniority) >= 0

require_write_access

require_write_access(config, category, author)

Check write access and raise if denied.

Parameters:

Name Type Description Default
config WriteAccessConfig

Write access configuration.

required
category OrgFactCategory

Target fact category.

required
author OrgFactAuthor

The author attempting the write.

required

Raises:

Type Description
OrgMemoryAccessDeniedError

If write is not permitted.

Source code in src/synthorg/memory/org/access_control.py
def require_write_access(
    config: WriteAccessConfig,
    category: OrgFactCategory,
    author: OrgFactAuthor,
) -> None:
    """Check write access and raise if denied.

    Args:
        config: Write access configuration.
        category: Target fact category.
        author: The author attempting the write.

    Raises:
        OrgMemoryAccessDeniedError: If write is not permitted.
    """
    if not check_write_access(config, category, author):
        author_desc = (
            "human"
            if author.is_human
            else f"agent {author.agent_id} ({author.seniority})"
        )
        msg = (
            f"Write access denied: {author_desc} cannot write "
            f"to category {category.value!r}"
        )
        logger.warning(
            ORG_MEMORY_WRITE_DENIED,
            category=category.value,
            author_is_human=author.is_human,
            author_agent_id=author.agent_id,
            author_seniority=str(author.seniority) if author.seniority else None,
            reason=msg,
        )
        raise OrgMemoryAccessDeniedError(msg)

Consolidation

config

Memory consolidation configuration models.

Frozen Pydantic models for consolidation interval, retention, and archival settings.

RetentionConfig pydantic-model

Bases: BaseModel

Per-category retention configuration (company-level defaults).

These rules apply as the baseline for all agents. Individual agents can override specific categories via :attr:~synthorg.core.agent.MemoryConfig.retention_overrides.

Resolution order per category (highest priority first):

  1. Agent per-category override
  2. Company per-category rule (this config)
  3. Agent global default (MemoryConfig.retention_days)
  4. Company global default (default_retention_days)
  5. Keep forever (no expiry)

Attributes:

Name Type Description
rules tuple[RetentionRule, ...]

Per-category retention rules (unique categories).

default_retention_days int | None

Default retention in days (None = keep forever).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_categories

rules pydantic-field

rules = ()

Per-category retention rules

default_retention_days pydantic-field

default_retention_days = None

Default retention in days (None = forever)

DualModeConfig pydantic-model

Bases: BaseModel

Configuration for dual-mode archival.

Controls density-aware archival: LLM abstractive summaries for sparse/conversational content vs extractive preservation (verbatim key facts + start/mid/end anchors) for dense/factual content.

Attributes:

Name Type Description
enabled bool

Whether dual-mode density classification is active. When False, the dual-mode strategy is not used.

dense_threshold float

Density score threshold for DENSE classification (0.0 classifies everything as dense, 1.0 classifies everything as sparse).

summarization_model NotBlankStr | None

Model ID for abstractive summarization.

max_summary_tokens int

Maximum tokens for LLM summary responses.

max_facts int

Maximum number of extracted key facts for extractive mode.

anchor_length int

Character length for each extractive anchor snippet (start/mid/end).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_model_when_enabled

enabled pydantic-field

enabled = False

Whether dual-mode density classification is active

dense_threshold pydantic-field

dense_threshold = 0.5

Density score threshold for DENSE classification

summarization_model pydantic-field

summarization_model = None

Model ID for abstractive summarization

max_summary_tokens pydantic-field

max_summary_tokens = 200

Maximum tokens for LLM summary responses

max_facts pydantic-field

max_facts = 20

Maximum extracted key facts for extractive mode

anchor_length pydantic-field

anchor_length = 150

Character length for each extractive anchor
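A minimal sketch of how dense_threshold routes content between the two archival modes. The comparison direction (scores at or above the threshold count as dense) follows from the field description above, but the real density-scoring function is not shown on this page:

```python
def classify_density(score: float, dense_threshold: float = 0.5) -> str:
    # Scores at or above the threshold count as dense, consistent with
    # a 0.0 threshold classifying everything as dense.
    return "dense" if score >= dense_threshold else "sparse"

print(classify_density(0.8))  # dense  -> extractive preservation
print(classify_density(0.2))  # sparse -> abstractive LLM summary
```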

ArchivalConfig pydantic-model

Bases: BaseModel

Archival configuration.

Attributes:

Name Type Description
enabled bool

Whether archival is enabled.

age_threshold_days int

Minimum age in days before archival.

dual_mode DualModeConfig

Dual-mode archival configuration.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

enabled pydantic-field

enabled = False

Whether archival is enabled

age_threshold_days pydantic-field

age_threshold_days = 90

Minimum age in days before archival

dual_mode pydantic-field

dual_mode

Dual-mode archival configuration

ConsolidationConfig pydantic-model

Bases: BaseModel

Top-level memory consolidation configuration.

Attributes:

Name Type Description
interval ConsolidationInterval

How often to run consolidation.

max_memories_per_agent int

Upper bound on memories per agent.

retention RetentionConfig

Per-category retention settings.

archival ArchivalConfig

Archival settings.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

interval pydantic-field

interval = DAILY

How often to run consolidation

max_memories_per_agent pydantic-field

max_memories_per_agent = 10000

Upper bound on memories per agent

retention pydantic-field

retention

Per-category retention settings

archival pydantic-field

archival

Archival settings

models

Memory consolidation domain models.

Frozen Pydantic models for consolidation results, archival entries, retention rules, and dual-mode archival types.

ArchivalMode

Bases: StrEnum

How a memory entry was archived during consolidation.

Determines the preservation strategy applied before archival.

ABSTRACTIVE class-attribute instance-attribute

ABSTRACTIVE = 'abstractive'

LLM-generated summary for sparse/conversational content.

EXTRACTIVE class-attribute instance-attribute

EXTRACTIVE = 'extractive'

Verbatim key-fact extraction for dense/factual content.

ArchivalModeAssignment pydantic-model

Bases: BaseModel

Maps a removed memory entry to the archival mode applied.

Attributes:

Name Type Description
original_id NotBlankStr

ID of the removed memory entry.

mode ArchivalMode

Archival mode applied to this entry.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

original_id pydantic-field

original_id

ID of the removed memory entry

mode pydantic-field

mode

Archival mode applied to this entry

ArchivalIndexEntry pydantic-model

Bases: BaseModel

Maps a removed memory entry to its archival store ID.

Enables deterministic index-based restore: agents can look up their own archived entries by original ID without semantic search.

Attributes:

Name Type Description
original_id NotBlankStr

ID of the original memory entry.

archival_id NotBlankStr

ID assigned by the archival store.

mode ArchivalMode

Archival mode used for this entry.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

original_id pydantic-field

original_id

ID of the original memory entry

archival_id pydantic-field

archival_id

ID assigned by the archival store

mode pydantic-field

mode

Archival mode used for this entry

ConsolidationResult pydantic-model

Bases: BaseModel

Result of a memory consolidation run.

Attributes:

Name Type Description
removed_ids tuple[NotBlankStr, ...]

IDs of removed memory entries.

summary_id NotBlankStr | None

ID of the summary entry (if created).

archived_count int

Number of entries archived.

consolidated_count int

Derived from len(removed_ids).

mode_assignments tuple[ArchivalModeAssignment, ...]

Per-entry archival mode assignments (set by strategy, empty for strategies that don't classify density).

archival_index tuple[ArchivalIndexEntry, ...]

Maps original memory IDs to archival store IDs (built by service after archival completes).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_archival_consistency

removed_ids pydantic-field

removed_ids = ()

IDs of removed memory entries

summary_id pydantic-field

summary_id = None

ID of the summary entry (if created)

archived_count pydantic-field

archived_count = 0

Number of entries archived

mode_assignments pydantic-field

mode_assignments = ()

Per-entry archival mode assignments

archival_index pydantic-field

archival_index = ()

Original-to-archival ID mapping

consolidated_count property

consolidated_count

Number of memories consolidated (derived from removed_ids).

ArchivalEntry pydantic-model

Bases: BaseModel

An archived memory entry.

Attributes:

Name Type Description
original_id NotBlankStr

ID from the hot store.

agent_id NotBlankStr

Owning agent identifier.

content NotBlankStr

Memory content text.

category MemoryCategory

Memory type category.

metadata MemoryMetadata

Associated metadata.

created_at AwareDatetime

Original creation timestamp.

archived_at AwareDatetime

When this entry was archived.

archival_mode ArchivalMode

How this entry was archived.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_temporal_order

original_id pydantic-field

original_id

ID from the hot store

agent_id pydantic-field

agent_id

Owning agent identifier

content pydantic-field

content

Memory content text

category pydantic-field

category

Memory type category

metadata pydantic-field

metadata

Associated metadata

created_at pydantic-field

created_at

Original creation timestamp

archived_at pydantic-field

archived_at

When this entry was archived

archival_mode pydantic-field

archival_mode

Archival mode used for this entry

RetentionRule pydantic-model

Bases: BaseModel

Per-category retention rule.

Attributes:

Name Type Description
category MemoryCategory

Memory category this rule applies to.

retention_days int

Number of days to retain memories.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

category pydantic-field

category

Memory category this rule applies to

retention_days pydantic-field

retention_days

Number of days to retain memories

strategy

Consolidation strategy protocol.

Defines the interface for memory consolidation algorithms that compress and summarize older memories.

ConsolidationStrategy

Bases: Protocol

Protocol for memory consolidation strategies.

Implementations receive a batch of memory entries and produce a ConsolidationResult indicating which entries were merged, removed, or summarized.

consolidate async

consolidate(entries, *, agent_id)

Consolidate a batch of memory entries.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to consolidate.

required
agent_id NotBlankStr

Owning agent identifier.

required

Returns:

Type Description
ConsolidationResult

Result describing what was consolidated.

Source code in src/synthorg/memory/consolidation/strategy.py
async def consolidate(
    self,
    entries: tuple[MemoryEntry, ...],
    *,
    agent_id: NotBlankStr,
) -> ConsolidationResult:
    """Consolidate a batch of memory entries.

    Args:
        entries: Memory entries to consolidate.
        agent_id: Owning agent identifier.

    Returns:
        Result describing what was consolidated.
    """
    ...
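Because the protocol is structural, any object with a matching async `consolidate` satisfies it. A toy age-based strategy illustrates the shape; `Entry` is a minimal stand-in for `MemoryEntry`, and a dict stands in for `ConsolidationResult` to keep the sketch dependency-free:

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass(frozen=True)
class Entry:  # minimal stand-in for MemoryEntry
    id: str
    created_at: datetime

class DropOldestStrategy:
    """Toy strategy: flag entries older than a cutoff for removal."""

    def __init__(self, max_age_days: int) -> None:
        self._max_age = timedelta(days=max_age_days)

    async def consolidate(self, entries, *, agent_id):
        cutoff = datetime.now(timezone.utc) - self._max_age
        removed = tuple(e.id for e in entries if e.created_at < cutoff)
        # A real implementation returns a ConsolidationResult; a dict
        # stands in here so the sketch runs without the library.
        return {"agent_id": agent_id, "removed_ids": removed}

now = datetime.now(timezone.utc)
entries = (
    Entry("old", now - timedelta(days=120)),
    Entry("fresh", now - timedelta(days=3)),
)
result = asyncio.run(DropOldestStrategy(90).consolidate(entries, agent_id="a1"))
print(result["removed_ids"])  # ('old',)
```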

service

Memory consolidation service.

Orchestrates retention cleanup, consolidation, archival, and max-memories enforcement into a single maintenance entry point.

MemoryConsolidationService

MemoryConsolidationService(*, backend, config, strategy=None, archival_store=None)

Orchestrates memory consolidation, retention, and archival.

Parameters:

Name Type Description Default
backend MemoryBackend

Memory backend for CRUD operations.

required
config ConsolidationConfig

Consolidation configuration.

required
strategy ConsolidationStrategy | None

Optional consolidation strategy (skips consolidation step if None).

None
archival_store ArchivalStore | None

Optional archival store (skips archival if None or disabled in config).

None
Source code in src/synthorg/memory/consolidation/service.py
def __init__(
    self,
    *,
    backend: MemoryBackend,
    config: ConsolidationConfig,
    strategy: ConsolidationStrategy | None = None,
    archival_store: ArchivalStore | None = None,
) -> None:
    self._backend = backend
    self._config = config
    self._strategy = strategy
    self._archival_store = archival_store
    self._retention = RetentionEnforcer(
        config=config.retention,
        backend=backend,
    )

run_consolidation async

run_consolidation(agent_id)

Run memory consolidation for an agent.

Retrieves up to 1000 entries per invocation and applies the consolidation strategy, then archives removed entries if archival is configured and enabled. Per-entry archival failures are logged and skipped -- they do not abort the entire batch.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose memories to consolidate.

required

Returns:

Type Description
ConsolidationResult

Consolidation result (including archival count).

Source code in src/synthorg/memory/consolidation/service.py
async def run_consolidation(
    self,
    agent_id: NotBlankStr,
) -> ConsolidationResult:
    """Run memory consolidation for an agent.

    Retrieves up to 1000 entries per invocation and applies the
    consolidation strategy, then archives removed entries if archival
    is configured and enabled.  Per-entry archival failures are
    logged and skipped -- they do not abort the entire batch.

    Args:
        agent_id: Agent whose memories to consolidate.

    Returns:
        Consolidation result (including archival count).
    """
    if self._strategy is None:
        logger.info(CONSOLIDATION_SKIPPED, agent_id=agent_id)
        return ConsolidationResult()

    logger.info(CONSOLIDATION_START, agent_id=agent_id)

    try:
        query = MemoryQuery(limit=1000)
        entries = await self._backend.retrieve(agent_id, query)

        result = await self._strategy.consolidate(
            entries,
            agent_id=agent_id,
        )

        if self._archival_store is not None and self._config.archival.enabled:
            archived, index = await self._archive_entries(
                agent_id,
                entries,
                result.removed_ids,
                result.mode_assignments,
            )
            result = ConsolidationResult(
                removed_ids=result.removed_ids,
                summary_id=result.summary_id,
                archived_count=archived,
                mode_assignments=result.mode_assignments,
                archival_index=index,
            )
    except Exception as exc:
        logger.exception(
            CONSOLIDATION_FAILED,
            agent_id=agent_id,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        raise
    else:
        logger.info(
            CONSOLIDATION_COMPLETE,
            agent_id=agent_id,
            consolidated_count=result.consolidated_count,
            archived_count=result.archived_count,
        )
        return result
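The log-and-skip archival behavior described above can be sketched in isolation. The helper and error below are hypothetical stand-ins, not the service's real `_archive_entries`; the point is only that one failing entry never aborts the batch:

```python
import asyncio

async def archive_entries(entries, archive_one):
    """Archive each entry, tolerating per-entry failures."""
    archived = 0
    for entry in entries:
        try:
            await archive_one(entry)
        except Exception:
            continue  # log-and-skip: a bad entry never aborts the batch
        archived += 1
    return archived

async def flaky(entry):
    # Stand-in for a cold-store write that fails for one entry.
    if entry == "bad":
        raise OSError("cold store unavailable")

assert asyncio.run(archive_entries(["a", "bad", "b"], flaky)) == 2
```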

enforce_max_memories async

enforce_max_memories(agent_id)

Enforce the maximum memories limit for an agent.

Retrieves excess entries in batches (up to 1000 per query, the MemoryQuery.limit cap) and deletes them. The entries selected for deletion depend on the backend's default query ordering -- typically oldest-first, but consult the concrete backend for specifics.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to check.

required

Returns:

Type Description
int

Number of entries deleted.

Source code in src/synthorg/memory/consolidation/service.py
async def enforce_max_memories(
    self,
    agent_id: NotBlankStr,
) -> int:
    """Enforce the maximum memories limit for an agent.

    Retrieves excess entries in batches (up to 1000 per query,
    the ``MemoryQuery.limit`` cap) and deletes them.  The entries
    selected for deletion depend on the backend's default query
    ordering -- typically oldest-first, but consult the concrete
    backend for specifics.

    Args:
        agent_id: Agent to check.

    Returns:
        Number of entries deleted.
    """
    try:
        total = await self._backend.count(agent_id)
        excess = total - self._config.max_memories_per_agent

        if excess <= 0:
            return 0

        deleted = 0
        remaining = excess
        while remaining > 0:
            batch_size = min(remaining, _MAX_ENFORCE_BATCH)
            query = MemoryQuery(limit=batch_size)
            entries = await self._backend.retrieve(agent_id, query)
            if not entries:
                break
            for entry in entries:
                if await self._backend.delete(agent_id, entry.id):
                    deleted += 1
            remaining -= len(entries)
    except Exception as exc:
        logger.exception(
            MAX_MEMORIES_ENFORCE_FAILED,
            agent_id=agent_id,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        raise
    else:
        logger.info(
            MAX_MEMORIES_ENFORCED,
            agent_id=agent_id,
            total_before=total,
            deleted=deleted,
        )
        return deleted
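The batched enforcement loop reduces to the following sketch, using a plain list as a stand-in backend whose default ordering is oldest-first (an assumption; the real ordering is backend-specific, as noted above):

```python
_MAX_ENFORCE_BATCH = 1000  # mirrors the documented per-query cap

def enforce_max(memories: list[str], max_memories: int) -> int:
    """Delete entries in batches until the count fits within the cap."""
    excess = len(memories) - max_memories
    if excess <= 0:
        return 0
    deleted = 0
    remaining = excess
    while remaining > 0:
        batch = memories[: min(remaining, _MAX_ENFORCE_BATCH)]
        if not batch:
            break  # backend returned nothing; stop rather than loop forever
        del memories[: len(batch)]
        deleted += len(batch)
        remaining -= len(batch)
    return deleted

store = [f"m{i}" for i in range(2500)]
assert enforce_max(store, 1000) == 1500
assert len(store) == 1000
```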

cleanup_retention async

cleanup_retention(
    agent_id, *, agent_category_overrides=None, agent_default_retention_days=None
)

Run retention cleanup for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose expired memories to clean up.

required
agent_category_overrides Mapping[MemoryCategory, int] | None

Per-category retention overrides for this agent.

None
agent_default_retention_days int | None

Agent-level default retention in days.

None

Returns:

Type Description
int

Number of expired memories deleted.

Source code in src/synthorg/memory/consolidation/service.py
async def cleanup_retention(
    self,
    agent_id: NotBlankStr,
    *,
    agent_category_overrides: Mapping[MemoryCategory, int] | None = None,
    agent_default_retention_days: int | None = None,
) -> int:
    """Run retention cleanup for an agent.

    Args:
        agent_id: Agent whose expired memories to clean up.
        agent_category_overrides: Per-category retention overrides
            for this agent.
        agent_default_retention_days: Agent-level default retention
            in days.

    Returns:
        Number of expired memories deleted.
    """
    return await self._retention.cleanup_expired(
        agent_id,
        agent_category_overrides=agent_category_overrides,
        agent_default_retention_days=agent_default_retention_days,
    )

run_maintenance async

run_maintenance(
    agent_id, *, agent_category_overrides=None, agent_default_retention_days=None
)

Run full maintenance cycle for an agent.

Orchestrates: retention cleanup -> consolidation -> max enforcement.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to maintain.

required
agent_category_overrides Mapping[MemoryCategory, int] | None

Per-category retention overrides for this agent.

None
agent_default_retention_days int | None

Agent-level default retention in days.

None

Returns:

Type Description
ConsolidationResult

Consolidation result from the consolidation step.

Source code in src/synthorg/memory/consolidation/service.py
async def run_maintenance(
    self,
    agent_id: NotBlankStr,
    *,
    agent_category_overrides: Mapping[MemoryCategory, int] | None = None,
    agent_default_retention_days: int | None = None,
) -> ConsolidationResult:
    """Run full maintenance cycle for an agent.

    Orchestrates: retention cleanup -> consolidation -> max enforcement.

    Args:
        agent_id: Agent to maintain.
        agent_category_overrides: Per-category retention overrides
            for this agent.
        agent_default_retention_days: Agent-level default retention
            in days.

    Returns:
        Consolidation result from the consolidation step.
    """
    logger.info(MAINTENANCE_START, agent_id=agent_id)
    try:
        await self.cleanup_retention(
            agent_id,
            agent_category_overrides=agent_category_overrides,
            agent_default_retention_days=agent_default_retention_days,
        )
        result = await self.run_consolidation(agent_id)
        await self.enforce_max_memories(agent_id)
    except Exception as exc:
        logger.exception(
            MAINTENANCE_FAILED,
            agent_id=agent_id,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        raise
    else:
        logger.info(MAINTENANCE_COMPLETE, agent_id=agent_id)
        return result
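The maintenance ordering matters: cleanup runs first so expired entries are never consolidated, and max-enforcement runs last so it sees any summary entries consolidation created. A minimal ordering sketch (the step bodies are placeholders):

```python
import asyncio

calls: list[str] = []

async def cleanup():
    calls.append("retention")

async def consolidate():
    calls.append("consolidation")
    return "result"

async def enforce():
    calls.append("max_enforcement")

async def run_maintenance():
    # Same sequencing as the service: cleanup -> consolidate -> enforce.
    await cleanup()
    result = await consolidate()
    await enforce()
    return result

assert asyncio.run(run_maintenance()) == "result"
assert calls == ["retention", "consolidation", "max_enforcement"]
```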

retention

Retention enforcer for memory lifecycle management.

Deletes memories that have exceeded their per-category retention period. Supports per-agent overrides that take priority over company-level defaults.

RetentionEnforcer

RetentionEnforcer(*, config, backend)

Enforces per-category memory retention policies.

Queries for memories older than the configured retention period and deletes them from the backend.

Parameters:

Name Type Description Default
config RetentionConfig

Retention configuration with per-category rules.

required
backend MemoryBackend

Memory backend for querying and deleting.

required
Source code in src/synthorg/memory/consolidation/retention.py
def __init__(
    self,
    *,
    config: RetentionConfig,
    backend: MemoryBackend,
) -> None:
    self._config = config
    self._backend = backend
    self._categories_to_check = self._build_categories_to_check(config)
    # Explicit per-category rules ONLY (not default-filled entries).
    # _resolve_categories depends on this distinction -- do not
    # include categories filled by the company global default here.
    self._explicit_rules = tuple(
        (rule.category, rule.retention_days) for rule in config.rules
    )

cleanup_expired async

cleanup_expired(
    agent_id,
    now=None,
    *,
    agent_category_overrides=None,
    agent_default_retention_days=None,
)

Delete memories that have exceeded their retention period.

Processes each category independently so that a failure in one category does not prevent cleanup of the remaining categories.

Processes up to 1000 expired entries per category per invocation. Multiple calls may be needed for categories with a large backlog.

When agent_category_overrides or agent_default_retention_days is provided, per-agent retention rules are merged with company defaults using the internal resolution chain (_resolve_categories).

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose memories to clean up.

required
now datetime | None

Current time (defaults to UTC now).

None
agent_category_overrides Mapping[MemoryCategory, int] | None

Per-category retention overrides for this agent (mapping of category to days).

None
agent_default_retention_days int | None

Agent-level default retention in days.

None

Returns:

Type Description
int

Number of expired memories deleted.

Source code in src/synthorg/memory/consolidation/retention.py
async def cleanup_expired(
    self,
    agent_id: NotBlankStr,
    now: datetime | None = None,
    *,
    agent_category_overrides: Mapping[MemoryCategory, int] | None = None,
    agent_default_retention_days: int | None = None,
) -> int:
    """Delete memories that have exceeded their retention period.

    Processes each category independently so that a failure in one
    category does not prevent cleanup of the remaining categories.

    Processes up to 1000 expired entries per category per
    invocation.  Multiple calls may be needed for categories
    with a large backlog.

    When *agent_category_overrides* or *agent_default_retention_days*
    is provided, per-agent retention rules are merged with company
    defaults using the internal resolution chain
    (``_resolve_categories``).

    Args:
        agent_id: Agent whose memories to clean up.
        now: Current time (defaults to UTC now).
        agent_category_overrides: Per-category retention overrides
            for this agent (mapping of category to days).
        agent_default_retention_days: Agent-level default retention
            in days.

    Returns:
        Number of expired memories deleted.
    """
    if now is None:
        now = datetime.now(UTC)

    categories_to_check = self._resolve_for_agent(
        agent_id,
        agent_category_overrides,
        agent_default_retention_days,
    )

    logger.info(RETENTION_CLEANUP_START, agent_id=agent_id)
    total_deleted = 0

    for category, retention_days in categories_to_check:
        try:
            cutoff = now - timedelta(days=retention_days)
            query = MemoryQuery(
                categories=frozenset({category}),
                until=cutoff,
                limit=1000,
            )
            expired = await self._backend.retrieve(agent_id, query)
            for entry in expired:
                deleted = await self._backend.delete(agent_id, entry.id)
                if deleted:
                    total_deleted += 1
                else:
                    logger.debug(
                        RETENTION_DELETE_SKIPPED,
                        agent_id=agent_id,
                        entry_id=entry.id,
                        category=category.value,
                    )
        except Exception as exc:
            logger.warning(
                RETENTION_CLEANUP_FAILED,
                agent_id=agent_id,
                category=category.value,
                error=str(exc),
                error_type=type(exc).__name__,
            )
            continue

    logger.info(
        RETENTION_CLEANUP_COMPLETE,
        agent_id=agent_id,
        deleted_count=total_deleted,
    )
    return total_deleted

archival

Archival store protocol for long-term memory storage.

Defines the protocol for moving memories from the hot store into cold (archival) storage, with search and restore capabilities.

ArchivalStore

Bases: Protocol

Protocol for long-term memory archival storage.

Concrete implementations handle moving memories from the hot (active) store into cold storage for long-term preservation.

archive async

archive(entry)

Archive a memory entry.

Parameters:

Name Type Description Default
entry ArchivalEntry

The archival entry to store.

required

Returns:

Type Description
NotBlankStr

The assigned archive entry ID.

Source code in src/synthorg/memory/consolidation/archival.py
async def archive(self, entry: ArchivalEntry) -> NotBlankStr:
    """Archive a memory entry.

    Args:
        entry: The archival entry to store.

    Returns:
        The assigned archive entry ID.
    """
    ...

search async

search(agent_id, query)

Search archived entries for a specific agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose archived entries to search.

required
query MemoryQuery

Search parameters.

required

Returns:

Type Description
tuple[ArchivalEntry, ...]

Matching archived entries owned by the agent.

Source code in src/synthorg/memory/consolidation/archival.py
async def search(
    self,
    agent_id: NotBlankStr,
    query: MemoryQuery,
) -> tuple[ArchivalEntry, ...]:
    """Search archived entries for a specific agent.

    Args:
        agent_id: Agent whose archived entries to search.
        query: Search parameters.

    Returns:
        Matching archived entries owned by the agent.
    """
    ...

restore async

restore(agent_id, entry_id)

Restore a specific archived entry for a specific agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent who owns the archived entry.

required
entry_id NotBlankStr

The archive entry ID.

required

Returns:

Type Description
ArchivalEntry | None

The archived entry, or None if not found or not owned by the agent.

Source code in src/synthorg/memory/consolidation/archival.py
async def restore(
    self,
    agent_id: NotBlankStr,
    entry_id: NotBlankStr,
) -> ArchivalEntry | None:
    """Restore a specific archived entry for a specific agent.

    Args:
        agent_id: Agent who owns the archived entry.
        entry_id: The archive entry ID.

    Returns:
        The archived entry, or ``None`` if not found or not
        owned by the agent.
    """
    ...

count async

count(agent_id)

Count archived entries for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
int

Number of archived entries.

Source code in src/synthorg/memory/consolidation/archival.py
async def count(self, agent_id: NotBlankStr) -> int:
    """Count archived entries for an agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Number of archived entries.
    """
    ...
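A minimal in-memory class satisfying the shape of this protocol (the real `ArchivalEntry` and `MemoryQuery` types are simplified here to dicts, so this is an illustrative sketch, not a drop-in backend):

```python
import asyncio
import itertools

class InMemoryArchive:
    """Toy archival store keyed by generated archive IDs."""

    def __init__(self) -> None:
        self._entries: dict[str, dict] = {}
        self._ids = itertools.count(1)

    async def archive(self, entry: dict) -> str:
        entry_id = f"arch-{next(self._ids)}"
        self._entries[entry_id] = entry
        return entry_id

    async def restore(self, agent_id: str, entry_id: str):
        entry = self._entries.get(entry_id)
        # Ownership check: None if missing OR owned by another agent.
        if entry is None or entry["agent_id"] != agent_id:
            return None
        return entry

    async def count(self, agent_id: str) -> int:
        return sum(1 for e in self._entries.values() if e["agent_id"] == agent_id)

async def demo():
    store = InMemoryArchive()
    eid = await store.archive({"agent_id": "a1", "content": "x"})
    assert await store.count("a1") == 1
    assert await store.restore("a2", eid) is None  # wrong owner
    assert (await store.restore("a1", eid))["content"] == "x"

asyncio.run(demo())
```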

simple_strategy

Simple consolidation strategy.

Groups entries by category, keeps the most relevant entry per group (with most recent as tiebreaker), and creates a summary entry from the rest.

SimpleConsolidationStrategy

SimpleConsolidationStrategy(*, backend, group_threshold=_DEFAULT_GROUP_THRESHOLD)

Simple memory consolidation strategy.

Groups entries by category. For each group exceeding a threshold, keeps the entry with the highest relevance score (with most recent as tiebreaker), creates a summary entry from the rest, and deletes consolidated entries from the backend.

Parameters:

Name Type Description Default
backend MemoryBackend

Memory backend for storing summaries.

required
group_threshold int

Minimum group size to trigger consolidation (must be >= 2).

_DEFAULT_GROUP_THRESHOLD

Raises:

Type Description
ValueError

If group_threshold is less than 2.

Source code in src/synthorg/memory/consolidation/simple_strategy.py
def __init__(
    self,
    *,
    backend: MemoryBackend,
    group_threshold: int = _DEFAULT_GROUP_THRESHOLD,
) -> None:
    if group_threshold < _MIN_GROUP_THRESHOLD:
        msg = (
            f"group_threshold must be >= {_MIN_GROUP_THRESHOLD}, "
            f"got {group_threshold}"
        )
        raise ValueError(msg)
    self._backend = backend
    self._group_threshold = group_threshold

consolidate async

consolidate(entries, *, agent_id)

Consolidate entries by grouping and summarizing per category.

Groups with fewer than group_threshold entries are left unchanged.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to consolidate.

required
agent_id NotBlankStr

Owning agent identifier.

required

Returns:

Type Description
ConsolidationResult

Result describing what was consolidated.

Source code in src/synthorg/memory/consolidation/simple_strategy.py
async def consolidate(
    self,
    entries: tuple[MemoryEntry, ...],
    *,
    agent_id: NotBlankStr,
) -> ConsolidationResult:
    """Consolidate entries by grouping and summarizing per category.

    Groups with fewer than ``group_threshold`` entries are left
    unchanged.

    Args:
        entries: Memory entries to consolidate.
        agent_id: Owning agent identifier.

    Returns:
        Result describing what was consolidated.
    """
    if not entries:
        return ConsolidationResult()

    logger.info(
        STRATEGY_START,
        agent_id=agent_id,
        entry_count=len(entries),
    )

    removed_ids: list[NotBlankStr] = []
    summary_id: NotBlankStr | None = None

    sorted_entries = sorted(entries, key=attrgetter("category"))
    groups = groupby(sorted_entries, key=attrgetter("category"))

    for category, group_iter in groups:
        group = list(group_iter)
        if len(group) < self._group_threshold:
            continue

        _, to_remove = self._select_entries(group)
        summary_content = self._build_summary(category, to_remove)

        store_request = MemoryStoreRequest(
            category=category,
            content=summary_content,
            metadata=MemoryMetadata(
                source="consolidation",
                tags=("consolidated",),
            ),
        )
        new_id = await self._backend.store(agent_id, store_request)
        if summary_id is None:
            summary_id = new_id

        for entry in to_remove:
            await self._backend.delete(agent_id, entry.id)
            removed_ids.append(entry.id)

    result = ConsolidationResult(
        removed_ids=tuple(removed_ids),
        summary_id=summary_id,
    )

    logger.info(
        STRATEGY_COMPLETE,
        agent_id=agent_id,
        consolidated_count=result.consolidated_count,
        summary_id=result.summary_id,
    )

    return result
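The keep-one-per-group selection can be sketched without the backend: sort by category, and within each group at or above the threshold keep the entry with the highest `(relevance, timestamp)` tuple, exactly the relevance-then-recency rule described above. Entries are simplified here to `(category, relevance, timestamp, id)` tuples:

```python
from itertools import groupby
from operator import itemgetter

def select(entries, threshold=2):
    """Partition entry IDs into kept vs removed, per category group."""
    kept, removed = [], []
    entries = sorted(entries, key=itemgetter(0))
    for _, group_iter in groupby(entries, key=itemgetter(0)):
        group = list(group_iter)
        if len(group) < threshold:
            kept.extend(e[3] for e in group)  # below threshold: untouched
            continue
        best = max(group, key=lambda e: (e[1], e[2]))  # recency breaks ties
        kept.append(best[3])
        removed.extend(e[3] for e in group if e[3] != best[3])
    return kept, removed

entries = [
    ("task", 0.9, 1, "a"),
    ("task", 0.9, 2, "b"),  # same relevance as "a", more recent: wins
    ("task", 0.1, 3, "c"),
    ("note", 0.5, 1, "d"),  # group of 1, below threshold
]
kept, removed = select(entries, threshold=3)
assert kept == ["d", "b"]
assert removed == ["a", "c"]
```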

density

Content density classification for dual-mode archival.

Heuristic-based classifier that determines whether memory content is sparse (conversational, narrative) or dense (code, structured data, identifiers). Classification is deterministic -- no LLM calls.

ContentDensity

Bases: StrEnum

Classification of memory content density.

Determines the archival mode: sparse content receives abstractive LLM summarization, dense content receives extractive preservation.

SPARSE class-attribute instance-attribute

SPARSE = 'sparse'

Conversational, narrative, low information density.

DENSE class-attribute instance-attribute

DENSE = 'dense'

Code, structured data, identifiers, high information density.

DensityClassifier

DensityClassifier(*, dense_threshold=0.5)

Heuristic content density classifier.

Classifies text as SPARSE or DENSE based on structural signals: code patterns, structured data markers, identifier density, numeric density, and line structure.

Parameters:

Name Type Description Default
dense_threshold float

Score threshold for DENSE classification (0.0-1.0). Lower values classify more content as dense.

0.5

Raises:

Type Description
ValueError

If dense_threshold is outside [0.0, 1.0].

Source code in src/synthorg/memory/consolidation/density.py
def __init__(self, *, dense_threshold: float = 0.5) -> None:
    if not 0.0 <= dense_threshold <= 1.0:
        msg = f"dense_threshold must be in [0.0, 1.0], got {dense_threshold}"
        raise ValueError(msg)
    self._threshold = dense_threshold

classify

classify(content)

Classify content density.

Parameters:

Name Type Description Default
content str

Text to classify.

required

Returns:

Type Description
ContentDensity

DENSE if score >= threshold, SPARSE otherwise.

Source code in src/synthorg/memory/consolidation/density.py
def classify(self, content: str) -> ContentDensity:
    """Classify content density.

    Args:
        content: Text to classify.

    Returns:
        DENSE if score >= threshold, SPARSE otherwise.
    """
    score = (
        _WEIGHT_CODE * _code_pattern_score(content)
        + _WEIGHT_STRUCTURED * _structured_data_score(content)
        + _WEIGHT_IDENTIFIERS * _identifier_density_score(content)
        + _WEIGHT_NUMERIC * _numeric_density_score(content)
        + _WEIGHT_LINE_STRUCTURE * _line_structure_score(content)
    )
    return (
        ContentDensity.DENSE if score >= self._threshold else ContentDensity.SPARSE
    )

classify_batch

classify_batch(entries)

Classify density for a batch of memory entries.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to classify.

required

Returns:

Type Description
tuple[tuple[MemoryEntry, ContentDensity], ...]

Tuple of (entry, density) pairs in input order.

Source code in src/synthorg/memory/consolidation/density.py
def classify_batch(
    self,
    entries: tuple[MemoryEntry, ...],
) -> tuple[tuple[MemoryEntry, ContentDensity], ...]:
    """Classify density for a batch of memory entries.

    Args:
        entries: Memory entries to classify.

    Returns:
        Tuple of (entry, density) pairs in input order.
    """
    results = tuple((entry, self.classify(entry.content)) for entry in entries)

    if results:
        dense_count = sum(1 for _, d in results if d == ContentDensity.DENSE)
        logger.debug(
            DENSITY_CLASSIFICATION_COMPLETE,
            entry_count=len(results),
            dense_count=dense_count,
            sparse_count=len(results) - dense_count,
        )

    return results
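The weighted-sum scoring above can be illustrated with placeholder weights (the real `_WEIGHT_*` constants are private and not documented here, so the values below are assumptions): each signal scores in [0, 1], and a combined score at or above `dense_threshold` yields DENSE.

```python
# Hypothetical weights; the real classifier's constants may differ.
WEIGHTS = {
    "code": 0.3,
    "structured": 0.25,
    "identifiers": 0.2,
    "numeric": 0.15,
    "lines": 0.1,
}

def classify(signals: dict[str, float], dense_threshold: float = 0.5) -> str:
    """Combine per-signal scores into a density label."""
    score = sum(WEIGHTS[name] * value for name, value in signals.items())
    return "dense" if score >= dense_threshold else "sparse"

# Strong code + structured-data signals: 0.3 + 0.25 = 0.55 >= 0.5
assert classify({"code": 1.0, "structured": 1.0, "identifiers": 0.0,
                 "numeric": 0.0, "lines": 0.0}) == "dense"
# Identifiers alone: 0.2 < 0.5
assert classify({"code": 0.0, "structured": 0.0, "identifiers": 1.0,
                 "numeric": 0.0, "lines": 0.0}) == "sparse"
```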

extractive

Extractive preservation for dense memory content.

For dense content (code, structured data, identifiers), summarization is catastrophically lossy. Instead, this module extracts verbatim key facts and structural anchors (start/mid/end) to preserve the most important information.

ExtractivePreserver

ExtractivePreserver(*, max_facts=20, anchor_length=150)

Extracts key facts and structural anchors from dense content.

For dense content (code, structured data, IDs), summarization is catastrophically lossy. Instead, this preserver extracts verbatim key facts (identifiers, URLs, version numbers, key-value pairs) and structural anchors (start/mid/end snippets of the original).

Parameters:

Name Type Description Default
max_facts int

Maximum number of key facts to extract.

20
anchor_length int

Character length of each anchor snippet.

150

Raises:

Type Description
ValueError

If max_facts or anchor_length is < 1.

Source code in src/synthorg/memory/consolidation/extractive.py
def __init__(
    self,
    *,
    max_facts: int = 20,
    anchor_length: int = 150,
) -> None:
    if max_facts < 1:
        msg = f"max_facts must be >= 1, got {max_facts}"
        raise ValueError(msg)
    if anchor_length < 1:
        msg = f"anchor_length must be >= 1, got {anchor_length}"
        raise ValueError(msg)
    self._max_facts = max_facts
    self._anchor_length = anchor_length

extract

extract(content)

Extract key facts and anchors from dense content.

Parameters:

Name Type Description Default
content str

The dense text to extract from.

required

Returns:

Type Description
str

Structured text block with extracted facts and anchors.

Source code in src/synthorg/memory/consolidation/extractive.py
def extract(self, content: str) -> str:
    """Extract key facts and anchors from dense content.

    Args:
        content: The dense text to extract from.

    Returns:
        Structured text block with extracted facts and anchors.
    """
    # Collect all facts, deduplicated, order-preserving
    all_facts: list[str] = []
    seen: set[str] = set()

    for fact in (
        *_extract_urls(content),
        *_extract_identifiers(content),
        *_extract_versions(content),
        *_extract_key_values(content),
    ):
        if fact not in seen:
            seen.add(fact)
            all_facts.append(fact)

    facts = all_facts[: self._max_facts]
    start, mid, end = _build_anchors(content, self._anchor_length)

    lines = ["[Extractive preservation]"]
    if facts:
        lines.append("Key facts:")
        lines.extend(f"- {fact}" for fact in facts)
    lines.append(f"[START] {start}")
    if mid:
        lines.append(f"[MID] {mid}")
    if end:
        lines.append(f"[END] {end}")

    result = "\n".join(lines)

    logger.debug(
        DUAL_MODE_EXTRACTIVE_PRESERVED,
        content_length=len(content),
        fact_count=len(facts),
        anchor_length=self._anchor_length,
    )

    return result
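The `_build_anchors` helper is private, but its start/mid/end contract implied above can be sketched as three verbatim slices of `anchor_length` characters (an assumption about its internals, not the actual implementation):

```python
def build_anchors(content: str, anchor_length: int) -> tuple[str, str, str]:
    """Return (start, mid, end) snippets; short content yields only start."""
    if len(content) <= anchor_length:
        return content, "", ""
    start = content[:anchor_length]
    mid_at = (len(content) - anchor_length) // 2  # centered slice
    mid = content[mid_at : mid_at + anchor_length]
    end = content[-anchor_length:]
    return start, mid, end

start, mid, end = build_anchors("0123456789", 4)
assert start == "0123"
assert mid == "3456"
assert end == "6789"
```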

abstractive

Abstractive summarizer for sparse memory content.

Uses an LLM (via CompletionProvider) to generate concise summaries of conversational/narrative memory content. Falls back to truncation if the LLM call fails.

AbstractiveSummarizer

AbstractiveSummarizer(*, provider, model, max_summary_tokens=200, temperature=0.3)

LLM-based abstractive summarizer for sparse content.

Uses a CompletionProvider to generate concise summaries of conversational/narrative memory content. Falls back to truncation if the LLM call fails with a retryable error.

Parameters:

Name Type Description Default
provider CompletionProvider

Completion provider for LLM calls.

required
model NotBlankStr

Model identifier to use for summarization.

required
max_summary_tokens int

Maximum tokens for the summary response.

200
temperature float

Sampling temperature for summarization.

0.3

Raises:

Type Description
ValueError

If model is empty or whitespace-only.

Source code in src/synthorg/memory/consolidation/abstractive.py
def __init__(
    self,
    *,
    provider: CompletionProvider,
    model: NotBlankStr,
    max_summary_tokens: int = 200,
    temperature: float = 0.3,
) -> None:
    if not model or not model.strip():
        msg = "model must be a non-blank string"
        raise ValueError(msg)
    self._provider = provider
    self._model = model
    self._config = CompletionConfig(
        temperature=temperature,
        max_tokens=max_summary_tokens,
    )

summarize async

summarize(content)

Generate an abstractive summary of the given content.

Falls back to truncation if the LLM call fails with a retryable error or returns empty content. Non-retryable provider errors (authentication, invalid model) propagate.

Parameters:

Name Type Description Default
content str

The sparse/conversational text to summarize.

required

Returns:

Type Description
str

Summary text.

Source code in src/synthorg/memory/consolidation/abstractive.py
async def summarize(self, content: str) -> str:
    """Generate an abstractive summary of the given content.

    Falls back to truncation if the LLM call fails with a
    retryable error or returns empty content.  Non-retryable
    provider errors (authentication, invalid model) propagate.

    Args:
        content: The sparse/conversational text to summarize.

    Returns:
        Summary text.
    """
    try:
        messages = [
            ChatMessage(role=MessageRole.SYSTEM, content=_SYSTEM_PROMPT),
            ChatMessage(role=MessageRole.USER, content=content),
        ]
        response = await self._provider.complete(
            messages,
            self._model,
            config=self._config,
        )
        if response.content and response.content.strip():
            logger.debug(
                DUAL_MODE_ABSTRACTIVE_SUMMARY,
                content_length=len(content),
                summary_length=len(response.content),
                model=self._model,
            )
            return response.content.strip()
    except (MemoryError, RecursionError):
        raise
    except ProviderError as exc:
        if not exc.is_retryable:
            logger.warning(
                DUAL_MODE_ABSTRACTIVE_FALLBACK,
                content_length=len(content),
                error=str(exc),
                error_type=type(exc).__name__,
                retryable=False,
            )
            raise
        logger.warning(
            DUAL_MODE_ABSTRACTIVE_FALLBACK,
            content_length=len(content),
            error=str(exc),
            error_type=type(exc).__name__,
        )
        return _truncate_fallback(content)
    except Exception as exc:
        logger.warning(
            DUAL_MODE_ABSTRACTIVE_FALLBACK,
            content_length=len(content),
            error=str(exc),
            error_type=type(exc).__name__,
        )
        return _truncate_fallback(content)

    # Fallback: empty/whitespace-only LLM response
    logger.debug(
        DUAL_MODE_ABSTRACTIVE_FALLBACK,
        content_length=len(content),
        reason="empty_response",
    )
    return _truncate_fallback(content)

summarize_batch async

summarize_batch(entries)

Summarize multiple entries concurrently.

Each entry is summarized independently via asyncio.TaskGroup. Failures for individual entries fall back to truncation without aborting the batch.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to summarize.

required

Returns:

Type Description
tuple[tuple[NotBlankStr, str], ...]

Tuple of (entry_id, summary) pairs in input order.

Source code in src/synthorg/memory/consolidation/abstractive.py
async def summarize_batch(
    self,
    entries: tuple[MemoryEntry, ...],
) -> tuple[tuple[NotBlankStr, str], ...]:
    """Summarize multiple entries concurrently.

    Each entry is summarized independently via ``asyncio.TaskGroup``.
    Failures for individual entries fall back to truncation without
    aborting the batch.

    Args:
        entries: Memory entries to summarize.

    Returns:
        Tuple of ``(entry_id, summary)`` pairs in input order.
    """
    if not entries:
        return ()

    results: dict[NotBlankStr, str] = {}
    async with asyncio.TaskGroup() as tg:
        tasks: dict[NotBlankStr, asyncio.Task[str]] = {}
        for entry in entries:
            tasks[entry.id] = tg.create_task(
                self.summarize(entry.content),
            )

    for entry_id, task in tasks.items():
        results[entry_id] = task.result()

    return tuple((entry.id, results[entry.id]) for entry in entries)

dual_mode_strategy

Dual-mode consolidation strategy.

Density-aware consolidation: classifies entries as sparse or dense, then applies LLM abstractive summarization (sparse) or extractive preservation (dense) accordingly.

DualModeConsolidationStrategy

DualModeConsolidationStrategy(
    *,
    backend,
    classifier,
    extractor,
    summarizer,
    group_threshold=_DEFAULT_GROUP_THRESHOLD,
)

Density-aware consolidation strategy.

Classifies entries by content density and applies the appropriate archival mode: LLM abstractive summarization for sparse content, extractive key-fact preservation for dense content.

Groups entries by category. For each group that meets the threshold, classifies density per-entry, determines the group mode by majority vote, selects the best entry to keep, and processes the rest.
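The majority-vote step can be sketched with `collections.Counter`. The `"sparse"`/`"dense"` labels and the tie-breaking rule below are illustrative assumptions, not necessarily how the library's `DensityClassifier` represents them:

```python
from collections import Counter


def majority_mode(densities: list[str]) -> str:
    """Pick a group's archival mode by majority density vote.

    Breaking ties toward "abstractive" (the sparse-content mode) is an
    assumption here; the library may resolve ties differently.
    """
    counts = Counter(densities)
    if counts["dense"] > counts["sparse"]:
        return "extractive"
    return "abstractive"
```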

Parameters:

Name Type Description Default
backend MemoryBackend

Memory backend for storing summaries/extractions.

required
classifier DensityClassifier

Density classifier instance.

required
extractor ExtractivePreserver

Extractive preserver instance.

required
summarizer AbstractiveSummarizer

Abstractive summarizer instance.

required
group_threshold int

Minimum group size to trigger consolidation (must be >= 2).

_DEFAULT_GROUP_THRESHOLD

Raises:

Type Description
ValueError

If group_threshold is less than 2.

Source code in src/synthorg/memory/consolidation/dual_mode_strategy.py
def __init__(
    self,
    *,
    backend: MemoryBackend,
    classifier: DensityClassifier,
    extractor: ExtractivePreserver,
    summarizer: AbstractiveSummarizer,
    group_threshold: int = _DEFAULT_GROUP_THRESHOLD,
) -> None:
    if group_threshold < _MIN_GROUP_THRESHOLD:
        msg = (
            f"group_threshold must be >= {_MIN_GROUP_THRESHOLD}, "
            f"got {group_threshold}"
        )
        raise ValueError(msg)
    self._backend = backend
    self._classifier = classifier
    self._extractor = extractor
    self._summarizer = summarizer
    self._group_threshold = group_threshold
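The threshold check rules out single-entry "groups" at construction time: passing `group_threshold=1` raises immediately. A standalone sketch of just that validation rule, mirroring the check in the source (`_MIN_GROUP_THRESHOLD` is 2 there):

```python
_MIN_GROUP_THRESHOLD = 2  # matches the source's minimum


def validate_group_threshold(group_threshold: int) -> int:
    """Reject thresholds that would allow consolidating a single entry."""
    if group_threshold < _MIN_GROUP_THRESHOLD:
        msg = (
            f"group_threshold must be >= {_MIN_GROUP_THRESHOLD}, "
            f"got {group_threshold}"
        )
        raise ValueError(msg)
    return group_threshold
```

Failing fast in `__init__` means a misconfigured strategy can never reach `consolidate` with an invalid threshold.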

consolidate async

consolidate(entries, *, agent_id)

Consolidate entries using density-aware dual-mode approach.

Groups entries by category, classifies density, selects archival mode by majority vote, then processes entries accordingly.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to consolidate.

required
agent_id NotBlankStr

Owning agent identifier.

required

Returns:

Type Description
ConsolidationResult

Result describing what was consolidated.

Source code in src/synthorg/memory/consolidation/dual_mode_strategy.py
async def consolidate(
    self,
    entries: tuple[MemoryEntry, ...],
    *,
    agent_id: NotBlankStr,
) -> ConsolidationResult:
    """Consolidate entries using density-aware dual-mode approach.

    Groups entries by category, classifies density, selects archival
    mode by majority vote, then processes entries accordingly.

    Args:
        entries: Memory entries to consolidate.
        agent_id: Owning agent identifier.

    Returns:
        Result describing what was consolidated.
    """
    if not entries:
        logger.debug(
            STRATEGY_COMPLETE,
            agent_id=agent_id,
            consolidated_count=0,
            strategy="dual_mode",
        )
        return ConsolidationResult()

    logger.info(
        STRATEGY_START,
        agent_id=agent_id,
        entry_count=len(entries),
        strategy="dual_mode",
    )

    removed_ids: list[NotBlankStr] = []
    summary_id: NotBlankStr | None = None
    mode_assignments: list[ArchivalModeAssignment] = []

    sorted_entries = sorted(entries, key=attrgetter("category"))
    groups = groupby(sorted_entries, key=attrgetter("category"))

    for category, group_iter in groups:
        group = list(group_iter)
        if len(group) < self._group_threshold:
            continue
        new_id, group_removed, group_modes = await self._process_group(
            category,
            group,
            agent_id,
        )
        if summary_id is None:
            summary_id = new_id
        removed_ids.extend(group_removed)
        mode_assignments.extend(group_modes)

    result = ConsolidationResult(
        removed_ids=tuple(removed_ids),
        summary_id=summary_id,
        mode_assignments=tuple(mode_assignments),
    )

    logger.info(
        STRATEGY_COMPLETE,
        agent_id=agent_id,
        consolidated_count=result.consolidated_count,
        summary_id=result.summary_id,
        strategy="dual_mode",
    )

    return result
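Note that `itertools.groupby` only merges *consecutive* equal keys, which is why `consolidate` sorts by category before grouping. A standalone sketch of the grouping-and-threshold step, using plain `(category, entry_id)` tuples in place of `MemoryEntry`:

```python
from itertools import groupby
from operator import itemgetter


def eligible_groups(
    entries: list[tuple[str, str]],
    threshold: int = 2,
) -> dict[str, list[str]]:
    """Group (category, entry_id) pairs; keep groups at or above threshold.

    Sorting first is required: groupby only merges adjacent equal keys,
    so unsorted input would split one category into several groups.
    """
    ordered = sorted(entries, key=itemgetter(0))
    out: dict[str, list[str]] = {}
    for category, group in groupby(ordered, key=itemgetter(0)):
        ids = [entry_id for _, entry_id in group]
        if len(ids) >= threshold:
            out[category] = ids
    return out


groups = eligible_groups(
    [("task", "e1"), ("pref", "e2"), ("task", "e3"), ("task", "e4")]
)
# → {"task": ["e1", "e3", "e4"]}; the lone "pref" entry is skipped
```

This mirrors the `if len(group) < self._group_threshold: continue` guard above: undersized groups are left untouched rather than consolidated.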