Engine¶
Agent orchestration, execution loops, task decomposition, routing, and parallel execution.
Agent Engine¶
agent_engine
¶
Agent engine -- top-level orchestrator.
Ties together prompt construction, execution context, execution loop,
tool invocation, and budget tracking into a single run() entry point.
PersonalityTrimNotifier
¶
PersonalityTrimNotifier = Callable[[PersonalityTrimPayload], Awaitable[None]]
Async callback invoked when an agent's personality section is trimmed.
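A notifier matching this alias is any async function that accepts the payload and returns nothing. The sketch below is illustrative only: `TrimPayload` and its keys (`agent_id`, `before_tokens`, `after_tokens`) are hypothetical stand-ins, since the real PersonalityTrimPayload fields are not listed on this page, and `notify` is an assumed helper rather than engine code.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypedDict


class TrimPayload(TypedDict):
    """Hypothetical stand-in for PersonalityTrimPayload (real keys may differ)."""

    agent_id: str
    before_tokens: int
    after_tokens: int


# Same shape as the documented alias: an async callable taking the payload.
TrimNotifier = Callable[[TrimPayload], Awaitable[None]]

received: list[TrimPayload] = []


async def record_trim(payload: TrimPayload) -> None:
    """Example notifier that only records the payload it was given."""
    received.append(payload)


async def notify(notifier: TrimNotifier, payload: TrimPayload) -> None:
    """Invoke the notifier the way a caller might after trimming."""
    await notifier(payload)


asyncio.run(
    notify(record_trim, {"agent_id": "a-1", "before_tokens": 900, "after_tokens": 400})
)
```

Because the alias is a plain callable type, a closure or a bound method would work equally well as the notifier.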
PersonalityTrimPayload
¶
Bases: TypedDict
Structured payload forwarded to PersonalityTrimNotifier callbacks.
AgentEngine
¶
AgentEngine(
*,
provider,
execution_loop=None,
tool_registry=None,
cost_tracker=None,
recovery_strategy=_DEFAULT_RECOVERY_STRATEGY,
shutdown_checker=None,
error_taxonomy_config=None,
budget_enforcer=None,
security_config=None,
approval_store=None,
parked_context_repo=None,
approval_gate=None,
trust_service=None,
mcp_self_consumer=None,
task_engine=None,
checkpoint_repo=None,
heartbeat_repo=None,
checkpoint_config=None,
coordinator=None,
stagnation_detector=None,
auto_loop_config=None,
hybrid_loop_config=None,
compaction_callback=None,
plan_execute_config=None,
provider_registry=None,
provider_configs=None,
model_resolver=None,
tool_invocation_tracker=None,
memory_injection_strategy=None,
ontology_injection_strategy=None,
procedural_memory_config=None,
memory_backend=None,
distillation_capture_enabled=False,
config_resolver=None,
personality_trim_notifier=None,
coordination_metrics_collector=None,
audit_log=None,
project_repo=None,
agent_middleware_chain=None,
event_reader=None,
event_stream_hub=None,
interrupt_store=None,
approval_interrupt_timeout_seconds=None,
clock=None,
)
Bases: AgentEngineContextMixin, AgentEngineErrorsMixin, AgentEngineFactoriesMixin, AgentEnginePostExecMixin, AgentEngineRecoveryMixin, AgentEngineResumeMixin
Top-level orchestrator for agent execution.
Source code in src/synthorg/engine/agent_engine.py
coordinate
async
¶
Delegate to the multi-agent coordinator.
Source code in src/synthorg/engine/agent_engine.py
run
async
¶
run(
*,
identity,
task,
completion_config=None,
max_turns=DEFAULT_MAX_TURNS,
memory_messages=(),
timeout_seconds=None,
effective_autonomy=None,
resume_execution_id=None,
)
Execute an agent on a task.
Source code in src/synthorg/engine/agent_engine.py
Execution Loop Protocol¶
loop_protocol
¶
Execution loop protocol and supporting models.
Defines the ExecutionLoop protocol that the agent engine calls to
run a task, along with ExecutionResult, TurnRecord,
TerminationReason, and the BudgetChecker and ShutdownChecker
type aliases.
BudgetChecker
module-attribute
¶
BudgetChecker = Callable[[AgentContext], bool]
Callback that returns True when the budget is exhausted.
ShutdownChecker
module-attribute
¶
Callback that returns True when a graceful shutdown has been requested.
NodeType
¶
Bases: StrEnum
Type of computation node executed within a turn.
Used for structural credit assignment and post-hoc trace analysis. Each turn records which node types executed, enabling fine-grained attribution of costs and failures.
BehaviorTag
¶
Bases: StrEnum
Behavior category for trace capture and eval routing.
Starting taxonomy derived from agent evaluation patterns. Extend as usage patterns reveal category fragmentation or generalization.
TerminationReason
¶
Bases: StrEnum
Why the execution loop terminated.
TurnRecord
pydantic-model
¶
Bases: BaseModel
Per-turn metadata recorded during execution.
Attributes:
| Name | Type | Description |
|---|---|---|
turn_number |
int
|
1-indexed turn number. |
input_tokens |
int
|
Input tokens consumed this turn. |
output_tokens |
int
|
Output tokens generated this turn. |
total_tokens |
int
|
Sum of input and output tokens (computed). |
cost |
float
|
Cost in the configured currency for this turn. |
tool_calls_made |
tuple[NotBlankStr, ...]
|
Names of tools invoked this turn. |
tool_call_fingerprints |
tuple[NotBlankStr, ...]
|
Deterministic fingerprints of tool calls (name:args_hash). |
finish_reason |
FinishReason
|
LLM finish reason for this turn. |
call_category |
LLMCallCategory | None
|
Optional LLM call category for coordination metrics (productive, coordination, system). |
latency_ms |
float | None
|
Round-trip latency in milliseconds, from the provider base class. |
cache_hit |
bool | None
|
Whether the provider served this turn from cache. |
retry_count |
int | None
|
Number of retry attempts before success. |
retry_reason |
NotBlankStr | None
|
Exception type name of the last retried error. |
node_types |
tuple[NodeType, ...]
|
Node types that executed in this turn (e.g. LLM_CALL, TOOL_INVOCATION). Defaults to empty for deserialization of legacy data. |
behavior_tags |
tuple[BehaviorTag, ...]
|
Behavior categories inferred by BehaviorTaggerMiddleware. |
efficiency_delta |
EfficiencyRatios | None
|
Efficiency ratios against an ideal baseline. |
prior_tool_call_count |
int
|
Cumulative tool calls before this turn (for PTE). |
tool_response_tokens |
int
|
Tokens from tool responses this turn (for PTE). |
semantic_drift_score |
float | None
|
Similarity score (0.0--1.0) from
SemanticDriftDetector, or |
success |
bool
|
Whether this turn completed without error or content filter (computed). |
Config:
frozen: True, allow_inf_nan: False
Fields:
-
turn_number(int) -
input_tokens(int) -
output_tokens(int) -
cost(float) -
tool_calls_made(tuple[NotBlankStr, ...]) -
tool_call_fingerprints(tuple[NotBlankStr, ...]) -
finish_reason(FinishReason) -
call_category(LLMCallCategory | None) -
latency_ms(float | None) -
cache_hit(bool | None) -
retry_count(int | None) -
retry_reason(NotBlankStr | None) -
node_types(tuple[NodeType, ...]) -
behavior_tags(tuple[BehaviorTag, ...]) -
efficiency_delta(EfficiencyRatios | None) -
prior_tool_call_count(int) -
tool_response_tokens(int) -
semantic_drift_score(float | None)
Validators:
-
_validate_retry_consistency
tool_call_fingerprints
pydantic-field
¶
Deterministic fingerprints of tool calls (name:args_hash)
call_category
pydantic-field
¶
LLM call category (productive, coordination, system)
latency_ms
pydantic-field
¶
Round-trip latency in milliseconds from provider base class
behavior_tags
pydantic-field
¶
Behavior categories inferred by BehaviorTaggerMiddleware
efficiency_delta
pydantic-field
¶
Efficiency ratios against an ideal baseline
prior_tool_call_count
pydantic-field
¶
Cumulative tool calls before this turn (for PTE)
tool_response_tokens
pydantic-field
¶
Tokens from tool responses this turn (for PTE)
semantic_drift_score
pydantic-field
¶
Semantic drift similarity score (from SemanticDriftDetector)
ExecutionResult
pydantic-model
¶
Bases: BaseModel
Result returned by an execution loop.
Attributes:
| Name | Type | Description |
|---|---|---|
context |
AgentContext
|
Final agent context after execution. |
termination_reason |
TerminationReason
|
Why the loop stopped. |
turns |
tuple[TurnRecord, ...]
|
Per-turn metadata records. |
total_tool_calls |
int
|
Total tool calls across all turns (computed). |
error_message |
str | None
|
Error description when termination_reason is ERROR. |
metadata |
dict[str, object]
|
Forward-compatible dict for future loop types.
Note: |
Deep-copy metadata dict at construction boundary.
Config:
frozen: True, allow_inf_nan: False
Fields:
-
context(AgentContext) -
termination_reason(TerminationReason) -
turns(tuple[TurnRecord, ...]) -
error_message(str | None) -
metadata(dict[str, object])
Validators:
-
_validate_error_message
Source code in src/synthorg/engine/loop_protocol.py
ExecutionLoop
¶
Bases: Protocol
Protocol for agent execution loops.
The agent engine calls execute to run a task through the loop.
Implementations decide the control flow (ReAct, Plan-and-Execute, etc.)
but all return an ExecutionResult with a TerminationReason.
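Since ExecutionLoop is a Protocol, any class with a matching async `execute` satisfies it without inheriting from anything. The following is a minimal sketch of that idea with heavily reduced, hypothetical types: `Reason`, `Result`, `Loop`, and `EchoLoop` are stand-ins invented for illustration, and the real signature takes more parameters than shown.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Protocol


class Reason(str, Enum):
    """Simplified stand-in for TerminationReason (member names assumed)."""

    COMPLETED = "completed"
    MAX_TURNS = "max_turns"


@dataclass(frozen=True)
class Result:
    """Simplified stand-in for ExecutionResult."""

    context: dict
    termination_reason: Reason


class Loop(Protocol):
    """Structural interface: anything with a matching async execute()."""

    async def execute(self, *, context: dict, provider: object) -> Result: ...


class EchoLoop:
    """Trivial loop that finishes immediately without calling the provider."""

    async def execute(self, *, context: dict, provider: object) -> Result:
        return Result(context=context, termination_reason=Reason.COMPLETED)


async def run(loop: Loop) -> Result:
    # The caller depends only on the protocol, not on a concrete loop class.
    return await loop.execute(context={"task": "demo"}, provider=None)


result = asyncio.run(run(EchoLoop()))
```

Keeping the caller typed against the protocol is what would let different control flows be swapped in without touching the calling code.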
execute
async
¶
execute(
*,
context,
provider,
tool_invoker=None,
budget_checker=None,
shutdown_checker=None,
completion_config=None,
)
Run the execution loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
AgentContext
|
Initial agent context with conversation and identity. |
required |
provider
|
CompletionProvider
|
LLM completion provider. |
required |
tool_invoker
|
ToolInvokerProtocol | None
|
Optional tool invoker for tool execution. |
None
|
budget_checker
|
BudgetChecker | None
|
Optional callback; returns True when the budget is exhausted. |
None
|
shutdown_checker
|
ShutdownChecker | None
|
Optional callback; returns True when a graceful shutdown has been requested. |
None
|
completion_config
|
CompletionConfig | None
|
Optional per-execution override for temperature/max_tokens (defaults to identity's model config). |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionResult
|
Execution result with final context and termination reason. |
Source code in src/synthorg/engine/loop_protocol.py
make_budget_checker
¶
Create a budget checker if the task has a positive budget limit.
The returned callable returns True when accumulated cost meets
or exceeds the limit (budget exhausted), False otherwise.
Returns None when there is no positive budget limit.
Source code in src/synthorg/engine/loop_protocol.py
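The behaviour described for make_budget_checker can be sketched as a small closure factory. This is an assumption-laden illustration, not the library's code: `Ctx` is a hypothetical stand-in for AgentContext that models cost as a bare float, and `make_checker` takes the limit directly rather than reading it from a task.

```python
from collections.abc import Callable
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Ctx:
    """Hypothetical stand-in for AgentContext; only the cost is modelled."""

    accumulated_cost: float


Checker = Callable[[Ctx], bool]


def make_checker(budget_limit: float) -> Optional[Checker]:
    """Return a checker for a positive limit, else None."""
    if budget_limit <= 0:
        return None

    def is_exhausted(ctx: Ctx) -> bool:
        # True once the accumulated cost meets or exceeds the limit.
        return ctx.accumulated_cost >= budget_limit

    return is_exhausted


checker = make_checker(1.0)
```

Returning None for a non-positive limit lets the caller skip budget checks entirely instead of invoking a checker that can never fire.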
ReAct Loop¶
react_loop
¶
ReAct execution loop -- think, act, observe.
Implements the ExecutionLoop protocol using the ReAct pattern:
check shutdown -> check budget -> call LLM -> record turn ->
check for LLM errors -> update context -> handle completion or
(check shutdown -> execute tools) -> repeat.
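The step ordering above can be sketched as a small async loop. This is a simplified illustration under stated assumptions: the `react` function, the dict-shaped LLM reply, the string termination reasons, and the fake provider and tool are all hypothetical, and per-turn record keeping is omitted for brevity.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def react(
    call_llm: Callable[[list[str]], Awaitable[dict]],
    run_tool: Callable[[str], Awaitable[str]],
    *,
    max_turns: int,
    budget_exhausted: Callable[[], bool] = lambda: False,
    shutdown_requested: Callable[[], bool] = lambda: False,
) -> tuple[str, list[str]]:
    """Return (termination reason, conversation); reason strings are illustrative."""
    conversation: list[str] = []
    for _ in range(max_turns):
        if shutdown_requested():
            return "shutdown", conversation
        if budget_exhausted():
            return "budget_exhausted", conversation
        reply = await call_llm(conversation)
        if reply.get("error"):
            return "error", conversation
        conversation.append(reply["text"])  # update context
        tool = reply.get("tool")
        if tool is None:
            return "completed", conversation  # LLM signalled completion
        if shutdown_requested():  # re-check before running tools
            return "shutdown", conversation
        conversation.append(await run_tool(tool))  # observe the tool result
    return "max_turns", conversation


async def fake_llm(conversation: list[str]) -> dict:
    # First turn asks for a tool; any later turn finishes.
    if not conversation:
        return {"text": "need data", "tool": "search"}
    return {"text": "done", "tool": None}


async def fake_tool(name: str) -> str:
    return f"{name}: ok"


reason, transcript = asyncio.run(react(fake_llm, fake_tool, max_turns=5))
```

The second shutdown check sits between the LLM reply and tool execution so that a shutdown request arriving mid-turn does not start new tool work.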
ReactLoop
¶
ReactLoop(
checkpoint_callback=None,
*,
approval_gate=None,
stagnation_detector=None,
compaction_callback=None,
)
ReAct execution loop: reason, act, observe.
The loop checks for shutdown, checks the budget, calls the LLM, checks for termination conditions, executes any requested tools, feeds results back, and repeats until the LLM signals completion, the turn limit is reached, the budget is exhausted, a shutdown is requested, or an error occurs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
checkpoint_callback
|
CheckpointCallback | None
|
Optional async callback invoked after each completed turn; the callback itself decides whether to persist. |
None
|
approval_gate
|
ApprovalGate | None
|
Optional gate that checks for pending escalations
after tool execution and parks the agent when approval is
required. |
None
|
stagnation_detector
|
StagnationDetector | None
|
Optional detector that checks for
repetitive tool-call patterns and intervenes with
corrective prompts or early termination. |
None
|
compaction_callback
|
CompactionCallback | None
|
Optional async callback invoked at turn
boundaries to compress older conversation turns when the
context fill level is high. |
None
|
Source code in src/synthorg/engine/react_loop.py
get_loop_type
¶
execute
async
¶
execute(
*,
context,
provider,
tool_invoker=None,
budget_checker=None,
shutdown_checker=None,
completion_config=None,
)
Run the ReAct loop until termination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
AgentContext
|
Initial agent context with conversation. |
required |
provider
|
CompletionProvider
|
LLM completion provider. |
required |
tool_invoker
|
ToolInvokerProtocol | None
|
Optional tool invoker for tool execution. |
None
|
budget_checker
|
BudgetChecker | None
|
Optional budget exhaustion callback. |
None
|
shutdown_checker
|
ShutdownChecker | None
|
Optional callback; returns True when a graceful shutdown has been requested. |
None
|
completion_config
|
CompletionConfig | None
|
Optional per-execution config override. |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionResult
|
Execution result with final context and termination info. |
Raises:
| Type | Description |
|---|---|
MemoryError
|
Re-raised unconditionally (non-recoverable). |
RecursionError
|
Re-raised unconditionally (non-recoverable). |
Source code in src/synthorg/engine/react_loop.py
Plan-and-Execute Loop¶
plan_execute_loop
¶
Plan-and-Execute execution loop.
Implements the ExecutionLoop protocol using a two-phase approach:
1. Plan -- ask the LLM to decompose the task into ordered steps.
Planning calls pass tools=None (no tool access during planning).
2. Execute -- run each step via a mini-ReAct sub-loop with tools.
Re-planning is triggered when a step fails, up to a configurable limit. When re-planning is exhausted, the loop terminates with ERROR.
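The plan, execute, and re-plan cycle described above might look roughly like the sketch below. Everything in it is an assumption made for illustration: `plan_and_execute`, the planner and step-runner callables, and the string termination reasons are hypothetical, each step runner stands in for the mini-ReAct sub-loop, and restarting from the first step of a revised plan is a simplification that the real loop may not share.

```python
import asyncio
from collections.abc import Awaitable, Callable

Planner = Callable[[str, int], Awaitable[list[str]]]
StepRunner = Callable[[str], Awaitable[bool]]


async def plan_and_execute(
    task: str, plan: Planner, run_step: StepRunner, *, max_replans: int
) -> tuple[str, int]:
    """Return (termination reason, number of re-plans used)."""
    replans = 0
    steps = await plan(task, replans)  # planning phase: no tool access
    while True:
        failed = False
        for step in steps:
            if not await run_step(step):  # stands in for the mini-ReAct sub-loop
                failed = True
                break
        if not failed:
            return "completed", replans
        if replans >= max_replans:
            return "error", replans  # re-planning exhausted
        replans += 1
        steps = await plan(task, replans)


async def fake_plan(task: str, revision: int) -> list[str]:
    # The original plan contains a failing step; the revision avoids it.
    return ["fetch", "bad-step"] if revision == 0 else ["fetch", "summarise"]


async def fake_step(step: str) -> bool:
    return step != "bad-step"


outcome = asyncio.run(plan_and_execute("demo", fake_plan, fake_step, max_replans=1))
exhausted = asyncio.run(plan_and_execute("demo", fake_plan, fake_step, max_replans=0))
```

With one re-plan allowed the run recovers; with none allowed the first step failure ends the run with an error, mirroring the limit described above.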
PlanExecuteLoop
¶
PlanExecuteLoop(
config=None,
checkpoint_callback=None,
*,
approval_gate=None,
stagnation_detector=None,
compaction_callback=None,
)
Bases: PlanExecuteStepMixin
Plan-and-Execute execution loop.
Decomposes a task into steps via LLM planning, then executes each step with a mini-ReAct sub-loop. Supports re-planning on failure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
PlanExecuteConfig | None
|
Loop configuration. Defaults to |
None
|
checkpoint_callback
|
CheckpointCallback | None
|
Optional per-turn checkpoint callback. |
None
|
approval_gate
|
ApprovalGate | None
|
Optional gate that checks for pending escalations
after tool execution and parks the agent when approval is
required. |
None
|
stagnation_detector
|
StagnationDetector | None
|
Optional detector that checks for
repetitive tool-call patterns within each step and
intervenes with corrective prompts or early termination.
|
None
|
compaction_callback
|
CompactionCallback | None
|
Optional async callback invoked at turn
boundaries to compress older conversation turns when the
context fill level is high. |
None
|
Source code in src/synthorg/engine/plan_execute_loop.py
get_loop_type
¶
execute
async
¶
execute(
*,
context,
provider,
tool_invoker=None,
budget_checker=None,
shutdown_checker=None,
completion_config=None,
)
Run the Plan-and-Execute loop until termination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
AgentContext
|
Initial agent context with conversation. |
required |
provider
|
CompletionProvider
|
LLM completion provider. |
required |
tool_invoker
|
ToolInvokerProtocol | None
|
Optional tool invoker for tool execution. |
None
|
budget_checker
|
BudgetChecker | None
|
Optional budget exhaustion callback. |
None
|
shutdown_checker
|
ShutdownChecker | None
|
Optional callback; returns True when a graceful shutdown has been requested. |
None
|
completion_config
|
CompletionConfig | None
|
Optional per-execution config override. |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionResult
|
Execution result with final context and termination info. |
Raises:
| Type | Description |
|---|---|
MemoryError
|
Re-raised unconditionally (non-recoverable). |
RecursionError
|
Re-raised unconditionally (non-recoverable). |
Source code in src/synthorg/engine/plan_execute_loop.py
Plan Models¶
plan_models
¶
Data models for the Plan-and-Execute execution loop.
Defines the plan structure (steps, status, revisions) and the configuration model for the plan-execute loop.
StepStatus
¶
Bases: StrEnum
Execution status of a plan step.
PlanStep
pydantic-model
¶
Bases: BaseModel
A single step within an execution plan.
Attributes:
| Name | Type | Description |
|---|---|---|
step_number |
int
|
1-indexed position in the plan. |
description |
NotBlankStr
|
What this step should accomplish. |
expected_outcome |
NotBlankStr
|
The anticipated result of this step. |
status |
StepStatus
|
Current execution status of this step. |
actual_outcome |
NotBlankStr | None
|
Observed result after execution (if any). |
Config:
frozen: True, allow_inf_nan: False, extra: forbid
Fields:
-
step_number(int) -
description(NotBlankStr) -
expected_outcome(NotBlankStr) -
status(StepStatus) -
actual_outcome(NotBlankStr | None)
ExecutionPlan
pydantic-model
¶
Bases: BaseModel
An ordered sequence of plan steps for task execution.
Attributes:
| Name | Type | Description |
|---|---|---|
steps |
tuple[PlanStep, ...]
|
Ordered tuple of plan steps. |
revision_number |
int
|
Plan revision counter (0 = original). |
original_task_summary |
NotBlankStr
|
Brief summary of the task being planned. |
Config:
frozen: True, allow_inf_nan: False, extra: forbid
Fields:
-
steps(tuple[PlanStep, ...]) -
revision_number(int) -
original_task_summary(NotBlankStr)
Validators:
-
_validate_sequential_step_numbers
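As a rough idea of what a sequential-step-number check could enforce, the sketch below uses standard-library frozen dataclasses as a stand-in for the Pydantic models. The rule itself (numbers must run 1, 2, ..., n with no gaps) is an assumption inferred from the validator's name and the 1-indexed step_number; `Step` and `Plan` are hypothetical reductions of PlanStep and ExecutionPlan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Step:
    step_number: int  # 1-indexed position in the plan
    description: str


@dataclass(frozen=True)
class Plan:
    steps: tuple[Step, ...]
    revision_number: int = 0  # 0 = original plan

    def __post_init__(self) -> None:
        # Assumed rule: numbers must run 1, 2, ..., n with no gaps.
        expected = tuple(range(1, len(self.steps) + 1))
        actual = tuple(s.step_number for s in self.steps)
        if actual != expected:
            msg = f"step numbers must be {expected}, got {actual}"
            raise ValueError(msg)


ok = Plan(steps=(Step(1, "fetch"), Step(2, "summarise")))

try:
    Plan(steps=(Step(1, "fetch"), Step(3, "summarise")))
except ValueError as exc:
    error = str(exc)
else:
    error = ""
```

Validating at construction time means an out-of-order plan can never exist as a frozen object, so downstream code does not need to re-check it.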
PlanExecuteConfig
pydantic-model
¶
Bases: BaseModel
Configuration for the Plan-and-Execute loop.
Attributes:
| Name | Type | Description |
|---|---|---|
planner_model |
NotBlankStr | None
|
Model override for plan generation.
|
executor_model |
NotBlankStr | None
|
Model override for step execution.
|
max_replans |
int
|
Maximum number of re-planning attempts on step failure. |
Config:
frozen: True, extra: forbid, allow_inf_nan: False
Fields:
-
planner_model(NotBlankStr | None) -
executor_model(NotBlankStr | None) -
max_replans(int)
Execution Context¶
context
¶
Agent execution context.
Wraps an AgentIdentity (frozen config) with evolving runtime state
(conversation, cost, turn count, task execution) using
model_copy(update=...) for cheap, immutable state transitions.
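The copy-on-update pattern can be illustrated with the standard library, where `dataclasses.replace` plays the role that `model_copy(update=...)` plays for a frozen Pydantic model. This is a sketch under that substitution: `Ctx` is a hypothetical, much-reduced stand-in for AgentContext, and its two methods only imitate the documented ones.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Ctx:
    """Hypothetical, much-reduced stand-in for AgentContext."""

    conversation: tuple[str, ...] = ()
    turn_count: int = 0

    def with_message(self, msg: str) -> "Ctx":
        # Build a new instance; the original is left untouched.
        return replace(self, conversation=(*self.conversation, msg))

    def with_turn_completed(self, response: str) -> "Ctx":
        return replace(
            self,
            conversation=(*self.conversation, response),
            turn_count=self.turn_count + 1,
        )


start = Ctx()
after = start.with_message("hello").with_turn_completed("hi there")

try:
    start.turn_count = 5  # type: ignore[misc]
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False
```

Because every transition returns a new object, an earlier context such as `start` remains a valid value that can be kept for comparison or rollback.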
DEFAULT_MAX_TURNS
module-attribute
¶
Default hard limit on LLM turns per agent execution.
AgentContextSnapshot
pydantic-model
¶
Bases: BaseModel
Compact frozen snapshot of an AgentContext for reporting.
Attributes:
| Name | Type | Description |
|---|---|---|
execution_id |
NotBlankStr
|
Unique execution run identifier. |
agent_id |
NotBlankStr
|
Agent identifier (string form of UUID). |
task_id |
NotBlankStr | None
|
Task identifier, if a task is active. |
turn_count |
int
|
Number of turns completed. |
accumulated_cost |
TokenUsage
|
Running cost totals. |
task_status |
TaskStatus | None
|
Current task status, if a task is active. |
started_at |
AwareDatetime
|
When the execution began. |
snapshot_at |
AwareDatetime
|
When this snapshot was taken. |
message_count |
int
|
Number of messages in the conversation. |
Config:
frozen: True, allow_inf_nan: False, extra: forbid
Fields:
-
execution_id(NotBlankStr) -
agent_id(NotBlankStr) -
task_id(NotBlankStr | None) -
turn_count(int) -
accumulated_cost(TokenUsage) -
task_status(TaskStatus | None) -
started_at(AwareDatetime) -
snapshot_at(AwareDatetime) -
message_count(int) -
context_fill_tokens(int) -
context_fill_percent(float | None)
Validators:
-
_validate_task_pair
AgentContext
pydantic-model
¶
Bases: BaseModel
Frozen runtime context for agent execution.
All state evolution happens via model_copy(update=...).
The context tracks the conversation, accumulated cost, and
optionally a TaskExecution for task-bound agent runs.
Attributes:
| Name | Type | Description |
|---|---|---|
execution_id |
NotBlankStr
|
Unique identifier for this execution run. |
identity |
AgentIdentity
|
Frozen agent identity configuration. |
task_execution |
TaskExecution | None
|
Current task execution state (if any). |
conversation |
tuple[ChatMessage, ...]
|
Accumulated chat messages. |
accumulated_cost |
TokenUsage
|
Running token usage and cost totals. |
turn_count |
int
|
Number of LLM turns completed. |
max_turns |
int
|
Hard limit on turns before the engine stops. |
started_at |
AwareDatetime
|
When this execution began. |
context_fill_tokens |
int
|
Estimated tokens currently in the full context (system prompt + conversation + tool defs). |
context_capacity_tokens |
int | None
|
Model's max context window tokens, or None when unknown. |
compression_metadata |
CompressionMetadata | None
|
Metadata about conversation compression, set when compaction has occurred. |
async_task_state |
AsyncTaskStateChannel
|
Dedicated state channel for tracked async
tasks. Separate from |
loaded_tools |
frozenset[str]
|
Tool names with L2 bodies active in context. |
loaded_resources |
frozenset[tuple[str, str]]
|
(tool_name, resource_id) pairs with L3 active. |
tool_load_order |
tuple[str, ...]
|
Insertion-ordered tool names for FIFO auto-unload under budget pressure. |
Config:
frozen: True, allow_inf_nan: False
Fields:
-
execution_id(NotBlankStr) -
identity(AgentIdentity) -
task_execution(TaskExecution | None) -
conversation(tuple[ChatMessage, ...]) -
accumulated_cost(TokenUsage) -
turn_count(int) -
max_turns(int) -
started_at(AwareDatetime) -
context_fill_tokens(int) -
context_capacity_tokens(int | None) -
compression_metadata(CompressionMetadata | None) -
async_task_state(AsyncTaskStateChannel) -
loaded_tools(frozenset[str]) -
loaded_resources(frozenset[tuple[str, str]]) -
tool_load_order(tuple[str, ...])
Validators:
-
_validate_disclosure_consistency
accumulated_cost
pydantic-field
¶
accumulated_cost = ZERO_TOKEN_USAGE
Running cost totals across all turns
context_capacity_tokens
pydantic-field
¶
Model's max context window tokens
compression_metadata
pydantic-field
¶
Compression metadata when compacted
async_task_state
pydantic-field
¶
Async task tracking state (survives compaction and context reset)
loaded_resources
pydantic-field
¶
(tool_name, resource_id) pairs with L3 active
context_fill_percent
property
¶
Percentage of context window currently filled.
Returns None when context capacity is unknown.
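A plausible reading of this property is a simple ratio of fill to capacity. The helper below is a hypothetical free function written for illustration, not the property's actual body; it assumes a positive capacity whenever one is given.

```python
from typing import Optional


def fill_percent(fill_tokens: int, capacity_tokens: Optional[int]) -> Optional[float]:
    """Percentage of the context window in use, or None if capacity is unknown."""
    if capacity_tokens is None:
        return None
    return fill_tokens / capacity_tokens * 100.0


known = fill_percent(2_000, 8_000)
unknown = fill_percent(2_000, None)
```

Returning None instead of 0.0 keeps "capacity unknown" distinguishable from "context empty" for callers such as a compaction trigger.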
has_turns_remaining
property
¶
Whether the agent has turns remaining before hitting max_turns.
from_identity
classmethod
¶
from_identity(
identity, *, task=None, max_turns=DEFAULT_MAX_TURNS, context_capacity_tokens=None
)
Create a fresh execution context from an agent identity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
identity
|
AgentIdentity
|
The frozen agent identity card. |
required |
task
|
Task | None
|
Optional task to bind to this execution. |
None
|
max_turns
|
int
|
Maximum number of LLM turns allowed. |
DEFAULT_MAX_TURNS
|
context_capacity_tokens
|
int | None
|
Model's max context window tokens, or None when unknown. |
None
|
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Source code in src/synthorg/engine/context.py
with_message
¶
Append a single message to the conversation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
msg
|
ChatMessage
|
The chat message to append. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Source code in src/synthorg/engine/context.py
with_turn_completed
¶
Record a completed turn.
Increments turn count, appends the response message, and accumulates cost on both the context and the task execution (if present).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
usage
|
TokenUsage
|
Token usage from this turn's LLM call. |
required |
response_msg
|
ChatMessage
|
The assistant's response message. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Raises:
| Type | Description |
|---|---|
MaxTurnsExceededError
|
If |
Source code in src/synthorg/engine/context.py
with_context_fill
¶
Update the estimated context fill level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fill_tokens
|
int
|
New estimated fill in tokens. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Raises:
| Type | Description |
|---|---|
ValueError
|
If |
Source code in src/synthorg/engine/context.py
with_async_task_state
¶
Replace the async task state channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AsyncTaskStateChannel
|
New state channel. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Source code in src/synthorg/engine/context.py
with_compression
¶
Replace conversation with a compressed version.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metadata
|
CompressionMetadata
|
Compression metadata to attach. |
required |
compressed_conversation
|
tuple[ChatMessage, ...]
|
The compressed message tuple. |
required |
fill_tokens
|
int
|
Updated fill estimate after compression. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Raises:
| Type | Description |
|---|---|
ValueError
|
If |
Source code in src/synthorg/engine/context.py
with_task_transition
¶
Transition the task execution status.
Delegates to synthorg.engine.task_execution.TaskExecution.with_transition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
target
|
TaskStatus
|
The desired target status. |
required |
reason
|
str
|
Optional reason for the transition. |
''
|
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Raises:
| Type | Description |
|---|---|
ExecutionStateError
|
If no task execution is set. |
ValueError
|
If the transition is invalid (from
|
Source code in src/synthorg/engine/context.py
to_snapshot
¶
Create a compact snapshot for reporting and logging.
Returns:
| Type | Description |
|---|---|
AgentContextSnapshot
|
Frozen |
Source code in src/synthorg/engine/context.py
with_tool_loaded
¶
Mark a tool's L2 body as loaded.
Idempotent: loading an already-loaded tool is a no-op.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tool_name
|
str
|
Name of the tool to load. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Source code in src/synthorg/engine/context.py
with_tool_unloaded
¶
Mark a tool's L2 body as unloaded.
Also removes any L3 resources for the unloaded tool. Idempotent: unloading an already-unloaded tool is a no-op.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tool_name
|
str
|
Name of the tool to unload. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Source code in src/synthorg/engine/context.py
with_resource_loaded
¶
Mark an L3 resource as fetched.
Idempotent: loading an already-loaded resource is a no-op.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tool_name
|
str
|
Name of the tool owning the resource. |
required |
resource_id
|
str
|
Identifier of the resource. |
required |
Returns:
| Type | Description |
|---|---|
AgentContext
|
New |
Source code in src/synthorg/engine/context.py
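The three tool-disclosure helpers documented above (with_tool_loaded, with_tool_unloaded, with_resource_loaded) can be pictured as set operations on frozen state. The sketch below is an illustration under assumptions: `ToolState` is a hypothetical reduction of the relevant AgentContext fields, built on standard-library dataclasses rather than Pydantic, and the real methods may perform extra validation.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToolState:
    """Hypothetical reduction of the tool-tracking fields on AgentContext."""

    loaded_tools: frozenset[str] = frozenset()
    loaded_resources: frozenset[tuple[str, str]] = frozenset()
    tool_load_order: tuple[str, ...] = ()

    def with_tool_loaded(self, name: str) -> "ToolState":
        if name in self.loaded_tools:
            return self  # idempotent: already loaded
        return replace(
            self,
            loaded_tools=self.loaded_tools | {name},
            tool_load_order=(*self.tool_load_order, name),
        )

    def with_resource_loaded(self, name: str, resource_id: str) -> "ToolState":
        return replace(
            self, loaded_resources=self.loaded_resources | {(name, resource_id)}
        )

    def with_tool_unloaded(self, name: str) -> "ToolState":
        if name not in self.loaded_tools:
            return self  # idempotent: already unloaded
        return replace(
            self,
            loaded_tools=self.loaded_tools - {name},
            # Drop every L3 resource owned by the unloaded tool.
            loaded_resources=frozenset(
                pair for pair in self.loaded_resources if pair[0] != name
            ),
            tool_load_order=tuple(t for t in self.tool_load_order if t != name),
        )


state = (
    ToolState()
    .with_tool_loaded("search")
    .with_tool_loaded("git")
    .with_resource_loaded("search", "doc-1")
)
unloaded = state.with_tool_unloaded("search")
```

Keeping the load order in a separate tuple is what would make FIFO auto-unload possible: the first element is always the oldest tool still loaded.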
Prompt Builder¶
prompt
¶
System prompt construction from agent identity and context.
Translates agent configuration (personality, skills, authority, role) into contextually rich system prompts that shape agent behavior during LLM calls.
Non-inferable principle: System prompts should contain only information
that agents cannot discover by reading the codebase or environment. Full
tool definitions are delivered via the LLM provider's API tools
parameter. However, lightweight L1 metadata (name, category, cost tier,
one-line description) IS injected into the system prompt so agents can
discover what tools exist and decide which to load via load_tool().
Example::

    from synthorg.engine.prompt import build_system_prompt

    prompt = build_system_prompt(agent=agent_identity, task=task)
    prompt.content  # rendered system prompt string
SystemPrompt
pydantic-model
¶
Bases: BaseModel
Immutable result of system prompt construction.
Attributes:
| Name | Type | Description |
|---|---|---|
content |
str
|
Full rendered prompt text. |
template_version |
str
|
Version of the template that produced this prompt. |
estimated_tokens |
int
|
Token estimate of the prompt content. |
sections |
tuple[str, ...]
|
Names of sections included in the prompt. |
metadata |
dict[str, str]
|
Agent identity metadata (agent_id, name, role, department, level, and optionally profile_tier). |
personality_trim_info |
PersonalityTrimInfo | None
|
Populated when personality section was trimmed to fit the profile's token budget. |
Config:
frozen: True, allow_inf_nan: False, extra: forbid
Fields:
-
content(str) -
template_version(str) -
estimated_tokens(int) -
sections(tuple[str, ...]) -
metadata(dict[str, str]) -
personality_trim_info(PersonalityTrimInfo | None)
personality_trim_info
pydantic-field
¶
Populated when personality section was trimmed
build_system_prompt
¶
build_system_prompt(
*,
agent,
role=None,
task=None,
available_tools=(),
l1_summaries=(),
company=None,
org_policies=(),
max_tokens=None,
custom_template=None,
token_estimator=None,
effective_autonomy=None,
context_budget_indicator=None,
currency=DEFAULT_CURRENCY,
model_tier=None,
personality_trimming_enabled=True,
max_personality_tokens_override=None,
strategy_config=None,
async_task_state=None,
)
Build a system prompt from agent identity and optional context.
When max_tokens is provided and the prompt exceeds it, optional
sections are progressively trimmed (strategy, company, task,
org_policies).
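Progressive trimming in that order can be sketched as follows. This is a simplified illustration, not the builder's implementation: `trim_sections` and `estimate_tokens` are hypothetical helpers, whole sections are dropped rather than shortened, and the estimator uses the rough four-characters-per-token heuristic.

```python
TRIM_ORDER = ("strategy", "company", "task", "org_policies")


def estimate_tokens(text: str) -> int:
    """Rough character-count heuristic: about four characters per token."""
    return len(text) // 4


def trim_sections(sections: dict[str, str], max_tokens: int) -> dict[str, str]:
    """Drop optional sections in TRIM_ORDER until the prompt fits the budget."""
    kept = dict(sections)
    for name in TRIM_ORDER:
        if estimate_tokens("\n".join(kept.values())) <= max_tokens:
            break
        kept.pop(name, None)
    return kept


sections = {
    "identity": "x" * 40,  # required, never trimmed in this sketch
    "strategy": "s" * 80,
    "company": "c" * 80,
    "task": "t" * 40,
}
trimmed = trim_sections(sections, max_tokens=25)
```

Here the strategy and company sections are dropped first, after which the remaining text fits the budget and the task section survives.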
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
AgentIdentity
|
Agent identity containing personality, skills, authority. |
required |
role
|
Role | None
|
Optional role with description and responsibilities. |
None
|
task
|
Task | None
|
Optional task context injected into the prompt. |
None
|
available_tools
|
tuple[ToolDefinition, ...]
|
Tool definitions populated into template context for custom templates only; the default template omits tools per D22 (non-inferable principle). |
()
|
l1_summaries
|
tuple[ToolL1Metadata, ...]
|
L1 metadata for system prompt injection. Lightweight tool summaries rendered in the Available Tools section of the default template. |
()
|
company
|
Company | None
|
Opt-in. Non-inferable principle recommends omitting unless agents need org-level context they cannot discover. |
None
|
org_policies
|
tuple[str, ...]
|
Company-wide policy texts to inject into prompt. |
()
|
max_tokens
|
int | None
|
Token budget; sections are trimmed if exceeded. |
None
|
custom_template
|
str | None
|
Optional Jinja2 template string override. |
None
|
token_estimator
|
PromptTokenEstimator | None
|
Custom token estimator (defaults to char/4). |
None
|
effective_autonomy
|
EffectiveAutonomy | None
|
Resolved autonomy for the current run. |
None
|
context_budget_indicator
|
str | None
|
Formatted context budget indicator string to inject into the prompt. |
None
|
currency
|
CurrencyCode
|
ISO 4217 currency code for budget displays. Validated
against the allowlist in |
DEFAULT_CURRENCY
|
model_tier
|
ModelTier | None
|
Model capability tier for prompt profile selection.
|
None
|
personality_trimming_enabled
|
bool
|
When |
True
|
max_personality_tokens_override
|
int | None
|
When set to a positive value,
overrides the profile's |
None
|
strategy_config
|
StrategyConfig | None
|
Strategy and trendslop mitigation config.
When provided and the agent qualifies (C-suite/VP/Director
or has explicit |
None
|
async_task_state
|
AsyncTaskStateChannel | None
|
Optional async task state channel.
When non-empty, appends an |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
Immutable |
SystemPrompt
|
class: |
Raises:
| Type | Description |
|---|---|
PromptBuildError
|
If prompt construction fails. |
Source code in src/synthorg/engine/prompt.py
build_error_prompt
¶
Return the existing system prompt or a minimal error placeholder.
Used by the engine when the execution pipeline fails and a
SystemPrompt was never built (or was partially built).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
identity
|
AgentIdentity
|
Agent identity for metadata. |
required |
agent_id
|
str
|
String agent identifier. |
required |
system_prompt
|
SystemPrompt | None
|
Previously built prompt, or |
required |
Returns:
| Type | Description |
|---|---|
SystemPrompt
|
The existing prompt if available, else a minimal placeholder. |
Source code in src/synthorg/engine/prompt.py
Task Execution¶
task_execution
¶
Runtime task execution state.
Wraps the frozen Task config model with evolving execution state
(status, cost, turn count) using model_copy(update=...) for cheap,
immutable state transitions.
StatusTransition
pydantic-model
¶
Bases: BaseModel
Frozen audit record for a single status transition.
Attributes:
| Name | Type | Description |
|---|---|---|
from_status |
TaskStatus
|
Status before the transition. |
to_status |
TaskStatus
|
Status after the transition. |
timestamp |
AwareDatetime
|
When the transition occurred (timezone-aware). |
reason |
str
|
Optional human-readable reason for the transition. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- from_status (TaskStatus)
- to_status (TaskStatus)
- timestamp (AwareDatetime)
- reason (str)
TaskExecution
pydantic-model
¶
Bases: BaseModel
Frozen runtime wrapper around a Task for execution tracking.
All state evolution happens via model_copy(update=...).
Transitions are validated explicitly via
:func:~synthorg.core.task_transitions.validate_transition before
the copy is made.
Attributes:
| Name | Type | Description |
|---|---|---|
task |
Task
|
Original frozen task definition. |
status |
TaskStatus
|
Current execution status (starts from |
transition_log |
tuple[StatusTransition, ...]
|
Audit trail of status transitions. |
accumulated_cost |
TokenUsage
|
Running token usage and cost totals. |
turn_count |
int
|
Number of LLM turns completed. |
retry_count |
int
|
Number of previous failure-reassignment cycles. |
started_at |
AwareDatetime | None
|
Set by |
completed_at |
AwareDatetime | None
|
When execution reached a terminal state. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- task (Task)
- status (TaskStatus)
- transition_log (tuple[StatusTransition, ...])
- accumulated_cost (TokenUsage)
- turn_count (int)
- retry_count (int)
- started_at (AwareDatetime | None)
- completed_at (AwareDatetime | None)
from_task
classmethod
¶
Create a fresh execution from a task definition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
task
|
Task
|
The frozen task to wrap. |
required |
retry_count
|
int
|
Number of previous failure-reassignment cycles. |
0
|
Returns:
| Type | Description |
|---|---|
TaskExecution
|
New |
Source code in src/synthorg/engine/task_execution.py
with_transition
¶
Validate and apply a status transition.
Raises:
| Type | Description |
|---|---|
ValueError
|
If the transition is invalid. |
Source code in src/synthorg/engine/task_execution.py
with_cost
¶
Accumulate token usage and increment turn count.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
usage
|
TokenUsage
|
Token usage from a single LLM call. |
required |
Returns:
| Type | Description |
|---|---|
TaskExecution
|
New |
Raises:
| Type | Description |
|---|---|
ExecutionStateError
|
If execution is in a terminal state. |
Source code in src/synthorg/engine/task_execution.py
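The copy-on-write pattern behind `with_transition` and `with_cost` can be sketched with a frozen dataclass and `dataclasses.replace`, which plays the role that `model_copy(update=...)` plays for the Pydantic model. Everything here is a stand-in: the `Status` enum, the `ALLOWED` table, and the `Execution` class are hypothetical, the real transition rules live in `synthorg.core.task_transitions`, and `RuntimeError` substitutes for `ExecutionStateError`.

```python
from dataclasses import dataclass, replace
from enum import Enum


class Status(Enum):
    ASSIGNED = "assigned"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"


# Hypothetical transition table; an empty set marks a terminal status.
ALLOWED: dict[Status, set[Status]] = {
    Status.ASSIGNED: {Status.IN_PROGRESS},
    Status.IN_PROGRESS: {Status.COMPLETED},
    Status.COMPLETED: set(),
}


@dataclass(frozen=True)
class Execution:
    status: Status = Status.ASSIGNED
    total_tokens: int = 0
    turn_count: int = 0
    transition_log: tuple[tuple[Status, Status], ...] = ()

    def with_transition(self, target: Status) -> "Execution":
        """Validate first, then return a new instance with the log extended."""
        if target not in ALLOWED[self.status]:
            raise ValueError(f"invalid transition: {self.status} -> {target}")
        entry = (self.status, target)
        return replace(
            self, status=target, transition_log=self.transition_log + (entry,)
        )

    def with_cost(self, tokens: int) -> "Execution":
        """Accumulate usage and count one turn; refuse in a terminal state."""
        if not ALLOWED[self.status]:
            raise RuntimeError("execution is in a terminal state")
        return replace(
            self,
            total_tokens=self.total_tokens + tokens,
            turn_count=self.turn_count + 1,
        )
```

Each call returns a new object, so an earlier snapshot such as the initial `Execution()` stays unchanged and can be kept for auditing or comparison.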
Parallel Execution¶
parallel
¶
Parallel agent execution orchestrator.
Coordinates multiple AgentEngine.run() calls in parallel using
structured concurrency (asyncio.TaskGroup), with error isolation,
concurrency limits, resource locking, and progress tracking.
Inspired by the ToolInvoker.invoke_all() pattern from
tools/invoker.py (TaskGroup + Semaphore + guarded
execution), extended with fail-fast, progress tracking, and
CancelledError handling.
ProgressCallback
module-attribute
¶
Synchronous callback invoked on progress updates.
Called directly (not awaited) from the executor's event loop; must not block. Async functions will produce un-awaited coroutines.
ParallelExecutor
¶
ParallelExecutor(
*,
engine,
shutdown_manager=None,
resource_lock=None,
progress_callback=None,
clock=None,
)
Orchestrates concurrent agent execution.
Composition over inheritance -- takes an AgentEngine and
coordinates concurrent run() calls.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
engine
|
AgentEngine
|
Agent execution engine. |
required |
shutdown_manager
|
ShutdownManager | None
|
Optional shutdown manager for task registration. |
None
|
resource_lock
|
ResourceLock | None
|
Optional resource lock for exclusive file access.
Defaults to |
None
|
progress_callback
|
ProgressCallback | None
|
Optional synchronous callback invoked on progress updates. |
None
|
Source code in src/synthorg/engine/parallel.py
execute_group
async
¶
Execute a parallel group of agent assignments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
group
|
ParallelExecutionGroup
|
The execution group to run. |
required |
Returns:
| Type | Description |
|---|---|
ParallelExecutionResult
|
Result with all agent outcomes. |
Raises:
| Type | Description |
|---|---|
ResourceConflictError
|
If resource claims conflict between assignments. |
ParallelExecutionError
|
If fatal errors (MemoryError, RecursionError) occurred during execution. |
Source code in src/synthorg/engine/parallel.py
Run Result¶
run_result
¶
Agent run result model.
Frozen Pydantic model wrapping ExecutionResult with outer metadata
from the engine layer (system prompt, wall-clock duration, agent/task IDs).
AgentRunResult
pydantic-model
¶
Bases: BaseModel
Immutable result of a complete agent engine run.
Wraps the ExecutionResult from the loop with engine-level
metadata: system prompt, wall-clock duration, and agent/task IDs.
Attributes:
| Name | Type | Description |
|---|---|---|
execution_result |
ExecutionResult
|
Outcome from the execution loop. |
system_prompt |
SystemPrompt
|
System prompt used for this run. |
duration_seconds |
float
|
Wall-clock run time in seconds. |
agent_id |
NotBlankStr
|
Agent identifier (string form of UUID). |
task_id |
NotBlankStr | None
|
Task identifier (always set currently; |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- execution_result (ExecutionResult)
- system_prompt (SystemPrompt)
- duration_seconds (float)
- agent_id (NotBlankStr)
- task_id (NotBlankStr | None)
- produced_artifacts (tuple[Artifact, ...])
- currency (CurrencyCode)
currency
pydantic-field
¶
ISO 4217 currency that denominates total_cost. Populated by the engine from the active BudgetConfig.currency so cross-agent aggregations (e.g. ParallelExecutionResult.total_cost) can enforce the same-currency invariant before summing. Required so constructor sites cannot silently mis-label a non-default run as DEFAULT_CURRENCY.
completion_summary
property
¶
Extract the last assistant message content as a work summary.
Walks the conversation in reverse to find the most recent
assistant message with non-empty text content. Tool-call-only
assistant messages (content is None or empty) are skipped.
Returns:
| Type | Description |
|---|---|
str | None
|
The content string, or |
Metrics¶
metrics
¶
Task completion metrics model.
Proxy overhead metrics for an agent run, computed from
AgentRunResult data per the Operations design page.
TaskCompletionMetrics
pydantic-model
¶
Bases: BaseModel
Proxy overhead metrics for an agent run (see Operations design page).
Computed from AgentRunResult after execution to surface
orchestration overhead indicators (turns, tokens, cost, duration).
Attributes:
| Name | Type | Description |
|---|---|---|
task_id |
NotBlankStr | None
|
Task identifier ( |
agent_id |
NotBlankStr
|
Agent identifier (string form of UUID). |
turns_per_task |
int
|
Number of LLM turns to complete the task. |
tokens_per_task |
int
|
Total tokens consumed (input + output). |
cost_per_task |
float
|
Total cost for the task in the configured currency. |
duration_seconds |
float
|
Wall-clock execution time in seconds. |
prompt_tokens |
int
|
Estimated system prompt tokens (per-call estimate
from |
prompt_token_ratio |
float
|
Per-call ratio of prompt tokens to total tokens
(overhead indicator, derived via |
accuracy_effort_ratio |
float | None
|
Accuracy-effort ratio from step-level
quality signals ( |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- task_id (NotBlankStr | None)
- agent_id (NotBlankStr)
- turns_per_task (int)
- tokens_per_task (int)
- cost_per_task (float)
- duration_seconds (float)
- prompt_tokens (int)
- accuracy_effort_ratio (float | None)
Validators:
- _cap_prompt_tokens
accuracy_effort_ratio
pydantic-field
¶
Accuracy-effort ratio from step-level quality signals (None when quality signals are unavailable)
prompt_token_ratio
property
¶
Per-call ratio of prompt tokens to total tokens (overhead indicator).
For multi-turn runs the actual overhead is higher because the system prompt is resent on every turn.
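A small worked example may make the per-call versus multi-turn distinction concrete. Both functions are hypothetical. The clamp in `prompt_token_ratio` is only a guess at what the `_cap_prompt_tokens` validator protects against, and `resent_prompt_share` is an illustrative estimate that assumes the full system prompt is counted once per turn in the total.

```python
def prompt_token_ratio(prompt_tokens: int, tokens_per_task: int) -> float:
    """Per-call share of the total taken by the system prompt."""
    if tokens_per_task <= 0:
        return 0.0  # avoid dividing by zero on empty runs
    # Assumption: the prompt estimate is clamped to the measured total.
    return min(prompt_tokens, tokens_per_task) / tokens_per_task


def resent_prompt_share(prompt_tokens: int, tokens_per_task: int, turns: int) -> float:
    """Illustrative multi-turn estimate: the prompt is resent on every turn."""
    if tokens_per_task <= 0:
        return 0.0
    return min(prompt_tokens * turns, tokens_per_task) / tokens_per_task
```

For a 500-token prompt in a 4000-token, 4-turn run, the per-call ratio is 0.125, while the resent-prompt estimate is 0.5 under the stated assumption.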
from_run_result
classmethod
¶
Build metrics from an agent run result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
result
|
AgentRunResult
|
The |
required |
Returns:
| Type | Description |
|---|---|
TaskCompletionMetrics
|
New |
TaskCompletionMetrics
|
the result's execution context and metadata. |
Source code in src/synthorg/engine/metrics.py
Errors¶
errors
¶
Engine-layer error hierarchy.
EngineError
¶
Bases: DomainError
Base exception for all engine-layer errors.
Inherits from :class:DomainError so the prefix-vs-category
validator runs on every subclass; a typo in a subclass
error_code whose first digit no longer matches the declared
error_category is rejected at class-definition time.
Class Attributes
- status_code: Default HTTP status for API exposure (500).
- error_code: Default RFC 9457 error code.
- error_category: Default RFC 9457 error category.
- retryable: Whether the client should retry the request.
- default_message: Generic 5xx-safe message used by exception handlers.
Source code in src/synthorg/core/domain_errors.py
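One way to reject a mismatched `error_code` at class-definition time is an `__init_subclass__` hook, sketched below. This is not the actual `DomainError` code: `DomainErrorSketch`, the `CATEGORY_PREFIX` mapping, and the `"internal"` digit are hypothetical. Only the `2xxx` validation and `4xxx` conflict ranges are suggested by the codes quoted elsewhere on this page.

```python
class DomainErrorSketch(Exception):
    """Hypothetical stand-in for DomainError; not the real class."""

    # Assumed mapping from category to the leading digit of its codes.
    CATEGORY_PREFIX = {"validation": "2", "conflict": "4", "internal": "8"}
    error_code = 8000
    error_category = "internal"

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        expected = cls.CATEGORY_PREFIX[cls.error_category]
        if not str(cls.error_code).startswith(expected):
            msg = (
                f"{cls.__name__}: code {cls.error_code} does not match "
                f"category {cls.error_category!r}"
            )
            raise TypeError(msg)


class RequestInvalid(DomainErrorSketch):
    error_code = 2001
    error_category = "validation"


def define_mismatched() -> type:
    """Defining this class raises TypeError before any instance exists."""

    class Mismatched(DomainErrorSketch):
        error_code = 4001  # typo: a conflict-range code
        error_category = "validation"

    return Mismatched
```

The check runs when the `class` statement executes, so a typo surfaces at import time rather than when the error is first raised in production.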
PromptBuildError
¶
Bases: EngineError
Raised when system prompt construction fails.
Source code in src/synthorg/core/domain_errors.py
ExecutionStateError
¶
Bases: EngineError
Raised when an execution state transition is invalid.
Source code in src/synthorg/core/domain_errors.py
MaxTurnsExceededError
¶
Bases: EngineError
Raised when turn_count reaches max_turns during execution.
Enforced by AgentContext.with_turn_completed when the hard turn
limit has been reached.
Source code in src/synthorg/core/domain_errors.py
LoopExecutionError
¶
Bases: EngineError
Non-recoverable execution loop error for the engine layer.
The execution loop returns TerminationReason.ERROR internally.
This exception is available for the engine layer above the loop to
convert that result into a raised error when appropriate.
Source code in src/synthorg/core/domain_errors.py
ParallelExecutionError
¶
Bases: EngineError
Raised when a parallel execution group encounters a fatal error.
Source code in src/synthorg/core/domain_errors.py
ResourceConflictError
¶
Bases: EngineError
Raised when resource claims conflict between assignments.
Source code in src/synthorg/core/domain_errors.py
DecompositionError
¶
Bases: EngineError
Base exception for task decomposition failures.
Source code in src/synthorg/core/domain_errors.py
DecompositionCycleError
¶
Bases: DecompositionError
Raised when a dependency cycle is detected in the subtask graph.
Source code in src/synthorg/core/domain_errors.py
DecompositionDepthError
¶
Bases: DecompositionError
Raised when decomposition exceeds the maximum nesting depth.
Source code in src/synthorg/core/domain_errors.py
TaskRoutingError
¶
Bases: EngineError
Raised when task routing to an agent fails.
Source code in src/synthorg/core/domain_errors.py
TaskAssignmentError
¶
Bases: EngineError
Raised when task assignment fails.
Source code in src/synthorg/core/domain_errors.py
NoEligibleAgentError
¶
Bases: TaskAssignmentError
Raised when no eligible agent is found for assignment.
Source code in src/synthorg/core/domain_errors.py
RecoveryConfigError
¶
Bases: EngineError
Configuration cannot satisfy the selected recovery strategy.
Typical cause: EngineRecoveryConfig.strategy == CHECKPOINT but
no :class:CheckpointRepository was wired through to the factory.
Source code in src/synthorg/core/domain_errors.py
ProjectNotFoundError
¶
Bases: EngineError
Referenced project does not exist.
The message is deliberately generic to avoid leaking internal
identifiers. The project_id attribute is available for
structured logs but must NOT be included in user-facing responses.
Attributes:
| Name | Type | Description |
|---|---|---|
project_id |
NotBlankStr
|
The project identifier that was not found. |
Source code in src/synthorg/engine/errors.py
ProjectAgentNotMemberError
¶
Bases: EngineError
Agent is not a member of the task's project team.
The message is deliberately generic to avoid leaking internal identifiers. Attributes are available for structured logs only.
Attributes:
| Name | Type | Description |
|---|---|---|
project_id |
NotBlankStr
|
The project the agent attempted to access. |
agent_id |
NotBlankStr
|
The agent that is not in the project team. |
Source code in src/synthorg/engine/errors.py
WorkspaceError
¶
Bases: EngineError
Base exception for workspace isolation failures.
Source code in src/synthorg/core/domain_errors.py
WorkspaceSetupError
¶
Bases: WorkspaceError
Raised when workspace creation fails.
Source code in src/synthorg/core/domain_errors.py
WorkspaceMergeError
¶
Bases: WorkspaceError
Raised when workspace merge fails.
Source code in src/synthorg/core/domain_errors.py
WorkspaceCleanupError
¶
Bases: WorkspaceError
Raised when workspace teardown fails.
Source code in src/synthorg/core/domain_errors.py
WorkspaceLimitError
¶
Bases: WorkspaceError
Raised when the maximum number of concurrent workspaces has been reached.
Source code in src/synthorg/core/domain_errors.py
TaskEngineError
¶
Bases: EngineError
Base exception for all task engine errors.
Source code in src/synthorg/core/domain_errors.py
TaskEngineNotRunningError
¶
Bases: TaskEngineError
Raised when a mutation is submitted to a stopped task engine.
Source code in src/synthorg/core/domain_errors.py
TaskEngineQueueFullError
¶
Bases: TaskEngineError
Raised when the task engine queue is at capacity.
Source code in src/synthorg/core/domain_errors.py
TaskMutationError
¶
Bases: TaskEngineError
Raised when a task mutation fails (not found, validation, etc.).
Source code in src/synthorg/core/domain_errors.py
TaskNotFoundError
¶
Bases: TaskMutationError
Raised when a task is not found during mutation.
Source code in src/synthorg/core/domain_errors.py
TaskVersionConflictError
¶
Bases: TaskMutationError
Raised when optimistic concurrency version does not match.
Source code in src/synthorg/core/domain_errors.py
TaskInternalError
¶
Bases: TaskEngineError
Raised when a task mutation fails due to an internal engine error.
Sibling of :class:TaskMutationError, not a subtype, so a broad
except TaskMutationError handler does not accidentally catch
internal engine faults. Inherits the default 500 / ENGINE_ERROR /
INTERNAL metadata from :class:TaskEngineError.
Source code in src/synthorg/core/domain_errors.py
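The effect of making `TaskInternalError` a sibling rather than a subtype can be shown with local stand-in classes that mirror only the inheritance shape described above. They are not the real synthorg classes, and `classify` is a hypothetical helper.

```python
class TaskEngineError(Exception):
    """Local stand-in for the task engine base error."""


class TaskMutationError(TaskEngineError):
    """Caller-side mutation failure (not found, validation, ...)."""


class TaskInternalError(TaskEngineError):
    """Internal engine fault; deliberately not a TaskMutationError."""


def classify(exc: Exception) -> str:
    """Show which handler a given exception reaches."""
    try:
        raise exc
    except TaskMutationError:
        return "mutation failure"
    except TaskEngineError:
        return "engine fault"
```

A broad `except TaskMutationError` handler therefore lets internal faults pass through to a more general handler instead of reporting them as caller mistakes.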
DelegationRoundLimitError
¶
Bases: EngineError
Hard abort when delegation rounds exceed 2x the soft cap.
Attributes:
| Name | Type | Description |
|---|---|---|
current_round |
int
|
The round number that triggered the abort. |
soft_limit |
int
|
The configured soft cap on delegation rounds. |
Source code in src/synthorg/engine/errors.py
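The relationship between the soft cap and the hard abort can be sketched as a single guard. The `check_round` helper and the local `DelegationRoundLimit` class are hypothetical, and returning a warning flag once the soft cap is passed is an assumption. The source only states that the hard abort fires when rounds exceed 2x the soft cap.

```python
class DelegationRoundLimit(Exception):
    """Local stand-in carrying the two documented attributes."""

    def __init__(self, current_round: int, soft_limit: int) -> None:
        super().__init__("delegation round limit exceeded")
        self.current_round = current_round
        self.soft_limit = soft_limit


def check_round(current_round: int, soft_limit: int) -> bool:
    """Return True once the soft cap is passed; raise past 2x the cap."""
    if current_round > 2 * soft_limit:
        raise DelegationRoundLimit(current_round, soft_limit)
    return current_round > soft_limit
```

With a soft cap of 5, rounds 6 through 10 only set the flag in this sketch, and round 11 is the first to raise.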
CoordinationError
¶
Bases: EngineError
Base exception for multi-agent coordination failures.
Source code in src/synthorg/core/domain_errors.py
CoordinationPhaseError
¶
Bases: CoordinationError
Raised when a coordination pipeline phase fails.
Carries the failing phase name and all phase results accumulated up to and including the failure, enabling partial-result inspection.
Attributes:
| Name | Type | Description |
|---|---|---|
phase |
str
|
Name of the phase that failed. |
partial_phases |
tuple[CoordinationPhaseResult, ...]
|
Phase results accumulated before and including this failure. |
Source code in src/synthorg/engine/errors.py
RuntimeServicesBuildError
¶
Bases: EngineError
Raised when the boot/reinit runtime-services build fails.
Wraps the underlying failure from build_runtime_services (provider
registry, tool registry, agent engine, or coordinator factory) so the
boot hook and the /setup/complete controller see a typed domain
error instead of a raw exception. The original cause is preserved via
raise ... from exc.
Source code in src/synthorg/core/domain_errors.py
WorkflowExecutionError
¶
Bases: EngineError
Base exception for workflow execution failures.
Source code in src/synthorg/core/domain_errors.py
WorkflowDefinitionInvalidError
¶
Bases: WorkflowExecutionError
Raised when a workflow definition fails validation at activation time.
422 + REQUEST_VALIDATION_ERROR: a definition that fails activation-time
structural checks is a caller-side validation failure surfaced after the
request reached the engine, not an internal fault. Aligns with
:class:WorkflowDefinitionValidationError (the create/update path) so
every "invalid workflow definition" surface emits the same 422 envelope.
Source code in src/synthorg/core/domain_errors.py
WorkflowConditionEvalError
¶
Bases: WorkflowExecutionError
Raised when a condition expression cannot be evaluated.
422 + REQUEST_VALIDATION_ERROR: a condition expression that fails
evaluation is authored by the caller as part of the workflow definition,
so the failure is a request-shape problem rather than an engine fault.
Source code in src/synthorg/core/domain_errors.py
WorkflowExecutionNotFoundError
¶
Bases: WorkflowExecutionError
Raised when a workflow execution instance is not found.
Source code in src/synthorg/core/domain_errors.py
SubworkflowNotFoundError
¶
Bases: WorkflowExecutionError
Raised when a referenced subworkflow version cannot be resolved.
Attributes:
| Name | Type | Description |
|---|---|---|
subworkflow_id |
NotBlankStr
|
The subworkflow identifier. |
version |
NotBlankStr
|
The semver pin that failed to resolve. |
Source code in src/synthorg/engine/errors.py
SubworkflowCycleError
¶
Bases: WorkflowExecutionError
Raised when the subworkflow reference graph contains a cycle.
Attributes:
| Name | Type | Description |
|---|---|---|
cycle_path |
tuple[tuple[str, str], ...]
|
Ordered |
Source code in src/synthorg/engine/errors.py
SubworkflowDepthExceededError
¶
Bases: WorkflowExecutionError
Raised when runtime subworkflow nesting exceeds the configured limit.
Attributes:
| Name | Type | Description |
|---|---|---|
depth |
int
|
The depth at which the limit was exceeded. |
max_depth |
int
|
The configured maximum. |
Source code in src/synthorg/engine/errors.py
SubworkflowIOError
¶
Bases: WorkflowExecutionError
Raised when subworkflow input or output binding is invalid.
Covers missing required inputs, unknown inputs, unknown outputs, type mismatches, and invalid binding expressions. The 422 mapping treats binding mismatches as caller-side validation failures so the centralised RFC 9457 dispatch surfaces a structured envelope without controller-level translation.
Source code in src/synthorg/core/domain_errors.py
WorkflowTypeInvalidError
¶
Bases: WorkflowExecutionError
Raised when a request specifies an unknown workflow_type value.
Uses REQUEST_VALIDATION_ERROR (2001) and 400 to align with the
other request-shape failures in this module: the value did not parse
against the WorkflowType enum at the API boundary.
Source code in src/synthorg/core/domain_errors.py
WorkflowDefinitionValidationError
¶
Bases: WorkflowExecutionError
Raised when a workflow definition fails structural checks.
The default message is intentionally generic so Pydantic validation
detail does not leak to API clients; callers may still chain the
underlying exception with raise … from exc for the structured
log emitted by the centralised handler.
Source code in src/synthorg/core/domain_errors.py
WorkflowYamlExportError
¶
Bases: WorkflowExecutionError
Raised when YAML serialisation of a workflow definition fails.
Maps to 422 (Unprocessable Entity) on /workflows/{id}/export:
the request itself is well-formed, but the persisted definition
cannot be serialised to YAML -- a content-level failure rather
than a request-syntax problem.
Source code in src/synthorg/core/domain_errors.py
WorkflowExecutionAlreadyTerminalError
¶
Bases: VersionConflictError
Raised when cancel targets an execution already in a terminal status.
Distinct from :class:synthorg.core.domain_errors.VersionConflictError
(4002) so API clients can discriminate "the execution finished before
you cancelled" (no retry will succeed) from a row-level optimistic-
concurrency race where the caller can re-read and try again. Both
map to 409 + CONFLICT so the HTTP envelope shape is unchanged;
only the error_code differs.
Source code in src/synthorg/core/domain_errors.py
SelfReviewError
¶
Bases: EngineError
Raised when an agent attempts to review their own work.
Structurally prevents an agent from acting as reviewer on a task they executed, enforcing separation of duties at the approval gate.
The exception message is deliberately generic ("Self-review is not
permitted") to avoid leaking internal agent/task identifiers across
authorization boundaries when the message is surfaced via an HTTP
error response. The task_id and agent_id attributes are
available for structured logs but must NOT be passed to user-facing
error responses.
Attributes:
| Name | Type | Description |
|---|---|---|
task_id |
NotBlankStr
|
The task identifier the self-review was attempted on. |
agent_id |
NotBlankStr
|
The agent identifier that is both executor and reviewer. |
Source code in src/synthorg/engine/errors.py
Task Decomposition¶
protocol
¶
Decomposition strategy protocol.
DecompositionStrategy
¶
Bases: Protocol
Protocol for task decomposition strategies.
Implementations produce a DecompositionPlan from a parent task
and a decomposition context. The plan describes subtask definitions
and their dependency relationships.
decompose
async
¶
Decompose a task into subtasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
task
|
Task
|
The parent task to decompose. |
required |
context
|
DecompositionContext
|
Decomposition constraints (max subtasks, depth). |
required |
Returns:
| Type | Description |
|---|---|
DecompositionPlan
|
A decomposition plan with subtask definitions. |
Source code in src/synthorg/engine/decomposition/protocol.py
models
¶
Decomposition domain models.
Frozen Pydantic models for subtask definitions, decomposition plans, results, status rollups, and decomposition context.
SubtaskDefinition
pydantic-model
¶
Bases: BaseModel
Definition of a single subtask within a decomposition plan.
Attributes:
| Name | Type | Description |
|---|---|---|
id |
NotBlankStr
|
Unique subtask identifier (within this decomposition). |
title |
NotBlankStr
|
Short subtask title. |
description |
NotBlankStr
|
Detailed subtask description. |
dependencies |
tuple[NotBlankStr, ...]
|
IDs of other subtasks this one depends on. |
estimated_complexity |
Complexity
|
Complexity estimate for routing. |
required_skills |
tuple[NotBlankStr, ...]
|
Skill IDs needed for routing. |
required_tags |
tuple[NotBlankStr, ...]
|
Tags needed for multi-faceted routing match. When set, the routing scorer awards a small bonus to agents whose matched-skill tags cover every required tag. Empty tuple disables the tag-match tier. |
required_role |
NotBlankStr | None
|
Optional role name for routing. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- id (NotBlankStr)
- title (NotBlankStr)
- description (NotBlankStr)
- dependencies (tuple[NotBlankStr, ...])
- estimated_complexity (Complexity)
- required_skills (tuple[NotBlankStr, ...])
- required_tags (tuple[NotBlankStr, ...])
- required_role (NotBlankStr | None)
Validators:
- _validate_no_self_dependency
DecompositionPlan
pydantic-model
¶
Bases: BaseModel
Plan describing how a parent task is decomposed into subtasks.
Validates subtask collection integrity at construction:
non-empty, unique IDs, valid dependency references.
Cycle detection is handled by DependencyGraph.validate()
in the service layer.
Attributes:
| Name | Type | Description |
|---|---|---|
parent_task_id |
NotBlankStr
|
ID of the task being decomposed. |
subtasks |
tuple[SubtaskDefinition, ...]
|
Ordered subtask definitions. |
task_structure |
TaskStructure
|
Classified structure of the subtask graph. |
coordination_topology |
CoordinationTopology
|
Selected coordination topology. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- parent_task_id (NotBlankStr)
- subtasks (tuple[SubtaskDefinition, ...])
- task_structure (TaskStructure)
- coordination_topology (CoordinationTopology)
Validators:
- _validate_subtasks
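The integrity checks listed for `DecompositionPlan`, together with the cycle detection that the service layer performs, can be approximated with the standard library's `graphlib`. This sketch does not use `DependencyGraph`; `validate_plan` is a hypothetical helper that takes a mapping from subtask ID to the IDs it depends on, so ID uniqueness is implied by the dict keys.

```python
from graphlib import CycleError, TopologicalSorter


def validate_plan(dependencies: dict[str, tuple[str, ...]]) -> tuple[str, ...]:
    """Check the documented invariants and return one valid execution order."""
    if not dependencies:
        raise ValueError("plan must contain at least one subtask")
    for subtask_id, deps in dependencies.items():
        if subtask_id in deps:
            raise ValueError(f"{subtask_id} depends on itself")
        unknown = set(deps) - dependencies.keys()
        if unknown:
            raise ValueError(f"{subtask_id} references unknown IDs: {sorted(unknown)}")
    try:
        # The mapping is node -> predecessors, so dependencies come first.
        return tuple(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc
```

The returned order is one topological sort of the DAG. Subtasks with no ordering constraint between them may appear in either order.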
DecompositionResult
pydantic-model
¶
Bases: BaseModel
Result of a complete task decomposition.
Attributes:
| Name | Type | Description |
|---|---|---|
plan |
DecompositionPlan
|
The decomposition plan that was executed. |
created_tasks |
tuple[Task, ...]
|
Task objects created from subtask definitions. |
dependency_edges |
tuple[tuple[NotBlankStr, NotBlankStr], ...]
|
Directed edges (from_id, to_id) in the DAG. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- plan (DecompositionPlan)
- created_tasks (tuple[Task, ...])
- dependency_edges (tuple[tuple[NotBlankStr, NotBlankStr], ...])
Validators:
- _validate_plan_task_consistency
SubtaskStatusRollup
pydantic-model
¶
Bases: BaseModel
Aggregated status of subtasks for a parent task.
Tracks six explicit statuses: COMPLETED, FAILED, IN_PROGRESS,
BLOCKED, CANCELLED, and SUSPENDED. Other statuses (CREATED,
ASSIGNED, IN_REVIEW, INTERRUPTED) are not individually tracked;
the gap between the sum of tracked counts and total accounts
for these. The derived_parent_status treats any such remainder
as work still pending (IN_PROGRESS).
When all subtasks are in terminal states but with a mix of
completed and cancelled, derived_parent_status returns
CANCELLED (some work was abandoned).
Attributes:
| Name | Type | Description |
|---|---|---|
parent_task_id |
NotBlankStr
|
ID of the parent task. |
total |
int
|
Total number of subtasks. |
completed |
int
|
Count of COMPLETED subtasks. |
failed |
int
|
Count of FAILED subtasks. |
in_progress |
int
|
Count of IN_PROGRESS subtasks. |
blocked |
int
|
Count of BLOCKED subtasks. |
cancelled |
int
|
Count of CANCELLED subtasks. |
suspended |
int
|
Count of SUSPENDED subtasks. |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- parent_task_id (NotBlankStr)
- total (int)
- completed (int)
- failed (int)
- in_progress (int)
- blocked (int)
- cancelled (int)
- suspended (int)
Validators:
- _validate_counts
derived_parent_status
property
¶
Derive the parent task status from subtask statuses.
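The two rollup rules spelled out in the class description (the untracked remainder, and the completed/cancelled mix) can be sketched as follows. The `Rollup` dataclass and its property names are hypothetical, and the count check in `__post_init__` is only a guess at what `_validate_counts` enforces. The full precedence between FAILED, BLOCKED, and SUSPENDED is not stated in this section and is left out rather than guessed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rollup:
    total: int
    completed: int = 0
    failed: int = 0
    in_progress: int = 0
    blocked: int = 0
    cancelled: int = 0
    suspended: int = 0

    def __post_init__(self) -> None:
        # Assumption: tracked counts may never exceed the total.
        if self.tracked > self.total:
            raise ValueError("tracked counts exceed total")

    @property
    def tracked(self) -> int:
        """Sum of the six explicitly tracked statuses."""
        return (
            self.completed + self.failed + self.in_progress
            + self.blocked + self.cancelled + self.suspended
        )

    @property
    def untracked(self) -> int:
        """Subtasks in CREATED, ASSIGNED, IN_REVIEW, or INTERRUPTED."""
        return self.total - self.tracked

    @property
    def abandoned_mix(self) -> bool:
        """True when every subtask is COMPLETED or CANCELLED, with both present."""
        return (
            self.completed > 0
            and self.cancelled > 0
            and self.completed + self.cancelled == self.total
        )
```

Per the description, a non-zero `untracked` remainder is treated as work still pending (IN_PROGRESS), and `abandoned_mix` corresponds to the case where the derived status is CANCELLED.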
DecompositionContext
pydantic-model
¶
Bases: BaseModel
Configuration context for a decomposition operation.
Attributes:
| Name | Type | Description |
|---|---|---|
max_subtasks |
int
|
Maximum number of subtasks allowed. |
max_depth |
int
|
Maximum nesting depth for recursive decomposition. |
current_depth |
int
|
Current nesting depth. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- max_subtasks (int)
- max_depth (int)
- current_depth (int)
service
¶
Decomposition service.
Orchestrates strategy, classifier, DAG validation, and task creation to decompose a parent task into executable subtasks.
DecompositionService
¶
Service orchestrating task decomposition.
Composes a decomposition strategy with a structure classifier, DAG validator, and task factory to produce executable subtasks.
Source code in src/synthorg/engine/decomposition/service.py
decompose_task
async
¶
Decompose a task into subtasks.
- Classify task structure (uses explicit if set, otherwise heuristic inference). Override the plan's structure with the classifier's result when they differ.
- Call strategy.decompose().
- Validate DAG via DependencyGraph.
- Create Task objects from SubtaskDefinitions.
- Return DecompositionResult.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
task
|
Task
|
The parent task to decompose. |
required |
context
|
DecompositionContext
|
Decomposition constraints. |
required |
Returns:
| Type | Description |
|---|---|
DecompositionResult
|
Decomposition result with created tasks and dependency edges. |
Source code in src/synthorg/engine/decomposition/service.py
rollup_status
¶
Compute status rollup for a parent task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
parent_task_id
|
NotBlankStr
|
The parent task identifier. |
required |
subtask_statuses
|
tuple[TaskStatus, ...]
|
Statuses of all subtasks. |
required |
Returns:
| Type | Description |
|---|---|
SubtaskStatusRollup
|
Aggregated status rollup. |
Source code in src/synthorg/engine/decomposition/service.py
Task Routing¶
models
¶
Task routing domain models.
Frozen Pydantic models for routing candidates, decisions, results, and topology configuration.
RoutingCandidate
pydantic-model
¶
Bases: BaseModel
A candidate agent for a subtask with scoring details.
Attributes:
| Name | Type | Description |
|---|---|---|
agent_identity |
AgentIdentity
|
The candidate agent. |
score |
float
|
Match score between 0.0 and 1.0. |
matched_skills |
tuple[NotBlankStr, ...]
|
Skills that matched the subtask requirements. |
reason |
NotBlankStr
|
Human-readable explanation of the score. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- agent_identity (AgentIdentity)
- score (float)
- matched_skills (tuple[NotBlankStr, ...])
- reason (NotBlankStr)
RoutingDecision
pydantic-model
¶
Bases: BaseModel
Routing decision for a single subtask.
Attributes:
| Name | Type | Description |
|---|---|---|
| subtask_id | NotBlankStr | ID of the subtask being routed. |
| selected_candidate | RoutingCandidate | The chosen agent candidate. |
| alternatives | tuple[RoutingCandidate, ...] | Other candidates considered. |
| topology | CoordinationTopology | Coordination topology for this subtask. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- subtask_id (NotBlankStr)
- selected_candidate (RoutingCandidate)
- alternatives (tuple[RoutingCandidate, ...])
- topology (CoordinationTopology)
Validators:
- _validate_selected_not_in_alternatives
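The validator name suggests that the selected candidate may not also appear among the alternatives. The following is a minimal sketch of that invariant using stdlib dataclasses rather than Pydantic; `Candidate` and `Decision` are hypothetical stand-ins, and comparing candidates by agent ID is an assumption, not the library's confirmed rule.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    """Hypothetical stand-in for RoutingCandidate, reduced to two fields."""

    agent_id: str
    score: float


@dataclass(frozen=True)
class Decision:
    """Hypothetical stand-in for RoutingDecision carrying the same invariant."""

    subtask_id: str
    selected_candidate: Candidate
    alternatives: tuple[Candidate, ...] = ()

    def __post_init__(self) -> None:
        # Assumed rule: the chosen agent must not also be listed as an alternative.
        selected_id = self.selected_candidate.agent_id
        if any(alt.agent_id == selected_id for alt in self.alternatives):
            raise ValueError(
                f"selected agent {selected_id!r} also appears in alternatives"
            )
```

Constructing `Decision("s1", a, (a, b))` would fail under this sketch, while `Decision("s1", a, (b,))` succeeds.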
RoutingResult
pydantic-model
¶
Bases: BaseModel
Result of routing all subtasks in a decomposition.
Attributes:
| Name | Type | Description |
|---|---|---|
| parent_task_id | NotBlankStr | ID of the parent task. |
| decisions | tuple[RoutingDecision, ...] | Routing decisions for routable subtasks. |
| unroutable | tuple[NotBlankStr, ...] | IDs of subtasks with no matching agent. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- parent_task_id (NotBlankStr)
- decisions (tuple[RoutingDecision, ...])
- unroutable (tuple[NotBlankStr, ...])
Validators:
- _validate_unique_subtask_ids
AutoTopologyConfig
pydantic-model
¶
Bases: BaseModel
Configuration for automatic topology selection.
Attributes:
| Name | Type | Description |
|---|---|---|
| sequential_override | CoordinationTopology | Topology for sequential structures. |
| parallel_default | CoordinationTopology | Topology for parallel structures. |
| mixed_default | CoordinationTopology | Topology for mixed structures. |
| parallel_artifact_threshold | int | Artifact count above which parallel tasks use decentralized topology. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- sequential_override (CoordinationTopology)
- parallel_default (CoordinationTopology)
- mixed_default (CoordinationTopology)
- parallel_artifact_threshold (int)
Validators:
- _validate_no_auto_defaults
parallel_artifact_threshold
pydantic-field
¶
Artifact count threshold for decentralized topology
service
¶
Task routing service.
Routes decomposed subtasks to appropriate agents based on scoring, then selects coordination topology.
TaskRoutingService
¶
Routes subtasks to agents based on skill matching.
For each subtask in a decomposition result, scores all available agents and selects the best match. Subtasks with no viable candidate are reported as unroutable.
Source code in src/synthorg/engine/routing/service.py
route
¶
Route all subtasks to appropriate agents.
For each subtask:
1. Score all available agents.
2. Select the best candidate (highest score >= min_score).
3. Select topology from parent task override or plan structure.
4. Report unroutable subtasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| decomposition_result | DecompositionResult | The decomposition to route. | required |
| available_agents | tuple[AgentIdentity, ...] | Pool of agents to consider. | required |
| parent_task | Task | The parent task (for topology selection). | required |
Returns:
| Type | Description |
|---|---|
| RoutingResult | Routing result with decisions and unroutable subtask IDs. |
Source code in src/synthorg/engine/routing/service.py
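The scoring and selection steps described for route can be sketched as follows. This is an illustration under stated assumptions, not the service's implementation: `Agent`, `Subtask`, the skill-overlap `score` function, and the default `min_score` of 0.5 are all hypothetical, and topology selection is omitted.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Agent:
    """Hypothetical stand-in for AgentIdentity."""

    name: str
    skills: frozenset[str]


@dataclass(frozen=True)
class Subtask:
    """Hypothetical subtask carrying only the skills it needs."""

    id: str
    required_skills: frozenset[str]


def score(agent: Agent, subtask: Subtask) -> float:
    """Assumed scorer: fraction of required skills the agent covers (0.0 to 1.0)."""
    if not subtask.required_skills:
        return 1.0
    matched = agent.skills & subtask.required_skills
    return len(matched) / len(subtask.required_skills)


def route(subtasks, agents, min_score=0.5):
    """Pick the highest-scoring agent per subtask; collect IDs with no viable agent."""
    decisions: dict[str, str] = {}
    unroutable: list[str] = []
    for subtask in subtasks:
        # Rank every agent for this subtask, best score first.
        ranked = sorted(((score(a, subtask), a.name) for a in agents), reverse=True)
        if ranked and ranked[0][0] >= min_score:
            decisions[subtask.id] = ranked[0][1]
        else:
            unroutable.append(subtask.id)
    return decisions, tuple(unroutable)
```

Reporting unroutable subtasks as data, rather than raising, lets the caller decide whether a partial routing is acceptable.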
Task Assignment¶
protocol
¶
Task assignment strategy protocol.
Defines the pluggable interface for assignment strategies.
TaskAssignmentStrategy
¶
Bases: Protocol
Protocol for task assignment strategies.
Implementations must be synchronous (pure computation, no I/O)
and return an AssignmentResult with the selected agent and
ranked alternatives. TaskAssignmentService calls assign()
synchronously -- async implementations will NOT work correctly.
Error signaling contract:
- ManualAssignmentStrategy raises NoEligibleAgentError when the designated agent is not found or not ACTIVE, and TaskAssignmentError when task.assigned_to is None.
- Scoring-based strategies (the ScoringBasedAssignmentStrategy compositions for role-based, load-balanced, cost-optimized, hierarchical, and auction) return AssignmentResult(selected=None, ...) when no agent meets the minimum score threshold.
TaskAssignmentService propagates both patterns: it re-raises
TaskAssignmentError (including its subclass
NoEligibleAgentError) and logs a warning when
result.selected is None, returning the result to the
caller for handling.
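Because failures arrive through two channels (an exception or a result whose selected is None), callers need to handle both. The sketch below uses hypothetical stand-ins for the error classes and for AssignmentResult; only the subclass relationship and the `selected` field are taken from the description above.

```python
from dataclasses import dataclass
from typing import Optional


class TaskAssignmentError(Exception):
    """Stand-in for the base assignment error."""


class NoEligibleAgentError(TaskAssignmentError):
    """Stand-in subclass, mirroring the hierarchy described above."""


@dataclass(frozen=True)
class Result:
    """Reduced, hypothetical stand-in for AssignmentResult."""

    selected: Optional[str]
    reason: str


def handle(assign, request) -> str:
    """Call a strategy-like callable and normalize both failure patterns."""
    try:
        result = assign(request)
    except NoEligibleAgentError as exc:
        # Catch the subclass first so it is not swallowed by the base handler.
        return f"no eligible agent: {exc}"
    except TaskAssignmentError as exc:
        return f"invalid request: {exc}"
    if result.selected is None:
        # Scoring strategies signal "nobody qualified" through the result itself.
        return f"unassigned: {result.reason}"
    return f"assigned to {result.selected}"
```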
assign
¶
Assign a task to an agent based on the strategy's algorithm.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | AssignmentRequest | The assignment request with task and agent pool. | required |
Returns:
| Type | Description |
|---|---|
AssignmentResult
|
Assignment result with selected agent and alternatives. |
AssignmentResult
|
|
AssignmentResult
|
found (scoring strategies) -- callers must check this. |
Raises:
| Type | Description |
|---|---|
| TaskAssignmentError | When preconditions are violated (e.g. missing |
| NoEligibleAgentError | When the designated agent cannot be found or is not ACTIVE (manual strategy only). |
Source code in src/synthorg/engine/assignment/protocol.py
models
¶
Task assignment domain models.
Frozen Pydantic models for assignment requests, results, agent workloads, and assignment candidates.
AgentWorkload
pydantic-model
¶
Bases: BaseModel
Snapshot of an agent's current workload.
Attributes:
| Name | Type | Description |
|---|---|---|
| agent_id | NotBlankStr | Unique agent identifier. |
| active_task_count | int | Number of tasks currently in progress. |
| total_cost | float | Total cost incurred by this agent in the configured currency. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- agent_id (NotBlankStr)
- active_task_count (int)
- total_cost (float)
AssignmentCandidate
pydantic-model
¶
Bases: BaseModel
A candidate agent for task assignment with scoring details.
Attributes:
| Name | Type | Description |
|---|---|---|
| agent_identity | AgentIdentity | The candidate agent. |
| score | float | Match score between 0.0 and 1.0. |
| matched_skills | tuple[NotBlankStr, ...] | Skills that matched the assignment requirements. |
| reason | NotBlankStr | Human-readable explanation of the score. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- agent_identity (AgentIdentity)
- score (float)
- matched_skills (tuple[NotBlankStr, ...])
- reason (NotBlankStr)
AssignmentRequest
pydantic-model
¶
Bases: BaseModel
Request for task assignment to an agent.
The required_skills and required_role fields live here
(not on Task) so that scoring strategies can evaluate agent-task
fit without modifying the Task model.
Attributes:
| Name | Type | Description |
|---|---|---|
| task | Task | The task to assign. |
| available_agents | tuple[AgentIdentity, ...] | Pool of agents to consider (must be non-empty, unique by agent id). |
| workloads | tuple[AgentWorkload, ...] | Current workload snapshots per agent (unique by agent_id). |
| min_score | float | Minimum score threshold for eligibility. |
| required_skills | tuple[NotBlankStr, ...] | Skill names needed for scoring. |
| required_role | NotBlankStr \| None | Optional role name for scoring. |
| max_concurrent_tasks | int \| None | Maximum concurrent tasks per agent. Agents at or above this limit are excluded from scoring. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- task (Task)
- available_agents (tuple[AgentIdentity, ...])
- workloads (tuple[AgentWorkload, ...])
- min_score (float)
- required_skills (tuple[NotBlankStr, ...])
- required_role (NotBlankStr | None)
- max_concurrent_tasks (int | None)
- project_team (tuple[NotBlankStr, ...])
Validators:
- _validate_collections
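The max_concurrent_tasks rule ("agents at or above this limit are excluded from scoring") can be illustrated with a small filter. This is a sketch, not the library's code: `eligible_agents` is a hypothetical helper, workloads are simplified to a mapping from agent ID to active task count, and treating agents without a workload snapshot as idle is an assumption.

```python
def eligible_agents(agent_ids, workloads, max_concurrent_tasks):
    """Return the agent IDs whose active task count is below the limit.

    workloads maps agent ID -> active task count; agents missing from the
    mapping are assumed to have no active tasks.
    """
    if max_concurrent_tasks is None:
        # No limit configured: every agent remains in the scoring pool.
        return tuple(agent_ids)
    return tuple(
        agent_id
        for agent_id in agent_ids
        if workloads.get(agent_id, 0) < max_concurrent_tasks
    )
```

With a limit of 3, an agent holding exactly 3 active tasks is dropped, since the exclusion applies "at or above" the limit.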
AssignmentResult
pydantic-model
¶
Bases: BaseModel
Result of a task assignment operation.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | NotBlankStr | ID of the task that was assigned. |
| strategy_used | NotBlankStr | Name of the strategy that produced this result. |
| selected | AssignmentCandidate \| None | The selected candidate (None if no viable agent). |
| alternatives | tuple[AssignmentCandidate, ...] | Other candidates considered, ranked by score. |
| reason | NotBlankStr | Human-readable explanation of the assignment decision. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- task_id (NotBlankStr)
- strategy_used (NotBlankStr)
- selected (AssignmentCandidate | None)
- alternatives (tuple[AssignmentCandidate, ...])
- reason (NotBlankStr)
Validators:
- _validate_selected_not_in_alternatives
service
¶
Task assignment service.
Orchestrates task assignment by delegating to a pluggable
TaskAssignmentStrategy with logging and validation.
TaskAssignmentService
¶
Orchestrates task assignment via a pluggable strategy.
Validates task status before delegating to the strategy. Does NOT mutate the task -- callers are responsible for any subsequent status transitions.
Source code in src/synthorg/engine/assignment/service.py
assign
¶
Assign a task to an agent using the configured strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | AssignmentRequest | The assignment request. | required |
Returns:
| Type | Description |
|---|---|
| AssignmentResult | Assignment result from the strategy. |
Raises:
| Type | Description |
|---|---|
| TaskAssignmentError | If the task status is not eligible for assignment. |
Source code in src/synthorg/engine/assignment/service.py
Error Classification¶
models
¶
Classification result models for the error taxonomy pipeline.
Defines severity levels, individual error findings, and aggregated classification results produced by the detection pipeline.
ErrorSeverity
¶
Bases: StrEnum
Severity level for a detected coordination error.
ErrorFinding
pydantic-model
¶
Bases: BaseModel
A single coordination error detected during classification.
Attributes:
| Name | Type | Description |
|---|---|---|
| category | ErrorCategory | The error category from the taxonomy. |
| severity | ErrorSeverity | Severity level of the finding. |
| description | NotBlankStr | Human-readable description of the error. |
| evidence | tuple[NotBlankStr, ...] | Supporting evidence extracted from the conversation. |
| turn_range | tuple[int, int] \| None | (start, end) 0-based index range where the error was observed, or |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- category (ErrorCategory)
- severity (ErrorSeverity)
- description (NotBlankStr)
- evidence (tuple[NotBlankStr, ...])
- turn_range (tuple[int, int] | None)
Validators:
- _validate_turn_range
turn_range
pydantic-field
¶
0-based index range (start, end) where error was observed. For conversation-based detectors this is the message index in the conversation tuple; for turn-based detectors this is the index into the turns tuple.
ClassificationResult
pydantic-model
¶
Bases: BaseModel
Aggregated result from the error classification pipeline.
Attributes:
| Name | Type | Description |
|---|---|---|
| execution_id | NotBlankStr | Unique identifier for the execution run. |
| agent_id | NotBlankStr | Agent that was executing. |
| task_id | NotBlankStr | Task being executed. |
| categories_checked | tuple[ErrorCategory, ...] | Which error categories were checked. |
| findings | tuple[ErrorFinding, ...] | All detected error findings. |
| classified_at | AwareDatetime | Timestamp when classification completed. |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- execution_id (NotBlankStr)
- agent_id (NotBlankStr)
- task_id (NotBlankStr)
- categories_checked (tuple[ErrorCategory, ...])
- findings (tuple[ErrorFinding, ...])
- classified_at (AwareDatetime)
Validators:
- _validate_findings_match_categories
pipeline
¶
Error classification pipeline.
Orchestrates the detection of coordination errors from an execution
result using the configured error taxonomy. Detectors are discovered
dynamically from the ErrorTaxonomyConfig.detectors dict and
dispatched via the Detector protocol. The pipeline never raises
exceptions -- all errors are caught and logged.
classify_execution_errors
async
¶
classify_execution_errors(
execution_result,
agent_id,
task_id,
*,
config,
task_repo=None,
provider=None,
sinks=(),
)
Classify coordination errors from an execution result.
Discovers detectors from config.detectors, loads
scope-appropriate context, runs detectors sequentially
(concurrency happens inside CompositeDetector), and
dispatches results to registered sinks.
Rate limiting is handled by the BaseCompletionProvider
internally -- semantic detectors no longer accept a separate
rate limiter to avoid double-throttling.
Returns None when the taxonomy is disabled. Never raises;
all exceptions except MemoryError/RecursionError are
caught and logged as CLASSIFICATION_ERROR.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| execution_result | ExecutionResult | The completed execution result to analyse. | required |
| agent_id | NotBlankStr | Agent that executed the task. | required |
| task_id | NotBlankStr | Task that was executed. | required |
| config | ErrorTaxonomyConfig | Error taxonomy configuration. | required |
| task_repo | TaskRepository \| None | Optional task repository for TASK_TREE scope. | None |
| provider | BaseCompletionProvider \| None | Optional LLM provider for semantic detectors. | None |
| sinks | tuple[ClassificationSink, ...] | Downstream consumers to notify after classification. | () |
Returns:
| Type | Description |
|---|---|
| ClassificationResult \| None | Classification result with findings, or |
Source code in src/synthorg/engine/classification/pipeline.py
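The "never raises" contract of classify_execution_errors (everything except MemoryError and RecursionError is caught and logged) follows a common pattern, sketched below. The names `classify_safely` and `detectors` are hypothetical, detectors are modelled as plain async callables, and returning None after a caught failure is an assumption, since the reference does not state the failure return value.

```python
import asyncio
import logging

logger = logging.getLogger("classification_sketch")


async def classify_safely(detectors, execution_result, *, enabled=True):
    """Run detectors sequentially; log failures instead of propagating them."""
    if not enabled:
        # Mirrors "Returns None when the taxonomy is disabled".
        return None
    findings = []
    try:
        for detect in detectors:
            findings.extend(await detect(execution_result))
    except (MemoryError, RecursionError):
        # Unrecoverable interpreter-level conditions are allowed to propagate.
        raise
    except Exception:
        # Assumed behaviour: log under the CLASSIFICATION_ERROR event and give up.
        logger.exception("CLASSIFICATION_ERROR")
        return None
    return tuple(findings)
```

Listing the two pass-through exceptions before the broad handler is what keeps them from being swallowed.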
Workspace Isolation¶
protocol
¶
Workspace isolation strategy protocol.
WorkspaceIsolationStrategy
¶
Bases: Protocol
Protocol for workspace isolation strategies.
Implementations provide the ability to create, merge, and tear down isolated workspaces for concurrent agent execution.
setup_workspace
async
¶
Create an isolated workspace for an agent task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| request | WorkspaceRequest | Workspace creation request. | required |
Returns:
| Type | Description |
|---|---|
| Workspace | The created workspace. |
Raises:
| Type | Description |
|---|---|
| WorkspaceLimitError | When max concurrent worktrees reached. |
| WorkspaceSetupError | When git operations fail. |
Source code in src/synthorg/engine/workspace/protocol.py
teardown_workspace
async
¶
Remove an isolated workspace and clean up resources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workspace | Workspace | The workspace to tear down. | required |
Raises:
| Type | Description |
|---|---|
| WorkspaceCleanupError | When git cleanup operations fail. |
Source code in src/synthorg/engine/workspace/protocol.py
merge_workspace
async
¶
Merge a workspace branch back into the base branch.
Merge conflicts are returned as a MergeResult with
success=False rather than raised as exceptions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workspace | Workspace | The workspace to merge. | required |
Returns:
| Type | Description |
|---|---|
| MergeResult | The merge result with conflict details if any. |
Raises:
| Type | Description |
|---|---|
| WorkspaceMergeError | When checkout or merge abort fails. |
Source code in src/synthorg/engine/workspace/protocol.py
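Since merge_workspace reports conflicts through the returned result and reserves exceptions for failed git operations, a caller typically checks both. The sketch below assumes a reduced, hypothetical `MergeOutcome` in place of MergeResult and takes the merge coroutine as a parameter so that it stays self-contained.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


class WorkspaceMergeError(Exception):
    """Stand-in for the error raised when checkout or merge abort fails."""


@dataclass(frozen=True)
class MergeOutcome:
    """Hypothetical, reduced stand-in for MergeResult."""

    success: bool
    conflicts: tuple[str, ...] = ()
    merged_commit_sha: Optional[str] = None


async def merge_and_report(merge, workspace) -> str:
    """Await a merge_workspace-like coroutine and describe what happened."""
    try:
        outcome = await merge(workspace)
    except WorkspaceMergeError as exc:
        # Failed git plumbing, not a content conflict.
        return f"merge failed: {exc}"
    if not outcome.success:
        # Conflicts are ordinary data on the result, not exceptions.
        return "conflicts in: " + ", ".join(outcome.conflicts)
    return f"merged at {outcome.merged_commit_sha}"
```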
list_active_workspaces
async
¶
Return all currently active workspaces.
Returns:
| Type | Description |
|---|---|
| tuple[Workspace, ...] | Tuple of active workspaces. |
get_strategy_type
¶
models
¶
Workspace isolation domain models.
WorkspaceRequest
pydantic-model
¶
Bases: BaseModel
Request to create an isolated workspace for an agent task.
Attributes:
| Name | Type | Description |
|---|---|---|
| task_id | NotBlankStr | Identifier of the task requiring isolation. |
| agent_id | NotBlankStr | Identifier of the agent that will work in the workspace. |
| base_branch | NotBlankStr | Git branch to branch from. |
| file_scope | tuple[NotBlankStr, ...] | Optional file path hints for the workspace. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- task_id (NotBlankStr)
- agent_id (NotBlankStr)
- base_branch (NotBlankStr)
- file_scope (tuple[NotBlankStr, ...])
Workspace
pydantic-model
¶
Bases: BaseModel
An active isolated workspace backed by a git worktree.
Attributes:
| Name | Type | Description |
|---|---|---|
| workspace_id | NotBlankStr | Unique identifier for this workspace. |
| task_id | NotBlankStr | Task this workspace serves. |
| agent_id | NotBlankStr | Agent operating in this workspace. |
| branch_name | NotBlankStr | Git branch created for this workspace. |
| worktree_path | NotBlankStr | Filesystem path to the worktree directory. |
| base_branch | NotBlankStr | Branch this workspace was created from. |
| created_at | datetime | Timestamp of workspace creation. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- workspace_id (NotBlankStr)
- task_id (NotBlankStr)
- agent_id (NotBlankStr)
- branch_name (NotBlankStr)
- worktree_path (NotBlankStr)
- base_branch (NotBlankStr)
- created_at (datetime)
MergeConflict
pydantic-model
¶
Bases: BaseModel
A single merge conflict detected during workspace merge.
Attributes:
| Name | Type | Description |
|---|---|---|
| file_path | NotBlankStr | Path of the conflicting file. |
| conflict_type | ConflictType | Type of conflict (e.g. textual, semantic). |
| ours_content | str | Content from the base branch side. |
| theirs_content | str | Content from the workspace branch side. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- file_path (NotBlankStr)
- conflict_type (ConflictType)
- ours_content (str)
- theirs_content (str)
- description (str)
Validators:
- _validate_semantic_description
MergeResult
pydantic-model
¶
Bases: BaseModel
Result of merging a single workspace branch back.
Attributes:
| Name | Type | Description |
|---|---|---|
| workspace_id | NotBlankStr | Workspace that was merged. |
| branch_name | NotBlankStr | Branch that was merged. |
| success | bool | Whether the merge completed without conflicts. |
| conflicts | tuple[MergeConflict, ...] | Any textual conflicts encountered during merge. |
| escalation | ConflictEscalation \| None | Escalation strategy applied, if any. |
| merged_commit_sha | NotBlankStr \| None | SHA of the merge commit, if successful. |
| duration_seconds | float | Time taken for the merge operation. |
| semantic_conflicts | tuple[MergeConflict, ...] | Semantic conflicts detected after merge. |
Config:
- frozen: True
- allow_inf_nan: False
- extra: forbid
Fields:
- workspace_id (NotBlankStr)
- branch_name (NotBlankStr)
- success (bool)
- conflicts (tuple[MergeConflict, ...])
- escalation (ConflictEscalation | None)
- merged_commit_sha (NotBlankStr | None)
- duration_seconds (float)
- semantic_conflicts (tuple[MergeConflict, ...])
Validators:
- _validate_success_consistency
semantic_conflicts
pydantic-field
¶
Semantic conflicts detected after successful merge
WorkspaceGroupResult
pydantic-model
¶
Bases: BaseModel
Aggregated result of merging a group of workspaces.
Attributes:
| Name | Type | Description |
|---|---|---|
| group_id | NotBlankStr | Identifier for this merge group. |
| merge_results | tuple[MergeResult, ...] | Individual merge results for each workspace. |
| duration_seconds | float | Total time for the group merge operation. |
Config:
- frozen: True
- allow_inf_nan: False
Fields:
- group_id (NotBlankStr)
- merge_results (tuple[MergeResult, ...])
- duration_seconds (float)
total_semantic_conflicts
property
¶
Sum of semantic conflicts from all merge results.
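A derived property of this kind can be sketched as a count over the per-workspace results. `MergeOutcome` and `GroupOutcome` are hypothetical stand-ins built with stdlib dataclasses, and conflicts are simplified to strings; the real models are Pydantic classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MergeOutcome:
    """Hypothetical stand-in for MergeResult, keeping only semantic conflicts."""

    semantic_conflicts: tuple[str, ...] = ()


@dataclass(frozen=True)
class GroupOutcome:
    """Hypothetical stand-in for WorkspaceGroupResult."""

    merge_results: tuple[MergeOutcome, ...]

    @property
    def total_semantic_conflicts(self) -> int:
        # Count the conflicts on each result and add them up across the group.
        return sum(len(r.semantic_conflicts) for r in self.merge_results)
```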
service
¶
Workspace isolation service.
High-level service that coordinates workspace lifecycle: setup, merge, and teardown for groups of agent workspaces.
WorkspaceIsolationService
¶
Service for managing workspace isolation lifecycle.
Coordinates creating, merging, and tearing down workspaces for groups of concurrent agent tasks.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| strategy | WorkspaceIsolationStrategy | Workspace isolation strategy implementation. | required |
| config | WorkspaceIsolationConfig | Workspace isolation configuration. | required |
Source code in src/synthorg/engine/workspace/service.py
setup_group
async
¶
Create workspaces for a group of agent tasks.
Rolls back all already-created workspaces if any setup fails.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| requests | tuple[WorkspaceRequest, ...] | Workspace creation requests. | required |
Returns:
| Type | Description |
|---|---|
| tuple[Workspace, ...] | Tuple of created workspaces. |
Raises:
| Type | Description |
|---|---|
| WorkspaceLimitError | When max concurrent worktrees reached. |
| WorkspaceSetupError | When git operations fail. |
Source code in src/synthorg/engine/workspace/service.py
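The rollback behaviour of setup_group (tear down already-created workspaces if any setup fails) can be sketched as below. The `setup` and `teardown` parameters are hypothetical coroutines standing in for the strategy's methods. Sequential creation, reverse-order rollback, and suppressing rollback errors so the original failure propagates are all assumptions, not confirmed details of the service.

```python
import asyncio


async def setup_group(setup, teardown, requests):
    """Create one workspace per request; undo everything if any creation fails."""
    created = []
    try:
        for request in requests:
            created.append(await setup(request))
    except Exception:
        # Roll back in reverse creation order (assumed ordering).
        for workspace in reversed(created):
            try:
                await teardown(workspace)
            except Exception:
                # Best-effort rollback: the original setup error takes priority.
                pass
        raise
    return tuple(created)
```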
merge_group
async
¶
Merge all workspaces and return aggregated result.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workspaces | tuple[Workspace, ...] | Workspaces to merge. | required |
Returns:
| Type | Description |
|---|---|
| WorkspaceGroupResult | Aggregated merge result for the group. |
Raises:
| Type | Description |
|---|---|
| WorkspaceMergeError | When a merge operation fails fatally. |
Source code in src/synthorg/engine/workspace/service.py
teardown_group
async
¶
Tear down all workspaces in a group.
Uses best-effort teardown: attempts all workspaces even if some fail, then raises a combined error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workspaces | tuple[Workspace, ...] | Workspaces to tear down. | required |
Raises:
| Type | Description |
|---|---|
| WorkspaceCleanupError | When any teardown operation fails. |
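Best-effort teardown, as described for teardown_group, means attempting every workspace and reporting failures only at the end. A minimal sketch under stated assumptions: `teardown` is a hypothetical coroutine standing in for the strategy's teardown_workspace, `WorkspaceCleanupError` is a local stand-in class, and the combined error message format is invented for illustration.

```python
import asyncio


class WorkspaceCleanupError(Exception):
    """Stand-in for the cleanup error raised by the workspace layer."""


async def teardown_group(teardown, workspaces) -> None:
    """Try to tear down every workspace, then raise one combined error if any failed."""
    failures = []
    for workspace in workspaces:
        try:
            await teardown(workspace)
        except WorkspaceCleanupError as exc:
            # Record the failure and keep going so later workspaces are still cleaned.
            failures.append(f"{workspace}: {exc}")
    if failures:
        raise WorkspaceCleanupError("; ".join(failures))
```

Deferring the raise keeps one stuck worktree from leaving the rest of the group behind.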