Communication¶
Inter-agent messaging: bus, dispatcher, delegation, loop prevention, conflict resolution, and meeting protocols.
Message¶
message
¶
Message domain models (see Communication design page).
Part
module-attribute
¶
Discriminated union of message content parts.
Pydantic uses the type literal field on each part to determine
which subtype to deserialize into.
TextPart
pydantic-model
¶
Bases: BaseModel
Plain text message content.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | Literal['text'] | Discriminator literal (always 'text'). |
| text | NotBlankStr | The text content (must not be blank). |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- type (Literal['text'])
- text (NotBlankStr)
DataPart
pydantic-model
¶
Bases: BaseModel
Structured JSON data attached to a message.
The data field is deep-copied and recursively frozen at
construction (dict -> MappingProxyType, list ->
tuple, set -> frozenset) to preserve the immutability
contract of frozen Pydantic models.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | Literal['data'] | Discriminator literal (always 'data'). |
| data | MappingProxyType[str, Any] | Structured JSON content (read-only after construction). |
Config:
- frozen: True
- allow_inf_nan: False
- arbitrary_types_allowed: True
- extra: forbid
Fields:
Validators:
- _deep_copy_and_freeze_data → data
model_copy
¶
Override to re-freeze data when model_copy updates it.
Source code in src/synthorg/communication/message.py
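The deep-copy-and-freeze behaviour described for DataPart can be illustrated with a small standard-library sketch. The helper name `deep_freeze` is hypothetical and is not taken from the synthorg source; it only mirrors the documented mapping (dict -> MappingProxyType, list -> tuple, set -> frozenset).

```python
from copy import deepcopy
from types import MappingProxyType
from typing import Any


def deep_freeze(value: Any) -> Any:
    """Recursively convert mutable containers into read-only equivalents."""
    if isinstance(value, dict):
        return MappingProxyType({k: deep_freeze(v) for k, v in value.items()})
    if isinstance(value, (list, tuple)):
        return tuple(deep_freeze(v) for v in value)
    if isinstance(value, (set, frozenset)):
        return frozenset(deep_freeze(v) for v in value)
    return value


source = {"scores": [1, 2], "tags": {"a"}, "nested": {"k": "v"}}
# Copy first so later changes to the caller's object cannot leak in.
frozen = deep_freeze(deepcopy(source))
source["scores"].append(3)  # the frozen copy is unaffected
```

Copying before freezing is what keeps the frozen value independent of the caller's original object.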
FilePart
pydantic-model
¶
Bases: BaseModel
Reference to a file resource.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | Literal['file'] | Discriminator literal (always 'file'). |
| uri | NotBlankStr | File URI or path. |
| mime_type | NotBlankStr \| None | Optional MIME type of the file. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- type (Literal['file'])
- uri (NotBlankStr)
- mime_type (NotBlankStr | None)
UriPart
pydantic-model
¶
Bases: BaseModel
Reference to an external URI resource.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | Literal['uri'] | Discriminator literal (always 'uri'). |
| uri | NotBlankStr | The URI or URL. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- type (Literal['uri'])
- uri (NotBlankStr)
MessageMetadata
pydantic-model
¶
Bases: BaseModel
Optional metadata carried with a message.
Extends the Communication design page metadata with an additional extra
field for arbitrary key-value pairs.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | NotBlankStr \| None | Related task identifier. |
| project_id | NotBlankStr \| None | Related project identifier. |
| tokens_used | int \| None | LLM tokens consumed producing the message. |
| cost | float \| None | Estimated cost of the message in the configured currency. |
| extra | tuple[tuple[str, str], ...] | Immutable key-value pairs for arbitrary metadata (extension). |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- task_id (NotBlankStr | None)
- project_id (NotBlankStr | None)
- tokens_used (int | None)
- cost (float | None)
- extra (tuple[tuple[str, str], ...])
Validators:
- _validate_extra
Message
pydantic-model
¶
Bases: BaseModel
An inter-agent message.
Field schema is based on the Communication design page with typed
refinements. The sender field is aliased to "from" for
JSON compatibility with the spec format.
Content is represented as a tuple of typed Part objects
(text, data, file, URI) rather than a flat string, enabling rich
multi-part messages.
Attributes:
| Name | Type | Description |
|---|---|---|
| id | UUID | Unique message identifier. |
| timestamp | AwareDatetime | When the message was created (must be timezone-aware). |
| sender | NotBlankStr | Agent ID of the sender (aliased to "from"). |
| to | NotBlankStr | Recipient agent or channel identifier. |
| type | MessageType | Message type classification. |
| priority | MessagePriority | Message priority level. |
| channel | NotBlankStr | Channel the message is sent through. |
| parts | tuple[Part, ...] | Ordered content parts (text, data, files, URIs). |
| metadata | MessageMetadata | Optional message metadata. |
Config:
- frozen: True
- populate_by_name: True
- allow_inf_nan: False
- arbitrary_types_allowed: True
Fields:
- id (UUID)
- timestamp (AwareDatetime)
- sender (NotBlankStr)
- to (NotBlankStr)
- type (MessageType)
- priority (MessagePriority)
- channel (NotBlankStr)
- parts (tuple[Part, ...])
- attachments (tuple[Part, ...])
- metadata (MessageMetadata)
attachments
pydantic-field
¶
Out-of-band parts attached to the message (files, data blobs, URIs) that are not part of the primary content flow. Persisted alongside the message and round-tripped by both backends.
text
property
¶
Extract the first TextPart text, or empty string.
Convenience accessor for consumers that only need the primary text content of a message.
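As a rough illustration of the accessor's contract (first TextPart wins, empty string otherwise), the following sketch uses plain dataclasses as stand-ins for the real Pydantic part models. The function name `first_text` and the simplified classes are assumptions made for this example only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TextPart:  # simplified stand-in for the real model
    text: str
    type: str = "text"


@dataclass(frozen=True)
class UriPart:  # simplified stand-in for the real model
    uri: str
    type: str = "uri"


def first_text(parts: tuple) -> str:
    """Return the text of the first TextPart, or "" if there is none."""
    return next((p.text for p in parts if isinstance(p, TextPart)), "")


mixed = (UriPart("https://example.com"), TextPart("hello"), TextPart("second"))
```

Later TextPart entries are ignored, so callers that need every text segment should iterate over parts directly.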
Messenger¶
messenger
¶
Per-agent messenger facade over the message bus (see Communication design page).
AgentMessenger
¶
Per-agent facade for sending, receiving, and dispatching messages.
Wraps a MessageBus and an optional MessageDispatcher
to provide a high-level API that auto-fills sender, timestamp, and
message ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_id | str | Identifier of the owning agent. | required |
| agent_name | str | Human-readable name of the agent. | required |
| bus | MessageBus | The underlying message bus. | required |
| dispatcher | MessageDispatcher \| None | Optional message dispatcher for handler routing. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If agent_id or agent_name is blank. |
Source code in src/synthorg/communication/messenger.py
send_message
async
¶
Send a message to a channel.
Auto-fills sender, timestamp, and message ID. Provide either
content (auto-wrapped as a TextPart) or parts for rich
multi-part messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| to | str | Recipient agent or channel identifier. | required |
| channel | str | Channel to publish through. | required |
| content | str \| None | Message body text (auto-wrapped as TextPart). | None |
| parts | tuple[Part, ...] \| None | Explicit message parts (mutually exclusive with content). | None |
| message_type | MessageType | Message type classification. | required |
| priority | MessagePriority | Message priority level. | NORMAL |
Returns:
| Type | Description |
|---|---|
| Message | The constructed and published message. |
Raises:
| Type | Description |
|---|---|
| ValueError | If neither or both content/parts are provided. |
| ChannelNotFoundError | If the channel does not exist. |
| MessageBusNotRunningError | If the bus is not running. |
Source code in src/synthorg/communication/messenger.py
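The "exactly one of content or parts" rule can be sketched as below. This is not the library's implementation: `resolve_parts` is a hypothetical helper, and a plain dict stands in for TextPart.

```python
from __future__ import annotations


def resolve_parts(content: str | None = None, parts: tuple | None = None) -> tuple:
    """Return the parts to send, enforcing that exactly one input is given."""
    # True when both are None or both are provided: either way it is an error.
    if (content is None) == (parts is None):
        raise ValueError("Provide exactly one of 'content' or 'parts'")
    if content is not None:
        # Plain text is wrapped into a single text part.
        return ({"type": "text", "text": content},)
    return parts


wrapped = resolve_parts(content="status update")
```

Comparing the two `is None` checks for equality covers both failure modes (neither and both) in a single condition.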
send_direct
async
¶
Send a direct message to another agent.
Auto-fills sender, timestamp, and message ID. The message's
channel field is set to the deterministic @{a}:{b}
channel name computed from the sorted agent ID pair.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| to | str | Recipient agent ID. | required |
| content | str \| None | Message body text (auto-wrapped as TextPart). | None |
| parts | tuple[Part, ...] \| None | Explicit message parts (mutually exclusive with content). | None |
| message_type | MessageType | Message type classification. | required |
| priority | MessagePriority | Message priority level. | NORMAL |
Returns:
| Type | Description |
|---|---|
| Message | The constructed and sent message. |
Raises:
| Type | Description |
|---|---|
| ValueError | If neither or both content/parts are provided. |
| MessageBusNotRunningError | If the bus is not running. |
Source code in src/synthorg/communication/messenger.py
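The deterministic @{a}:{b} naming can be sketched in a few lines. The function name `direct_channel_name` is an assumption for illustration; only the sorted-pair format comes from the description above.

```python
def direct_channel_name(agent_a: str, agent_b: str) -> str:
    """Build the direct-channel name from the sorted pair of agent IDs."""
    first, second = sorted((agent_a, agent_b))
    return f"@{first}:{second}"


name = direct_channel_name("bob", "alice")
```

Sorting the pair means both participants compute the same channel name regardless of who sends first.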
broadcast
async
¶
Publish an announcement to a shared channel.
Agents must be subscribed to the target channel to receive
the message. For true fan-out to all known agents, use a
channel of type ChannelType.BROADCAST.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| content | str \| None | Message body text (auto-wrapped as TextPart). | None |
| parts | tuple[Part, ...] \| None | Explicit message parts (mutually exclusive with content). | None |
| message_type | MessageType | Message type classification. | required |
| priority | MessagePriority | Message priority level. | NORMAL |
| channel | str | Channel name (default '#all-hands'). | '#all-hands' |
Returns:
| Type | Description |
|---|---|
| Message | The constructed and published message. |
Raises:
| Type | Description |
|---|---|
| ValueError | If neither or both content/parts are provided. |
| ChannelNotFoundError | If the channel does not exist. |
| MessageBusNotRunningError | If the bus is not running. |
Source code in src/synthorg/communication/messenger.py
subscribe
async
¶
Subscribe this agent to a channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Channel to subscribe to. | required |
Returns:
| Type | Description |
|---|---|
| Subscription | The subscription record. |
Raises:
| Type | Description |
|---|---|
| ChannelNotFoundError | If the channel does not exist. |
| MessageBusNotRunningError | If the bus is not running. |
Source code in src/synthorg/communication/messenger.py
unsubscribe
async
¶
Unsubscribe this agent from a channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Channel to unsubscribe from. | required |
Raises:
| Type | Description |
|---|---|
| NotSubscribedError | If not currently subscribed. |
Source code in src/synthorg/communication/messenger.py
receive
async
¶
Receive the next message from a channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Channel to receive from. | required |
| timeout | float \| None | Max seconds to wait, or None to wait indefinitely. | None |
Returns:
| Type | Description |
|---|---|
| DeliveryEnvelope \| None | The next delivery envelope, or None if no message is received (for example, when the timeout expires). |
Source code in src/synthorg/communication/messenger.py
register_handler
¶
Register a message handler.
Creates a dispatcher automatically if one was not provided at construction time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler | MessageHandler \| MessageHandlerFunc | The handler instance or async function. | required |
| message_types | frozenset[MessageType] \| None | Message types to match (empty/None = all). | None |
| min_priority | MessagePriority | Minimum priority to accept. | LOW |
| name | str | Human-readable label for debugging. | 'unnamed' |
Returns:
| Type | Description |
|---|---|
| str | The unique handler registration ID. |
Source code in src/synthorg/communication/messenger.py
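The filtering implied by message_types and min_priority might look roughly like the sketch below. `Priority` is a hypothetical stand-in for MessagePriority: the documentation only confirms LOW and NORMAL, so the HIGH member and the numeric ordering are assumptions, as is the helper name `handler_matches`.

```python
from __future__ import annotations

from enum import IntEnum


class Priority(IntEnum):  # hypothetical stand-in; ordering is assumed
    LOW = 0
    NORMAL = 1
    HIGH = 2


def handler_matches(
    message_type: str,
    priority: Priority,
    message_types: frozenset[str] | None = None,
    min_priority: Priority = Priority.LOW,
) -> bool:
    """Decide whether a registered handler should see a message."""
    # An empty or missing type filter matches every message type.
    type_ok = not message_types or message_type in message_types
    return type_ok and priority >= min_priority


only_urgent_tasks = handler_matches(
    "task", Priority.NORMAL, frozenset({"task"}), Priority.HIGH
)
```

With the default arguments a handler matches every message, which lines up with the "empty/None = all" wording in the parameter table.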
deregister_handler
¶
Remove a previously registered handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler_id | str | The registration ID. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the handler was found and removed. |
Source code in src/synthorg/communication/messenger.py
dispatch_message
async
¶
Dispatch an incoming message to registered handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The message to dispatch. | required |
Returns:
| Type | Description |
|---|---|
| DispatchResult | A DispatchResult describing the outcome of the dispatch. |
Source code in src/synthorg/communication/messenger.py
Dispatcher¶
dispatcher
¶
Message dispatcher -- routes incoming messages to registered handlers.
See the Communication design page.
DispatchResult
pydantic-model
¶
Bases: BaseModel
Immutable outcome of dispatching a single message.
Attributes:
| Name | Type | Description |
|---|---|---|
| message_id | UUID | The dispatched message's identifier. |
| handlers_matched | int | Derived count (succeeded + failed). |
| handlers_succeeded | int | Number of handlers that completed without error. |
| handlers_failed | int | Number of handlers that raised an exception. |
| errors | tuple[str, ...] | Error descriptions from failed handlers. |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- message_id (UUID)
- handlers_succeeded (int)
- handlers_failed (int)
- errors (tuple[str, ...])
MessageDispatcher
¶
Per-agent dispatcher that routes messages to registered handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| agent_id | str | Identifier of the owning agent (for logging context). | 'unknown' |
Source code in src/synthorg/communication/dispatcher.py
register
¶
Register a handler for incoming messages.
If handler is a bare async function, it is automatically wrapped
in a FunctionHandler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler | MessageHandler \| MessageHandlerFunc | The handler instance or async function. | required |
| message_types | frozenset[MessageType] \| None | Message types to match (empty/None = all). | None |
| min_priority | MessagePriority | Minimum priority to accept. | LOW |
| name | str | Human-readable label for debugging. | 'unnamed' |
Returns:
| Type | Description |
|---|---|
| str | The unique handler registration ID. |
Source code in src/synthorg/communication/dispatcher.py
deregister
¶
Remove a previously registered handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| handler_id | str | The registration ID returned by register. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the handler was found and removed, False otherwise. |
Source code in src/synthorg/communication/dispatcher.py
dispatch
async
¶
Route a message to all matching handlers concurrently.
Most handler Exception subclasses are isolated; their
errors are captured into the per-handler errors slot
without affecting siblings. MemoryError and
RecursionError are deliberate carve-outs: the interpreter
is in catastrophic state, so they re-raise to abort the
TaskGroup and cancel all remaining handlers rather than
being absorbed silently. BaseException subclasses (e.g.
KeyboardInterrupt, asyncio.CancelledError) likewise
propagate through the TaskGroup, cancelling all remaining
handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The message to dispatch. | required |
Returns:
| Type | Description |
|---|---|
| DispatchResult | A DispatchResult describing the outcome of the dispatch. |
Source code in src/synthorg/communication/dispatcher.py
Bus Protocol¶
bus_protocol
¶
Message bus protocol (see Communication design page).
Defines the swappable interface for message bus backends. The
default implementation is InMemoryMessageBus in
synthorg.communication.bus.memory.
MessageBus
¶
Bases: Protocol
Protocol for message bus backends.
All implementations must support the full lifecycle (start/stop), channel management, pub/sub messaging, direct messaging, and channel history.
Uses a pull model: consumers call receive to get the
next message rather than registering push callbacks.
is_running
property
¶
Whether the bus client thinks it is running.
Local state check: flipped by start / stop.
Does not reflect live server reachability -- use
health_check for that.
start
async
¶
Start the bus and create pre-configured channels.
Raises:
| Type | Description |
|---|---|
| MessageBusAlreadyRunningError | If the bus is already running. |
stop
async
¶
health_check
async
¶
Liveness probe against the underlying transport.
Unlike is_running, this performs an actual
round-trip to confirm the broker is reachable and the
connection is healthy. Implementations return False
rather than raising when the probe fails, so the caller
can treat the bus as degraded without a try/except.
Source code in src/synthorg/communication/bus_protocol.py
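The contrast between is_running (local flag) and health_check (round-trip that returns False instead of raising) can be sketched as follows. `SketchBus` and its injected `probe` coroutine are hypothetical; returning False when the bus has not been started is also an assumption.

```python
import asyncio


class SketchBus:
    """Toy bus exposing a local running flag and a non-raising probe."""

    def __init__(self, probe):
        self._probe = probe  # hypothetical coroutine doing the round-trip
        self._running = False

    @property
    def is_running(self) -> bool:
        return self._running  # local state only, no network access

    async def start(self) -> None:
        self._running = True

    async def health_check(self) -> bool:
        if not self._running:
            return False
        try:
            await self._probe()
        except Exception:
            return False  # degraded, but the caller needs no try/except
        return True


async def unreachable():
    raise ConnectionError("broker down")


async def demo():
    bus = SketchBus(unreachable)
    await bus.start()
    return bus.is_running, await bus.health_check()


state = asyncio.run(demo())
```

Here the bus still reports itself as running while the probe reports the transport as unhealthy, which is the distinction the two members are meant to draw.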
publish
async
¶
Publish a message to its channel.
The target channel is determined by message.channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Message | The message to publish. | required |
| ttl_seconds | float \| None | Optional per-message TTL in seconds. When set, the message expires after this duration on the server. Requires the NATS stream to have | None |
Raises:
| Type | Description |
|---|---|
| MessageBusNotRunningError | If the bus is not running. |
| ChannelNotFoundError | If the target channel does not exist. |
Source code in src/synthorg/communication/bus_protocol.py
send_direct
async
¶
Send a direct message between two agents.
Lazily creates a DIRECT channel named @{a}:{b} (where
a, b are the sorted agent IDs) and subscribes both agents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
Message
|
The message to send ( |
required |
recipient
|
NotBlankStr
|
The recipient agent ID. |
required |
ttl_seconds
|
float | None
|
Optional per-message TTL in seconds. |
None
|
Raises:
| Type | Description |
|---|---|
MessageBusNotRunningError
|
If the bus is not running. |
ValueError
|
If the recipient does not match
|
Source code in src/synthorg/communication/bus_protocol.py
subscribe
async
¶
Subscribe an agent to a channel.
Idempotent -- returns a fresh subscription record if already subscribed (the channel's subscriber list is not duplicated).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Channel to subscribe to. | required |
| subscriber_id | NotBlankStr | Agent ID of the subscriber. | required |
Returns:
| Type | Description |
|---|---|
| Subscription | The subscription record. |
Raises:
| Type | Description |
|---|---|
| MessageBusNotRunningError | If the bus is not running. |
| ChannelNotFoundError | If the channel does not exist. |
Source code in src/synthorg/communication/bus_protocol.py
unsubscribe
async
¶
Remove an agent's subscription from a channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Channel to unsubscribe from. | required |
| subscriber_id | NotBlankStr | Agent ID to remove. | required |
Raises:
| Type | Description |
|---|---|
| MessageBusNotRunningError | If the bus is not running. |
| NotSubscribedError | If the agent is not subscribed. |
Source code in src/synthorg/communication/bus_protocol.py
receive
async
¶
Receive the next message from a channel.
Awaits until a message is available, the timeout expires, or
the bus is stopped. When timeout is None, awaits
indefinitely (or until shutdown).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Channel to receive from. | required |
| subscriber_id | NotBlankStr | Agent ID receiving. | required |
| timeout | float \| None | Seconds to wait before returning None. | None |
Returns:
| Type | Description |
|---|---|
| DeliveryEnvelope \| None | A delivery envelope, or None if none is available before the timeout expires or the bus stops. |
Raises:
| Type | Description |
|---|---|
| MessageBusNotRunningError | If the bus is not running. |
| ChannelNotFoundError | If the channel does not exist. |
| NotSubscribedError | If the subscriber is not subscribed (for TOPIC and DIRECT channels). |
Source code in src/synthorg/communication/bus_protocol.py
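The pull-with-timeout behaviour can be approximated with an asyncio.Queue, as in the sketch below. The queue-per-subscriber layout and the free function `receive` are assumptions for illustration; shutdown handling is omitted.

```python
from __future__ import annotations

import asyncio


async def receive(queue: asyncio.Queue, timeout: float | None = None):
    """Return the next item, or None if nothing arrives within the timeout."""
    try:
        # timeout=None makes wait_for wait indefinitely.
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    empty = await receive(queue, timeout=0.01)  # nothing queued yet
    await queue.put("envelope")
    return empty, await receive(queue, timeout=0.01)


outcome = asyncio.run(demo())
```

Returning None on timeout lets a consumer loop poll without wrapping every call in exception handling.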
create_channel
async
¶
Create a new channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel | Channel | Channel definition to create. | required |
Returns:
| Type | Description |
|---|---|
| Channel | The created channel. |
Raises:
| Type | Description |
|---|---|
| MessageBusNotRunningError | If the bus is not running. |
| ChannelAlreadyExistsError | If a channel with that name already exists. |
Source code in src/synthorg/communication/bus_protocol.py
get_channel
async
¶
Get a channel by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| channel_name | NotBlankStr | Name of the channel. | required |
Returns:
| Type | Description |
|---|---|
| Channel | The channel. |
Raises:
| Type | Description |
|---|---|
| ChannelNotFoundError | If the channel does not exist. |
Source code in src/synthorg/communication/bus_protocol.py
publish_batch
async
¶
Publish multiple messages in a batch.
Optimised for throughput. In NATS backends, uses pipelined async publishes for reduced round-trip overhead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| messages | Sequence[Message] | Messages to publish. An empty sequence is a no-op. | required |
| ttl_seconds | float \| None | Optional per-message TTL applied to every message in the batch. | None |
Raises:
| Type | Description |
|---|---|
| MessageBusNotRunningError | If the bus is not running. |
| ChannelNotFoundError | If any target channel does not exist. |
Source code in src/synthorg/communication/bus_protocol.py
list_channels
async
¶
get_channel_history
async
¶
Get message history for a channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
channel_name
|
NotBlankStr
|
Channel to query. |
required |
limit
|
int | None
|
Maximum number of most recent messages to return.
Values |
None
|
Returns:
| Type | Description |
|---|---|
tuple[Message, ...]
|
Messages in chronological order. |
Raises:
| Type | Description |
|---|---|
ChannelNotFoundError
|
If the channel does not exist. |
Source code in src/synthorg/communication/bus_protocol.py
Config¶
config
¶
Communication configuration models (see Communication design page).
MessageRetentionConfig
pydantic-model
¶
Bases: BaseModel
Retention settings for channel message history and subscriber delivery.
Attributes:
| Name | Type | Description |
|---|---|---|
max_messages_per_channel |
int
|
Maximum messages kept per channel
for both in-memory and NATS backends. In-memory backend:
|
max_subscriber_queue_size |
int
|
Maximum in-flight envelopes per
(channel, subscriber). In-memory backend applies a
drop-newest policy with |
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
max_messages_per_channel(int) -
max_subscriber_queue_size(int)
NatsConfig
pydantic-model
¶
Bases: BaseModel
NATS JetStream backend configuration.
Only applicable when MessageBusConfig.backend == NATS. See
docs/design/distributed-runtime.md for stream layout and
subject naming.
Attributes:
| Name | Type | Description |
|---|---|---|
url |
NotBlankStr
|
NATS server URL (e.g. |
credentials_path |
str | None
|
Optional path to a credentials file for secured clusters (creds file or jwt+seed). |
stream_name_prefix |
NotBlankStr
|
Prefix for JetStream stream names. The
bus stream is |
connect_timeout_seconds |
float
|
Seconds to wait for the initial connection before raising. |
reconnect_time_wait_seconds |
float
|
Seconds between reconnect attempts. |
max_reconnect_attempts |
int
|
Maximum reconnect attempts before
giving up ( |
publish_ack_wait_seconds |
float
|
Seconds to wait for a JetStream publish ack before considering the publish failed. |
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
url(NotBlankStr) -
credentials_path(str | None) -
stream_name_prefix(NotBlankStr) -
connect_timeout_seconds(float) -
reconnect_time_wait_seconds(float) -
max_reconnect_attempts(int) -
publish_ack_wait_seconds(float) -
health_flush_timeout_seconds(float)
Validators:
-
_validate_url→url
stream_name_prefix
pydantic-field
¶
Prefix for JetStream stream names
reconnect_time_wait_seconds
pydantic-field
¶
Seconds between reconnect attempts
max_reconnect_attempts
pydantic-field
¶
Max reconnect attempts (-1 = unlimited)
health_flush_timeout_seconds
pydantic-field
¶
Timeout in seconds for the NATS health-check flush probe. Deliberately tight so a stuck probe fails fast rather than blocking the /healthz endpoint.
MessageBusConfig
pydantic-model
¶
Bases: BaseModel
Message bus backend configuration.
Maps to the Communication design page message_bus.
Attributes:
| Name | Type | Description |
|---|---|---|
| backend | MessageBusBackend | Transport backend to use. |
| channels | tuple[NotBlankStr, ...] | Pre-defined channel names. |
| retention | MessageRetentionConfig | Message retention settings. |
| nats | NatsConfig \| None | NATS-specific configuration (required when backend is NATS). |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- backend (MessageBusBackend)
- channels (tuple[NotBlankStr, ...])
- retention (MessageRetentionConfig)
- nats (NatsConfig | None)
Validators:
- _validate_channels
- _validate_backend_config
MeetingTypeConfig
pydantic-model
¶
Bases: BaseModel
Configuration for a single meeting type.
Maps to the Communication design page meetings.types[]. Exactly one of
frequency or trigger must be set.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
NotBlankStr
|
Meeting type name (e.g. |
frequency |
MeetingFrequency | None
|
Recurrence schedule (mutually exclusive with trigger). |
trigger |
NotBlankStr | None
|
Event trigger (mutually exclusive with frequency). |
participants |
tuple[NotBlankStr, ...]
|
Participant role or agent identifiers. |
duration_tokens |
int
|
Token budget for the meeting. |
Config:
frozen:Trueallow_inf_nan:Falseextra:forbid
Fields:
-
name(NotBlankStr) -
frequency(MeetingFrequency | None) -
trigger(NotBlankStr | None) -
participants(tuple[NotBlankStr, ...]) -
duration_tokens(int) -
protocol_config(MeetingProtocolConfig) -
min_interval_seconds(int | None)
Validators:
-
_validate_frequency_or_trigger -
_validate_participants
min_interval_seconds
pydantic-field
¶
Minimum seconds between event-triggered meetings of this type
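The "exactly one of frequency or trigger" constraint can be sketched with a frozen dataclass. `MeetingTypeSketch` is a simplified stand-in for the Pydantic model, and the example values ("standup", "daily", "on_pr_opened") are made up for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class MeetingTypeSketch:
    name: str
    frequency: str | None = None
    trigger: str | None = None

    def __post_init__(self) -> None:
        # Reject both "neither set" and "both set".
        if (self.frequency is None) == (self.trigger is None):
            raise ValueError("Exactly one of 'frequency' or 'trigger' must be set")


scheduled = MeetingTypeSketch("standup", frequency="daily")
```

Validating at construction time means a misconfigured meeting type fails when the configuration loads rather than when the meeting would be scheduled.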
MeetingsConfig
pydantic-model
¶
Bases: BaseModel
Meetings subsystem configuration.
Maps to the Communication design page meetings.
Attributes:
| Name | Type | Description |
|---|---|---|
| enabled | bool | Whether the meetings subsystem is active. |
| types | tuple[MeetingTypeConfig, ...] | Configured meeting types (unique by name). |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- enabled (bool)
- types (tuple[MeetingTypeConfig, ...])
Validators:
- _apply_mirrors
- _validate_unique_meeting_names
HierarchyConfig
pydantic-model
¶
Bases: BaseModel
Hierarchy enforcement configuration.
Maps to the Communication design page hierarchy.
Attributes:
| Name | Type | Description |
|---|---|---|
| enforce_chain_of_command | bool | Whether chain-of-command is enforced. |
| allow_skip_level | bool | Whether skip-level messaging is allowed. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- enforce_chain_of_command (bool)
- allow_skip_level (bool)
RateLimitConfig
pydantic-model
¶
Bases: BaseModel
Per-pair message rate limit configuration.
Maps to the Communication design page rate_limit.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_per_pair_per_minute | int | Maximum messages per agent pair per minute. |
| burst_allowance | int | Extra burst capacity above the rate limit. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- max_per_pair_per_minute (int)
- burst_allowance (int)
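One plausible reading of these two settings is a sliding one-minute window per agent pair whose capacity is the base limit plus the burst allowance. The sketch below follows that reading; the class name `PairRateLimiter`, the unordered pair key, and the window algorithm are all assumptions rather than the documented implementation.

```python
from collections import defaultdict, deque


class PairRateLimiter:
    """Sliding 60-second window per unordered agent pair."""

    def __init__(self, max_per_pair_per_minute: int, burst_allowance: int = 0):
        self._capacity = max_per_pair_per_minute + burst_allowance
        self._events = defaultdict(deque)

    def allow(self, sender: str, recipient: str, now: float) -> bool:
        key = tuple(sorted((sender, recipient)))  # (a, b) and (b, a) share a budget
        window = self._events[key]
        # Drop timestamps that have aged out of the window.
        while window and now - window[0] >= 60.0:
            window.popleft()
        if len(window) >= self._capacity:
            return False
        window.append(now)
        return True


limiter = PairRateLimiter(max_per_pair_per_minute=2, burst_allowance=1)
decisions = [limiter.allow("a", "b", t) for t in (0.0, 1.0, 2.0, 3.0)]
```

Passing the clock value in as `now` keeps the sketch deterministic; a real limiter would more likely read a monotonic clock internally.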
CircuitBreakerConfig
pydantic-model
¶
Bases: BaseModel
Circuit breaker configuration for agent-pair communication.
Maps to the Communication design page circuit_breaker.
Attributes:
| Name | Type | Description |
|---|---|---|
| bounce_threshold | int | Bounce count before the circuit opens. |
| cooldown_seconds | int | Seconds to wait before retrying after trip. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- bounce_threshold (int)
- cooldown_seconds (int)
- max_cooldown_seconds (int)
Validators:
- _validate_cooldown_bounds
max_cooldown_seconds
pydantic-field
¶
Maximum cooldown period in seconds (caps exponential backoff)
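How max_cooldown_seconds might cap the backoff can be shown with a small calculation. The doubling factor and the helper name `cooldown_for_trip` are assumptions: the field description only states that the maximum caps an exponential backoff.

```python
def cooldown_for_trip(
    trip_count: int, cooldown_seconds: int, max_cooldown_seconds: int
) -> int:
    """Cooldown after the nth trip: doubles each time, capped at the maximum."""
    if trip_count < 1:
        raise ValueError("trip_count must be >= 1")
    return min(cooldown_seconds * 2 ** (trip_count - 1), max_cooldown_seconds)


schedule = [cooldown_for_trip(n, 300, 3600) for n in range(1, 6)]
```

Under these assumed values the cooldown grows from the base value on the first trip and stops increasing once it reaches the cap.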
LoopPreventionConfig
pydantic-model
¶
Bases: BaseModel
Loop prevention safeguards.
Maps to the Communication design page. ancestry_tracking is always on
and cannot be disabled.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_delegation_depth | int | Hard limit on delegation chain length. |
| rate_limit | RateLimitConfig | Per-pair rate limit settings. |
| dedup_window_seconds | int | Deduplication window in seconds. |
| circuit_breaker | CircuitBreakerConfig | Circuit breaker settings. |
| ancestry_tracking | Literal[True] | Must always be True. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- max_delegation_depth (int)
- rate_limit (RateLimitConfig)
- dedup_window_seconds (int)
- circuit_breaker (CircuitBreakerConfig)
- ancestry_tracking (Literal[True])
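Two of these safeguards, ancestry tracking and the depth limit, can be sketched as a single pre-delegation check. Everything here is illustrative: `LoopRejected` and `check_delegation` are hypothetical names, and measuring depth as the length of the delegation chain is an assumption about how max_delegation_depth is counted.

```python
from __future__ import annotations


class LoopRejected(Exception):
    """Hypothetical stand-in for the delegation loop errors."""


def check_delegation(chain: tuple[str, ...], delegatee_id: str, max_depth: int) -> None:
    """Reject a delegation that revisits an agent or exceeds the depth limit.

    chain lists the agents that have already held the task, oldest first,
    ending with the current delegator.
    """
    if delegatee_id in chain:
        raise LoopRejected(f"{delegatee_id} already appears in the ancestry")
    if len(chain) > max_depth:
        raise LoopRejected(f"delegation depth would exceed {max_depth}")


check_delegation(("ceo", "cto"), "dev", max_depth=2)  # passes silently
```

Checking ancestry before depth reports a cycle as a cycle even when the chain is also too deep.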
CommunicationConfig
pydantic-model
¶
Bases: BaseModel
Top-level communication configuration.
Aggregates the Communication design page sections under a single model.
Attributes:
| Name | Type | Description |
|---|---|---|
| default_pattern | CommunicationPattern | High-level communication pattern. |
| message_bus | MessageBusConfig | Message bus configuration. |
| meetings | MeetingsConfig | Meetings subsystem configuration. |
| hierarchy | HierarchyConfig | Hierarchy enforcement settings. |
| loop_prevention | LoopPreventionConfig | Loop prevention safeguards. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- default_pattern (CommunicationPattern)
- message_bus (MessageBusConfig)
- meetings (MeetingsConfig)
- hierarchy (HierarchyConfig)
- loop_prevention (LoopPreventionConfig)
- conflict_resolution (ConflictResolutionConfig)
conflict_resolution
pydantic-field
¶
Conflict resolution configuration (see Communication design page)
Errors¶
errors
¶
Communication error hierarchy (see Communication design page).
All communication errors carry an immutable context mapping for
structured metadata, following the same pattern as ToolError.
CommunicationError
¶
Bases: DomainError
Base exception for all communication-layer errors.
Attributes:
| Name | Type | Description |
|---|---|---|
| message | | Human-readable error description. |
| context | MappingProxyType[str, Any] | Immutable metadata about the error. |
Class Attributes:
- status_code: HTTP 500 default.
- error_code: COMMUNICATION_ERROR.
- error_category: INTERNAL.
- retryable: False.
- default_message: Generic 5xx-safe message.
Initialize a communication error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | str | Human-readable error description. | required |
| context | dict[str, Any] \| None | Arbitrary metadata about the error. Stored as an immutable mapping; defaults to empty if not provided. | None |
Source code in src/synthorg/communication/errors.py
__str__
¶
Format error with optional context metadata.
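The immutable-context pattern can be sketched with a plain Exception subclass. `SketchCommunicationError` is not the real class (which derives from DomainError), and the exact `__str__` layout shown here is an assumption.

```python
from __future__ import annotations

from types import MappingProxyType
from typing import Any


class SketchCommunicationError(Exception):
    """Error carrying a read-only context mapping."""

    def __init__(self, message: str, context: dict[str, Any] | None = None) -> None:
        super().__init__(message)
        self.message = message
        # Copy into a new dict, then expose it through a read-only proxy.
        self.context = MappingProxyType(dict(context or {}))

    def __str__(self) -> str:
        if not self.context:
            return self.message
        details = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
        return f"{self.message} ({details})"


err = SketchCommunicationError("Channel not found", {"channel": "#ops"})
```

Copying the caller's dict before wrapping it means later changes to the original dict do not show up in the error's context.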
ChannelNotFoundError
¶
Bases: CommunicationError
Requested channel does not exist.
Source code in src/synthorg/communication/errors.py
ChannelAlreadyExistsError
¶
Bases: CommunicationError
Channel with the given name already exists.
Source code in src/synthorg/communication/errors.py
NotSubscribedError
¶
Bases: CommunicationError
Agent is not subscribed to the specified channel.
Source code in src/synthorg/communication/errors.py
MessageBusNotRunningError
¶
Bases: CommunicationError
Operation attempted on a message bus that is not running.
Source code in src/synthorg/communication/errors.py
MessageBusAlreadyRunningError
¶
Bases: CommunicationError
start() called on a message bus that is already running.
Source code in src/synthorg/communication/errors.py
DelegationError
¶
Bases: CommunicationError
Base exception for delegation-related errors.
Source code in src/synthorg/communication/errors.py
DelegationAuthorityError
¶
Bases: DelegationError
Delegator lacks authority to delegate to the target agent.
Source code in src/synthorg/communication/errors.py
DelegationLoopError
¶
Bases: DelegationError
Base for loop prevention mechanism rejections.
Source code in src/synthorg/communication/errors.py
DelegationDepthError
¶
Bases: DelegationLoopError
Delegation chain exceeds maximum depth.
Source code in src/synthorg/communication/errors.py
DelegationAncestryError
¶
Bases: DelegationLoopError
Delegation would create a cycle in the task ancestry.
Source code in src/synthorg/communication/errors.py
DelegationRateLimitError
¶
Bases: DelegationLoopError
Delegation rate limit exceeded for agent pair.
Source code in src/synthorg/communication/errors.py
DelegationCircuitOpenError
¶
Bases: DelegationLoopError
Circuit breaker is open for agent pair.
Source code in src/synthorg/communication/errors.py
DelegationDuplicateError
¶
Bases: DelegationLoopError
Duplicate delegation detected within dedup window.
Source code in src/synthorg/communication/errors.py
HierarchyResolutionError
¶
Bases: CommunicationError
Error resolving organizational hierarchy.
Source code in src/synthorg/communication/errors.py
ConflictResolutionError
¶
Bases: CommunicationError
Base exception for conflict resolution errors.
Source code in src/synthorg/communication/errors.py
ConflictStrategyError
¶
Bases: ConflictResolutionError
Error within a conflict resolution strategy.
Source code in src/synthorg/communication/errors.py
ConflictHierarchyError
¶
Bases: ConflictResolutionError
No common manager found for cross-department conflict.
Source code in src/synthorg/communication/errors.py
EscalationDecisionError
¶
Bases: ConflictResolutionError
A human-supplied escalation decision cannot be applied.
Concrete subclasses distinguish the failure mode: the decision shape
is not accepted by the active processor variant
(EscalationDecisionShapeError), or the decision references
an agent outside the conflict (EscalationDecisionAgentError).
Source code in src/synthorg/communication/errors.py
EscalationDecisionShapeError
¶
Bases: EscalationDecisionError
The decision variant is not accepted by the active processor.
Source code in src/synthorg/communication/errors.py
EscalationDecisionAgentError
¶
Bases: EscalationDecisionError
A winner decision references an agent outside the conflict.
Source code in src/synthorg/communication/errors.py
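Because the delegation errors form a tree, a caller can catch one family instead of each leaf. The sketch below redeclares a few of the documented classes locally so it runs on its own; the `classify` helper and its return labels are hypothetical and not part of the library.

```python
class CommunicationError(Exception):
    """Local stand-in for the documented base class (context metadata omitted)."""


class DelegationError(CommunicationError):
    pass


class DelegationAuthorityError(DelegationError):
    pass


class DelegationLoopError(DelegationError):
    pass


class DelegationDepthError(DelegationLoopError):
    pass


class DelegationRateLimitError(DelegationLoopError):
    pass


def classify(exc: Exception) -> str:
    """Return the most specific error family (hypothetical labels)."""
    # Order matters: test the narrowest family first.
    if isinstance(exc, DelegationLoopError):
        return "loop_prevention"
    if isinstance(exc, DelegationError):
        return "delegation"
    if isinstance(exc, CommunicationError):
        return "communication"
    return "other"
```

Catching `DelegationLoopError` therefore covers depth, ancestry, rate-limit, circuit-breaker, and duplicate rejections in one `except` clause.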
Delegation¶
models
¶
Delegation request, result, and audit trail models.
DelegationRequest
pydantic-model
¶
Bases: BaseModel
Request to delegate a task down the hierarchy.
Attributes:

| Name | Type | Description |
|---|---|---|
| delegator_id | NotBlankStr | Agent ID of the delegator. |
| delegatee_id | NotBlankStr | Agent ID of the target agent. |
| task | Task | The task to delegate. |
| refinement | str | Additional context from the delegator. |
| constraints | tuple[NotBlankStr, ...] | Extra constraints for the delegatee. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- delegator_id (NotBlankStr)
- delegatee_id (NotBlankStr)
- task (Task)
- refinement (str)
- constraints (tuple[NotBlankStr, ...])
- entity_versions (Mapping[str, int] | None)

Validators:
- _validate_self_delegation
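The validator name suggests that a request whose delegator and delegatee are the same agent is rejected. A minimal sketch of that rule follows, using a frozen standard-library dataclass instead of Pydantic so it runs without dependencies; `DelegationRequestSketch` and its error messages are hypothetical, and the `task` field is omitted.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DelegationRequestSketch:
    """Hypothetical, simplified mirror of DelegationRequest."""

    delegator_id: str
    delegatee_id: str
    refinement: str = ""
    constraints: tuple[str, ...] = ()

    def __post_init__(self) -> None:
        # Approximates NotBlankStr: reject empty or whitespace-only IDs.
        if not self.delegator_id.strip() or not self.delegatee_id.strip():
            raise ValueError("agent IDs must not be blank")
        # Assumed behaviour of _validate_self_delegation.
        if self.delegator_id == self.delegatee_id:
            raise ValueError("an agent cannot delegate to itself")
```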
DelegationResult
pydantic-model
¶
Bases: BaseModel
Outcome of a delegation attempt.
Attributes:

| Name | Type | Description |
|---|---|---|
| success | bool | Whether the delegation succeeded. |
| delegated_task | Task \| None | The sub-task created, if successful. |
| rejection_reason | str \| None | Reason for rejection, if unsuccessful. |
| blocked_by | NotBlankStr \| None | Mechanism name that blocked, if applicable. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- success (bool)
- delegated_task (Task | None)
- rejection_reason (str | None)
- blocked_by (NotBlankStr | None)

Validators:
- _validate_success_consistency
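The attribute descriptions imply that the success flag and the optional fields must agree. The exact rule enforced by the validator is not shown on this page, so the sketch below is one plausible reading, stated as an assumption: a successful result carries a task and no rejection details, and a rejected result carries a reason and no task. `DelegationResultSketch` is hypothetical, and a plain string ID stands in for the `Task` object.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DelegationResultSketch:
    """Hypothetical mirror of DelegationResult with an assumed consistency rule."""

    success: bool
    delegated_task_id: Optional[str] = None  # stands in for the Task object
    rejection_reason: Optional[str] = None
    blocked_by: Optional[str] = None

    def __post_init__(self) -> None:
        if self.success:
            if self.delegated_task_id is None:
                raise ValueError("a successful result needs a delegated task")
            if self.rejection_reason is not None or self.blocked_by is not None:
                raise ValueError("a successful result cannot carry rejection details")
        else:
            if self.delegated_task_id is not None:
                raise ValueError("a rejected result cannot carry a task")
            if self.rejection_reason is None:
                raise ValueError("a rejected result needs a reason")
```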
DelegationRecord
pydantic-model
¶
Bases: BaseModel
Audit trail entry for a completed delegation.
Attributes:

| Name | Type | Description |
|---|---|---|
| delegation_id | NotBlankStr | Unique delegation identifier. |
| delegator_id | NotBlankStr | Agent ID of the delegator. |
| delegatee_id | NotBlankStr | Agent ID of the delegatee. |
| original_task_id | NotBlankStr | ID of the original task. |
| delegated_task_id | NotBlankStr | ID of the created sub-task. |
| timestamp | AwareDatetime | When the delegation occurred. |
| refinement | str | Context provided by the delegator. |
| entity_versions | Mapping[str, int] \| None | Entity version manifest at delegation time. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- delegation_id (NotBlankStr)
- delegator_id (NotBlankStr)
- delegatee_id (NotBlankStr)
- original_task_id (NotBlankStr)
- delegated_task_id (NotBlankStr)
- timestamp (AwareDatetime)
- refinement (str)
- entity_versions (Mapping[str, int] | None)
service
¶
Delegation service orchestrating hierarchy, authority, and loop prevention.
DelegationService
¶
Orchestrates hierarchical delegation with loop prevention.
Validates authority, checks loop prevention guards, creates sub-tasks, and records audit trail entries. The core logic is synchronous (CPU-only); messaging is a separate async concern.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| hierarchy | HierarchyResolver | Resolved organizational hierarchy. | required |
| authority_validator | AuthorityValidator | Authority validation logic. | required |
| guard | DelegationGuard | Loop prevention guard. | required |
| record_store | DelegationRecordStore \| None | Optional delegation record store for activity tracking. | None |
Source code in src/synthorg/communication/delegation/service.py
delegate
async
¶
Execute a delegation: authority, loops, sub-task, audit.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| request | DelegationRequest | The delegation request. | required |
| delegator | AgentIdentity | Identity of the delegating agent. | required |
| delegatee | AgentIdentity | Identity of the target agent. | required |

Returns:

| Type | Description |
|---|---|
| DelegationResult | Result indicating success or rejection with reason. |

Raises:

| Type | Description |
|---|---|
| ValueError | If request IDs do not match identity objects. |
| DelegationError | If sub-task construction fails. |
Source code in src/synthorg/communication/delegation/service.py
reject_delegated_task
staticmethod
¶
Transition a CREATED task to REJECTED.
Used when a delegatee explicitly refuses a task that was already created for them. The task must be in CREATED status.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task | Task | The task to reject (must be in CREATED status). | required |

Returns:

| Type | Description |
|---|---|
| Task | A new Task in REJECTED status. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the task is not in CREATED status. |
Source code in src/synthorg/communication/delegation/service.py
get_audit_trail
¶
Return all delegation audit records.
Returns:

| Type | Description |
|---|---|
| tuple[DelegationRecord, ...] | Tuple of delegation records in chronological order. |
get_supervisor_of
¶
Expose hierarchy lookup for escalation callers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| agent_name | str | Agent name to look up. | required |

Returns:

| Type | Description |
|---|---|
| str \| None | Supervisor name or None if at the top. |
Source code in src/synthorg/communication/delegation/service.py
authority
¶
Authority validation for hierarchical delegation.
AuthorityCheckResult
pydantic-model
¶
AuthorityValidator
¶
Validates delegation authority using hierarchy and role permissions.
Checks
- Hierarchy: delegatee must be a subordinate of delegator (direct or skip-level depending on config).
- Roles: if delegator.authority.can_delegate_to is non-empty, delegatee.role must be in it; if empty, all roles are permitted.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| hierarchy | HierarchyResolver | Resolved org hierarchy. | required |
| hierarchy_config | HierarchyConfig | Hierarchy enforcement configuration. | required |
Source code in src/synthorg/communication/delegation/authority.py
validate
¶
Validate whether delegator can delegate to delegatee.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| delegator | AgentIdentity | Identity of the delegating agent. | required |
| delegatee | AgentIdentity | Identity of the target agent. | required |

Returns:

| Type | Description |
|---|---|
| AuthorityCheckResult | Result indicating whether delegation is authorized. |
Source code in src/synthorg/communication/delegation/authority.py
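The two checks described for AuthorityValidator (hierarchy, then role allow-list) can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `AgentSketch`, `is_subordinate`, `may_delegate`, the `supervisors` mapping, and the `allow_skip_level` flag are all hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSketch:
    """Hypothetical stand-in for AgentIdentity."""

    name: str
    role: str
    can_delegate_to: tuple[str, ...] = ()


def is_subordinate(
    supervisors: dict[str, str],
    delegator: str,
    delegatee: str,
    *,
    allow_skip_level: bool,
) -> bool:
    """Check direct reports, and optionally walk further up the chain."""
    current = supervisors.get(delegatee)
    if current == delegator:
        return True
    if not allow_skip_level:
        return False
    seen: set[str] = set()
    while current is not None and current not in seen:
        if current == delegator:
            return True
        seen.add(current)  # guards against a malformed, cyclic mapping
        current = supervisors.get(current)
    return False


def may_delegate(
    supervisors: dict[str, str],
    delegator: AgentSketch,
    delegatee: AgentSketch,
    *,
    allow_skip_level: bool = False,
) -> bool:
    if not is_subordinate(
        supervisors, delegator.name, delegatee.name, allow_skip_level=allow_skip_level
    ):
        return False
    # An empty allow-list means every role is permitted.
    return not delegator.can_delegate_to or delegatee.role in delegator.can_delegate_to
```

Here `supervisors` maps each agent name to its direct supervisor, so a skip-level check is a walk up that mapping.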
Loop Prevention¶
guard
¶
Delegation guard orchestrating all loop prevention mechanisms.
DelegationGuard
¶
Orchestrates all loop prevention mechanisms.
Checks run in order: ancestry, depth, dedup, rate_limit, circuit_breaker. Short-circuits on the first failure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | LoopPreventionConfig | Loop prevention configuration. | required |
| circuit_breaker_repo | CircuitBreakerStateRepository \| None | Optional persistence repository for circuit breaker state (survives restarts). | None |
Source code in src/synthorg/communication/loop_prevention/guard.py
check
¶
Run all loop prevention checks.
Returns the first failing outcome, or a success outcome if all checks pass.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| delegation_chain | tuple[str, ...] | Current delegation ancestry. | required |
| delegator_id | str | ID of the delegating agent. | required |
| delegatee_id | str | ID of the proposed delegatee. | required |
| task_id | str | Unique ID of the task being delegated. | required |

Returns:

| Type | Description |
|---|---|
| GuardCheckOutcome | First failing outcome or an all-passed success. |
Source code in src/synthorg/communication/loop_prevention/guard.py
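The ordered, short-circuiting evaluation described for `check` can be sketched with two of the five mechanisms. Everything here is illustrative: `Outcome`, `ancestry_check`, `make_depth_check`, `run_checks`, and the `"all_passed"` mechanism label are hypothetical names, and the real depth and ancestry rules may differ.

```python
from dataclasses import dataclass
from typing import Callable, Sequence


@dataclass(frozen=True)
class Outcome:
    """Hypothetical stand-in for GuardCheckOutcome."""

    passed: bool
    mechanism: str
    message: str = ""


Check = Callable[[tuple[str, ...], str, str], Outcome]


def ancestry_check(chain: tuple[str, ...], delegator_id: str, delegatee_id: str) -> Outcome:
    # Assumed rule: delegating to an agent already in the chain would form a cycle.
    if delegatee_id in chain:
        return Outcome(False, "ancestry", f"{delegatee_id} is already in the chain")
    return Outcome(True, "ancestry")


def make_depth_check(max_depth: int) -> Check:
    def depth_check(chain: tuple[str, ...], delegator_id: str, delegatee_id: str) -> Outcome:
        if len(chain) >= max_depth:
            return Outcome(False, "depth", f"chain length {len(chain)} reached limit {max_depth}")
        return Outcome(True, "depth")

    return depth_check


def run_checks(
    checks: Sequence[Check],
    chain: tuple[str, ...],
    delegator_id: str,
    delegatee_id: str,
) -> Outcome:
    """Run checks in order and return the first failure, if any."""
    for check in checks:
        outcome = check(chain, delegator_id, delegatee_id)
        if not outcome.passed:
            return outcome  # short-circuit: later checks never run
    return Outcome(True, "all_passed")
```

When a delegation would fail both checks, the reported mechanism is the earlier one in the list, which is why the order of checks is part of the contract.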
record_delegation
¶
Record a successful delegation in all stateful mechanisms.
Each delegation between a pair contributes to the circuit breaker bounce count. Back-and-forth patterns (A->B then B->A) both increment the same counter because the pair key is direction-agnostic, so repeated ping-pong will trip the breaker fastest.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| delegator_id | str | ID of the delegating agent. | required |
| delegatee_id | str | ID of the target agent. | required |
| task_id | str | Unique ID of the delegated task. | required |
Source code in src/synthorg/communication/loop_prevention/guard.py
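A direction-agnostic pair key can be built by sorting the two agent IDs, so that A->B and B->A land on the same counter. The sketch below illustrates that idea only; `pair_key`, `BounceCounter`, and the fixed `threshold` are hypothetical, and the real circuit breaker presumably also tracks time windows and recovery.

```python
from collections import Counter


def pair_key(agent_a: str, agent_b: str) -> tuple[str, str]:
    """Return the same key regardless of argument order."""
    first, second = sorted((agent_a, agent_b))
    return (first, second)


class BounceCounter:
    """Hypothetical bounce counter keyed by unordered agent pair."""

    def __init__(self, threshold: int) -> None:
        self._threshold = threshold
        self._counts: Counter = Counter()

    def record(self, delegator_id: str, delegatee_id: str) -> None:
        self._counts[pair_key(delegator_id, delegatee_id)] += 1

    def is_open(self, delegator_id: str, delegatee_id: str) -> bool:
        # Counter returns 0 for unseen pairs without inserting them.
        return self._counts[pair_key(delegator_id, delegatee_id)] >= self._threshold
```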
load_state
async
¶
Load persisted circuit breaker state from the repository.
Called once at startup. No-op if no repository is configured.
persist
async
¶
Flush dirty circuit breaker state to the repository.
Best-effort. No-op if no repository is configured.
models
¶
Loop prevention check outcome model.
GuardCheckOutcome
pydantic-model
¶
Bases: BaseModel
Result of a single loop prevention check.
Attributes:

| Name | Type | Description |
|---|---|---|
| passed | bool | Whether the check passed (delegation allowed). |
| mechanism | NotBlankStr | Name of the mechanism that produced this outcome. |
| message | str | Human-readable detail (empty on success). |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- passed (bool)
- mechanism (NotBlankStr)
- message (str)

Validators:
- _validate_passed_message
Conflict Resolution¶
protocol
¶
Conflict resolution protocol interfaces (see Communication design page).
Defines the pluggable strategy interface that varies per resolution
approach (resolve + build_dissent_records). Detection logic
lives on the service, not the protocol, because it is strategy-agnostic.
JudgeDecision
¶
Bases: NamedTuple
Result of an LLM-based judge evaluation.
Attributes:

| Name | Type | Description |
|---|---|---|
| winning_agent_id | str | Agent whose position was chosen. |
| reasoning | str | Explanation for the decision. |
ConflictResolver
¶
Bases: Protocol
Protocol for conflict resolution strategies.
Each strategy implements resolve (async, may need LLM calls)
and build_dissent_records (sync, builds audit artifacts for
every overruled position).
resolve
async
¶
Resolve a conflict and produce a decision.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| conflict | Conflict | The conflict to resolve. | required |

Returns:

| Type | Description |
|---|---|
| ConflictResolution | Resolution decision. |
build_dissent_records
¶
Build audit records for all losing positions.
For N-party conflicts, produces one DissentRecord per
overruled agent. For escalated conflicts, produces one
record per position (all are pending human review).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| conflict | Conflict | The original conflict. | required |
| resolution | ConflictResolution | The resolution decision. | required |

Returns:

| Type | Description |
|---|---|
| tuple[DissentRecord, ...] | Dissent records preserving every overruled reasoning. |
Source code in src/synthorg/communication/conflict_resolution/protocol.py
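The two-method shape of the strategy interface (an async `resolve` plus a sync `build_dissent_records`) can be illustrated with `typing.Protocol`. This is a simplified sketch: the types below are hypothetical stand-ins that pass a tuple of positions rather than a full Conflict, dissent records are reduced to agent IDs, and the seniority-based winner rule is only an example strategy.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class PositionSketch:
    agent_id: str
    level: int  # higher means more senior (assumption for this sketch)
    position: str


@dataclass(frozen=True)
class ResolutionSketch:
    winning_agent_id: str
    reasoning: str


class ResolverSketch(Protocol):
    async def resolve(self, positions: tuple[PositionSketch, ...]) -> ResolutionSketch: ...

    def build_dissent_records(
        self, positions: tuple[PositionSketch, ...], resolution: ResolutionSketch
    ) -> tuple[str, ...]: ...


class HighestLevelResolver:
    """Example strategy: the most senior agent wins."""

    async def resolve(self, positions: tuple[PositionSketch, ...]) -> ResolutionSketch:
        winner = max(positions, key=lambda p: p.level)
        return ResolutionSketch(winner.agent_id, "highest seniority level")

    def build_dissent_records(
        self, positions: tuple[PositionSketch, ...], resolution: ResolutionSketch
    ) -> tuple[str, ...]:
        # One entry per overruled agent.
        return tuple(p.agent_id for p in positions if p.agent_id != resolution.winning_agent_id)


async def run_resolver(resolver: ResolverSketch, positions: tuple[PositionSketch, ...]):
    resolution = await resolver.resolve(positions)
    return resolution, resolver.build_dissent_records(positions, resolution)
```

Because the protocol is structural, `HighestLevelResolver` satisfies `ResolverSketch` without inheriting from it.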
JudgeEvaluator
¶
Bases: Protocol
Protocol for LLM-based judge evaluation.
Used by debate and hybrid strategies. When absent, strategies fall back to authority-based judging.
evaluate
async
¶
Evaluate conflict positions and pick a winner.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| conflict | Conflict | The conflict with agent positions. | required |
| judge_agent_id | NotBlankStr | The agent acting as judge. | required |

Returns:

| Type | Description |
|---|---|
| JudgeDecision | Decision containing the winning agent ID and reasoning. |
Source code in src/synthorg/communication/conflict_resolution/protocol.py
config
¶
Conflict resolution configuration models (see Communication design page).
DebateConfig
pydantic-model
¶
Bases: BaseModel
Configuration for the structured debate strategy.
Attributes:

| Name | Type | Description |
|---|---|---|
| judge | NotBlankStr | Judge selection -- |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
judge
pydantic-field
¶
Judge selection: "shared_manager", "ceo", or agent name
HybridConfig
pydantic-model
¶
Bases: BaseModel
Configuration for the hybrid resolution strategy.
Attributes:

| Name | Type | Description |
|---|---|---|
| review_agent | NotBlankStr | Agent tasked with reviewing positions. |
| escalate_on_ambiguity | bool | Whether to escalate to human when the review result is ambiguous. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- review_agent (NotBlankStr)
- escalate_on_ambiguity (bool)
ConflictResolutionConfig
pydantic-model
¶
Bases: BaseModel
Top-level conflict resolution configuration.
Attributes:

| Name | Type | Description |
|---|---|---|
| strategy | ConflictResolutionStrategy | Default resolution strategy. |
| debate | DebateConfig | Configuration for the debate strategy. |
| hybrid | HybridConfig | Configuration for the hybrid strategy. |
| escalation | EscalationQueueConfig | Configuration for the human escalation queue. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- strategy (ConflictResolutionStrategy)
- debate (DebateConfig)
- hybrid (HybridConfig)
- escalation (EscalationQueueConfig)
models
¶
Conflict resolution domain models (see Communication design page).
All models are frozen Pydantic v2 with NotBlankStr identifiers,
following the patterns established in delegation/models.py.
ConflictResolutionOutcome
¶
Bases: StrEnum
Outcome of a conflict resolution attempt.
Members
- RESOLVED_BY_AUTHORITY: Decided by seniority/hierarchy.
- RESOLVED_BY_DEBATE: Decided by structured debate + judge.
- RESOLVED_BY_HYBRID: Decided by hybrid review process.
- RESOLVED_BY_HUMAN: Human operator picked a winning position via the escalation queue.
- REJECTED_BY_HUMAN: Human operator rejected all positions via the escalation queue.
- ESCALATED_TO_HUMAN: Escalation pending or timed out with no human decision collected.
ConflictPosition
pydantic-model
¶
Bases: BaseModel
One agent's stance in a conflict.
Attributes:

| Name | Type | Description |
|---|---|---|
| agent_id | NotBlankStr | Identifier of the agent taking the position. |
| agent_department | NotBlankStr | Department the agent belongs to. |
| agent_level | SeniorityLevel | Seniority level of the agent. |
| position | NotBlankStr | Summary of the agent's stance. |
| reasoning | NotBlankStr | Detailed justification for the position. |
| timestamp | AwareDatetime | When the position was stated. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- agent_id (NotBlankStr)
- agent_department (NotBlankStr)
- agent_level (SeniorityLevel)
- position (NotBlankStr)
- reasoning (NotBlankStr)
- timestamp (AwareDatetime)
Conflict
pydantic-model
¶
Bases: BaseModel
A dispute between two or more agents.
Attributes:

| Name | Type | Description |
|---|---|---|
| id | NotBlankStr | Unique conflict identifier (e.g. |
| type | ConflictType | Category of the conflict. |
| task_id | NotBlankStr \| None | Related task, if any. |
| subject | NotBlankStr | Brief description of the dispute. |
| positions | tuple[ConflictPosition, ...] | Agent positions (minimum 2, unique agent IDs). |
| detected_at | AwareDatetime | When the conflict was detected. |
| is_cross_department | bool | Whether agents span multiple departments (computed from positions). |

Config:
- frozen: True
- allow_inf_nan: False

Fields:
- id (NotBlankStr)
- type (ConflictType)
- task_id (NotBlankStr | None)
- subject (NotBlankStr)
- positions (tuple[ConflictPosition, ...])
- detected_at (AwareDatetime)

Validators:
- _validate_positions
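The constraints stated for `positions` (at least two, unique agent IDs) and the computed `is_cross_department` flag can be sketched with a frozen dataclass. `PositionStub` and `ConflictSketch` are hypothetical simplifications, and the error messages are invented for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PositionStub:
    agent_id: str
    agent_department: str


@dataclass(frozen=True)
class ConflictSketch:
    """Hypothetical, reduced mirror of Conflict."""

    subject: str
    positions: tuple[PositionStub, ...]

    def __post_init__(self) -> None:
        if len(self.positions) < 2:
            raise ValueError("a conflict needs at least 2 positions")
        agent_ids = [p.agent_id for p in self.positions]
        if len(set(agent_ids)) != len(agent_ids):
            raise ValueError("agent IDs must be unique")

    @property
    def is_cross_department(self) -> bool:
        # Derived from the positions instead of being stored as a field.
        return len({p.agent_department for p in self.positions}) > 1
```

Deriving the flag from `positions` keeps it from drifting out of sync with the data, which matches its absence from the Fields list above.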
ConflictResolution
pydantic-model
¶
Bases: BaseModel
Decision produced by a conflict resolution strategy.
Attributes:

| Name | Type | Description |
|---|---|---|
| conflict_id | NotBlankStr | ID of the resolved conflict. |
| outcome | ConflictResolutionOutcome | How the conflict was resolved. |
| winning_agent_id | NotBlankStr \| None | Agent whose position was chosen (None if escalated). |
| winning_position | NotBlankStr \| None | The winning position text (None if escalated). |
| decided_by | NotBlankStr | Entity that made the decision (agent name or |
| reasoning | NotBlankStr | Explanation for the decision. |
| resolved_at | AwareDatetime | When the resolution was produced. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- conflict_id (NotBlankStr)
- outcome (ConflictResolutionOutcome)
- winning_agent_id (NotBlankStr | None)
- winning_position (NotBlankStr | None)
- decided_by (NotBlankStr)
- reasoning (NotBlankStr)
- resolved_at (AwareDatetime)

Validators:
- _validate_outcome_consistency
DissentRecord
pydantic-model
¶
Bases: BaseModel
Audit artifact for a resolved conflict.
Preserves the losing agent's reasoning for organizational learning.
Attributes:

| Name | Type | Description |
|---|---|---|
| id | NotBlankStr | Unique dissent record identifier. |
| conflict | Conflict | The original conflict. |
| resolution | ConflictResolution | The resolution decision. |
| dissenting_agent_id | NotBlankStr | Agent whose position was overruled. |
| dissenting_position | NotBlankStr | The overruled position text. |
| strategy_used | ConflictResolutionStrategy | Strategy that was used. |
| timestamp | AwareDatetime | When the record was created. |
| metadata | tuple[tuple[NotBlankStr, NotBlankStr], ...] | Extra key-value metadata pairs. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- id (NotBlankStr)
- conflict (Conflict)
- resolution (ConflictResolution)
- dissenting_agent_id (NotBlankStr)
- dissenting_position (NotBlankStr)
- strategy_used (ConflictResolutionStrategy)
- timestamp (AwareDatetime)
- metadata (tuple[tuple[NotBlankStr, NotBlankStr], ...])

Validators:
- _validate_dissent_consistency
DissentPayload
pydantic-model
¶
Bases: BaseModel
Typed payload for DISSENT bus messages.
Extracts the key fields from a DissentRecord into a
structured, serializable payload suitable for the internal
message bus and SSE event stream.
Attributes:

| Name | Type | Description |
|---|---|---|
| dissent_id | NotBlankStr | Unique dissent record identifier. |
| conflict_id | NotBlankStr | Identifier of the originating conflict. |
| dissenting_agent_id | NotBlankStr | Agent whose position was overruled. |
| conflict_type | ConflictType | Type of the originating conflict. |
| strategy_used | ConflictResolutionStrategy | Resolution strategy that was applied. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- dissent_id (NotBlankStr)
- conflict_id (NotBlankStr)
- dissenting_agent_id (NotBlankStr)
- conflict_type (ConflictType)
- strategy_used (ConflictResolutionStrategy)
from_record
classmethod
¶
Build a payload from a dissent record.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| record | DissentRecord | The dissent record. | required |

Returns:

| Type | Description |
|---|---|
| DissentPayload | A typed dissent payload. |
Source code in src/synthorg/communication/conflict_resolution/models.py
service
¶
Conflict resolution service orchestrator (see Communication design page).
Follows the DelegationService pattern: __slots__, keyword-only
constructor, audit trail list, structured logging.
ConflictResolutionService
¶
Orchestrates conflict detection, resolution, and audit.
Selects the configured strategy, delegates to the resolver, builds dissent records for all overruled positions, and maintains an audit trail.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | ConflictResolutionConfig | Conflict resolution configuration. | required |
| resolvers | Mapping[ConflictResolutionStrategy, ConflictResolver] | Strategy → resolver mapping. | required |
Source code in src/synthorg/communication/conflict_resolution/service.py
create_conflict
¶
Create a conflict from agent positions.
Validates minimum positions and unique agent IDs, and generates an ID.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| conflict_type | ConflictType | Category of the conflict. | required |
| subject | NotBlankStr | Brief description of the dispute. | required |
| positions | Sequence[ConflictPosition] | Agent positions (minimum 2). | required |
| task_id | NotBlankStr \| None | Related task ID, if any. | None |

Returns:

| Type | Description |
|---|---|
| Conflict | New Conflict instance. |

Raises:

| Type | Description |
|---|---|
| ConflictResolutionError | If fewer than 2 positions or duplicate agent IDs. |
Source code in src/synthorg/communication/conflict_resolution/service.py
resolve
async
¶
Resolve a conflict using the configured strategy.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| conflict | Conflict | The conflict to resolve. | required |

Returns:

| Type | Description |
|---|---|
| tuple[ConflictResolution, tuple[DissentRecord, ...]] | Tuple of |

Raises:

| Type | Description |
|---|---|
| ConflictResolutionError | If the configured strategy has no registered resolver. |
Source code in src/synthorg/communication/conflict_resolution/service.py
get_dissent_records
¶
Return all dissent records.
Returns:

| Type | Description |
|---|---|
| tuple[DissentRecord, ...] | Tuple of dissent records in chronological order. |
query_dissent_records
¶
Query dissent records with optional filters.
All filters are combined with AND logic.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| agent_id | NotBlankStr \| None | Filter by dissenting agent ID. | None |
| conflict_type | ConflictType \| None | Filter by conflict type. | None |
| strategy | ConflictResolutionStrategy \| None | Filter by strategy used. | None |
| since | AwareDatetime \| None | Filter by records after this timestamp (must be timezone-aware). | None |

Returns:

| Type | Description |
|---|---|
| tuple[DissentRecord, ...] | Matching dissent records. |
Source code in src/synthorg/communication/conflict_resolution/service.py
Meeting Protocol¶
protocol
¶
Meeting protocol interface (see Communication design page).
Defines the MeetingProtocol protocol, the ConflictDetector
protocol, and the AgentCaller type alias used to invoke agents
during a meeting without coupling to the engine layer.
AgentCaller
module-attribute
¶
AgentCaller = Callable[[str, str, int, str], Awaitable[AgentResponse]]
Callback to invoke an agent during a meeting.
Signature: (agent_id, prompt, max_tokens, meeting_id) -> AgentResponse
The orchestrator constructs this from the engine layer, decoupling
protocol implementations from the execution engine. meeting_id
is threaded through so cost-recording attribution carries the real
meeting identifier per turn instead of a synthetic placeholder.
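Any async callable with the `(agent_id, prompt, max_tokens, meeting_id)` shape fits this alias, which makes protocols easy to exercise with a stub. The sketch below is a test double only: `AgentResponseSketch`, `echo_caller`, and `ask_all` are hypothetical, and counting words as tokens is a deliberate simplification.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class AgentResponseSketch:
    """Hypothetical, reduced stand-in for AgentResponse."""

    agent_id: str
    content: str
    input_tokens: int
    output_tokens: int


AgentCallerSketch = Callable[[str, str, int, str], Awaitable[AgentResponseSketch]]


async def echo_caller(
    agent_id: str, prompt: str, max_tokens: int, meeting_id: str
) -> AgentResponseSketch:
    # Stub: echoes the first max_tokens words, tagged with the meeting ID.
    words = prompt.split()
    reply = " ".join(words[:max_tokens])
    return AgentResponseSketch(
        agent_id=agent_id,
        content=f"[{meeting_id}] {reply}",
        input_tokens=len(words),
        output_tokens=min(len(words), max_tokens),
    )


async def ask_all(
    caller: AgentCallerSketch, agent_ids: tuple[str, ...], prompt: str, meeting_id: str
) -> list[AgentResponseSketch]:
    # Sequential calls; the same meeting_id is threaded through every turn.
    return [await caller(agent_id, prompt, 2, meeting_id) for agent_id in agent_ids]
```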
TaskCreator
module-attribute
¶
TaskCreator = Callable[[str, str | None, Priority], None]
Callback to create a task from a meeting action item.
Signature: (description, assignee_id, priority: Priority) -> None
Used by the orchestrator to optionally create tasks from extracted action items.
ConflictDetector
¶
Bases: Protocol
Strategy for detecting conflicts in agent responses.
Used by StructuredPhasesProtocol to determine whether a
discussion round is needed. The default implementation uses
keyword matching; alternative implementations might use
structured JSON output or tool calling for more robust detection.
detect
¶
Determine whether the response indicates conflicts.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| response_content | str | The conflict-check agent response text. | required |

Returns:

| Type | Description |
|---|---|
| bool | True if conflicts were detected, False otherwise. |
Source code in src/synthorg/communication/meeting/protocol.py
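A keyword-matching detector of the kind mentioned above can be very small. The sketch below is not the library's default implementation; the class name and the keyword list are hypothetical, chosen only to show the `detect` contract.

```python
class KeywordConflictDetectorSketch:
    """Hypothetical detector: flags a response containing any keyword."""

    def __init__(self, keywords: tuple[str, ...] = ("conflict", "disagree", "contradict")) -> None:
        self._keywords = tuple(k.lower() for k in keywords)

    def detect(self, response_content: str) -> bool:
        text = response_content.lower()
        # Substring match, so "conflicts" and "disagreement" also hit.
        return any(keyword in text for keyword in self._keywords)
```

Plain substring matching also fires on negated phrases such as "No conflicts found.", which is one reason a structured-output detector can be more robust.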
MeetingProtocol
¶
Bases: Protocol
Strategy interface for meeting protocol implementations.
Each implementation defines a different structure for how agents interact during a meeting (round-robin turns, parallel position papers, structured phases with discussion).
run
async
¶
run(
*,
meeting_id,
agenda,
leader_id,
participant_ids,
agent_caller,
token_budget,
lens_assignments=None,
)
Execute the meeting protocol and produce minutes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| meeting_id | str | Unique identifier for this meeting. | required |
| agenda | MeetingAgenda | The meeting agenda. | required |
| leader_id | str | ID of the agent leading the meeting. | required |
| participant_ids | tuple[str, ...] | IDs of participating agents. | required |
| agent_caller | AgentCaller | Callback to invoke agents. | required |
| token_budget | int | Maximum tokens for the entire meeting. | required |
| lens_assignments | Mapping[str, str] \| None | Optional mapping of participant ID to strategic lens name. When provided, protocols inject the lens perspective into each participant's prompt. | None |

Returns:

| Type | Description |
|---|---|
| MeetingMinutes | Complete meeting minutes. |
Source code in src/synthorg/communication/meeting/protocol.py
get_protocol_type
¶
Return the protocol type this implementation handles.
Returns:

| Type | Description |
|---|---|
| MeetingProtocolType | The meeting protocol type enum value. |
config
¶
Meeting protocol configuration models (see Communication design page).
RoundRobinConfig
pydantic-model
¶
Bases: BaseModel
Configuration for the round-robin meeting protocol.
Attributes:

| Name | Type | Description |
|---|---|---|
| max_turns_per_agent | int | Maximum turns each agent may take. |
| max_total_turns | int | Hard cap on total turns across all agents. |
| leader_summarizes | bool | Whether the leader produces a final summary. |
| summary_reserve_fraction | float | Fraction of the token budget reserved for the summary phase (0.0--1.0). |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- max_turns_per_agent (int)
- max_total_turns (int)
- leader_summarizes (bool)
- summary_reserve_fraction (float)
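As a worked example of the reserve fraction, the sketch below splits a meeting's token budget into a discussion share and a summary share. How the protocol actually rounds or applies the fraction is not stated on this page, so the truncating `int()` and the `split_budget` helper are assumptions.

```python
def split_budget(token_budget: int, summary_reserve_fraction: float) -> tuple[int, int]:
    """Return (discussion_tokens, summary_tokens) for a given budget."""
    if token_budget < 0:
        raise ValueError("token_budget must be non-negative")
    if not 0.0 <= summary_reserve_fraction <= 1.0:
        raise ValueError("summary_reserve_fraction must be within 0.0 and 1.0")
    # Truncate toward zero so the reserve never exceeds the stated fraction.
    reserved = int(token_budget * summary_reserve_fraction)
    return token_budget - reserved, reserved
```

With a budget of 8000 tokens and a fraction of 0.25, this leaves 6000 tokens for turns and 2000 for the summary.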
PositionPapersConfig
pydantic-model
¶
Bases: BaseModel
Configuration for the position-papers meeting protocol.
Attributes:

| Name | Type | Description |
|---|---|---|
| max_tokens_per_position | int | Token budget per position paper. |
| synthesizer | NotBlankStr | Who performs synthesis. The sentinel |
| synthesis_reserve_fraction | float | Fraction of the token budget reserved for the synthesis phase (0.0--1.0). |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- max_tokens_per_position (int)
- synthesizer (NotBlankStr)
- synthesis_reserve_fraction (float)
StructuredPhasesConfig
pydantic-model
¶
Bases: BaseModel
Configuration for the structured-phases meeting protocol.
Attributes:

| Name | Type | Description |
|---|---|---|
| skip_discussion_if_no_conflicts | bool | Skip discussion when no conflicts are detected. |
| max_discussion_tokens | int | Token budget for the discussion round. |
| synthesis_reserve_fraction | float | Fraction of the remaining token budget reserved for the synthesis phase (0.0--1.0). |
| conflict_detector | ConflictDetectorType | Which conflict-detection strategy the structured-phases protocol uses to decide whether discussion is needed. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- skip_discussion_if_no_conflicts (bool)
- max_discussion_tokens (int)
- synthesis_reserve_fraction (float)
- conflict_detector (ConflictDetectorType)
skip_discussion_if_no_conflicts
pydantic-field
¶
Skip discussion when no conflicts detected
max_discussion_tokens
pydantic-field
¶
Token budget for discussion round
synthesis_reserve_fraction
pydantic-field
¶
Fraction of remaining token budget reserved for synthesis
conflict_detector
pydantic-field
¶
Conflict-detection strategy discriminator
MeetingProtocolConfig
pydantic-model
¶
Bases: BaseModel
Top-level meeting protocol configuration.
Selects which protocol strategy to use and carries the
per-protocol settings. The three sub-config fields below are all
materialised eagerly with sensible defaults; the factory consumes
only the sub-config matching the protocol field. Setting fields on a
sub-config that does not match the active protocol (for example,
position_papers.synthesizer = "alice" while protocol is
ROUND_ROBIN) is silently ignored. Pydantic discriminated
unions would express this invariant in the type system but were
deferred so the YAML config can serialise every sub-config slot
independently without a discriminator wrapper.
Attributes:

| Name | Type | Description |
|---|---|---|
| protocol | MeetingProtocolType | Which protocol strategy to use. |
| auto_create_tasks | bool | Whether to auto-create tasks from action items extracted during any protocol execution. |
| max_tasks_per_meeting | int \| None | Optional cap on how many tasks to create from a single meeting's action items. |
| round_robin | RoundRobinConfig | Round-robin protocol settings (used only when |
| position_papers | PositionPapersConfig | Position-papers protocol settings (used only when |
| structured_phases | StructuredPhasesConfig | Structured-phases protocol settings (used only when |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- protocol (MeetingProtocolType)
- auto_create_tasks (bool)
- max_tasks_per_meeting (int | None)
- round_robin (RoundRobinConfig)
- position_papers (PositionPapersConfig)
- structured_phases (StructuredPhasesConfig)
max_tasks_per_meeting
pydantic-field
¶
Maximum tasks to create from a single meeting's action items
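The "only the matching sub-config is consumed" behaviour described for MeetingProtocolConfig can be illustrated with a reduced model that has two protocols. All names, enum string values, and defaults below are hypothetical; only the selection pattern is the point.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Union


class ProtocolTypeSketch(str, Enum):
    ROUND_ROBIN = "round_robin"
    POSITION_PAPERS = "position_papers"


@dataclass(frozen=True)
class RoundRobinSketch:
    max_turns_per_agent: int = 2  # illustrative default


@dataclass(frozen=True)
class PositionPapersSketch:
    synthesizer: str = "leader"  # illustrative default


@dataclass(frozen=True)
class MeetingConfigSketch:
    protocol: ProtocolTypeSketch = ProtocolTypeSketch.ROUND_ROBIN
    # Every sub-config is materialised eagerly, whether or not it is active.
    round_robin: RoundRobinSketch = field(default_factory=RoundRobinSketch)
    position_papers: PositionPapersSketch = field(default_factory=PositionPapersSketch)


def active_settings(
    config: MeetingConfigSketch,
) -> Union[RoundRobinSketch, PositionPapersSketch]:
    """Return only the sub-config that matches the selected protocol."""
    if config.protocol is ProtocolTypeSketch.ROUND_ROBIN:
        return config.round_robin
    return config.position_papers
```

Setting `position_papers.synthesizer` while `protocol` is `ROUND_ROBIN` raises no error here either; the value is simply never read.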
models
¶
Meeting protocol domain models (see Communication design page).
AgentResponse
pydantic-model
¶
Bases: BaseModel
Result of a single agent invocation during a meeting.
Attributes:

| Name | Type | Description |
|---|---|---|
| agent_id | NotBlankStr | Identifier of the agent that responded. |
| content | str | Text content of the response. |
| input_tokens | int | Tokens consumed by the prompt. |
| output_tokens | int | Tokens generated in the response. |
| cost | float | Estimated cost of the invocation. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- agent_id (NotBlankStr)
- content (str)
- input_tokens (int)
- output_tokens (int)
- cost (float)
MeetingAgendaItem
pydantic-model
¶
Bases: BaseModel
A single topic on the meeting agenda.
Attributes:

| Name | Type | Description |
|---|---|---|
| title | NotBlankStr | Short title of the agenda topic. |
| description | str | Detailed description of the topic. |
| presenter_id | NotBlankStr \| None | Agent who presents this item (optional). |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- title (NotBlankStr)
- description (str)
- presenter_id (NotBlankStr | None)
MeetingAgenda
pydantic-model
¶
Bases: BaseModel
Full meeting agenda.
Attributes:

| Name | Type | Description |
|---|---|---|
| title | NotBlankStr | Meeting title. |
| context | str | Background context for the meeting. |
| items | tuple[MeetingAgendaItem, ...] | Ordered agenda items. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- title (NotBlankStr)
- context (str)
- items (tuple[MeetingAgendaItem, ...])
MeetingContribution
pydantic-model
¶
Bases: BaseModel
An agent's contribution during a meeting phase.
Attributes:

| Name | Type | Description |
|---|---|---|
| agent_id | NotBlankStr | Identifier of the contributing agent. |
| content | str | Text content of the contribution. |
| phase | MeetingPhase | Meeting phase during which this was contributed. |
| turn_number | int | Turn number within the meeting. |
| input_tokens | int | Prompt tokens consumed. |
| output_tokens | int | Response tokens generated. |
| timestamp | AwareDatetime | When the contribution was recorded. |

Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid

Fields:
- agent_id (NotBlankStr)
- content (str)
- phase (MeetingPhase)
- turn_number (int)
- input_tokens (int)
- output_tokens (int)
- timestamp (AwareDatetime)
ActionItem
pydantic-model
¶
Bases: BaseModel
An action item extracted from meeting decisions.
Attributes:
| Name | Type | Description |
|---|---|---|
| description | NotBlankStr | What needs to be done. |
| assignee_id | NotBlankStr \| None | Agent responsible for the action. |
| priority | Priority | Urgency of the action item. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- description (NotBlankStr)
- assignee_id (NotBlankStr | None)
- priority (Priority)
MeetingMinutes
pydantic-model
¶
Bases: BaseModel
Complete output of a meeting protocol execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| meeting_id | NotBlankStr | Unique meeting identifier. |
| protocol_type | MeetingProtocolType | Protocol that produced these minutes. |
| leader_id | NotBlankStr | Agent who led the meeting. |
| participant_ids | tuple[NotBlankStr, ...] | Agents who participated. |
| agenda | MeetingAgenda | The meeting agenda. |
| contributions | tuple[MeetingContribution, ...] | All agent contributions in order. |
| summary | str | Final meeting summary text. |
| decisions | tuple[NotBlankStr, ...] | Decisions made during the meeting. |
| action_items | tuple[ActionItem, ...] | Extracted action items. |
| conflicts_detected | bool | Whether conflicts were detected. |
| total_input_tokens | int | Total prompt tokens consumed. |
| total_output_tokens | int | Total response tokens generated. |
| started_at | AwareDatetime | When the meeting started. |
| ended_at | AwareDatetime | When the meeting ended. |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- meeting_id (NotBlankStr)
- protocol_type (MeetingProtocolType)
- leader_id (NotBlankStr)
- participant_ids (tuple[NotBlankStr, ...])
- agenda (MeetingAgenda)
- contributions (tuple[MeetingContribution, ...])
- summary (str)
- decisions (tuple[NotBlankStr, ...])
- action_items (tuple[ActionItem, ...])
- conflicts_detected (bool)
- total_input_tokens (int)
- total_output_tokens (int)
- started_at (AwareDatetime)
- ended_at (AwareDatetime)
Validators:
- _validate_timing
- _validate_participants
- _validate_token_aggregates
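The validators above are listed by name only. As a rough illustration of the kind of cross-field checks those names suggest, here is a stdlib-only sketch. The rules it applies (end time not before start time, unique participant ids, totals equal to the per-contribution sums) are assumptions inferred from the validator names, not confirmed behaviour of the library; `ContributionStub` and `check_minutes` are hypothetical names introduced for this example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class ContributionStub:
    """Hypothetical stand-in for MeetingContribution (token fields only)."""

    agent_id: str
    input_tokens: int
    output_tokens: int


def check_minutes(
    *,
    participant_ids: tuple[str, ...],
    contributions: tuple[ContributionStub, ...],
    total_input_tokens: int,
    total_output_tokens: int,
    started_at: datetime,
    ended_at: datetime,
) -> list[str]:
    """Return a list of problems; an empty list means every assumed check passed."""
    problems: list[str] = []
    # Assumed timing rule: a meeting cannot end before it starts.
    if ended_at < started_at:
        problems.append("ended_at precedes started_at")
    # Assumed participant rule: ids must be unique.
    if len(set(participant_ids)) != len(participant_ids):
        problems.append("duplicate participant ids")
    # Assumed aggregate rule: totals equal the sums over all contributions.
    if total_input_tokens != sum(c.input_tokens for c in contributions):
        problems.append("total_input_tokens does not match contributions")
    if total_output_tokens != sum(c.output_tokens for c in contributions):
        problems.append("total_output_tokens does not match contributions")
    return problems


start = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc)
contribs = (
    ContributionStub("agent-a", input_tokens=100, output_tokens=40),
    ContributionStub("agent-b", input_tokens=120, output_tokens=60),
)
ok = check_minutes(
    participant_ids=("agent-a", "agent-b"),
    contributions=contribs,
    total_input_tokens=220,
    total_output_tokens=100,
    started_at=start,
    ended_at=start + timedelta(minutes=5),
)
bad = check_minutes(
    participant_ids=("agent-a", "agent-a"),
    contributions=contribs,
    total_input_tokens=999,
    total_output_tokens=100,
    started_at=start,
    ended_at=start - timedelta(minutes=1),
)
```

In a Pydantic model these checks would more likely raise from a model-level validator than return a list; the list form is used here only to keep the sketch free of third-party imports.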
MeetingRecord
pydantic-model
¶
Bases: BaseModel
Audit trail entry for a meeting execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| meeting_id | NotBlankStr | Unique meeting identifier. |
| meeting_type_name | NotBlankStr | Name of the meeting type from config. |
| protocol_type | MeetingProtocolType | Protocol strategy used. |
| status | MeetingStatus | Final status of the meeting. |
| minutes | MeetingMinutes \| None | Complete minutes if meeting succeeded. |
| error_message | NotBlankStr \| None | Error description if meeting failed. |
| token_budget | int | Token budget that was allocated. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- meeting_id (NotBlankStr)
- meeting_type_name (NotBlankStr)
- protocol_type (MeetingProtocolType)
- status (MeetingStatus)
- minutes (MeetingMinutes | None)
- error_message (NotBlankStr | None)
- token_budget (int)
Validators:
- _validate_status_consistency
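The attribute descriptions say that `minutes` is present when the meeting succeeded and `error_message` when it failed. A minimal sketch of a consistency check along those lines follows. The exact rule enforced by `_validate_status_consistency` is not documented here, so treat this as an assumption; `StatusStub`, its `COMPLETED` member, and `check_status_consistency` are hypothetical names (only `FAILED` and `BUDGET_EXHAUSTED` are mentioned elsewhere on this page).

```python
from enum import Enum
from typing import Optional


class StatusStub(Enum):
    """Hypothetical stand-in for MeetingStatus."""

    COMPLETED = "completed"  # assumed member name for the success case
    FAILED = "failed"
    BUDGET_EXHAUSTED = "budget_exhausted"


def check_status_consistency(
    status: StatusStub,
    minutes: Optional[object],
    error_message: Optional[str],
) -> None:
    """Raise ValueError when status, minutes, and error_message disagree."""
    if status is StatusStub.COMPLETED:
        # Assumed: a successful meeting carries minutes and no error.
        if minutes is None:
            raise ValueError("a completed meeting needs minutes")
        if error_message is not None:
            raise ValueError("a completed meeting must not carry an error")
    else:
        # Assumed: an unsuccessful meeting carries an error and no minutes.
        if error_message is None:
            raise ValueError("an unsuccessful meeting needs an error_message")
        if minutes is not None:
            raise ValueError("an unsuccessful meeting must not carry minutes")


# A consistent success record and a consistent failure record both pass.
check_status_consistency(StatusStub.COMPLETED, minutes={"summary": "ok"}, error_message=None)
check_status_consistency(StatusStub.FAILED, minutes=None, error_message="protocol crashed")
```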
orchestrator
¶
Meeting orchestrator -- lifecycle manager (see Communication design page).
Manages the full meeting lifecycle: validates inputs, selects the configured protocol, executes the meeting, optionally creates tasks from action items, and records audit trail entries.
MeetingOrchestrator
¶
MeetingOrchestrator(
*,
protocol_registry,
agent_caller,
task_creator=None,
strategy_config=None,
lens_assigner=None,
config_resolver=None,
)
Lifecycle manager for meeting execution.
Coordinates protocol selection, execution, task creation from action items, and audit trail recording. Meeting records are stored in memory; see the persistence layer for durable storage when available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| protocol_registry | Mapping[MeetingProtocolType, MeetingProtocol] | Mapping of protocol types to implementations. | required |
| agent_caller | AgentCaller | Callback to invoke agents during meetings. | required |
| task_creator | TaskCreator \| None | Optional callback to create tasks from action items. | None |
Source code in src/synthorg/communication/meeting/orchestrator.py
run_meeting
async
¶
run_meeting(
*,
meeting_type_name,
protocol_config,
agenda,
leader_id,
participant_ids,
token_budget,
)
Execute a meeting and return the audit record.
Validation errors (MeetingParticipantError, MeetingProtocolNotFoundError, and ValueError for a non-positive token_budget) are raised directly. Domain and runtime errors during protocol execution are caught and returned as a MeetingRecord with FAILED or BUDGET_EXHAUSTED status. Exceptions that derive from BaseException but not from Exception (e.g. KeyboardInterrupt) are NOT caught.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| meeting_type_name | str | Name of the meeting type from config. | required |
| protocol_config | MeetingProtocolConfig | Protocol configuration to use. | required |
| agenda | MeetingAgenda | The meeting agenda. | required |
| leader_id | str | ID of the agent leading the meeting. | required |
| participant_ids | tuple[str, ...] | IDs of participating agents. | required |
| token_budget | int | Maximum tokens for the meeting (must be > 0). | required |
Returns:
| Type | Description |
|---|---|
| MeetingRecord | Meeting record with status and optional minutes. |
Raises:
| Type | Description |
|---|---|
| MeetingProtocolNotFoundError | If the configured protocol is not in the registry. |
| MeetingParticipantError | If participant list is empty, contains duplicates, or leader is in participants. |
| ValueError | If token_budget is not positive. |
Source code in src/synthorg/communication/meeting/orchestrator.py
get_records
¶
Return all meeting audit records.
Returns:
| Type | Description |
|---|---|
| tuple[MeetingRecord, ...] | Tuple of meeting records in chronological order. |
get_record
¶
Return the meeting record matching meeting_id or None.
O(1) lookup via the by-id mirror; controller endpoints fetch a single meeting without scanning the full record list.
Source code in src/synthorg/communication/meeting/orchestrator.py
delete_record
¶
Remove the meeting record matching meeting_id.
Returns True when a record was removed, False when no
record had the supplied id. Synchronous because the in-memory
store has no I/O; the surrounding service-layer wrapper is
async to match the rest of the persistence contract.
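One way to picture the record store behind `get_records`, `get_record`, and `delete_record` is an ordered list paired with a dictionary keyed by meeting id. The sketch below is an assumption about the general shape, not the orchestrator's actual internals; `RecordStub`, `InMemoryRecordStore`, and the `add` method are hypothetical names, and meeting ids are assumed to be unique.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RecordStub:
    """Hypothetical, reduced stand-in for MeetingRecord."""

    meeting_id: str
    status: str


class InMemoryRecordStore:
    """Chronological list plus a by-id mirror for constant-time lookup."""

    def __init__(self) -> None:
        self._records: list[RecordStub] = []
        self._by_id: dict[str, RecordStub] = {}

    def add(self, record: RecordStub) -> None:
        # Keep both views in step; ids are assumed to be unique.
        self._records.append(record)
        self._by_id[record.meeting_id] = record

    def get_records(self) -> tuple[RecordStub, ...]:
        # A tuple snapshot so callers cannot mutate the internal list.
        return tuple(self._records)

    def get_record(self, meeting_id: str) -> Optional[RecordStub]:
        # Dictionary lookup: no scan of the full list.
        return self._by_id.get(meeting_id)

    def delete_record(self, meeting_id: str) -> bool:
        record = self._by_id.pop(meeting_id, None)
        if record is None:
            return False
        # Removing from the list is linear in the number of records.
        self._records.remove(record)
        return True


store = InMemoryRecordStore()
store.add(RecordStub("m-1", "COMPLETED"))
store.add(RecordStub("m-2", "FAILED"))
found = store.get_record("m-2")
removed = store.delete_record("m-1")
removed_again = store.delete_record("m-1")
remaining = store.get_records()
```

In this layout lookup is constant time, while deletion still pays a linear cost to keep the chronological list consistent; whether the real store makes the same trade-off is not stated in the source.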