Skip to content

Communication

Inter-agent messaging: bus, dispatcher, delegation, loop prevention, conflict resolution, and meeting protocols.

Message

message

Message domain models (see Communication design page).

Part module-attribute

Part = Annotated[TextPart | DataPart | FilePart | UriPart, Discriminator('type')]

Discriminated union of message content parts.

Pydantic uses the type literal field on each part to determine which subtype to deserialize into.

TextPart pydantic-model

Bases: BaseModel

Plain text message content.

Attributes:

Name Type Description
type Literal['text']

Discriminator literal (always "text").

text NotBlankStr

The text content (must not be blank).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

type pydantic-field

type = 'text'

Part type discriminator

text pydantic-field

text

Text content

DataPart pydantic-model

Bases: BaseModel

Structured JSON data attached to a message.

The data field is deep-copied and recursively frozen at construction (dict -> MappingProxyType, list -> tuple, set -> frozenset) to preserve the immutability contract of frozen Pydantic models.

Attributes:

Name Type Description
type Literal['data']

Discriminator literal (always "data").

data MappingProxyType[str, Any]

Structured JSON content (read-only after construction).

Config:

  • frozen: True
  • allow_inf_nan: False
  • arbitrary_types_allowed: True
  • extra: forbid

Fields:

  • type (Literal['data'])
  • data (MappingProxyType[str, Any])

Validators:

  • _deep_copy_and_freeze_datadata

type pydantic-field

type = 'data'

Part type discriminator

data pydantic-field

data

Structured JSON content (read-only)

model_copy

model_copy(*, update=None, deep=False)

Override to re-freeze data when model_copy updates it.

Source code in src/synthorg/communication/message.py
def model_copy(
    self,
    *,
    update: Mapping[str, Any] | None = None,
    deep: bool = False,
) -> Self:
    """Override to re-freeze data when model_copy updates it."""
    result = super().model_copy(update=update, deep=deep)
    if update and "data" in update:
        raw = update["data"]
        if isinstance(raw, MappingProxyType):
            raw = dict(raw)
        frozen = freeze_recursive(deep_copy_mapping(raw))
        object.__setattr__(result, "data", frozen)
    return result

FilePart pydantic-model

Bases: BaseModel

Reference to a file resource.

Attributes:

Name Type Description
type Literal['file']

Discriminator literal (always "file").

uri NotBlankStr

File URI or path.

mime_type NotBlankStr | None

Optional MIME type of the file.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

type pydantic-field

type = 'file'

Part type discriminator

uri pydantic-field

uri

File URI or path

mime_type pydantic-field

mime_type = None

Optional MIME type

UriPart pydantic-model

Bases: BaseModel

Reference to an external URI resource.

Attributes:

Name Type Description
type Literal['uri']

Discriminator literal (always "uri").

uri NotBlankStr

The URI or URL.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

type pydantic-field

type = 'uri'

Part type discriminator

uri pydantic-field

uri

URI or URL

MessageMetadata pydantic-model

Bases: BaseModel

Optional metadata carried with a message.

Extends the Communication design page metadata with an additional extra field for arbitrary key-value pairs.

Attributes:

Name Type Description
task_id NotBlankStr | None

Related task identifier.

project_id NotBlankStr | None

Related project identifier.

tokens_used int | None

LLM tokens consumed producing the message.

cost float | None

Estimated cost of the message in the configured currency.

extra tuple[tuple[str, str], ...]

Immutable key-value pairs for arbitrary metadata (extension).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_extra

task_id pydantic-field

task_id = None

Related task identifier

project_id pydantic-field

project_id = None

Related project identifier

tokens_used pydantic-field

tokens_used = None

LLM tokens consumed

cost pydantic-field

cost = None

Estimated cost in the configured currency

extra pydantic-field

extra = ()

Immutable key-value pairs for arbitrary metadata

Message pydantic-model

Bases: BaseModel

An inter-agent message.

Field schema is based on the Communication design page with typed refinements. The sender field is aliased to "from" for JSON compatibility with the spec format.

Content is represented as a tuple of typed Part objects (text, data, file, URI) rather than a flat string, enabling rich multi-part messages.

Attributes:

Name Type Description
id UUID

Unique message identifier.

timestamp AwareDatetime

When the message was created (must be timezone-aware).

sender NotBlankStr

Agent ID of the sender (aliased to "from" in JSON).

to NotBlankStr

Recipient agent or channel identifier.

type MessageType

Message type classification.

priority MessagePriority

Message priority level.

channel NotBlankStr

Channel the message is sent through.

parts tuple[Part, ...]

Ordered content parts (text, data, files, URIs).

metadata MessageMetadata

Optional message metadata.

Config:

  • frozen: True
  • populate_by_name: True
  • allow_inf_nan: False
  • arbitrary_types_allowed: True

Fields:

id pydantic-field

id

Unique message identifier

timestamp pydantic-field

timestamp

When the message was created (must be timezone-aware)

sender pydantic-field

sender

Sender agent ID

to pydantic-field

to

Recipient agent or channel

type pydantic-field

type

Message type classification

priority pydantic-field

priority = NORMAL

Message priority level

channel pydantic-field

channel

Channel the message is sent through

parts pydantic-field

parts

Ordered content parts (text, data, files, URIs)

attachments pydantic-field

attachments = ()

Out-of-band parts attached to the message (files, data blobs, URIs) that are not part of the primary content flow. Persisted alongside the message and round-tripped by both backends.

metadata pydantic-field

metadata

Optional message metadata

text property

text

Extract the first TextPart text, or empty string.

Convenience accessor for consumers that only need the primary text content of a message.

Messenger

messenger

Per-agent messenger facade over the message bus (see Communication design page).

AgentMessenger

AgentMessenger(agent_id, agent_name, bus, dispatcher=None)

Per-agent facade for sending, receiving, and dispatching messages.

Wraps a :class:MessageBus and optional :class:MessageDispatcher to provide a high-level API that auto-fills sender, timestamp, and message ID.

Parameters:

Name Type Description Default
agent_id str

Identifier of the owning agent.

required
agent_name str

Human-readable name of the agent.

required
bus MessageBus

The underlying message bus.

required
dispatcher MessageDispatcher | None

Optional message dispatcher for handler routing.

None

Raises:

Type Description
ValueError

If agent_id or agent_name is blank.

Source code in src/synthorg/communication/messenger.py
def __init__(
    self,
    agent_id: str,
    agent_name: str,
    bus: MessageBus,
    dispatcher: MessageDispatcher | None = None,
) -> None:
    if not agent_id.strip():
        logger.warning(
            COMM_MESSENGER_INVALID_AGENT,
            field="agent_id",
            value=repr(agent_id),
        )
        msg = "agent_id must not be blank"
        raise ValueError(msg)
    if not agent_name.strip():
        logger.warning(
            COMM_MESSENGER_INVALID_AGENT,
            field="agent_name",
            value=repr(agent_name),
        )
        msg = "agent_name must not be blank"
        raise ValueError(msg)
    self._agent_id = agent_id
    self._agent_name = agent_name
    self._bus = bus
    self._dispatcher = dispatcher
    logger.debug(
        COMM_MESSENGER_CREATED,
        agent_id=agent_id,
        agent_name=agent_name,
    )

send_message async

send_message(*, to, channel, content=None, parts=None, message_type, priority=NORMAL)

Send a message to a channel.

Auto-fills sender, timestamp, and message ID. Provide either content (auto-wrapped as a TextPart) or parts for rich multi-part messages.

Parameters:

Name Type Description Default
to str

Recipient agent or channel identifier.

required
channel str

Channel to publish through.

required
content str | None

Message body text (auto-wrapped as TextPart).

None
parts tuple[Part, ...] | None

Explicit message parts (mutually exclusive with content).

None
message_type MessageType

Message type classification.

required
priority MessagePriority

Message priority level.

NORMAL

Returns:

Type Description
Message

The constructed and published message.

Raises:

Type Description
ValueError

If neither or both content/parts are provided.

ChannelNotFoundError

If the channel does not exist.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def send_message(  # noqa: PLR0913
    self,
    *,
    to: str,
    channel: str,
    content: str | None = None,
    parts: tuple[Part, ...] | None = None,
    message_type: MessageType,
    priority: MessagePriority = MessagePriority.NORMAL,
) -> Message:
    """Send a message to a channel.

    Auto-fills sender, timestamp, and message ID.  Provide either
    *content* (auto-wrapped as a ``TextPart``) or *parts* for rich
    multi-part messages.

    Args:
        to: Recipient agent or channel identifier.
        channel: Channel to publish through.
        content: Message body text (auto-wrapped as TextPart).
        parts: Explicit message parts (mutually exclusive with content).
        message_type: Message type classification.
        priority: Message priority level.

    Returns:
        The constructed and published message.

    Raises:
        ValueError: If neither or both content/parts are provided.
        ChannelNotFoundError: If the channel does not exist.
        MessageBusNotRunningError: If the bus is not running.
    """
    resolved = _resolve_parts(content, parts)
    msg = Message(
        timestamp=datetime.now(UTC),
        sender=self._agent_id,
        to=to,
        type=message_type,
        priority=priority,
        channel=channel,
        parts=resolved,
    )
    await self._bus.publish(msg)
    logger.info(
        COMM_MESSAGE_SENT,
        agent_id=self._agent_id,
        to=to,
        channel=channel,
        message_id=str(msg.id),
    )
    return msg

send_direct async

send_direct(*, to, content=None, parts=None, message_type, priority=NORMAL)

Send a direct message to another agent.

Auto-fills sender, timestamp, and message ID. The message's channel field is set to the deterministic @{a}:{b} channel name computed from the sorted agent ID pair.

Parameters:

Name Type Description Default
to str

Recipient agent ID.

required
content str | None

Message body text (auto-wrapped as TextPart).

None
parts tuple[Part, ...] | None

Explicit message parts (mutually exclusive with content).

None
message_type MessageType

Message type classification.

required
priority MessagePriority

Message priority level.

NORMAL

Returns:

Type Description
Message

The constructed and sent message.

Raises:

Type Description
ValueError

If neither or both content/parts are provided.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def send_direct(
    self,
    *,
    to: str,
    content: str | None = None,
    parts: tuple[Part, ...] | None = None,
    message_type: MessageType,
    priority: MessagePriority = MessagePriority.NORMAL,
) -> Message:
    """Send a direct message to another agent.

    Auto-fills sender, timestamp, and message ID. The message's
    ``channel`` field is set to the deterministic ``@{a}:{b}``
    channel name computed from the sorted agent ID pair.

    Args:
        to: Recipient agent ID.
        content: Message body text (auto-wrapped as TextPart).
        parts: Explicit message parts (mutually exclusive with content).
        message_type: Message type classification.
        priority: Message priority level.

    Returns:
        The constructed and sent message.

    Raises:
        ValueError: If neither or both content/parts are provided.
        MessageBusNotRunningError: If the bus is not running.
    """
    resolved = _resolve_parts(content, parts)
    channel = _direct_channel_name(self._agent_id, to)
    msg = Message(
        timestamp=datetime.now(UTC),
        sender=self._agent_id,
        to=to,
        type=message_type,
        priority=priority,
        channel=channel,
        parts=resolved,
    )
    await self._bus.send_direct(msg, recipient=to)
    logger.info(
        COMM_MESSAGE_SENT,
        agent_id=self._agent_id,
        to=to,
        channel=channel,
        message_id=str(msg.id),
    )
    return msg

broadcast async

broadcast(
    *, content=None, parts=None, message_type, priority=NORMAL, channel="#all-hands"
)

Publish an announcement to a shared channel.

Agents must be subscribed to the target channel to receive the message. For true fan-out to all known agents, use a channel of type :attr:ChannelType.BROADCAST.

Parameters:

Name Type Description Default
content str | None

Message body text (auto-wrapped as TextPart).

None
parts tuple[Part, ...] | None

Explicit message parts (mutually exclusive with content).

None
message_type MessageType

Message type classification.

required
priority MessagePriority

Message priority level.

NORMAL
channel str

Channel name (default "#all-hands").

'#all-hands'

Returns:

Type Description
Message

The constructed and published message.

Raises:

Type Description
ValueError

If neither or both content/parts are provided.

ChannelNotFoundError

If the channel does not exist.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def broadcast(
    self,
    *,
    content: str | None = None,
    parts: tuple[Part, ...] | None = None,
    message_type: MessageType,
    priority: MessagePriority = MessagePriority.NORMAL,
    channel: str = "#all-hands",
) -> Message:
    """Publish an announcement to a shared channel.

    Agents must be subscribed to the target channel to receive
    the message.  For true fan-out to all known agents, use a
    channel of type :attr:`ChannelType.BROADCAST`.

    Args:
        content: Message body text (auto-wrapped as TextPart).
        parts: Explicit message parts (mutually exclusive with content).
        message_type: Message type classification.
        priority: Message priority level.
        channel: Channel name (default ``"#all-hands"``).

    Returns:
        The constructed and published message.

    Raises:
        ValueError: If neither or both content/parts are provided.
        ChannelNotFoundError: If the channel does not exist.
        MessageBusNotRunningError: If the bus is not running.
    """
    resolved = _resolve_parts(content, parts)
    msg = Message(
        timestamp=datetime.now(UTC),
        sender=self._agent_id,
        to=channel,
        type=message_type,
        priority=priority,
        channel=channel,
        parts=resolved,
    )
    await self._bus.publish(msg)
    logger.info(
        COMM_MESSAGE_BROADCAST,
        agent_id=self._agent_id,
        channel=channel,
        message_id=str(msg.id),
    )
    return msg

subscribe async

subscribe(channel_name)

Subscribe this agent to a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to subscribe to.

required

Returns:

Type Description
Subscription

The subscription record.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def subscribe(self, channel_name: NotBlankStr) -> Subscription:
    """Subscribe this agent to a channel.

    Args:
        channel_name: Channel to subscribe to.

    Returns:
        The subscription record.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
        MessageBusNotRunningError: If the bus is not running.
    """
    sub = await self._bus.subscribe(channel_name, self._agent_id)
    logger.info(
        COMM_MESSENGER_SUBSCRIBED,
        agent_id=self._agent_id,
        channel=channel_name,
    )
    return sub

unsubscribe async

unsubscribe(channel_name)

Unsubscribe this agent from a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to unsubscribe from.

required

Raises:

Type Description
NotSubscribedError

If not currently subscribed.

Source code in src/synthorg/communication/messenger.py
async def unsubscribe(self, channel_name: NotBlankStr) -> None:
    """Unsubscribe this agent from a channel.

    Args:
        channel_name: Channel to unsubscribe from.

    Raises:
        NotSubscribedError: If not currently subscribed.
    """
    await self._bus.unsubscribe(channel_name, self._agent_id)
    logger.info(
        COMM_MESSENGER_UNSUBSCRIBED,
        agent_id=self._agent_id,
        channel=channel_name,
    )

receive async

receive(channel_name, *, timeout=None)

Receive the next message from a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to receive from.

required
timeout float | None

Max seconds to wait, or None for indefinite.

None

Returns:

Type Description
DeliveryEnvelope | None

The next delivery envelope, or None when:

DeliveryEnvelope | None
  • timeout expires without a message arriving.
DeliveryEnvelope | None
  • The bus is shut down while waiting.
DeliveryEnvelope | None
  • The subscription is cancelled via :meth:unsubscribe while a receive() call is in flight.
Source code in src/synthorg/communication/messenger.py
async def receive(
    self,
    channel_name: NotBlankStr,
    *,
    timeout: float | None = None,  # noqa: ASYNC109
) -> DeliveryEnvelope | None:
    """Receive the next message from a channel.

    Args:
        channel_name: Channel to receive from.
        timeout: Max seconds to wait, or ``None`` for indefinite.

    Returns:
        The next delivery envelope, or ``None`` when:

        - *timeout* expires without a message arriving.
        - The bus is shut down while waiting.
        - The subscription is cancelled via :meth:`unsubscribe`
            while a ``receive()`` call is in flight.
    """
    return await self._bus.receive(
        channel_name,
        self._agent_id,
        timeout=timeout,
    )

register_handler

register_handler(handler, *, message_types=None, min_priority=LOW, name='unnamed')

Register a message handler.

Creates a dispatcher automatically if one was not provided at construction time.

Parameters:

Name Type Description Default
handler MessageHandler | MessageHandlerFunc

The handler instance or async function.

required
message_types frozenset[MessageType] | None

Message types to match (empty/None = all).

None
min_priority MessagePriority

Minimum priority to accept.

LOW
name str

Human-readable label for debugging.

'unnamed'

Returns:

Type Description
str

The unique handler registration ID.

Source code in src/synthorg/communication/messenger.py
def register_handler(
    self,
    handler: MessageHandler | MessageHandlerFunc,
    *,
    message_types: frozenset[MessageType] | None = None,
    min_priority: MessagePriority = MessagePriority.LOW,
    name: str = "unnamed",
) -> str:
    """Register a message handler.

    Creates a dispatcher automatically if one was not provided
    at construction time.

    Args:
        handler: The handler instance or async function.
        message_types: Message types to match (empty/None = all).
        min_priority: Minimum priority to accept.
        name: Human-readable label for debugging.

    Returns:
        The unique handler registration ID.
    """
    if self._dispatcher is None:
        self._dispatcher = MessageDispatcher(
            agent_id=self._agent_id,
        )
    return self._dispatcher.register(
        handler,
        message_types=message_types,
        min_priority=min_priority,
        name=name,
    )

deregister_handler

deregister_handler(handler_id)

Remove a previously registered handler.

Parameters:

Name Type Description Default
handler_id str

The registration ID.

required

Returns:

Type Description
bool

True if the handler was found and removed.

Source code in src/synthorg/communication/messenger.py
def deregister_handler(self, handler_id: str) -> bool:
    """Remove a previously registered handler.

    Args:
        handler_id: The registration ID.

    Returns:
        True if the handler was found and removed.
    """
    if self._dispatcher is None:
        logger.debug(
            COMM_HANDLER_DEREGISTER_MISS,
            agent_id=self._agent_id,
            handler_id=handler_id,
        )
        return False
    return self._dispatcher.deregister(handler_id)

dispatch_message async

dispatch_message(message)

Dispatch an incoming message to registered handlers.

Parameters:

Name Type Description Default
message Message

The message to dispatch.

required

Returns:

Name Type Description
A DispatchResult

class:DispatchResult summarising the outcome.

Source code in src/synthorg/communication/messenger.py
async def dispatch_message(self, message: Message) -> DispatchResult:
    """Dispatch an incoming message to registered handlers.

    Args:
        message: The message to dispatch.

    Returns:
        A :class:`DispatchResult` summarising the outcome.
    """
    if self._dispatcher is None:
        logger.debug(
            COMM_DISPATCH_NO_DISPATCHER,
            agent_id=self._agent_id,
            message_id=str(message.id),
        )
        return DispatchResult(
            message_id=message.id,
            handlers_succeeded=0,
            handlers_failed=0,
        )
    return await self._dispatcher.dispatch(message)

Dispatcher

dispatcher

Message dispatcher -- routes incoming messages to registered handlers.

See the Communication design page.

DispatchResult pydantic-model

Bases: BaseModel

Immutable outcome of dispatching a single message.

Attributes:

Name Type Description
message_id UUID

The dispatched message's identifier.

handlers_matched int

Derived count (succeeded + failed).

handlers_succeeded int

Number of handlers that completed without error.

handlers_failed int

Number of handlers that raised an exception.

errors tuple[str, ...]

Error descriptions from failed handlers.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • message_id (UUID)
  • handlers_succeeded (int)
  • handlers_failed (int)
  • errors (tuple[str, ...])

handlers_matched property

handlers_matched

Total handlers that matched (succeeded + failed).

MessageDispatcher

MessageDispatcher(agent_id='unknown')

Per-agent dispatcher that routes messages to registered handlers.

Parameters:

Name Type Description Default
agent_id str

Identifier of the owning agent (for logging context).

'unknown'
Source code in src/synthorg/communication/dispatcher.py
def __init__(self, agent_id: str = "unknown") -> None:
    self._agent_id = agent_id
    self._registrations: dict[str, HandlerRegistration] = {}

register

register(handler, *, message_types=None, min_priority=LOW, name='unnamed')

Register a handler for incoming messages.

If handler is a bare async function, it is automatically wrapped in a :class:FunctionHandler.

Parameters:

Name Type Description Default
handler MessageHandler | MessageHandlerFunc

The handler instance or async function.

required
message_types frozenset[MessageType] | None

Message types to match (empty/None = all).

None
min_priority MessagePriority

Minimum priority to accept.

LOW
name str

Human-readable label for debugging.

'unnamed'

Returns:

Type Description
str

The unique handler registration ID.

Source code in src/synthorg/communication/dispatcher.py
def register(
    self,
    handler: MessageHandler | MessageHandlerFunc,
    *,
    message_types: frozenset[MessageType] | None = None,
    min_priority: MessagePriority = MessagePriority.LOW,
    name: str = "unnamed",
) -> str:
    """Register a handler for incoming messages.

    If *handler* is a bare async function, it is automatically wrapped
    in a :class:`FunctionHandler`.

    Args:
        handler: The handler instance or async function.
        message_types: Message types to match (empty/None = all).
        min_priority: Minimum priority to accept.
        name: Human-readable label for debugging.

    Returns:
        The unique handler registration ID.
    """
    if not isinstance(handler, MessageHandler):
        handler = FunctionHandler(handler)
    elif not inspect.iscoroutinefunction(handler.handle):
        msg = (
            f"MessageHandler {type(handler).__name__!r} has a "
            f"synchronous handle() -- must be async"
        )
        logger.warning(
            COMM_HANDLER_INVALID,
            agent_id=self._agent_id,
            handler_name=name,
            handler_type=type(handler).__name__,
            error=msg,
        )
        raise TypeError(msg)

    registration = HandlerRegistration(
        handler=handler,
        message_types=message_types or frozenset(),
        min_priority=min_priority,
        name=name,
    )
    self._registrations[registration.handler_id] = registration
    logger.info(
        COMM_HANDLER_REGISTERED,
        agent_id=self._agent_id,
        handler_id=registration.handler_id,
        handler_name=name,
    )
    return registration.handler_id

deregister

deregister(handler_id)

Remove a previously registered handler.

Parameters:

Name Type Description Default
handler_id str

The registration ID returned by :meth:register.

required

Returns:

Type Description
bool

True if the handler was found and removed, False otherwise.

Source code in src/synthorg/communication/dispatcher.py
def deregister(self, handler_id: str) -> bool:
    """Remove a previously registered handler.

    Args:
        handler_id: The registration ID returned by :meth:`register`.

    Returns:
        True if the handler was found and removed, False otherwise.
    """
    removed = self._registrations.pop(handler_id, None)
    if removed is not None:
        logger.info(
            COMM_HANDLER_DEREGISTERED,
            agent_id=self._agent_id,
            handler_id=handler_id,
            handler_name=removed.name,
        )
        return True
    logger.debug(
        COMM_HANDLER_DEREGISTER_MISS,
        agent_id=self._agent_id,
        handler_id=handler_id,
    )
    return False

dispatch async

dispatch(message)

Route a message to all matching handlers concurrently.

Most handler Exception subclasses are isolated; their errors are captured into the per-handler errors slot without affecting siblings. MemoryError and RecursionError are deliberate carve-outs: the interpreter is in catastrophic state, so they re-raise to abort the TaskGroup and cancel all remaining handlers rather than being absorbed silently. BaseException subclasses (e.g. KeyboardInterrupt, asyncio.CancelledError) likewise propagate through the TaskGroup, cancelling all remaining handlers.

Parameters:

Name Type Description Default
message Message

The message to dispatch.

required

Returns:

Name Type Description
A DispatchResult

class:DispatchResult summarising the outcome.

Source code in src/synthorg/communication/dispatcher.py
async def dispatch(self, message: Message) -> DispatchResult:
    """Route a message to all matching handlers concurrently.

    Most handler ``Exception`` subclasses are isolated; their
    errors are captured into the per-handler ``errors`` slot
    without affecting siblings. ``MemoryError`` and
    ``RecursionError`` are deliberate carve-outs: the interpreter
    is in catastrophic state, so they re-raise to abort the
    ``TaskGroup`` and cancel all remaining handlers rather than
    being absorbed silently. ``BaseException`` subclasses (e.g.
    ``KeyboardInterrupt``, ``asyncio.CancelledError``) likewise
    propagate through the ``TaskGroup``, cancelling all remaining
    handlers.

    Args:
        message: The message to dispatch.

    Returns:
        A :class:`DispatchResult` summarising the outcome.
    """
    matched = [
        reg for reg in self._registrations.values() if self._matches(reg, message)
    ]

    logger.debug(
        COMM_DISPATCH_START,
        agent_id=self._agent_id,
        message_id=str(message.id),
        message_type=message.type,
    )

    if not matched:
        logger.debug(
            COMM_DISPATCH_NO_HANDLERS,
            agent_id=self._agent_id,
            message_id=str(message.id),
        )
        return DispatchResult(
            message_id=message.id,
            handlers_succeeded=0,
            handlers_failed=0,
        )

    logger.debug(
        COMM_DISPATCH_HANDLER_MATCHED,
        agent_id=self._agent_id,
        message_id=str(message.id),
        count=len(matched),
    )

    errors: list[str | None] = [None] * len(matched)

    async with asyncio.TaskGroup() as tg:
        for idx, reg in enumerate(matched):
            tg.create_task(
                self._guarded_handle(reg, message, errors, idx),
            )

    error_msgs = tuple(e for e in errors if e is not None)
    succeeded = len(matched) - len(error_msgs)

    logger.info(
        COMM_DISPATCH_COMPLETE,
        agent_id=self._agent_id,
        message_id=str(message.id),
        matched=len(matched),
        succeeded=succeeded,
        failed=len(error_msgs),
    )

    return DispatchResult(
        message_id=message.id,
        handlers_succeeded=succeeded,
        handlers_failed=len(error_msgs),
        errors=error_msgs,
    )

Bus Protocol

bus_protocol

Message bus protocol (see Communication design page).

Defines the swappable interface for message bus backends. The default implementation is :class:InMemoryMessageBus in synthorg.communication.bus.memory.

MessageBus

Bases: Protocol

Protocol for message bus backends.

All implementations must support the full lifecycle (start/stop), channel management, pub/sub messaging, direct messaging, and channel history.

Uses a pull model: consumers call :meth:receive to get the next message rather than registering push callbacks.

is_running property

is_running

Whether the bus client thinks it is running.

Local state check: flipped by :meth:start / :meth:stop. Does not reflect live server reachability -- use :meth:health_check for that.

start async

start()

Start the bus and create pre-configured channels.

Raises:

Type Description
MessageBusAlreadyRunningError

If the bus is already running.

Source code in src/synthorg/communication/bus_protocol.py
async def start(self) -> None:
    """Start the bus and create pre-configured channels.

    Raises:
        MessageBusAlreadyRunningError: If the bus is already running.
    """
    ...

stop async

stop()

Stop the bus gracefully. Idempotent.

Source code in src/synthorg/communication/bus_protocol.py
async def stop(self) -> None:
    """Stop the bus gracefully.  Idempotent."""
    ...

health_check async

health_check()

Live liveness probe against the underlying transport.

Unlike :attr:is_running, this performs an actual round-trip to confirm the broker is reachable and the connection is healthy. Implementations return False rather than raising when the probe fails, so the caller can treat the bus as degraded without a try/except.

Source code in src/synthorg/communication/bus_protocol.py
async def health_check(self) -> bool:
    """Live liveness probe against the underlying transport.

    Unlike :attr:`is_running`, this performs an actual
    round-trip to confirm the broker is reachable and the
    connection is healthy. Implementations return ``False``
    rather than raising when the probe fails, so the caller
    can treat the bus as degraded without a try/except.
    """
    ...

publish async

publish(message, *, ttl_seconds=None)

Publish a message to its channel.

The target channel is determined by message.channel.

Parameters:

Name Type Description Default
message Message

The message to publish.

required
ttl_seconds float | None

Optional per-message TTL in seconds. When set, the message expires after this duration on the server. Requires the NATS stream to have allow_msg_ttl=True. None (default) defers to stream retention policy.

None

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If the target channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def publish(
    self,
    message: Message,
    *,
    ttl_seconds: float | None = None,
) -> None:
    """Publish a message to its channel.

    The target channel is determined by ``message.channel``.

    Args:
        message: The message to publish.
        ttl_seconds: Optional per-message TTL in seconds.  When set,
            the message expires after this duration on the server.
            Requires the NATS stream to have ``allow_msg_ttl=True``.
            ``None`` (default) defers to stream retention policy.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If the target channel does not exist.
    """
    ...

send_direct async

send_direct(message, *, recipient, ttl_seconds=None)

Send a direct message between two agents.

Lazily creates a DIRECT channel named @{a}:{b} (where a, b are the sorted agent IDs) and subscribes both agents.

Parameters:

Name Type Description Default
message Message

The message to send (message.sender is the sender).

required
recipient NotBlankStr

The recipient agent ID.

required
ttl_seconds float | None

Optional per-message TTL in seconds.

None

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ValueError

If the recipient does not match message.to, an agent ID contains the reserved separator, or the message exceeds the payload limit.

Source code in src/synthorg/communication/bus_protocol.py
async def send_direct(
    self,
    message: Message,
    *,
    recipient: NotBlankStr,
    ttl_seconds: float | None = None,
) -> None:
    """Send a direct message between two agents.

    Lazily creates a DIRECT channel named ``@{a}:{b}`` (where
    a, b are the sorted agent IDs) and subscribes both agents.

    Args:
        message: The message to send (``message.sender`` is the
            sender).
        recipient: The recipient agent ID.
        ttl_seconds: Optional per-message TTL in seconds.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ValueError: If the recipient does not match
            ``message.to``, an agent ID contains the reserved
            separator, or the message exceeds the payload limit.
    """
    ...

subscribe async

subscribe(channel_name, subscriber_id)

Subscribe an agent to a channel.

Idempotent -- returns a fresh subscription record if already subscribed (the channel's subscriber list is not duplicated).

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to subscribe to.

required
subscriber_id NotBlankStr

Agent ID of the subscriber.

required

Returns:

Type Description
Subscription

The subscription record.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If the channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def subscribe(
    self,
    channel_name: NotBlankStr,
    subscriber_id: NotBlankStr,
) -> Subscription:
    """Subscribe an agent to a channel.

    Idempotent -- returns a fresh subscription record if already
    subscribed (the channel's subscriber list is not duplicated).

    Args:
        channel_name: Channel to subscribe to.
        subscriber_id: Agent ID of the subscriber.

    Returns:
        The subscription record.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If the channel does not exist.
    """
    ...

unsubscribe async

unsubscribe(channel_name, subscriber_id)

Remove an agent's subscription from a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to unsubscribe from.

required
subscriber_id NotBlankStr

Agent ID to remove.

required

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

NotSubscribedError

If the agent is not subscribed.

Source code in src/synthorg/communication/bus_protocol.py
async def unsubscribe(
    self,
    channel_name: NotBlankStr,
    subscriber_id: NotBlankStr,
) -> None:
    """Remove an agent's subscription from a channel.

    Args:
        channel_name: Channel to unsubscribe from.
        subscriber_id: Agent ID to remove.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        NotSubscribedError: If the agent is not subscribed.
    """
    ...

receive async

receive(channel_name, subscriber_id, *, timeout=None)

Receive the next message from a channel.

Awaits until a message is available, the timeout expires, or the bus is stopped. When timeout is None, awaits indefinitely (or until shutdown).

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to receive from.

required
subscriber_id NotBlankStr

Agent ID receiving.

required
timeout float | None

Seconds to wait before returning None.

None

Returns:

Type Description
DeliveryEnvelope | None

A delivery envelope, or None on timeout or shutdown.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If the channel does not exist.

NotSubscribedError

If the subscriber is not subscribed (for TOPIC and DIRECT channels).

Source code in src/synthorg/communication/bus_protocol.py
async def receive(
    self,
    channel_name: NotBlankStr,
    subscriber_id: NotBlankStr,
    *,
    timeout: float | None = None,  # noqa: ASYNC109
) -> DeliveryEnvelope | None:
    """Receive the next message from a channel.

    Awaits until a message is available, the timeout expires, or
    the bus is stopped.  When ``timeout`` is ``None``, awaits
    indefinitely (or until shutdown).

    Args:
        channel_name: Channel to receive from.
        subscriber_id: Agent ID receiving.
        timeout: Seconds to wait before returning ``None``.

    Returns:
        A delivery envelope, or ``None`` on timeout or shutdown.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If the channel does not exist.
        NotSubscribedError: If the subscriber is not subscribed
            (for TOPIC and DIRECT channels).
    """
    ...

create_channel async

create_channel(channel)

Create a new channel.

Parameters:

Name Type Description Default
channel Channel

Channel definition to create.

required

Returns:

Type Description
Channel

The created channel.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelAlreadyExistsError

If a channel with that name already exists.

Source code in src/synthorg/communication/bus_protocol.py
async def create_channel(self, channel: Channel) -> Channel:
    """Create a new channel.

    Args:
        channel: Channel definition to create.

    Returns:
        The created channel.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelAlreadyExistsError: If a channel with that name
            already exists.
    """
    ...

get_channel async

get_channel(channel_name)

Get a channel by name.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Name of the channel.

required

Returns:

Type Description
Channel

The channel.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def get_channel(self, channel_name: NotBlankStr) -> Channel:
    """Get a channel by name.

    Args:
        channel_name: Name of the channel.

    Returns:
        The channel.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
    """
    ...

publish_batch async

publish_batch(messages, *, ttl_seconds=None)

Publish multiple messages in a batch.

Optimised for throughput. In NATS backends, uses pipelined async publishes for reduced round-trip overhead.

Parameters:

Name Type Description Default
messages Sequence[Message]

Messages to publish. An empty sequence is a no-op.

required
ttl_seconds float | None

Optional per-message TTL applied to every message in the batch.

None

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If any target channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def publish_batch(
    self,
    messages: Sequence[Message],
    *,
    ttl_seconds: float | None = None,
) -> None:
    """Publish multiple messages in a batch.

    Optimised for throughput.  In NATS backends, uses pipelined
    async publishes for reduced round-trip overhead.

    Args:
        messages: Messages to publish.  An empty sequence is a
            no-op.
        ttl_seconds: Optional per-message TTL applied to every
            message in the batch.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If any target channel does not exist.
    """
    ...

list_channels async

list_channels()

List all channels.

Returns:

Type Description
tuple[Channel, ...]

All registered channels.

Source code in src/synthorg/communication/bus_protocol.py
async def list_channels(self) -> tuple[Channel, ...]:
    """List all channels.

    Returns:
        All registered channels.
    """
    ...

get_channel_history async

get_channel_history(channel_name, *, limit=None)

Get message history for a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to query.

required
limit int | None

Maximum number of most recent messages to return. Values <= 0 return an empty tuple.

None

Returns:

Type Description
tuple[Message, ...]

Messages in chronological order.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def get_channel_history(
    self,
    channel_name: NotBlankStr,
    *,
    limit: int | None = None,
) -> tuple[Message, ...]:
    """Get message history for a channel.

    Args:
        channel_name: Channel to query.
        limit: Maximum number of most recent messages to return.
            Values ``<= 0`` return an empty tuple.

    Returns:
        Messages in chronological order.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
    """
    ...

Config

config

Communication configuration models (see Communication design page).

MessageRetentionConfig pydantic-model

Bases: BaseModel

Retention settings for channel message history and subscriber delivery.

Attributes:

Name Type Description
max_messages_per_channel int

Maximum messages kept per channel for both in-memory and NATS backends. In-memory backend: collections.deque(maxlen=...). NATS backend: max_msgs_per_subject on the JetStream stream.

max_subscriber_queue_size int

Maximum in-flight envelopes per (channel, subscriber). In-memory backend applies a drop-newest policy with COMM_SUBSCRIBER_QUEUE_OVERFLOW emission when the cap is hit. NATS backend wires this value into ConsumerConfig.max_ack_pending so JetStream pauses delivery to a consumer whose unacked count reaches the cap. The same configuration surface keeps the two backends at parity. Bounded above by :data:_MAX_SUBSCRIBER_QUEUE_SIZE_LIMIT to guard against a misconfiguration or untrusted-config-source DoS exhausting memory at queue creation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_messages_per_channel pydantic-field

max_messages_per_channel = 1000

Maximum messages kept per channel

max_subscriber_queue_size pydantic-field

max_subscriber_queue_size = 1024

Maximum in-flight envelopes per (channel, subscriber). In-memory bus: hard cap with drop-newest policy. NATS: ConsumerConfig.max_ack_pending.

NatsConfig pydantic-model

Bases: BaseModel

NATS JetStream backend configuration.

Only applicable when MessageBusConfig.backend == NATS. See docs/design/distributed-runtime.md for stream layout and subject naming.

Attributes:

Name Type Description
url NotBlankStr

NATS server URL (e.g. nats://localhost:4222).

credentials_path str | None

Optional path to a credentials file for secured clusters (creds file or jwt+seed).

stream_name_prefix NotBlankStr

Prefix for JetStream stream names. The bus stream is <prefix>_BUS and the KV bucket for dynamic channels is <prefix>_BUS_CHANNELS.

connect_timeout_seconds float

Seconds to wait for the initial connection before raising.

reconnect_time_wait_seconds float

Seconds between reconnect attempts.

max_reconnect_attempts int

Maximum reconnect attempts before giving up (-1 for unlimited).

publish_ack_wait_seconds float

Seconds to wait for a JetStream publish ack before considering the publish failed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_urlurl

url pydantic-field

url = 'nats://localhost:4222'

NATS server URL

credentials_path pydantic-field

credentials_path = None

Optional credentials file path

stream_name_prefix pydantic-field

stream_name_prefix = 'SYNTHORG'

Prefix for JetStream stream names

connect_timeout_seconds pydantic-field

connect_timeout_seconds = 5.0

Initial connect timeout

reconnect_time_wait_seconds pydantic-field

reconnect_time_wait_seconds = 2.0

Seconds between reconnect attempts

max_reconnect_attempts pydantic-field

max_reconnect_attempts = -1

Max reconnect attempts (-1 = unlimited)

publish_ack_wait_seconds pydantic-field

publish_ack_wait_seconds = 5.0

JetStream publish ack wait

health_flush_timeout_seconds pydantic-field

health_flush_timeout_seconds = 2.0

Timeout in seconds for the NATS health-check flush probe. Deliberately tight so a stuck probe fails fast rather than blocking the /healthz endpoint.

MessageBusConfig pydantic-model

Bases: BaseModel

Message bus backend configuration.

Maps to the Communication design page message_bus.

Attributes:

Name Type Description
backend MessageBusBackend

Transport backend to use.

channels tuple[NotBlankStr, ...]

Pre-defined channel names.

retention MessageRetentionConfig

Message retention settings.

nats NatsConfig | None

NATS-specific configuration (required when backend == NATS, ignored otherwise).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_channels
  • _validate_backend_config

backend pydantic-field

backend = INTERNAL

Transport backend

channels pydantic-field

channels = _DEFAULT_CHANNELS

Pre-defined channel names

retention pydantic-field

retention

Message retention settings

nats pydantic-field

nats = None

NATS-specific configuration (required when backend=nats)

MeetingTypeConfig pydantic-model

Bases: BaseModel

Configuration for a single meeting type.

Maps to the Communication design page meetings.types[]. Exactly one of frequency or trigger must be set.

Attributes:

Name Type Description
name NotBlankStr

Meeting type name (e.g. "daily_standup").

frequency MeetingFrequency | None

Recurrence schedule (mutually exclusive with trigger).

trigger NotBlankStr | None

Event trigger (mutually exclusive with frequency).

participants tuple[NotBlankStr, ...]

Participant role or agent identifiers.

duration_tokens int

Token budget for the meeting.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_frequency_or_trigger
  • _validate_participants

name pydantic-field

name

Meeting type name

frequency pydantic-field

frequency = None

Recurrence schedule

trigger pydantic-field

trigger = None

Event trigger

participants pydantic-field

participants = ()

Participant role or agent identifiers

duration_tokens pydantic-field

duration_tokens = 2000

Token budget for the meeting

protocol_config pydantic-field

protocol_config

Meeting protocol configuration

min_interval_seconds pydantic-field

min_interval_seconds = None

Minimum seconds between event-triggered meetings of this type

MeetingsConfig pydantic-model

Bases: BaseModel

Meetings subsystem configuration.

Maps to the Communication design page meetings.

Attributes:

Name Type Description
enabled bool

Whether the meetings subsystem is active.

types tuple[MeetingTypeConfig, ...]

Configured meeting types (unique by name).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors
  • _validate_unique_meeting_names

enabled pydantic-field

enabled = True

Meetings subsystem active

types pydantic-field

types = ()

Configured meeting types

HierarchyConfig pydantic-model

Bases: BaseModel

Hierarchy enforcement configuration.

Maps to the Communication design page hierarchy.

Attributes:

Name Type Description
enforce_chain_of_command bool

Whether chain-of-command is enforced.

allow_skip_level bool

Whether skip-level messaging is allowed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

enforce_chain_of_command pydantic-field

enforce_chain_of_command = True

Enforce chain-of-command

allow_skip_level pydantic-field

allow_skip_level = False

Allow skip-level messaging

RateLimitConfig pydantic-model

Bases: BaseModel

Per-pair message rate limit configuration.

Maps to the Communication design page rate_limit.

Attributes:

Name Type Description
max_per_pair_per_minute int

Maximum messages per agent pair per minute.

burst_allowance int

Extra burst capacity above the rate limit.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_per_pair_per_minute pydantic-field

max_per_pair_per_minute = 10

Max messages per agent pair per minute

burst_allowance pydantic-field

burst_allowance = 3

Extra burst capacity

CircuitBreakerConfig pydantic-model

Bases: BaseModel

Circuit breaker configuration for agent-pair communication.

Maps to the Communication design page circuit_breaker.

Attributes:

Name Type Description
bounce_threshold int

Bounce count before the circuit opens.

cooldown_seconds int

Seconds to wait before retrying after trip.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_cooldown_bounds

bounce_threshold pydantic-field

bounce_threshold = 3

Bounce count before circuit opens

cooldown_seconds pydantic-field

cooldown_seconds = 300

Cooldown period in seconds

max_cooldown_seconds pydantic-field

max_cooldown_seconds = 3600

Maximum cooldown period in seconds (caps exponential backoff)

LoopPreventionConfig pydantic-model

Bases: BaseModel

Loop prevention safeguards.

Maps to the Communication design page. ancestry_tracking is always on and cannot be disabled.

Attributes:

Name Type Description
max_delegation_depth int

Hard limit on delegation chain length.

rate_limit RateLimitConfig

Per-pair rate limit settings.

dedup_window_seconds int

Deduplication window in seconds.

circuit_breaker CircuitBreakerConfig

Circuit breaker settings.

ancestry_tracking Literal[True]

Must always be True.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_delegation_depth pydantic-field

max_delegation_depth = 5

Hard limit on delegation chain length

rate_limit pydantic-field

rate_limit

Per-pair rate limit settings

dedup_window_seconds pydantic-field

dedup_window_seconds = 60

Deduplication window in seconds

circuit_breaker pydantic-field

circuit_breaker

Circuit breaker settings

ancestry_tracking pydantic-field

ancestry_tracking = True

Task ancestry tracking (always on, not configurable)

CommunicationConfig pydantic-model

Bases: BaseModel

Top-level communication configuration.

Aggregates the Communication design page sections under a single model.

Attributes:

Name Type Description
default_pattern CommunicationPattern

High-level communication pattern.

message_bus MessageBusConfig

Message bus configuration.

meetings MeetingsConfig

Meetings subsystem configuration.

hierarchy HierarchyConfig

Hierarchy enforcement settings.

loop_prevention LoopPreventionConfig

Loop prevention safeguards.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

default_pattern pydantic-field

default_pattern = HYBRID

High-level communication pattern

message_bus pydantic-field

message_bus

Message bus configuration

meetings pydantic-field

meetings

Meetings subsystem configuration

hierarchy pydantic-field

hierarchy

Hierarchy enforcement settings

loop_prevention pydantic-field

loop_prevention

Loop prevention safeguards

conflict_resolution pydantic-field

conflict_resolution

Conflict resolution configuration (see Communication design page)

Errors

errors

Communication error hierarchy (see Communication design page).

All communication errors carry an immutable context mapping for structured metadata, following the same pattern as ToolError.

CommunicationError

CommunicationError(message, *, context=None)

Bases: DomainError

Base exception for all communication-layer errors.

Attributes:

Name Type Description
message

Human-readable error description.

context MappingProxyType[str, Any]

Immutable metadata about the error.

Class Attributes

status_code: HTTP 500 default. error_code: COMMUNICATION_ERROR. error_category: INTERNAL. retryable: False. default_message: Generic 5xx-safe message.

Initialize a communication error.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
context dict[str, Any] | None

Arbitrary metadata about the error. Stored as an immutable mapping; defaults to empty if not provided.

None
Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

__str__

__str__()

Format error with optional context metadata.

Source code in src/synthorg/communication/errors.py
def __str__(self) -> str:
    """Format error with optional context metadata."""
    if self.context:
        ctx = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
        return f"{self.message} ({ctx})"
    return self.message

ChannelNotFoundError

ChannelNotFoundError(message, *, context=None)

Bases: CommunicationError

Requested channel does not exist.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ChannelAlreadyExistsError

ChannelAlreadyExistsError(message, *, context=None)

Bases: CommunicationError

Channel with the given name already exists.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

NotSubscribedError

NotSubscribedError(message, *, context=None)

Bases: CommunicationError

Agent is not subscribed to the specified channel.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

MessageBusNotRunningError

MessageBusNotRunningError(message, *, context=None)

Bases: CommunicationError

Operation attempted on a message bus that is not running.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

MessageBusAlreadyRunningError

MessageBusAlreadyRunningError(message, *, context=None)

Bases: CommunicationError

start() called on a message bus that is already running.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationError

DelegationError(message, *, context=None)

Bases: CommunicationError

Base exception for delegation-related errors.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationAuthorityError

DelegationAuthorityError(message, *, context=None)

Bases: DelegationError

Delegator lacks authority to delegate to the target agent.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationLoopError

DelegationLoopError(message, *, context=None)

Bases: DelegationError

Base for loop prevention mechanism rejections.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationDepthError

DelegationDepthError(message, *, context=None)

Bases: DelegationLoopError

Delegation chain exceeds maximum depth.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationAncestryError

DelegationAncestryError(message, *, context=None)

Bases: DelegationLoopError

Delegation would create a cycle in the task ancestry.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationRateLimitError

DelegationRateLimitError(message, *, context=None)

Bases: DelegationLoopError

Delegation rate limit exceeded for agent pair.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationCircuitOpenError

DelegationCircuitOpenError(message, *, context=None)

Bases: DelegationLoopError

Circuit breaker is open for agent pair.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationDuplicateError

DelegationDuplicateError(message, *, context=None)

Bases: DelegationLoopError

Duplicate delegation detected within dedup window.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

HierarchyResolutionError

HierarchyResolutionError(message, *, context=None)

Bases: CommunicationError

Error resolving organizational hierarchy.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ConflictResolutionError

ConflictResolutionError(message, *, context=None)

Bases: CommunicationError

Base exception for conflict resolution errors.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ConflictStrategyError

ConflictStrategyError(message, *, context=None)

Bases: ConflictResolutionError

Error within a conflict resolution strategy.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ConflictHierarchyError

ConflictHierarchyError(message, *, context=None)

Bases: ConflictResolutionError

No common manager found for cross-department conflict.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

EscalationDecisionError

EscalationDecisionError(message, *, context=None)

Bases: ConflictResolutionError

A human-supplied escalation decision cannot be applied.

Concrete subclasses distinguish the failure mode: the decision shape is not accepted by the active processor variant (:class:EscalationDecisionShapeError) or the decision references an agent outside the conflict (:class:EscalationDecisionAgentError).

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

EscalationDecisionShapeError

EscalationDecisionShapeError(message, *, context=None)

Bases: EscalationDecisionError

The decision variant is not accepted by the active processor.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

EscalationDecisionAgentError

EscalationDecisionAgentError(message, *, context=None)

Bases: EscalationDecisionError

A winner decision references an agent outside the conflict.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

Delegation

models

Delegation request, result, and audit trail models.

DelegationRequest pydantic-model

Bases: BaseModel

Request to delegate a task down the hierarchy.

Attributes:

Name Type Description
delegator_id NotBlankStr

Agent ID of the delegator.

delegatee_id NotBlankStr

Agent ID of the target agent.

task Task

The task to delegate.

refinement str

Additional context from the delegator.

constraints tuple[NotBlankStr, ...]

Extra constraints for the delegatee.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_self_delegation

delegator_id pydantic-field

delegator_id

Agent ID of the delegator

delegatee_id pydantic-field

delegatee_id

Agent ID of the target agent

task pydantic-field

task

Task to delegate

refinement pydantic-field

refinement = ''

Additional context from the delegator

constraints pydantic-field

constraints = ()

Extra constraints for the delegatee

entity_versions pydantic-field

entity_versions = None

Delegator's known entity version manifest

DelegationResult pydantic-model

Bases: BaseModel

Outcome of a delegation attempt.

Attributes:

Name Type Description
success bool

Whether the delegation succeeded.

delegated_task Task | None

The sub-task created, if successful.

rejection_reason str | None

Reason for rejection, if unsuccessful.

blocked_by NotBlankStr | None

Mechanism name that blocked, if applicable.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_success_consistency

success pydantic-field

success

Whether delegation succeeded

delegated_task pydantic-field

delegated_task = None

Sub-task created on success

rejection_reason pydantic-field

rejection_reason = None

Reason for rejection

blocked_by pydantic-field

blocked_by = None

Mechanism name that blocked delegation

DelegationRecord pydantic-model

Bases: BaseModel

Audit trail entry for a completed delegation.

Attributes:

Name Type Description
delegation_id NotBlankStr

Unique delegation identifier.

delegator_id NotBlankStr

Agent ID of the delegator.

delegatee_id NotBlankStr

Agent ID of the delegatee.

original_task_id NotBlankStr

ID of the original task.

delegated_task_id NotBlankStr

ID of the created sub-task.

timestamp AwareDatetime

When the delegation occurred.

refinement str

Context provided by the delegator.

entity_versions Mapping[str, int] | None

Entity version manifest at delegation time.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

delegation_id pydantic-field

delegation_id

Unique delegation identifier

delegator_id pydantic-field

delegator_id

Delegator agent ID

delegatee_id pydantic-field

delegatee_id

Delegatee agent ID

original_task_id pydantic-field

original_task_id

Original task ID

delegated_task_id pydantic-field

delegated_task_id

Created sub-task ID

timestamp pydantic-field

timestamp

When delegation occurred

refinement pydantic-field

refinement = ''

Context provided by delegator

entity_versions pydantic-field

entity_versions = None

Entity version manifest at delegation time

service

Delegation service orchestrating hierarchy, authority, and loop prevention.

DelegationService

DelegationService(
    *, hierarchy, authority_validator, guard, record_store=None, entity_guard=None
)

Orchestrates hierarchical delegation with loop prevention.

Validates authority, checks loop prevention guards, creates sub-tasks, and records audit trail entries. The core logic is synchronous (CPU-only); messaging is a separate async concern.

Parameters:

Name Type Description Default
hierarchy HierarchyResolver

Resolved organizational hierarchy.

required
authority_validator AuthorityValidator

Authority validation logic.

required
guard DelegationGuard

Loop prevention guard.

required
record_store DelegationRecordStore | None

Optional delegation record store for activity tracking.

None
Source code in src/synthorg/communication/delegation/service.py
def __init__(
    self,
    *,
    hierarchy: HierarchyResolver,
    authority_validator: AuthorityValidator,
    guard: DelegationGuard,
    record_store: DelegationRecordStore | None = None,
    entity_guard: EntityAlignmentGuard | None = None,
) -> None:
    self._hierarchy = hierarchy
    self._authority_validator = authority_validator
    self._guard = guard
    self._record_store = record_store
    self._entity_guard = entity_guard
    self._audit_trail: list[DelegationRecord] = []

delegate async

delegate(request, delegator, delegatee)

Execute a delegation: authority, loops, sub-task, audit.

Parameters:

Name Type Description Default
request DelegationRequest

The delegation request.

required
delegator AgentIdentity

Identity of the delegating agent.

required
delegatee AgentIdentity

Identity of the target agent.

required

Returns:

Type Description
DelegationResult

Result indicating success or rejection with reason.

Raises:

Type Description
ValueError

If request IDs do not match identity objects.

DelegationError

If sub-task construction fails.

Source code in src/synthorg/communication/delegation/service.py
async def delegate(
    self,
    request: DelegationRequest,
    delegator: AgentIdentity,
    delegatee: AgentIdentity,
) -> DelegationResult:
    """Execute a delegation: authority, loops, sub-task, audit.

    Args:
        request: The delegation request.
        delegator: Identity of the delegating agent.
        delegatee: Identity of the target agent.

    Returns:
        Result indicating success or rejection with reason.

    Raises:
        ValueError: If request IDs do not match identity objects.
        DelegationError: If sub-task construction fails.
    """
    self._validate_identity(request, delegator, delegatee)

    logger.info(
        DELEGATION_REQUESTED,
        delegator=request.delegator_id,
        delegatee=request.delegatee_id,
        task_id=request.task.id,
    )

    # 1. Authority check
    auth_result = self._authority_validator.validate(delegator, delegatee)
    if not auth_result.allowed:
        return DelegationResult(
            success=False,
            rejection_reason=auth_result.reason,
            blocked_by="authority",
        )

    # 2. Loop prevention checks
    guard_outcome = self._guard.check(
        delegation_chain=request.task.delegation_chain,
        delegator_id=request.delegator_id,
        delegatee_id=request.delegatee_id,
        task_id=request.task.id,
    )
    if not guard_outcome.passed:
        self._escalate_loop_detection(request, guard_outcome.mechanism)
        return DelegationResult(
            success=False,
            rejection_reason=guard_outcome.message,
            blocked_by=guard_outcome.mechanism,
        )

    # 3. Entity alignment guard (async)
    entity_versions: Mapping[str, int] | None = None
    if self._entity_guard is not None:
        guard_result = await self._entity_guard.check(request)
        entity_versions = guard_result.entity_versions
        if not guard_result.passed:
            return DelegationResult(
                success=False,
                rejection_reason=guard_result.message,
                blocked_by=guard_result.mechanism,
            )

    # 4. Create sub-task and record
    sub_task = self._create_sub_task(request)
    self._record_delegation(
        request,
        sub_task,
        entity_versions=entity_versions,
    )

    return DelegationResult(success=True, delegated_task=sub_task)

reject_delegated_task staticmethod

reject_delegated_task(task)

Transition a CREATED task to REJECTED.

Used when a delegatee explicitly refuses a task that was already created for them. The task must be in CREATED status.

Parameters:

Name Type Description Default
task Task

The task to reject (must be in CREATED status).

required

Returns:

Type Description
Task

A new Task in REJECTED status.

Raises:

Type Description
ValueError

If the task is not in CREATED status.

Source code in src/synthorg/communication/delegation/service.py
@staticmethod
def reject_delegated_task(task: Task) -> Task:
    """Transition a CREATED task to REJECTED.

    Used when a delegatee explicitly refuses a task that was
    already created for them. The task must be in CREATED status.

    Args:
        task: The task to reject (must be in CREATED status).

    Returns:
        A new Task in REJECTED status.

    Raises:
        ValueError: If the task is not in CREATED status.
    """
    return task.with_transition(TaskStatus.REJECTED)

get_audit_trail

get_audit_trail()

Return all delegation audit records.

Returns:

Type Description
tuple[DelegationRecord, ...]

Tuple of delegation records in chronological order.

Source code in src/synthorg/communication/delegation/service.py
def get_audit_trail(self) -> tuple[DelegationRecord, ...]:
    """Return all delegation audit records.

    Returns:
        Tuple of delegation records in chronological order.
    """
    return tuple(self._audit_trail)

get_supervisor_of

get_supervisor_of(agent_name)

Expose hierarchy lookup for escalation callers.

Parameters:

Name Type Description Default
agent_name str

Agent name to look up.

required

Returns:

Type Description
str | None

Supervisor name or None if at the top.

Source code in src/synthorg/communication/delegation/service.py
def get_supervisor_of(self, agent_name: str) -> str | None:
    """Expose hierarchy lookup for escalation callers.

    Args:
        agent_name: Agent name to look up.

    Returns:
        Supervisor name or None if at the top.
    """
    return self._hierarchy.get_supervisor(agent_name)

authority

Authority validation for hierarchical delegation.

AuthorityCheckResult pydantic-model

Bases: BaseModel

Result of an authority validation check.

Attributes:

Name Type Description
allowed bool

Whether the delegation is authorized.

reason str

Explanation (empty on success).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_allowed_reason

allowed pydantic-field

allowed

Whether delegation is allowed

reason pydantic-field

reason = ''

Explanation

AuthorityValidator

AuthorityValidator(hierarchy, hierarchy_config)

Validates delegation authority using hierarchy and role permissions.

Checks
  1. Hierarchy: delegatee must be a subordinate of delegator (direct or skip-level depending on config).
  2. Roles: if delegator.authority.can_delegate_to is non-empty, delegatee.role must be in it; if empty, all roles are permitted.

Parameters:

Name Type Description Default
hierarchy HierarchyResolver

Resolved org hierarchy.

required
hierarchy_config HierarchyConfig

Hierarchy enforcement configuration.

required
Source code in src/synthorg/communication/delegation/authority.py
def __init__(
    self,
    hierarchy: HierarchyResolver,
    hierarchy_config: HierarchyConfig,
) -> None:
    self._hierarchy = hierarchy
    self._config = hierarchy_config

validate

validate(delegator, delegatee)

Validate whether delegator can delegate to delegatee.

Parameters:

Name Type Description Default
delegator AgentIdentity

Identity of the delegating agent.

required
delegatee AgentIdentity

Identity of the target agent.

required

Returns:

Type Description
AuthorityCheckResult

Result indicating whether delegation is authorized.

Source code in src/synthorg/communication/delegation/authority.py
def validate(
    self,
    delegator: AgentIdentity,
    delegatee: AgentIdentity,
) -> AuthorityCheckResult:
    """Validate whether delegator can delegate to delegatee.

    Args:
        delegator: Identity of the delegating agent.
        delegatee: Identity of the target agent.

    Returns:
        Result indicating whether delegation is authorized.
    """
    if self._config.enforce_chain_of_command:
        result = self._check_hierarchy(delegator, delegatee)
        if not result.allowed:
            return result

    result = self._check_role_permissions(delegator, delegatee)
    if not result.allowed:
        return result

    logger.info(
        DELEGATION_AUTHORIZED,
        delegator=delegator.name,
        delegatee=delegatee.name,
    )
    return AuthorityCheckResult(allowed=True)

Loop Prevention

guard

Delegation guard orchestrating all loop prevention mechanisms.

DelegationGuard

DelegationGuard(config, *, circuit_breaker_repo=None)

Orchestrates all loop prevention mechanisms.

Checks run in order: ancestry, depth, dedup, rate_limit, circuit_breaker. Short-circuits on the first failure.

Parameters:

Name Type Description Default
config LoopPreventionConfig

Loop prevention configuration.

required
circuit_breaker_repo CircuitBreakerStateRepository | None

Optional persistence repository for circuit breaker state (survives restarts).

None
Source code in src/synthorg/communication/loop_prevention/guard.py
def __init__(
    self,
    config: LoopPreventionConfig,
    *,
    circuit_breaker_repo: CircuitBreakerStateRepository | None = None,
) -> None:
    self._config = config
    self._deduplicator = DelegationDeduplicator(
        window_seconds=config.dedup_window_seconds,
    )
    self._rate_limiter = DelegationRateLimiter(config.rate_limit)
    self._circuit_breaker = DelegationCircuitBreaker(
        config.circuit_breaker,
        state_repo=circuit_breaker_repo,
    )

check

check(delegation_chain, delegator_id, delegatee_id, task_id)

Run all loop prevention checks.

Returns the first failing outcome, or a success outcome if all checks pass.

Parameters:

Name Type Description Default
delegation_chain tuple[str, ...]

Current delegation ancestry.

required
delegator_id str

ID of the delegating agent.

required
delegatee_id str

ID of the proposed delegatee.

required
task_id str

Unique ID of the task being delegated.

required

Returns:

Type Description
GuardCheckOutcome

First failing outcome or an all-passed success.

Source code in src/synthorg/communication/loop_prevention/guard.py
def check(
    self,
    delegation_chain: tuple[str, ...],
    delegator_id: str,
    delegatee_id: str,
    task_id: str,
) -> GuardCheckOutcome:
    """Run all loop prevention checks.

    Returns the first failing outcome, or a success outcome if
    all checks pass.

    Args:
        delegation_chain: Current delegation ancestry.
        delegator_id: ID of the delegating agent.
        delegatee_id: ID of the proposed delegatee.
        task_id: Unique ID of the task being delegated.

    Returns:
        First failing outcome or an all-passed success.
    """
    # Pure (stateless) checks first -- sequential to short-circuit
    outcome = check_ancestry(delegation_chain, delegatee_id)
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    outcome = check_delegation_depth(
        delegation_chain,
        self._config.max_delegation_depth,
    )
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    # Stateful checks -- only run if pure checks passed
    outcome = self._deduplicator.check(
        delegator_id,
        delegatee_id,
        task_id,
    )
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    outcome = self._rate_limiter.check(delegator_id, delegatee_id)
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    outcome = self._circuit_breaker.check(delegator_id, delegatee_id)
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)
    return GuardCheckOutcome(
        passed=True,
        mechanism=_SUCCESS_MECHANISM,
    )

record_delegation

record_delegation(delegator_id, delegatee_id, task_id)

Record a successful delegation in all stateful mechanisms.

Each delegation between a pair contributes to the circuit breaker bounce count. Back-and-forth patterns (A->B then B->A) both increment the same counter because the pair key is direction- agnostic, so repeated ping-pong will trip the breaker fastest.

Parameters:

Name Type Description Default
delegator_id str

ID of the delegating agent.

required
delegatee_id str

ID of the target agent.

required
task_id str

Unique ID of the delegated task.

required
Source code in src/synthorg/communication/loop_prevention/guard.py
def record_delegation(
    self,
    delegator_id: str,
    delegatee_id: str,
    task_id: str,
) -> None:
    """Record a successful delegation in all stateful mechanisms.

    Each delegation between a pair contributes to the circuit breaker
    bounce count.  Back-and-forth patterns (A->B then B->A) both
    increment the same counter because the pair key is direction-
    agnostic, so repeated ping-pong will trip the breaker fastest.

    Args:
        delegator_id: ID of the delegating agent.
        delegatee_id: ID of the target agent.
        task_id: Unique ID of the delegated task.
    """
    self._deduplicator.record(delegator_id, delegatee_id, task_id)
    self._rate_limiter.record(delegator_id, delegatee_id)
    self._circuit_breaker.record_delegation(delegator_id, delegatee_id)

load_state async

load_state()

Load persisted circuit breaker state from the repository.

Called once at startup. No-op if no repository is configured.

Source code in src/synthorg/communication/loop_prevention/guard.py
async def load_state(self) -> None:
    """Load persisted circuit breaker state from the repository.

    Called once at startup. No-op if no repository is configured.
    """
    await self._circuit_breaker.load_state()

persist async

persist()

Flush dirty circuit breaker state to the repository.

Best-effort. No-op if no repository is configured.

Source code in src/synthorg/communication/loop_prevention/guard.py
async def persist(self) -> None:
    """Flush dirty circuit breaker state to the repository.

    Best-effort. No-op if no repository is configured.
    """
    await self._circuit_breaker.persist_dirty()

models

Loop prevention check outcome model.

GuardCheckOutcome pydantic-model

Bases: BaseModel

Result of a single loop prevention check.

Attributes:

Name Type Description
passed bool

Whether the check passed (delegation allowed).

mechanism NotBlankStr

Name of the mechanism that produced this outcome.

message str

Human-readable detail (empty on success).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_passed_message

passed pydantic-field

passed

Whether the check passed

mechanism pydantic-field

mechanism

Mechanism name (e.g. 'max_depth', 'ancestry')

message pydantic-field

message = ''

Human-readable detail

Conflict Resolution

protocol

Conflict resolution protocol interfaces (see Communication design page).

Defines the pluggable strategy interface that varies per resolution approach (resolve + build_dissent_records). Detection logic lives on the service, not the protocol, because it is strategy-agnostic.

JudgeDecision

Bases: NamedTuple

Result of an LLM-based judge evaluation.

Attributes:

Name Type Description
winning_agent_id str

Agent whose position was chosen.

reasoning str

Explanation for the decision.

ConflictResolver

Bases: Protocol

Protocol for conflict resolution strategies.

Each strategy implements resolve (async, may need LLM calls) and build_dissent_records (sync, builds audit artifacts for every overruled position).

resolve async

resolve(conflict)

Resolve a conflict and produce a decision.

Parameters:

Name Type Description Default
conflict Conflict

The conflict to resolve.

required

Returns:

Type Description
ConflictResolution

Resolution decision.

Source code in src/synthorg/communication/conflict_resolution/protocol.py
async def resolve(self, conflict: Conflict) -> ConflictResolution:
    """Resolve a conflict and produce a decision.

    Args:
        conflict: The conflict to resolve.

    Returns:
        Resolution decision.
    """
    ...

build_dissent_records

build_dissent_records(conflict, resolution)

Build audit records for all losing positions.

For N-party conflicts, produces one DissentRecord per overruled agent. For escalated conflicts, produces one record per position (all are pending human review).

Parameters:

Name Type Description Default
conflict Conflict

The original conflict.

required
resolution ConflictResolution

The resolution decision.

required

Returns:

Type Description
tuple[DissentRecord, ...]

Dissent records preserving every overruled reasoning.

Source code in src/synthorg/communication/conflict_resolution/protocol.py
def build_dissent_records(
    self,
    conflict: Conflict,
    resolution: ConflictResolution,
) -> tuple[DissentRecord, ...]:
    """Build audit records for all losing positions.

    For N-party conflicts, produces one ``DissentRecord`` per
    overruled agent.  For escalated conflicts, produces one
    record per position (all are pending human review).

    Args:
        conflict: The original conflict.
        resolution: The resolution decision.

    Returns:
        Dissent records preserving every overruled reasoning.
    """
    ...

JudgeEvaluator

Bases: Protocol

Protocol for LLM-based judge evaluation.

Used by debate and hybrid strategies. When absent, strategies fall back to authority-based judging.

evaluate async

evaluate(conflict, judge_agent_id)

Evaluate conflict positions and pick a winner.

Parameters:

Name Type Description Default
conflict Conflict

The conflict with agent positions.

required
judge_agent_id NotBlankStr

The agent acting as judge.

required

Returns:

Type Description
JudgeDecision

Decision containing the winning agent ID and reasoning.

Source code in src/synthorg/communication/conflict_resolution/protocol.py
async def evaluate(
    self,
    conflict: Conflict,
    judge_agent_id: NotBlankStr,
) -> JudgeDecision:
    """Evaluate conflict positions and pick a winner.

    Args:
        conflict: The conflict with agent positions.
        judge_agent_id: The agent acting as judge.

    Returns:
        Decision containing the winning agent ID and reasoning.
    """
    ...

config

Conflict resolution configuration models (see Communication design page).

DebateConfig pydantic-model

Bases: BaseModel

Configuration for the structured debate strategy.

Attributes:

Name Type Description
judge NotBlankStr

Judge selection -- "shared_manager" (lowest common manager), "ceo" (hierarchy root), or a named agent.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

judge pydantic-field

judge = 'shared_manager'

Judge selection: "shared_manager", "ceo", or agent name

HybridConfig pydantic-model

Bases: BaseModel

Configuration for the hybrid resolution strategy.

Attributes:

Name Type Description
review_agent NotBlankStr

Agent tasked with reviewing positions.

escalate_on_ambiguity bool

Whether to escalate to human when the review result is ambiguous.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

review_agent pydantic-field

review_agent = 'conflict_reviewer'

Agent tasked with reviewing positions

escalate_on_ambiguity pydantic-field

escalate_on_ambiguity = True

Escalate to human when ambiguous

ConflictResolutionConfig pydantic-model

Bases: BaseModel

Top-level conflict resolution configuration.

Attributes:

Name Type Description
strategy ConflictResolutionStrategy

Default resolution strategy.

debate DebateConfig

Configuration for the debate strategy.

hybrid HybridConfig

Configuration for the hybrid strategy.

escalation EscalationQueueConfig

Configuration for the human escalation queue.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

strategy pydantic-field

strategy = AUTHORITY

Default resolution strategy

debate pydantic-field

debate

Debate strategy configuration

hybrid pydantic-field

hybrid

Hybrid strategy configuration

escalation pydantic-field

escalation

Human escalation queue configuration

models

Conflict resolution domain models (see Communication design page).

All models are frozen Pydantic v2 with NotBlankStr identifiers, following the patterns established in delegation/models.py.

ConflictResolutionOutcome

Bases: StrEnum

Outcome of a conflict resolution attempt.

Members

RESOLVED_BY_AUTHORITY: Decided by seniority/hierarchy. RESOLVED_BY_DEBATE: Decided by structured debate + judge. RESOLVED_BY_HYBRID: Decided by hybrid review process. RESOLVED_BY_HUMAN: Human operator picked a winning position via the escalation queue. REJECTED_BY_HUMAN: Human operator rejected all positions via the escalation queue. ESCALATED_TO_HUMAN: Escalation pending or timed out with no human decision collected.

ConflictPosition pydantic-model

Bases: BaseModel

One agent's stance in a conflict.

Attributes:

Name Type Description
agent_id NotBlankStr

Identifier of the agent taking the position.

agent_department NotBlankStr

Department the agent belongs to.

agent_level SeniorityLevel

Seniority level of the agent.

position NotBlankStr

Summary of the agent's stance.

reasoning NotBlankStr

Detailed justification for the position.

timestamp AwareDatetime

When the position was stated.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_id pydantic-field

agent_id

Agent taking the position

agent_department pydantic-field

agent_department

Agent's department

agent_level pydantic-field

agent_level

Agent seniority level

position pydantic-field

position

Summary of the stance

reasoning pydantic-field

reasoning

Justification for the position

timestamp pydantic-field

timestamp

When position was stated

Conflict pydantic-model

Bases: BaseModel

A dispute between two or more agents.

Attributes:

Name Type Description
id NotBlankStr

Unique conflict identifier (e.g. "conflict-a1b2c3d4e5f6").

type ConflictType

Category of the conflict.

task_id NotBlankStr | None

Related task, if any.

subject NotBlankStr

Brief description of the dispute.

positions tuple[ConflictPosition, ...]

Agent positions (minimum 2, unique agent IDs).

detected_at AwareDatetime

When the conflict was detected.

is_cross_department bool

Whether agents span multiple departments (computed from positions).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_positions

id pydantic-field

id

Unique conflict identifier

type pydantic-field

type

Conflict category

task_id pydantic-field

task_id = None

Related task ID

subject pydantic-field

subject

Brief dispute description

positions pydantic-field

positions

Agent positions (min 2)

detected_at pydantic-field

detected_at

Detection timestamp

is_cross_department property

is_cross_department

Whether the conflict spans multiple departments.

ConflictResolution pydantic-model

Bases: BaseModel

Decision produced by a conflict resolution strategy.

Attributes:

Name Type Description
conflict_id NotBlankStr

ID of the resolved conflict.

outcome ConflictResolutionOutcome

How the conflict was resolved.

winning_agent_id NotBlankStr | None

Agent whose position was chosen (None if escalated).

winning_position NotBlankStr | None

The winning position text (None if escalated).

decided_by NotBlankStr

Entity that made the decision (agent name or "human").

reasoning NotBlankStr

Explanation for the decision.

resolved_at AwareDatetime

When the resolution was produced.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_outcome_consistency

conflict_id pydantic-field

conflict_id

Resolved conflict ID

outcome pydantic-field

outcome

Resolution outcome

winning_agent_id pydantic-field

winning_agent_id = None

Winning agent (None if escalated)

winning_position pydantic-field

winning_position = None

Winning position text (None if escalated)

decided_by pydantic-field

decided_by

Decision maker

reasoning pydantic-field

reasoning

Decision explanation

resolved_at pydantic-field

resolved_at

Resolution timestamp

DissentRecord pydantic-model

Bases: BaseModel

Audit artifact for a resolved conflict.

Preserves the losing agent's reasoning for organizational learning.

Attributes:

Name Type Description
id NotBlankStr

Unique dissent record identifier.

conflict Conflict

The original conflict.

resolution ConflictResolution

The resolution decision.

dissenting_agent_id NotBlankStr

Agent whose position was overruled.

dissenting_position NotBlankStr

The overruled position text.

strategy_used ConflictResolutionStrategy

Strategy that was used.

timestamp AwareDatetime

When the record was created.

metadata tuple[tuple[NotBlankStr, NotBlankStr], ...]

Extra key-value metadata pairs.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_dissent_consistency

id pydantic-field

id

Unique dissent record ID

conflict pydantic-field

conflict

Original conflict

resolution pydantic-field

resolution

Resolution decision

dissenting_agent_id pydantic-field

dissenting_agent_id

Agent whose position was overruled

dissenting_position pydantic-field

dissenting_position

Overruled position text

strategy_used pydantic-field

strategy_used

Strategy that resolved the conflict

timestamp pydantic-field

timestamp

Record creation timestamp

metadata pydantic-field

metadata = ()

Extra key-value metadata pairs

DissentPayload pydantic-model

Bases: BaseModel

Typed payload for DISSENT bus messages.

Extracts the key fields from a DissentRecord into a structured, serializable payload suitable for the internal message bus and SSE event stream.

Attributes:

Name Type Description
dissent_id NotBlankStr

Unique dissent record identifier.

conflict_id NotBlankStr

Identifier of the originating conflict.

dissenting_agent_id NotBlankStr

Agent whose position was overruled.

conflict_type ConflictType

Type of the originating conflict.

strategy_used ConflictResolutionStrategy

Resolution strategy that was applied.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

dissent_id pydantic-field

dissent_id

Dissent record ID

conflict_id pydantic-field

conflict_id

Originating conflict ID

dissenting_agent_id pydantic-field

dissenting_agent_id

Agent whose position was overruled

conflict_type pydantic-field

conflict_type

Conflict type

strategy_used pydantic-field

strategy_used

Resolution strategy

from_record classmethod

from_record(record)

Build a payload from a dissent record.

Parameters:

Name Type Description Default
record DissentRecord

The dissent record.

required

Returns:

Type Description
DissentPayload

A typed dissent payload.

Source code in src/synthorg/communication/conflict_resolution/models.py
@classmethod
def from_record(cls, record: DissentRecord) -> DissentPayload:
    """Build a payload from a dissent record.

    Args:
        record: The dissent record.

    Returns:
        A typed dissent payload.
    """
    return cls(
        dissent_id=record.id,
        conflict_id=record.conflict.id,
        dissenting_agent_id=record.dissenting_agent_id,
        conflict_type=record.conflict.type,
        strategy_used=record.strategy_used,
    )

service

Conflict resolution service orchestrator (see Communication design page).

Follows the DelegationService pattern: __slots__, keyword-only constructor, audit trail list, structured logging.

ConflictResolutionService

ConflictResolutionService(*, config, resolvers, event_hub=None, message_bus=None)

Orchestrates conflict detection, resolution, and audit.

Selects the configured strategy, delegates to the resolver, builds dissent records for all overruled positions, and maintains an audit trail.

Parameters:

Name Type Description Default
config ConflictResolutionConfig

Conflict resolution configuration.

required
resolvers Mapping[ConflictResolutionStrategy, ConflictResolver]

Strategy → resolver mapping.

required
Source code in src/synthorg/communication/conflict_resolution/service.py
def __init__(
    self,
    *,
    config: ConflictResolutionConfig,
    resolvers: Mapping[ConflictResolutionStrategy, ConflictResolver],
    event_hub: EventStreamHub | None = None,
    message_bus: object | None = None,
) -> None:
    self._config = config
    self._message_bus = message_bus
    self._resolvers: MappingProxyType[
        ConflictResolutionStrategy, ConflictResolver
    ] = MappingProxyType(dict(resolvers))
    self._audit_trail: list[DissentRecord] = []
    self._event_hub = event_hub

create_conflict

create_conflict(*, conflict_type, subject, positions, task_id=None)

Create a conflict from agent positions.

Validates minimum positions and unique agent IDs, and generates an ID.

Parameters:

Name Type Description Default
conflict_type ConflictType

Category of the conflict.

required
subject NotBlankStr

Brief description of the dispute.

required
positions Sequence[ConflictPosition]

Agent positions (minimum 2).

required
task_id NotBlankStr | None

Related task ID, if any.

None

Returns:

Type Description
Conflict

New Conflict instance.

Raises:

Type Description
ConflictResolutionError

If fewer than 2 positions or duplicate agent IDs.

Source code in src/synthorg/communication/conflict_resolution/service.py
def create_conflict(
    self,
    *,
    conflict_type: ConflictType,
    subject: NotBlankStr,
    positions: Sequence[ConflictPosition],
    task_id: NotBlankStr | None = None,
) -> Conflict:
    """Create a conflict from agent positions.

    Validates minimum positions and unique agent IDs,
    and generates an ID.

    Args:
        conflict_type: Category of the conflict.
        subject: Brief description of the dispute.
        positions: Agent positions (minimum 2).
        task_id: Related task ID, if any.

    Returns:
        New Conflict instance.

    Raises:
        ConflictResolutionError: If fewer than 2 positions or
            duplicate agent IDs.
    """
    if len(positions) < _MIN_POSITIONS:
        msg = "A conflict requires at least 2 positions"
        logger.warning(
            CONFLICT_VALIDATION_ERROR,
            error=msg,
            position_count=len(positions),
        )
        raise ConflictResolutionError(msg)

    agent_ids = [p.agent_id for p in positions]
    if len(agent_ids) != len(set(agent_ids)):
        msg = "Duplicate agent_id in conflict positions"
        logger.warning(
            CONFLICT_VALIDATION_ERROR,
            error=msg,
            agent_ids=agent_ids,
        )
        raise ConflictResolutionError(msg)

    conflict = Conflict(
        id=f"conflict-{uuid4().hex[:12]}",
        type=conflict_type,
        task_id=task_id,
        subject=subject,
        positions=tuple(positions),
        detected_at=datetime.now(UTC),
    )

    logger.info(
        CONFLICT_DETECTED,
        conflict_id=conflict.id,
        conflict_type=conflict.type,
        subject=conflict.subject,
        is_cross_department=conflict.is_cross_department,
        agent_count=len(positions),
    )

    return conflict

resolve async

resolve(conflict)

Resolve a conflict using the configured strategy.

Parameters:

Name Type Description Default
conflict Conflict

The conflict to resolve.

required

Returns:

Type Description
tuple[ConflictResolution, tuple[DissentRecord, ...]]

Tuple of (resolution, dissent_records).

Raises:

Type Description
ConflictResolutionError

If the configured strategy has no registered resolver.

Source code in src/synthorg/communication/conflict_resolution/service.py
async def resolve(
    self,
    conflict: Conflict,
) -> tuple[ConflictResolution, tuple[DissentRecord, ...]]:
    """Resolve a conflict using the configured strategy.

    Args:
        conflict: The conflict to resolve.

    Returns:
        Tuple of ``(resolution, dissent_records)``.

    Raises:
        ConflictResolutionError: If the configured strategy has
            no registered resolver.
    """
    strategy = self._config.strategy
    resolver = self._resolvers.get(strategy)
    if resolver is None:
        msg = f"No resolver registered for strategy {strategy!r}"
        logger.warning(
            CONFLICT_NO_RESOLVER,
            strategy=strategy,
            error=msg,
        )
        raise ConflictResolutionError(
            msg,
            context={"strategy": strategy},
        )

    logger.info(
        CONFLICT_RESOLUTION_STARTED,
        conflict_id=conflict.id,
        strategy=strategy,
    )

    try:
        resolution = await resolver.resolve(conflict)
    except Exception:
        logger.exception(
            CONFLICT_RESOLUTION_FAILED,
            conflict_id=conflict.id,
            strategy=strategy,
        )
        raise

    dissent_records = resolver.build_dissent_records(conflict, resolution)
    self._audit_trail.extend(dissent_records)

    logger.info(
        CONFLICT_RESOLVED,
        conflict_id=conflict.id,
        outcome=resolution.outcome,
        winning_agent_id=resolution.winning_agent_id,
    )
    for record in dissent_records:
        logger.info(
            CONFLICT_DISSENT_RECORDED,
            dissent_id=record.id,
            conflict_id=conflict.id,
            dissenting_agent=record.dissenting_agent_id,
        )

    await self._publish_dissent_events(dissent_records, conflict.id)

    return resolution, dissent_records

get_dissent_records

get_dissent_records()

Return all dissent records.

Returns:

Type Description
tuple[DissentRecord, ...]

Tuple of dissent records in chronological order.

Source code in src/synthorg/communication/conflict_resolution/service.py
def get_dissent_records(self) -> tuple[DissentRecord, ...]:
    """Return all dissent records.

    Returns:
        Tuple of dissent records in chronological order.
    """
    return tuple(self._audit_trail)

query_dissent_records

query_dissent_records(*, agent_id=None, conflict_type=None, strategy=None, since=None)

Query dissent records with optional filters.

All filters are combined with AND logic.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by dissenting agent ID.

None
conflict_type ConflictType | None

Filter by conflict type.

None
strategy ConflictResolutionStrategy | None

Filter by strategy used.

None
since AwareDatetime | None

Filter by records after this timestamp (must be timezone-aware).

None

Returns:

Type Description
tuple[DissentRecord, ...]

Matching dissent records.

Source code in src/synthorg/communication/conflict_resolution/service.py
def query_dissent_records(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    conflict_type: ConflictType | None = None,
    strategy: ConflictResolutionStrategy | None = None,
    since: AwareDatetime | None = None,
) -> tuple[DissentRecord, ...]:
    """Query dissent records with optional filters.

    All filters are combined with AND logic.

    Args:
        agent_id: Filter by dissenting agent ID.
        conflict_type: Filter by conflict type.
        strategy: Filter by strategy used.
        since: Filter by records after this timestamp
            (must be timezone-aware).

    Returns:
        Matching dissent records.
    """
    logger.debug(
        CONFLICT_DISSENT_QUERIED,
        agent_id=agent_id,
        conflict_type=conflict_type,
        strategy=strategy,
        since=str(since) if since else None,
    )

    return tuple(
        r
        for r in self._audit_trail
        if (agent_id is None or r.dissenting_agent_id == agent_id)
        and (conflict_type is None or r.conflict.type == conflict_type)
        and (strategy is None or r.strategy_used == strategy)
        and (since is None or r.timestamp >= since)
    )

Meeting Protocol

protocol

Meeting protocol interface (see Communication design page).

Defines the MeetingProtocol protocol, the ConflictDetector protocol, and the AgentCaller type alias used to invoke agents during a meeting without coupling to the engine layer.

AgentCaller module-attribute

AgentCaller = Callable[[str, str, int, str], Awaitable[AgentResponse]]

Callback to invoke an agent during a meeting.

Signature: (agent_id, prompt, max_tokens, meeting_id) -> AgentResponse

The orchestrator constructs this from the engine layer, decoupling protocol implementations from the execution engine. meeting_id is threaded through so cost-recording attribution carries the real meeting identifier per turn instead of a synthetic placeholder.

TaskCreator module-attribute

TaskCreator = Callable[[str, str | None, Priority], None]

Callback to create a task from a meeting action item.

Signature: (description, assignee_id, priority: Priority) -> None

Used by the orchestrator to optionally create tasks from extracted action items.

ConflictDetector

Bases: Protocol

Strategy for detecting conflicts in agent responses.

Used by StructuredPhasesProtocol to determine whether a discussion round is needed. The default implementation uses keyword matching; alternative implementations might use structured JSON output or tool calling for more robust detection.

detect

detect(response_content)

Determine whether the response indicates conflicts.

Parameters:

Name Type Description Default
response_content str

The conflict-check agent response text.

required

Returns:

Type Description
bool

True if conflicts were detected, False otherwise.

Source code in src/synthorg/communication/meeting/protocol.py
def detect(self, response_content: str) -> bool:
    """Determine whether the response indicates conflicts.

    Args:
        response_content: The conflict-check agent response text.

    Returns:
        True if conflicts were detected, False otherwise.
    """
    ...

MeetingProtocol

Bases: Protocol

Strategy interface for meeting protocol implementations.

Each implementation defines a different structure for how agents interact during a meeting (round-robin turns, parallel position papers, structured phases with discussion).

run async

run(
    *,
    meeting_id,
    agenda,
    leader_id,
    participant_ids,
    agent_caller,
    token_budget,
    lens_assignments=None,
)

Execute the meeting protocol and produce minutes.

Parameters:

Name Type Description Default
meeting_id str

Unique identifier for this meeting.

required
agenda MeetingAgenda

The meeting agenda.

required
leader_id str

ID of the agent leading the meeting.

required
participant_ids tuple[str, ...]

IDs of participating agents.

required
agent_caller AgentCaller

Callback to invoke agents.

required
token_budget int

Maximum tokens for the entire meeting.

required
lens_assignments Mapping[str, str] | None

Optional mapping of participant ID to strategic lens name. When provided, protocols inject the lens perspective into each participant's prompt.

None

Returns:

Type Description
MeetingMinutes

Complete meeting minutes.

Source code in src/synthorg/communication/meeting/protocol.py
async def run(  # noqa: PLR0913
    self,
    *,
    meeting_id: str,
    agenda: MeetingAgenda,
    leader_id: str,
    participant_ids: tuple[str, ...],
    agent_caller: AgentCaller,
    token_budget: int,
    lens_assignments: Mapping[str, str] | None = None,
) -> MeetingMinutes:
    """Execute the meeting protocol and produce minutes.

    Args:
        meeting_id: Unique identifier for this meeting.
        agenda: The meeting agenda.
        leader_id: ID of the agent leading the meeting.
        participant_ids: IDs of participating agents.
        agent_caller: Callback to invoke agents.
        token_budget: Maximum tokens for the entire meeting.
        lens_assignments: Optional mapping of participant ID to
            strategic lens name.  When provided, protocols inject
            the lens perspective into each participant's prompt.

    Returns:
        Complete meeting minutes.
    """
    ...

get_protocol_type

get_protocol_type()

Return the protocol type this implementation handles.

Returns:

Type Description
MeetingProtocolType

The meeting protocol type enum value.

Source code in src/synthorg/communication/meeting/protocol.py
def get_protocol_type(self) -> MeetingProtocolType:
    """Return the protocol type this implementation handles.

    Returns:
        The meeting protocol type enum value.
    """
    ...

config

Meeting protocol configuration models (see Communication design page).

RoundRobinConfig pydantic-model

Bases: BaseModel

Configuration for the round-robin meeting protocol.

Attributes:

Name Type Description
max_turns_per_agent int

Maximum turns each agent may take.

max_total_turns int

Hard cap on total turns across all agents.

leader_summarizes bool

Whether the leader produces a final summary.

summary_reserve_fraction float

Fraction of the token budget reserved for the summary phase (0.0--1.0).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_turns_per_agent pydantic-field

max_turns_per_agent = 2

Maximum turns each agent may take

max_total_turns pydantic-field

max_total_turns = 16

Hard cap on total turns across all agents

leader_summarizes pydantic-field

leader_summarizes = True

Whether the leader produces a final summary

summary_reserve_fraction pydantic-field

summary_reserve_fraction = 0.2

Fraction of token budget reserved for summary phase

PositionPapersConfig pydantic-model

Bases: BaseModel

Configuration for the position-papers meeting protocol.

Attributes:

Name Type Description
max_tokens_per_position int

Token budget per position paper.

synthesizer NotBlankStr

Who performs synthesis. The sentinel "meeting_leader" resolves to the meeting leader at runtime; otherwise interpreted as a specific agent ID.

synthesis_reserve_fraction float

Fraction of the token budget reserved for the synthesis phase (0.0--1.0).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_tokens_per_position pydantic-field

max_tokens_per_position = 300

Token budget per position paper

synthesizer pydantic-field

synthesizer = 'meeting_leader'

Who performs synthesis (meeting_leader or agent ID)

synthesis_reserve_fraction pydantic-field

synthesis_reserve_fraction = 0.2

Fraction of token budget reserved for synthesis phase

StructuredPhasesConfig pydantic-model

Bases: BaseModel

Configuration for the structured-phases meeting protocol.

Attributes:

Name Type Description
skip_discussion_if_no_conflicts bool

Skip discussion when no conflicts are detected.

max_discussion_tokens int

Token budget for the discussion round.

synthesis_reserve_fraction float

Fraction of the remaining token budget reserved for the synthesis phase (0.0--1.0).

conflict_detector ConflictDetectorType

Which conflict-detection strategy the structured-phases protocol uses to decide whether discussion is needed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

skip_discussion_if_no_conflicts pydantic-field

skip_discussion_if_no_conflicts = True

Skip discussion when no conflicts detected

max_discussion_tokens pydantic-field

max_discussion_tokens = 1000

Token budget for discussion round

synthesis_reserve_fraction pydantic-field

synthesis_reserve_fraction = 0.2

Fraction of remaining token budget reserved for synthesis

conflict_detector pydantic-field

conflict_detector = KEYWORD

Conflict-detection strategy discriminator

MeetingProtocolConfig pydantic-model

Bases: BaseModel

Top-level meeting protocol configuration.

Selects which protocol strategy to use and carries the per-protocol settings. The three sub-config fields below are all materialised eagerly with sensible defaults; the factory consumes ONLY the sub-config matching :attr:protocol. Setting fields on a sub-config that does not match the active protocol (for example position_papers.synthesizer = "alice" while protocol is ROUND_ROBIN) is silently ignored. Pydantic discriminated unions would express this invariant in the type system but were deferred so the YAML config can serialise every sub-config slot independently without a discriminator wrapper.

Attributes:

Name Type Description
protocol MeetingProtocolType

Which protocol strategy to use.

auto_create_tasks bool

Whether to auto-create tasks from action items extracted during any protocol execution.

max_tasks_per_meeting int | None

Optional cap on how many tasks to create from a single meeting's action items.

round_robin RoundRobinConfig

Round-robin protocol settings (used only when protocol == ROUND_ROBIN).

position_papers PositionPapersConfig

Position-papers protocol settings (used only when protocol == POSITION_PAPERS).

structured_phases StructuredPhasesConfig

Structured-phases protocol settings (used only when protocol == STRUCTURED_PHASES).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

protocol pydantic-field

protocol = ROUND_ROBIN

Which protocol strategy to use

auto_create_tasks pydantic-field

auto_create_tasks = True

Auto-create tasks from action items

max_tasks_per_meeting pydantic-field

max_tasks_per_meeting = None

Maximum tasks to create from a single meeting's action items

round_robin pydantic-field

round_robin

Round-robin protocol settings

position_papers pydantic-field

position_papers

Position-papers protocol settings

structured_phases pydantic-field

structured_phases

Structured-phases protocol settings

models

Meeting protocol domain models (see Communication design page).

AgentResponse pydantic-model

Bases: BaseModel

Result of a single agent invocation during a meeting.

Attributes:

Name Type Description
agent_id NotBlankStr

Identifier of the agent that responded.

content str

Text content of the response.

input_tokens int

Tokens consumed by the prompt.

output_tokens int

Tokens generated in the response.

cost float

Estimated cost of the invocation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_id pydantic-field

agent_id

Agent that responded

content pydantic-field

content

Response content

input_tokens pydantic-field

input_tokens = 0

Prompt tokens consumed

output_tokens pydantic-field

output_tokens = 0

Response tokens generated

cost pydantic-field

cost = 0.0

Estimated invocation cost

MeetingAgendaItem pydantic-model

Bases: BaseModel

A single topic on the meeting agenda.

Attributes:

Name Type Description
title NotBlankStr

Short title of the agenda topic.

description str

Detailed description of the topic.

presenter_id NotBlankStr | None

Agent who presents this item (optional).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

title pydantic-field

title

Agenda topic title

description pydantic-field

description = ''

Detailed topic description

presenter_id pydantic-field

presenter_id = None

Agent who presents this item

MeetingAgenda pydantic-model

Bases: BaseModel

Full meeting agenda.

Attributes:

Name Type Description
title NotBlankStr

Meeting title.

context str

Background context for the meeting.

items tuple[MeetingAgendaItem, ...]

Ordered agenda items.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

title pydantic-field

title

Meeting title

context pydantic-field

context = ''

Background context for the meeting

items pydantic-field

items = ()

Ordered agenda items

MeetingContribution pydantic-model

Bases: BaseModel

An agent's contribution during a meeting phase.

Attributes:

Name Type Description
agent_id NotBlankStr

Identifier of the contributing agent.

content str

Text content of the contribution.

phase MeetingPhase

Meeting phase during which this was contributed.

turn_number int

Turn number within the meeting.

input_tokens int

Prompt tokens consumed.

output_tokens int

Response tokens generated.

timestamp AwareDatetime

When the contribution was recorded.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_id pydantic-field

agent_id

Contributing agent

content pydantic-field

content

Contribution content

phase pydantic-field

phase

Phase of contribution

turn_number pydantic-field

turn_number

Turn number

input_tokens pydantic-field

input_tokens = 0

Prompt tokens consumed

output_tokens pydantic-field

output_tokens = 0

Response tokens generated

timestamp pydantic-field

timestamp

When recorded

ActionItem pydantic-model

Bases: BaseModel

An action item extracted from meeting decisions.

Attributes:

Name Type Description
description NotBlankStr

What needs to be done.

assignee_id NotBlankStr | None

Agent responsible for the action.

priority Priority

Urgency of the action item.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

description pydantic-field

description

What needs to be done

assignee_id pydantic-field

assignee_id = None

Agent responsible for the action

priority pydantic-field

priority = MEDIUM

Action item priority

MeetingMinutes pydantic-model

Bases: BaseModel

Complete output of a meeting protocol execution.

Attributes:

Name Type Description
meeting_id NotBlankStr

Unique meeting identifier.

protocol_type MeetingProtocolType

Protocol that produced these minutes.

leader_id NotBlankStr

Agent who led the meeting.

participant_ids tuple[NotBlankStr, ...]

Agents who participated.

agenda MeetingAgenda

The meeting agenda.

contributions tuple[MeetingContribution, ...]

All agent contributions in order.

summary str

Final meeting summary text.

decisions tuple[NotBlankStr, ...]

Decisions made during the meeting.

action_items tuple[ActionItem, ...]

Extracted action items.

conflicts_detected bool

Whether conflicts were detected.

total_input_tokens int

Total prompt tokens consumed.

total_output_tokens int

Total response tokens generated.

started_at AwareDatetime

When the meeting started.

ended_at AwareDatetime

When the meeting ended.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_timing
  • _validate_participants
  • _validate_token_aggregates

meeting_id pydantic-field

meeting_id

Unique meeting ID

protocol_type pydantic-field

protocol_type

Protocol used

leader_id pydantic-field

leader_id

Meeting leader

participant_ids pydantic-field

participant_ids

Meeting participants

agenda pydantic-field

agenda

Meeting agenda

contributions pydantic-field

contributions = ()

All contributions in order

summary pydantic-field

summary = ''

Final summary

decisions pydantic-field

decisions = ()

Decisions made

action_items pydantic-field

action_items = ()

Extracted action items

conflicts_detected pydantic-field

conflicts_detected = False

Whether conflicts were detected

total_input_tokens pydantic-field

total_input_tokens = 0

Total prompt tokens

total_output_tokens pydantic-field

total_output_tokens = 0

Total response tokens

started_at pydantic-field

started_at

Meeting start time

ended_at pydantic-field

ended_at

Meeting end time

total_tokens property

total_tokens

Total tokens consumed (input + output).

MeetingRecord pydantic-model

Bases: BaseModel

Audit trail entry for a meeting execution.

Attributes:

Name Type Description
meeting_id NotBlankStr

Unique meeting identifier.

meeting_type_name NotBlankStr

Name of the meeting type from config.

protocol_type MeetingProtocolType

Protocol strategy used.

status MeetingStatus

Final status of the meeting.

minutes MeetingMinutes | None

Complete minutes if meeting succeeded.

error_message NotBlankStr | None

Error description if meeting failed.

token_budget int

Token budget that was allocated.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_status_consistency

meeting_id pydantic-field

meeting_id

Unique meeting ID

meeting_type_name pydantic-field

meeting_type_name

Meeting type from config

protocol_type pydantic-field

protocol_type

Protocol strategy used

status pydantic-field

status

Final meeting status

minutes pydantic-field

minutes = None

Complete minutes on success

error_message pydantic-field

error_message = None

Error description on failure

token_budget pydantic-field

token_budget

Token budget allocated

orchestrator

Meeting orchestrator -- lifecycle manager (see Communication design page).

Manages the full meeting lifecycle: validates inputs, selects the configured protocol, executes the meeting, optionally creates tasks from action items, and records audit trail entries.

MeetingOrchestrator

MeetingOrchestrator(
    *,
    protocol_registry,
    agent_caller,
    task_creator=None,
    strategy_config=None,
    lens_assigner=None,
    config_resolver=None,
)

Lifecycle manager for meeting execution.

Coordinates protocol selection, execution, task creation from action items, and audit trail recording. Meeting records are stored in memory; see the persistence layer for durable storage when available.

Parameters:

Name Type Description Default
protocol_registry Mapping[MeetingProtocolType, MeetingProtocol]

Mapping of protocol types to implementations.

required
agent_caller AgentCaller

Callback to invoke agents during meetings.

required
task_creator TaskCreator | None

Optional callback to create tasks from action items.

None
Source code in src/synthorg/communication/meeting/orchestrator.py
def __init__(  # noqa: PLR0913
    self,
    *,
    protocol_registry: Mapping[MeetingProtocolType, MeetingProtocol],
    agent_caller: AgentCaller,
    task_creator: TaskCreator | None = None,
    strategy_config: Any = None,  # typed as Any to avoid circular import
    lens_assigner: Any = None,  # typed as Any to avoid circular import
    config_resolver: ConfigResolver | None = None,
) -> None:
    self._protocol_registry: MappingProxyType[
        MeetingProtocolType, MeetingProtocol
    ] = MappingProxyType(dict(protocol_registry))
    self._agent_caller = agent_caller
    self._task_creator = task_creator
    self._strategy_config = strategy_config
    self._lens_assigner = lens_assigner
    self._config_resolver = config_resolver
    self._records: list[MeetingRecord] = []
    # Mirror records by id for O(1) lookup via ``get_record``.
    # The list keeps chronological order (used by ``get_records``);
    # the dict serves point lookups so controller endpoints don't
    # need to scan every record on every fetch.
    self._records_by_id: dict[str, MeetingRecord] = {}

run_meeting async

run_meeting(
    *,
    meeting_type_name,
    protocol_config,
    agenda,
    leader_id,
    participant_ids,
    token_budget,
)

Execute a meeting and return the audit record.

Validation errors (MeetingParticipantError, MeetingProtocolNotFoundError) are raised directly. Domain and runtime errors during protocol execution are caught and returned as a MeetingRecord with FAILED or BUDGET_EXHAUSTED status. BaseException subclasses (e.g. KeyboardInterrupt) are NOT caught.

Parameters:

Name Type Description Default
meeting_type_name str

Name of the meeting type from config.

required
protocol_config MeetingProtocolConfig

Protocol configuration to use.

required
agenda MeetingAgenda

The meeting agenda.

required
leader_id str

ID of the agent leading the meeting.

required
participant_ids tuple[str, ...]

IDs of participating agents.

required
token_budget int

Maximum tokens for the meeting (must be > 0).

required

Returns:

Type Description
MeetingRecord

Meeting record with status and optional minutes.

Raises:

Type Description
MeetingProtocolNotFoundError

If the configured protocol is not in the registry.

MeetingParticipantError

If participant list is empty, contains duplicates, or leader is in participants.

ValueError

If token_budget is not positive.

Source code in src/synthorg/communication/meeting/orchestrator.py
async def run_meeting(  # noqa: PLR0913
    self,
    *,
    meeting_type_name: str,
    protocol_config: MeetingProtocolConfig,
    agenda: MeetingAgenda,
    leader_id: str,
    participant_ids: tuple[str, ...],
    token_budget: int,
) -> MeetingRecord:
    """Execute a meeting and return the audit record.

    Validation errors (``MeetingParticipantError``,
    ``MeetingProtocolNotFoundError``) are raised directly.
    Domain and runtime errors during protocol execution are caught
    and returned as a ``MeetingRecord`` with ``FAILED`` or
    ``BUDGET_EXHAUSTED`` status.  ``BaseException`` subclasses
    (e.g. ``KeyboardInterrupt``) are NOT caught.

    Args:
        meeting_type_name: Name of the meeting type from config.
        protocol_config: Protocol configuration to use.
        agenda: The meeting agenda.
        leader_id: ID of the agent leading the meeting.
        participant_ids: IDs of participating agents.
        token_budget: Maximum tokens for the meeting (must be > 0).

    Returns:
        Meeting record with status and optional minutes.

    Raises:
        MeetingProtocolNotFoundError: If the configured protocol
            is not in the registry.
        MeetingParticipantError: If participant list is empty,
            contains duplicates, or leader is in participants.
        ValueError: If token_budget is not positive.
    """
    meeting_id = f"mtg-{uuid4().hex[:12]}"
    protocol_type = protocol_config.protocol

    # Validate inputs before the kill-switch check so an invalid
    # request (e.g. token_budget=0, empty participants) always
    # raises the same ``ValueError`` /
    # ``MeetingParticipantError`` regardless of whether the
    # operator has paused meetings -- callers should not see
    # divergent error semantics across kill-switch state, and
    # constructing the cancellation ``MeetingRecord`` below would
    # otherwise hit pydantic validation on ``token_budget`` (gt=0).
    self._validate_inputs(
        meeting_id,
        leader_id,
        participant_ids,
        token_budget,
    )

    # ``communication.meetings_enabled`` kill switch: when False
    # the orchestrator records a CANCELLED meeting (no protocol
    # invocation, no agent calls) so an operator can suspend the
    # meetings subsystem without tearing down the scheduler.
    meetings_enabled = await resolve_bool_with_fallback(
        resolver=self._config_resolver,
        namespace="communication",
        key="meetings_enabled",
        fallback=True,
    )
    if not meetings_enabled:
        cancelled_record = MeetingRecord(
            meeting_id=meeting_id,
            meeting_type_name=meeting_type_name,
            protocol_type=protocol_type,
            status=MeetingStatus.CANCELLED,
            token_budget=token_budget,
        )
        # Persist the cancellation in ``self._records`` so
        # ``get_records()`` reports it alongside successful and
        # protocol-failed runs; an audit trail that silently drops
        # kill-switch cancellations would mislead operators
        # reconstructing what happened during a paused window.
        self._append_record(cancelled_record)
        # ``MEETING_CANCELLED`` (not ``MEETING_FAILED``): operator
        # cancellations should not skew failure metrics or trip
        # alerts wired to ``meeting.lifecycle.failed``. Logged
        # AFTER the record append per the post-persist contract
        # (CLAUDE.md state-transition rule).
        logger.info(
            MEETING_CANCELLED,
            meeting_id=meeting_id,
            meeting_type=meeting_type_name,
            reason="meetings_disabled_by_setting",
        )
        return cancelled_record

    protocol = self._resolve_protocol(meeting_id, protocol_type)

    logger.info(
        MEETING_STARTED,
        meeting_id=meeting_id,
        meeting_type=meeting_type_name,
        protocol=protocol_type,
        leader_id=leader_id,
        participant_count=len(participant_ids),
        token_budget=token_budget,
    )

    # Lens assignment (optional, when strategy config is present)
    lens_assignments = self._compute_lens_assignments(participant_ids)

    result = await self._execute_protocol(
        protocol,
        meeting_id,
        meeting_type_name,
        agenda,
        leader_id,
        participant_ids,
        token_budget,
        lens_assignments=lens_assignments,
    )

    if isinstance(result, MeetingRecord):
        return result

    self._create_tasks(meeting_id, protocol_config, result)
    return self._record_success(
        meeting_id,
        meeting_type_name,
        protocol_type,
        result,
        token_budget,
    )

get_records

get_records()

Return all meeting audit records.

Returns:

Type Description
tuple[MeetingRecord, ...]

Tuple of meeting records in chronological order.

Source code in src/synthorg/communication/meeting/orchestrator.py
def get_records(self) -> tuple[MeetingRecord, ...]:
    """Return all meeting audit records.

    Returns:
        Tuple of meeting records in chronological order.
    """
    return tuple(self._records)

get_record

get_record(meeting_id)

Return the meeting record matching meeting_id or None.

O(1) lookup via the by-id mirror; controller endpoints fetch a single meeting without scanning the full record list.

Source code in src/synthorg/communication/meeting/orchestrator.py
def get_record(self, meeting_id: str) -> MeetingRecord | None:
    """Return the meeting record matching ``meeting_id`` or ``None``.

    O(1) lookup via the by-id mirror; controller endpoints fetch a
    single meeting without scanning the full record list.
    """
    return self._records_by_id.get(meeting_id)

delete_record

delete_record(meeting_id)

Remove the meeting record matching meeting_id.

Returns True when a record was removed, False when no record had the supplied id. Synchronous because the in-memory store has no I/O; the surrounding service-layer wrapper is async to match the rest of the persistence contract.

Source code in src/synthorg/communication/meeting/orchestrator.py
def delete_record(self, meeting_id: str) -> bool:
    """Remove the meeting record matching ``meeting_id``.

    Returns ``True`` when a record was removed, ``False`` when no
    record had the supplied id. Synchronous because the in-memory
    store has no I/O; the surrounding service-layer wrapper is
    async to match the rest of the persistence contract.
    """
    record = self._records_by_id.pop(meeting_id, None)
    if record is None:
        return False
    # Locate the same record in the chronological list and remove it.
    # ``list.remove`` is O(n) but acceptable here because the list
    # mirrors the dict; both are bounded by the in-memory record set.
    try:
        self._records.remove(record)
    except ValueError:
        # The list and dict drifted -- restore the dict entry so the
        # caller's "found" answer doesn't regress to a silent loss
        # AND emit a structured ERROR log so operators see the
        # invariant violation instead of debugging a phantom
        # "delete returned False" later.
        self._records_by_id[meeting_id] = record
        # TRY400: this is an invariant-violation log, not a stack
        # trace use case; the relevant context is the structured
        # fields below, not the ValueError trace.
        logger.error(
            MEETING_RECORD_MIRROR_DRIFT,
            reason="record_mirror_drift",
            meeting_id=meeting_id,
            list_len=len(self._records),
            dict_len=len(self._records_by_id),
        )
        return False
    return True