Skip to content

Communication

Inter-agent messaging -- bus, dispatcher, delegation, loop prevention, conflict resolution, and meeting protocols.

Message

message

Message domain models (see Communication design page).

Attachment pydantic-model

Bases: BaseModel

A reference attached to a message.

Attributes:

Name Type Description
type AttachmentType

The kind of attachment.

ref NotBlankStr

Reference identifier (e.g. artifact ID, URL, file path).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

type pydantic-field

type

Kind of attachment

ref pydantic-field

ref

Reference identifier

MessageMetadata pydantic-model

Bases: BaseModel

Optional metadata carried with a message.

Extends the Communication design page metadata with an additional extra field for arbitrary key-value pairs.

Attributes:

Name Type Description
task_id NotBlankStr | None

Related task identifier.

project_id NotBlankStr | None

Related project identifier.

tokens_used int | None

LLM tokens consumed producing the message.

cost_usd float | None

Estimated cost of the message in USD (base currency).

extra tuple[tuple[str, str], ...]

Immutable key-value pairs for arbitrary metadata (extension).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_extra

task_id pydantic-field

task_id = None

Related task identifier

project_id pydantic-field

project_id = None

Related project identifier

tokens_used pydantic-field

tokens_used = None

LLM tokens consumed

cost_usd pydantic-field

cost_usd = None

Estimated cost in USD (base currency)

extra pydantic-field

extra = ()

Immutable key-value pairs for arbitrary metadata

Message pydantic-model

Bases: BaseModel

An inter-agent message.

Field schema is based on the Communication design page with typed refinements. The sender field is aliased to "from" for JSON compatibility with the spec format.

Attributes:

Name Type Description
id UUID

Unique message identifier.

timestamp AwareDatetime

When the message was created (must be timezone-aware).

sender NotBlankStr

Agent ID of the sender (aliased to "from" in JSON).

to NotBlankStr

Recipient agent or channel identifier.

type MessageType

Message type classification.

priority MessagePriority

Message priority level.

channel NotBlankStr

Channel the message is sent through.

content NotBlankStr

Message body text.

attachments tuple[Attachment, ...]

Attached references.

metadata MessageMetadata

Optional message metadata.

Config:

  • frozen: True
  • populate_by_name: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique message identifier

timestamp pydantic-field

timestamp

When the message was created (must be timezone-aware)

sender pydantic-field

sender

Sender agent ID

to pydantic-field

to

Recipient agent or channel

type pydantic-field

type

Message type classification

priority pydantic-field

priority = NORMAL

Message priority level

channel pydantic-field

channel

Channel the message is sent through

content pydantic-field

content

Message body text

attachments pydantic-field

attachments = ()

Attached references

metadata pydantic-field

metadata

Optional message metadata

Messenger

messenger

Per-agent messenger facade over the message bus (see Communication design page).

AgentMessenger

AgentMessenger(agent_id, agent_name, bus, dispatcher=None)

Per-agent facade for sending, receiving, and dispatching messages.

Wraps a :class:MessageBus and optional :class:MessageDispatcher to provide a high-level API that auto-fills sender, timestamp, and message ID.

Parameters:

Name Type Description Default
agent_id str

Identifier of the owning agent.

required
agent_name str

Human-readable name of the agent.

required
bus MessageBus

The underlying message bus.

required
dispatcher MessageDispatcher | None

Optional message dispatcher for handler routing.

None

Raises:

Type Description
ValueError

If agent_id or agent_name is blank.

Source code in src/synthorg/communication/messenger.py
def __init__(
    self,
    agent_id: str,
    agent_name: str,
    bus: MessageBus,
    dispatcher: MessageDispatcher | None = None,
) -> None:
    if not agent_id.strip():
        logger.warning(
            COMM_MESSENGER_INVALID_AGENT,
            field="agent_id",
            value=repr(agent_id),
        )
        msg = "agent_id must not be blank"
        raise ValueError(msg)
    if not agent_name.strip():
        logger.warning(
            COMM_MESSENGER_INVALID_AGENT,
            field="agent_name",
            value=repr(agent_name),
        )
        msg = "agent_name must not be blank"
        raise ValueError(msg)
    self._agent_id = agent_id
    self._agent_name = agent_name
    self._bus = bus
    self._dispatcher = dispatcher
    logger.debug(
        COMM_MESSENGER_CREATED,
        agent_id=agent_id,
        agent_name=agent_name,
    )

send_message async

send_message(*, to, channel, content, message_type, priority=NORMAL)

Send a message to a channel.

Auto-fills sender, timestamp, and message ID.

Parameters:

Name Type Description Default
to str

Recipient agent or channel identifier.

required
channel str

Channel to publish through.

required
content str

Message body text.

required
message_type MessageType

Message type classification.

required
priority MessagePriority

Message priority level.

NORMAL

Returns:

Type Description
Message

The constructed and published message.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def send_message(
    self,
    *,
    to: str,
    channel: str,
    content: str,
    message_type: MessageType,
    priority: MessagePriority = MessagePriority.NORMAL,
) -> Message:
    """Send a message to a channel.

    Auto-fills sender, timestamp, and message ID.

    Args:
        to: Recipient agent or channel identifier.
        channel: Channel to publish through.
        content: Message body text.
        message_type: Message type classification.
        priority: Message priority level.

    Returns:
        The constructed and published message.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
        MessageBusNotRunningError: If the bus is not running.
    """
    msg = Message(
        timestamp=datetime.now(UTC),
        sender=self._agent_id,
        to=to,
        type=message_type,
        priority=priority,
        channel=channel,
        content=content,
    )
    await self._bus.publish(msg)
    logger.info(
        COMM_MESSAGE_SENT,
        agent_id=self._agent_id,
        to=to,
        channel=channel,
        message_id=str(msg.id),
    )
    return msg

send_direct async

send_direct(*, to, content, message_type, priority=NORMAL)

Send a direct message to another agent.

Auto-fills sender, timestamp, and message ID. The message's channel field is set to the deterministic @{a}:{b} channel name computed from the sorted agent ID pair.

Parameters:

Name Type Description Default
to str

Recipient agent ID.

required
content str

Message body text.

required
message_type MessageType

Message type classification.

required
priority MessagePriority

Message priority level.

NORMAL

Returns:

Type Description
Message

The constructed and sent message.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def send_direct(
    self,
    *,
    to: str,
    content: str,
    message_type: MessageType,
    priority: MessagePriority = MessagePriority.NORMAL,
) -> Message:
    """Send a direct message to another agent.

    Auto-fills sender, timestamp, and message ID. The message's
    ``channel`` field is set to the deterministic ``@{a}:{b}``
    channel name computed from the sorted agent ID pair.

    Args:
        to: Recipient agent ID.
        content: Message body text.
        message_type: Message type classification.
        priority: Message priority level.

    Returns:
        The constructed and sent message.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
    """
    channel = _direct_channel_name(self._agent_id, to)
    msg = Message(
        timestamp=datetime.now(UTC),
        sender=self._agent_id,
        to=to,
        type=message_type,
        priority=priority,
        channel=channel,
        content=content,
    )
    await self._bus.send_direct(msg, recipient=to)
    logger.info(
        COMM_MESSAGE_SENT,
        agent_id=self._agent_id,
        to=to,
        channel=channel,
        message_id=str(msg.id),
    )
    return msg

broadcast async

broadcast(*, content, message_type, priority=NORMAL, channel='#all-hands')

Publish an announcement to a shared channel.

Agents must be subscribed to the target channel to receive the message. For true fan-out to all known agents, use a channel of type :attr:ChannelType.BROADCAST.

Parameters:

Name Type Description Default
content str

Message body text.

required
message_type MessageType

Message type classification.

required
priority MessagePriority

Message priority level.

NORMAL
channel str

Channel name (default "#all-hands").

'#all-hands'

Returns:

Type Description
Message

The constructed and published message.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def broadcast(
    self,
    *,
    content: str,
    message_type: MessageType,
    priority: MessagePriority = MessagePriority.NORMAL,
    channel: str = "#all-hands",
) -> Message:
    """Publish an announcement to a shared channel.

    Agents must be subscribed to the target channel to receive
    the message.  For true fan-out to all known agents, use a
    channel of type :attr:`ChannelType.BROADCAST`.

    Args:
        content: Message body text.
        message_type: Message type classification.
        priority: Message priority level.
        channel: Channel name (default ``"#all-hands"``).

    Returns:
        The constructed and published message.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
        MessageBusNotRunningError: If the bus is not running.
    """
    msg = Message(
        timestamp=datetime.now(UTC),
        sender=self._agent_id,
        to=channel,
        type=message_type,
        priority=priority,
        channel=channel,
        content=content,
    )
    await self._bus.publish(msg)
    logger.info(
        COMM_MESSAGE_BROADCAST,
        agent_id=self._agent_id,
        channel=channel,
        message_id=str(msg.id),
    )
    return msg

subscribe async

subscribe(channel_name)

Subscribe this agent to a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to subscribe to.

required

Returns:

Type Description
Subscription

The subscription record.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/messenger.py
async def subscribe(self, channel_name: NotBlankStr) -> Subscription:
    """Subscribe this agent to a channel.

    Args:
        channel_name: Channel to subscribe to.

    Returns:
        The subscription record.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
        MessageBusNotRunningError: If the bus is not running.
    """
    sub = await self._bus.subscribe(channel_name, self._agent_id)
    logger.info(
        COMM_MESSENGER_SUBSCRIBED,
        agent_id=self._agent_id,
        channel=channel_name,
    )
    return sub

unsubscribe async

unsubscribe(channel_name)

Unsubscribe this agent from a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to unsubscribe from.

required

Raises:

Type Description
NotSubscribedError

If not currently subscribed.

Source code in src/synthorg/communication/messenger.py
async def unsubscribe(self, channel_name: NotBlankStr) -> None:
    """Unsubscribe this agent from a channel.

    Args:
        channel_name: Channel to unsubscribe from.

    Raises:
        NotSubscribedError: If not currently subscribed.
    """
    await self._bus.unsubscribe(channel_name, self._agent_id)
    logger.info(
        COMM_MESSENGER_UNSUBSCRIBED,
        agent_id=self._agent_id,
        channel=channel_name,
    )

receive async

receive(channel_name, *, timeout=None)

Receive the next message from a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to receive from.

required
timeout float | None

Max seconds to wait, or None for indefinite.

None

Returns:

Type Description
DeliveryEnvelope | None

The next delivery envelope, or None when:

DeliveryEnvelope | None
  • timeout expires without a message arriving.
DeliveryEnvelope | None
  • The bus is shut down while waiting.
DeliveryEnvelope | None
  • The subscription is cancelled via :meth:unsubscribe while a receive() call is in flight.
Source code in src/synthorg/communication/messenger.py
async def receive(
    self,
    channel_name: NotBlankStr,
    *,
    timeout: float | None = None,  # noqa: ASYNC109
) -> DeliveryEnvelope | None:
    """Receive the next message from a channel.

    Args:
        channel_name: Channel to receive from.
        timeout: Max seconds to wait, or ``None`` for indefinite.

    Returns:
        The next delivery envelope, or ``None`` when:

        - *timeout* expires without a message arriving.
        - The bus is shut down while waiting.
        - The subscription is cancelled via :meth:`unsubscribe`
            while a ``receive()`` call is in flight.
    """
    return await self._bus.receive(
        channel_name,
        self._agent_id,
        timeout=timeout,
    )

register_handler

register_handler(handler, *, message_types=None, min_priority=LOW, name='unnamed')

Register a message handler.

Creates a dispatcher automatically if one was not provided at construction time.

Parameters:

Name Type Description Default
handler MessageHandler | MessageHandlerFunc

The handler instance or async function.

required
message_types frozenset[MessageType] | None

Message types to match (empty/None = all).

None
min_priority MessagePriority

Minimum priority to accept.

LOW
name str

Human-readable label for debugging.

'unnamed'

Returns:

Type Description
str

The unique handler registration ID.

Source code in src/synthorg/communication/messenger.py
def register_handler(
    self,
    handler: MessageHandler | MessageHandlerFunc,
    *,
    message_types: frozenset[MessageType] | None = None,
    min_priority: MessagePriority = MessagePriority.LOW,
    name: str = "unnamed",
) -> str:
    """Register a message handler.

    Creates a dispatcher automatically if one was not provided
    at construction time.

    Args:
        handler: The handler instance or async function.
        message_types: Message types to match (empty/None = all).
        min_priority: Minimum priority to accept.
        name: Human-readable label for debugging.

    Returns:
        The unique handler registration ID.
    """
    if self._dispatcher is None:
        self._dispatcher = MessageDispatcher(
            agent_id=self._agent_id,
        )
    return self._dispatcher.register(
        handler,
        message_types=message_types,
        min_priority=min_priority,
        name=name,
    )

deregister_handler

deregister_handler(handler_id)

Remove a previously registered handler.

Parameters:

Name Type Description Default
handler_id str

The registration ID.

required

Returns:

Type Description
bool

True if the handler was found and removed.

Source code in src/synthorg/communication/messenger.py
def deregister_handler(self, handler_id: str) -> bool:
    """Remove a previously registered handler.

    Args:
        handler_id: The registration ID.

    Returns:
        True if the handler was found and removed.
    """
    if self._dispatcher is None:
        logger.debug(
            COMM_HANDLER_DEREGISTER_MISS,
            agent_id=self._agent_id,
            handler_id=handler_id,
        )
        return False
    return self._dispatcher.deregister(handler_id)

dispatch_message async

dispatch_message(message)

Dispatch an incoming message to registered handlers.

Parameters:

Name Type Description Default
message Message

The message to dispatch.

required

Returns:

Name Type Description
A DispatchResult

class:DispatchResult summarising the outcome.

Source code in src/synthorg/communication/messenger.py
async def dispatch_message(self, message: Message) -> DispatchResult:
    """Dispatch an incoming message to registered handlers.

    Args:
        message: The message to dispatch.

    Returns:
        A :class:`DispatchResult` summarising the outcome.
    """
    if self._dispatcher is None:
        logger.debug(
            COMM_DISPATCH_NO_DISPATCHER,
            agent_id=self._agent_id,
            message_id=str(message.id),
        )
        return DispatchResult(
            message_id=message.id,
            handlers_succeeded=0,
            handlers_failed=0,
        )
    return await self._dispatcher.dispatch(message)

Dispatcher

dispatcher

Message dispatcher -- routes incoming messages to registered handlers.

See the Communication design page.

DispatchResult pydantic-model

Bases: BaseModel

Immutable outcome of dispatching a single message.

Attributes:

Name Type Description
message_id UUID

The dispatched message's identifier.

handlers_matched int

Derived count (succeeded + failed).

handlers_succeeded int

Number of handlers that completed without error.

handlers_failed int

Number of handlers that raised an exception.

errors tuple[str, ...]

Error descriptions from failed handlers.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • message_id (UUID)
  • handlers_succeeded (int)
  • handlers_failed (int)
  • errors (tuple[str, ...])

handlers_matched property

handlers_matched

Total handlers that matched (succeeded + failed).

MessageDispatcher

MessageDispatcher(agent_id='unknown')

Per-agent dispatcher that routes messages to registered handlers.

Parameters:

Name Type Description Default
agent_id str

Identifier of the owning agent (for logging context).

'unknown'
Source code in src/synthorg/communication/dispatcher.py
def __init__(self, agent_id: str = "unknown") -> None:
    self._agent_id = agent_id
    self._registrations: dict[str, HandlerRegistration] = {}

register

register(handler, *, message_types=None, min_priority=LOW, name='unnamed')

Register a handler for incoming messages.

If handler is a bare async function, it is automatically wrapped in a :class:FunctionHandler.

Parameters:

Name Type Description Default
handler MessageHandler | MessageHandlerFunc

The handler instance or async function.

required
message_types frozenset[MessageType] | None

Message types to match (empty/None = all).

None
min_priority MessagePriority

Minimum priority to accept.

LOW
name str

Human-readable label for debugging.

'unnamed'

Returns:

Type Description
str

The unique handler registration ID.

Source code in src/synthorg/communication/dispatcher.py
def register(
    self,
    handler: MessageHandler | MessageHandlerFunc,
    *,
    message_types: frozenset[MessageType] | None = None,
    min_priority: MessagePriority = MessagePriority.LOW,
    name: str = "unnamed",
) -> str:
    """Register a handler for incoming messages.

    If *handler* is a bare async function, it is automatically wrapped
    in a :class:`FunctionHandler`.

    Args:
        handler: The handler instance or async function.
        message_types: Message types to match (empty/None = all).
        min_priority: Minimum priority to accept.
        name: Human-readable label for debugging.

    Returns:
        The unique handler registration ID.
    """
    if not isinstance(handler, MessageHandler):
        handler = FunctionHandler(handler)
    elif not inspect.iscoroutinefunction(handler.handle):
        msg = (
            f"MessageHandler {type(handler).__name__!r} has a "
            f"synchronous handle() -- must be async"
        )
        logger.warning(
            COMM_HANDLER_INVALID,
            agent_id=self._agent_id,
            handler_name=name,
            handler_type=type(handler).__name__,
            error=msg,
        )
        raise TypeError(msg)

    registration = HandlerRegistration(
        handler=handler,
        message_types=message_types or frozenset(),
        min_priority=min_priority,
        name=name,
    )
    self._registrations[registration.handler_id] = registration
    logger.info(
        COMM_HANDLER_REGISTERED,
        agent_id=self._agent_id,
        handler_id=registration.handler_id,
        handler_name=name,
    )
    return registration.handler_id

deregister

deregister(handler_id)

Remove a previously registered handler.

Parameters:

Name Type Description Default
handler_id str

The registration ID returned by :meth:register.

required

Returns:

Type Description
bool

True if the handler was found and removed, False otherwise.

Source code in src/synthorg/communication/dispatcher.py
def deregister(self, handler_id: str) -> bool:
    """Remove a previously registered handler.

    Args:
        handler_id: The registration ID returned by :meth:`register`.

    Returns:
        True if the handler was found and removed, False otherwise.
    """
    removed = self._registrations.pop(handler_id, None)
    if removed is not None:
        logger.info(
            COMM_HANDLER_DEREGISTERED,
            agent_id=self._agent_id,
            handler_id=handler_id,
            handler_name=removed.name,
        )
        return True
    logger.debug(
        COMM_HANDLER_DEREGISTER_MISS,
        agent_id=self._agent_id,
        handler_id=handler_id,
    )
    return False

dispatch async

dispatch(message)

Route a message to all matching handlers concurrently.

Handlers that raise Exception subclasses are isolated -- their errors are captured without affecting other handlers. BaseException subclasses (e.g. KeyboardInterrupt, CancelledError) propagate through the TaskGroup, cancelling all remaining handlers.

Parameters:

Name Type Description Default
message Message

The message to dispatch.

required

Returns:

Name Type Description
A DispatchResult

class:DispatchResult summarising the outcome.

Source code in src/synthorg/communication/dispatcher.py
async def dispatch(self, message: Message) -> DispatchResult:
    """Route a message to all matching handlers concurrently.

    Handlers that raise ``Exception`` subclasses are isolated --
    their errors are captured without affecting other handlers.
    ``BaseException`` subclasses (e.g. ``KeyboardInterrupt``,
    ``CancelledError``) propagate through the ``TaskGroup``,
    cancelling all remaining handlers.

    Args:
        message: The message to dispatch.

    Returns:
        A :class:`DispatchResult` summarising the outcome.
    """
    matched = [
        reg for reg in self._registrations.values() if self._matches(reg, message)
    ]

    logger.debug(
        COMM_DISPATCH_START,
        agent_id=self._agent_id,
        message_id=str(message.id),
        message_type=message.type,
    )

    if not matched:
        logger.debug(
            COMM_DISPATCH_NO_HANDLERS,
            agent_id=self._agent_id,
            message_id=str(message.id),
        )
        return DispatchResult(
            message_id=message.id,
            handlers_succeeded=0,
            handlers_failed=0,
        )

    logger.debug(
        COMM_DISPATCH_HANDLER_MATCHED,
        agent_id=self._agent_id,
        message_id=str(message.id),
        count=len(matched),
    )

    errors: list[str | None] = [None] * len(matched)

    async with asyncio.TaskGroup() as tg:
        for idx, reg in enumerate(matched):
            tg.create_task(
                self._guarded_handle(reg, message, errors, idx),
            )

    error_msgs = tuple(e for e in errors if e is not None)
    succeeded = len(matched) - len(error_msgs)

    logger.info(
        COMM_DISPATCH_COMPLETE,
        agent_id=self._agent_id,
        message_id=str(message.id),
        matched=len(matched),
        succeeded=succeeded,
        failed=len(error_msgs),
    )

    return DispatchResult(
        message_id=message.id,
        handlers_succeeded=succeeded,
        handlers_failed=len(error_msgs),
        errors=error_msgs,
    )

Bus Protocol

bus_protocol

Message bus protocol (see Communication design page).

Defines the swappable interface for message bus backends. The default implementation is :class:InMemoryMessageBus in bus_memory.py.

MessageBus

Bases: Protocol

Protocol for message bus backends.

All implementations must support the full lifecycle (start/stop), channel management, pub/sub messaging, direct messaging, and channel history.

Uses a pull model: consumers call :meth:receive to get the next message rather than registering push callbacks.

is_running property

is_running

Whether the bus is currently running.

start async

start()

Start the bus and create pre-configured channels.

Raises:

Type Description
MessageBusAlreadyRunningError

If the bus is already running.

Source code in src/synthorg/communication/bus_protocol.py
async def start(self) -> None:
    """Start the bus and create pre-configured channels.

    Raises:
        MessageBusAlreadyRunningError: If the bus is already running.
    """
    ...

stop async

stop()

Stop the bus gracefully. Idempotent.

Source code in src/synthorg/communication/bus_protocol.py
async def stop(self) -> None:
    """Stop the bus gracefully.  Idempotent."""
    ...

publish async

publish(message)

Publish a message to its channel.

The target channel is determined by message.channel.

Parameters:

Name Type Description Default
message Message

The message to publish.

required

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If the target channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def publish(self, message: Message) -> None:
    """Publish a message to its channel.

    The target channel is determined by ``message.channel``.

    Args:
        message: The message to publish.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If the target channel does not exist.
    """
    ...

send_direct async

send_direct(message, *, recipient)

Send a direct message between two agents.

Lazily creates a DIRECT channel named @{a}:{b} (where a, b are the sorted agent IDs) and subscribes both agents.

Parameters:

Name Type Description Default
message Message

The message to send (message.sender is the sender).

required
recipient NotBlankStr

The recipient agent ID.

required

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

Source code in src/synthorg/communication/bus_protocol.py
async def send_direct(
    self,
    message: Message,
    *,
    recipient: NotBlankStr,
) -> None:
    """Send a direct message between two agents.

    Lazily creates a DIRECT channel named ``@{a}:{b}`` (where
    a, b are the sorted agent IDs) and subscribes both agents.

    Args:
        message: The message to send (``message.sender`` is the
            sender).
        recipient: The recipient agent ID.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
    """
    ...

subscribe async

subscribe(channel_name, subscriber_id)

Subscribe an agent to a channel.

Idempotent -- returns a fresh subscription record if already subscribed (the channel's subscriber list is not duplicated).

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to subscribe to.

required
subscriber_id NotBlankStr

Agent ID of the subscriber.

required

Returns:

Type Description
Subscription

The subscription record.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If the channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def subscribe(
    self,
    channel_name: NotBlankStr,
    subscriber_id: NotBlankStr,
) -> Subscription:
    """Subscribe an agent to a channel.

    Idempotent -- returns a fresh subscription record if already
    subscribed (the channel's subscriber list is not duplicated).

    Args:
        channel_name: Channel to subscribe to.
        subscriber_id: Agent ID of the subscriber.

    Returns:
        The subscription record.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If the channel does not exist.
    """
    ...

unsubscribe async

unsubscribe(channel_name, subscriber_id)

Remove an agent's subscription from a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to unsubscribe from.

required
subscriber_id NotBlankStr

Agent ID to remove.

required

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

NotSubscribedError

If the agent is not subscribed.

Source code in src/synthorg/communication/bus_protocol.py
async def unsubscribe(
    self,
    channel_name: NotBlankStr,
    subscriber_id: NotBlankStr,
) -> None:
    """Remove an agent's subscription from a channel.

    Args:
        channel_name: Channel to unsubscribe from.
        subscriber_id: Agent ID to remove.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        NotSubscribedError: If the agent is not subscribed.
    """
    ...

receive async

receive(channel_name, subscriber_id, *, timeout=None)

Receive the next message from a channel.

Awaits until a message is available, the timeout expires, or the bus is stopped. When timeout is None, awaits indefinitely (or until shutdown).

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to receive from.

required
subscriber_id NotBlankStr

Agent ID receiving.

required
timeout float | None

Seconds to wait before returning None.

None

Returns:

Type Description
DeliveryEnvelope | None

A delivery envelope, or None on timeout or shutdown.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelNotFoundError

If the channel does not exist.

NotSubscribedError

If the subscriber is not subscribed (for TOPIC and DIRECT channels).

Source code in src/synthorg/communication/bus_protocol.py
async def receive(
    self,
    channel_name: NotBlankStr,
    subscriber_id: NotBlankStr,
    *,
    timeout: float | None = None,  # noqa: ASYNC109
) -> DeliveryEnvelope | None:
    """Receive the next message from a channel.

    Awaits until a message is available, the timeout expires, or
    the bus is stopped.  When ``timeout`` is ``None``, awaits
    indefinitely (or until shutdown).

    Args:
        channel_name: Channel to receive from.
        subscriber_id: Agent ID receiving.
        timeout: Seconds to wait before returning ``None``.

    Returns:
        A delivery envelope, or ``None`` on timeout or shutdown.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelNotFoundError: If the channel does not exist.
        NotSubscribedError: If the subscriber is not subscribed
            (for TOPIC and DIRECT channels).
    """
    ...

create_channel async

create_channel(channel)

Create a new channel.

Parameters:

Name Type Description Default
channel Channel

Channel definition to create.

required

Returns:

Type Description
Channel

The created channel.

Raises:

Type Description
MessageBusNotRunningError

If the bus is not running.

ChannelAlreadyExistsError

If a channel with that name already exists.

Source code in src/synthorg/communication/bus_protocol.py
async def create_channel(self, channel: Channel) -> Channel:
    """Create a new channel.

    Args:
        channel: Channel definition to create.

    Returns:
        The created channel.

    Raises:
        MessageBusNotRunningError: If the bus is not running.
        ChannelAlreadyExistsError: If a channel with that name
            already exists.
    """
    ...

get_channel async

get_channel(channel_name)

Get a channel by name.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Name of the channel.

required

Returns:

Type Description
Channel

The channel.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def get_channel(self, channel_name: NotBlankStr) -> Channel:
    """Get a channel by name.

    Args:
        channel_name: Name of the channel.

    Returns:
        The channel.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
    """
    ...

list_channels async

list_channels()

List all channels.

Returns:

Type Description
tuple[Channel, ...]

All registered channels.

Source code in src/synthorg/communication/bus_protocol.py
async def list_channels(self) -> tuple[Channel, ...]:
    """List all channels.

    Returns:
        All registered channels.
    """
    ...

get_channel_history async

get_channel_history(channel_name, *, limit=None)

Get message history for a channel.

Parameters:

Name Type Description Default
channel_name NotBlankStr

Channel to query.

required
limit int | None

Maximum number of most recent messages to return. Values <= 0 return an empty tuple.

None

Returns:

Type Description
tuple[Message, ...]

Messages in chronological order.

Raises:

Type Description
ChannelNotFoundError

If the channel does not exist.

Source code in src/synthorg/communication/bus_protocol.py
async def get_channel_history(
    self,
    channel_name: NotBlankStr,
    *,
    limit: int | None = None,
) -> tuple[Message, ...]:
    """Get message history for a channel.

    Args:
        channel_name: Channel to query.
        limit: Maximum number of most recent messages to return.
            Values ``<= 0`` return an empty tuple.

    Returns:
        Messages in chronological order.

    Raises:
        ChannelNotFoundError: If the channel does not exist.
    """
    ...

Config

config

Communication configuration models (see Communication design page).

MessageRetentionConfig pydantic-model

Bases: BaseModel

Retention settings for channel message history.

Attributes:

Name Type Description
max_messages_per_channel int

Maximum messages kept per channel.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_messages_per_channel pydantic-field

max_messages_per_channel = 1000

Maximum messages kept per channel

MessageBusConfig pydantic-model

Bases: BaseModel

Message bus backend configuration.

Maps to the Communication design page message_bus.

Attributes:

Name Type Description
backend MessageBusBackend

Transport backend to use.

channels tuple[NotBlankStr, ...]

Pre-defined channel names.

retention MessageRetentionConfig

Message retention settings.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_channels

backend pydantic-field

backend = INTERNAL

Transport backend

channels pydantic-field

channels = _DEFAULT_CHANNELS

Pre-defined channel names

retention pydantic-field

retention

Message retention settings

MeetingTypeConfig pydantic-model

Bases: BaseModel

Configuration for a single meeting type.

Maps to the Communication design page meetings.types[]. Exactly one of frequency or trigger must be set.

Attributes:

Name Type Description
name NotBlankStr

Meeting type name (e.g. "daily_standup").

frequency MeetingFrequency | None

Recurrence schedule (mutually exclusive with trigger).

trigger NotBlankStr | None

Event trigger (mutually exclusive with frequency).

participants tuple[NotBlankStr, ...]

Participant role or agent identifiers.

duration_tokens int

Token budget for the meeting.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_frequency_or_trigger
  • _validate_participants

name pydantic-field

name

Meeting type name

frequency pydantic-field

frequency = None

Recurrence schedule

trigger pydantic-field

trigger = None

Event trigger

participants pydantic-field

participants = ()

Participant role or agent identifiers

duration_tokens pydantic-field

duration_tokens = 2000

Token budget for the meeting

protocol_config pydantic-field

protocol_config

Meeting protocol configuration

MeetingsConfig pydantic-model

Bases: BaseModel

Meetings subsystem configuration.

Maps to the Communication design page meetings.

Attributes:

Name Type Description
enabled bool

Whether the meetings subsystem is active.

types tuple[MeetingTypeConfig, ...]

Configured meeting types (unique by name).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_meeting_names

enabled pydantic-field

enabled = True

Meetings subsystem active

types pydantic-field

types = ()

Configured meeting types

HierarchyConfig pydantic-model

Bases: BaseModel

Hierarchy enforcement configuration.

Maps to the Communication design page hierarchy.

Attributes:

Name Type Description
enforce_chain_of_command bool

Whether chain-of-command is enforced.

allow_skip_level bool

Whether skip-level messaging is allowed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

enforce_chain_of_command pydantic-field

enforce_chain_of_command = True

Enforce chain-of-command

allow_skip_level pydantic-field

allow_skip_level = False

Allow skip-level messaging

RateLimitConfig pydantic-model

Bases: BaseModel

Per-pair message rate limit configuration.

Maps to the Communication design page rate_limit.

Attributes:

Name Type Description
max_per_pair_per_minute int

Maximum messages per agent pair per minute.

burst_allowance int

Extra burst capacity above the rate limit.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_per_pair_per_minute pydantic-field

max_per_pair_per_minute = 10

Max messages per agent pair per minute

burst_allowance pydantic-field

burst_allowance = 3

Extra burst capacity

CircuitBreakerConfig pydantic-model

Bases: BaseModel

Circuit breaker configuration for agent-pair communication.

Maps to the Communication design page circuit_breaker.

Attributes:

Name Type Description
bounce_threshold int

Bounce count before the circuit opens.

cooldown_seconds int

Seconds to wait before retrying after trip.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

bounce_threshold pydantic-field

bounce_threshold = 3

Bounce count before circuit opens

cooldown_seconds pydantic-field

cooldown_seconds = 300

Cooldown period in seconds

LoopPreventionConfig pydantic-model

Bases: BaseModel

Loop prevention safeguards.

Maps to the Communication design page. ancestry_tracking is always on and cannot be disabled.

Attributes:

Name Type Description
max_delegation_depth int

Hard limit on delegation chain length.

rate_limit RateLimitConfig

Per-pair rate limit settings.

dedup_window_seconds int

Deduplication window in seconds.

circuit_breaker CircuitBreakerConfig

Circuit breaker settings.

ancestry_tracking Literal[True]

Must always be True.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_delegation_depth pydantic-field

max_delegation_depth = 5

Hard limit on delegation chain length

rate_limit pydantic-field

rate_limit

Per-pair rate limit settings

dedup_window_seconds pydantic-field

dedup_window_seconds = 60

Deduplication window in seconds

circuit_breaker pydantic-field

circuit_breaker

Circuit breaker settings

ancestry_tracking pydantic-field

ancestry_tracking = True

Task ancestry tracking (always on, not configurable)

CommunicationConfig pydantic-model

Bases: BaseModel

Top-level communication configuration.

Aggregates the Communication design page sections under a single model.

Attributes:

Name Type Description
default_pattern CommunicationPattern

High-level communication pattern.

message_bus MessageBusConfig

Message bus configuration.

meetings MeetingsConfig

Meetings subsystem configuration.

hierarchy HierarchyConfig

Hierarchy enforcement settings.

loop_prevention LoopPreventionConfig

Loop prevention safeguards.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

default_pattern pydantic-field

default_pattern = HYBRID

High-level communication pattern

message_bus pydantic-field

message_bus

Message bus configuration

meetings pydantic-field

meetings

Meetings subsystem configuration

hierarchy pydantic-field

hierarchy

Hierarchy enforcement settings

loop_prevention pydantic-field

loop_prevention

Loop prevention safeguards

conflict_resolution pydantic-field

conflict_resolution

Conflict resolution configuration (see Communication design page)

Errors

errors

Communication error hierarchy (see Communication design page).

All communication errors carry an immutable context mapping for structured metadata, following the same pattern as ToolError.

CommunicationError

CommunicationError(message, *, context=None)

Bases: Exception

Base exception for all communication-layer errors.

Attributes:

Name Type Description
message

Human-readable error description.

context MappingProxyType[str, Any]

Immutable metadata about the error.

Initialize a communication error.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
context dict[str, Any] | None

Arbitrary metadata about the error. Stored as an immutable mapping; defaults to empty if not provided.

None
Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

__str__

__str__()

Format error with optional context metadata.

Source code in src/synthorg/communication/errors.py
def __str__(self) -> str:
    """Format error with optional context metadata."""
    if self.context:
        ctx = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
        return f"{self.message} ({ctx})"
    return self.message

ChannelNotFoundError

ChannelNotFoundError(message, *, context=None)

Bases: CommunicationError

Requested channel does not exist.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ChannelAlreadyExistsError

ChannelAlreadyExistsError(message, *, context=None)

Bases: CommunicationError

Channel with the given name already exists.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

NotSubscribedError

NotSubscribedError(message, *, context=None)

Bases: CommunicationError

Agent is not subscribed to the specified channel.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

MessageBusNotRunningError

MessageBusNotRunningError(message, *, context=None)

Bases: CommunicationError

Operation attempted on a message bus that is not running.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

MessageBusAlreadyRunningError

MessageBusAlreadyRunningError(message, *, context=None)

Bases: CommunicationError

start() called on a message bus that is already running.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationError

DelegationError(message, *, context=None)

Bases: CommunicationError

Base exception for delegation-related errors.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationAuthorityError

DelegationAuthorityError(message, *, context=None)

Bases: DelegationError

Delegator lacks authority to delegate to the target agent.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationLoopError

DelegationLoopError(message, *, context=None)

Bases: DelegationError

Base for loop prevention mechanism rejections.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationDepthError

DelegationDepthError(message, *, context=None)

Bases: DelegationLoopError

Delegation chain exceeds maximum depth.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationAncestryError

DelegationAncestryError(message, *, context=None)

Bases: DelegationLoopError

Delegation would create a cycle in the task ancestry.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationRateLimitError

DelegationRateLimitError(message, *, context=None)

Bases: DelegationLoopError

Delegation rate limit exceeded for agent pair.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationCircuitOpenError

DelegationCircuitOpenError(message, *, context=None)

Bases: DelegationLoopError

Circuit breaker is open for agent pair.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

DelegationDuplicateError

DelegationDuplicateError(message, *, context=None)

Bases: DelegationLoopError

Duplicate delegation detected within dedup window.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

HierarchyResolutionError

HierarchyResolutionError(message, *, context=None)

Bases: CommunicationError

Error resolving organizational hierarchy.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ConflictResolutionError

ConflictResolutionError(message, *, context=None)

Bases: CommunicationError

Base exception for conflict resolution errors.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ConflictStrategyError

ConflictStrategyError(message, *, context=None)

Bases: ConflictResolutionError

Error within a conflict resolution strategy.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

ConflictHierarchyError

ConflictHierarchyError(message, *, context=None)

Bases: ConflictResolutionError

No common manager found for cross-department conflict.

Source code in src/synthorg/communication/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a communication error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(context) if context else {},
    )
    super().__init__(message)

Delegation

models

Delegation request, result, and audit trail models.

DelegationRequest pydantic-model

Bases: BaseModel

Request to delegate a task down the hierarchy.

Attributes:

Name Type Description
delegator_id NotBlankStr

Agent ID of the delegator.

delegatee_id NotBlankStr

Agent ID of the target agent.

task Task

The task to delegate.

refinement str

Additional context from the delegator.

constraints tuple[NotBlankStr, ...]

Extra constraints for the delegatee.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_self_delegation

delegator_id pydantic-field

delegator_id

Agent ID of the delegator

delegatee_id pydantic-field

delegatee_id

Agent ID of the target agent

task pydantic-field

task

Task to delegate

refinement pydantic-field

refinement = ''

Additional context from the delegator

constraints pydantic-field

constraints = ()

Extra constraints for the delegatee

DelegationResult pydantic-model

Bases: BaseModel

Outcome of a delegation attempt.

Attributes:

Name Type Description
success bool

Whether the delegation succeeded.

delegated_task Task | None

The sub-task created, if successful.

rejection_reason str | None

Reason for rejection, if unsuccessful.

blocked_by NotBlankStr | None

Mechanism name that blocked, if applicable.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_success_consistency

success pydantic-field

success

Whether delegation succeeded

delegated_task pydantic-field

delegated_task = None

Sub-task created on success

rejection_reason pydantic-field

rejection_reason = None

Reason for rejection

blocked_by pydantic-field

blocked_by = None

Mechanism name that blocked delegation

DelegationRecord pydantic-model

Bases: BaseModel

Audit trail entry for a completed delegation.

Attributes:

Name Type Description
delegation_id NotBlankStr

Unique delegation identifier.

delegator_id NotBlankStr

Agent ID of the delegator.

delegatee_id NotBlankStr

Agent ID of the delegatee.

original_task_id NotBlankStr

ID of the original task.

delegated_task_id NotBlankStr

ID of the created sub-task.

timestamp AwareDatetime

When the delegation occurred.

refinement str

Context provided by the delegator.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

delegation_id pydantic-field

delegation_id

Unique delegation identifier

delegator_id pydantic-field

delegator_id

Delegator agent ID

delegatee_id pydantic-field

delegatee_id

Delegatee agent ID

original_task_id pydantic-field

original_task_id

Original task ID

delegated_task_id pydantic-field

delegated_task_id

Created sub-task ID

timestamp pydantic-field

timestamp

When delegation occurred

refinement pydantic-field

refinement = ''

Context provided by delegator

service

Delegation service orchestrating hierarchy, authority, and loop prevention.

DelegationService

DelegationService(*, hierarchy, authority_validator, guard, record_store=None)

Orchestrates hierarchical delegation with loop prevention.

Validates authority, checks loop prevention guards, creates sub-tasks, and records audit trail entries. The core logic is synchronous (CPU-only); messaging is a separate async concern.

Parameters:

Name Type Description Default
hierarchy HierarchyResolver

Resolved organizational hierarchy.

required
authority_validator AuthorityValidator

Authority validation logic.

required
guard DelegationGuard

Loop prevention guard.

required
record_store DelegationRecordStore | None

Optional delegation record store for activity tracking.

None
Source code in src/synthorg/communication/delegation/service.py
def __init__(
    self,
    *,
    hierarchy: HierarchyResolver,
    authority_validator: AuthorityValidator,
    guard: DelegationGuard,
    record_store: DelegationRecordStore | None = None,
) -> None:
    self._hierarchy = hierarchy
    self._authority_validator = authority_validator
    self._guard = guard
    self._record_store = record_store
    self._audit_trail: list[DelegationRecord] = []

delegate

delegate(request, delegator, delegatee)

Execute a delegation: authority, loops, sub-task, audit.

Parameters:

Name Type Description Default
request DelegationRequest

The delegation request.

required
delegator AgentIdentity

Identity of the delegating agent.

required
delegatee AgentIdentity

Identity of the target agent.

required

Returns:

Type Description
DelegationResult

Result indicating success or rejection with reason.

Raises:

Type Description
ValueError

If request IDs do not match identity objects.

DelegationError

If sub-task construction fails.

Source code in src/synthorg/communication/delegation/service.py
def delegate(
    self,
    request: DelegationRequest,
    delegator: AgentIdentity,
    delegatee: AgentIdentity,
) -> DelegationResult:
    """Execute a delegation: authority, loops, sub-task, audit.

    Args:
        request: The delegation request.
        delegator: Identity of the delegating agent.
        delegatee: Identity of the target agent.

    Returns:
        Result indicating success or rejection with reason.

    Raises:
        ValueError: If request IDs do not match identity objects.
        DelegationError: If sub-task construction fails.
    """
    self._validate_identity(request, delegator, delegatee)

    logger.info(
        DELEGATION_REQUESTED,
        delegator=request.delegator_id,
        delegatee=request.delegatee_id,
        task_id=request.task.id,
    )

    # 1. Authority check
    auth_result = self._authority_validator.validate(delegator, delegatee)
    if not auth_result.allowed:
        return DelegationResult(
            success=False,
            rejection_reason=auth_result.reason,
            blocked_by="authority",
        )

    # 2. Loop prevention checks
    guard_outcome = self._guard.check(
        delegation_chain=request.task.delegation_chain,
        delegator_id=request.delegator_id,
        delegatee_id=request.delegatee_id,
        task_id=request.task.id,
    )
    if not guard_outcome.passed:
        self._escalate_loop_detection(request, guard_outcome.mechanism)
        return DelegationResult(
            success=False,
            rejection_reason=guard_outcome.message,
            blocked_by=guard_outcome.mechanism,
        )

    # 3. Create sub-task and record
    sub_task = self._create_sub_task(request)
    self._record_delegation(request, sub_task)

    return DelegationResult(success=True, delegated_task=sub_task)

get_audit_trail

get_audit_trail()

Return all delegation audit records.

Returns:

Type Description
tuple[DelegationRecord, ...]

Tuple of delegation records in chronological order.

Source code in src/synthorg/communication/delegation/service.py
def get_audit_trail(self) -> tuple[DelegationRecord, ...]:
    """Return all delegation audit records.

    Returns:
        Tuple of delegation records in chronological order.
    """
    return tuple(self._audit_trail)

get_supervisor_of

get_supervisor_of(agent_name)

Expose hierarchy lookup for escalation callers.

Parameters:

Name Type Description Default
agent_name str

Agent name to look up.

required

Returns:

Type Description
str | None

Supervisor name or None if at the top.

Source code in src/synthorg/communication/delegation/service.py
def get_supervisor_of(self, agent_name: str) -> str | None:
    """Expose hierarchy lookup for escalation callers.

    Args:
        agent_name: Agent name to look up.

    Returns:
        Supervisor name or None if at the top.
    """
    return self._hierarchy.get_supervisor(agent_name)

authority

Authority validation for hierarchical delegation.

AuthorityCheckResult pydantic-model

Bases: BaseModel

Result of an authority validation check.

Attributes:

Name Type Description
allowed bool

Whether the delegation is authorized.

reason str

Explanation (empty on success).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_allowed_reason

allowed pydantic-field

allowed

Whether delegation is allowed

reason pydantic-field

reason = ''

Explanation

AuthorityValidator

AuthorityValidator(hierarchy, hierarchy_config)

Validates delegation authority using hierarchy and role permissions.

Checks
  1. Hierarchy: delegatee must be a subordinate of delegator (direct or skip-level depending on config).
  2. Roles: if delegator.authority.can_delegate_to is non-empty, delegatee.role must be in it; if empty, all roles are permitted.

Parameters:

Name Type Description Default
hierarchy HierarchyResolver

Resolved org hierarchy.

required
hierarchy_config HierarchyConfig

Hierarchy enforcement configuration.

required
Source code in src/synthorg/communication/delegation/authority.py
def __init__(
    self,
    hierarchy: HierarchyResolver,
    hierarchy_config: HierarchyConfig,
) -> None:
    self._hierarchy = hierarchy
    self._config = hierarchy_config

validate

validate(delegator, delegatee)

Validate whether delegator can delegate to delegatee.

Parameters:

Name Type Description Default
delegator AgentIdentity

Identity of the delegating agent.

required
delegatee AgentIdentity

Identity of the target agent.

required

Returns:

Type Description
AuthorityCheckResult

Result indicating whether delegation is authorized.

Source code in src/synthorg/communication/delegation/authority.py
def validate(
    self,
    delegator: AgentIdentity,
    delegatee: AgentIdentity,
) -> AuthorityCheckResult:
    """Validate whether delegator can delegate to delegatee.

    Args:
        delegator: Identity of the delegating agent.
        delegatee: Identity of the target agent.

    Returns:
        Result indicating whether delegation is authorized.
    """
    if self._config.enforce_chain_of_command:
        result = self._check_hierarchy(delegator, delegatee)
        if not result.allowed:
            return result

    result = self._check_role_permissions(delegator, delegatee)
    if not result.allowed:
        return result

    logger.info(
        DELEGATION_AUTHORIZED,
        delegator=delegator.name,
        delegatee=delegatee.name,
    )
    return AuthorityCheckResult(allowed=True)

Loop Prevention

guard

Delegation guard orchestrating all loop prevention mechanisms.

DelegationGuard

DelegationGuard(config)

Orchestrates all loop prevention mechanisms.

Checks run in order: ancestry, depth, dedup, rate_limit, circuit_breaker. Short-circuits on the first failure.

Parameters:

Name Type Description Default
config LoopPreventionConfig

Loop prevention configuration.

required
Source code in src/synthorg/communication/loop_prevention/guard.py
def __init__(self, config: LoopPreventionConfig) -> None:
    self._config = config
    self._deduplicator = DelegationDeduplicator(
        window_seconds=config.dedup_window_seconds,
    )
    self._rate_limiter = DelegationRateLimiter(config.rate_limit)
    self._circuit_breaker = DelegationCircuitBreaker(
        config.circuit_breaker,
    )

check

check(delegation_chain, delegator_id, delegatee_id, task_id)

Run all loop prevention checks.

Returns the first failing outcome, or a success outcome if all checks pass.

Parameters:

Name Type Description Default
delegation_chain tuple[str, ...]

Current delegation ancestry.

required
delegator_id str

ID of the delegating agent.

required
delegatee_id str

ID of the proposed delegatee.

required
task_id str

Unique ID of the task being delegated.

required

Returns:

Type Description
GuardCheckOutcome

First failing outcome or an all-passed success.

Source code in src/synthorg/communication/loop_prevention/guard.py
def check(
    self,
    delegation_chain: tuple[str, ...],
    delegator_id: str,
    delegatee_id: str,
    task_id: str,
) -> GuardCheckOutcome:
    """Run all loop prevention checks.

    Returns the first failing outcome, or a success outcome if
    all checks pass.

    Args:
        delegation_chain: Current delegation ancestry.
        delegator_id: ID of the delegating agent.
        delegatee_id: ID of the proposed delegatee.
        task_id: Unique ID of the task being delegated.

    Returns:
        First failing outcome or an all-passed success.
    """
    # Pure (stateless) checks first -- sequential to short-circuit
    outcome = check_ancestry(delegation_chain, delegatee_id)
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    outcome = check_delegation_depth(
        delegation_chain,
        self._config.max_delegation_depth,
    )
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    # Stateful checks -- only run if pure checks passed
    outcome = self._deduplicator.check(
        delegator_id,
        delegatee_id,
        task_id,
    )
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    outcome = self._rate_limiter.check(delegator_id, delegatee_id)
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)

    outcome = self._circuit_breaker.check(delegator_id, delegatee_id)
    if not outcome.passed:
        return self._log_and_return(outcome, delegator_id, delegatee_id)
    return GuardCheckOutcome(
        passed=True,
        mechanism=_SUCCESS_MECHANISM,
    )

record_delegation

record_delegation(delegator_id, delegatee_id, task_id)

Record a successful delegation in all stateful mechanisms.

Each delegation between a pair contributes to the circuit breaker bounce count. Back-and-forth patterns (A→B then B→A) both increment the same counter because the pair key is direction- agnostic, so repeated ping-pong will trip the breaker fastest.

Parameters:

Name Type Description Default
delegator_id str

ID of the delegating agent.

required
delegatee_id str

ID of the target agent.

required
task_id str

Unique ID of the delegated task.

required
Source code in src/synthorg/communication/loop_prevention/guard.py
def record_delegation(
    self,
    delegator_id: str,
    delegatee_id: str,
    task_id: str,
) -> None:
    """Record a successful delegation in all stateful mechanisms.

    Each delegation between a pair contributes to the circuit breaker
    bounce count.  Back-and-forth patterns (A→B then B→A) both
    increment the same counter because the pair key is direction-
    agnostic, so repeated ping-pong will trip the breaker fastest.

    Args:
        delegator_id: ID of the delegating agent.
        delegatee_id: ID of the target agent.
        task_id: Unique ID of the delegated task.
    """
    self._deduplicator.record(delegator_id, delegatee_id, task_id)
    self._rate_limiter.record(delegator_id, delegatee_id)
    self._circuit_breaker.record_delegation(delegator_id, delegatee_id)

models

Loop prevention check outcome model.

GuardCheckOutcome pydantic-model

Bases: BaseModel

Result of a single loop prevention check.

Attributes:

Name Type Description
passed bool

Whether the check passed (delegation allowed).

mechanism NotBlankStr

Name of the mechanism that produced this outcome.

message str

Human-readable detail (empty on success).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_passed_message

passed pydantic-field

passed

Whether the check passed

mechanism pydantic-field

mechanism

Mechanism name (e.g. 'max_depth', 'ancestry')

message pydantic-field

message = ''

Human-readable detail

Conflict Resolution

protocol

Conflict resolution protocol interfaces (see Communication design page).

Defines the pluggable strategy interface that varies per resolution approach (resolve + build_dissent_records). Detection logic lives on the service, not the protocol, because it is strategy-agnostic.

JudgeDecision

Bases: NamedTuple

Result of an LLM-based judge evaluation.

Attributes:

Name Type Description
winning_agent_id str

Agent whose position was chosen.

reasoning str

Explanation for the decision.

ConflictResolver

Bases: Protocol

Protocol for conflict resolution strategies.

Each strategy implements resolve (async, may need LLM calls) and build_dissent_records (sync, builds audit artifacts for every overruled position).

resolve async

resolve(conflict)

Resolve a conflict and produce a decision.

Parameters:

Name Type Description Default
conflict Conflict

The conflict to resolve.

required

Returns:

Type Description
ConflictResolution

Resolution decision.

Source code in src/synthorg/communication/conflict_resolution/protocol.py
async def resolve(self, conflict: Conflict) -> ConflictResolution:
    """Resolve a conflict and produce a decision.

    Args:
        conflict: The conflict to resolve.

    Returns:
        Resolution decision.
    """
    ...

build_dissent_records

build_dissent_records(conflict, resolution)

Build audit records for all losing positions.

For N-party conflicts, produces one DissentRecord per overruled agent. For escalated conflicts, produces one record per position (all are pending human review).

Parameters:

Name Type Description Default
conflict Conflict

The original conflict.

required
resolution ConflictResolution

The resolution decision.

required

Returns:

Type Description
tuple[DissentRecord, ...]

Dissent records preserving every overruled reasoning.

Source code in src/synthorg/communication/conflict_resolution/protocol.py
def build_dissent_records(
    self,
    conflict: Conflict,
    resolution: ConflictResolution,
) -> tuple[DissentRecord, ...]:
    """Build audit records for all losing positions.

    For N-party conflicts, produces one ``DissentRecord`` per
    overruled agent.  For escalated conflicts, produces one
    record per position (all are pending human review).

    Args:
        conflict: The original conflict.
        resolution: The resolution decision.

    Returns:
        Dissent records preserving every overruled reasoning.
    """
    ...

JudgeEvaluator

Bases: Protocol

Protocol for LLM-based judge evaluation.

Used by debate and hybrid strategies. When absent, strategies fall back to authority-based judging.

evaluate async

evaluate(conflict, judge_agent_id)

Evaluate conflict positions and pick a winner.

Parameters:

Name Type Description Default
conflict Conflict

The conflict with agent positions.

required
judge_agent_id NotBlankStr

The agent acting as judge.

required

Returns:

Type Description
JudgeDecision

Decision containing the winning agent ID and reasoning.

Source code in src/synthorg/communication/conflict_resolution/protocol.py
async def evaluate(
    self,
    conflict: Conflict,
    judge_agent_id: NotBlankStr,
) -> JudgeDecision:
    """Evaluate conflict positions and pick a winner.

    Args:
        conflict: The conflict with agent positions.
        judge_agent_id: The agent acting as judge.

    Returns:
        Decision containing the winning agent ID and reasoning.
    """
    ...

config

Conflict resolution configuration models (see Communication design page).

DebateConfig pydantic-model

Bases: BaseModel

Configuration for the structured debate strategy.

Attributes:

Name Type Description
judge NotBlankStr

Judge selection -- "shared_manager" (lowest common manager), "ceo" (hierarchy root), or a named agent.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

judge pydantic-field

judge = 'shared_manager'

Judge selection: "shared_manager", "ceo", or agent name

HybridConfig pydantic-model

Bases: BaseModel

Configuration for the hybrid resolution strategy.

Attributes:

Name Type Description
review_agent NotBlankStr

Agent tasked with reviewing positions.

escalate_on_ambiguity bool

Whether to escalate to human when the review result is ambiguous.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

review_agent pydantic-field

review_agent = 'conflict_reviewer'

Agent tasked with reviewing positions

escalate_on_ambiguity pydantic-field

escalate_on_ambiguity = True

Escalate to human when ambiguous

ConflictResolutionConfig pydantic-model

Bases: BaseModel

Top-level conflict resolution configuration.

Attributes:

Name Type Description
strategy ConflictResolutionStrategy

Default resolution strategy.

debate DebateConfig

Configuration for the debate strategy.

hybrid HybridConfig

Configuration for the hybrid strategy.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

strategy pydantic-field

strategy = AUTHORITY

Default resolution strategy

debate pydantic-field

debate

Debate strategy configuration

hybrid pydantic-field

hybrid

Hybrid strategy configuration

models

Conflict resolution domain models (see Communication design page).

All models are frozen Pydantic v2 with NotBlankStr identifiers, following the patterns established in delegation/models.py.

ConflictResolutionOutcome

Bases: StrEnum

Outcome of a conflict resolution attempt.

Members

RESOLVED_BY_AUTHORITY: Decided by seniority/hierarchy. RESOLVED_BY_DEBATE: Decided by structured debate + judge. RESOLVED_BY_HYBRID: Decided by hybrid review process. ESCALATED_TO_HUMAN: Escalated to human for resolution.

ConflictPosition pydantic-model

Bases: BaseModel

One agent's stance in a conflict.

Attributes:

Name Type Description
agent_id NotBlankStr

Identifier of the agent taking the position.

agent_department NotBlankStr

Department the agent belongs to.

agent_level SeniorityLevel

Seniority level of the agent.

position NotBlankStr

Summary of the agent's stance.

reasoning NotBlankStr

Detailed justification for the position.

timestamp AwareDatetime

When the position was stated.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent taking the position

agent_department pydantic-field

agent_department

Agent's department

agent_level pydantic-field

agent_level

Agent seniority level

position pydantic-field

position

Summary of the stance

reasoning pydantic-field

reasoning

Justification for the position

timestamp pydantic-field

timestamp

When position was stated

Conflict pydantic-model

Bases: BaseModel

A dispute between two or more agents.

Attributes:

Name Type Description
id NotBlankStr

Unique conflict identifier (e.g. "conflict-a1b2c3d4e5f6").

type ConflictType

Category of the conflict.

task_id NotBlankStr | None

Related task, if any.

subject NotBlankStr

Brief description of the dispute.

positions tuple[ConflictPosition, ...]

Agent positions (minimum 2, unique agent IDs).

detected_at AwareDatetime

When the conflict was detected.

is_cross_department bool

Whether agents span multiple departments (computed from positions).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_positions

id pydantic-field

id

Unique conflict identifier

type pydantic-field

type

Conflict category

task_id pydantic-field

task_id = None

Related task ID

subject pydantic-field

subject

Brief dispute description

positions pydantic-field

positions

Agent positions (min 2)

detected_at pydantic-field

detected_at

Detection timestamp

is_cross_department property

is_cross_department

Whether the conflict spans multiple departments.

ConflictResolution pydantic-model

Bases: BaseModel

Decision produced by a conflict resolution strategy.

Attributes:

Name Type Description
conflict_id NotBlankStr

ID of the resolved conflict.

outcome ConflictResolutionOutcome

How the conflict was resolved.

winning_agent_id NotBlankStr | None

Agent whose position was chosen (None if escalated).

winning_position NotBlankStr | None

The winning position text (None if escalated).

decided_by NotBlankStr

Entity that made the decision (agent name or "human").

reasoning NotBlankStr

Explanation for the decision.

resolved_at AwareDatetime

When the resolution was produced.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_outcome_consistency

conflict_id pydantic-field

conflict_id

Resolved conflict ID

outcome pydantic-field

outcome

Resolution outcome

winning_agent_id pydantic-field

winning_agent_id = None

Winning agent (None if escalated)

winning_position pydantic-field

winning_position = None

Winning position text (None if escalated)

decided_by pydantic-field

decided_by

Decision maker

reasoning pydantic-field

reasoning

Decision explanation

resolved_at pydantic-field

resolved_at

Resolution timestamp

DissentRecord pydantic-model

Bases: BaseModel

Audit artifact for a resolved conflict.

Preserves the losing agent's reasoning for organizational learning.

Attributes:

Name Type Description
id NotBlankStr

Unique dissent record identifier.

conflict Conflict

The original conflict.

resolution ConflictResolution

The resolution decision.

dissenting_agent_id NotBlankStr

Agent whose position was overruled.

dissenting_position NotBlankStr

The overruled position text.

strategy_used ConflictResolutionStrategy

Strategy that was used.

timestamp AwareDatetime

When the record was created.

metadata tuple[tuple[NotBlankStr, NotBlankStr], ...]

Extra key-value metadata pairs.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_dissent_consistency

id pydantic-field

id

Unique dissent record ID

conflict pydantic-field

conflict

Original conflict

resolution pydantic-field

resolution

Resolution decision

dissenting_agent_id pydantic-field

dissenting_agent_id

Agent whose position was overruled

dissenting_position pydantic-field

dissenting_position

Overruled position text

strategy_used pydantic-field

strategy_used

Strategy that resolved the conflict

timestamp pydantic-field

timestamp

Record creation timestamp

metadata pydantic-field

metadata = ()

Extra key-value metadata pairs

service

Conflict resolution service orchestrator (see Communication design page).

Follows the DelegationService pattern: __slots__, keyword-only constructor, audit trail list, structured logging.

ConflictResolutionService

ConflictResolutionService(*, config, resolvers)

Orchestrates conflict detection, resolution, and audit.

Selects the configured strategy, delegates to the resolver, builds dissent records for all overruled positions, and maintains an audit trail.

Parameters:

Name Type Description Default
config ConflictResolutionConfig

Conflict resolution configuration.

required
resolvers Mapping[ConflictResolutionStrategy, ConflictResolver]

Strategy → resolver mapping.

required
Source code in src/synthorg/communication/conflict_resolution/service.py
def __init__(
    self,
    *,
    config: ConflictResolutionConfig,
    resolvers: Mapping[ConflictResolutionStrategy, ConflictResolver],
) -> None:
    self._config = config
    self._resolvers: MappingProxyType[
        ConflictResolutionStrategy, ConflictResolver
    ] = MappingProxyType(dict(resolvers))
    self._audit_trail: list[DissentRecord] = []

create_conflict

create_conflict(*, conflict_type, subject, positions, task_id=None)

Create a conflict from agent positions.

Validates minimum positions and unique agent IDs, and generates an ID.

Parameters:

Name Type Description Default
conflict_type ConflictType

Category of the conflict.

required
subject NotBlankStr

Brief description of the dispute.

required
positions Sequence[ConflictPosition]

Agent positions (minimum 2).

required
task_id NotBlankStr | None

Related task ID, if any.

None

Returns:

Type Description
Conflict

New Conflict instance.

Raises:

Type Description
ConflictResolutionError

If fewer than 2 positions or duplicate agent IDs.

Source code in src/synthorg/communication/conflict_resolution/service.py
def create_conflict(
    self,
    *,
    conflict_type: ConflictType,
    subject: NotBlankStr,
    positions: Sequence[ConflictPosition],
    task_id: NotBlankStr | None = None,
) -> Conflict:
    """Create a conflict from agent positions.

    Validates minimum positions and unique agent IDs,
    and generates an ID.

    Args:
        conflict_type: Category of the conflict.
        subject: Brief description of the dispute.
        positions: Agent positions (minimum 2).
        task_id: Related task ID, if any.

    Returns:
        New Conflict instance.

    Raises:
        ConflictResolutionError: If fewer than 2 positions or
            duplicate agent IDs.
    """
    if len(positions) < _MIN_POSITIONS:
        msg = "A conflict requires at least 2 positions"
        logger.warning(
            CONFLICT_VALIDATION_ERROR,
            error=msg,
            position_count=len(positions),
        )
        raise ConflictResolutionError(msg)

    agent_ids = [p.agent_id for p in positions]
    if len(agent_ids) != len(set(agent_ids)):
        msg = "Duplicate agent_id in conflict positions"
        logger.warning(
            CONFLICT_VALIDATION_ERROR,
            error=msg,
            agent_ids=agent_ids,
        )
        raise ConflictResolutionError(msg)

    conflict = Conflict(
        id=f"conflict-{uuid4().hex[:12]}",
        type=conflict_type,
        task_id=task_id,
        subject=subject,
        positions=tuple(positions),
        detected_at=datetime.now(UTC),
    )

    logger.info(
        CONFLICT_DETECTED,
        conflict_id=conflict.id,
        conflict_type=conflict.type,
        subject=conflict.subject,
        is_cross_department=conflict.is_cross_department,
        agent_count=len(positions),
    )

    return conflict

resolve async

resolve(conflict)

Resolve a conflict using the configured strategy.

Parameters:

Name Type Description Default
conflict Conflict

The conflict to resolve.

required

Returns:

Type Description
tuple[ConflictResolution, tuple[DissentRecord, ...]]

Tuple of (resolution, dissent_records).

Raises:

Type Description
ConflictResolutionError

If the configured strategy has no registered resolver.

Source code in src/synthorg/communication/conflict_resolution/service.py
async def resolve(
    self,
    conflict: Conflict,
) -> tuple[ConflictResolution, tuple[DissentRecord, ...]]:
    """Resolve a conflict using the configured strategy.

    Args:
        conflict: The conflict to resolve.

    Returns:
        Tuple of ``(resolution, dissent_records)``.

    Raises:
        ConflictResolutionError: If the configured strategy has
            no registered resolver.
    """
    strategy = self._config.strategy
    resolver = self._resolvers.get(strategy)
    if resolver is None:
        msg = f"No resolver registered for strategy {strategy!r}"
        logger.warning(
            CONFLICT_NO_RESOLVER,
            strategy=strategy,
            error=msg,
        )
        raise ConflictResolutionError(
            msg,
            context={"strategy": strategy},
        )

    logger.info(
        CONFLICT_RESOLUTION_STARTED,
        conflict_id=conflict.id,
        strategy=strategy,
    )

    try:
        resolution = await resolver.resolve(conflict)
    except Exception:
        logger.exception(
            CONFLICT_RESOLUTION_FAILED,
            conflict_id=conflict.id,
            strategy=strategy,
        )
        raise

    dissent_records = resolver.build_dissent_records(conflict, resolution)
    self._audit_trail.extend(dissent_records)

    logger.info(
        CONFLICT_RESOLVED,
        conflict_id=conflict.id,
        outcome=resolution.outcome,
        winning_agent_id=resolution.winning_agent_id,
    )
    for record in dissent_records:
        logger.info(
            CONFLICT_DISSENT_RECORDED,
            dissent_id=record.id,
            conflict_id=conflict.id,
            dissenting_agent=record.dissenting_agent_id,
        )

    return resolution, dissent_records

get_dissent_records

get_dissent_records()

Return all dissent records.

Returns:

Type Description
tuple[DissentRecord, ...]

Tuple of dissent records in chronological order.

Source code in src/synthorg/communication/conflict_resolution/service.py
def get_dissent_records(self) -> tuple[DissentRecord, ...]:
    """Return all dissent records.

    Returns:
        Tuple of dissent records in chronological order.
    """
    return tuple(self._audit_trail)

query_dissent_records

query_dissent_records(*, agent_id=None, conflict_type=None, strategy=None, since=None)

Query dissent records with optional filters.

All filters are combined with AND logic.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by dissenting agent ID.

None
conflict_type ConflictType | None

Filter by conflict type.

None
strategy ConflictResolutionStrategy | None

Filter by strategy used.

None
since AwareDatetime | None

Filter by records after this timestamp (must be timezone-aware).

None

Returns:

Type Description
tuple[DissentRecord, ...]

Matching dissent records.

Source code in src/synthorg/communication/conflict_resolution/service.py
def query_dissent_records(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    conflict_type: ConflictType | None = None,
    strategy: ConflictResolutionStrategy | None = None,
    since: AwareDatetime | None = None,
) -> tuple[DissentRecord, ...]:
    """Query dissent records with optional filters.

    All filters are combined with AND logic.

    Args:
        agent_id: Filter by dissenting agent ID.
        conflict_type: Filter by conflict type.
        strategy: Filter by strategy used.
        since: Filter by records after this timestamp
            (must be timezone-aware).

    Returns:
        Matching dissent records.
    """
    logger.debug(
        CONFLICT_DISSENT_QUERIED,
        agent_id=agent_id,
        conflict_type=conflict_type,
        strategy=strategy,
        since=str(since) if since else None,
    )

    return tuple(
        r
        for r in self._audit_trail
        if (agent_id is None or r.dissenting_agent_id == agent_id)
        and (conflict_type is None or r.conflict.type == conflict_type)
        and (strategy is None or r.strategy_used == strategy)
        and (since is None or r.timestamp >= since)
    )

Meeting Protocol

protocol

Meeting protocol interface (see Communication design page).

Defines the MeetingProtocol protocol, the ConflictDetector protocol, and the AgentCaller type alias used to invoke agents during a meeting without coupling to the engine layer.

AgentCaller module-attribute

AgentCaller = Callable[[str, str, int], Awaitable[AgentResponse]]

Callback to invoke an agent during a meeting.

Signature: (agent_id, prompt, max_tokens) -> AgentResponse

The orchestrator constructs this from the engine layer, decoupling protocol implementations from the execution engine.

TaskCreator module-attribute

TaskCreator = Callable[[str, str | None, Priority], None]

Callback to create a task from a meeting action item.

Signature: (description, assignee_id, priority: Priority) -> None

Used by the orchestrator to optionally create tasks from extracted action items.

ConflictDetector

Bases: Protocol

Strategy for detecting conflicts in agent responses.

Used by StructuredPhasesProtocol to determine whether a discussion round is needed. The default implementation uses keyword matching; alternative implementations might use structured JSON output or tool calling for more robust detection.

detect

detect(response_content)

Determine whether the response indicates conflicts.

Parameters:

Name Type Description Default
response_content str

The conflict-check agent response text.

required

Returns:

Type Description
bool

True if conflicts were detected, False otherwise.

Source code in src/synthorg/communication/meeting/protocol.py
def detect(self, response_content: str) -> bool:
    """Determine whether the response indicates conflicts.

    Args:
        response_content: The conflict-check agent response text.

    Returns:
        True if conflicts were detected, False otherwise.
    """
    ...

MeetingProtocol

Bases: Protocol

Strategy interface for meeting protocol implementations.

Each implementation defines a different structure for how agents interact during a meeting (round-robin turns, parallel position papers, structured phases with discussion).

run async

run(*, meeting_id, agenda, leader_id, participant_ids, agent_caller, token_budget)

Execute the meeting protocol and produce minutes.

Parameters:

Name Type Description Default
meeting_id str

Unique identifier for this meeting.

required
agenda MeetingAgenda

The meeting agenda.

required
leader_id str

ID of the agent leading the meeting.

required
participant_ids tuple[str, ...]

IDs of participating agents.

required
agent_caller AgentCaller

Callback to invoke agents.

required
token_budget int

Maximum tokens for the entire meeting.

required

Returns:

Type Description
MeetingMinutes

Complete meeting minutes.

Source code in src/synthorg/communication/meeting/protocol.py
async def run(  # noqa: PLR0913
    self,
    *,
    meeting_id: str,
    agenda: MeetingAgenda,
    leader_id: str,
    participant_ids: tuple[str, ...],
    agent_caller: AgentCaller,
    token_budget: int,
) -> MeetingMinutes:
    """Execute the meeting protocol and produce minutes.

    Args:
        meeting_id: Unique identifier for this meeting.
        agenda: The meeting agenda.
        leader_id: ID of the agent leading the meeting.
        participant_ids: IDs of participating agents.
        agent_caller: Callback to invoke agents.
        token_budget: Maximum tokens for the entire meeting.

    Returns:
        Complete meeting minutes.
    """
    ...

get_protocol_type

get_protocol_type()

Return the protocol type this implementation handles.

Returns:

Type Description
MeetingProtocolType

The meeting protocol type enum value.

Source code in src/synthorg/communication/meeting/protocol.py
def get_protocol_type(self) -> MeetingProtocolType:
    """Return the protocol type this implementation handles.

    Returns:
        The meeting protocol type enum value.
    """
    ...

config

Meeting protocol configuration models (see Communication design page).

RoundRobinConfig pydantic-model

Bases: BaseModel

Configuration for the round-robin meeting protocol.

Attributes:

Name Type Description
max_turns_per_agent int

Maximum turns each agent may take.

max_total_turns int

Hard cap on total turns across all agents.

leader_summarizes bool

Whether the leader produces a final summary.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_turns_per_agent pydantic-field

max_turns_per_agent = 2

Maximum turns each agent may take

max_total_turns pydantic-field

max_total_turns = 16

Hard cap on total turns across all agents

leader_summarizes pydantic-field

leader_summarizes = True

Whether the leader produces a final summary

PositionPapersConfig pydantic-model

Bases: BaseModel

Configuration for the position-papers meeting protocol.

Attributes:

Name Type Description
max_tokens_per_position int

Token budget per position paper.

synthesizer NotBlankStr

Who performs synthesis. The sentinel "meeting_leader" resolves to the meeting leader at runtime; otherwise interpreted as a specific agent ID.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_tokens_per_position pydantic-field

max_tokens_per_position = 300

Token budget per position paper

synthesizer pydantic-field

synthesizer = 'meeting_leader'

Who performs synthesis (meeting_leader or agent ID)

StructuredPhasesConfig pydantic-model

Bases: BaseModel

Configuration for the structured-phases meeting protocol.

Attributes:

Name Type Description
skip_discussion_if_no_conflicts bool

Skip discussion when no conflicts are detected.

max_discussion_tokens int

Token budget for the discussion round.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

skip_discussion_if_no_conflicts pydantic-field

skip_discussion_if_no_conflicts = True

Skip discussion when no conflicts detected

max_discussion_tokens pydantic-field

max_discussion_tokens = 1000

Token budget for discussion round

MeetingProtocolConfig pydantic-model

Bases: BaseModel

Top-level meeting protocol configuration.

Selects which protocol strategy to use and carries the per-protocol settings.

Attributes:

Name Type Description
protocol MeetingProtocolType

Which protocol strategy to use.

auto_create_tasks bool

Whether to auto-create tasks from action items extracted during any protocol execution.

round_robin RoundRobinConfig

Round-robin protocol settings.

position_papers PositionPapersConfig

Position-papers protocol settings.

structured_phases StructuredPhasesConfig

Structured-phases protocol settings.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

protocol pydantic-field

protocol = ROUND_ROBIN

Which protocol strategy to use

auto_create_tasks pydantic-field

auto_create_tasks = True

Auto-create tasks from action items

round_robin pydantic-field

round_robin

Round-robin protocol settings

position_papers pydantic-field

position_papers

Position-papers protocol settings

structured_phases pydantic-field

structured_phases

Structured-phases protocol settings

models

Meeting protocol domain models (see Communication design page).

AgentResponse pydantic-model

Bases: BaseModel

Result of a single agent invocation during a meeting.

Attributes:

Name Type Description
agent_id NotBlankStr

Identifier of the agent that responded.

content str

Text content of the response.

input_tokens int

Tokens consumed by the prompt.

output_tokens int

Tokens generated in the response.

cost_usd float

Estimated cost of the invocation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent that responded

content pydantic-field

content

Response content

input_tokens pydantic-field

input_tokens = 0

Prompt tokens consumed

output_tokens pydantic-field

output_tokens = 0

Response tokens generated

cost_usd pydantic-field

cost_usd = 0.0

Estimated invocation cost

MeetingAgendaItem pydantic-model

Bases: BaseModel

A single topic on the meeting agenda.

Attributes:

Name Type Description
title NotBlankStr

Short title of the agenda topic.

description str

Detailed description of the topic.

presenter_id NotBlankStr | None

Agent who presents this item (optional).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

title pydantic-field

title

Agenda topic title

description pydantic-field

description = ''

Detailed topic description

presenter_id pydantic-field

presenter_id = None

Agent who presents this item

MeetingAgenda pydantic-model

Bases: BaseModel

Full meeting agenda.

Attributes:

Name Type Description
title NotBlankStr

Meeting title.

context str

Background context for the meeting.

items tuple[MeetingAgendaItem, ...]

Ordered agenda items.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

title pydantic-field

title

Meeting title

context pydantic-field

context = ''

Background context for the meeting

items pydantic-field

items = ()

Ordered agenda items

MeetingContribution pydantic-model

Bases: BaseModel

An agent's contribution during a meeting phase.

Attributes:

Name Type Description
agent_id NotBlankStr

Identifier of the contributing agent.

content str

Text content of the contribution.

phase MeetingPhase

Meeting phase during which this was contributed.

turn_number int

Turn number within the meeting.

input_tokens int

Prompt tokens consumed.

output_tokens int

Response tokens generated.

timestamp AwareDatetime

When the contribution was recorded.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Contributing agent

content pydantic-field

content

Contribution content

phase pydantic-field

phase

Phase of contribution

turn_number pydantic-field

turn_number

Turn number

input_tokens pydantic-field

input_tokens = 0

Prompt tokens consumed

output_tokens pydantic-field

output_tokens = 0

Response tokens generated

timestamp pydantic-field

timestamp

When recorded

ActionItem pydantic-model

Bases: BaseModel

An action item extracted from meeting decisions.

Attributes:

Name Type Description
description NotBlankStr

What needs to be done.

assignee_id NotBlankStr | None

Agent responsible for the action.

priority Priority

Urgency of the action item.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

description pydantic-field

description

What needs to be done

assignee_id pydantic-field

assignee_id = None

Agent responsible for the action

priority pydantic-field

priority = MEDIUM

Action item priority

MeetingMinutes pydantic-model

Bases: BaseModel

Complete output of a meeting protocol execution.

Attributes:

Name Type Description
meeting_id NotBlankStr

Unique meeting identifier.

protocol_type MeetingProtocolType

Protocol that produced these minutes.

leader_id NotBlankStr

Agent who led the meeting.

participant_ids tuple[NotBlankStr, ...]

Agents who participated.

agenda MeetingAgenda

The meeting agenda.

contributions tuple[MeetingContribution, ...]

All agent contributions in order.

summary str

Final meeting summary text.

decisions tuple[NotBlankStr, ...]

Decisions made during the meeting.

action_items tuple[ActionItem, ...]

Extracted action items.

conflicts_detected bool

Whether conflicts were detected.

total_input_tokens int

Total prompt tokens consumed.

total_output_tokens int

Total response tokens generated.

started_at AwareDatetime

When the meeting started.

ended_at AwareDatetime

When the meeting ended.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_timing
  • _validate_participants
  • _validate_token_aggregates

meeting_id pydantic-field

meeting_id

Unique meeting ID

protocol_type pydantic-field

protocol_type

Protocol used

leader_id pydantic-field

leader_id

Meeting leader

participant_ids pydantic-field

participant_ids

Meeting participants

agenda pydantic-field

agenda

Meeting agenda

contributions pydantic-field

contributions = ()

All contributions in order

summary pydantic-field

summary = ''

Final summary

decisions pydantic-field

decisions = ()

Decisions made

action_items pydantic-field

action_items = ()

Extracted action items

conflicts_detected pydantic-field

conflicts_detected = False

Whether conflicts were detected

total_input_tokens pydantic-field

total_input_tokens = 0

Total prompt tokens

total_output_tokens pydantic-field

total_output_tokens = 0

Total response tokens

started_at pydantic-field

started_at

Meeting start time

ended_at pydantic-field

ended_at

Meeting end time

total_tokens property

total_tokens

Total tokens consumed (input + output).

MeetingRecord pydantic-model

Bases: BaseModel

Audit trail entry for a meeting execution.

Attributes:

Name Type Description
meeting_id NotBlankStr

Unique meeting identifier.

meeting_type_name NotBlankStr

Name of the meeting type from config.

protocol_type MeetingProtocolType

Protocol strategy used.

status MeetingStatus

Final status of the meeting.

minutes MeetingMinutes | None

Complete minutes if meeting succeeded.

error_message NotBlankStr | None

Error description if meeting failed.

token_budget int

Token budget that was allocated.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_status_consistency

meeting_id pydantic-field

meeting_id

Unique meeting ID

meeting_type_name pydantic-field

meeting_type_name

Meeting type from config

protocol_type pydantic-field

protocol_type

Protocol strategy used

status pydantic-field

status

Final meeting status

minutes pydantic-field

minutes = None

Complete minutes on success

error_message pydantic-field

error_message = None

Error description on failure

token_budget pydantic-field

token_budget

Token budget allocated

orchestrator

Meeting orchestrator -- lifecycle manager (see Communication design page).

Manages the full meeting lifecycle: validates inputs, selects the configured protocol, executes the meeting, optionally creates tasks from action items, and records audit trail entries.

MeetingOrchestrator

MeetingOrchestrator(*, protocol_registry, agent_caller, task_creator=None)

Lifecycle manager for meeting execution.

Coordinates protocol selection, execution, task creation from action items, and audit trail recording. Meeting records are stored in memory; see the persistence layer for durable storage when available.

Parameters:

Name Type Description Default
protocol_registry Mapping[MeetingProtocolType, MeetingProtocol]

Mapping of protocol types to implementations.

required
agent_caller AgentCaller

Callback to invoke agents during meetings.

required
task_creator TaskCreator | None

Optional callback to create tasks from action items.

None
Source code in src/synthorg/communication/meeting/orchestrator.py
def __init__(
    self,
    *,
    protocol_registry: Mapping[MeetingProtocolType, MeetingProtocol],
    agent_caller: AgentCaller,
    task_creator: TaskCreator | None = None,
) -> None:
    self._protocol_registry: MappingProxyType[
        MeetingProtocolType, MeetingProtocol
    ] = MappingProxyType(dict(protocol_registry))
    self._agent_caller = agent_caller
    self._task_creator = task_creator
    self._records: list[MeetingRecord] = []

run_meeting async

run_meeting(
    *,
    meeting_type_name,
    protocol_config,
    agenda,
    leader_id,
    participant_ids,
    token_budget,
)

Execute a meeting and return the audit record.

Validation errors (MeetingParticipantError, MeetingProtocolNotFoundError) are raised directly. Domain and runtime errors during protocol execution are caught and returned as a MeetingRecord with FAILED or BUDGET_EXHAUSTED status. BaseException subclasses (e.g. KeyboardInterrupt) are NOT caught.

Parameters:

Name Type Description Default
meeting_type_name str

Name of the meeting type from config.

required
protocol_config MeetingProtocolConfig

Protocol configuration to use.

required
agenda MeetingAgenda

The meeting agenda.

required
leader_id str

ID of the agent leading the meeting.

required
participant_ids tuple[str, ...]

IDs of participating agents.

required
token_budget int

Maximum tokens for the meeting (must be > 0).

required

Returns:

Type Description
MeetingRecord

Meeting record with status and optional minutes.

Raises:

Type Description
MeetingProtocolNotFoundError

If the configured protocol is not in the registry.

MeetingParticipantError

If participant list is empty, contains duplicates, or leader is in participants.

ValueError

If token_budget is not positive.

Source code in src/synthorg/communication/meeting/orchestrator.py
async def run_meeting(  # noqa: PLR0913
    self,
    *,
    meeting_type_name: str,
    protocol_config: MeetingProtocolConfig,
    agenda: MeetingAgenda,
    leader_id: str,
    participant_ids: tuple[str, ...],
    token_budget: int,
) -> MeetingRecord:
    """Execute a meeting and return the audit record.

    Validation errors (``MeetingParticipantError``,
    ``MeetingProtocolNotFoundError``) are raised directly.
    Domain and runtime errors during protocol execution are caught
    and returned as a ``MeetingRecord`` with ``FAILED`` or
    ``BUDGET_EXHAUSTED`` status.  ``BaseException`` subclasses
    (e.g. ``KeyboardInterrupt``) are NOT caught.

    Args:
        meeting_type_name: Name of the meeting type from config.
        protocol_config: Protocol configuration to use.
        agenda: The meeting agenda.
        leader_id: ID of the agent leading the meeting.
        participant_ids: IDs of participating agents.
        token_budget: Maximum tokens for the meeting (must be > 0).

    Returns:
        Meeting record with status and optional minutes.

    Raises:
        MeetingProtocolNotFoundError: If the configured protocol
            is not in the registry.
        MeetingParticipantError: If participant list is empty,
            contains duplicates, or leader is in participants.
        ValueError: If token_budget is not positive.
    """
    meeting_id = f"mtg-{uuid4().hex[:12]}"
    protocol_type = protocol_config.protocol

    self._validate_inputs(
        meeting_id,
        leader_id,
        participant_ids,
        token_budget,
    )
    protocol = self._resolve_protocol(meeting_id, protocol_type)

    logger.info(
        MEETING_STARTED,
        meeting_id=meeting_id,
        meeting_type=meeting_type_name,
        protocol=protocol_type,
        leader_id=leader_id,
        participant_count=len(participant_ids),
        token_budget=token_budget,
    )

    result = await self._execute_protocol(
        protocol,
        meeting_id,
        meeting_type_name,
        agenda,
        leader_id,
        participant_ids,
        token_budget,
    )

    if isinstance(result, MeetingRecord):
        return result

    self._create_tasks(meeting_id, protocol_config, result)
    return self._record_success(
        meeting_id,
        meeting_type_name,
        protocol_type,
        result,
        token_budget,
    )

get_records

get_records()

Return all meeting audit records.

Returns:

Type Description
tuple[MeetingRecord, ...]

Tuple of meeting records in chronological order.

Source code in src/synthorg/communication/meeting/orchestrator.py
def get_records(self) -> tuple[MeetingRecord, ...]:
    """Return all meeting audit records.

    Returns:
        Tuple of meeting records in chronological order.
    """
    return tuple(self._records)