Skip to content

Memory

Persistent agent memory: protocol, retrieval pipeline, shared org memory, consolidation, and archival.

Protocol

protocol

MemoryBackend protocol -- lifecycle + memory operations.

Application code depends on this protocol for agent memory storage and retrieval. Concrete backends implement this protocol to provide connection management, health monitoring, and memory CRUD operations.

MemoryBackend

Bases: Protocol

Structural interface for agent memory storage backends.

Concrete backends implement this protocol to provide per-agent memory storage, retrieval, and lifecycle management.

All CRUD operations (store, retrieve, get, delete, count) require a connected backend and raise MemoryConnectionError if called before connect().

Attributes:

Name Type Description
is_connected bool

Whether the backend has an active connection.

backend_name NotBlankStr

Human-readable backend identifier.

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier (e.g. "mem0").

connect async

connect()

Establish connection to the memory backend.

Raises:

Type Description
MemoryConnectionError

If the connection cannot be established.

Source code in src/synthorg/memory/protocol.py
async def connect(self) -> None:
    """Establish connection to the memory backend.

    Raises:
        MemoryConnectionError: If the connection cannot be
            established.
    """
    ...

disconnect async

disconnect()

Close the memory backend connection.

Safe to call even if not connected.

Source code in src/synthorg/memory/protocol.py
async def disconnect(self) -> None:
    """Close the memory backend connection.

    Safe to call even if not connected.
    """
    ...

health_check async

health_check()

Check whether the backend is healthy and responsive.

Returns:

Type Description
bool

True if the backend is reachable and operational,

bool

False if unreachable or unhealthy.

Note

Implementations should catch connection-level errors, log them at WARNING level with full exception context, and return False. The caught exception must never be silently discarded. Only raise for programming errors (e.g. backend not initialized).

Source code in src/synthorg/memory/protocol.py
async def health_check(self) -> bool:
    """Check whether the backend is healthy and responsive.

    Returns:
        ``True`` if the backend is reachable and operational,
        ``False`` if unreachable or unhealthy.

    Note:
        Implementations should catch connection-level errors,
        log them at WARNING level with full exception context,
        and return ``False``.  The caught exception must never be
        silently discarded.  Only raise for programming errors
        (e.g. backend not initialized).
    """
    ...

store async

store(agent_id, request)

Store a memory entry for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
request MemoryStoreRequest

Memory content and metadata.

required

Returns:

Type Description
NotBlankStr

The backend-assigned memory ID.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryStoreError

If the store operation fails.

Source code in src/synthorg/memory/protocol.py
async def store(
    self,
    agent_id: NotBlankStr,
    request: MemoryStoreRequest,
) -> NotBlankStr:
    """Store a memory entry for an agent.

    Args:
        agent_id: Owning agent identifier.
        request: Memory content and metadata.

    Returns:
        The backend-assigned memory ID.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryStoreError: If the store operation fails.
    """
    ...

retrieve async

retrieve(agent_id, query)

Retrieve memories for an agent, ordered by relevance.

When query.text is None, performs metadata-only filtering (no semantic search).

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
query MemoryQuery

Retrieval parameters.

required

Returns:

Type Description
tuple[MemoryEntry, ...]

Matching memory entries ordered by relevance.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryRetrievalError

If the retrieval fails.

Source code in src/synthorg/memory/protocol.py
async def retrieve(
    self,
    agent_id: NotBlankStr,
    query: MemoryQuery,
) -> tuple[MemoryEntry, ...]:
    """Retrieve memories for an agent, ordered by relevance.

    When ``query.text`` is ``None``, performs metadata-only
    filtering (no semantic search).

    Args:
        agent_id: Owning agent identifier.
        query: Retrieval parameters.

    Returns:
        Matching memory entries ordered by relevance.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryRetrievalError: If the retrieval fails.
    """
    ...

get async

get(agent_id, memory_id)

Get a specific memory entry by ID.

Returns None when the entry does not exist; MemoryNotFoundError is never raised by this method.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
memory_id NotBlankStr

Memory identifier.

required

Returns:

Type Description
MemoryEntry | None

The memory entry, or None if not found.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryRetrievalError

If the backend query fails.

Source code in src/synthorg/memory/protocol.py
async def get(
    self,
    agent_id: NotBlankStr,
    memory_id: NotBlankStr,
) -> MemoryEntry | None:
    """Get a specific memory entry by ID.

    Returns ``None`` when the entry does not exist;
    ``MemoryNotFoundError`` is never raised by this method.

    Args:
        agent_id: Owning agent identifier.
        memory_id: Memory identifier.

    Returns:
        The memory entry, or ``None`` if not found.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryRetrievalError: If the backend query fails.
    """
    ...

delete async

delete(agent_id, memory_id)

Delete a specific memory entry.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
memory_id NotBlankStr

Memory identifier.

required

Returns:

Type Description
bool

True if the entry was deleted, False if not found.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryStoreError

If the delete operation fails.

Source code in src/synthorg/memory/protocol.py
async def delete(
    self,
    agent_id: NotBlankStr,
    memory_id: NotBlankStr,
) -> bool:
    """Delete a specific memory entry.

    Args:
        agent_id: Owning agent identifier.
        memory_id: Memory identifier.

    Returns:
        ``True`` if the entry was deleted, ``False`` if not found.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryStoreError: If the delete operation fails.
    """
    ...

count async

count(agent_id, *, category=None)

Count memory entries for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Owning agent identifier.

required
category MemoryCategory | None

Optional category filter.

None

Returns:

Type Description
int

Number of matching entries.

Raises:

Type Description
MemoryConnectionError

If the backend is not connected.

MemoryRetrievalError

If the count query fails.

Source code in src/synthorg/memory/protocol.py
async def count(
    self,
    agent_id: NotBlankStr,
    *,
    category: MemoryCategory | None = None,
) -> int:
    """Count memory entries for an agent.

    Args:
        agent_id: Owning agent identifier.
        category: Optional category filter.

    Returns:
        Number of matching entries.

    Raises:
        MemoryConnectionError: If the backend is not connected.
        MemoryRetrievalError: If the count query fails.
    """
    ...

Config

config

Memory configuration models.

Frozen Pydantic models for company-wide memory backend selection and backend-specific settings.

MemoryStorageConfig pydantic-model

Bases: BaseModel

Storage-specific memory configuration.

Attributes:

Name Type Description
data_dir NotBlankStr

Directory path for memory data persistence.

vector_store NotBlankStr

Vector store backend name.

history_store NotBlankStr

History store backend name.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_store_names
  • _reject_traversal

data_dir pydantic-field

data_dir = '/data/memory'

Directory path for memory data persistence. Default targets a Docker volume mount -- override for local development.

vector_store pydantic-field

vector_store = 'qdrant'

Vector store backend name

history_store pydantic-field

history_store = 'sqlite'

History store backend name

MemoryOptionsConfig pydantic-model

Bases: BaseModel

Memory behaviour options.

Attributes:

Name Type Description
retention_days int | None

Days to retain memories (None = forever).

max_memories_per_agent int

Maximum memories per agent.

consolidation_interval ConsolidationInterval

How often to consolidate memories.

shared_knowledge_base bool

Whether shared knowledge is enabled.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

retention_days pydantic-field

retention_days = None

Days to retain memories (None = forever)

max_memories_per_agent pydantic-field

max_memories_per_agent = 10000

Maximum memories per agent

consolidation_interval pydantic-field

consolidation_interval = DAILY

How often to consolidate memories

shared_knowledge_base pydantic-field

shared_knowledge_base = True

Whether shared knowledge is enabled

EmbedderOverrideConfig pydantic-model

Bases: BaseModel

User-facing embedder override configuration.

Allows users to override the auto-selected embedding model via company YAML config, runtime settings, or template config. All fields are optional -- None means "use auto-selection".

Attributes:

Name Type Description
provider NotBlankStr | None

Embedding provider name override.

model NotBlankStr | None

Embedding model identifier override.

dims int | None

Embedding vector dimensions (required when model is set, since dimensions are model-dependent).

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

Validators:

  • _model_requires_dims

provider pydantic-field

provider = None

Embedding provider name override

model pydantic-field

model = None

Embedding model identifier override

dims pydantic-field

dims = None

Embedding vector dimensions

CompanyMemoryConfig pydantic-model

Bases: BaseModel

Top-level company-wide memory configuration.

Attributes:

Name Type Description
backend NotBlankStr

Memory backend name (validated against _VALID_BACKENDS).

level MemoryLevel

Default memory persistence level.

storage MemoryStorageConfig

Storage-specific settings.

options MemoryOptionsConfig

Memory behaviour options.

retrieval MemoryRetrievalConfig

Memory retrieval pipeline settings.

consolidation ConsolidationConfig

Memory consolidation settings.

embedder EmbedderOverrideConfig | None

Optional embedder override (None = auto-select).

procedural ProceduralMemoryConfig

Procedural memory auto-generation settings.

composite CompositeBackendConfig | None

Composite backend routing config (required when backend is "composite").

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors
  • _validate_backend_name
  • _validate_composite_config

backend pydantic-field

backend = 'mem0'

Memory backend name

level pydantic-field

level = SESSION

Default memory persistence level

storage pydantic-field

storage

Storage-specific settings

options pydantic-field

options

Memory behaviour options

retrieval pydantic-field

retrieval

Memory retrieval pipeline settings

consolidation pydantic-field

consolidation

Memory consolidation settings

embedder pydantic-field

embedder = None

Optional embedder override. When set, overrides auto-selection for provider, model, and/or dims.

procedural pydantic-field

procedural

Procedural memory auto-generation settings. Controls whether failure-driven skill proposals are generated, which model to use, and quality thresholds.

composite pydantic-field

composite = None

Composite backend routing configuration. Required when backend is 'composite'.

Models

models

Memory domain models.

Frozen Pydantic models for memory storage requests, entries, and queries. MemoryStoreRequest is what callers pass to store(); MemoryEntry is what comes back from retrieve().

MemoryMetadata pydantic-model

Bases: BaseModel

Metadata associated with a memory entry.

Attributes:

Name Type Description
source NotBlankStr | None

Origin of the memory (task ID, conversation, etc.).

confidence float

Confidence score for the memory (0.0 to 1.0).

tags tuple[NotBlankStr, ...]

Categorization tags for filtering.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _deduplicate_tagstags

source pydantic-field

source = None

Origin of the memory

confidence pydantic-field

confidence = 1.0

Confidence score

tags pydantic-field

tags = ()

Categorization tags

MemoryStoreRequest pydantic-model

Bases: BaseModel

Input to MemoryBackend.store().

The backend assigns id and created_at; callers should not fabricate them.

Attributes:

Name Type Description
category MemoryCategory

Memory type category.

namespace NotBlankStr

Storage namespace for routing (e.g. "memories", "scratch"). The composite backend uses this to dispatch to durable vs thread-scoped backends.

content NotBlankStr

Memory content text.

metadata MemoryMetadata

Associated metadata.

expires_at AwareDatetime | None

Optional expiration timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

category pydantic-field

category

Memory type category

namespace pydantic-field

namespace = 'default'

Storage namespace for composite routing

content pydantic-field

content

Memory content text

metadata pydantic-field

metadata

Associated metadata

expires_at pydantic-field

expires_at = None

Optional expiration timestamp

MemoryEntry pydantic-model

Bases: BaseModel

A memory entry returned from the backend.

Attributes:

Name Type Description
id NotBlankStr

Unique memory identifier (assigned by backend).

agent_id NotBlankStr

Owning agent identifier.

namespace NotBlankStr

Storage namespace (routing key for composite backend).

category MemoryCategory

Memory type category.

content NotBlankStr

Memory content text.

metadata MemoryMetadata

Associated metadata.

created_at AwareDatetime

Creation timestamp.

updated_at AwareDatetime | None

Last update timestamp.

expires_at AwareDatetime | None

Optional expiration timestamp.

relevance_score float | None

Relevance score set by backend on retrieval.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_timestamps

id pydantic-field

id

Unique memory identifier

agent_id pydantic-field

agent_id

Owning agent identifier

namespace pydantic-field

namespace = 'default'

Storage namespace for composite routing

category pydantic-field

category

Memory type category

content pydantic-field

content

Memory content text

metadata pydantic-field

metadata

Associated metadata

created_at pydantic-field

created_at

Creation timestamp

updated_at pydantic-field

updated_at = None

Last update timestamp

expires_at pydantic-field

expires_at = None

Optional expiration timestamp

relevance_score pydantic-field

relevance_score = None

Relevance score set by backend on retrieval

MemoryQuery pydantic-model

Bases: BaseModel

Query parameters for MemoryBackend.retrieve().

When text is None, the backend performs metadata-only filtering (no semantic search).

Attributes:

Name Type Description
text NotBlankStr | None

Semantic search text (None for metadata-only).

namespaces frozenset[NotBlankStr] | None

Filter by storage namespaces.

categories frozenset[MemoryCategory] | None

Filter by memory categories.

tags tuple[NotBlankStr, ...]

Filter by tags (AND semantics).

min_relevance float

Minimum relevance score threshold.

limit int

Maximum number of results.

since AwareDatetime | None

Only memories created at or after this timestamp.

until AwareDatetime | None

Only memories created before this timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _deduplicate_tagstags
  • _validate_time_range

text pydantic-field

text = None

Semantic search text

namespaces pydantic-field

namespaces = None

Filter by storage namespaces

categories pydantic-field

categories = None

Filter by memory categories

tags pydantic-field

tags = ()

Filter by tags (AND semantics)

min_relevance pydantic-field

min_relevance = 0.0

Minimum relevance score threshold

limit pydantic-field

limit = 10

Maximum number of results

since pydantic-field

since = None

Only memories created at or after this timestamp

until pydantic-field

until = None

Only memories created before this timestamp

Capabilities

capabilities

MemoryCapabilities protocol -- capability discovery.

Backends that implement MemoryCapabilities expose what features they support, enabling runtime capability checks.

MemoryCapabilities

Bases: Protocol

Capability discovery for memory backends.

Attributes:

Name Type Description
supported_categories frozenset[MemoryCategory]

Memory categories this backend supports.

supports_graph bool

Whether graph-based memory is available.

supports_temporal bool

Whether temporal tracking is available.

supports_vector_search bool

Whether vector/semantic search is available.

supports_shared_access bool

Whether cross-agent shared memory is available.

max_memories_per_agent int | None

Maximum memories per agent, or None for unlimited.

supported_categories property

supported_categories

Memory categories this backend supports.

supports_graph property

supports_graph

Whether graph-based memory is available.

supports_temporal property

supports_temporal

Whether temporal tracking is available.

supports_vector_search

Whether vector/semantic search is available.

supports_shared_access property

supports_shared_access

Whether cross-agent shared memory is available.

max_memories_per_agent property

max_memories_per_agent

Maximum memories per agent, or None for unlimited.

Errors

errors

Memory error hierarchy.

All memory-related errors inherit from MemoryError so callers can catch the entire family with a single except clause.

Note: this shadows the built-in MemoryError (which signals out-of-memory conditions in CPython). Within the synthorg namespace the domain-specific meaning is unambiguous; callers outside the package should import explicitly.

MemoryError

MemoryError(message=None)

Bases: DomainError

Base exception for all memory operations.

Inherits :class:DomainError so the prefix-vs-category validator runs on every subclass. Subclasses keep the inherited ErrorCode.INTERNAL_ERROR default (8000, INTERNAL category) -- callers that need a more specific code per memory failure mode should override the ClassVars on the subclass.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryConnectionError

MemoryConnectionError(message=None)

Bases: MemoryError

Raised when a backend connection cannot be established or is lost.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryStoreError

MemoryStoreError(message=None)

Bases: MemoryError

Raised when a store operation fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryRetrievalError

MemoryRetrievalError(message=None)

Bases: MemoryError

Raised when a retrieve or search operation fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryNotFoundError

MemoryNotFoundError(message=None)

Bases: MemoryError

Raised when a specific memory ID is not found.

Note: The MemoryBackend.get() protocol method returns None for missing entries rather than raising this error. This exception is available for concrete backend implementations that need to signal "not found" in non-protocol internal methods or batch operations.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryConfigError

MemoryConfigError(message=None)

Bases: MemoryError

Raised when memory configuration is invalid.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryCapabilityError

MemoryCapabilityError(message=None)

Bases: MemoryError

Raised when an unsupported operation is attempted for a backend.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

FineTuneDependencyError

FineTuneDependencyError(message=None)

Bases: MemoryError

Raised when fine-tuning ML dependencies are not installed.

In the default Docker-orchestrated deployment torch and sentence-transformers ship inside the synthorg-fine-tune-gpu / synthorg-fine-tune-cpu container that the backend spawns on demand; this error indicates the feature is turned off for the current install (synthorg config set fine_tuning true enables it). The optional synthorg[fine-tune-gpu] / synthorg[fine-tune-cpu] extras only apply when running fine-tuning in-process (dev / testing).

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

FineTuneCancelledError

FineTuneCancelledError(message=None)

Bases: MemoryError

Raised when a fine-tuning pipeline run is cancelled.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

Factory

factory

Factory for creating memory backends from configuration.

Each company gets its own MemoryBackend instance. The factory dispatches to concrete backend implementations based on config.backend via :class:MemoryBackendRegistry.

default_registry

default_registry()

Return the module-level registry containing the built-in backends.

Source code in src/synthorg/memory/factory.py
def default_registry() -> MemoryBackendRegistry:
    """Return the module-level registry containing the built-in backends."""
    return _REGISTRY

create_memory_backend

create_memory_backend(config, *, embedder=None)

Create a memory backend from configuration.

Parameters:

Name Type Description Default
config CompanyMemoryConfig

Memory configuration (includes backend selection and backend-specific settings).

required
embedder Mem0EmbedderConfig | None

Backend-specific embedder configuration. Required for the "mem0" backend (must be a Mem0EmbedderConfig instance).

None

Returns:

Type Description
MemoryBackend

A new, disconnected backend instance. The caller must call

MemoryBackend

connect() before use.

Raises:

Type Description
MemoryConfigError

If the backend is not recognized or required configuration is missing.

Source code in src/synthorg/memory/factory.py
def create_memory_backend(
    config: CompanyMemoryConfig,
    *,
    embedder: Mem0EmbedderConfig | None = None,
) -> MemoryBackend:
    """Create a memory backend from configuration.

    Args:
        config: Memory configuration (includes backend selection and
            backend-specific settings).
        embedder: Backend-specific embedder configuration.  Required
            for the ``"mem0"`` backend (must be a
            ``Mem0EmbedderConfig`` instance).

    Returns:
        A new, disconnected backend instance.  The caller must call
        ``connect()`` before use.

    Raises:
        MemoryConfigError: If the backend is not recognized or
            required configuration is missing.
    """
    try:
        return _REGISTRY.build(config.backend, config, embedder=embedder)
    except StrategyFactoryNotFoundError as exc:
        msg = f"Unknown memory backend: {config.backend!r}"
        logger.warning(
            MEMORY_BACKEND_UNKNOWN,
            backend=config.backend,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise MemoryConfigError(msg) from exc

Retriever

retriever

Context injection strategy -- pre-retrieves and injects memories.

Orchestrates the full retrieval pipeline: backend query → ranking → budget-fit → format. Implements MemoryInjectionStrategy protocol.

ContextInjectionStrategy

ContextInjectionStrategy(
    *,
    backend,
    config,
    shared_store=None,
    token_estimator=None,
    memory_filter=None,
    hierarchical_retriever=None,
    reranker=None,
)

Context injection strategy -- pre-retrieves and injects memories.

Implements MemoryInjectionStrategy protocol. Orchestrates the full pipeline: retrieve → rank → budget-fit → format.

Initialise the context injection strategy.

Parameters:

Name Type Description Default
backend MemoryBackend

Memory backend for personal memories.

required
config MemoryRetrievalConfig

Retrieval pipeline configuration.

required
shared_store SharedKnowledgeStore | None

Optional shared knowledge store.

None
token_estimator TokenEstimator | None

Optional custom token estimator.

None
memory_filter MemoryFilterStrategy | None

Optional filter applied after ranking, before formatting. When None and config.non_inferable_only is True, a TagBasedMemoryFilter is auto-created. When None and non_inferable_only is False, all ranked memories are injected (backward-compatible).

None
hierarchical_retriever HierarchicalRetriever | None

Optional hierarchical retriever (used when config.retriever == "hierarchical").

None
reranker QuerySpecificReranker | None

Optional query-specific re-ranker (used when config.query_specific_rerank_enabled is True).

None
Source code in src/synthorg/memory/retriever.py
def __init__(  # noqa: PLR0913
    self,
    *,
    backend: MemoryBackend,
    config: MemoryRetrievalConfig,
    shared_store: SharedKnowledgeStore | None = None,
    token_estimator: TokenEstimator | None = None,
    memory_filter: MemoryFilterStrategy | None = None,
    hierarchical_retriever: HierarchicalRetriever | None = None,
    reranker: QuerySpecificReranker | None = None,
) -> None:
    """Initialise the context injection strategy.

    Args:
        backend: Memory backend for personal memories.
        config: Retrieval pipeline configuration.
        shared_store: Optional shared knowledge store.
        token_estimator: Optional custom token estimator.
        memory_filter: Optional filter applied after ranking,
            before formatting.  When ``None`` and
            ``config.non_inferable_only`` is ``True``, a
            ``TagBasedMemoryFilter`` is auto-created.  When ``None``
            and ``non_inferable_only`` is ``False``, all ranked
            memories are injected (backward-compatible).
        hierarchical_retriever: Optional hierarchical retriever
            (used when ``config.retriever == "hierarchical"``).
        reranker: Optional query-specific re-ranker (used when
            ``config.query_specific_rerank_enabled`` is ``True``).
    """
    self._backend = backend
    self._config = config
    self._shared_store = shared_store
    if memory_filter is None and config.non_inferable_only:
        memory_filter = TagBasedMemoryFilter()
    elif memory_filter is not None and config.non_inferable_only:
        logger.debug(
            MEMORY_FILTER_INIT,
            note="explicit memory_filter overrides non_inferable_only config",
            filter_strategy=getattr(memory_filter, "strategy_name", "unknown"),
        )
    self._memory_filter = memory_filter
    self._estimator = (
        token_estimator if token_estimator is not None else DefaultTokenEstimator()
    )
    if config.retriever == "hierarchical" and hierarchical_retriever is None:
        msg = "retriever='hierarchical' requires a hierarchical_retriever instance"
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline_init",
            error_type="misconfiguration",
            reason=msg,
        )
        raise ValueError(msg)
    if config.query_specific_rerank_enabled and reranker is None:
        msg = "query_specific_rerank_enabled=True requires a reranker instance"
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline_init",
            error_type="misconfiguration",
            reason=msg,
        )
        raise ValueError(msg)
    self._hierarchical_retriever = hierarchical_retriever
    self._reranker = reranker

strategy_name property

strategy_name

Human-readable strategy identifier.

Returns:

Type Description
str

"context_injection".

prepare_messages async

prepare_messages(agent_id, query_text, token_budget, *, categories=None)

Full pipeline: retrieve → rank → budget-fit → format.

Returns empty tuple on any failure (graceful degradation). Never raises domain memory errors to the caller. Re-raises builtins.MemoryError and RecursionError.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent requesting memories.

required
query_text NotBlankStr

Text for semantic retrieval.

required
token_budget int

Maximum tokens for memory content.

required
categories frozenset[MemoryCategory] | None

Optional category filter.

None

Returns:

Type Description
tuple[ChatMessage, ...]

Tuple of ChatMessage instances (may be empty).

Source code in src/synthorg/memory/retriever.py
async def prepare_messages(
    self,
    agent_id: NotBlankStr,
    query_text: NotBlankStr,
    token_budget: int,
    *,
    categories: frozenset[MemoryCategory] | None = None,
) -> tuple[ChatMessage, ...]:
    """Full pipeline: retrieve → rank → budget-fit → format.

    Returns empty tuple on any failure (graceful degradation).
    Never raises domain memory errors to the caller.
    Re-raises ``builtins.MemoryError`` and ``RecursionError``.

    Args:
        agent_id: The agent requesting memories.
        query_text: Text for semantic retrieval.
        token_budget: Maximum tokens for memory content.
        categories: Optional category filter.

    Returns:
        Tuple of ``ChatMessage`` instances (may be empty).
    """
    logger.info(
        MEMORY_RETRIEVAL_START,
        agent_id=agent_id,
        token_budget=token_budget,
    )

    if token_budget <= 0:
        logger.info(
            MEMORY_RETRIEVAL_SKIPPED,
            agent_id=agent_id,
            reason="non-positive token budget",
            token_budget=token_budget,
        )
        return ()

    try:
        return await self._execute_pipeline(
            agent_id=agent_id,
            query_text=query_text,
            token_budget=token_budget,
            categories=categories,
        )
    except builtins.MemoryError, RecursionError:
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
            error_type="system",
        )
        raise
    except memory_errors.MemoryError:
        logger.warning(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
        )
        return ()
    except Exception as exc:
        # ExceptionGroup may wrap system-level errors that must
        # propagate -- inspect and re-raise them.
        if isinstance(exc, ExceptionGroup):
            system_errors = exc.subgroup(
                lambda e: isinstance(
                    e,
                    builtins.MemoryError | RecursionError,
                ),
            )
            if system_errors is not None:
                logger.error(
                    MEMORY_RETRIEVAL_DEGRADED,
                    source="pipeline",
                    agent_id=agent_id,
                    error_type="system_in_exception_group",
                )
                raise system_errors.exceptions[0] from exc
        logger.error(
            MEMORY_RETRIEVAL_DEGRADED,
            source="pipeline",
            agent_id=agent_id,
            error_type=type(exc).__qualname__,
        )
        return ()

get_tool_definitions

get_tool_definitions()

Context injection provides no tools.

Returns:

Type Description
tuple[ToolDefinition, ...]

Empty tuple.

Source code in src/synthorg/memory/retriever.py
def get_tool_definitions(self) -> tuple[ToolDefinition, ...]:
    """Context injection provides no tools.

    Returns:
        Empty tuple.
    """
    return ()

Retrieval Config

retrieval_config

Memory retrieval pipeline configuration.

Frozen Pydantic config for the retrieval pipeline -- weights, thresholds, strategy selection, hierarchical retriever, and query-specific re-ranking.

MemoryRetrievalConfig pydantic-model

Bases: BaseModel

Configuration for the memory retrieval and ranking pipeline.

Attributes:

Name Type Description
strategy InjectionStrategy

Which injection strategy to use.

relevance_weight float

Weight for backend relevance score (0.0-1.0).

recency_weight float

Weight for recency decay score (0.0-1.0).

recency_decay_rate float

Exponential decay rate per hour.

personal_boost float

Boost applied to personal over shared (0.0-1.0).

min_relevance float

Minimum combined (relevance + recency) score to include.

max_memories int

Maximum candidates to retrieve (1-100).

include_shared bool

Whether to query SharedKnowledgeStore.

default_relevance float

Score for entries missing relevance_score.

injection_point InjectionPoint

Message role for context injection.

non_inferable_only bool

When True, auto-creates a TagBasedMemoryFilter in ContextInjectionStrategy if no explicit filter is provided.

fusion_strategy FusionStrategy

Ranking fusion strategy -- LINEAR for single-source relevance+recency, RRF for multi-source ranked list merging.

rrf_k int

RRF smoothing constant (1-1000, only used with RRF strategy).

diversity_penalty_enabled bool

When True, apply MMR-style diversity penalty to re-rank retrieval results, reducing redundancy. Only consumed by ContextInjectionStrategy; a validator raises if combined with another strategy. Defaults to False.

diversity_lambda float

MMR trade-off parameter (0.0-1.0). 1.0 means pure relevance (no diversity), 0.0 means maximum diversity. Defaults to 0.7. Only consulted when diversity_penalty_enabled is True.

query_reformulation_enabled bool

When True, enables the Search-and-Ask iterative query-reformulation loop in the TOOL_BASED strategy. Requires ToolBasedInjectionStrategy to be constructed with both reformulator and sufficiency_checker; the strategy constructor raises when the flag is set but either dependency is missing. Defaults to False.

max_reformulation_rounds int

Maximum rounds of query reformulation in the Search-and-Ask loop (1-5). Defaults to 2.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_weight_sum
  • _validate_rrf_k_strategy_consistency
  • _validate_diversity_lambda_consistency
  • _validate_diversity_strategy_consistency
  • _validate_reformulation_requires_tool_based
  • _validate_personal_boost_rrf_consistency
  • _validate_pool_multiplier_consistency
  • _validate_supported_strategy
  • _validate_hierarchical_requires_context
  • _validate_hierarchical_field_consistency
  • _validate_rerank_cache_ttl_consistency

strategy pydantic-field

strategy = CONTEXT

Which injection strategy to use

relevance_weight pydantic-field

relevance_weight = 0.7

Weight for backend relevance score

recency_weight pydantic-field

recency_weight = 0.3

Weight for recency decay score

recency_decay_rate pydantic-field

recency_decay_rate = 0.01

Exponential decay rate per hour

personal_boost pydantic-field

personal_boost = 0.1

Boost applied to personal over shared memories

min_relevance pydantic-field

min_relevance = 0.3

Minimum combined (relevance + recency) score to include

max_memories pydantic-field

max_memories = 20

Maximum candidates to retrieve

include_shared pydantic-field

include_shared = True

Whether to query SharedKnowledgeStore

default_relevance pydantic-field

default_relevance = 0.5

Score for entries missing relevance_score

injection_point pydantic-field

injection_point = SYSTEM

Message role for context injection

non_inferable_only pydantic-field

non_inferable_only = False

When True, only inject memories tagged as non-inferable

fusion_strategy pydantic-field

fusion_strategy = LINEAR

Ranking fusion strategy: linear for single-source relevance+recency, rrf for multi-source ranked list merging

rrf_k pydantic-field

rrf_k = _DEFAULT_RRF_K

RRF smoothing constant k (only used with RRF strategy)

diversity_penalty_enabled pydantic-field

diversity_penalty_enabled = False

When True, apply MMR-style diversity penalty to re-rank retrieval results, reducing redundancy

diversity_lambda pydantic-field

diversity_lambda = _DEFAULT_DIVERSITY_LAMBDA

MMR trade-off parameter: 1.0 = pure relevance (no diversity), 0.0 = maximum diversity

candidate_pool_multiplier pydantic-field

candidate_pool_multiplier = _DEFAULT_CANDIDATE_POOL_MULTIPLIER

Over-fetch multiplier for the candidate pool when diversity_penalty_enabled is True. The backend query fetches max_memories * candidate_pool_multiplier entries so MMR can promote diverse candidates that would otherwise fall below the top-K cutoff. Ignored when diversity_penalty_enabled is False.

query_reformulation_enabled pydantic-field

query_reformulation_enabled = False

Enables iterative query reformulation in the TOOL_BASED strategy. When True, ToolBasedInjectionStrategy runs a Search-and-Ask loop (retrieve -> check sufficiency -> reformulate -> re-retrieve) up to max_reformulation_rounds rounds. Requires both reformulator AND sufficiency_checker to be passed to the strategy constructor -- the constructor raises ValueError when the flag is set but either collaborator is missing (fail-fast at wiring time rather than silent no-op at retrieval time). A config-level validator also rejects this flag with strategies other than TOOL_BASED.

max_reformulation_rounds pydantic-field

max_reformulation_rounds = 2

Maximum rounds of query reformulation in the Search-and-Ask loop when query_reformulation_enabled is True (1-5).

retriever pydantic-field

retriever = 'flat'

Retriever topology: flat uses the existing single-pass pipeline, hierarchical uses supervisor-worker routing with semantic, episodic, and procedural workers.

max_workers_per_query pydantic-field

max_workers_per_query = _DEFAULT_MAX_WORKERS_PER_QUERY

Maximum workers the supervisor may invoke per query (only used when retriever is hierarchical).

reflective_retry_enabled pydantic-field

reflective_retry_enabled = True

When True, the hierarchical supervisor evaluates result quality and retries with corrected queries on poor results.

max_retry_count pydantic-field

max_retry_count = _DEFAULT_MAX_RETRY_COUNT

Maximum reflective retry attempts (only used when retriever is hierarchical and reflective_retry_enabled is True).

query_specific_rerank_enabled pydantic-field

query_specific_rerank_enabled = False

When True, apply query-specific LLM-based re-ranking after RRF/linear fusion. Works with both flat and hierarchical retrievers. Adds an LLM call per retrieve.

rerank_cache_ttl_seconds pydantic-field

rerank_cache_ttl_seconds = _DEFAULT_RERANK_CACHE_TTL_SECONDS

TTL in seconds for the re-ranker cache (only used when query_specific_rerank_enabled is True).

Ranking

ranking

Memory ranking -- scoring and sorting functions.

All functions are functionally pure (deterministic given the same inputs). Logging calls are the only side effect.

rank_memories scores entries via linear combination of relevance and recency (single-source). fuse_ranked_lists merges multiple pre-ranked lists via Reciprocal Rank Fusion (multi-source). apply_diversity_penalty re-ranks using MMR to reduce redundancy.

FusionStrategy

Bases: StrEnum

Ranking fusion strategy selection.

Attributes:

Name Type Description
LINEAR

Weighted linear combination of relevance and recency (default, for single-source scoring).

RRF

Reciprocal Rank Fusion for merging multiple ranked lists (for multi-source hybrid search).

ScoredMemory pydantic-model

Bases: BaseModel

Memory entry with computed ranking scores.

Produced by either rank_memories (LINEAR fusion) or fuse_ranked_lists (RRF fusion). Field semantics depend on which producer created the instance:

  • LINEAR: relevance_score is raw backend relevance plus personal_boost (for personal entries), recency_score is the exponential decay based on age, and combined_score is the weighted linear combination of the two.
  • RRF: relevance_score preserves the raw backend relevance (or 0.0 if absent), recency_score is always 0.0 (RRF is rank-based, not time-based), and combined_score is the min-max-normalized fusion score.

Attributes:

Name Type Description
entry MemoryEntry

The original memory entry.

relevance_score float

For LINEAR, post-boost relevance; for RRF, raw backend relevance (0.0-1.0).

recency_score float

Exponential decay based on age (LINEAR) or always 0.0 (RRF).

combined_score float

Final ranking signal (0.0-1.0). LINEAR weighted combination or RRF normalized fusion score.

is_shared bool

Whether this came from SharedKnowledgeStore.

scoring_strategy FusionStrategy | None

Which fusion strategy produced this instance, or None when unset (backward compatibility).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

entry pydantic-field

entry

The original memory entry

relevance_score pydantic-field

relevance_score

LINEAR: post-boost relevance. RRF: raw backend relevance.

recency_score pydantic-field

recency_score

Recency decay score (always 0.0 for RRF).

combined_score pydantic-field

combined_score

LINEAR: weighted combination. RRF: normalized fusion score.

is_shared pydantic-field

is_shared = False

Whether from SharedKnowledgeStore

scoring_strategy pydantic-field

scoring_strategy = None

Which fusion strategy produced this instance (None when unset by the producer)

compute_recency_score

compute_recency_score(created_at, now, decay_rate)

Compute exponential recency decay score.

exp(-decay_rate * age_hours). Returns 1.0 for zero age, decays toward 0.0 over time. Future timestamps are clamped to 1.0.

Parameters:

Name Type Description Default
created_at datetime

When the memory was created.

required
now datetime

Current timestamp for age calculation.

required
decay_rate float

Exponential decay rate per hour.

required

Returns:

Type Description
float

Recency score between 0.0 and 1.0.

Source code in src/synthorg/memory/ranking.py
def compute_recency_score(
    created_at: datetime,
    now: datetime,
    decay_rate: float,
) -> float:
    """Compute exponential recency decay score.

    ``exp(-decay_rate * age_hours)``.  Returns 1.0 for zero age,
    decays toward 0.0 over time.  Future timestamps are clamped to
    1.0.

    Args:
        created_at: When the memory was created.
        now: Current timestamp for age calculation.
        decay_rate: Exponential decay rate per hour.

    Returns:
        Recency score between 0.0 and 1.0.
    """
    age_seconds = (now - created_at).total_seconds()
    if age_seconds <= 0:
        return 1.0
    age_hours = age_seconds / 3600.0
    return math.exp(-decay_rate * age_hours)

compute_combined_score

compute_combined_score(relevance, recency, relevance_weight, recency_weight)

Weighted linear combination of relevance and recency.

Parameters:

Name Type Description Default
relevance float

Relevance score (0.0-1.0).

required
recency float

Recency score (0.0-1.0).

required
relevance_weight float

Weight for relevance.

required
recency_weight float

Weight for recency.

required

Returns:

Type Description
float

Combined score clamped to [0.0, 1.0]. When

float

relevance_weight + recency_weight == 1.0 and inputs are

float

in [0.0, 1.0], the result is naturally bounded; the clamp

float

guards against floating-point tolerance in the weight sum.

Source code in src/synthorg/memory/ranking.py
def compute_combined_score(
    relevance: float,
    recency: float,
    relevance_weight: float,
    recency_weight: float,
) -> float:
    """Weighted linear combination of relevance and recency.

    Args:
        relevance: Relevance score (0.0-1.0).
        recency: Recency score (0.0-1.0).
        relevance_weight: Weight for relevance.
        recency_weight: Weight for recency.

    Returns:
        Combined score clamped to [0.0, 1.0].  When
        ``relevance_weight + recency_weight == 1.0`` and inputs are
        in [0.0, 1.0], the result is naturally bounded; the clamp
        guards against floating-point tolerance in the weight sum.
    """
    return min(1.0, relevance_weight * relevance + recency_weight * recency)

rank_memories

rank_memories(entries, *, config, now, shared_entries=())

Score, merge, sort, filter, and truncate memory entries.

  1. Score personal entries (with personal_boost).
  2. Score shared entries (no boost).
  3. Merge both sets.
  4. Filter by min_relevance threshold on combined_score.
  5. Sort descending by combined_score.
  6. Truncate to max_memories.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Personal memory entries.

required
config MemoryRetrievalConfig

Retrieval pipeline configuration.

required
now datetime

Current timestamp for recency calculations.

required
shared_entries tuple[MemoryEntry, ...]

Shared memory entries (no personal boost).

()

Returns:

Type Description
tuple[ScoredMemory, ...]

Sorted and filtered tuple of ScoredMemory.

Source code in src/synthorg/memory/ranking.py
def rank_memories(
    entries: tuple[MemoryEntry, ...],
    *,
    config: MemoryRetrievalConfig,
    now: datetime,
    shared_entries: tuple[MemoryEntry, ...] = (),
) -> tuple[ScoredMemory, ...]:
    """Score, merge, sort, filter, and truncate memory entries.

    1. Score personal entries (with ``personal_boost``).
    2. Score shared entries (no boost).
    3. Merge both sets.
    4. Filter by ``min_relevance`` threshold on ``combined_score``.
    5. Sort descending by ``combined_score``.
    6. Truncate to ``max_memories``.

    Args:
        entries: Personal memory entries.
        config: Retrieval pipeline configuration.
        now: Current timestamp for recency calculations.
        shared_entries: Shared memory entries (no personal boost).

    Returns:
        Sorted and filtered tuple of ``ScoredMemory``.
    """
    scored = [
        _score_entry(entry, config=config, now=now, is_shared=False)
        for entry in entries
    ]
    scored.extend(
        _score_entry(entry, config=config, now=now, is_shared=True)
        for entry in shared_entries
    )

    filtered = [s for s in scored if s.combined_score >= config.min_relevance]
    filtered.sort(key=lambda s: s.combined_score, reverse=True)

    result = tuple(filtered[: config.max_memories])

    logger.debug(
        MEMORY_RANKING_COMPLETE,
        total_candidates=len(scored),
        after_filter=len(filtered),
        after_truncation=len(result),
        min_relevance=config.min_relevance,
        max_memories=config.max_memories,
    )

    return result

fuse_ranked_lists

fuse_ranked_lists(ranked_lists, *, k=_DEFAULT_K, max_results=_DEFAULT_MAX_RESULTS)

Merge multiple pre-ranked lists via Reciprocal Rank Fusion.

RRF_score(doc) = sum(1 / (k + rank_i)) across all lists containing the document. Scores are min-max normalized to [0.0, 1.0].

For RRF output, only combined_score is the meaningful ranking signal. relevance_score preserves the entry's raw backend relevance (or 0.0 if absent); recency_score is 0.0.

When the same entry ID appears in multiple lists, the first MemoryEntry object encountered is retained.

Unlike rank_memories, this function does not apply a min_relevance threshold -- callers are responsible for post-filtering if needed.

Parameters:

Name Type Description Default
ranked_lists tuple[tuple[MemoryEntry, ...], ...]

Each inner tuple is a pre-sorted ranked list of memory entries (best first).

required
k int

RRF smoothing constant (default 60, must be >= 1). Smaller values amplify rank differences.

_DEFAULT_K
max_results int

Maximum entries to return (must be >= 1).

_DEFAULT_MAX_RESULTS

Returns:

Type Description
tuple[ScoredMemory, ...]

Sorted tuple of ScoredMemory by descending RRF score.

Raises:

Type Description
ValueError

If k < 1 or max_results < 1.

Source code in src/synthorg/memory/ranking.py
def fuse_ranked_lists(
    ranked_lists: tuple[tuple[MemoryEntry, ...], ...],
    *,
    k: int = _DEFAULT_K,
    max_results: int = _DEFAULT_MAX_RESULTS,
) -> tuple[ScoredMemory, ...]:
    """Merge multiple pre-ranked lists via Reciprocal Rank Fusion.

    ``RRF_score(doc) = sum(1 / (k + rank_i))`` across all lists
    containing the document.  Scores are min-max normalized to
    [0.0, 1.0].

    For RRF output, only ``combined_score`` is the meaningful
    ranking signal.  ``relevance_score`` preserves the entry's raw
    backend relevance (or 0.0 if absent); ``recency_score`` is 0.0.

    When the same entry ID appears in multiple lists, the first
    ``MemoryEntry`` object encountered is retained.

    Unlike ``rank_memories``, this function does **not** apply a
    ``min_relevance`` threshold -- callers are responsible for
    post-filtering if needed.

    Args:
        ranked_lists: Each inner tuple is a pre-sorted ranked list
            of memory entries (best first).
        k: RRF smoothing constant (default 60, must be >= 1).
            Smaller values amplify rank differences.
        max_results: Maximum entries to return (must be >= 1).

    Returns:
        Sorted tuple of ``ScoredMemory`` by descending RRF score.

    Raises:
        ValueError: If ``k < 1`` or ``max_results < 1``.
    """
    if k < 1:
        msg = f"k must be >= 1, got {k}"
        logger.warning(MEMORY_RRF_VALIDATION_FAILED, param="k", value=k)
        raise ValueError(msg)
    if max_results < 1:
        msg = f"max_results must be >= 1, got {max_results}"
        logger.warning(
            MEMORY_RRF_VALIDATION_FAILED,
            param="max_results",
            value=max_results,
        )
        raise ValueError(msg)

    scores, entries, duplicate_count = _accumulate_rrf_scores(ranked_lists, k)

    if not entries:
        logger.info(
            MEMORY_RRF_FUSION_COMPLETE,
            num_lists=len(ranked_lists),
            unique_entries=0,
            after_truncation=0,
            duplicate_ids_skipped=duplicate_count,
            k=k,
        )
        return ()

    normalized = _normalize_rrf_scores(scores)
    scored_list = _build_rrf_scored_memories(entries, normalized)
    scored_list.sort(key=lambda s: s.combined_score, reverse=True)
    result = tuple(scored_list[:max_results])

    logger.info(
        MEMORY_RRF_FUSION_COMPLETE,
        num_lists=len(ranked_lists),
        unique_entries=len(entries),
        after_truncation=len(result),
        duplicate_ids_skipped=duplicate_count,
        k=k,
    )

    return result

bigram_jaccard

bigram_jaccard(text_a, text_b)

Word-bigram Jaccard similarity between two texts.

Returns 0.0 when either text has fewer than 2 words (no bigrams possible).

Parameters:

Name Type Description Default
text_a str

First text.

required
text_b str

Second text.

required

Returns:

Type Description
float

Similarity score between 0.0 and 1.0.

Source code in src/synthorg/memory/ranking.py
def bigram_jaccard(text_a: str, text_b: str) -> float:
    """Word-bigram Jaccard similarity between two texts.

    Returns 0.0 when either text has fewer than 2 words (no bigrams
    possible).

    Args:
        text_a: First text.
        text_b: Second text.

    Returns:
        Similarity score between 0.0 and 1.0.
    """
    bigrams_a = _word_bigrams(text_a)
    bigrams_b = _word_bigrams(text_b)
    if not bigrams_a or not bigrams_b:
        return 0.0
    intersection = len(bigrams_a & bigrams_b)
    union = len(bigrams_a | bigrams_b)
    return intersection / union

apply_diversity_penalty

apply_diversity_penalty(
    scored, *, diversity_lambda=_DEFAULT_DIVERSITY_LAMBDA, similarity_fn=None
)

Re-rank scored memories using Maximal Marginal Relevance.

Iteratively selects entries that balance relevance (via combined_score) with diversity (via pairwise dissimilarity to already-selected entries).

MMR score: lambda * combined_score - (1 - lambda) * max_sim

When similarity_fn is None (the default), the implementation pre-computes each entry's word bigrams once and computes Jaccard from the cached sets, avoiding O(n**2 * k) re-tokenization of already-selected content on each iteration.

Parameters:

Name Type Description Default
scored tuple[ScoredMemory, ...]

Pre-ranked scored memories.

required
diversity_lambda float

Trade-off between relevance (1.0) and diversity (0.0). Must be in [0.0, 1.0].

_DEFAULT_DIVERSITY_LAMBDA
similarity_fn Callable[[str, str], float] | None

Optional pairwise text similarity function. Defaults to bigram Jaccard (with precomputed bigram cache) when None.

None

Returns:

Type Description
tuple[ScoredMemory, ...]

Re-ordered tuple of the same length as scored.

Raises:

Type Description
ValueError

If diversity_lambda is outside [0.0, 1.0].

Source code in src/synthorg/memory/ranking.py
def apply_diversity_penalty(
    scored: tuple[ScoredMemory, ...],
    *,
    diversity_lambda: float = _DEFAULT_DIVERSITY_LAMBDA,
    similarity_fn: Callable[[str, str], float] | None = None,
) -> tuple[ScoredMemory, ...]:
    """Re-rank scored memories using Maximal Marginal Relevance.

    Iteratively selects entries that balance relevance (via
    ``combined_score``) with diversity (via pairwise dissimilarity
    to already-selected entries).

    MMR score: ``lambda * combined_score - (1 - lambda) * max_sim``

    When ``similarity_fn`` is ``None`` (the default), the implementation
    pre-computes each entry's word bigrams once and computes Jaccard
    from the cached sets, avoiding ``O(n**2 * k)`` re-tokenization of
    already-selected content on each iteration.

    Args:
        scored: Pre-ranked scored memories.
        diversity_lambda: Trade-off between relevance (1.0) and
            diversity (0.0).  Must be in [0.0, 1.0].
        similarity_fn: Optional pairwise text similarity function.
            Defaults to bigram Jaccard (with precomputed bigram cache)
            when ``None``.

    Returns:
        Re-ordered tuple of the same length as ``scored``.

    Raises:
        ValueError: If ``diversity_lambda`` is outside [0.0, 1.0].
    """
    if (
        not math.isfinite(diversity_lambda)
        or diversity_lambda < 0.0
        or diversity_lambda > 1.0
    ):
        msg = (
            f"diversity_lambda must be a finite float in [0.0, 1.0], "
            f"got {diversity_lambda}"
        )
        logger.warning(
            MEMORY_DIVERSITY_RERANK_FAILED,
            param="diversity_lambda",
            value=diversity_lambda,
            reason=msg,
        )
        raise ValueError(msg)

    if len(scored) <= 1:
        return scored

    if similarity_fn is None:
        return _mmr_rerank_bigram_cached(
            scored,
            diversity_lambda=diversity_lambda,
        )

    return _mmr_rerank_generic(
        scored,
        diversity_lambda=diversity_lambda,
        similarity_fn=similarity_fn,
    )

Filter

filter

Memory filter strategies for non-inferable principle enforcement.

Filters scored memories before injection into agent prompts. The TagBasedMemoryFilter (initial D23 implementation) retains only memories tagged with "non-inferable"; the PassthroughMemoryFilter is a no-op for backward compatibility and testing.

Both satisfy the MemoryFilterStrategy runtime-checkable protocol.

MemoryFilterStrategy

Bases: Protocol

Protocol for filtering scored memories before prompt injection.

strategy_name property

strategy_name

Human-readable name of the filter strategy.

filter_for_injection

filter_for_injection(memories)

Filter memories suitable for injection.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Ranked scored memories from the retrieval pipeline.

required

Returns:

Type Description
tuple[ScoredMemory, ...]

Subset of memories that pass the filter.

Source code in src/synthorg/memory/filter.py
def filter_for_injection(
    self,
    memories: tuple[ScoredMemory, ...],
) -> tuple[ScoredMemory, ...]:
    """Filter memories suitable for injection.

    Args:
        memories: Ranked scored memories from the retrieval pipeline.

    Returns:
        Subset of memories that pass the filter.
    """
    ...

TagBasedMemoryFilter

TagBasedMemoryFilter(required_tag=NON_INFERABLE_TAG)

Filter that retains only memories with a required tag.

The default required tag is "non-inferable" per D23. Memories whose entry.metadata.tags do not contain the required tag are excluded from prompt injection.

Parameters:

Name Type Description Default
required_tag str

Tag that must be present for a memory to pass.

NON_INFERABLE_TAG
Source code in src/synthorg/memory/filter.py
def __init__(self, required_tag: str = NON_INFERABLE_TAG) -> None:
    if not isinstance(required_tag, str) or not required_tag.strip():
        msg = "required_tag must be a non-empty string"
        raise ValueError(msg)
    self._required_tag = required_tag.strip()
    logger.debug(
        MEMORY_FILTER_INIT,
        strategy=self.strategy_name,
        required_tag=required_tag,
    )

strategy_name property

strategy_name

Human-readable name of the filter strategy.

Returns:

Type Description
str

"tag_based".

filter_for_injection

filter_for_injection(memories)

Return only memories containing the required tag.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Ranked scored memories.

required

Returns:

Type Description
tuple[ScoredMemory, ...]

Filtered tuple with only tagged memories.

Source code in src/synthorg/memory/filter.py
def filter_for_injection(
    self,
    memories: tuple[ScoredMemory, ...],
) -> tuple[ScoredMemory, ...]:
    """Return only memories containing the required tag.

    Args:
        memories: Ranked scored memories.

    Returns:
        Filtered tuple with only tagged memories.
    """
    retained = tuple(
        m for m in memories if self._required_tag in m.entry.metadata.tags
    )

    logger.info(
        MEMORY_FILTER_APPLIED,
        strategy=self.strategy_name,
        candidates=len(memories),
        retained=len(retained),
        required_tag=self._required_tag,
    )

    return retained

PassthroughMemoryFilter

No-op filter that returns all memories unchanged.

Useful for backward compatibility and testing -- all memories pass through without filtering.

strategy_name property

strategy_name

Human-readable name of the filter strategy.

Returns:

Type Description
str

"passthrough".

filter_for_injection

filter_for_injection(memories)

Return all memories unchanged.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Ranked scored memories.

required

Returns:

Type Description
tuple[ScoredMemory, ...]

The input tuple unchanged.

Source code in src/synthorg/memory/filter.py
def filter_for_injection(
    self,
    memories: tuple[ScoredMemory, ...],
) -> tuple[ScoredMemory, ...]:
    """Return all memories unchanged.

    Args:
        memories: Ranked scored memories.

    Returns:
        The input tuple unchanged.
    """
    logger.info(
        MEMORY_FILTER_APPLIED,
        strategy=self.strategy_name,
        candidates=len(memories),
        retained=len(memories),
    )
    return memories

Formatter

formatter

Memory context formatter: converts ranked memories to ChatMessages.

Handles token budget enforcement via greedy packing: iterates by rank, skips entries that exceed the remaining budget, and continues with smaller entries to maximise context within the token limit.

Each memory entry is wrapped under TAG_MEMORY_ENTRY via :func:wrap_untrusted so an attacker who plants a stored memory cannot break out of the fence and inject system-prompt-level instructions. Consumers that splice the formatted messages into an LLM call must append :func:untrusted_content_directive for TAG_MEMORY_ENTRY to their system prompt: :func:format_memory_context_with_directive is the canonical helper that bundles both steps.

format_memory_context_with_directive

format_memory_context_with_directive(
    memories, *, estimator, token_budget, injection_point=SYSTEM
)

Format memories with the untrusted-content directive prepended.

Calls the private :func:_format_memory_context and prepends a SYSTEM-role :class:ChatMessage carrying the untrusted-content directive for TAG_MEMORY_ENTRY. Returns an empty tuple when no memories fit.

This is the only public memory-formatter entry point: the directive is the contract that tells the model the memory blocks are data, not instructions.

Parameters:

Name Type Description Default
memories tuple[ScoredMemory, ...]

Pre-ranked memories (highest score first).

required
estimator TokenEstimator

Token estimation implementation.

required
token_budget int

Maximum tokens for the memory block.

required
injection_point InjectionPoint

Role for the memory message.

SYSTEM

Returns:

Type Description
ChatMessage

(directive_message, memory_message) on success, or empty

...

tuple when no memories fit (so the directive does not appear

tuple[ChatMessage, ...]

on its own).

Source code in src/synthorg/memory/formatter.py
def format_memory_context_with_directive(
    memories: tuple[ScoredMemory, ...],
    *,
    estimator: TokenEstimator,
    token_budget: int,
    injection_point: InjectionPoint = InjectionPoint.SYSTEM,
) -> tuple[ChatMessage, ...]:
    """Format memories with the untrusted-content directive prepended.

    Calls the private :func:`_format_memory_context` and prepends a
    SYSTEM-role :class:`ChatMessage` carrying the untrusted-content
    directive for ``TAG_MEMORY_ENTRY``. Returns an empty tuple when
    no memories fit.

    This is the only public memory-formatter entry point: the
    directive is the contract that tells the model the memory
    blocks are data, not instructions.

    Args:
        memories: Pre-ranked memories (highest score first).
        estimator: Token estimation implementation.
        token_budget: Maximum tokens for the memory block.
        injection_point: Role for the memory message.

    Returns:
        ``(directive_message, memory_message)`` on success, or empty
        tuple when no memories fit (so the directive does not appear
        on its own).
    """
    memory_messages = _format_memory_context(
        memories,
        estimator=estimator,
        token_budget=token_budget,
        injection_point=injection_point,
    )
    if not memory_messages:
        return ()
    directive = ChatMessage(
        role=MessageRole.SYSTEM,
        content=untrusted_content_directive((TAG_MEMORY_ENTRY,)),
    )
    return (directive, *memory_messages)

Injection

injection

Memory injection strategy protocol and supporting types.

Defines the pluggable MemoryInjectionStrategy protocol that controls how memories reach agents during execution. Three strategies are planned (context injection, tool-based, self-editing); this module provides the protocol and enums for all. ContextInjectionStrategy (in synthorg.memory.retriever) and ToolBasedInjectionStrategy (in synthorg.memory.tool_retriever) are implemented.

TokenEstimator is a local structural protocol that avoids a memory -> engine import cycle (PromptTokenEstimator lives in engine/prompt.py). Any object with estimate_tokens(str) -> int satisfies it automatically.

InjectionStrategy

Bases: StrEnum

Which injection strategy to use for surfacing memories.

Attributes:

Name Type Description
CONTEXT

Pre-execution context injection (implemented).

TOOL_BASED

On-demand via agent tools (implemented).

SELF_EDITING

Structured read/write memory blocks (implemented).

InjectionPoint

Bases: StrEnum

Role of the injected memory message.

Attributes:

Name Type Description
SYSTEM

Memory injected as a SYSTEM message (default).

USER

Memory injected as a USER message.

TokenEstimator

Bases: Protocol

Token estimation protocol (avoids memory → engine dependency).

Any object with estimate_tokens(str) -> int satisfies this protocol structurally.

estimate_tokens

estimate_tokens(text)

Estimate the number of tokens in text.

Implementations must return non-negative values.

Parameters:

Name Type Description Default
text str

The text to estimate tokens for.

required

Returns:

Type Description
int

Estimated token count (non-negative).

Source code in src/synthorg/memory/injection.py
def estimate_tokens(self, text: str) -> int:
    """Estimate the number of tokens in *text*.

    Implementations must return non-negative values.

    Args:
        text: The text to estimate tokens for.

    Returns:
        Estimated token count (non-negative).
    """
    ...

DefaultTokenEstimator

Heuristic token estimator: len(text) // 4.

Suitable for rough budget enforcement when a model-specific tokenizer is unavailable.

estimate_tokens

estimate_tokens(text)

Estimate tokens as max(1, len(text) // 4) for non-empty text.

Returns 0 for empty text, at least 1 for any non-empty text.

Parameters:

Name Type Description Default
text str

The text to estimate tokens for.

required

Returns:

Type Description
int

Estimated token count (non-negative).

Source code in src/synthorg/memory/injection.py
def estimate_tokens(self, text: str) -> int:
    """Estimate tokens as ``max(1, len(text) // 4)`` for non-empty text.

    Returns 0 for empty text, at least 1 for any non-empty text.

    Args:
        text: The text to estimate tokens for.

    Returns:
        Estimated token count (non-negative).
    """
    if not text:
        return 0
    return max(1, len(text) // 4)

MemoryInjectionStrategy

Bases: Protocol

Pluggable strategy for making memories available to agents.

Implementations determine how memories reach the agent:

  • Context injection: pre-execution message injection.
  • Tool-based: on-demand retrieval via agent tools.
  • Self-editing: structured read/write memory blocks.

strategy_name property

strategy_name

Human-readable strategy identifier.

Returns:

Type Description
str

Strategy name string.

prepare_messages async

prepare_messages(agent_id, query_text, token_budget)

Return memory messages to inject into agent context.

Context injection returns ranked, formatted memories. Tool-based may return empty (tools handle retrieval). Self-editing returns the core memory block.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent requesting memories.

required
query_text NotBlankStr

Text to use for semantic retrieval.

required
token_budget int

Maximum tokens for memory content.

required

Returns:

Type Description
tuple[ChatMessage, ...]

Tuple of ChatMessage instances (may be empty).

Source code in src/synthorg/memory/injection.py
async def prepare_messages(
    self,
    agent_id: NotBlankStr,
    query_text: NotBlankStr,
    token_budget: int,
) -> tuple[ChatMessage, ...]:
    """Return memory messages to inject into agent context.

    Context injection returns ranked, formatted memories.
    Tool-based may return empty (tools handle retrieval).
    Self-editing returns the core memory block.

    Args:
        agent_id: The agent requesting memories.
        query_text: Text to use for semantic retrieval.
        token_budget: Maximum tokens for memory content.

    Returns:
        Tuple of ``ChatMessage`` instances (may be empty).
    """
    ...

get_tool_definitions

get_tool_definitions()

Return tool definitions this strategy provides.

Context injection returns (). Tool-based returns recall/search tools. Self-editing returns read/write tools.

Returns:

Type Description
tuple[ToolDefinition, ...]

Tuple of ToolDefinition instances.

Source code in src/synthorg/memory/injection.py
def get_tool_definitions(self) -> tuple[ToolDefinition, ...]:
    """Return tool definitions this strategy provides.

    Context injection returns ``()``.  Tool-based returns
    recall/search tools.  Self-editing returns read/write tools.

    Returns:
        Tuple of ``ToolDefinition`` instances.
    """
    ...

Org Memory

protocol

OrgMemoryBackend protocol -- lifecycle + org memory operations.

Application code depends on this protocol for shared organizational memory storage and retrieval. Concrete backends implement this protocol to provide company-wide knowledge management.

OrgMemoryBackend

Bases: Protocol

Structural interface for organizational memory backends.

Provides company-wide knowledge storage, retrieval, and lifecycle management. All operations require a connected backend.

Attributes:

Name Type Description
is_connected bool

Whether the backend has an active connection.

backend_name NotBlankStr

Human-readable backend identifier.

is_connected property

is_connected

Whether the backend has an active connection.

backend_name property

backend_name

Human-readable backend identifier.

connect async

connect()

Establish connection to the org memory backend.

Raises:

Type Description
OrgMemoryConnectionError

If the connection fails.

Source code in src/synthorg/memory/org/protocol.py
async def connect(self) -> None:
    """Establish connection to the org memory backend.

    Raises:
        OrgMemoryConnectionError: If the connection fails.
    """
    ...

disconnect async

disconnect()

Close the org memory backend connection.

Safe to call even if not connected.

Source code in src/synthorg/memory/org/protocol.py
async def disconnect(self) -> None:
    """Close the org memory backend connection.

    Safe to call even if not connected.
    """
    ...

health_check async

health_check()

Check whether the backend is healthy and responsive.

Returns:

Type Description
bool

True if the backend is reachable and operational.

Source code in src/synthorg/memory/org/protocol.py
async def health_check(self) -> bool:
    """Check whether the backend is healthy and responsive.

    Returns:
        ``True`` if the backend is reachable and operational.
    """
    ...

query async

query(query)

Query organizational facts.

Parameters:

Name Type Description Default
query OrgMemoryQuery

Query parameters.

required

Returns:

Type Description
tuple[OrgFact, ...]

Matching facts ordered by relevance.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryQueryError

If the query fails.

Source code in src/synthorg/memory/org/protocol.py
async def query(self, query: OrgMemoryQuery) -> tuple[OrgFact, ...]:
    """Query organizational facts.

    Args:
        query: Query parameters.

    Returns:
        Matching facts ordered by relevance.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryQueryError: If the query fails.
    """
    ...

write async

write(request, *, author)

Write a new organizational fact.

Parameters:

Name Type Description Default
request OrgFactWriteRequest

Fact content and category.

required
author OrgFactAuthor

The author of the fact.

required

Returns:

Type Description
NotBlankStr

The assigned fact ID.

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

OrgMemoryAccessDeniedError

If write access is denied.

OrgMemoryWriteError

If the write operation fails.

Source code in src/synthorg/memory/org/protocol.py
async def write(
    self,
    request: OrgFactWriteRequest,
    *,
    author: OrgFactAuthor,
) -> NotBlankStr:
    """Write a new organizational fact.

    Args:
        request: Fact content and category.
        author: The author of the fact.

    Returns:
        The assigned fact ID.

    Raises:
        OrgMemoryConnectionError: If not connected.
        OrgMemoryAccessDeniedError: If write access is denied.
        OrgMemoryWriteError: If the write operation fails.
    """
    ...

list_policies async

list_policies(*, limit=None, offset=0)

List core policy facts, optionally paginated.

limit=None (the default) preserves the historical "return everything" contract for callers that pre-date pagination. When limit is set, the implementation MUST honour offset and slice the policy snapshot consistently with its intrinsic ordering (static config first, then dynamically written facts, in the reference impl).

Returns:

Type Description
tuple[OrgFact, ...]

Tuple of core policy facts (full or sliced view).

Raises:

Type Description
OrgMemoryConnectionError

If not connected.

Source code in src/synthorg/memory/org/protocol.py
async def list_policies(
    self,
    *,
    limit: int | None = None,
    offset: int = 0,
) -> tuple[OrgFact, ...]:
    """List core policy facts, optionally paginated.

    ``limit=None`` (the default) preserves the historical
    "return everything" contract for callers that pre-date
    pagination. When ``limit`` is set, the implementation MUST
    honour ``offset`` and slice the policy snapshot consistently
    with its intrinsic ordering (static config first, then
    dynamically written facts, in the reference impl).

    Returns:
        Tuple of core policy facts (full or sliced view).

    Raises:
        OrgMemoryConnectionError: If not connected.
    """
    ...

count_policies async

count_policies()

Return the unfiltered count of core policy facts.

Companion to :meth:list_policies for paginated controllers that need a total alongside the page.

Source code in src/synthorg/memory/org/protocol.py
async def count_policies(self) -> int:
    """Return the unfiltered count of core policy facts.

    Companion to :meth:`list_policies` for paginated controllers
    that need a total alongside the page.
    """
    ...

config

Org memory configuration models.

Frozen Pydantic models for organizational memory behaviour settings.

ExtendedStoreConfig pydantic-model

Bases: BaseModel

Configuration for the extended org facts store.

Attributes:

Name Type Description
max_retrieved_per_query int

Maximum facts to retrieve per query.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_retrieved_per_query pydantic-field

max_retrieved_per_query = 5

Maximum facts to retrieve per query

OrgMemoryConfig pydantic-model

Bases: BaseModel

Top-level organizational memory configuration.

Attributes:

Name Type Description
core_policies tuple[NotBlankStr, ...]

Core policy texts injected into system prompts.

extended_store ExtendedStoreConfig

Extended facts store configuration.

write_access WriteAccessConfig

Write access control configuration.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

core_policies pydantic-field

core_policies = ()

Core policy texts injected into system prompts

extended_store pydantic-field

extended_store

Extended facts store configuration

write_access pydantic-field

write_access

Write access control configuration

models

Org memory domain models.

Frozen Pydantic models for organizational facts -- shared company-wide knowledge such as policies, ADRs, procedures, and conventions.

Includes MVCC models for the append-only operation log and the materialized snapshot.

OrgFactAuthor pydantic-model

Bases: BaseModel

Author of an organizational fact.

If is_human is True, agent_id must be None. If is_human is False, agent_id and seniority are required; autonomy_level is optional (captures the instance-specific value at write time).

Attributes:

Name Type Description
agent_id NotBlankStr | None

Agent identifier (None for human authors).

seniority SeniorityLevel | None

Agent seniority level (None for human authors).

is_human bool

Whether the author is a human operator.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_author_consistency

agent_id pydantic-field

agent_id = None

Agent identifier (None for human authors)

seniority pydantic-field

seniority = None

Agent seniority level (None for human authors)

autonomy_level pydantic-field

autonomy_level = None

Agent autonomy level at write time (None for human authors)

is_human pydantic-field

is_human = False

Whether the author is a human operator

OrgFact pydantic-model

Bases: BaseModel

An organizational fact -- a piece of shared company-wide knowledge.

Attributes:

Name Type Description
id NotBlankStr

Unique identifier for this fact.

content NotBlankStr

The fact content text.

category OrgFactCategory

Category classification.

tags tuple[NotBlankStr, ...]

Metadata tags for cross-cutting concerns.

author OrgFactAuthor

Who created this fact.

created_at AwareDatetime

Creation timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

id pydantic-field

id

Unique fact identifier

content pydantic-field

content

Fact content text

category pydantic-field

category

Category classification

tags pydantic-field

tags = ()

Metadata tags for cross-cutting concerns

author pydantic-field

author

Who created this fact

created_at pydantic-field

created_at

Creation timestamp

OrgFactWriteRequest pydantic-model

Bases: BaseModel

Request to write a new organizational fact.

Attributes:

Name Type Description
content NotBlankStr

The fact content text.

category OrgFactCategory

Category classification.

tags tuple[NotBlankStr, ...]

Metadata tags for cross-cutting concerns.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

content pydantic-field

content

Fact content text

category pydantic-field

category

Category classification

tags pydantic-field

tags = ()

Metadata tags for cross-cutting concerns

OrgMemoryQuery pydantic-model

Bases: BaseModel

Query parameters for org memory retrieval.

Attributes:

Name Type Description
context NotBlankStr | None

Text search context (None for metadata-only).

categories frozenset[OrgFactCategory] | None

Filter by fact categories.

limit int

Maximum number of results.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

context pydantic-field

context = None

Text search context

categories pydantic-field

categories = None

Filter by fact categories

limit pydantic-field

limit = 5

Maximum number of results

OperationLogEntry pydantic-model

Bases: BaseModel

Single row in the append-only operation log.

Every publish or retract is recorded as an immutable log entry. The version counter is monotonically increasing per fact_id.

Attributes:

Name Type Description
operation_id NotBlankStr

Globally unique operation identifier.

fact_id NotBlankStr

Logical fact identifier.

operation_type Literal['PUBLISH', 'RETRACT']

PUBLISH or RETRACT.

content NotBlankStr | None

Fact body (None for RETRACT operations).

category OrgFactCategory | None

Fact category at time of operation.

tags tuple[NotBlankStr, ...]

Metadata tags at time of operation.

author_agent_id NotBlankStr | None

Agent that performed the operation (None for human authors).

author_seniority SeniorityLevel | None

Agent seniority level at write time.

author_is_human bool

Whether the author is a human operator.

author_autonomy_level AutonomyLevel | None

Agent autonomy level at write time.

timestamp AwareDatetime

UTC timestamp of the operation.

version int

Per-fact version counter (starts at 1).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_content_alignment

operation_id pydantic-field

operation_id

Globally unique operation identifier

fact_id pydantic-field

fact_id

Logical fact identifier

operation_type pydantic-field

operation_type

Operation type

content pydantic-field

content = None

Fact body (None for RETRACT)

category pydantic-field

category = None

Fact category at time of operation

tags pydantic-field

tags = ()

Metadata tags at time of operation

author_agent_id pydantic-field

author_agent_id = None

Agent that performed the operation

author_seniority pydantic-field

author_seniority = None

Agent seniority level at write time

author_is_human pydantic-field

author_is_human = False

Whether the author is a human operator

author_autonomy_level pydantic-field

author_autonomy_level = None

Agent autonomy level at write time

timestamp pydantic-field

timestamp

UTC timestamp

version pydantic-field

version

Per-fact version counter

OperationLogSnapshot pydantic-model

Bases: BaseModel

Materialized snapshot row for current committed state.

Represents the state of a single fact at a point in time. Active facts have retracted_at=None.

Attributes:

Name Type Description
fact_id NotBlankStr

Logical fact identifier (primary key).

content NotBlankStr

Current fact body.

category OrgFactCategory

Fact category.

tags tuple[NotBlankStr, ...]

Current metadata tags.

created_at AwareDatetime

Timestamp of first PUBLISH.

retracted_at AwareDatetime | None

Timestamp of retraction (None = active).

version int

Version matching most recent operation log entry.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_created_before_retracted

fact_id pydantic-field

fact_id

Logical fact identifier

content pydantic-field

content

Current fact body

category pydantic-field

category

Fact category

tags pydantic-field

tags = ()

Current metadata tags

created_at pydantic-field

created_at

Timestamp of first PUBLISH

retracted_at pydantic-field

retracted_at = None

Retraction timestamp (None = active)

version pydantic-field

version

Most recent operation version

access_control

Write access control for organizational memory.

Provides seniority-based and human-based write restriction models, configuration, and enforcement functions.

CategoryWriteRule pydantic-model

Bases: BaseModel

Write permission rule for a single fact category.

Attributes:

Name Type Description
allowed_seniority SeniorityLevel | None

Minimum seniority level for agent writes (None means only humans can write).

human_allowed bool

Whether human operators can write.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

allowed_seniority pydantic-field

allowed_seniority = None

Minimum seniority level for agent writes (None = human-only)

human_allowed pydantic-field

human_allowed = True

Whether human operators can write

WriteAccessConfig pydantic-model

Bases: BaseModel

Write access configuration for all fact categories.

The runtime type of rules is :class:types.MappingProxyType, expressed in the annotation as :class:collections.abc.Mapping so callers see an immutable interface at the type boundary instead of a freely mutable dict.

Attributes:

Name Type Description
rules Mapping[OrgFactCategory, CategoryWriteRule]

Per-category write rules (read-only mapping).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _wrap_rules_readonly

rules pydantic-field

rules

Per-category write rules

check_write_access

check_write_access(config, category, author)

Check whether the given author may write to the given category.

Parameters:

Name Type Description Default
config WriteAccessConfig

Write access configuration.

required
category OrgFactCategory

Target fact category.

required
author OrgFactAuthor

The author attempting the write.

required

Returns:

Type Description
bool

True if write is permitted, False otherwise.

Source code in src/synthorg/memory/org/access_control.py
def check_write_access(
    config: WriteAccessConfig,
    category: OrgFactCategory,
    author: OrgFactAuthor,
) -> bool:
    """Check whether the given author may write to the given category.

    Args:
        config: Write access configuration.
        category: Target fact category.
        author: The author attempting the write.

    Returns:
        ``True`` if write is permitted, ``False`` otherwise.
    """
    # Fail closed: if a category has no explicit rule, deny all writes.
    rule = config.rules.get(
        category,
        CategoryWriteRule(allowed_seniority=None, human_allowed=False),
    )

    if author.is_human:
        return rule.human_allowed

    if rule.allowed_seniority is None:
        return False

    if author.seniority is None:
        return False

    return compare_seniority(author.seniority, rule.allowed_seniority) >= 0

require_write_access

require_write_access(config, category, author)

Check write access and raise if denied.

Parameters:

Name Type Description Default
config WriteAccessConfig

Write access configuration.

required
category OrgFactCategory

Target fact category.

required
author OrgFactAuthor

The author attempting the write.

required

Raises:

Type Description
OrgMemoryAccessDeniedError

If write is not permitted.

Source code in src/synthorg/memory/org/access_control.py
def require_write_access(
    config: WriteAccessConfig,
    category: OrgFactCategory,
    author: OrgFactAuthor,
) -> None:
    """Check write access and raise if denied.

    Args:
        config: Write access configuration.
        category: Target fact category.
        author: The author attempting the write.

    Raises:
        OrgMemoryAccessDeniedError: If write is not permitted.
    """
    if not check_write_access(config, category, author):
        author_desc = (
            "human"
            if author.is_human
            else f"agent {author.agent_id} ({author.seniority})"
        )
        msg = (
            f"Write access denied: {author_desc} cannot write "
            f"to category {category.value!r}"
        )
        logger.warning(
            ORG_MEMORY_WRITE_DENIED,
            category=category.value,
            author_is_human=author.is_human,
            author_agent_id=author.agent_id,
            author_seniority=str(author.seniority) if author.seniority else None,
            reason=msg,
        )
        raise OrgMemoryAccessDeniedError(msg)

Consolidation

config

Memory consolidation configuration models.

Frozen Pydantic models for consolidation interval, retention, archival, LLM consolidation strategy, experience compressor, and wiki export settings.

ConsolidationStrategyType

Bases: StrEnum

Discriminator selecting the consolidation composite to build.

Each value maps (in :mod:synthorg.memory.consolidation.factory) to a Composite(HighestRelevanceSelector, <op>):

  • SIMPLE -> ConcatenationOp (truncated-bullet summary).
  • DUAL_MODE -> DensityRoutingOp (density-routed extractive / abstractive).
  • LLM -> LLMSynthesisOp (LLM synthesis, parallel groups).

RetentionConfig pydantic-model

Bases: BaseModel

Per-category retention configuration (company-level defaults).

These rules apply as the baseline for all agents. Individual agents can override specific categories via :attr:~synthorg.core.agent.MemoryConfig.retention_overrides.

Resolution order per category (highest priority first):

  1. Agent per-category override
  2. Company per-category rule (this config)
  3. Agent global default (MemoryConfig.retention_days)
  4. Company global default (default_retention_days)
  5. Keep forever (no expiry)

Attributes:

Name Type Description
rules tuple[RetentionRule, ...]

Per-category retention rules (unique categories).

default_retention_days int | None

Default retention in days (None = keep forever).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_unique_categories

rules pydantic-field

rules = ()

Per-category retention rules

default_retention_days pydantic-field

default_retention_days = None

Default retention in days (None = forever)

DualModeConfig pydantic-model

Bases: BaseModel

Configuration for dual-mode archival.

Controls density-aware archival: LLM abstractive summaries for sparse/conversational content vs extractive preservation (verbatim key facts + start/mid/end anchors) for dense/factual content.

Attributes:

Name Type Description
enabled bool

Whether dual-mode density classification is active. When False, the dual-mode strategy is not used.

dense_threshold float

Density score threshold for DENSE classification (0.0 = classify everything as dense, 1.0 = everything sparse).

summarization_model NotBlankStr | None

Model ID for abstractive summarization.

max_summary_tokens int

Maximum tokens for LLM summary responses.

max_facts int

Maximum number of extracted key facts for extractive mode.

anchor_length int

Character length for each extractive anchor snippet (start/mid/end).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_model_when_enabled

enabled pydantic-field

enabled = False

Whether dual-mode density classification is active

dense_threshold pydantic-field

dense_threshold = 0.5

Density score threshold for DENSE classification

summarization_model pydantic-field

summarization_model = None

Model ID for abstractive summarization

max_summary_tokens pydantic-field

max_summary_tokens = 200

Maximum tokens for LLM summary responses

max_facts pydantic-field

max_facts = 20

Maximum extracted key facts for extractive mode

anchor_length pydantic-field

anchor_length = 150

Character length for each extractive anchor

ArchivalConfig pydantic-model

Bases: BaseModel

Archival configuration.

Attributes:

Name Type Description
enabled bool

Whether archival is enabled.

age_threshold_days int

Minimum age in days before archival.

dual_mode DualModeConfig

Dual-mode archival configuration.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

enabled pydantic-field

enabled = False

Whether archival is enabled

age_threshold_days pydantic-field

age_threshold_days = 90

Minimum age in days before archival

dual_mode pydantic-field

dual_mode

Dual-mode archival configuration

ExperienceCompressorConfig pydantic-model

Bases: BaseModel

Configuration for the GEMS two-tier experience compressor.

Controls whether raw execution traces (DetailedExperience) are compressed into strategic learnings (CompressedExperience).

Attributes:

Name Type Description
enabled bool

Whether two-tier compression is active.

model NotBlankStr | None

Model identifier for the compressor LLM call (None = use medium-tier default).

temperature float

Sampling temperature for compression.

max_tokens int

Token budget for the compressor response.

min_compression_ratio float

Discard compressions with a ratio below this threshold (0.0 = keep all, closer to 1.0 = stricter).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

enabled pydantic-field

enabled = False

Whether two-tier compression is active

model pydantic-field

model = None

Model identifier for the compressor LLM call (None = use medium-tier default)

temperature pydantic-field

temperature = 0.3

Sampling temperature for compression

max_tokens pydantic-field

max_tokens = 1000

Token budget for the compressor response

min_compression_ratio pydantic-field

min_compression_ratio = 0.0

Discard compressions with a ratio below this threshold (0.0 = keep all)

WikiExportConfig pydantic-model

Bases: BaseModel

Configuration for post-consolidation wiki filesystem export.

Three-view export: raw/ (Tier 1 raw artifacts), wiki/ (Tier 2 compressed experiences), and index.md (navigation).

Attributes:

Name Type Description
enabled bool

Whether wiki export is enabled.

export_root NotBlankStr

Root directory for the wiki filesystem export.

trigger Literal['on_consolidation', 'manual']

When to trigger export ("on_consolidation" or "manual").

include_raw_tier bool

Export Tier 1 (DetailedExperience) to raw/ view.

include_compressed_tier bool

Export Tier 2 (CompressedExperience) to wiki/ view.

max_entries_per_view int | None

Maximum entries per view (None = all).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _reject_traversal

enabled pydantic-field

enabled = False

Whether wiki export is enabled

export_root pydantic-field

export_root = '/data/wiki'

Root directory for the wiki filesystem export

trigger pydantic-field

trigger = 'manual'

When to trigger export

include_raw_tier pydantic-field

include_raw_tier = True

Export Tier 1 raw artifacts to raw/ view

include_compressed_tier pydantic-field

include_compressed_tier = True

Export Tier 2 compressed experiences to wiki/ view

max_entries_per_view pydantic-field

max_entries_per_view = None

Maximum entries per view. None means 'use the backend's maximum page size' (MemoryQuery.limit is capped at 1000 by schema). Multi-page exports for collections larger than 1000 are not yet supported.

ConsolidationConfig pydantic-model

Bases: BaseModel

Top-level memory consolidation configuration.

Attributes:

Name Type Description
enabled bool

Master kill switch for memory consolidation. When False the consolidation scheduler is constructed but every tick short-circuits -- operator-safe way to pause consolidation without tearing down lifecycle plumbing.

interval ConsolidationInterval

How often to run consolidation.

max_memories_per_agent int

Upper bound on memories per agent.

retention RetentionConfig

Per-category retention settings.

archival ArchivalConfig

Archival settings.

experience_compressor ExperienceCompressorConfig

GEMS two-tier compressor settings.

wiki_export WikiExportConfig

Wiki filesystem export settings.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors

enabled pydantic-field

enabled = True

Master kill switch for memory consolidation. When False every consolidation tick short-circuits immediately.

interval pydantic-field

interval = DAILY

How often to run consolidation

max_memories_per_agent pydantic-field

max_memories_per_agent = 10000

Upper bound on memories per agent

retention pydantic-field

retention

Per-category retention settings

archival pydantic-field

archival

Archival settings

experience_compressor pydantic-field

experience_compressor

GEMS two-tier experience compressor settings

wiki_export pydantic-field

wiki_export

Wiki filesystem export settings

LLMConsolidationConfig pydantic-model

Bases: BaseModel

Configuration for the LLM-based consolidation strategy.

Encapsulates all tuning knobs previously passed as loose kwargs to LLMConsolidationStrategy.__init__ and module-level constants. Aligns with the frozen Pydantic config convention used by sibling strategies (DualModeConfig, RetentionConfig).

Attributes:

Name Type Description
group_threshold int

Minimum category group size for consolidation. At threshold 3, _select_entries keeps one entry and _synthesize receives two -- the smallest input for a meaningful LLM merge.

temperature float

Sampling temperature for the synthesis LLM call.

max_summary_tokens int

Maximum tokens for the synthesis response.

include_distillation_context bool

When True, fetches recent distillation entries as trajectory context for the synthesis prompt.

max_trajectory_context_entries int

Maximum distillation entries to include as trajectory context.

max_trajectory_chars_per_entry int

Character limit per trajectory snippet in the synthesis prompt.

max_entry_input_chars int

Per-entry content character limit before being sent to the LLM.

max_total_user_content_chars int

Total character cap for the concatenated user prompt sent to the LLM.

fallback_truncate_length int

Per-entry truncation limit in concatenation-fallback summaries.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_entry_vs_total_chars

group_threshold pydantic-field

group_threshold = 3

Minimum category group size for consolidation (must be >= 3)

temperature pydantic-field

temperature = 0.3

Sampling temperature for the synthesis LLM call

max_summary_tokens pydantic-field

max_summary_tokens = 500

Maximum tokens for the synthesis response

include_distillation_context pydantic-field

include_distillation_context = True

When True, fetch recent distillation entries as trajectory context for the synthesis prompt

max_trajectory_context_entries pydantic-field

max_trajectory_context_entries = 5

Maximum distillation entries to include as trajectory context

max_trajectory_chars_per_entry pydantic-field

max_trajectory_chars_per_entry = 500

Character limit per trajectory snippet in the synthesis prompt

max_entry_input_chars pydantic-field

max_entry_input_chars = 2000

Per-entry content character limit before being sent to the LLM

max_total_user_content_chars pydantic-field

max_total_user_content_chars = 20000

Total character cap for the concatenated user prompt sent to the LLM

fallback_truncate_length pydantic-field

fallback_truncate_length = 200

Per-entry truncation limit in concatenation-fallback summaries

models

Memory consolidation domain models.

Frozen Pydantic models for consolidation results, archival entries, retention rules, dual-mode archival types, and GEMS two-tier compressed/detailed experience models.

ArchivalMode

Bases: StrEnum

How a memory entry was archived during consolidation.

Determines the preservation strategy applied before archival.

ABSTRACTIVE class-attribute instance-attribute

ABSTRACTIVE = 'abstractive'

LLM-generated summary for sparse/conversational content.

EXTRACTIVE class-attribute instance-attribute

EXTRACTIVE = 'extractive'

Verbatim key-fact extraction for dense/factual content.

ArchivalModeAssignment pydantic-model

Bases: BaseModel

Maps a removed memory entry to the archival mode applied.

Attributes:

Name Type Description
original_id NotBlankStr

ID of the removed memory entry.

mode ArchivalMode

Archival mode applied to this entry.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

original_id pydantic-field

original_id

ID of the removed memory entry

mode pydantic-field

mode

Archival mode applied to this entry

ArchivalIndexEntry pydantic-model

Bases: BaseModel

Maps a removed memory entry to its archival store ID.

Enables deterministic index-based restore: agents can look up their own archived entries by original ID without semantic search.

Attributes:

Name Type Description
original_id NotBlankStr

ID of the original memory entry.

archival_id NotBlankStr

ID assigned by the archival store.

mode ArchivalMode

Archival mode used for this entry.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

original_id pydantic-field

original_id

ID of the original memory entry

archival_id pydantic-field

archival_id

ID assigned by the archival store

mode pydantic-field

mode

Archival mode used for this entry

ConsolidationResult pydantic-model

Bases: BaseModel

Result of a memory consolidation run.

Attributes:

Name Type Description
removed_ids tuple[NotBlankStr, ...]

IDs of removed memory entries.

summary_ids tuple[NotBlankStr, ...]

IDs of summary entries created during the run. Strategies that produce a single summary populate a one-element tuple; strategies that produce per-group summaries (e.g. LLMConsolidationStrategy) populate one entry per group so callers see every summary, not just the last one. Callers that previously passed a scalar summary_id= keyword (now a derived @computed_field) will hit a hard ValidationError because the model uses extra='forbid' -- no silent data loss.

summary_id NotBlankStr | None

Derived from summary_ids[-1] when any summary was produced, otherwise None. Kept as a @computed_field so callers that only need a single representative id keep working.

archived_count int

Number of entries archived.

consolidated_count int

Derived from len(removed_ids).

mode_assignments tuple[ArchivalModeAssignment, ...]

Per-entry archival mode assignments (set by strategy, empty for strategies that don't classify density).

archival_index tuple[ArchivalIndexEntry, ...]

Maps original memory IDs to archival store IDs (built by service after archival completes).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_archival_consistency

removed_ids pydantic-field

removed_ids = ()

IDs of removed memory entries

summary_ids pydantic-field

summary_ids = ()

IDs of every summary entry produced during the run

archived_count pydantic-field

archived_count = 0

Number of entries archived

mode_assignments pydantic-field

mode_assignments = ()

Per-entry archival mode assignments

archival_index pydantic-field

archival_index = ()

Original-to-archival ID mapping

consolidated_count property

consolidated_count

Number of memories consolidated (derived from removed_ids).

summary_id property

summary_id

Representative summary id (last one produced, or None).

Derived from summary_ids. Callers that need every summary (e.g. multi-category LLMConsolidationStrategy runs) should read summary_ids directly.

Exposed as a plain @property (not @computed_field) so it is NOT emitted by model_dump(). Otherwise the serialized payload would include summary_id and a round-trip through model_validate(result.model_dump()) would fail against the extra='forbid' guard -- a nasty surprise for any persistence or copy-through-JSON path.

ArchivalEntry pydantic-model

Bases: BaseModel

An archived memory entry.

Attributes:

Name Type Description
original_id NotBlankStr

ID from the hot store.

agent_id NotBlankStr

Owning agent identifier.

content NotBlankStr

Memory content text.

category MemoryCategory

Memory type category.

metadata MemoryMetadata

Associated metadata.

created_at AwareDatetime

Original creation timestamp.

archived_at AwareDatetime

When this entry was archived.

archival_mode ArchivalMode

How this entry was archived.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_temporal_order

original_id pydantic-field

original_id

ID from the hot store

agent_id pydantic-field

agent_id

Owning agent identifier

content pydantic-field

content

Memory content text

category pydantic-field

category

Memory type category

metadata pydantic-field

metadata

Associated metadata

created_at pydantic-field

created_at

Original creation timestamp

archived_at pydantic-field

archived_at

When this entry was archived

archival_mode pydantic-field

archival_mode

Archival mode used for this entry

RetentionRule pydantic-model

Bases: BaseModel

Per-category retention rule.

Attributes:

Name Type Description
category MemoryCategory

Memory category this rule applies to.

retention_days int

Number of days to retain memories.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

category pydantic-field

category

Memory category this rule applies to

retention_days pydantic-field

retention_days

Number of days to retain memories

ContentDensity

Bases: StrEnum

Content density classification for archival mode selection.

Attributes:

Name Type Description
SPARSE

Conversational, narrative, low information density.

DENSE

Code, structured data, identifiers, high density.

DetailedExperience pydantic-model

Bases: BaseModel

Tier 1 raw artifact -- append-only execution trace.

Stores the complete raw execution context for a single turn or task. Retrieved by entry.id only (audit/debug), NOT used for context injection.

Attributes:

Name Type Description
id NotBlankStr

Unique identifier.

agent_id NotBlankStr

Owning agent identifier.

prompt NotBlankStr

Raw prompt sent to the agent.

output NotBlankStr

Raw output produced by the agent.

verification_feedback NotBlankStr | None

Verification result text (None when no verification was performed).

reasoning_trace tuple[NotBlankStr, ...]

Step-by-step reasoning trace entries.

metadata MemoryMetadata

Associated metadata (tags include "detailed_experience").

created_at AwareDatetime

When the experience was captured.

source_task_id NotBlankStr | None

Optional originating task identifier.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_tier_tag

id pydantic-field

id

Unique identifier

agent_id pydantic-field

agent_id

Owning agent identifier

prompt pydantic-field

prompt

Raw prompt sent to the agent

output pydantic-field

output

Raw output produced by the agent

verification_feedback pydantic-field

verification_feedback = None

Verification result text

reasoning_trace pydantic-field

reasoning_trace = ()

Step-by-step reasoning trace entries

metadata pydantic-field

metadata

Associated metadata

created_at pydantic-field

created_at

When the experience was captured

source_task_id pydantic-field

source_task_id = None

Optional originating task identifier

CompressedExperience pydantic-model

Bases: BaseModel

Tier 2 compressed learning -- retrieval-primary.

Distilled from one or more DetailedExperience entries by the ExperienceCompressor. These are the entries agents see during context injection -- raw traces are NOT injected.

GEMS ablation shows compressed experiences outperform raw reasoning traces by 2.5x (GEMS section 4.3).

Attributes:

Name Type Description
id NotBlankStr

Unique identifier.

agent_id NotBlankStr

Owning agent identifier.

strategic_decisions tuple[NotBlankStr, ...]

Distilled "what worked, what didn't" learnings.

applicable_contexts tuple[NotBlankStr, ...]

When and where this experience applies.

source_artifact_ids tuple[NotBlankStr, ...]

Links back to DetailedExperience IDs.

compression_ratio float

compressed_len / raw_len (0.0-1.0).

compressor_version NotBlankStr

Version stamp for the compressor that produced this entry.

metadata MemoryMetadata

Associated metadata (tags include "compressed_experience").

created_at AwareDatetime

When the compression was performed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_non_empty

id pydantic-field

id

Unique identifier

agent_id pydantic-field

agent_id

Owning agent identifier

strategic_decisions pydantic-field

strategic_decisions

Distilled strategic learnings

applicable_contexts pydantic-field

applicable_contexts = ()

When and where this experience applies

source_artifact_ids pydantic-field

source_artifact_ids

Links back to DetailedExperience IDs (must contain at least one entry -- provenance required)

compression_ratio pydantic-field

compression_ratio

compressed_len / raw_len

compressor_version pydantic-field

compressor_version

Compressor version stamp

metadata pydantic-field

metadata

Associated metadata

created_at pydantic-field

created_at

When the compression was performed

strategy

Consolidation strategy protocol.

Defines the interface for memory consolidation algorithms that compress and summarize older memories.

ConsolidationStrategy

Bases: Protocol

Protocol for memory consolidation strategies.

Implementations receive a batch of memory entries and produce a ConsolidationResult indicating which entries were merged, removed, or summarized.

consolidate async

consolidate(entries, *, agent_id)

Consolidate a batch of memory entries.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to consolidate.

required
agent_id NotBlankStr

Owning agent identifier.

required

Returns:

Type Description
ConsolidationResult

Result describing what was consolidated.

Source code in src/synthorg/memory/consolidation/strategy.py
async def consolidate(
    self,
    entries: tuple[MemoryEntry, ...],
    *,
    agent_id: NotBlankStr,
) -> ConsolidationResult:
    """Consolidate a batch of memory entries.

    Args:
        entries: Memory entries to consolidate.
        agent_id: Owning agent identifier.

    Returns:
        Result describing what was consolidated.
    """
    ...

service

Memory consolidation service.

Orchestrates retention cleanup, consolidation, archival, and max-memories enforcement into a single maintenance entry point.

MemoryConsolidationService

MemoryConsolidationService(
    *,
    backend,
    config,
    strategy=None,
    archival_store=None,
    max_enforce_batch=_MAX_ENFORCE_BATCH,
    config_resolver=None,
)

Orchestrates memory consolidation, retention, and archival.

Parameters:

Name Type Description Default
backend MemoryBackend

Memory backend for CRUD operations.

required
config ConsolidationConfig

Consolidation configuration.

required
strategy ConsolidationStrategy | None

Optional consolidation strategy (skips consolidation step if None).

None
archival_store ArchivalStore | None

Optional archival store (skips archival if None or disabled in config).

None
max_enforce_batch int

Maximum memories to enforce retention on per invocation. Must match the bounds of the memory.consolidation_enforce_batch_size setting (MemoryBridgeConfig): between 100 and 10000 inclusive. Defaults to the module constant so the service still works standalone when the settings layer is not wired in.

_MAX_ENFORCE_BATCH
Source code in src/synthorg/memory/consolidation/service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    backend: MemoryBackend,
    config: ConsolidationConfig,
    strategy: ConsolidationStrategy | None = None,
    archival_store: ArchivalStore | None = None,
    max_enforce_batch: int = _MAX_ENFORCE_BATCH,
    config_resolver: ConfigResolver | None = None,
) -> None:
    if not (_MAX_ENFORCE_BATCH_MIN <= max_enforce_batch <= _MAX_ENFORCE_BATCH_MAX):
        msg = (
            "max_enforce_batch must be between "
            f"{_MAX_ENFORCE_BATCH_MIN} and {_MAX_ENFORCE_BATCH_MAX}, "
            f"got {max_enforce_batch}"
        )
        raise ValueError(msg)
    self._backend = backend
    self._config = config
    self._strategy = strategy
    self._archival_store = archival_store
    self._max_enforce_batch = max_enforce_batch
    # Resolver gate for the runtime ``memory.consolidation_enabled``
    # kill switch.  When wired, ``run_consolidation`` reads the
    # current value per cycle so an operator can pause without
    # restart; when None (test harness, standalone construction),
    # we fall back to the YAML-baked ``config.enabled`` field.
    self._config_resolver = config_resolver
    self._retention = RetentionEnforcer(
        config=config.retention,
        backend=backend,
    )

run_consolidation async

run_consolidation(agent_id)

Run memory consolidation for an agent.

Retrieves up to 1000 entries per invocation and applies the consolidation strategy, then archives removed entries if archival is configured and enabled. Per-entry archival failures are logged and skipped -- they do not abort the entire batch.

Gated by the memory.consolidation_enabled kill-switch: when False the call short-circuits immediately without touching the backend or strategy, so operators can pause consolidation mid-flight without tearing down scheduling. The flag is resolved per call when a ConfigResolver is wired (runtime-controllable); without a resolver we fall back to the YAML-baked config.enabled field.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose memories to consolidate.

required

Returns:

Type Description
ConsolidationResult

Consolidation result (including archival count).

Source code in src/synthorg/memory/consolidation/service.py
async def run_consolidation(
    self,
    agent_id: NotBlankStr,
) -> ConsolidationResult:
    """Run memory consolidation for an agent.

    Retrieves up to 1000 entries per invocation and applies the
    consolidation strategy, then archives removed entries if archival
    is configured and enabled.  Per-entry archival failures are
    logged and skipped -- they do not abort the entire batch.

    Gated by the ``memory.consolidation_enabled`` kill-switch:
    when ``False`` the call short-circuits immediately without
    touching the backend or strategy, so operators can pause
    consolidation mid-flight without tearing down scheduling.
    The flag is resolved per call when a ``ConfigResolver`` is
    wired (runtime-controllable); without a resolver we fall back
    to the YAML-baked ``config.enabled`` field.

    Args:
        agent_id: Agent whose memories to consolidate.

    Returns:
        Consolidation result (including archival count).
    """
    enabled = await resolve_bool_with_fallback(
        resolver=self._config_resolver,
        namespace="memory",
        key="consolidation_enabled",
        fallback=self._config.enabled,
    )
    if not enabled:
        logger.info(
            CONSOLIDATION_SKIPPED,
            agent_id=agent_id,
            reason="disabled_by_setting",
        )
        return ConsolidationResult()
    return await self._run_consolidation_unchecked(agent_id)

enforce_max_memories async

enforce_max_memories(agent_id)

Enforce the maximum memories limit for an agent.

Retrieves excess entries in batches (up to 1000 per query, the MemoryQuery.limit cap) and deletes them. The entries selected for deletion depend on the backend's default query ordering -- typically oldest-first, but consult the concrete backend for specifics.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to check.

required

Returns:

Type Description
int

Number of entries deleted.

Source code in src/synthorg/memory/consolidation/service.py
async def enforce_max_memories(
    self,
    agent_id: NotBlankStr,
) -> int:
    """Enforce the maximum memories limit for an agent.

    Retrieves excess entries in batches (up to 1000 per query,
    the ``MemoryQuery.limit`` cap) and deletes them.  The entries
    selected for deletion depend on the backend's default query
    ordering -- typically oldest-first, but consult the concrete
    backend for specifics.

    Args:
        agent_id: Agent to check.

    Returns:
        Number of entries deleted.
    """
    try:
        total = await self._backend.count(agent_id)
        excess = total - self._config.max_memories_per_agent

        if excess <= 0:
            return 0

        deleted = 0
        remaining = excess
        while remaining > 0:
            batch_size = min(remaining, self._max_enforce_batch)
            # Cap the per-fetch size to MemoryQuery's own upper
            # bound; ``self._max_enforce_batch`` can exceed it
            # (up to 10k) and the query would then fail
            # validation before hitting the backend.
            effective_limit = min(batch_size, _MEMORY_QUERY_MAX_LIMIT)
            query = MemoryQuery(limit=effective_limit)
            entries = await self._backend.retrieve(agent_id, query)
            if not entries:
                break
            for entry in entries:
                if await self._backend.delete(agent_id, entry.id):
                    deleted += 1
            remaining -= len(entries)
    except Exception as exc:
        logger.warning(
            MAX_MEMORIES_ENFORCE_FAILED,
            agent_id=agent_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise
    else:
        logger.info(
            MAX_MEMORIES_ENFORCED,
            agent_id=agent_id,
            total_before=total,
            deleted=deleted,
        )
        return deleted

cleanup_retention async

cleanup_retention(
    agent_id, *, agent_category_overrides=None, agent_default_retention_days=None
)

Run retention cleanup for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose expired memories to clean up.

required
agent_category_overrides Mapping[MemoryCategory, int] | None

Per-category retention overrides for this agent.

None
agent_default_retention_days int | None

Agent-level default retention in days.

None

Returns:

Type Description
int

Number of expired memories deleted.

Source code in src/synthorg/memory/consolidation/service.py
async def cleanup_retention(
    self,
    agent_id: NotBlankStr,
    *,
    agent_category_overrides: Mapping[MemoryCategory, int] | None = None,
    agent_default_retention_days: int | None = None,
) -> int:
    """Run retention cleanup for an agent.

    Args:
        agent_id: Agent whose expired memories to clean up.
        agent_category_overrides: Per-category retention overrides
            for this agent.
        agent_default_retention_days: Agent-level default retention
            in days.

    Returns:
        Number of expired memories deleted.
    """
    return await self._retention.cleanup_expired(
        agent_id,
        agent_category_overrides=agent_category_overrides,
        agent_default_retention_days=agent_default_retention_days,
    )

run_maintenance async

run_maintenance(
    agent_id, *, agent_category_overrides=None, agent_default_retention_days=None
)

Run full maintenance cycle for an agent.

Orchestrates: retention cleanup -> consolidation -> max enforcement. Gated by the memory.consolidation_enabled kill-switch: when False the whole cycle short-circuits (retention, consolidation, and max-memory enforcement all pause together so the scheduler can be resumed without partial state).

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to maintain.

required
agent_category_overrides Mapping[MemoryCategory, int] | None

Per-category retention overrides for this agent.

None
agent_default_retention_days int | None

Agent-level default retention in days.

None

Returns:

Type Description
ConsolidationResult

Consolidation result from the consolidation step.

Source code in src/synthorg/memory/consolidation/service.py
async def run_maintenance(
    self,
    agent_id: NotBlankStr,
    *,
    agent_category_overrides: Mapping[MemoryCategory, int] | None = None,
    agent_default_retention_days: int | None = None,
) -> ConsolidationResult:
    """Run full maintenance cycle for an agent.

    Orchestrates: retention cleanup -> consolidation -> max enforcement.
    Gated by the ``memory.consolidation_enabled`` kill-switch: when
    ``False`` the whole cycle short-circuits (retention, consolidation,
    and max-memory enforcement all pause together so the scheduler can
    be resumed without partial state).

    Args:
        agent_id: Agent to maintain.
        agent_category_overrides: Per-category retention overrides
            for this agent.
        agent_default_retention_days: Agent-level default retention
            in days.

    Returns:
        Consolidation result from the consolidation step.
    """
    enabled = await resolve_bool_with_fallback(
        resolver=self._config_resolver,
        namespace="memory",
        key="consolidation_enabled",
        fallback=self._config.enabled,
    )
    if not enabled:
        logger.info(
            CONSOLIDATION_SKIPPED,
            agent_id=agent_id,
            reason="disabled_by_setting",
            scope="maintenance",
        )
        return ConsolidationResult()

    logger.info(MAINTENANCE_START, agent_id=agent_id)
    try:
        await self.cleanup_retention(
            agent_id,
            agent_category_overrides=agent_category_overrides,
            agent_default_retention_days=agent_default_retention_days,
        )
        result = await self._run_consolidation_unchecked(agent_id)
        await self.enforce_max_memories(agent_id)
    except Exception as exc:
        logger.warning(
            MAINTENANCE_FAILED,
            agent_id=agent_id,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        raise
    else:
        logger.info(MAINTENANCE_COMPLETE, agent_id=agent_id)
        return result

retention

Retention enforcer for memory lifecycle management.

Deletes memories that have exceeded their per-category retention period. Supports per-agent overrides that take priority over company-level defaults.

RetentionEnforcer

RetentionEnforcer(*, config, backend)

Enforces per-category memory retention policies.

Queries for memories older than the configured retention period and deletes them from the backend.

Parameters:

Name Type Description Default
config RetentionConfig

Retention configuration with per-category rules.

required
backend MemoryBackend

Memory backend for querying and deleting.

required
Source code in src/synthorg/memory/consolidation/retention.py
def __init__(
    self,
    *,
    config: RetentionConfig,
    backend: MemoryBackend,
) -> None:
    self._config = config
    self._backend = backend
    self._categories_to_check = self._build_categories_to_check(config)
    # Explicit per-category rules ONLY (not default-filled entries).
    # _resolve_categories depends on this distinction -- do not
    # include categories filled by the company global default here.
    self._explicit_rules = tuple(
        (rule.category, rule.retention_days) for rule in config.rules
    )

cleanup_expired async

cleanup_expired(
    agent_id,
    now=None,
    *,
    agent_category_overrides=None,
    agent_default_retention_days=None,
)

Delete memories that have exceeded their retention period.

Processes each category independently so that a failure in one category does not prevent cleanup of the remaining categories.

Processes up to 1000 expired entries per category per invocation. Multiple calls may be needed for categories with a large backlog.

When agent_category_overrides or agent_default_retention_days is provided, per-agent retention rules are merged with company defaults using the internal resolution chain (_resolve_categories).

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose memories to clean up.

required
now datetime | None

Current time (defaults to UTC now).

None
agent_category_overrides Mapping[MemoryCategory, int] | None

Per-category retention overrides for this agent (mapping of category to days).

None
agent_default_retention_days int | None

Agent-level default retention in days.

None

Returns:

Type Description
int

Number of expired memories deleted.

Source code in src/synthorg/memory/consolidation/retention.py
async def cleanup_expired(
    self,
    agent_id: NotBlankStr,
    now: datetime | None = None,
    *,
    agent_category_overrides: Mapping[MemoryCategory, int] | None = None,
    agent_default_retention_days: int | None = None,
) -> int:
    """Delete memories that have exceeded their retention period.

    Processes each category independently so that a failure in one
    category does not prevent cleanup of the remaining categories.

    Processes up to 1000 expired entries per category per
    invocation.  Multiple calls may be needed for categories
    with a large backlog.

    When *agent_category_overrides* or *agent_default_retention_days*
    is provided, per-agent retention rules are merged with company
    defaults using the internal resolution chain
    (``_resolve_categories``).

    Args:
        agent_id: Agent whose memories to clean up.
        now: Current time (defaults to UTC now).
        agent_category_overrides: Per-category retention overrides
            for this agent (mapping of category to days).
        agent_default_retention_days: Agent-level default retention
            in days.

    Returns:
        Number of expired memories deleted.
    """
    if now is None:
        now = datetime.now(UTC)

    categories_to_check = self._resolve_for_agent(
        agent_id,
        agent_category_overrides,
        agent_default_retention_days,
    )

    logger.info(RETENTION_CLEANUP_START, agent_id=agent_id)
    total_deleted = 0

    for category, retention_days in categories_to_check:
        try:
            cutoff = now - timedelta(days=retention_days)
            query = MemoryQuery(
                categories=frozenset({category}),
                until=cutoff,
                limit=1000,
            )
            expired = await self._backend.retrieve(agent_id, query)
            for entry in expired:
                deleted = await self._backend.delete(agent_id, entry.id)
                if deleted:
                    total_deleted += 1
                else:
                    logger.debug(
                        RETENTION_DELETE_SKIPPED,
                        agent_id=agent_id,
                        entry_id=entry.id,
                        category=category.value,
                    )
        except Exception as exc:
            logger.warning(
                RETENTION_CLEANUP_FAILED,
                agent_id=agent_id,
                category=category.value,
                error=safe_error_description(exc),
                error_type=type(exc).__name__,
            )
            continue

    logger.info(
        RETENTION_CLEANUP_COMPLETE,
        agent_id=agent_id,
        deleted_count=total_deleted,
    )
    return total_deleted

archival

Archival store protocol for long-term memory storage.

Defines the protocol for moving memories from the hot store into cold (archival) storage, with search and restore capabilities.

ArchivalStore

Bases: Protocol

Protocol for long-term memory archival storage.

Concrete implementations handle moving memories from the hot (active) store into cold storage for long-term preservation.

archive async

archive(entry)

Archive a memory entry.

Parameters:

Name Type Description Default
entry ArchivalEntry

The archival entry to store.

required

Returns:

Type Description
NotBlankStr

The assigned archive entry ID.

Source code in src/synthorg/memory/consolidation/archival.py
async def archive(self, entry: ArchivalEntry) -> NotBlankStr:
    """Archive a memory entry.

    Args:
        entry: The archival entry to store.

    Returns:
        The assigned archive entry ID.
    """
    ...

search async

search(agent_id, query)

Search archived entries for a specific agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose archived entries to search.

required
query MemoryQuery

Search parameters.

required

Returns:

Type Description
tuple[ArchivalEntry, ...]

Matching archived entries owned by the agent.

Source code in src/synthorg/memory/consolidation/archival.py
async def search(
    self,
    agent_id: NotBlankStr,
    query: MemoryQuery,
) -> tuple[ArchivalEntry, ...]:
    """Search archived entries for a specific agent.

    Args:
        agent_id: Agent whose archived entries to search.
        query: Search parameters.

    Returns:
        Matching archived entries owned by the agent.
    """
    ...

restore async

restore(agent_id, entry_id)

Restore a specific archived entry for a specific agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent who owns the archived entry.

required
entry_id NotBlankStr

The archive entry ID.

required

Returns:

Type Description
ArchivalEntry | None

The archived entry, or None if not found or not

ArchivalEntry | None

owned by the agent.

Source code in src/synthorg/memory/consolidation/archival.py
async def restore(
    self,
    agent_id: NotBlankStr,
    entry_id: NotBlankStr,
) -> ArchivalEntry | None:
    """Restore a specific archived entry for a specific agent.

    Args:
        agent_id: Agent who owns the archived entry.
        entry_id: The archive entry ID.

    Returns:
        The archived entry, or ``None`` if not found or not
        owned by the agent.
    """
    ...

count async

count(agent_id)

Count archived entries for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
int

Number of archived entries.

Source code in src/synthorg/memory/consolidation/archival.py
async def count(self, agent_id: NotBlankStr) -> int:
    """Count archived entries for an agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Number of archived entries.
    """
    ...

density

Content density classification for dual-mode archival.

Heuristic-based classifier that determines whether memory content is sparse (conversational, narrative) or dense (code, structured data, identifiers). Classification is deterministic -- no LLM calls.

ContentDensity

Bases: StrEnum

Classification of memory content density.

Determines the archival mode: sparse content receives abstractive LLM summarization, dense content receives extractive preservation.

SPARSE class-attribute instance-attribute

SPARSE = 'sparse'

Conversational, narrative, low information density.

DENSE class-attribute instance-attribute

DENSE = 'dense'

Code, structured data, identifiers, high information density.

DensityClassifier

DensityClassifier(*, dense_threshold=_DEFAULT_DENSE_THRESHOLD)

Heuristic content density classifier.

Classifies text as SPARSE or DENSE based on structural signals: code patterns, structured data markers, identifier density, numeric density, and line structure.

Parameters:

Name Type Description Default
dense_threshold float

Score threshold for DENSE classification (0.0-1.0). Lower values classify more content as dense.

_DEFAULT_DENSE_THRESHOLD

Raises:

Type Description
ValueError

If dense_threshold is outside [0.0, 1.0].

Source code in src/synthorg/memory/consolidation/density.py
def __init__(self, *, dense_threshold: float = _DEFAULT_DENSE_THRESHOLD) -> None:
    if not 0.0 <= dense_threshold <= 1.0:
        msg = f"dense_threshold must be in [0.0, 1.0], got {dense_threshold}"
        raise ValueError(msg)
    self._threshold = dense_threshold

classify

classify(content)

Classify content density.

Parameters:

Name Type Description Default
content str

Text to classify.

required

Returns:

Type Description
ContentDensity

DENSE if score >= threshold, SPARSE otherwise.

Source code in src/synthorg/memory/consolidation/density.py
def classify(self, content: str) -> ContentDensity:
    """Classify content density.

    Args:
        content: Text to classify.

    Returns:
        DENSE if score >= threshold, SPARSE otherwise.
    """
    score = (
        _WEIGHT_CODE * _code_pattern_score(content)
        + _WEIGHT_STRUCTURED * _structured_data_score(content)
        + _WEIGHT_IDENTIFIERS * _identifier_density_score(content)
        + _WEIGHT_NUMERIC * _numeric_density_score(content)
        + _WEIGHT_LINE_STRUCTURE * _line_structure_score(content)
    )
    return (
        ContentDensity.DENSE if score >= self._threshold else ContentDensity.SPARSE
    )

classify_batch

classify_batch(entries)

Classify density for a batch of memory entries.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to classify.

required

Returns:

Type Description
tuple[tuple[MemoryEntry, ContentDensity], ...]

Tuple of (entry, density) pairs in input order.

Source code in src/synthorg/memory/consolidation/density.py
def classify_batch(
    self,
    entries: tuple[MemoryEntry, ...],
) -> tuple[tuple[MemoryEntry, ContentDensity], ...]:
    """Classify density for a batch of memory entries.

    Args:
        entries: Memory entries to classify.

    Returns:
        Tuple of (entry, density) pairs in input order.
    """
    results = tuple((entry, self.classify(entry.content)) for entry in entries)

    if results:
        dense_count = sum(1 for _, d in results if d == ContentDensity.DENSE)
        logger.debug(
            DENSITY_CLASSIFICATION_COMPLETE,
            entry_count=len(results),
            dense_count=dense_count,
            sparse_count=len(results) - dense_count,
        )

    return results

extractive

Extractive preservation for dense memory content.

For dense content (code, structured data, identifiers), summarization is catastrophically lossy. Instead, this module extracts verbatim key facts and structural anchors (start/mid/end) to preserve the most important information.

ExtractivePreserver

ExtractivePreserver(
    *, max_facts=_DEFAULT_MAX_FACTS, anchor_length=_DEFAULT_ANCHOR_LENGTH
)

Extracts key facts and structural anchors from dense content.

For dense content (code, structured data, IDs), summarization is catastrophically lossy. Instead, this preserver extracts verbatim key facts (identifiers, URLs, version numbers, key-value pairs) and structural anchors (start/mid/end snippets of the original).

Parameters:

Name Type Description Default
max_facts int

Maximum number of key facts to extract.

_DEFAULT_MAX_FACTS
anchor_length int

Character length of each anchor snippet.

_DEFAULT_ANCHOR_LENGTH

Raises:

Type Description
ValueError

If max_facts or anchor_length is < 1.

Source code in src/synthorg/memory/consolidation/extractive.py
def __init__(
    self,
    *,
    max_facts: int = _DEFAULT_MAX_FACTS,
    anchor_length: int = _DEFAULT_ANCHOR_LENGTH,
) -> None:
    if max_facts < 1:
        msg = f"max_facts must be >= 1, got {max_facts}"
        raise ValueError(msg)
    if anchor_length < 1:
        msg = f"anchor_length must be >= 1, got {anchor_length}"
        raise ValueError(msg)
    self._max_facts = max_facts
    self._anchor_length = anchor_length

extract

extract(content)

Extract key facts and anchors from dense content.

Parameters:

Name Type Description Default
content str

The dense text to extract from.

required

Returns:

Type Description
str

Structured text block with extracted facts and anchors.

Source code in src/synthorg/memory/consolidation/extractive.py
def extract(self, content: str) -> str:
    """Extract key facts and anchors from dense content.

    Args:
        content: The dense text to extract from.

    Returns:
        Structured text block with extracted facts and anchors.
    """
    # Collect all facts, deduplicated, order-preserving
    all_facts: list[str] = []
    seen: set[str] = set()

    for fact in (
        *_extract_urls(content),
        *_extract_identifiers(content),
        *_extract_versions(content),
        *_extract_key_values(content),
    ):
        if fact not in seen:
            seen.add(fact)
            all_facts.append(fact)

    facts = all_facts[: self._max_facts]
    start, mid, end = _build_anchors(content, self._anchor_length)

    lines = ["[Extractive preservation]"]
    if facts:
        lines.append("Key facts:")
        lines.extend(f"- {fact}" for fact in facts)
    lines.append(f"[START] {start}")
    if mid:
        lines.append(f"[MID] {mid}")
    if end:
        lines.append(f"[END] {end}")

    result = "\n".join(lines)

    logger.debug(
        DUAL_MODE_EXTRACTIVE_PRESERVED,
        content_length=len(content),
        fact_count=len(facts),
        anchor_length=self._anchor_length,
    )

    return result

abstractive

Abstractive summarizer for sparse memory content.

Uses an LLM (via CompletionProvider) to generate concise summaries of conversational/narrative memory content. Falls back to truncation if the LLM call fails.

AbstractiveSummarizer

AbstractiveSummarizer(
    *,
    provider,
    model,
    max_summary_tokens=_DEFAULT_MAX_SUMMARY_TOKENS,
    temperature=_DEFAULT_TEMPERATURE,
    cost_tracker=None,
)

LLM-based abstractive summarizer for sparse content.

Uses a CompletionProvider to generate concise summaries of conversational/narrative memory content. Falls back to truncation if the LLM call fails with a retryable error.

Parameters:

Name Type Description Default
provider CompletionProvider

Completion provider for LLM calls.

required
model NotBlankStr

Model identifier to use for summarization.

required
max_summary_tokens int

Maximum tokens for the summary response.

_DEFAULT_MAX_SUMMARY_TOKENS
temperature float

Sampling temperature for summarization.

_DEFAULT_TEMPERATURE

Raises:

Type Description
ValueError

If model is empty or whitespace-only.

Source code in src/synthorg/memory/consolidation/abstractive.py
def __init__(
    self,
    *,
    provider: CompletionProvider,
    model: NotBlankStr,
    max_summary_tokens: int = _DEFAULT_MAX_SUMMARY_TOKENS,
    temperature: float = _DEFAULT_TEMPERATURE,
    cost_tracker: CostTracker | None = None,
) -> None:
    if not model or not model.strip():
        msg = "model must be a non-blank string"
        raise ValueError(msg)
    self._provider = provider
    self._model = model
    self._cost_tracker = cost_tracker
    self._config = CompletionConfig(
        temperature=temperature,
        max_tokens=max_summary_tokens,
    )

summarize async

summarize(content, *, agent_id=None)

Generate an abstractive summary of the given content.

Falls back to truncation if the LLM call fails with a retryable error or returns empty content. Non-retryable provider errors (authentication, invalid model) propagate.

Parameters:

Name Type Description Default
content str

The sparse/conversational text to summarize.

required
agent_id NotBlankStr | None

Owning agent for cost attribution. When None and a cost_tracker was wired, the call is attributed to "system" with task_id "system:memory:abstractive".

None

Returns:

Type Description
str

Summary text.

Source code in src/synthorg/memory/consolidation/abstractive.py
async def summarize(
    self,
    content: str,
    *,
    agent_id: NotBlankStr | None = None,
) -> str:
    """Generate an abstractive summary of the given content.

    Falls back to truncation if the LLM call fails with a
    retryable error or returns empty content.  Non-retryable
    provider errors (authentication, invalid model) propagate.

    Args:
        content: The sparse/conversational text to summarize.
        agent_id: Owning agent for cost attribution.  When
            ``None`` and a ``cost_tracker`` was wired, the call
            is attributed to ``"system"`` with ``task_id``
            ``"system:memory:abstractive"``.

    Returns:
        Summary text.
    """
    try:
        # ``content`` is the raw memory body, which may have
        # absorbed adversarial peer/tool output upstream. Wrap
        # it in a ``<untrusted-artifact>`` fence; the system
        # prompt carries the matching directive.
        messages = [
            ChatMessage(role=MessageRole.SYSTEM, content=_SYSTEM_PROMPT),
            ChatMessage(
                role=MessageRole.USER,
                content=wrap_untrusted(TAG_UNTRUSTED_ARTIFACT, content),
            ),
        ]
        attribution_agent: NotBlankStr = agent_id or NotBlankStr("system")
        attribution_task: NotBlankStr = NotBlankStr("system:memory:abstractive")
        async with cost_recording_scope(
            cost_tracker=self._cost_tracker,
            agent_id=attribution_agent,
            task_id=attribution_task,
            call_category=LLMCallCategory.SYSTEM,
        ):
            response = await self._provider.complete(
                messages,
                self._model,
                config=self._config,
            )
        if response.content and response.content.strip():
            logger.debug(
                DUAL_MODE_ABSTRACTIVE_SUMMARY,
                content_length=len(content),
                summary_length=len(response.content),
                model=self._model,
            )
            return response.content.strip()
    except MemoryError, RecursionError:
        raise
    except ProviderError as exc:
        if not exc.is_retryable:
            logger.warning(
                DUAL_MODE_ABSTRACTIVE_FALLBACK,
                content_length=len(content),
                error=safe_error_description(exc),
                error_type=type(exc).__name__,
                retryable=False,
            )
            raise
        logger.warning(
            DUAL_MODE_ABSTRACTIVE_FALLBACK,
            content_length=len(content),
            error=safe_error_description(exc),
            error_type=type(exc).__name__,
        )
        return _truncate_fallback(content)
    except Exception as exc:
        logger.warning(
            DUAL_MODE_ABSTRACTIVE_FALLBACK,
            content_length=len(content),
            error=safe_error_description(exc),
            error_type=type(exc).__name__,
        )
        return _truncate_fallback(content)

    # Fallback: empty/whitespace-only LLM response
    logger.debug(
        DUAL_MODE_ABSTRACTIVE_FALLBACK,
        content_length=len(content),
        reason="empty_response",
    )
    return _truncate_fallback(content)

summarize_batch async

summarize_batch(entries)

Summarize multiple entries concurrently.

Each entry is summarized independently via asyncio.TaskGroup. Failures for individual entries fall back to truncation without aborting the batch.

Parameters:

Name Type Description Default
entries tuple[MemoryEntry, ...]

Memory entries to summarize.

required

Returns:

Type Description
tuple[tuple[NotBlankStr, str], ...]

Tuple of (entry_id, summary) pairs in input order.

Source code in src/synthorg/memory/consolidation/abstractive.py
async def summarize_batch(
    self,
    entries: tuple[MemoryEntry, ...],
) -> tuple[tuple[NotBlankStr, str], ...]:
    """Summarize multiple entries concurrently.

    Each entry is summarized independently via ``asyncio.TaskGroup``.
    Failures for individual entries fall back to truncation without
    aborting the batch.

    Args:
        entries: Memory entries to summarize.

    Returns:
        Tuple of ``(entry_id, summary)`` pairs in input order.
    """
    if not entries:
        return ()

    results: dict[NotBlankStr, str] = {}
    async with asyncio.TaskGroup() as tg:
        tasks: dict[NotBlankStr, asyncio.Task[str]] = {}
        for entry in entries:
            tasks[entry.id] = tg.create_task(
                self.summarize(entry.content, agent_id=entry.agent_id),
            )

    for entry_id, task in tasks.items():
        results[entry_id] = task.result()

    return tuple((entry.id, results[entry.id]) for entry in entries)