Skip to content

Engine

Agent orchestration, execution loops, task decomposition, routing, and parallel execution.

Agent Engine

agent_engine

Agent engine -- top-level orchestrator.

Ties together prompt construction, execution context, execution loop, tool invocation, and budget tracking into a single run() entry point.

PersonalityTrimNotifier

PersonalityTrimNotifier = Callable[[PersonalityTrimPayload], Awaitable[None]]

Async callback invoked when an agent's personality section is trimmed.

PersonalityTrimPayload

Bases: TypedDict

Structured payload forwarded to `PersonalityTrimNotifier` callbacks.

AgentEngine

AgentEngine(
    *,
    provider,
    execution_loop=None,
    tool_registry=None,
    cost_tracker=None,
    recovery_strategy=_DEFAULT_RECOVERY_STRATEGY,
    shutdown_checker=None,
    error_taxonomy_config=None,
    budget_enforcer=None,
    security_config=None,
    approval_store=None,
    parked_context_repo=None,
    approval_gate=None,
    trust_service=None,
    mcp_self_consumer=None,
    task_engine=None,
    checkpoint_repo=None,
    heartbeat_repo=None,
    checkpoint_config=None,
    coordinator=None,
    stagnation_detector=None,
    auto_loop_config=None,
    hybrid_loop_config=None,
    compaction_callback=None,
    plan_execute_config=None,
    provider_registry=None,
    provider_configs=None,
    model_resolver=None,
    tool_invocation_tracker=None,
    memory_injection_strategy=None,
    ontology_injection_strategy=None,
    procedural_memory_config=None,
    memory_backend=None,
    distillation_capture_enabled=False,
    config_resolver=None,
    personality_trim_notifier=None,
    coordination_metrics_collector=None,
    audit_log=None,
    project_repo=None,
    agent_middleware_chain=None,
    event_reader=None,
    event_stream_hub=None,
    interrupt_store=None,
    approval_interrupt_timeout_seconds=None,
    clock=None,
)

Bases: AgentEngineContextMixin, AgentEngineErrorsMixin, AgentEngineFactoriesMixin, AgentEnginePostExecMixin, AgentEngineRecoveryMixin, AgentEngineResumeMixin

Top-level orchestrator for agent execution.

Source code in src/synthorg/engine/agent_engine.py
def __init__(  # noqa: PLR0913, PLR0915
    self,
    *,
    provider: CompletionProvider,
    execution_loop: ExecutionLoop | None = None,
    tool_registry: ToolRegistry | None = None,
    cost_tracker: CostTracker | None = None,
    recovery_strategy: RecoveryStrategy | None = _DEFAULT_RECOVERY_STRATEGY,
    shutdown_checker: ShutdownChecker | None = None,
    error_taxonomy_config: ErrorTaxonomyConfig | None = None,
    budget_enforcer: BudgetEnforcer | None = None,
    security_config: SecurityConfig | None = None,
    approval_store: ApprovalStoreProtocol | None = None,
    parked_context_repo: ParkedContextRepository | None = None,
    approval_gate: ApprovalGate | None = None,
    trust_service: TrustService | None = None,
    mcp_self_consumer: MCPSelfConsumerProvider | None = None,
    task_engine: TaskEngine | None = None,
    checkpoint_repo: CheckpointRepository | None = None,
    heartbeat_repo: HeartbeatRepository | None = None,
    checkpoint_config: CheckpointConfig | None = None,
    coordinator: MultiAgentCoordinator | None = None,
    stagnation_detector: StagnationDetector | None = None,
    auto_loop_config: AutoLoopConfig | None = None,
    hybrid_loop_config: HybridLoopConfig | None = None,
    compaction_callback: CompactionCallback | None = None,
    plan_execute_config: PlanExecuteConfig | None = None,
    provider_registry: ProviderRegistry | None = None,
    provider_configs: Mapping[str, ProviderConfig] | None = None,
    model_resolver: ModelResolver | None = None,
    tool_invocation_tracker: ToolInvocationTracker | None = None,
    memory_injection_strategy: MemoryInjectionStrategy | None = None,
    ontology_injection_strategy: OntologyInjectionStrategy | None = None,
    procedural_memory_config: ProceduralMemoryConfig | None = None,
    memory_backend: MemoryBackend | None = None,
    distillation_capture_enabled: bool = False,
    config_resolver: ConfigResolver | None = None,
    personality_trim_notifier: PersonalityTrimNotifier | None = None,
    coordination_metrics_collector: CoordinationMetricsCollector | None = None,
    audit_log: AuditLog | None = None,
    project_repo: ProjectRepository | None = None,
    agent_middleware_chain: AgentMiddlewareChain | None = None,
    event_reader: EventReader | None = None,
    event_stream_hub: EventStreamHub | None = None,
    interrupt_store: InterruptStore | None = None,
    approval_interrupt_timeout_seconds: float | None = None,
    clock: Clock | None = None,
) -> None:
    """Wire the engine's collaborators and validate their combinations.

    All parameters are keyword-only. Unless noted below, each argument is
    stored unchanged on the same-named private attribute (``provider`` ->
    ``self._provider`` and so on).

    Args:
        execution_loop: Externally built loop. When falsy, the loop is
            obtained from ``self._make_default_loop()``. Mutually
            exclusive with ``auto_loop_config``.
        approval_gate: Stored as ``self._injected_approval_gate``; the
            effective ``self._approval_gate`` comes from
            ``self._make_approval_gate()``.
        cost_tracker: Used only when ``budget_enforcer`` is ``None``;
            otherwise ``budget_enforcer.cost_tracker`` is used and an
            explicitly passed tracker must be that same object.
        checkpoint_repo: Must be supplied together with
            ``heartbeat_repo`` or not at all.
        heartbeat_repo: See ``checkpoint_repo``.
        checkpoint_config: Defaults to ``CheckpointConfig()`` when falsy.
        procedural_memory_config: A ``ProceduralMemoryProposer`` is built
            only when this is provided, its ``enabled`` flag is truthy,
            and ``memory_backend`` is provided as well.
        audit_log: Defaults to a new ``AuditLog()`` when ``None``.
        clock: Defaults to ``SystemClock()`` when ``None``.

    Raises:
        ValueError: If both ``execution_loop`` and ``auto_loop_config``
            are given; if exactly one of ``checkpoint_repo`` /
            ``heartbeat_repo`` is given; or if ``cost_tracker`` is given
            alongside a ``budget_enforcer`` whose ``cost_tracker`` is a
            different object.
    """
    self._agent_middleware_chain = agent_middleware_chain
    self._event_reader = event_reader
    self._clock: Clock = clock if clock is not None else SystemClock()
    self._event_stream_hub = event_stream_hub
    self._interrupt_store = interrupt_store
    # Reject the ambiguous combination up front, before any factory runs.
    if execution_loop is not None and auto_loop_config is not None:
        msg = "execution_loop and auto_loop_config are mutually exclusive"
        logger.warning(
            EXECUTION_ENGINE_ERROR,
            reason=msg,
        )
        raise ValueError(msg)
    self._provider = provider
    self._provider_registry = provider_registry
    self._provider_configs = provider_configs
    self._model_resolver = model_resolver
    self._approval_store = approval_store
    self._parked_context_repo = parked_context_repo
    # The boot path constructs one ApprovalGate (backed by the
    # persistence ParkedContextRepository) and injects it so the
    # engine parks and the /approvals controller resumes on the
    # same gate. When absent (standalone / legacy callers) the
    # factory builds a gate from the engine's own collaborators.
    self._injected_approval_gate = approval_gate
    # Progressive trust: when wired, the tool-invoker factory
    # narrows an agent's effective tool access to the more
    # restrictive of its identity level and its earned trust
    # level. ``None`` (trust strategy DISABLED) is a no-op.
    self._trust_service = trust_service
    # Agent -> SynthOrg-MCP self-consumer: when wired, the
    # tool-invoker factory adds trust-scoped SynthOrg MCP tools to
    # the agent's registry. ``None`` (mode DISABLED) is a no-op.
    self._mcp_self_consumer = mcp_self_consumer
    self._approval_interrupt_timeout_seconds = approval_interrupt_timeout_seconds
    self._stagnation_detector = stagnation_detector
    self._auto_loop_config = auto_loop_config
    self._hybrid_loop_config = hybrid_loop_config
    self._compaction_callback = compaction_callback
    self._plan_execute_config = plan_execute_config
    # The gate factory is not defined in this block (presumably on one of
    # the mixins -- verify); it runs only after the attributes above are set.
    self._approval_gate = self._make_approval_gate()
    # An externally supplied loop is used as-is, so warn that the
    # engine-level gate / detector / callback are not injected into it.
    if execution_loop is not None and (
        self._approval_gate is not None
        or self._stagnation_detector is not None
        or self._compaction_callback is not None
    ):
        logger.warning(
            APPROVAL_GATE_LOOP_WIRING_WARNING,
            note=(
                "execution_loop provided externally -- approval_gate, "
                "stagnation_detector, and compaction_callback will NOT "
                "be wired automatically. Configure the loop with "
                "approval_gate=, stagnation_detector=, and "
                "compaction_callback= explicitly."
            ),
        )
    # NOTE(review): ``or`` is a truthiness test, so a falsy loop object
    # would also be replaced by the default loop.
    self._loop: ExecutionLoop = execution_loop or self._make_default_loop()
    self._tool_registry = tool_registry
    self._budget_enforcer = budget_enforcer
    # Checkpoint and heartbeat repositories are accepted only as a pair.
    if (checkpoint_repo is None) != (heartbeat_repo is None):
        msg = (
            "checkpoint_repo and heartbeat_repo must both be "
            "provided or both omitted"
        )
        raise ValueError(msg)
    self._checkpoint_repo = checkpoint_repo
    self._heartbeat_repo = heartbeat_repo
    self._checkpoint_config = checkpoint_config or CheckpointConfig()
    self._cost_tracker: CostTracker | None
    # With a budget enforcer present, its tracker is the single source of
    # truth; an explicitly passed tracker must be that very object.
    if budget_enforcer is not None:
        if (
            cost_tracker is not None
            and cost_tracker is not budget_enforcer.cost_tracker
        ):
            msg = (
                "cost_tracker must match budget_enforcer.cost_tracker "
                "when budget_enforcer is provided"
            )
            raise ValueError(msg)
        self._cost_tracker = budget_enforcer.cost_tracker
    else:
        self._cost_tracker = cost_tracker
    self._security_config = security_config
    self._task_engine = task_engine
    self._recovery_strategy = recovery_strategy
    self._shutdown_checker = shutdown_checker
    self._error_taxonomy_config = error_taxonomy_config
    self._coordinator = coordinator
    self._tool_invocation_tracker = tool_invocation_tracker
    self._memory_injection_strategy = memory_injection_strategy
    self._ontology_injection_strategy = ontology_injection_strategy
    self._procedural_memory_config = procedural_memory_config
    self._memory_backend = memory_backend
    self._distillation_capture_enabled = distillation_capture_enabled
    self._config_resolver = config_resolver
    self._personality_trim_notifier = personality_trim_notifier
    self._coordination_metrics_collector = coordination_metrics_collector
    # The proposer stays ``None`` unless procedural memory is enabled and
    # a memory backend is available.
    self._procedural_proposer: ProceduralMemoryProposer | None = None
    if (
        procedural_memory_config is not None
        and procedural_memory_config.enabled
        and memory_backend is not None
    ):
        # Imported inside the branch, so the module is loaded only when
        # the proposer is actually built.
        from synthorg.memory.procedural.proposer import (  # noqa: PLC0415
            ProceduralMemoryProposer,
        )

        self._procedural_proposer = ProceduralMemoryProposer(
            provider=provider,
            config=procedural_memory_config,
        )
    self._audit_log = audit_log if audit_log is not None else AuditLog()
    self._project_repo = project_repo
    # Debug snapshot of the final wiring; "auto" is reported whenever an
    # auto-loop config is present, regardless of the concrete loop object.
    logger.debug(
        EXECUTION_ENGINE_CREATED,
        loop_type=(
            "auto"
            if self._auto_loop_config is not None
            else self._loop.get_loop_type()
        ),
        has_tool_registry=self._tool_registry is not None,
        has_cost_tracker=self._cost_tracker is not None,
        has_budget_enforcer=self._budget_enforcer is not None,
        has_coordinator=self._coordinator is not None,
        has_compaction_callback=self._compaction_callback is not None,
        has_plan_execute_config=self._plan_execute_config is not None,
        has_hybrid_loop_config=self._hybrid_loop_config is not None,
        has_personality_trim_notifier=self._personality_trim_notifier is not None,
    )

coordinator property

coordinator

Return the multi-agent coordinator, or None if not configured.

coordinate async

coordinate(context)

Delegate to the multi-agent coordinator.

Source code in src/synthorg/engine/agent_engine.py
async def coordinate(
    self,
    context: CoordinationContext,
) -> CoordinationResultWithAttribution:
    """Run multi-agent coordination through the configured coordinator.

    Args:
        context: Coordination request, passed to the coordinator unchanged.

    Returns:
        Whatever the coordinator's own ``coordinate`` call produces.

    Raises:
        ExecutionStateError: If the engine has no coordinator configured.
    """
    coordinator = self._coordinator
    if coordinator is not None:
        return await coordinator.coordinate(context)

    # No coordinator was wired in: log the reason, then fail loudly.
    failure = "No coordinator configured for multi-agent dispatch"
    logger.warning(EXECUTION_ENGINE_ERROR, error=failure)
    raise ExecutionStateError(failure)

run async

run(
    *,
    identity,
    task,
    completion_config=None,
    max_turns=DEFAULT_MAX_TURNS,
    memory_messages=(),
    timeout_seconds=None,
    effective_autonomy=None,
    resume_execution_id=None,
)

Execute an agent on a task.

Source code in src/synthorg/engine/agent_engine.py
async def run(  # noqa: PLR0913, C901
    self,
    *,
    identity: AgentIdentity,
    task: Task,
    completion_config: CompletionConfig | None = None,
    max_turns: int = DEFAULT_MAX_TURNS,
    memory_messages: tuple[ChatMessage, ...] = (),
    timeout_seconds: float | None = None,
    effective_autonomy: EffectiveAutonomy | None = None,
    resume_execution_id: str | None = None,
) -> AgentRunResult:
    """Execute an agent on a task.

    Validates the inputs, then -- inside a correlation scope keyed by agent
    and task id -- runs the budget pre-flight, project validation, optional
    session replay, context preparation, and finally ``self._execute``.

    Args:
        identity: Agent to run. May be replaced by the budget enforcer
            (degradation / model resolution) before execution.
        task: Task to execute.
        completion_config: Forwarded to ``_execute`` and to the fatal-error
            handler.
        max_turns: Forwarded to input validation, session replay, and
            context preparation.
        memory_messages: Forwarded to ``_prepare_context``.
        timeout_seconds: Forwarded to input validation and ``_execute``.
        effective_autonomy: Forwarded to the tool-invoker factory, context
            preparation, ``_execute``, and the fatal-error handler.
        resume_execution_id: Execution to replay before running. Replay
            happens only when an event reader is configured; otherwise
            this argument has no effect.

    Returns:
        The result of ``_execute``, or the result built by the budget or
        fatal-error handler when the corresponding exception is caught.

    Raises:
        MemoryError: Logged and re-raised.
        RecursionError: Logged and re-raised.
        ProjectNotFoundError: Re-raised unchanged.
        ProjectAgentNotMemberError: Re-raised unchanged.
    """
    agent_id = str(identity.id)
    task_id = task.id

    # The validators run before the try block, so anything they raise
    # propagates to the caller instead of reaching the handlers below.
    validate_run_inputs(
        agent_id=agent_id,
        task_id=task_id,
        max_turns=max_turns,
        timeout_seconds=timeout_seconds,
    )
    validate_agent(identity, agent_id)
    validate_task(task, agent_id, task_id)
    validate_task_metadata(task, agent_id, task_id)

    with correlation_scope(agent_id=agent_id, task_id=task_id):
        start = self._clock.monotonic()
        # Pre-initialised so the except handlers can reference these names
        # even when the failure happens before they are assigned.
        ctx: AgentContext | None = None
        system_prompt: SystemPrompt | None = None
        provider: CompletionProvider = self._provider
        _project_budget: float = 0.0
        try:
            loop_mode = (
                "auto"
                if self._auto_loop_config is not None
                else self._loop.get_loop_type()
            )
            logger.info(
                EXECUTION_ENGINE_START,
                agent_id=agent_id,
                task_id=task_id,
                loop_type=loop_mode,
                max_turns=max_turns,
            )

            # Budget pre-flight: the enforcer may swap the provider and/or
            # identity, then resolves the model for the run.
            if self._budget_enforcer:
                preflight = await self._budget_enforcer.check_can_execute(
                    agent_id,
                    provider_name=identity.model.provider,
                )
                provider, identity = self._apply_degradation(
                    preflight,
                    identity,
                    provider,
                )
                identity = await self._budget_enforcer.resolve_model(
                    identity,
                )

            if self._project_repo is not None:
                _project_budget = await self._validate_project(
                    task=task,
                    agent_id=agent_id,
                    task_id=task_id,
                )
            elif task.project:
                # The task names a project but there is no repository to
                # check it against: warn and continue with a 0.0 budget.
                logger.warning(
                    EXECUTION_PROJECT_VALIDATION_FAILED,
                    agent_id=agent_id,
                    task_id=task_id,
                    project_id=task.project,
                    reason="project_repo_not_configured",
                )

            # Optional resume. NOTE(review): when resume_execution_id is set
            # but no event reader is configured, this is skipped without
            # any log entry.
            replay_ctx: AgentContext | None = None
            if resume_execution_id is not None and self._event_reader is not None:
                from synthorg.engine.session import Session  # noqa: PLC0415

                replay_result = await Session.replay(
                    execution_id=resume_execution_id,
                    event_reader=self._event_reader,
                    identity=identity,
                    task=task,
                    max_turns=max_turns,
                )
                # A low-completeness replay is only warned about; the
                # replayed context is still used.
                if (
                    replay_result.replay_completeness
                    < _REPLAY_LOW_COMPLETENESS_THRESHOLD
                ):
                    logger.warning(
                        SESSION_REPLAY_LOW_COMPLETENESS,
                        execution_id=resume_execution_id,
                        replay_completeness=replay_result.replay_completeness,
                    )
                replay_ctx = replay_result.context

            tool_invoker = self._make_tool_invoker(
                identity,
                task_id=task_id,
                effective_autonomy=effective_autonomy,
            )
            ctx, system_prompt = await self._prepare_context(
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                max_turns=max_turns,
                memory_messages=memory_messages,
                tool_invoker=tool_invoker,
                effective_autonomy=effective_autonomy,
            )
            # Overlay the replayed state onto the freshly prepared context:
            # the fresh conversation comes first, followed by the replayed
            # messages.
            if replay_ctx is not None:
                ctx = ctx.model_copy(
                    update={
                        "execution_id": replay_ctx.execution_id,
                        "started_at": replay_ctx.started_at,
                        "conversation": (
                            *ctx.conversation,
                            *replay_ctx.conversation,
                        ),
                        "accumulated_cost": replay_ctx.accumulated_cost,
                        "turn_count": replay_ctx.turn_count,
                        "task_execution": (
                            replay_ctx.task_execution or ctx.task_execution
                        ),
                    },
                )
            return await self._execute(
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                completion_config=completion_config,
                ctx=ctx,
                system_prompt=system_prompt,
                start=start,
                timeout_seconds=timeout_seconds,
                tool_invoker=tool_invoker,
                effective_autonomy=effective_autonomy,
                provider=provider,
                project_budget=_project_budget,
            )
        # Unparenthesised multi-exception clause (PEP 758, Python 3.14+):
        # matches either type. Logged, then propagated.
        except MemoryError, RecursionError:
            logger.exception(
                EXECUTION_ENGINE_ERROR,
                agent_id=agent_id,
                task_id=task_id,
                error="non-recoverable error in run()",
            )
            raise
        # Project errors propagate unchanged; listing them here keeps them
        # out of the generic ``except Exception`` handler below.
        except ProjectNotFoundError, ProjectAgentNotMemberError:
            raise
        # Budget exhaustion is turned into a returned result, not raised.
        except BudgetExhaustedError as exc:
            return self._handle_budget_error(
                exc=exc,
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                duration_seconds=self._clock.monotonic() - start,
                ctx=ctx,
                system_prompt=system_prompt,
            )
        # Any other exception goes to the fatal-error handler, whose result
        # is returned to the caller.
        except Exception as exc:
            return await self._handle_fatal_error(
                exc=exc,
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                duration_seconds=self._clock.monotonic() - start,
                ctx=ctx,
                system_prompt=system_prompt,
                completion_config=completion_config,
                effective_autonomy=effective_autonomy,
                provider=provider,
            )

Execution Loop Protocol

loop_protocol

Execution loop protocol and supporting models.

Defines the ExecutionLoop protocol that the agent engine calls to run a task, along with ExecutionResult, TurnRecord, TerminationReason, and the BudgetChecker and ShutdownChecker type aliases.

BudgetChecker module-attribute

BudgetChecker = Callable[[AgentContext], bool]

Callback that returns True when the budget is exhausted.

ShutdownChecker module-attribute

ShutdownChecker = Callable[[], bool]

Callback that returns True when a graceful shutdown has been requested.

NodeType

Bases: StrEnum

Type of computation node executed within a turn.

Used for structural credit assignment and post-hoc trace analysis. Each turn records which node types executed, enabling fine-grained attribution of costs and failures.

BehaviorTag

Bases: StrEnum

Behavior category for trace capture and eval routing.

Starting taxonomy derived from agent evaluation patterns. Extend as usage patterns reveal category fragmentation or generalization.

TerminationReason

Bases: StrEnum

Why the execution loop terminated.

TurnRecord pydantic-model

Bases: BaseModel

Per-turn metadata recorded during execution.

Attributes:

Name Type Description
turn_number int

1-indexed turn number.

input_tokens int

Input tokens consumed this turn.

output_tokens int

Output tokens generated this turn.

total_tokens int

Sum of input and output tokens (computed).

cost float

Cost in the configured currency for this turn.

tool_calls_made tuple[NotBlankStr, ...]

Names of tools invoked this turn.

tool_call_fingerprints tuple[NotBlankStr, ...]

Deterministic fingerprints of tool calls (name:args_hash) for stagnation detection.

finish_reason FinishReason

LLM finish reason for this turn.

call_category LLMCallCategory | None

Optional LLM call category for coordination metrics (productive, coordination, system).

latency_ms float | None

Round-trip latency in milliseconds (None if not measured).

cache_hit bool | None

Whether the provider served this turn from cache.

retry_count int | None

Number of retry attempts before success.

retry_reason NotBlankStr | None

Exception type name of the last retried error.

node_types tuple[NodeType, ...]

Node types that executed in this turn (e.g. LLM_CALL, TOOL_INVOCATION). Defaults to empty for deserialization of legacy data.

behavior_tags tuple[BehaviorTag, ...]

Behavior categories inferred by BehaviorTaggerMiddleware.

efficiency_delta EfficiencyRatios | None

Efficiency ratios against an ideal baseline.

prior_tool_call_count int

Cumulative tool calls before this turn (for PTE).

tool_response_tokens int

Tokens from tool responses this turn (for PTE).

semantic_drift_score float | None

Similarity score (0.0--1.0) from SemanticDriftDetector, or None if not measured.

success bool

Whether this turn completed without error or content filter (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_retry_consistency

turn_number pydantic-field

turn_number

1-indexed turn number

input_tokens pydantic-field

input_tokens

Input tokens this turn

output_tokens pydantic-field

output_tokens

Output tokens this turn

cost pydantic-field

cost

Cost in the configured currency this turn

tool_calls_made pydantic-field

tool_calls_made = ()

Tool names invoked this turn

tool_call_fingerprints pydantic-field

tool_call_fingerprints = ()

Deterministic fingerprints of tool calls (name:args_hash)

finish_reason pydantic-field

finish_reason

LLM finish reason this turn

call_category pydantic-field

call_category = None

LLM call category (productive, coordination, system)

latency_ms pydantic-field

latency_ms = None

Round-trip latency in milliseconds from provider base class

cache_hit pydantic-field

cache_hit = None

Whether the provider served this turn from cache

retry_count pydantic-field

retry_count = None

Number of retry attempts before success

retry_reason pydantic-field

retry_reason = None

Exception type name of the last retried error

node_types pydantic-field

node_types = ()

Node types that executed in this turn

behavior_tags pydantic-field

behavior_tags = ()

Behavior categories inferred by BehaviorTaggerMiddleware

efficiency_delta pydantic-field

efficiency_delta = None

Efficiency ratios against an ideal baseline

prior_tool_call_count pydantic-field

prior_tool_call_count = 0

Cumulative tool calls before this turn (for PTE)

tool_response_tokens pydantic-field

tool_response_tokens = 0

Tokens from tool responses this turn (for PTE)

semantic_drift_score pydantic-field

semantic_drift_score = None

Semantic drift similarity score (from SemanticDriftDetector)

total_tokens property

total_tokens

Sum of input and output tokens.

success property

success

True unless finish_reason is ERROR or CONTENT_FILTER.

ExecutionResult pydantic-model

ExecutionResult(**data)

Bases: BaseModel

Result returned by an execution loop.

Attributes:

Name Type Description
context AgentContext

Final agent context after execution.

termination_reason TerminationReason

Why the loop stopped.

turns tuple[TurnRecord, ...]

Per-turn metadata records.

total_tool_calls int

Total tool calls across all turns (computed).

error_message str | None

Error description when termination_reason is ERROR.

metadata dict[str, object]

Forward-compatible dict for future loop types. Note: frozen=True prevents field reassignment but not in-place mutation of the dict contents; deep-copy at system boundaries per project conventions.

Deep-copy metadata dict at construction boundary.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_error_message
Source code in src/synthorg/engine/loop_protocol.py
def __init__(self, **data: object) -> None:
    """Build the result, isolating ``metadata`` from the caller's dict.

    A ``metadata`` value that is a ``dict`` is deep-copied before the
    fields are handed to the base initializer, so later mutation of the
    caller's dict does not affect this instance. Any other value -- or a
    missing key -- is passed through untouched.
    """
    supplied_metadata = data.get("metadata")
    if isinstance(supplied_metadata, dict):
        data["metadata"] = copy.deepcopy(supplied_metadata)
    super().__init__(**data)

context pydantic-field

context

Final agent context

termination_reason pydantic-field

termination_reason

Why the loop stopped

turns pydantic-field

turns = ()

Per-turn metadata

error_message pydantic-field

error_message = None

Error description (when reason is ERROR)

metadata pydantic-field

metadata

Forward-compatible metadata for future loop types

total_tool_calls property

total_tool_calls

Sum of tool calls from all turn records.

ExecutionLoop

Bases: Protocol

Protocol for agent execution loops.

The agent engine calls execute to run a task through the loop. Implementations decide the control flow (ReAct, Plan-and-Execute, etc.) but all return an ExecutionResult with a TerminationReason.

execute async

execute(
    *,
    context,
    provider,
    tool_invoker=None,
    budget_checker=None,
    shutdown_checker=None,
    completion_config=None,
)

Run the execution loop.

Parameters:

Name Type Description Default
context AgentContext

Initial agent context with conversation and identity.

required
provider CompletionProvider

LLM completion provider.

required
tool_invoker ToolInvokerProtocol | None

Optional tool invoker for tool execution.

None
budget_checker BudgetChecker | None

Optional callback; returns True when budget is exhausted.

None
shutdown_checker ShutdownChecker | None

Optional callback; returns True when a graceful shutdown has been requested.

None
completion_config CompletionConfig | None

Optional per-execution override for temperature/max_tokens (defaults to identity's model config).

None

Returns:

Type Description
ExecutionResult

Execution result with final context and termination reason.

Source code in src/synthorg/engine/loop_protocol.py
async def execute(  # noqa: PLR0913
    self,
    *,
    context: AgentContext,
    provider: CompletionProvider,
    tool_invoker: ToolInvokerProtocol | None = None,
    budget_checker: BudgetChecker | None = None,
    shutdown_checker: ShutdownChecker | None = None,
    completion_config: CompletionConfig | None = None,
) -> ExecutionResult:
    """Run the execution loop.

    Protocol method: this declaration only fixes the signature; concrete
    loops provide the behaviour. All arguments are keyword-only.

    Args:
        context: Initial agent context with conversation and identity.
        provider: LLM completion provider.
        tool_invoker: Optional tool invoker for tool execution.
        budget_checker: Optional callback; returns ``True`` when
            budget is exhausted.
        shutdown_checker: Optional callback; returns ``True`` when
            a graceful shutdown has been requested.
        completion_config: Optional per-execution override for
            temperature/max_tokens (defaults to identity's model config).

    Returns:
        Execution result with final context and termination reason.
    """
    # Intentionally empty: implementations supply the body.
    ...

get_loop_type

get_loop_type()

Return the loop type identifier (e.g. "react").

Source code in src/synthorg/engine/loop_protocol.py
def get_loop_type(self) -> str:
    """Return the loop type identifier (e.g. ``"react"``).

    Protocol method: concrete loops return their own identifier string.
    """
    # Intentionally empty: implementations supply the body.
    ...

make_budget_checker

make_budget_checker(task)

Create a budget checker if the task has a positive budget limit.

The returned callable returns True when accumulated cost meets or exceeds the limit (budget exhausted), False otherwise. Returns None when there is no positive budget limit.

Source code in src/synthorg/engine/loop_protocol.py
def make_budget_checker(task: Task) -> BudgetChecker | None:
    """Build the budget-exhaustion predicate for ``task``, if it has a budget.

    Args:
        task: Task whose ``budget_limit`` decides whether a checker is made.

    Returns:
        ``None`` when ``task.budget_limit`` is zero or negative. Otherwise a
        callable that takes an agent context and returns ``True`` once
        ``ctx.accumulated_cost.cost`` has reached or passed the limit, and
        ``False`` while it is still below it.
    """
    # The limit is read once, at creation time, and captured by the closure.
    spend_cap = task.budget_limit
    if spend_cap <= 0:
        return None

    def _budget_exhausted(ctx: AgentContext) -> bool:
        return ctx.accumulated_cost.cost >= spend_cap

    return _budget_exhausted

ReAct Loop

react_loop

ReAct execution loop -- think, act, observe.

Implements the ExecutionLoop protocol using the ReAct pattern: check shutdown -> check budget -> call LLM -> record turn -> check for LLM errors -> update context -> handle completion or (check shutdown -> execute tools) -> repeat.

ReactLoop

ReactLoop(
    checkpoint_callback=None,
    *,
    approval_gate=None,
    stagnation_detector=None,
    compaction_callback=None,
)

ReAct execution loop: reason, act, observe.

The loop checks for shutdown, checks the budget, calls the LLM, checks for termination conditions, executes any requested tools, feeds results back, and repeats until the LLM signals completion, the turn limit is reached, the budget is exhausted, a shutdown is requested, or an error occurs.

Parameters:

Name Type Description Default
checkpoint_callback CheckpointCallback | None

Optional async callback invoked after each completed turn; the callback itself decides whether to persist.

None
approval_gate ApprovalGate | None

Optional gate that checks for pending escalations after tool execution and parks the agent when approval is required. None disables approval checks.

None
stagnation_detector StagnationDetector | None

Optional detector that checks for repetitive tool-call patterns and intervenes with corrective prompts or early termination. None disables stagnation detection.

None
compaction_callback CompactionCallback | None

Optional async callback invoked at turn boundaries to compress older conversation turns when the context fill level is high. None disables compaction.

None
Source code in src/synthorg/engine/react_loop.py
def __init__(
    self,
    checkpoint_callback: CheckpointCallback | None = None,
    *,
    approval_gate: ApprovalGate | None = None,
    stagnation_detector: StagnationDetector | None = None,
    compaction_callback: CompactionCallback | None = None,
) -> None:
    """Store the loop's optional collaborators.

    Each argument is kept as given on the same-named private attribute;
    all of them default to ``None``.

    Args:
        checkpoint_callback: Optional checkpoint callback (the only
            argument that may be passed positionally).
        approval_gate: Optional approval gate.
        stagnation_detector: Optional stagnation detector.
        compaction_callback: Optional compaction callback.
    """
    # Keyword-only collaborators.
    self._compaction_callback = compaction_callback
    self._stagnation_detector = stagnation_detector
    self._approval_gate = approval_gate
    # Positional-or-keyword collaborator.
    self._checkpoint_callback = checkpoint_callback

approval_gate property

approval_gate

Return the approval gate, or None.

stagnation_detector property

stagnation_detector

Return the stagnation detector, or None.

compaction_callback property

compaction_callback

Return the compaction callback, or None.

get_loop_type

get_loop_type()

Return the loop type identifier.

Source code in src/synthorg/engine/react_loop.py
def get_loop_type(self) -> str:
    """Return the loop type identifier.

    Returns:
        The constant string ``"react"``.
    """
    return "react"

execute async

execute(
    *,
    context,
    provider,
    tool_invoker=None,
    budget_checker=None,
    shutdown_checker=None,
    completion_config=None,
)

Run the ReAct loop until termination.

Parameters:

Name Type Description Default
context AgentContext

Initial agent context with conversation.

required
provider CompletionProvider

LLM completion provider.

required
tool_invoker ToolInvokerProtocol | None

Optional tool invoker for tool execution.

None
budget_checker BudgetChecker | None

Optional budget exhaustion callback.

None
shutdown_checker ShutdownChecker | None

Optional callback; returns True when a graceful shutdown has been requested.

None
completion_config CompletionConfig | None

Optional per-execution config override.

None

Returns:

Type Description
ExecutionResult

Execution result with final context and termination info.

Raises:

Type Description
MemoryError

Re-raised unconditionally (non-recoverable).

RecursionError

Re-raised unconditionally (non-recoverable).

Source code in src/synthorg/engine/react_loop.py
async def execute(  # noqa: PLR0913
    self,
    *,
    context: AgentContext,
    provider: CompletionProvider,
    tool_invoker: ToolInvokerProtocol | None = None,
    budget_checker: BudgetChecker | None = None,
    shutdown_checker: ShutdownChecker | None = None,
    completion_config: CompletionConfig | None = None,
) -> ExecutionResult:
    """Run the ReAct loop until termination.

    Each iteration checks for shutdown and budget exhaustion, calls the
    provider, records the turn, processes the response, then runs
    stagnation detection and context compaction.  Any of those stages
    may hand back an ``ExecutionResult``, which is returned immediately.
    If the loop exits because ``has_turns_remaining`` becomes false, a
    ``MAX_TURNS`` result is built and returned.

    Args:
        context: Initial agent context with conversation.
        provider: LLM completion provider.
        tool_invoker: Optional tool invoker for tool execution.
        budget_checker: Optional budget exhaustion callback.
        shutdown_checker: Optional callback; returns ``True`` when
            a graceful shutdown has been requested.
        completion_config: Optional per-execution config override.

    Returns:
        Execution result with final context and termination info.

    Raises:
        MemoryError: Re-raised unconditionally (non-recoverable).
        RecursionError: Re-raised unconditionally (non-recoverable).
    """
    model_id, config, tool_defs, turns = self._prepare_loop(
        context, completion_config, tool_invoker
    )
    # ``ctx`` is rebound to each new context produced during the loop;
    # the ``context`` argument itself is never reassigned.
    ctx = context
    # Running counter threaded through ``check_stagnation`` below.
    corrections_injected = 0

    while ctx.has_turns_remaining:
        # Early exits: a non-None result from either check ends the run.
        shutdown_result = check_shutdown(ctx, shutdown_checker, turns)
        if shutdown_result is not None:
            return shutdown_result

        budget_result = check_budget(ctx, budget_checker, turns)
        if budget_result is not None:
            return budget_result

        # Refresh tool defs each turn so newly loaded tools appear
        # (this supersedes the ``tool_defs`` returned by ``_prepare_loop``
        # before that value is ever used).
        tool_defs = get_tool_definitions(tool_invoker, ctx.loaded_tools)

        # Turn numbers are 1-based: completed turns so far plus one.
        turn_number = ctx.turn_count + 1
        response = await call_provider(
            ctx,
            provider,
            model_id,
            tool_defs,
            config,
            turn_number,
            turns,
        )
        # ``call_provider`` yields an ExecutionResult instead of a
        # response when the run must stop here.
        if isinstance(response, ExecutionResult):
            return response

        turns.append(
            make_turn_record(
                turn_number,
                response,
                call_category=classify_turn(turn_number, response, ctx),
                provider_metadata=response.provider_metadata,
            )
        )

        result = await self._process_turn_response(
            ctx,
            response,
            turn_number,
            turns,
            tool_invoker,
            shutdown_checker,
        )
        # Either a terminal result or the context to continue with.
        if isinstance(result, ExecutionResult):
            return result
        ctx = result

        # Stagnation detection after successful turn processing
        stag_outcome = await check_stagnation(
            ctx,
            self._stagnation_detector,
            turns,
            corrections_injected,
            execution_id=ctx.execution_id,
        )
        if isinstance(stag_outcome, ExecutionResult):
            return stag_outcome
        # A tuple outcome carries a replacement context and the updated
        # corrections counter; any other outcome leaves both untouched.
        if isinstance(stag_outcome, tuple):
            ctx, corrections_injected = stag_outcome

        # Context compaction at turn boundaries
        compacted = await invoke_compaction(
            ctx,
            self._compaction_callback,
            turn_number,
        )
        # ``None`` means no replacement context was produced.
        if compacted is not None:
            ctx = compacted

    # Reached only when the while-condition turns false.
    logger.info(
        EXECUTION_LOOP_TERMINATED,
        execution_id=ctx.execution_id,
        reason=TerminationReason.MAX_TURNS.value,
        turns=len(turns),
    )
    return build_result(ctx, TerminationReason.MAX_TURNS, turns)

Plan-and-Execute Loop

plan_execute_loop

Plan-and-Execute execution loop.

Implements the ExecutionLoop protocol using a two-phase approach: (1) Plan -- ask the LLM to decompose the task into ordered steps; planning calls pass tools=None (no tool access during planning). (2) Execute -- run each step via a mini-ReAct sub-loop with tools.

Re-planning is triggered when a step fails, up to a configurable limit. When re-planning is exhausted, the loop terminates with ERROR.

PlanExecuteLoop

PlanExecuteLoop(
    config=None,
    checkpoint_callback=None,
    *,
    approval_gate=None,
    stagnation_detector=None,
    compaction_callback=None,
)

Bases: PlanExecuteStepMixin

Plan-and-Execute execution loop.

Decomposes a task into steps via LLM planning, then executes each step with a mini-ReAct sub-loop. Supports re-planning on failure.

Parameters:

Name Type Description Default
config PlanExecuteConfig | None

Loop configuration. Defaults to PlanExecuteConfig().

None
checkpoint_callback CheckpointCallback | None

Optional per-turn checkpoint callback.

None
approval_gate ApprovalGate | None

Optional gate that checks for pending escalations after tool execution and parks the agent when approval is required. None disables approval checks.

None
stagnation_detector StagnationDetector | None

Optional detector that checks for repetitive tool-call patterns within each step and intervenes with corrective prompts or early termination. None disables stagnation detection.

None
compaction_callback CompactionCallback | None

Optional async callback invoked at turn boundaries to compress older conversation turns when the context fill level is high. None disables compaction.

None
Source code in src/synthorg/engine/plan_execute_loop.py
def __init__(
    self,
    config: PlanExecuteConfig | None = None,
    checkpoint_callback: CheckpointCallback | None = None,
    *,
    approval_gate: ApprovalGate | None = None,
    stagnation_detector: StagnationDetector | None = None,
    compaction_callback: CompactionCallback | None = None,
) -> None:
    """Store the loop configuration and optional collaborators.

    Args:
        config: Loop configuration; a default ``PlanExecuteConfig()``
            is created when a falsy value (such as ``None``) is given.
        checkpoint_callback: Optional checkpoint callback.
        approval_gate: Optional approval gate.
        stagnation_detector: Optional stagnation detector.
        compaction_callback: Optional compaction callback.
    """
    # Collaborators are stored verbatim; the assignments are independent.
    self._compaction_callback = compaction_callback
    self._stagnation_detector = stagnation_detector
    self._approval_gate = approval_gate
    self._checkpoint_callback = checkpoint_callback
    effective_config = config or PlanExecuteConfig()
    self._config = effective_config

config property

config

Return the loop configuration.

approval_gate property

approval_gate

Return the approval gate, or None.

stagnation_detector property

stagnation_detector

Return the stagnation detector, or None.

compaction_callback property

compaction_callback

Return the compaction callback, or None.

get_loop_type

get_loop_type()

Return the loop type identifier.

Source code in src/synthorg/engine/plan_execute_loop.py
def get_loop_type(self) -> str:
    """Identify this loop implementation.

    Returns:
        The literal string ``"plan_execute"``.
    """
    loop_type = "plan_execute"
    return loop_type

execute async

execute(
    *,
    context,
    provider,
    tool_invoker=None,
    budget_checker=None,
    shutdown_checker=None,
    completion_config=None,
)

Run the Plan-and-Execute loop until termination.

Parameters:

Name Type Description Default
context AgentContext

Initial agent context with conversation.

required
provider CompletionProvider

LLM completion provider.

required
tool_invoker ToolInvokerProtocol | None

Optional tool invoker for tool execution.

None
budget_checker BudgetChecker | None

Optional budget exhaustion callback.

None
shutdown_checker ShutdownChecker | None

Optional callback; returns True when a graceful shutdown has been requested.

None
completion_config CompletionConfig | None

Optional per-execution config override.

None

Returns:

Type Description
ExecutionResult

Execution result with final context and termination info.

Raises:

Type Description
MemoryError

Re-raised unconditionally (non-recoverable).

RecursionError

Re-raised unconditionally (non-recoverable).

Source code in src/synthorg/engine/plan_execute_loop.py
async def execute(  # noqa: PLR0913
    self,
    *,
    context: AgentContext,
    provider: CompletionProvider,
    tool_invoker: ToolInvokerProtocol | None = None,
    budget_checker: BudgetChecker | None = None,
    shutdown_checker: ShutdownChecker | None = None,
    completion_config: CompletionConfig | None = None,
) -> ExecutionResult:
    """Run the Plan-and-Execute loop until termination.

    Resolves the planner and executor models, runs the planning phase,
    and then delegates step execution to ``_run_steps``.  If the
    planning phase itself yields an ``ExecutionResult``, that result is
    passed through ``_finalize`` and returned without executing steps.

    Args:
        context: Initial agent context with conversation.
        provider: LLM completion provider.
        tool_invoker: Optional tool invoker for tool execution.
        budget_checker: Optional budget exhaustion callback.
        shutdown_checker: Optional callback; returns ``True`` when
            a graceful shutdown has been requested.
        completion_config: Optional per-execution config override.

    Returns:
        Execution result with final context and termination info.

    Raises:
        MemoryError: Re-raised unconditionally (non-recoverable).
        RecursionError: Re-raised unconditionally (non-recoverable).
    """
    logger.info(
        EXECUTION_LOOP_START,
        execution_id=context.execution_id,
        loop_type=self.get_loop_type(),
        max_turns=context.max_turns,
    )

    ctx = context
    # Per-phase model overrides fall back to the agent's own model.
    default_model = ctx.identity.model.model_id
    planner_model = self._config.planner_model or default_model
    executor_model = self._config.executor_model or default_model
    # A caller-supplied config wins; otherwise one is derived from the
    # identity's temperature and max_tokens.
    default_config = completion_config or CompletionConfig(
        temperature=ctx.identity.model.temperature,
        max_tokens=ctx.identity.model.max_tokens,
    )
    # Computed once here, before planning, and handed to ``_run_steps``.
    tool_defs = get_tool_definitions(tool_invoker, ctx.loaded_tools)
    # Accumulators passed into the planning and step phases.
    turns: list[TurnRecord] = []
    all_plans: list[ExecutionPlan] = []
    replans_used = 0

    # Planning.
    plan_result = await self._run_planning_phase(
        ctx,
        provider,
        planner_model,
        default_config,
        turns,
        shutdown_checker,
        budget_checker,
    )
    # An ExecutionResult from planning ends the run; at this point
    # ``all_plans`` is still empty and ``replans_used`` is still 0.
    if isinstance(plan_result, ExecutionResult):
        return self._finalize(plan_result, all_plans, replans_used)
    # Otherwise planning produced a (context, plan) pair.
    ctx, plan = plan_result
    all_plans.append(plan)

    # Execute steps.
    return await self._run_steps(
        ctx,
        provider,
        executor_model,
        default_config,
        tool_defs,
        tool_invoker,
        plan,
        turns,
        all_plans,
        replans_used,
        planner_model,
        budget_checker,
        shutdown_checker,
    )

Plan Models

plan_models

Data models for the Plan-and-Execute execution loop.

Defines the plan structure (steps, status, revisions) and the configuration model for the plan-execute loop.

StepStatus

Bases: StrEnum

Execution status of a plan step.

PlanStep pydantic-model

Bases: BaseModel

A single step within an execution plan.

Attributes:

Name Type Description
step_number int

1-indexed position in the plan.

description NotBlankStr

What this step should accomplish.

expected_outcome NotBlankStr

The anticipated result of this step.

status StepStatus

Current execution status of this step.

actual_outcome NotBlankStr | None

Observed result after execution (if any).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

step_number pydantic-field

step_number

1-indexed step number

description pydantic-field

description

Step description

expected_outcome pydantic-field

expected_outcome

Anticipated result of this step

status pydantic-field

status = PENDING

Current execution status

actual_outcome pydantic-field

actual_outcome = None

Observed result after execution

ExecutionPlan pydantic-model

Bases: BaseModel

An ordered sequence of plan steps for task execution.

Attributes:

Name Type Description
steps tuple[PlanStep, ...]

Ordered tuple of plan steps.

revision_number int

Plan revision counter (0 = original).

original_task_summary NotBlankStr

Brief summary of the task being planned.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_sequential_step_numbers

steps pydantic-field

steps

Ordered plan steps

revision_number pydantic-field

revision_number = 0

Plan revision counter (0 = original)

original_task_summary pydantic-field

original_task_summary

Brief summary of the task being planned

PlanExecuteConfig pydantic-model

Bases: BaseModel

Configuration for the Plan-and-Execute loop.

Attributes:

Name Type Description
planner_model NotBlankStr | None

Model override for plan generation. None uses the agent's default model.

executor_model NotBlankStr | None

Model override for step execution. None uses the agent's default model.

max_replans int

Maximum number of re-planning attempts on step failure.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

planner_model pydantic-field

planner_model = None

Model override for plan generation (None = agent default)

executor_model pydantic-field

executor_model = None

Model override for step execution (None = agent default)

max_replans pydantic-field

max_replans = 3

Maximum number of re-planning attempts

Execution Context

context

Agent execution context.

Wraps an AgentIdentity (frozen config) with evolving runtime state (conversation, cost, turn count, task execution) using model_copy(update=...) for cheap, immutable state transitions.

DEFAULT_MAX_TURNS module-attribute

DEFAULT_MAX_TURNS = 20

Default hard limit on LLM turns per agent execution.

AgentContextSnapshot pydantic-model

Bases: BaseModel

Compact frozen snapshot of an AgentContext for reporting.

Attributes:

Name Type Description
execution_id NotBlankStr

Unique execution run identifier.

agent_id NotBlankStr

Agent identifier (string form of UUID).

task_id NotBlankStr | None

Task identifier, if a task is active.

turn_count int

Number of turns completed.

accumulated_cost TokenUsage

Running cost totals.

task_status TaskStatus | None

Current task status, if a task is active.

started_at AwareDatetime

When the execution began.

snapshot_at AwareDatetime

When this snapshot was taken.

message_count int

Number of messages in the conversation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_task_pair

execution_id pydantic-field

execution_id

Unique execution identifier

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id = None

Task identifier

turn_count pydantic-field

turn_count

Turns completed

accumulated_cost pydantic-field

accumulated_cost

Running cost totals

task_status pydantic-field

task_status = None

Current task status

started_at pydantic-field

started_at

Execution start time

snapshot_at pydantic-field

snapshot_at

When snapshot was taken

message_count pydantic-field

message_count

Messages in conversation

context_fill_tokens pydantic-field

context_fill_tokens = 0

Estimated context fill tokens

context_fill_percent pydantic-field

context_fill_percent = None

Context fill percentage

AgentContext pydantic-model

Bases: BaseModel

Frozen runtime context for agent execution.

All state evolution happens via model_copy(update=...). The context tracks the conversation, accumulated cost, and optionally a TaskExecution for task-bound agent runs.

Attributes:

Name Type Description
execution_id NotBlankStr

Unique identifier for this execution run.

identity AgentIdentity

Frozen agent identity configuration.

task_execution TaskExecution | None

Current task execution state (if any).

conversation tuple[ChatMessage, ...]

Accumulated chat messages.

accumulated_cost TokenUsage

Running token usage and cost totals.

turn_count int

Number of LLM turns completed.

max_turns int

Hard limit on turns before the engine stops.

started_at AwareDatetime

When this execution began.

context_fill_tokens int

Estimated tokens currently in the full context (system prompt + conversation + tool defs).

context_capacity_tokens int | None

Model's max context window tokens, or None when unknown.

compression_metadata CompressionMetadata | None

Metadata about conversation compression, set when compaction has occurred.

async_task_state AsyncTaskStateChannel

Dedicated state channel for tracked async tasks. Separate from conversation -- not touched by compaction or context reset.

loaded_tools frozenset[str]

Tool names with L2 bodies active in context.

loaded_resources frozenset[tuple[str, str]]

(tool_name, resource_id) pairs with L3 resources fetched.

tool_load_order tuple[str, ...]

Insertion-ordered tool names for FIFO auto-unload under budget pressure.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_disclosure_consistency

execution_id pydantic-field

execution_id

Unique execution run identifier

identity pydantic-field

identity

Frozen agent identity config

task_execution pydantic-field

task_execution = None

Current task execution state

conversation pydantic-field

conversation = ()

Accumulated conversation messages

accumulated_cost pydantic-field

accumulated_cost = ZERO_TOKEN_USAGE

Running cost totals across all turns

turn_count pydantic-field

turn_count = 0

Turns completed

max_turns pydantic-field

max_turns = DEFAULT_MAX_TURNS

Hard turn limit

started_at pydantic-field

started_at

When execution began

context_fill_tokens pydantic-field

context_fill_tokens = 0

Estimated tokens in the full context

context_capacity_tokens pydantic-field

context_capacity_tokens = None

Model's max context window tokens

compression_metadata pydantic-field

compression_metadata = None

Compression metadata when compacted

async_task_state pydantic-field

async_task_state

Async task tracking state (survives compaction and context reset)

loaded_tools pydantic-field

loaded_tools = frozenset()

Tool names with L2 body active in context

loaded_resources pydantic-field

loaded_resources = frozenset()

(tool_name, resource_id) pairs with L3 active

tool_load_order pydantic-field

tool_load_order = ()

Insertion-ordered tool names for FIFO unload

context_fill_percent property

context_fill_percent

Percentage of context window currently filled.

Returns None when context capacity is unknown.

has_turns_remaining property

has_turns_remaining

Whether the agent has turns remaining before hitting max_turns.

from_identity classmethod

from_identity(
    identity, *, task=None, max_turns=DEFAULT_MAX_TURNS, context_capacity_tokens=None
)

Create a fresh execution context from an agent identity.

Parameters:

Name Type Description Default
identity AgentIdentity

The frozen agent identity card.

required
task Task | None

Optional task to bind to this execution.

None
max_turns int

Maximum number of LLM turns allowed.

DEFAULT_MAX_TURNS
context_capacity_tokens int | None

Model's max context window tokens, or None when unknown.

None

Returns:

Type Description
AgentContext

New AgentContext ready for execution.

Source code in src/synthorg/engine/context.py
@classmethod
def from_identity(
    cls,
    identity: AgentIdentity,
    *,
    task: Task | None = None,
    max_turns: int = DEFAULT_MAX_TURNS,
    context_capacity_tokens: int | None = None,
) -> AgentContext:
    """Build a brand-new execution context for an agent identity.

    A fresh UUID4 string is generated for ``execution_id`` and
    ``started_at`` is set to the current UTC time.

    Args:
        identity: The frozen agent identity card.
        task: Optional task to bind to this execution.
        max_turns: Maximum number of LLM turns allowed.
        context_capacity_tokens: Model's max context window
            tokens, or ``None`` when unknown.

    Returns:
        New ``AgentContext`` ready for execution.
    """
    # Only wrap the task in a TaskExecution when one was supplied.
    bound_execution = None
    if task is not None:
        bound_execution = TaskExecution.from_task(task)

    fresh = cls(
        execution_id=str(uuid4()),
        identity=identity,
        task_execution=bound_execution,
        max_turns=max_turns,
        started_at=datetime.now(UTC),
        context_capacity_tokens=context_capacity_tokens,
    )
    logger.debug(
        EXECUTION_CONTEXT_CREATED,
        execution_id=fresh.execution_id,
        agent_id=str(identity.id),
        has_task=task is not None,
    )
    return fresh

with_message

with_message(msg)

Append a single message to the conversation.

Parameters:

Name Type Description Default
msg ChatMessage

The chat message to append.

required

Returns:

Type Description
AgentContext

New AgentContext with the message appended.

Source code in src/synthorg/engine/context.py
def with_message(self, msg: ChatMessage) -> AgentContext:
    """Return a copy of this context with one more message.

    The receiver is left untouched; the new conversation is the old
    one followed by ``msg``.

    Args:
        msg: The chat message to append.

    Returns:
        New ``AgentContext`` with the message appended.
    """
    extended = (*self.conversation, msg)
    changes = {"conversation": extended}
    return self.model_copy(update=changes)

with_turn_completed

with_turn_completed(usage, response_msg)

Record a completed turn.

Increments turn count, appends the response message, and accumulates cost on both the context and the task execution (if present).

Parameters:

Name Type Description Default
usage TokenUsage

Token usage from this turn's LLM call.

required
response_msg ChatMessage

The assistant's response message.

required

Returns:

Type Description
AgentContext

New AgentContext with updated state.

Raises:

Type Description
MaxTurnsExceededError

If max_turns has been reached.

Source code in src/synthorg/engine/context.py
def with_turn_completed(
    self,
    usage: TokenUsage,
    response_msg: ChatMessage,
) -> AgentContext:
    """Produce the context that follows one finished LLM turn.

    The returned copy has ``turn_count`` incremented by one, the
    response appended to the conversation, and ``usage`` folded into
    ``accumulated_cost``.  When a task execution is attached, its cost
    is updated via ``with_cost`` as well.

    Args:
        usage: Token usage from this turn's LLM call.
        response_msg: The assistant's response message.

    Returns:
        New ``AgentContext`` with updated state.

    Raises:
        MaxTurnsExceededError: If ``max_turns`` has been reached.
    """
    # Guard: refuse to record a turn once the turn budget is spent.
    if not self.has_turns_remaining:
        msg = (
            f"Agent {self.identity.id} exceeded max_turns "
            f"({self.max_turns}) for execution {self.execution_id}"
        )
        logger.error(
            EXECUTION_MAX_TURNS_EXCEEDED,
            execution_id=self.execution_id,
            agent_id=str(self.identity.id),
            max_turns=self.max_turns,
            turn_count=self.turn_count,
        )
        raise MaxTurnsExceededError(msg)

    next_turn = self.turn_count + 1
    next_conversation = (*self.conversation, response_msg)
    next_cost = add_token_usage(self.accumulated_cost, usage)
    changes: dict[str, object] = {
        "turn_count": next_turn,
        "conversation": next_conversation,
        "accumulated_cost": next_cost,
    }
    # Mirror the cost onto the bound task execution, when there is one.
    task_state = self.task_execution
    if task_state is not None:
        changes["task_execution"] = task_state.with_cost(usage)

    advanced = self.model_copy(update=changes)
    logger.info(
        EXECUTION_CONTEXT_TURN,
        execution_id=self.execution_id,
        turn=advanced.turn_count,
        cost=usage.cost,
    )
    return advanced

with_context_fill

with_context_fill(fill_tokens)

Update the estimated context fill level.

Parameters:

Name Type Description Default
fill_tokens int

New estimated fill in tokens.

required

Returns:

Type Description
AgentContext

New AgentContext with updated fill level.

Raises:

Type Description
ValueError

If fill_tokens is negative.

Source code in src/synthorg/engine/context.py
def with_context_fill(self, fill_tokens: int) -> AgentContext:
    """Return a copy carrying a new context-fill estimate.

    Args:
        fill_tokens: New estimated fill in tokens.

    Returns:
        New ``AgentContext`` with updated fill level.

    Raises:
        ValueError: If ``fill_tokens`` is negative.
    """
    # Reject negative estimates before creating the copy.
    if fill_tokens < 0:
        msg = f"fill_tokens must be >= 0, got {fill_tokens}"
        raise ValueError(msg)
    changes = {"context_fill_tokens": fill_tokens}
    return self.model_copy(update=changes)

with_async_task_state

with_async_task_state(state)

Replace the async task state channel.

Parameters:

Name Type Description Default
state AsyncTaskStateChannel

New state channel.

required

Returns:

Type Description
AgentContext

New AgentContext with updated state channel.

Source code in src/synthorg/engine/context.py
def with_async_task_state(
    self,
    state: AsyncTaskStateChannel,
) -> AgentContext:
    """Return a copy whose async task state channel is ``state``.

    Args:
        state: New state channel.

    Returns:
        New ``AgentContext`` with updated state channel.
    """
    changes = {"async_task_state": state}
    return self.model_copy(update=changes)

with_compression

with_compression(metadata, compressed_conversation, fill_tokens)

Replace conversation with a compressed version.

Parameters:

Name Type Description Default
metadata CompressionMetadata

Compression metadata to attach.

required
compressed_conversation tuple[ChatMessage, ...]

The compressed message tuple.

required
fill_tokens int

Updated fill estimate after compression.

required

Returns:

Type Description
AgentContext

New AgentContext with compressed conversation.

Raises:

Type Description
ValueError

If fill_tokens is negative.

Source code in src/synthorg/engine/context.py
def with_compression(
    self,
    metadata: CompressionMetadata,
    compressed_conversation: tuple[ChatMessage, ...],
    fill_tokens: int,
) -> AgentContext:
    """Return a copy whose conversation has been swapped for a compressed one.

    The copy also records the compression metadata and the new
    context-fill estimate.

    Args:
        metadata: Compression metadata to attach.
        compressed_conversation: The compressed message tuple.
        fill_tokens: Updated fill estimate after compression.

    Returns:
        New ``AgentContext`` with compressed conversation.

    Raises:
        ValueError: If ``fill_tokens`` is negative.
    """
    # Reject negative estimates before creating the copy.
    if fill_tokens < 0:
        msg = f"fill_tokens must be >= 0, got {fill_tokens}"
        raise ValueError(msg)
    changes = {
        "conversation": compressed_conversation,
        "compression_metadata": metadata,
        "context_fill_tokens": fill_tokens,
    }
    return self.model_copy(update=changes)

with_task_transition

with_task_transition(target, *, reason='')

Transition the task execution status.

Delegates to synthorg.engine.task_execution.TaskExecution.with_transition.

Parameters:

Name Type Description Default
target TaskStatus

The desired target status.

required
reason str

Optional reason for the transition.

''

Returns:

Type Description
AgentContext

New AgentContext with updated task execution.

Raises:

Type Description
ExecutionStateError

If no task execution is set.

ValueError

If the transition is invalid (from validate_transition).

Source code in src/synthorg/engine/context.py
def with_task_transition(
    self,
    target: TaskStatus,
    *,
    reason: str = "",
) -> AgentContext:
    """Move the bound task execution to a new status.

    The actual transition is performed by
    :meth:`~synthorg.engine.task_execution.TaskExecution.with_transition`;
    this method logs failures and wraps the result in a new context.

    Args:
        target: The desired target status.
        reason: Optional reason for the transition.

    Returns:
        New ``AgentContext`` with updated task execution.

    Raises:
        ExecutionStateError: If no task execution is set.
        ValueError: If the transition is invalid (from
            ``validate_transition``).
    """
    current = self.task_execution
    if current is None:
        msg = "Cannot transition task status: no task execution is set"
        logger.error(
            EXECUTION_CONTEXT_NO_TASK,
            execution_id=self.execution_id,
            agent_id=str(self.identity.id),
            target_status=target.value,
        )
        raise ExecutionStateError(msg)

    try:
        transitioned = current.with_transition(target, reason=reason)
    except ValueError:
        # Log the rejected transition, then let the error propagate.
        logger.warning(
            EXECUTION_CONTEXT_TRANSITION_FAILED,
            execution_id=self.execution_id,
            agent_id=str(self.identity.id),
            target_status=target.value,
            current_status=current.status.value,
        )
        raise
    else:
        return self.model_copy(update={"task_execution": transitioned})

to_snapshot

to_snapshot()

Create a compact snapshot for reporting and logging.

Returns:

Type Description
AgentContextSnapshot

Frozen AgentContextSnapshot with current state.

Source code in src/synthorg/engine/context.py
def to_snapshot(self) -> AgentContextSnapshot:
    """Capture the current state as an ``AgentContextSnapshot``.

    ``task_id`` and ``task_status`` are ``None`` when no task execution
    is attached; ``snapshot_at`` is the current UTC time.

    Returns:
        Frozen ``AgentContextSnapshot`` with current state.
    """
    execution = self.task_execution
    if execution is None:
        task_id = None
        task_status = None
    else:
        task_id = execution.task.id
        task_status = execution.status

    captured = AgentContextSnapshot(
        execution_id=self.execution_id,
        agent_id=str(self.identity.id),
        task_id=task_id,
        turn_count=self.turn_count,
        accumulated_cost=self.accumulated_cost,
        task_status=task_status,
        started_at=self.started_at,
        snapshot_at=datetime.now(UTC),
        message_count=len(self.conversation),
        context_fill_tokens=self.context_fill_tokens,
        context_fill_percent=self.context_fill_percent,
    )
    logger.debug(EXECUTION_CONTEXT_SNAPSHOT, execution_id=self.execution_id)
    return captured

with_tool_loaded

with_tool_loaded(tool_name)

Mark a tool's L2 body as loaded.

Idempotent: loading an already-loaded tool is a no-op.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to load.

required

Returns:

Type Description
AgentContext

New AgentContext with the tool marked as loaded.

Source code in src/synthorg/engine/context.py
def with_tool_loaded(self, tool_name: str) -> AgentContext:
    """Return a context in which ``tool_name`` counts as loaded.

    When the tool is already in ``loaded_tools`` the receiver itself is
    returned.  Otherwise the name is added to ``loaded_tools`` and
    appended to the end of ``tool_load_order``.

    Args:
        tool_name: Name of the tool to load.

    Returns:
        New ``AgentContext`` with the tool marked as loaded.
    """
    already_loaded = tool_name in self.loaded_tools
    if already_loaded:
        return self
    changes = {
        "loaded_tools": self.loaded_tools.union((tool_name,)),
        "tool_load_order": (*self.tool_load_order, tool_name),
    }
    return self.model_copy(update=changes)

with_tool_unloaded

with_tool_unloaded(tool_name)

Mark a tool's L2 body as unloaded.

Also removes any L3 resources for the unloaded tool. Idempotent: unloading an already-unloaded tool is a no-op.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to unload.

required

Returns:

Type Description
AgentContext

New AgentContext with the tool removed.

Source code in src/synthorg/engine/context.py
def with_tool_unloaded(self, tool_name: str) -> AgentContext:
    """Return a context in which ``tool_name`` is no longer loaded.

    When the tool is not in ``loaded_tools`` the receiver itself is
    returned.  Otherwise the name is dropped from ``loaded_tools`` and
    ``tool_load_order``, and every ``loaded_resources`` pair owned by
    that tool is dropped too.

    Args:
        tool_name: Name of the tool to unload.

    Returns:
        New ``AgentContext`` with the tool removed.
    """
    if tool_name not in self.loaded_tools:
        return self
    remaining_tools = self.loaded_tools.difference((tool_name,))
    remaining_order = tuple(
        name for name in self.tool_load_order if name != tool_name
    )
    remaining_resources = frozenset(
        (owner, resource_id)
        for owner, resource_id in self.loaded_resources
        if owner != tool_name
    )
    changes = {
        "loaded_tools": remaining_tools,
        "tool_load_order": remaining_order,
        "loaded_resources": remaining_resources,
    }
    return self.model_copy(update=changes)

with_resource_loaded

with_resource_loaded(tool_name, resource_id)

Mark an L3 resource as fetched.

Idempotent: loading an already-loaded resource is a no-op.

Parameters:

Name Type Description Default
tool_name str

Name of the tool owning the resource.

required
resource_id str

Identifier of the resource.

required

Returns:

Type Description
AgentContext

New AgentContext with the resource marked as loaded.

Source code in src/synthorg/engine/context.py
def with_resource_loaded(
    self,
    tool_name: str,
    resource_id: str,
) -> AgentContext:
    """Return a context in which the given resource counts as loaded.

    When the ``(tool_name, resource_id)`` pair is already in
    ``loaded_resources`` the receiver itself is returned.

    Args:
        tool_name: Name of the tool owning the resource.
        resource_id: Identifier of the resource.

    Returns:
        New ``AgentContext`` with the resource marked as loaded.
    """
    key = (tool_name, resource_id)
    if key in self.loaded_resources:
        return self
    changes = {"loaded_resources": self.loaded_resources.union((key,))}
    return self.model_copy(update=changes)

Prompt Builder

prompt

System prompt construction from agent identity and context.

Translates agent configuration (personality, skills, authority, role) into contextually rich system prompts that shape agent behavior during LLM calls.

Non-inferable principle: System prompts should contain only information that agents cannot discover by reading the codebase or environment. Full tool definitions are delivered via the LLM provider's API tools parameter. However, lightweight L1 metadata (name, category, cost tier, one-line description) IS injected into the system prompt so agents can discover what tools exist and decide which to load via load_tool().

Example::

from synthorg.engine.prompt import build_system_prompt

prompt = build_system_prompt(agent=agent_identity, task=task)
prompt.content  # rendered system prompt string

SystemPrompt pydantic-model

Bases: BaseModel

Immutable result of system prompt construction.

Attributes:

Name Type Description
content str

Full rendered prompt text.

template_version str

Version of the template that produced this prompt.

estimated_tokens int

Token estimate of the prompt content.

sections tuple[str, ...]

Names of sections included in the prompt.

metadata dict[str, str]

Agent identity metadata (agent_id, name, role, department, level, and optionally profile_tier).

personality_trim_info PersonalityTrimInfo | None

Populated when personality section was trimmed to fit the profile's token budget.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

content pydantic-field

content

Full rendered prompt text

template_version pydantic-field

template_version

Template version that produced this prompt

estimated_tokens pydantic-field

estimated_tokens

Estimated token count of prompt content

sections pydantic-field

sections

Names of sections included in the prompt

metadata pydantic-field

metadata

Agent identity metadata (string-only values; shallow-frozen)

personality_trim_info pydantic-field

personality_trim_info = None

Populated when personality section was trimmed

build_system_prompt

build_system_prompt(
    *,
    agent,
    role=None,
    task=None,
    available_tools=(),
    l1_summaries=(),
    company=None,
    org_policies=(),
    max_tokens=None,
    custom_template=None,
    token_estimator=None,
    effective_autonomy=None,
    context_budget_indicator=None,
    currency=DEFAULT_CURRENCY,
    model_tier=None,
    personality_trimming_enabled=True,
    max_personality_tokens_override=None,
    strategy_config=None,
    async_task_state=None,
)

Build a system prompt from agent identity and optional context.

When max_tokens is provided and the prompt exceeds it, optional sections are progressively trimmed (strategy, company, task, org_policies).

Parameters:

Name Type Description Default
agent AgentIdentity

Agent identity containing personality, skills, authority.

required
role Role | None

Optional role with description and responsibilities.

None
task Task | None

Optional task context injected into the prompt.

None
available_tools tuple[ToolDefinition, ...]

Tool definitions populated into template context for custom templates only; the default template omits tools per D22 (non-inferable principle).

()
l1_summaries tuple[ToolL1Metadata, ...]

L1 metadata for system prompt injection. Lightweight tool summaries rendered in the Available Tools section of the default template.

()
company Company | None

Opt-in. Non-inferable principle recommends omitting unless agents need org-level context they cannot discover.

None
org_policies tuple[str, ...]

Company-wide policy texts to inject into prompt.

()
max_tokens int | None

Token budget; sections are trimmed if exceeded.

None
custom_template str | None

Optional Jinja2 template string override.

None
token_estimator PromptTokenEstimator | None

Custom token estimator (defaults to char/4).

None
effective_autonomy EffectiveAutonomy | None

Resolved autonomy for the current run.

None
context_budget_indicator str | None

Formatted context budget indicator string to inject into the prompt.

None
currency CurrencyCode

ISO 4217 currency code for budget displays. Validated against the allowlist in synthorg.budget.currency.

DEFAULT_CURRENCY
model_tier ModelTier | None

Model capability tier for prompt profile selection. None defaults to the full (large) profile.

None
personality_trimming_enabled bool

When True (default), the personality section is progressively trimmed if it exceeds the profile's max_personality_tokens.

True
max_personality_tokens_override int | None

When set to a positive value, overrides the profile's max_personality_tokens limit. Values <= 0 are ignored (profile default is used).

None
strategy_config StrategyConfig | None

Strategy and trendslop mitigation config. When provided and the agent qualifies (C-suite/VP/Director or has explicit strategic_output_mode), strategic analysis sections are injected into the prompt.

None
async_task_state AsyncTaskStateChannel | None

Optional async task state channel. When non-empty, appends an Active Async Tasks section to the prompt (survives trimming).

None

Returns:

Type Description
SystemPrompt

Immutable SystemPrompt with rendered content and metadata.

Raises:

Type Description
PromptBuildError

If prompt construction fails.

Source code in src/synthorg/engine/prompt.py
def build_system_prompt(  # noqa: PLR0913, C901, PLR0912
    *,
    agent: AgentIdentity,
    role: Role | None = None,
    task: Task | None = None,
    available_tools: tuple[ToolDefinition, ...] = (),
    l1_summaries: tuple[ToolL1Metadata, ...] = (),
    company: Company | None = None,
    org_policies: tuple[str, ...] = (),
    max_tokens: int | None = None,
    custom_template: str | None = None,
    token_estimator: PromptTokenEstimator | None = None,
    effective_autonomy: EffectiveAutonomy | None = None,
    context_budget_indicator: str | None = None,
    currency: CurrencyCode = DEFAULT_CURRENCY,
    model_tier: ModelTier | None = None,
    personality_trimming_enabled: bool = True,
    max_personality_tokens_override: int | None = None,
    strategy_config: StrategyConfig | None = None,
    async_task_state: AsyncTaskStateChannel | None = None,
) -> SystemPrompt:
    """Build a system prompt from agent identity and optional context.

    When ``max_tokens`` is provided and the prompt exceeds it, optional
    sections are progressively trimmed (strategy, company, task,
    org_policies).

    Args:
        agent: Agent identity containing personality, skills, authority.
        role: Optional role with description and responsibilities.
        task: Optional task context injected into the prompt.
        available_tools: Tool definitions populated into template context
            for custom templates only; the default template omits tools
            per D22 (non-inferable principle).
        l1_summaries: L1 metadata for system prompt injection.
            Lightweight tool summaries rendered in the Available
            Tools section of the default template.
        company: Opt-in. Non-inferable principle recommends omitting
            unless agents need org-level context they cannot discover.
        org_policies: Company-wide policy texts to inject into prompt.
        max_tokens: Token budget; sections are trimmed if exceeded.
        custom_template: Optional Jinja2 template string override.
        token_estimator: Custom token estimator (defaults to char/4).
        effective_autonomy: Resolved autonomy for the current run.
        context_budget_indicator: Formatted context budget indicator
            string to inject into the prompt.
        currency: ISO 4217 currency code for budget displays.  Validated
            against the allowlist in ``synthorg.budget.currency``.
        model_tier: Model capability tier for prompt profile selection.
            ``None`` defaults to the full (large) profile.
        personality_trimming_enabled: When ``True`` (default), the
            personality section is progressively trimmed if it exceeds
            the profile's ``max_personality_tokens``.
        max_personality_tokens_override: When set to a positive value,
            overrides the profile's ``max_personality_tokens`` limit.
            Values ``<= 0`` are ignored (profile default is used).
        strategy_config: Strategy and trendslop mitigation config.
            When provided and the agent qualifies (C-suite/VP/Director
            or has explicit ``strategic_output_mode``), strategic
            analysis sections are injected into the prompt.
        async_task_state: Optional async task state channel.
            When non-empty, appends an ``Active Async Tasks``
            section to the prompt (survives trimming).

    Returns:
        Immutable :class:`SystemPrompt` with rendered content and metadata.

    Raises:
        PromptBuildError: If prompt construction fails.
    """
    validate_max_tokens(agent, max_tokens)
    validate_org_policies(agent, org_policies)

    if l1_summaries:
        logger.info(
            TOOL_L1_INJECTED,
            tool_count=len(l1_summaries),
            tool_names=tuple(s.name for s in l1_summaries),
        )

    profile = get_prompt_profile(model_tier)
    if max_personality_tokens_override is not None:
        if max_personality_tokens_override > 0:
            profile = profile.model_copy(
                update={"max_personality_tokens": max_personality_tokens_override},
            )
        else:
            logger.warning(
                PROMPT_PROFILE_SELECTED,
                override_ignored=max_personality_tokens_override,
                reason="max_personality_tokens_override must be > 0",
            )
    logger.info(
        PROMPT_PROFILE_SELECTED,
        requested_tier=model_tier,
        selected_tier=profile.tier,
        defaulted=model_tier is None,
        personality_mode=profile.personality_mode,
        autonomy_detail_level=profile.autonomy_detail_level,
    )

    # Advisory only -- issues are logged but never block prompt construction.
    if org_policies:
        try:
            validate_policy_quality(org_policies)
        except MemoryError, RecursionError:
            raise
        except Exception:
            logger.warning(
                PROMPT_POLICY_VALIDATION_FAILED,
                agent_id=str(agent.id),
            )

    logger.info(
        PROMPT_BUILD_START,
        agent_id=str(agent.id),
        agent_name=agent.name,
        has_task=task is not None,
        tool_count=len(available_tools),
        has_company=company is not None,
        has_custom_template=custom_template is not None,
        model_tier=model_tier,
    )

    try:
        estimator = token_estimator or DefaultTokenEstimator()
        template_str = resolve_template(custom_template)

        result = _render_with_trimming(
            template_str=template_str,
            agent=agent,
            role=role,
            task=task,
            available_tools=available_tools,
            l1_summaries=l1_summaries,
            company=company,
            org_policies=org_policies,
            max_tokens=max_tokens,
            estimator=estimator,
            effective_autonomy=effective_autonomy,
            context_budget_indicator=context_budget_indicator,
            currency=currency,
            profile=profile,
            trimming_enabled=personality_trimming_enabled,
            strategy_config=strategy_config,
        )
    except PromptBuildError:
        raise  # Already logged by inner functions.
    except MemoryError, RecursionError:
        logger.error(
            PROMPT_BUILD_ERROR,
            agent_id=str(agent.id),
            agent_name=agent.name,
            error="non-recoverable error building prompt",
        )
        raise
    except Exception as exc:
        logger.warning(
            PROMPT_BUILD_ERROR,
            agent_id=str(agent.id),
            agent_name=agent.name,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        detail = sanitize_message(str(exc))
        msg = f"Unexpected error building prompt for agent '{agent.name}': {detail}"
        raise PromptBuildError(msg) from exc

    # Inject async task state section (survives trimming -- appended
    # after the main render since it must never be trimmed away).
    try:
        if async_task_state is not None and async_task_state.records:
            result = _inject_async_task_section(
                result,
                async_task_state,
                estimator,
            )
    except MemoryError, RecursionError:
        raise
    except Exception as exc:
        logger.warning(
            PROMPT_BUILD_ERROR,
            agent_id=str(agent.id),
            agent_name=agent.name,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        detail = sanitize_message(str(exc))
        msg = f"Error injecting async task state for agent '{agent.name}': {detail}"
        raise PromptBuildError(msg) from exc

    return _log_and_return(agent, result)

build_error_prompt

build_error_prompt(identity, agent_id, system_prompt)

Return the existing system prompt or a minimal error placeholder.

Used by the engine when the execution pipeline fails and a SystemPrompt was never built (or was partially built).

Parameters:

Name Type Description Default
identity AgentIdentity

Agent identity for metadata.

required
agent_id str

String agent identifier.

required
system_prompt SystemPrompt | None

Previously built prompt, or None.

required

Returns:

Type Description
SystemPrompt

The existing prompt if available, else a minimal placeholder.

Source code in src/synthorg/engine/prompt.py
def build_error_prompt(
    identity: AgentIdentity,
    agent_id: str,
    system_prompt: SystemPrompt | None,
) -> SystemPrompt:
    """Pick the prompt to report after a failed execution pipeline.

    The engine calls this when the pipeline fails and a
    ``SystemPrompt`` was never built (or was partially built).

    Args:
        identity: Agent identity for metadata.
        agent_id: String agent identifier.
        system_prompt: Previously built prompt, or ``None``.

    Returns:
        ``system_prompt`` itself when one was supplied; otherwise an
        empty placeholder whose ``template_version`` is ``"error"``.
    """
    if system_prompt is None:
        # The explicit agent_id wins over whatever the identity metadata holds.
        placeholder_metadata = dict(_build_metadata(identity))
        placeholder_metadata["agent_id"] = agent_id
        return SystemPrompt(
            content="",
            template_version="error",
            estimated_tokens=0,
            sections=(),
            metadata=placeholder_metadata,
        )
    return system_prompt

Task Execution

task_execution

Runtime task execution state.

Wraps the frozen Task config model with evolving execution state (status, cost, turn count) using model_copy(update=...) for cheap, immutable state transitions.

StatusTransition pydantic-model

Bases: BaseModel

Frozen audit record for a single status transition.

Attributes:

Name Type Description
from_status TaskStatus

Status before the transition.

to_status TaskStatus

Status after the transition.

timestamp AwareDatetime

When the transition occurred (timezone-aware).

reason str

Optional human-readable reason for the transition.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

from_status pydantic-field

from_status

Status before transition

to_status pydantic-field

to_status

Status after transition

timestamp pydantic-field

timestamp

When the transition occurred

reason pydantic-field

reason = ''

Optional reason for the transition

TaskExecution pydantic-model

Bases: BaseModel

Frozen runtime wrapper around a Task for execution tracking.

All state evolution happens via model_copy(update=...). Transitions are validated explicitly via synthorg.core.task_transitions.validate_transition() before the copy is made.

Attributes:

Name Type Description
task Task

Original frozen task definition.

status TaskStatus

Current execution status (starts from task.status).

transition_log tuple[StatusTransition, ...]

Audit trail of status transitions.

accumulated_cost TokenUsage

Running token usage and cost totals.

turn_count int

Number of LLM turns completed.

retry_count int

Number of previous failure-reassignment cycles.

started_at AwareDatetime | None

Set by with_transition on first entry to IN_PROGRESS (None until then).

completed_at AwareDatetime | None

When execution reached a terminal state.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

task pydantic-field

task

Original frozen task definition

status pydantic-field

status

Current execution status

transition_log pydantic-field

transition_log = ()

Audit trail of status transitions

accumulated_cost pydantic-field

accumulated_cost = ZERO_TOKEN_USAGE

Running cost totals

turn_count pydantic-field

turn_count = 0

Number of turns completed

retry_count pydantic-field

retry_count = 0

Number of previous failure-reassignment cycles

started_at pydantic-field

started_at = None

When execution entered IN_PROGRESS

completed_at pydantic-field

completed_at = None

When execution reached a terminal state

is_terminal property

is_terminal

Whether execution is in a terminal state.

from_task classmethod

from_task(task, *, retry_count=0)

Create a fresh execution from a task definition.

Parameters:

Name Type Description Default
task Task

The frozen task to wrap.

required
retry_count int

Number of previous failure-reassignment cycles.

0

Returns:

Type Description
TaskExecution

New TaskExecution with status matching the task.

Source code in src/synthorg/engine/task_execution.py
@classmethod
def from_task(
    cls,
    task: Task,
    *,
    retry_count: int = 0,
) -> TaskExecution:
    """Wrap a task definition in a brand-new execution record.

    Args:
        task: The frozen task to wrap.
        retry_count: Number of previous failure-reassignment cycles.

    Returns:
        New ``TaskExecution`` whose status mirrors ``task.status``.
    """
    initial_status = task.status
    fresh = cls(task=task, status=initial_status, retry_count=retry_count)
    logger.debug(
        EXECUTION_TASK_CREATED,
        task_id=task.id,
        initial_status=initial_status.value,
    )
    return fresh

with_transition

with_transition(target, *, reason='')

Validate and apply a status transition.

Raises:

Type Description
ValueError

If the transition is invalid.

Source code in src/synthorg/engine/task_execution.py
def with_transition(
    self,
    target: TaskStatus,
    *,
    reason: str = "",
) -> TaskExecution:
    """Check a status change and return a copy with it applied.

    Args:
        target: Status to move to.
        reason: Optional human-readable reason stored on the audit
            record.

    Returns:
        New ``TaskExecution`` carrying the updates computed by
        ``_build_transition_updates`` for this transition.

    Raises:
        ValueError: If the transition is invalid.
    """
    current = self.status
    try:
        validate_transition(current, target)
    except ValueError:
        # Log the rejected move, then let the original error propagate.
        logger.warning(
            EXECUTION_TASK_TRANSITION_FAILED,
            task_id=self.task.id,
            from_status=current.value,
            to_status=target.value,
            turn_count=self.turn_count,
        )
        raise

    # One timestamp is shared by the audit record and the field updates.
    occurred_at = datetime.now(UTC)
    advanced = self.model_copy(
        update=_build_transition_updates(
            self,
            target,
            StatusTransition(
                from_status=current,
                to_status=target,
                timestamp=occurred_at,
                reason=reason,
            ),
            occurred_at,
        ),
    )
    logger.info(
        EXECUTION_TASK_TRANSITION,
        task_id=self.task.id,
        from_status=current.value,
        to_status=target.value,
        reason=reason,
    )
    return advanced

with_cost

with_cost(usage)

Accumulate token usage and increment turn count.

Parameters:

Name Type Description Default
usage TokenUsage

Token usage from a single LLM call.

required

Returns:

Type Description
TaskExecution

New TaskExecution with updated cost and turn count.

Raises:

Type Description
ExecutionStateError

If execution is in a terminal state.

Source code in src/synthorg/engine/task_execution.py
def with_cost(self, usage: TokenUsage) -> TaskExecution:
    """Fold one LLM call's usage into the running totals.

    Each successful call also advances the turn counter by one.

    Args:
        usage: Token usage from a single LLM call.

    Returns:
        New ``TaskExecution`` with updated cost and turn count.

    Raises:
        ExecutionStateError: If execution is in a terminal state.
    """
    if not self.is_terminal:
        combined_usage = add_token_usage(self.accumulated_cost, usage)
        next_turn = self.turn_count + 1
        updated = self.model_copy(
            update={
                "accumulated_cost": combined_usage,
                "turn_count": next_turn,
            }
        )
        logger.debug(
            EXECUTION_COST_RECORDED,
            task_id=self.task.id,
            turn=updated.turn_count,
            cost=usage.cost,
        )
        return updated

    # Terminal executions are closed for accounting: log, then refuse.
    task_id = self.task.id
    status_value = self.status.value
    msg = (
        "Cannot record cost on terminal task execution "
        f"(task_id={task_id}, status={status_value})"
    )
    logger.error(
        EXECUTION_COST_ON_TERMINAL,
        task_id=self.task.id,
        status=self.status.value,
    )
    raise ExecutionStateError(msg)

to_task_snapshot

to_task_snapshot()

Return the original task with the current execution status.

Useful for persistence or reporting where a plain Task is expected.

Returns:

Type Description
Task

A copy of the original task with updated status.

Source code in src/synthorg/engine/task_execution.py
def to_task_snapshot(self) -> Task:
    """Project the execution back onto a plain ``Task``.

    Handy for persistence or reporting code that expects a ``Task``
    rather than a ``TaskExecution``.

    Returns:
        A copy of the original task whose ``status`` is the current
        execution status.
    """
    status_override = {"status": self.status}
    return self.task.model_copy(update=status_override)

Parallel Execution

parallel

Parallel agent execution orchestrator.

Coordinates multiple AgentEngine.run() calls in parallel using structured concurrency (asyncio.TaskGroup), with error isolation, concurrency limits, resource locking, and progress tracking.

Inspired by the ToolInvoker.invoke_all() pattern from tools/invoker.py (TaskGroup + Semaphore + guarded execution), extended with fail-fast, progress tracking, and CancelledError handling.

ProgressCallback module-attribute

ProgressCallback = Callable[[ParallelProgress], None]

Synchronous callback invoked on progress updates.

Called directly (not awaited) from the executor's event loop; must not block. Async functions will produce un-awaited coroutines.

ParallelExecutor

ParallelExecutor(
    *,
    engine,
    shutdown_manager=None,
    resource_lock=None,
    progress_callback=None,
    clock=None,
)

Orchestrates concurrent agent execution.

Composition over inheritance -- takes an AgentEngine and coordinates concurrent run() calls.

Parameters:

Name Type Description Default
engine AgentEngine

Agent execution engine.

required
shutdown_manager ShutdownManager | None

Optional shutdown manager for task registration.

None
resource_lock ResourceLock | None

Optional resource lock for exclusive file access. Defaults to InMemoryResourceLock if any assignments declare resource claims.

None
progress_callback ProgressCallback | None

Optional synchronous callback invoked on progress updates.

None
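clock Clock | None

Optional clock used to time execution. Defaults to SystemClock when omitted.

None

Source code in src/synthorg/engine/parallel.py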
def __init__(
    self,
    *,
    engine: AgentEngine,
    shutdown_manager: ShutdownManager | None = None,
    resource_lock: ResourceLock | None = None,
    progress_callback: ProgressCallback | None = None,
    clock: Clock | None = None,
) -> None:
    """Store the collaborators used to run agents concurrently.

    Args:
        engine: Agent execution engine.
        shutdown_manager: Optional shutdown manager.
        resource_lock: Optional resource lock.
        progress_callback: Optional synchronous progress callback.
        clock: Optional clock; a ``SystemClock`` is created when omitted.
    """
    self._engine = engine
    self._shutdown_manager = shutdown_manager
    self._resource_lock = resource_lock
    self._progress_callback = progress_callback
    # Fall back to the system clock only when the caller supplied none.
    if clock is None:
        clock = SystemClock()
    self._clock: Clock = clock

execute_group async

execute_group(group)

Execute a parallel group of agent assignments.

Parameters:

Name Type Description Default
group ParallelExecutionGroup

The execution group to run.

required

Returns:

Type Description
ParallelExecutionResult

Result with all agent outcomes.

Raises:

Type Description
ResourceConflictError

If resource claims conflict between assignments.

ParallelExecutionError

If fatal errors (MemoryError, RecursionError) occurred during execution.

Source code in src/synthorg/engine/parallel.py
async def execute_group(
    self,
    group: ParallelExecutionGroup,
) -> ParallelExecutionResult:
    """Execute a parallel group of agent assignments.

    Resource locks (when a lock is resolved for the group) are acquired
    before the task group runs and released in a ``finally`` block, so
    release is attempted whether or not execution succeeded.

    Args:
        group: The execution group to run.

    Returns:
        Result with all agent outcomes.

    Raises:
        ResourceConflictError: If resource claims conflict between
            assignments.
        ParallelExecutionError: If fatal errors (MemoryError,
            RecursionError) occurred during execution, or if resource
            locks could not be released after an otherwise error-free
            run.
        Exception: Any error raised while acquiring locks or running
            the task group is re-raised after lock release has been
            attempted.
    """
    # Monotonic start reading; the elapsed time is passed to _build_result.
    start = self._clock.monotonic()

    logger.info(
        PARALLEL_GROUP_START,
        group_id=group.group_id,
        agent_count=len(group.assignments),
        max_concurrency=group.max_concurrency,
        fail_fast=group.fail_fast,
    )

    # Both calls run before the try block, so an error raised here
    # propagates without any lock having been acquired by this method.
    lock = self._resolve_lock(group)
    self._validate_resource_claims(group)

    # Containers handed to _run_task_group -- presumably filled in by it
    # as agents finish (confirm against that helper).
    outcomes: dict[str, AgentOutcome] = {}
    fatal_errors: list[Exception] = []
    progress = _ProgressState(
        group_id=group.group_id,
        total=len(group.assignments),
    )

    # Errors are captured rather than raised immediately so that a lock
    # release failure can be combined with (or reported instead of) them.
    task_error: Exception | None = None
    release_error: Exception | None = None
    try:
        if lock is not None:
            await self._acquire_all_locks(group, lock)
        await self._run_task_group(
            group,
            outcomes,
            fatal_errors,
            progress,
        )
    except Exception as exc:
        # Deferred: re-raised below once lock release has been attempted.
        task_error = exc
    finally:
        if lock is not None:
            try:
                await self._release_all_locks(group, lock)
            # Unparenthesised multi-exception ``except`` is PEP 758
            # syntax (Python 3.14+).  These two are never deferred.
            except MemoryError, RecursionError:
                raise
            except Exception as release_exc:
                logger.warning(
                    PARALLEL_LOCK_RELEASE_ERROR,
                    note="Failed to release resource locks",
                    group_id=group.group_id,
                    error_type=type(release_exc).__name__,
                    error=safe_error_description(release_exc),
                )
                release_error = release_exc

    if release_error is not None:
        lock_msg = (
            f"Parallel group {group.group_id!r}: "
            "resource locks could not be released"
        )
        if task_error is not None:
            # The task error takes precedence; the release failure is
            # attached to it as a note instead of replacing it.
            task_error.add_note(lock_msg)
        else:
            raise ParallelExecutionError(
                lock_msg,
            ) from release_error

    if task_error is not None:
        raise task_error

    result = self._build_result(
        group,
        outcomes,
        self._clock.monotonic() - start,
    )

    logger.info(
        PARALLEL_GROUP_COMPLETE,
        group_id=group.group_id,
        succeeded=result.agents_succeeded,
        failed=result.agents_failed,
        duration_seconds=result.total_duration_seconds,
    )

    # Completion is logged first; collected fatal errors then turn the
    # call into a failure, chained from the first one recorded.
    if fatal_errors:
        msg = (
            f"Parallel group {group.group_id!r} had "
            f"{len(fatal_errors)} fatal error(s)"
        )
        logger.error(
            PARALLEL_AGENT_ERROR,
            group_id=group.group_id,
            fatal_error_count=len(fatal_errors),
            error=msg,
        )
        raise ParallelExecutionError(msg) from fatal_errors[0]

    return result

Run Result

run_result

Agent run result model.

Frozen Pydantic model wrapping ExecutionResult with outer metadata from the engine layer (system prompt, wall-clock duration, agent/task IDs).

AgentRunResult pydantic-model

Bases: BaseModel

Immutable result of a complete agent engine run.

Wraps the ExecutionResult from the loop with engine-level metadata: system prompt, wall-clock duration, and agent/task IDs.

Attributes:

Name Type Description
execution_result ExecutionResult

Outcome from the execution loop.

system_prompt SystemPrompt

System prompt used for this run.

duration_seconds float

Wall-clock run time in seconds.

agent_id NotBlankStr

Agent identifier (string form of UUID).

task_id NotBlankStr | None

Task identifier (always set currently; None reserved for future taskless runs).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

execution_result pydantic-field

execution_result

Outcome from the execution loop

system_prompt pydantic-field

system_prompt

System prompt used for this run

duration_seconds pydantic-field

duration_seconds

Wall-clock run time in seconds

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id = None

Task identifier, or None for future taskless runs

produced_artifacts pydantic-field

produced_artifacts = ()

Artifacts produced during execution

currency pydantic-field

currency

ISO 4217 currency that denominates total_cost. Populated by the engine from the active BudgetConfig.currency so cross-agent aggregations (e.g. ParallelExecutionResult.total_cost) can enforce the same-currency invariant before summing. Required so constructor sites cannot silently mis-label a non-default run as DEFAULT_CURRENCY.

termination_reason property

termination_reason

Why the execution loop terminated.

total_turns property

total_turns

Number of turns completed during execution.

total_cost property

total_cost

Accumulated cost from the execution context.

is_success property

is_success

True when termination reason is COMPLETED.

completion_summary property

completion_summary

Extract the last assistant message content as a work summary.

Walks the conversation in reverse to find the most recent assistant message with non-empty text content. Tool-call-only assistant messages (content is None or empty) are skipped.

Returns:

Type Description
str | None

The content string, or None if no qualifying message exists.

Metrics

metrics

Task completion metrics model.

Proxy overhead metrics for an agent run, computed from AgentRunResult data per the Operations design page.

TaskCompletionMetrics pydantic-model

Bases: BaseModel

Proxy overhead metrics for an agent run (see Operations design page).

Computed from AgentRunResult after execution to surface orchestration overhead indicators (turns, tokens, cost, duration).

Attributes:

Name Type Description
task_id NotBlankStr | None

Task identifier (None for future taskless runs).

agent_id NotBlankStr

Agent identifier (string form of UUID).

turns_per_task int

Number of LLM turns to complete the task.

tokens_per_task int

Total tokens consumed (input + output).

cost_per_task float

Total cost for the task in the configured currency.

duration_seconds float

Wall-clock execution time in seconds.

prompt_tokens int

Estimated system prompt tokens (per-call estimate from SystemPrompt.estimated_tokens).

prompt_token_ratio float

Per-call ratio of prompt tokens to total tokens (overhead indicator, derived via @computed_field). For multi-turn runs, the actual overhead is higher because the system prompt is resent on every turn.

accuracy_effort_ratio float | None

Accuracy-effort ratio from step-level quality signals (None when quality signals are unavailable).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _cap_prompt_tokens

task_id pydantic-field

task_id = None

Task identifier

agent_id pydantic-field

agent_id

Agent identifier

turns_per_task pydantic-field

turns_per_task

Number of LLM turns to complete the task

tokens_per_task pydantic-field

tokens_per_task

Total tokens consumed (input + output)

cost_per_task pydantic-field

cost_per_task

Total cost for the task in the configured currency

duration_seconds pydantic-field

duration_seconds

Wall-clock execution time in seconds

prompt_tokens pydantic-field

prompt_tokens = 0

Estimated system prompt tokens

accuracy_effort_ratio pydantic-field

accuracy_effort_ratio = None

Accuracy-effort ratio from step-level quality signals (None when quality signals are unavailable)

prompt_token_ratio property

prompt_token_ratio

Per-call ratio of prompt tokens to total tokens (overhead indicator).

For multi-turn runs the actual overhead is higher because the system prompt is resent on every turn.

from_run_result classmethod

from_run_result(result)

Build metrics from an agent run result.

Parameters:

Name Type Description Default
result AgentRunResult

The AgentRunResult to extract metrics from.

required

Returns:

Type Description
TaskCompletionMetrics

New TaskCompletionMetrics with values extracted from the result's execution context and metadata.

Source code in src/synthorg/engine/metrics.py
@classmethod
def from_run_result(cls, result: AgentRunResult) -> TaskCompletionMetrics:
    """Derive completion metrics from a finished agent run.

    Args:
        result: The ``AgentRunResult`` to extract metrics from.

    Returns:
        New ``TaskCompletionMetrics`` populated from the result's
        execution context and metadata.  ``accuracy_effort_ratio`` is
        ``None`` unless the metadata holds an ``AccuracyEffortRatio``
        under ``"accuracy_effort"``.
    """
    execution = result.execution_result
    usage_totals = execution.context.accumulated_cost
    quality_signal = execution.metadata.get("accuracy_effort")

    effort_ratio: float | None
    if quality_signal is None:
        effort_ratio = None
    elif isinstance(quality_signal, AccuracyEffortRatio):
        effort_ratio = quality_signal.ratio
    else:
        # Unknown payload type: report it and leave the ratio unset.
        logger.warning(
            EXECUTION_METRICS_UNEXPECTED_TYPE,
            type=type(quality_signal).__name__,
            task_id=result.task_id,
        )
        effort_ratio = None

    return cls(
        task_id=result.task_id,
        agent_id=result.agent_id,
        turns_per_task=result.total_turns,
        tokens_per_task=usage_totals.total_tokens,
        cost_per_task=result.total_cost,
        duration_seconds=result.duration_seconds,
        prompt_tokens=result.system_prompt.estimated_tokens,
        accuracy_effort_ratio=effort_ratio,
    )

Errors

errors

Engine-layer error hierarchy.

EngineError

EngineError(message=None)

Bases: DomainError

Base exception for all engine-layer errors.

Inherits from DomainError so the prefix-vs-category validator runs on every subclass; a typo in a subclass error_code whose first digit no longer matches the declared error_category is rejected at class-definition time.

Class Attributes

status_code: Default HTTP status for API exposure (500). error_code: Default RFC 9457 error code. error_category: Default RFC 9457 error category. retryable: Whether the client should retry the request. default_message: Generic 5xx-safe message used by exception handlers.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

PromptBuildError

PromptBuildError(message=None)

Bases: EngineError

Raised when system prompt construction fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

ExecutionStateError

ExecutionStateError(message=None)

Bases: EngineError

Raised when an execution state transition is invalid.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

MaxTurnsExceededError

MaxTurnsExceededError(message=None)

Bases: EngineError

Raised when turn_count reaches max_turns during execution.

Enforced by AgentContext.with_turn_completed when the hard turn limit has been reached.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

LoopExecutionError

LoopExecutionError(message=None)

Bases: EngineError

Non-recoverable execution loop error for the engine layer.

The execution loop returns TerminationReason.ERROR internally. This exception is available for the engine layer above the loop to convert that result into a raised error when appropriate.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

ParallelExecutionError

ParallelExecutionError(message=None)

Bases: EngineError

Raised when a parallel execution group encounters a fatal error.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

ResourceConflictError

ResourceConflictError(message=None)

Bases: EngineError

Raised when resource claims conflict between assignments.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

DecompositionError

DecompositionError(message=None)

Bases: EngineError

Base exception for task decomposition failures.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

DecompositionCycleError

DecompositionCycleError(message=None)

Bases: DecompositionError

Raised when a dependency cycle is detected in the subtask graph.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

DecompositionDepthError

DecompositionDepthError(message=None)

Bases: DecompositionError

Raised when decomposition exceeds the maximum nesting depth.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskRoutingError

TaskRoutingError(message=None)

Bases: EngineError

Raised when task routing to an agent fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskAssignmentError

TaskAssignmentError(message=None)

Bases: EngineError

Raised when task assignment fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

NoEligibleAgentError

NoEligibleAgentError(message=None)

Bases: TaskAssignmentError

Raised when no eligible agent is found for assignment.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

RecoveryConfigError

RecoveryConfigError(message=None)

Bases: EngineError

Configuration cannot satisfy the selected recovery strategy.

Typical cause: EngineRecoveryConfig.strategy == CHECKPOINT but no CheckpointRepository was wired through to the factory.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

ProjectNotFoundError

ProjectNotFoundError(*, project_id)

Bases: EngineError

Referenced project does not exist.

The message is deliberately generic to avoid leaking internal identifiers. The project_id attribute is available for structured logs but must NOT be included in user-facing responses.

Attributes:

Name Type Description
project_id NotBlankStr

The project identifier that was not found.

Source code in src/synthorg/engine/errors.py
def __init__(self, *, project_id: NotBlankStr) -> None:
    """Initialise the error with a fixed, identifier-free message.

    Args:
        project_id: Identifier of the missing project; stored on the
            instance but not embedded in the message.
    """
    # The message is a constant so the identifier never reaches it.
    generic_message = "Project not found"
    super().__init__(generic_message)
    self.project_id: NotBlankStr = project_id

ProjectAgentNotMemberError

ProjectAgentNotMemberError(*, project_id, agent_id)

Bases: EngineError

Agent is not a member of the task's project team.

The message is deliberately generic to avoid leaking internal identifiers. Attributes are available for structured logs only.

Attributes:

Name Type Description
project_id NotBlankStr

The project the agent attempted to access.

agent_id NotBlankStr

The agent that is not in the project team.

Source code in src/synthorg/engine/errors.py
def __init__(self, *, project_id: NotBlankStr, agent_id: NotBlankStr) -> None:
    """Initialise the error with a fixed, identifier-free message.

    Args:
        project_id: Identifier of the project; stored on the instance only.
        agent_id: Identifier of the agent; stored on the instance only.
    """
    # The message is a constant so neither identifier reaches it.
    generic_message = "Agent not authorized for this project"
    super().__init__(generic_message)
    self.project_id: NotBlankStr = project_id
    self.agent_id: NotBlankStr = agent_id

WorkspaceError

WorkspaceError(message=None)

Bases: EngineError

Base exception for workspace isolation failures.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkspaceSetupError

WorkspaceSetupError(message=None)

Bases: WorkspaceError

Raised when workspace creation fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkspaceMergeError

WorkspaceMergeError(message=None)

Bases: WorkspaceError

Raised when workspace merge fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkspaceCleanupError

WorkspaceCleanupError(message=None)

Bases: WorkspaceError

Raised when workspace teardown fails.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkspaceLimitError

WorkspaceLimitError(message=None)

Bases: WorkspaceError

Raised when maximum concurrent workspaces reached.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskEngineError

TaskEngineError(message=None)

Bases: EngineError

Base exception for all task engine errors.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskEngineNotRunningError

TaskEngineNotRunningError(message=None)

Bases: TaskEngineError

Raised when a mutation is submitted to a stopped task engine.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskEngineQueueFullError

TaskEngineQueueFullError(message=None)

Bases: TaskEngineError

Raised when the task engine queue is at capacity.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskMutationError

TaskMutationError(message=None)

Bases: TaskEngineError

Raised when a task mutation fails (not found, validation, etc.).

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskNotFoundError

TaskNotFoundError(message=None)

Bases: TaskMutationError

Raised when a task is not found during mutation.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskVersionConflictError

TaskVersionConflictError(message=None)

Bases: TaskMutationError

Raised when optimistic concurrency version does not match.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

TaskInternalError

TaskInternalError(message=None)

Bases: TaskEngineError

Raised when a task mutation fails due to an internal engine error.

Sibling of TaskMutationError, not a subtype, so a broad except TaskMutationError handler does not accidentally catch internal engine faults. Inherits the default 500 / ENGINE_ERROR / INTERNAL metadata from TaskEngineError.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

DelegationRoundLimitError

DelegationRoundLimitError(current_round, soft_limit)

Bases: EngineError

Hard abort when delegation rounds exceed 2x the soft cap.

Attributes:

Name Type Description
current_round int

The round number that triggered the abort.

soft_limit int

The configured soft cap on delegation rounds.

Source code in src/synthorg/engine/errors.py
def __init__(self, current_round: int, soft_limit: int) -> None:
    """Record the offending round and soft cap, then build the message.

    Args:
        current_round: The round number that triggered the abort.
        soft_limit: The configured soft cap; the message reports the hard
            limit as twice this value.
    """
    self.current_round: int = current_round
    self.soft_limit: int = soft_limit
    detail = (
        f"Delegation round {current_round} exceeds hard limit "
        f"({soft_limit * 2}, soft cap {soft_limit})"
    )
    super().__init__(detail)

CoordinationError

CoordinationError(message=None)

Bases: EngineError

Base exception for multi-agent coordination failures.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

CoordinationPhaseError

CoordinationPhaseError(message, *, phase, partial_phases=())

Bases: CoordinationError

Raised when a coordination pipeline phase fails.

Carries the failing phase name and all phase results accumulated up to and including the failure, enabling partial-result inspection.

Attributes:

Name Type Description
phase str

Name of the phase that failed.

partial_phases tuple[CoordinationPhaseResult, ...]

Phase results accumulated before and including this failure.

Source code in src/synthorg/engine/errors.py
def __init__(
    self,
    message: str,
    *,
    phase: str,
    partial_phases: tuple[CoordinationPhaseResult, ...] = (),
) -> None:
    """Initialise the error and attach the phase details to the instance.

    Args:
        message: Error message passed to the base class.
        phase: Name of the phase that failed.
        partial_phases: Phase results to expose on the exception;
            defaults to an empty tuple.
    """
    super().__init__(message)
    # Both attributes are set after the base initialiser has run.
    self.partial_phases: tuple[CoordinationPhaseResult, ...] = partial_phases
    self.phase: str = phase

RuntimeServicesBuildError

RuntimeServicesBuildError(message=None)

Bases: EngineError

Raised when the boot/reinit runtime-services build fails.

Wraps the underlying failure from build_runtime_services (provider registry, tool registry, agent engine, or coordinator factory) so the boot hook and the /setup/complete controller see a typed domain error instead of a raw exception. The original cause is preserved via raise ... from exc.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowExecutionError

WorkflowExecutionError(message=None)

Bases: EngineError

Base exception for workflow execution failures.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowDefinitionInvalidError

WorkflowDefinitionInvalidError(message=None)

Bases: WorkflowExecutionError

Raised when a workflow definition fails validation at activation time.

422 + REQUEST_VALIDATION_ERROR: a definition that fails activation-time structural checks is a caller-side validation failure surfaced after the request reached the engine, not an internal fault. Aligns with WorkflowDefinitionValidationError (the create/update path) so every "invalid workflow definition" surface emits the same 422 envelope.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowConditionEvalError

WorkflowConditionEvalError(message=None)

Bases: WorkflowExecutionError

Raised when a condition expression cannot be evaluated.

422 + REQUEST_VALIDATION_ERROR: a condition expression that fails evaluation is authored by the caller as part of the workflow definition, so the failure is a request-shape problem rather than an engine fault.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowExecutionNotFoundError

WorkflowExecutionNotFoundError(message=None)

Bases: WorkflowExecutionError

Raised when a workflow execution instance is not found.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

SubworkflowNotFoundError

SubworkflowNotFoundError(message, *, subworkflow_id, version)

Bases: WorkflowExecutionError

Raised when a referenced subworkflow version cannot be resolved.

Attributes:

Name Type Description
subworkflow_id NotBlankStr

The subworkflow identifier.

version NotBlankStr

The semver pin that failed to resolve.

Source code in src/synthorg/engine/errors.py
def __init__(
    self,
    message: str,
    *,
    subworkflow_id: NotBlankStr,
    version: NotBlankStr,
) -> None:
    """Initialise the error and record which reference failed to resolve.

    Args:
        message: Error message passed to the base class.
        subworkflow_id: The subworkflow identifier.
        version: The version pin that failed to resolve.
    """
    super().__init__(message)
    # Both attributes are set after the base initialiser has run.
    self.version: NotBlankStr = version
    self.subworkflow_id: NotBlankStr = subworkflow_id

SubworkflowCycleError

SubworkflowCycleError(message, *, cycle_path)

Bases: WorkflowExecutionError

Raised when the subworkflow reference graph contains a cycle.

Attributes:

Name Type Description
cycle_path tuple[tuple[str, str], ...]

Ordered (subworkflow_id, version) tuples that participate in the cycle.

Source code in src/synthorg/engine/errors.py
def __init__(self, message: str, *, cycle_path: tuple[tuple[str, str], ...]) -> None:
    """Initialise the error and keep the cycle path on the instance.

    Args:
        message: Error message passed to the base class.
        cycle_path: Ordered ``(subworkflow_id, version)`` tuples that
            participate in the cycle.
    """
    super().__init__(message)
    self.cycle_path: tuple[tuple[str, str], ...] = cycle_path

SubworkflowDepthExceededError

SubworkflowDepthExceededError(message, *, depth, max_depth)

Bases: WorkflowExecutionError

Raised when runtime subworkflow nesting exceeds the configured limit.

Attributes:

Name Type Description
depth int

The depth at which the limit was exceeded.

max_depth int

The configured maximum.

Source code in src/synthorg/engine/errors.py
def __init__(self, message: str, *, depth: int, max_depth: int) -> None:
    """Initialise the error and record the depth figures.

    Args:
        message: Error message passed to the base class.
        depth: The depth at which the limit was exceeded.
        max_depth: The configured maximum.
    """
    super().__init__(message)
    # Both attributes are set after the base initialiser has run.
    self.max_depth: int = max_depth
    self.depth: int = depth

SubworkflowIOError

SubworkflowIOError(message=None)

Bases: WorkflowExecutionError

Raised when subworkflow input or output binding is invalid.

Covers missing required inputs, unknown inputs, unknown outputs, type mismatches, and invalid binding expressions. The 422 mapping treats binding mismatches as caller-side validation failures so the centralised RFC 9457 dispatch surfaces a structured envelope without controller-level translation.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowTypeInvalidError

WorkflowTypeInvalidError(message=None)

Bases: WorkflowExecutionError

Raised when a request specifies an unknown workflow_type value.

Uses REQUEST_VALIDATION_ERROR (2001) and 400 to align with the other request-shape failures in this module: the value did not parse against the WorkflowType enum at the API boundary.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowDefinitionValidationError

WorkflowDefinitionValidationError(message=None)

Bases: WorkflowExecutionError

Raised when a workflow definition fails structural checks.

The default message is intentionally generic so Pydantic validation detail does not leak to API clients; callers may still chain the underlying exception with raise … from exc for the structured log emitted by the centralised handler.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowYamlExportError

WorkflowYamlExportError(message=None)

Bases: WorkflowExecutionError

Raised when YAML serialisation of a workflow definition fails.

Maps to 422 (Unprocessable Entity) on /workflows/{id}/export: the request itself is well-formed, but the persisted definition cannot be serialised to YAML -- a content-level failure rather than a request-syntax problem.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

WorkflowExecutionAlreadyTerminalError

WorkflowExecutionAlreadyTerminalError(message=None)

Bases: VersionConflictError

Raised when cancel targets an execution already in a terminal status.

Distinct from synthorg.core.domain_errors.VersionConflictError (4002) so API clients can discriminate "the execution finished before you cancelled" (no retry will succeed) from a row-level optimistic-concurrency race where the caller can re-read and try again. Both map to 409 + CONFLICT so the HTTP envelope shape is unchanged; only the error_code differs.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error with an explicit or default message.

    Args:
        message: Optional message text. When it is ``None`` or empty,
            ``self.default_message`` is used instead.
    """
    text = message if message else self.default_message
    super().__init__(text)

SelfReviewError

SelfReviewError(*, task_id, agent_id)

Bases: EngineError

Raised when an agent attempts to review their own work.

Structurally prevents an agent from acting as reviewer on a task they executed, enforcing separation of duties at the approval gate.

The exception message is deliberately generic ("Self-review is not permitted") to avoid leaking internal agent/task identifiers across authorization boundaries when the message is surfaced via an HTTP error response. The task_id and agent_id attributes are available for structured logs but must NOT be passed to user-facing error responses.

Attributes:

Name Type Description
task_id NotBlankStr

The task identifier the self-review was attempted on.

agent_id NotBlankStr

The agent identifier that is both executor and reviewer.

Source code in src/synthorg/engine/errors.py
def __init__(self, *, task_id: NotBlankStr, agent_id: NotBlankStr) -> None:
    """Initialise the error with a fixed, identifier-free message.

    Args:
        task_id: The task the self-review was attempted on; stored on
            the instance only.
        agent_id: The agent that is both executor and reviewer; stored
            on the instance only.
    """
    # The message is a constant so neither identifier reaches it.
    generic_message = "Self-review is not permitted"
    super().__init__(generic_message)
    self.task_id: NotBlankStr = task_id
    self.agent_id: NotBlankStr = agent_id

Task Decomposition

protocol

Decomposition strategy protocol.

DecompositionStrategy

Bases: Protocol

Protocol for task decomposition strategies.

Implementations produce a DecompositionPlan from a parent task and a decomposition context. The plan describes subtask definitions and their dependency relationships.

decompose async

decompose(task, context)

Decompose a task into subtasks.

Parameters:

Name Type Description Default
task Task

The parent task to decompose.

required
context DecompositionContext

Decomposition constraints (max subtasks, depth).

required

Returns:

Type Description
DecompositionPlan

A decomposition plan with subtask definitions.

Source code in src/synthorg/engine/decomposition/protocol.py
async def decompose(
    self,
    task: Task,
    context: DecompositionContext,
) -> DecompositionPlan:
    """Produce a decomposition plan for ``task``.

    This is a protocol stub; concrete strategies supply the behaviour.

    Args:
        task: The parent task to break down.
        context: Constraints on the decomposition (max subtasks, depth).

    Returns:
        The plan holding the subtask definitions.
    """
    ...

get_strategy_name

get_strategy_name()

Return a human-readable name for this strategy.

Source code in src/synthorg/engine/decomposition/protocol.py
def get_strategy_name(self) -> str:
    """Give the human-readable name of this strategy.

    This is a protocol stub; concrete strategies supply the value.
    """
    ...

models

Decomposition domain models.

Frozen Pydantic models for subtask definitions, decomposition plans, results, status rollups, and decomposition context.

SubtaskDefinition pydantic-model

Bases: BaseModel

Definition of a single subtask within a decomposition plan.

Attributes:

Name Type Description
id NotBlankStr

Unique subtask identifier (within this decomposition).

title NotBlankStr

Short subtask title.

description NotBlankStr

Detailed subtask description.

dependencies tuple[NotBlankStr, ...]

IDs of other subtasks this one depends on.

estimated_complexity Complexity

Complexity estimate for routing.

required_skills tuple[NotBlankStr, ...]

Skill IDs needed for routing.

required_tags tuple[NotBlankStr, ...]

Tags needed for multi-faceted routing match. When set, the routing scorer awards a small bonus to agents whose matched-skill tags cover every required tag. Empty tuple disables the tag-match tier.

required_role NotBlankStr | None

Optional role name for routing.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_no_self_dependency

id pydantic-field

id

Unique subtask identifier

title pydantic-field

title

Short subtask title

description pydantic-field

description

Detailed subtask description

dependencies pydantic-field

dependencies = ()

IDs of subtasks this one depends on

estimated_complexity pydantic-field

estimated_complexity = MEDIUM

Complexity estimate for routing

required_skills pydantic-field

required_skills = ()

Skill IDs needed for routing

required_tags pydantic-field

required_tags = ()

Tags needed for multi-faceted routing match

required_role pydantic-field

required_role = None

Optional role name for routing

DecompositionPlan pydantic-model

Bases: BaseModel

Plan describing how a parent task is decomposed into subtasks.

Validates subtask collection integrity at construction: non-empty, unique IDs, valid dependency references. Cycle detection is handled by DependencyGraph.validate() in the service layer.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the task being decomposed.

subtasks tuple[SubtaskDefinition, ...]

Ordered subtask definitions.

task_structure TaskStructure

Classified structure of the subtask graph.

coordination_topology CoordinationTopology

Selected coordination topology.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_subtasks

parent_task_id pydantic-field

parent_task_id

ID of the task being decomposed

subtasks pydantic-field

subtasks

Ordered subtask definitions

task_structure pydantic-field

task_structure = SEQUENTIAL

Classified task structure

coordination_topology pydantic-field

coordination_topology = AUTO

Selected coordination topology

DecompositionResult pydantic-model

Bases: BaseModel

Result of a complete task decomposition.

Attributes:

Name Type Description
plan DecompositionPlan

The decomposition plan that was executed.

created_tasks tuple[Task, ...]

Task objects created from subtask definitions.

dependency_edges tuple[tuple[NotBlankStr, NotBlankStr], ...]

Directed edges (from_id, to_id) in the DAG.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_plan_task_consistency

plan pydantic-field

plan

Executed decomposition plan

created_tasks pydantic-field

created_tasks

Task objects created from subtask definitions

dependency_edges pydantic-field

dependency_edges = ()

Directed edges (from_id, to_id) in the DAG

SubtaskStatusRollup pydantic-model

Bases: BaseModel

Aggregated status of subtasks for a parent task.

Tracks six explicit statuses: COMPLETED, FAILED, IN_PROGRESS, BLOCKED, CANCELLED, and SUSPENDED. Other statuses (CREATED, ASSIGNED, IN_REVIEW, INTERRUPTED) are not individually tracked; the gap between the sum of tracked counts and total accounts for these. The derived_parent_status treats any such remainder as work still pending (IN_PROGRESS).

When all subtasks are in terminal states but with a mix of completed and cancelled, derived_parent_status returns CANCELLED (some work was abandoned).

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

total int

Total number of subtasks.

completed int

Count of COMPLETED subtasks.

failed int

Count of FAILED subtasks.

in_progress int

Count of IN_PROGRESS subtasks.

blocked int

Count of BLOCKED subtasks.

cancelled int

Count of CANCELLED subtasks.

suspended int

Count of SUSPENDED subtasks.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_counts

parent_task_id pydantic-field

parent_task_id

Parent task ID

total pydantic-field

total

Total subtasks

completed pydantic-field

completed

Completed subtasks

failed pydantic-field

failed

Failed subtasks

in_progress pydantic-field

in_progress

In-progress subtasks

blocked pydantic-field

blocked

Blocked subtasks

cancelled pydantic-field

cancelled

Cancelled subtasks

suspended pydantic-field

suspended = 0

Suspended subtasks

derived_parent_status property

derived_parent_status

Derive the parent task status from subtask statuses.

DecompositionContext pydantic-model

Bases: BaseModel

Configuration context for a decomposition operation.

Attributes:

Name Type Description
max_subtasks int

Maximum number of subtasks allowed.

max_depth int

Maximum nesting depth for recursive decomposition.

current_depth int

Current nesting depth.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

max_subtasks pydantic-field

max_subtasks = 10

Maximum number of subtasks allowed

max_depth pydantic-field

max_depth = 3

Maximum nesting depth

current_depth pydantic-field

current_depth = 0

Current nesting depth

service

Decomposition service.

Orchestrates strategy, classifier, DAG validation, and task creation to decompose a parent task into executable subtasks.

DecompositionService

DecompositionService(strategy, classifier)

Service orchestrating task decomposition.

Composes a decomposition strategy with a structure classifier, DAG validator, and task factory to produce executable subtasks.

Source code in src/synthorg/engine/decomposition/service.py
def __init__(
    self,
    strategy: DecompositionStrategy,
    classifier: TaskStructureClassifier,
) -> None:
    """Store the collaborators used by the service.

    Args:
        strategy: Decomposition strategy, kept as ``self._strategy``.
        classifier: Structure classifier, kept as ``self._classifier``.
    """
    self._classifier = classifier
    self._strategy = strategy

decompose_task async

decompose_task(task, context)

Decompose a task into subtasks.

  1. Classify task structure (uses explicit if set, otherwise heuristic inference). Override the plan's structure with the classifier's result when they differ.
  2. Call strategy.decompose().
  3. Validate DAG via DependencyGraph.
  4. Create Task objects from SubtaskDefinitions.
  5. Return DecompositionResult.

Parameters:

Name Type Description Default
task Task

The parent task to decompose.

required
context DecompositionContext

Decomposition constraints.

required

Returns:

Type Description
DecompositionResult

Decomposition result with created tasks and dependency edges.

Source code in src/synthorg/engine/decomposition/service.py
async def decompose_task(
    self,
    task: Task,
    context: DecompositionContext,
) -> DecompositionResult:
    """Decompose a parent task into subtasks.

    Logs the start of the operation, delegates the work to
    ``_do_decompose``, and on any exception logs the failure before
    re-raising it unchanged. The documented pipeline is:

    1. Classify task structure (uses explicit if set,
       otherwise heuristic inference). Override the plan's
       structure with the classifier's result when they differ.
    2. Call strategy.decompose().
    3. Validate DAG via DependencyGraph.
    4. Create Task objects from SubtaskDefinitions.
    5. Return DecompositionResult.

    Args:
        task: The parent task to decompose.
        context: Decomposition constraints.

    Returns:
        Decomposition result with created tasks and dependency edges.
    """
    logger.info(
        DECOMPOSITION_STARTED,
        task_id=task.id,
        strategy=self._strategy.get_strategy_name(),
        current_depth=context.current_depth,
    )

    try:
        outcome = await self._do_decompose(task, context)
    except Exception:
        # Record the failure with its traceback, then let it propagate.
        logger.exception(
            DECOMPOSITION_FAILED,
            task_id=task.id,
            strategy=self._strategy.get_strategy_name(),
        )
        raise
    return outcome

rollup_status

rollup_status(parent_task_id, subtask_statuses)

Compute status rollup for a parent task.

Parameters:

Name Type Description Default
parent_task_id NotBlankStr

The parent task identifier.

required
subtask_statuses tuple[TaskStatus, ...]

Statuses of all subtasks.

required

Returns:

Type Description
SubtaskStatusRollup

Aggregated status rollup.

Source code in src/synthorg/engine/decomposition/service.py
def rollup_status(
    self,
    parent_task_id: NotBlankStr,
    subtask_statuses: tuple[TaskStatus, ...],
) -> SubtaskStatusRollup:
    """Aggregate subtask statuses into a rollup for the parent task.

    Thin pass-through to ``StatusRollup.compute``; no instance state
    is read.

    Args:
        parent_task_id: The parent task identifier.
        subtask_statuses: Statuses of all subtasks.

    Returns:
        Aggregated status rollup.
    """
    rollup = StatusRollup.compute(parent_task_id, subtask_statuses)
    return rollup

Task Routing

models

Task routing domain models.

Frozen Pydantic models for routing candidates, decisions, results, and topology configuration.

RoutingCandidate pydantic-model

Bases: BaseModel

A candidate agent for a subtask with scoring details.

Attributes:

Name Type Description
agent_identity AgentIdentity

The candidate agent.

score float

Match score between 0.0 and 1.0.

matched_skills tuple[NotBlankStr, ...]

Skills that matched the subtask requirements.

reason NotBlankStr

Human-readable explanation of the score.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_identity pydantic-field

agent_identity

Candidate agent

score pydantic-field

score

Match score (0.0-1.0)

matched_skills pydantic-field

matched_skills = ()

Skills that matched subtask requirements

reason pydantic-field

reason

Explanation of score

RoutingDecision pydantic-model

Bases: BaseModel

Routing decision for a single subtask.

Attributes:

Name Type Description
subtask_id NotBlankStr

ID of the subtask being routed.

selected_candidate RoutingCandidate

The chosen agent candidate.

alternatives tuple[RoutingCandidate, ...]

Other candidates considered.

topology CoordinationTopology

Coordination topology for this subtask.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_selected_not_in_alternatives

subtask_id pydantic-field

subtask_id

Subtask being routed

selected_candidate pydantic-field

selected_candidate

Chosen agent candidate

alternatives pydantic-field

alternatives = ()

Other candidates considered

topology pydantic-field

topology

Coordination topology for this subtask

RoutingResult pydantic-model

Bases: BaseModel

Result of routing all subtasks in a decomposition.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

decisions tuple[RoutingDecision, ...]

Routing decisions for routable subtasks.

unroutable tuple[NotBlankStr, ...]

IDs of subtasks with no matching agent.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_unique_subtask_ids

parent_task_id pydantic-field

parent_task_id

Parent task ID

decisions pydantic-field

decisions = ()

Routing decisions

unroutable pydantic-field

unroutable = ()

Subtask IDs with no matching agent

AutoTopologyConfig pydantic-model

Bases: BaseModel

Configuration for automatic topology selection.

Attributes:

Name Type Description
sequential_override CoordinationTopology

Topology for sequential structures.

parallel_default CoordinationTopology

Topology for parallel structures.

mixed_default CoordinationTopology

Topology for mixed structures.

parallel_artifact_threshold int

Artifact count above which parallel tasks use decentralized topology.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_no_auto_defaults

sequential_override pydantic-field

sequential_override = SAS

Topology for sequential structures

parallel_default pydantic-field

parallel_default = CENTRALIZED

Topology for parallel structures

mixed_default pydantic-field

mixed_default = CONTEXT_DEPENDENT

Topology for mixed structures

parallel_artifact_threshold pydantic-field

parallel_artifact_threshold = 4

Artifact count threshold for decentralized topology

service

Task routing service.

Routes decomposed subtasks to appropriate agents based on scoring, then selects coordination topology.

TaskRoutingService

TaskRoutingService(scorer, topology_selector)

Routes subtasks to agents based on skill matching.

For each subtask in a decomposition result, scores all available agents and selects the best match. Subtasks with no viable candidate are reported as unroutable.

Source code in src/synthorg/engine/routing/service.py
def __init__(
    self,
    scorer: AgentTaskScorer,
    topology_selector: TopologySelector,
) -> None:
    """Store the collaborators used when routing subtasks.

    Args:
        scorer: Agent/task scorer, kept on ``self._scorer``.
        topology_selector: Topology selector, kept on
            ``self._topology_selector``.
    """
    self._scorer, self._topology_selector = scorer, topology_selector

route

route(decomposition_result, available_agents, parent_task)

Route all subtasks to appropriate agents.

For each subtask:

  1. Score all available agents.
  2. Select the best candidate (highest score >= min_score).
  3. Select topology from parent task override or plan structure.
  4. Report unroutable subtasks.

Parameters:

Name Type Description Default
decomposition_result DecompositionResult

The decomposition to route.

required
available_agents tuple[AgentIdentity, ...]

Pool of agents to consider.

required
parent_task Task

The parent task (for topology selection).

required

Returns:

Type Description
RoutingResult

Routing result with decisions and unroutable subtask IDs.

Source code in src/synthorg/engine/routing/service.py
def route(
    self,
    decomposition_result: DecompositionResult,
    available_agents: tuple[AgentIdentity, ...],
    parent_task: Task,
) -> RoutingResult:
    """Route every subtask of a decomposition to an agent.

    For each subtask:
    1. Score all available agents.
    2. Select the best candidate (highest score >= min_score).
    3. Select topology from parent task override or plan structure.
    4. Report unroutable subtasks.

    An empty agent pool short-circuits: every subtask is reported as
    unroutable without any scoring taking place.

    Args:
        decomposition_result: The decomposition to route.
        available_agents: Pool of agents to consider.
        parent_task: The parent task (for topology selection).

    Returns:
        Routing result with decisions and unroutable subtask IDs.

    Raises:
        ValueError: If ``parent_task.id`` differs from the plan's
            ``parent_task_id``.
    """
    plan = decomposition_result.plan
    plan_parent_id = plan.parent_task_id

    # Guard: the supplied parent task must be the one the plan was built for.
    if parent_task.id != plan_parent_id:
        mismatch = (
            f"parent_task.id {parent_task.id!r} does not match "
            f"plan.parent_task_id {plan_parent_id!r}"
        )
        logger.warning(
            TASK_ROUTING_FAILED,
            parent_task_id=parent_task.id,
            plan_parent_task_id=plan_parent_id,
            error=mismatch,
        )
        raise ValueError(mismatch)

    subtasks = plan.subtasks
    logger.info(
        TASK_ROUTING_STARTED,
        parent_task_id=plan_parent_id,
        subtask_count=len(subtasks),
        agent_count=len(available_agents),
    )

    if available_agents:
        try:
            return self._do_route(
                decomposition_result,
                available_agents,
                parent_task,
            )
        except Exception:
            logger.exception(
                TASK_ROUTING_FAILED,
                parent_task_id=plan_parent_id,
            )
            raise

    # Nobody to route to: report every subtask as unroutable.
    logger.warning(
        TASK_ROUTING_NO_AGENTS,
        parent_task_id=plan_parent_id,
        subtask_count=len(subtasks),
    )
    return RoutingResult(
        parent_task_id=plan_parent_id,
        unroutable=tuple(subtask.id for subtask in subtasks),
    )

Task Assignment

protocol

Task assignment strategy protocol.

Defines the pluggable interface for assignment strategies.

TaskAssignmentStrategy

Bases: Protocol

Protocol for task assignment strategies.

Implementations must be synchronous (pure computation, no I/O) and return an AssignmentResult with the selected agent and ranked alternatives. TaskAssignmentService calls assign() synchronously -- async implementations will NOT work correctly.

Error signaling contract:

  • ManualAssignmentStrategy raises NoEligibleAgentError when the designated agent is not found or not ACTIVE, and TaskAssignmentError when task.assigned_to is None.
  • Scoring-based strategies (the ScoringBasedAssignmentStrategy compositions for role-based, load-balanced, cost-optimized, hierarchical, and auction) return AssignmentResult(selected=None, ...) when no agent meets the minimum score threshold.

TaskAssignmentService propagates both patterns: it re-raises TaskAssignmentError (including its subclass NoEligibleAgentError) and logs a warning when result.selected is None, returning the result to the caller for handling.

name property

name

Strategy name identifier.

assign

assign(request)

Assign a task to an agent based on the strategy's algorithm.

Parameters:

Name Type Description Default
request AssignmentRequest

The assignment request with task and agent pool.

required

Returns:

Type Description
AssignmentResult

Assignment result with selected agent and alternatives. selected may be None when no eligible agent is found (scoring strategies) -- callers must check this.

Raises:

Type Description
TaskAssignmentError

When preconditions are violated (e.g. missing assigned_to for manual strategy).

NoEligibleAgentError

When the designated agent cannot be found or is not ACTIVE (manual strategy only).

Source code in src/synthorg/engine/assignment/protocol.py
def assign(
    self,
    request: AssignmentRequest,
) -> AssignmentResult:
    """Assign a task to an agent based on the strategy's algorithm.

    Protocol declaration only: the body is empty and each strategy
    supplies its own implementation. Implementations are expected to
    be synchronous (pure computation, no I/O).

    Args:
        request: The assignment request with task and agent pool.

    Returns:
        Assignment result with selected agent and alternatives.
        ``selected`` may be ``None`` when no eligible agent is
        found (scoring strategies) -- callers must check this.

    Raises:
        TaskAssignmentError: When preconditions are violated
            (e.g. missing ``assigned_to`` for manual strategy).
        NoEligibleAgentError: When the designated agent cannot
            be found or is not ACTIVE (manual strategy only).
    """
    # Intentionally empty -- implemented by concrete strategies.
    ...

models

Task assignment domain models.

Frozen Pydantic models for assignment requests, results, agent workloads, and assignment candidates.

AgentWorkload pydantic-model

Bases: BaseModel

Snapshot of an agent's current workload.

Attributes:

Name Type Description
agent_id NotBlankStr

Unique agent identifier.

active_task_count int

Number of tasks currently in progress.

total_cost float

Total cost incurred by this agent in the configured currency.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_id pydantic-field

agent_id

Agent identifier

active_task_count pydantic-field

active_task_count

Number of tasks currently in progress

total_cost pydantic-field

total_cost = 0.0

Total cost incurred by this agent in the configured currency

AssignmentCandidate pydantic-model

Bases: BaseModel

A candidate agent for task assignment with scoring details.

Attributes:

Name Type Description
agent_identity AgentIdentity

The candidate agent.

score float

Match score between 0.0 and 1.0.

matched_skills tuple[NotBlankStr, ...]

Skills that matched the assignment requirements.

reason NotBlankStr

Human-readable explanation of the score.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_identity pydantic-field

agent_identity

Candidate agent

score pydantic-field

score

Match score (0.0-1.0)

matched_skills pydantic-field

matched_skills = ()

Skills that matched assignment requirements

reason pydantic-field

reason

Explanation of score

AssignmentRequest pydantic-model

Bases: BaseModel

Request for task assignment to an agent.

The required_skills and required_role fields live here (not on Task) so that scoring strategies can evaluate agent-task fit without modifying the Task model.

Attributes:

Name Type Description
task Task

The task to assign.

available_agents tuple[AgentIdentity, ...]

Pool of agents to consider (must be non-empty, unique by agent id).

workloads tuple[AgentWorkload, ...]

Current workload snapshots per agent (unique by agent_id).

min_score float

Minimum score threshold for eligibility.

required_skills tuple[NotBlankStr, ...]

Skill names needed for scoring.

required_role NotBlankStr | None

Optional role name for scoring.

max_concurrent_tasks int | None

Maximum concurrent tasks per agent. Agents at or above this limit are excluded from scoring. None disables the limit. Corresponds to TaskAssignmentConfig.max_concurrent_tasks_per_agent.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_collections

task pydantic-field

task

The task to assign

available_agents pydantic-field

available_agents

Pool of agents to consider

workloads pydantic-field

workloads = ()

Current workload snapshots per agent

min_score pydantic-field

min_score = 0.1

Minimum score threshold for eligibility

required_skills pydantic-field

required_skills = ()

Skill names needed for scoring

required_role pydantic-field

required_role = None

Optional role name for scoring

max_concurrent_tasks pydantic-field

max_concurrent_tasks = None

Maximum concurrent tasks per agent. Agents at or above this limit are excluded from scoring. None = no limit.

project_team pydantic-field

project_team = ()

Project team agent IDs for filtering. When non-empty, only agents whose ID is in this set are eligible.

AssignmentResult pydantic-model

Bases: BaseModel

Result of a task assignment operation.

Attributes:

Name Type Description
task_id NotBlankStr

ID of the task that was assigned.

strategy_used NotBlankStr

Name of the strategy that produced this result.

selected AssignmentCandidate | None

The selected candidate (None if no viable agent).

alternatives tuple[AssignmentCandidate, ...]

Other candidates considered, ranked by score.

reason NotBlankStr

Human-readable explanation of the assignment decision.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_selected_not_in_alternatives

task_id pydantic-field

task_id

Task identifier

strategy_used pydantic-field

strategy_used

Name of the strategy used

selected pydantic-field

selected = None

Selected candidate (None if no viable agent)

alternatives pydantic-field

alternatives = ()

Other candidates considered, ranked by score

reason pydantic-field

reason

Explanation of decision

service

Task assignment service.

Orchestrates task assignment by delegating to a pluggable TaskAssignmentStrategy with logging and validation.

TaskAssignmentService

TaskAssignmentService(strategy)

Orchestrates task assignment via a pluggable strategy.

Validates task status before delegating to the strategy. Does NOT mutate the task -- callers are responsible for any subsequent status transitions.

Source code in src/synthorg/engine/assignment/service.py
def __init__(self, strategy: TaskAssignmentStrategy) -> None:
    """Store the assignment strategy this service delegates to.

    Args:
        strategy: Strategy kept on ``self._strategy``.
    """
    self._strategy = strategy

assign

assign(request)

Assign a task to an agent using the configured strategy.

Parameters:

Name Type Description Default
request AssignmentRequest

The assignment request.

required

Returns:

Type Description
AssignmentResult

Assignment result from the strategy.

Raises:

Type Description
TaskAssignmentError

If the task status is not eligible for assignment.

Source code in src/synthorg/engine/assignment/service.py
def assign(self, request: AssignmentRequest) -> AssignmentResult:
    """Pick an agent for a task by delegating to the configured strategy.

    Steps, in order:

    1. Reject tasks whose status is not in ``_ASSIGNABLE_STATUSES``.
    2. When ``request.project_team`` is non-empty, narrow the agent
       pool to agents whose stringified id is in the team. If nobody
       remains, return a result with no selected agent without
       calling the strategy.
    3. Call the strategy and log the outcome.

    Args:
        request: The assignment request.

    Returns:
        Assignment result from the strategy, or a result with no
        selected agent when the project-team filter leaves no agents.

    Raises:
        TaskAssignmentError: If the task status is not eligible
            for assignment, or re-raised unchanged from the strategy.
    """
    task = request.task
    current_status = task.status

    if current_status not in _ASSIGNABLE_STATUSES:
        allowed = sorted(s.value for s in _ASSIGNABLE_STATUSES)
        problem = (
            f"Task {task.id!r} has status {current_status.value!r}, "
            f"expected one of {allowed}"
        )
        logger.warning(
            TASK_ASSIGNMENT_FAILED,
            task_id=task.id,
            status=current_status.value,
            error=problem,
        )
        raise TaskAssignmentError(problem)

    # Restrict the pool to project team members when a team is given.
    if request.project_team:
        from synthorg.engine.assignment.models import (  # noqa: PLC0415
            AssignmentResult,
        )

        member_ids = frozenset(request.project_team)
        pool = request.available_agents
        team_agents = tuple(
            agent for agent in pool if str(agent.id) in member_ids
        )
        if not team_agents:
            logger.warning(
                TASK_ASSIGNMENT_PROJECT_NO_ELIGIBLE,
                task_id=task.id,
                available_agents=len(pool),
                project_team_size=len(request.project_team),
            )
            return AssignmentResult(
                task_id=task.id,
                strategy_used=self._strategy.name,
                reason="No available agents are members of the project team",
            )
        logger.info(
            TASK_ASSIGNMENT_PROJECT_FILTERED,
            task_id=task.id,
            total_agents=len(pool),
            eligible_agents=len(team_agents),
        )
        # The request model is copied rather than mutated in place.
        request = request.model_copy(update={"available_agents": team_agents})

    logger.info(
        TASK_ASSIGNMENT_STARTED,
        task_id=task.id,
        strategy=self._strategy.name,
        agent_count=len(request.available_agents),
    )

    try:
        result = self._strategy.assign(request)
    except TaskAssignmentError:
        raise  # already logged by the strategy
    except Exception:
        logger.exception(
            TASK_ASSIGNMENT_FAILED,
            task_id=task.id,
            strategy=self._strategy.name,
        )
        raise

    chosen = result.selected
    if chosen is None:
        logger.warning(
            TASK_ASSIGNMENT_NO_ELIGIBLE,
            task_id=task.id,
            strategy=self._strategy.name,
            reason=result.reason,
        )
    else:
        logger.info(
            TASK_ASSIGNMENT_AGENT_SELECTED,
            task_id=task.id,
            agent_name=chosen.agent_identity.name,
            score=chosen.score,
            strategy=result.strategy_used,
        )

    logger.info(
        TASK_ASSIGNMENT_COMPLETE,
        task_id=task.id,
        strategy=result.strategy_used,
        selected=chosen is not None,
        alternatives=len(result.alternatives),
    )

    return result

Error Classification

models

Classification result models for the error taxonomy pipeline.

Defines severity levels, individual error findings, and aggregated classification results produced by the detection pipeline.

ErrorSeverity

Bases: StrEnum

Severity level for a detected coordination error.

ErrorFinding pydantic-model

Bases: BaseModel

A single coordination error detected during classification.

Attributes:

Name Type Description
category ErrorCategory

The error category from the taxonomy.

severity ErrorSeverity

Severity level of the finding.

description NotBlankStr

Human-readable description of the error.

evidence tuple[NotBlankStr, ...]

Supporting evidence extracted from the conversation.

turn_range tuple[int, int] | None

(start, end) 0-based index range where the error was observed, or None if the error cannot be attributed to specific positions. For conversation-based detectors this is the message index; for turn-based detectors this is the index into the turns tuple.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_turn_range

category pydantic-field

category

Error taxonomy category

severity pydantic-field

severity

Severity level

description pydantic-field

description

Error description

evidence pydantic-field

evidence = ()

Supporting evidence from conversation

turn_range pydantic-field

turn_range = None

0-based index range (start, end) where error was observed. For conversation-based detectors this is the message index in the conversation tuple; for turn-based detectors this is the index into the turns tuple.

ClassificationResult pydantic-model

Bases: BaseModel

Aggregated result from the error classification pipeline.

Attributes:

Name Type Description
execution_id NotBlankStr

Unique identifier for the execution run.

agent_id NotBlankStr

Agent that was executing.

task_id NotBlankStr

Task being executed.

categories_checked tuple[ErrorCategory, ...]

Which error categories were checked.

findings tuple[ErrorFinding, ...]

All detected error findings.

classified_at AwareDatetime

Timestamp when classification completed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_findings_match_categories

execution_id pydantic-field

execution_id

Execution run identifier

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id

Task identifier

categories_checked pydantic-field

categories_checked

Categories that were checked

findings pydantic-field

findings = ()

Detected error findings

classified_at pydantic-field

classified_at

Classification timestamp

finding_count property

finding_count

Total number of detected findings.

has_findings property

has_findings

Whether any error findings were detected.

pipeline

Error classification pipeline.

Orchestrates the detection of coordination errors from an execution result using the configured error taxonomy. Detectors are discovered dynamically from the ErrorTaxonomyConfig.detectors dict and dispatched via the Detector protocol. The pipeline never raises exceptions -- all errors are caught and logged.

classify_execution_errors async

classify_execution_errors(
    execution_result,
    agent_id,
    task_id,
    *,
    config,
    task_repo=None,
    provider=None,
    sinks=(),
)

Classify coordination errors from an execution result.

Discovers detectors from config.detectors, loads scope-appropriate context, runs detectors sequentially (concurrency happens inside CompositeDetector), and dispatches results to registered sinks.

Rate limiting is handled by the BaseCompletionProvider internally -- semantic detectors no longer accept a separate rate limiter to avoid double-throttling.

Returns None when the taxonomy is disabled. Never raises; all exceptions except MemoryError/RecursionError are caught and logged as CLASSIFICATION_ERROR.

Parameters:

Name Type Description Default
execution_result ExecutionResult

The completed execution result to analyse.

required
agent_id NotBlankStr

Agent that executed the task.

required
task_id NotBlankStr

Task that was executed.

required
config ErrorTaxonomyConfig

Error taxonomy configuration.

required
task_repo TaskRepository | None

Optional task repository for TASK_TREE scope.

None
provider BaseCompletionProvider | None

Optional LLM provider for semantic detectors.

None
sinks tuple[ClassificationSink, ...]

Downstream consumers to notify after classification.

()

Returns:

Type Description
ClassificationResult | None

Classification result with findings, or None if disabled.

Source code in src/synthorg/engine/classification/pipeline.py
async def classify_execution_errors(  # noqa: PLR0913
    execution_result: ExecutionResult,
    agent_id: NotBlankStr,
    task_id: NotBlankStr,
    *,
    config: ErrorTaxonomyConfig,
    task_repo: TaskRepository | None = None,
    provider: BaseCompletionProvider | None = None,
    sinks: tuple[ClassificationSink, ...] = (),
) -> ClassificationResult | None:
    """Run the error taxonomy over a finished execution.

    Discovers detectors from ``config.detectors``, loads
    scope-appropriate context, runs detectors sequentially
    (concurrency happens inside ``CompositeDetector``), and
    dispatches results to registered sinks.

    Rate limiting is handled by the ``BaseCompletionProvider``
    internally -- semantic detectors no longer accept a separate
    rate limiter to avoid double-throttling.

    Returns ``None`` when the taxonomy is disabled.  Never raises;
    all exceptions except ``MemoryError``/``RecursionError`` are
    caught and logged as ``CLASSIFICATION_ERROR``.

    Args:
        execution_result: The completed execution result to analyse.
        agent_id: Agent that executed the task.
        task_id: Task that was executed.
        config: Error taxonomy configuration.
        task_repo: Optional task repository for TASK_TREE scope.
        provider: Optional LLM provider for semantic detectors.
        sinks: Downstream consumers to notify after classification.

    Returns:
        Classification result with findings, or ``None`` if the
        taxonomy is disabled or ``_classify_safely`` yields ``None``.
    """
    if not config.enabled:
        logger.debug(
            CLASSIFICATION_SKIPPED,
            agent_id=agent_id,
            task_id=task_id,
            reason="error taxonomy disabled",
        )
        return None

    execution_id = execution_result.context.execution_id
    category_names = tuple(category.value for category in config.categories)
    logger.info(
        CLASSIFICATION_START,
        agent_id=agent_id,
        task_id=task_id,
        execution_id=execution_id,
        categories=category_names,
    )

    classification = await _classify_safely(
        execution_result,
        agent_id,
        task_id,
        execution_id=execution_id,
        config=config,
        task_repo=task_repo,
        provider=provider,
    )
    # Sinks are only notified when a classification was produced.
    if classification is not None:
        await _dispatch_to_sinks(classification, sinks, agent_id, task_id)
    return classification

Workspace Isolation

protocol

Workspace isolation strategy protocol.

WorkspaceIsolationStrategy

Bases: Protocol

Protocol for workspace isolation strategies.

Implementations provide the ability to create, merge, and tear down isolated workspaces for concurrent agent execution.

setup_workspace async

setup_workspace(*, request)

Create an isolated workspace for an agent task.

Parameters:

Name Type Description Default
request WorkspaceRequest

Workspace creation request.

required

Returns:

Type Description
Workspace

The created workspace.

Raises:

Type Description
WorkspaceLimitError

When max concurrent worktrees reached.

WorkspaceSetupError

When git operations fail.

Source code in src/synthorg/engine/workspace/protocol.py
async def setup_workspace(
    self,
    *,
    request: WorkspaceRequest,
) -> Workspace:
    """Create an isolated workspace for an agent task.

    Protocol declaration only: the body is empty and each isolation
    strategy supplies its own implementation.

    Args:
        request: Workspace creation request.

    Returns:
        The created workspace.

    Raises:
        WorkspaceLimitError: When max concurrent worktrees reached.
        WorkspaceSetupError: When git operations fail.
    """
    # Intentionally empty -- implemented by concrete strategies.
    ...

teardown_workspace async

teardown_workspace(*, workspace)

Remove an isolated workspace and clean up resources.

Parameters:

Name Type Description Default
workspace Workspace

The workspace to tear down.

required

Raises:

Type Description
WorkspaceCleanupError

When git cleanup operations fail.

Source code in src/synthorg/engine/workspace/protocol.py
async def teardown_workspace(
    self,
    *,
    workspace: Workspace,
) -> None:
    """Remove an isolated workspace and clean up resources.

    Protocol declaration only: the body is empty and each isolation
    strategy supplies its own implementation.

    Args:
        workspace: The workspace to tear down.

    Raises:
        WorkspaceCleanupError: When git cleanup operations fail.
    """
    # Intentionally empty -- implemented by concrete strategies.
    ...

merge_workspace async

merge_workspace(*, workspace)

Merge a workspace branch back into the base branch.

Merge conflicts are returned as a MergeResult with success=False rather than raised as exceptions.

Parameters:

Name Type Description Default
workspace Workspace

The workspace to merge.

required

Returns:

Type Description
MergeResult

The merge result with conflict details if any.

Raises:

Type Description
WorkspaceMergeError

When checkout or merge abort fails.

Source code in src/synthorg/engine/workspace/protocol.py
async def merge_workspace(
    self,
    *,
    workspace: Workspace,
) -> MergeResult:
    """Merge a workspace branch back into the base branch.

    Merge conflicts are returned as a ``MergeResult`` with
    ``success=False`` rather than raised as exceptions.

    Protocol declaration only: the body is empty and each isolation
    strategy supplies its own implementation.

    Args:
        workspace: The workspace to merge.

    Returns:
        The merge result with conflict details if any.

    Raises:
        WorkspaceMergeError: When checkout or merge abort fails.
    """
    # Intentionally empty -- implemented by concrete strategies.
    ...

list_active_workspaces async

list_active_workspaces()

Return all currently active workspaces.

Returns:

Type Description
tuple[Workspace, ...]

Tuple of active workspaces.

Source code in src/synthorg/engine/workspace/protocol.py
async def list_active_workspaces(self) -> tuple[Workspace, ...]:
    """Return all currently active workspaces.

    Protocol declaration only: the body is empty and each isolation
    strategy supplies its own implementation.

    Returns:
        Tuple of active workspaces.
    """
    # Intentionally empty -- implemented by concrete strategies.
    ...

get_strategy_type

get_strategy_type()

Return the strategy type identifier.

Returns:

Type Description
str

Strategy type name.

Source code in src/synthorg/engine/workspace/protocol.py
def get_strategy_type(self) -> str:
    """Return the strategy type identifier.

    Protocol declaration only: the body is empty and each isolation
    strategy supplies its own implementation.

    Returns:
        Strategy type name.
    """
    # Intentionally empty -- implemented by concrete strategies.
    ...

models

Workspace isolation domain models.

WorkspaceRequest pydantic-model

Bases: BaseModel

Request to create an isolated workspace for an agent task.

Attributes:

Name Type Description
task_id NotBlankStr

Identifier of the task requiring isolation.

agent_id NotBlankStr

Identifier of the agent that will work in the workspace.

base_branch NotBlankStr

Git branch to branch from.

file_scope tuple[NotBlankStr, ...]

Optional file path hints for the workspace.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

task_id pydantic-field

task_id

Task requiring isolation

agent_id pydantic-field

agent_id

Agent working in workspace

base_branch pydantic-field

base_branch = 'main'

Git branch to branch from

file_scope pydantic-field

file_scope = ()

Optional file path hints

Workspace pydantic-model

Bases: BaseModel

An active isolated workspace backed by a git worktree.

Attributes:

Name Type Description
workspace_id NotBlankStr

Unique identifier for this workspace.

task_id NotBlankStr

Task this workspace serves.

agent_id NotBlankStr

Agent operating in this workspace.

branch_name NotBlankStr

Git branch created for this workspace.

worktree_path NotBlankStr

Filesystem path to the worktree directory.

base_branch NotBlankStr

Branch this workspace was created from.

created_at datetime

Timestamp of workspace creation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

workspace_id pydantic-field

workspace_id

Unique workspace ID

task_id pydantic-field

task_id

Task this workspace serves

agent_id pydantic-field

agent_id

Agent operating in workspace

branch_name pydantic-field

branch_name

Git branch for this workspace

worktree_path pydantic-field

worktree_path

Filesystem path to worktree

base_branch pydantic-field

base_branch

Branch workspace was created from

created_at pydantic-field

created_at

Workspace creation timestamp

MergeConflict pydantic-model

Bases: BaseModel

A single merge conflict detected during workspace merge.

Attributes:

Name Type Description
file_path NotBlankStr

Path of the conflicting file.

conflict_type ConflictType

Type of conflict (e.g. textual, semantic).

ours_content str

Content from the base branch side.

theirs_content str

Content from the workspace branch side.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_semantic_description

file_path pydantic-field

file_path

Conflicting file path

conflict_type pydantic-field

conflict_type

Type of conflict detected during merge

ours_content pydantic-field

ours_content = ''

Base branch content

theirs_content pydantic-field

theirs_content = ''

Workspace branch content

description pydantic-field

description = ''

Human-readable description of the conflict

MergeResult pydantic-model

Bases: BaseModel

Result of merging a single workspace branch back.

Attributes:

Name Type Description
workspace_id NotBlankStr

Workspace that was merged.

branch_name NotBlankStr

Branch that was merged.

success bool

Whether the merge completed without conflicts.

conflicts tuple[MergeConflict, ...]

Any textual conflicts encountered during merge.

escalation ConflictEscalation | None

Escalation strategy applied, if any.

merged_commit_sha NotBlankStr | None

SHA of the merge commit, if successful.

duration_seconds float

Time taken for the merge operation.

semantic_conflicts tuple[MergeConflict, ...]

Semantic conflicts detected after merge.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_success_consistency

workspace_id pydantic-field

workspace_id

Merged workspace ID

branch_name pydantic-field

branch_name

Merged branch name

success pydantic-field

success

Whether merge succeeded

conflicts pydantic-field

conflicts = ()

Conflicts encountered

escalation pydantic-field

escalation = None

Escalation strategy applied

merged_commit_sha pydantic-field

merged_commit_sha = None

Merge commit SHA if successful

duration_seconds pydantic-field

duration_seconds

Merge duration in seconds

semantic_conflicts pydantic-field

semantic_conflicts = ()

Semantic conflicts detected after successful merge

WorkspaceGroupResult pydantic-model

Bases: BaseModel

Aggregated result of merging a group of workspaces.

Attributes:

Name Type Description
group_id NotBlankStr

Identifier for this merge group.

merge_results tuple[MergeResult, ...]

Individual merge results for each workspace.

duration_seconds float

Total time for the group merge operation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

group_id pydantic-field

group_id

Merge group identifier

merge_results pydantic-field

merge_results = ()

Individual merge results

duration_seconds pydantic-field

duration_seconds

Total merge duration in seconds

all_merged property

all_merged

Return True only if every workspace merged without conflict.

total_conflicts property

total_conflicts

Sum of conflicts from all merge results.

total_semantic_conflicts property

total_semantic_conflicts

Sum of semantic conflicts from all merge results.

service

Workspace isolation service.

High-level service that coordinates workspace lifecycle: setup, merge, and teardown for groups of agent workspaces.

WorkspaceIsolationService

WorkspaceIsolationService(*, strategy, config, clock=None)

Service for managing workspace isolation lifecycle.

Coordinates creating, merging, and tearing down workspaces for groups of concurrent agent tasks.

Parameters:

Name Type Description Default
strategy WorkspaceIsolationStrategy

Workspace isolation strategy implementation.

required
config WorkspaceIsolationConfig

Workspace isolation configuration.

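required
clock Clock | None

Clock used by the service and passed to its merge orchestrator. A SystemClock is created when None.

None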
Source code in src/synthorg/engine/workspace/service.py
def __init__(
    self,
    *,
    strategy: WorkspaceIsolationStrategy,
    config: WorkspaceIsolationConfig,
    clock: Clock | None = None,
) -> None:
    """Wire the service to its strategy, configuration, and clock.

    Args:
        strategy: Workspace isolation strategy implementation.
        config: Workspace isolation configuration; its
            ``planner_worktrees`` section supplies the merge settings.
        clock: Time source for this service, also handed to the
            merge orchestrator. A ``SystemClock`` is created when
            omitted.
    """
    # Resolve the clock first so the orchestrator below shares the
    # exact same instance as the service.
    if clock is None:
        clock = SystemClock()
    self._clock: Clock = clock
    self._strategy = strategy
    self._config = config

    # The orchestrator is built once, up front, from the
    # planner-worktree settings.
    worktree_settings = config.planner_worktrees
    self._merge_orchestrator = MergeOrchestrator(
        strategy=strategy,
        merge_order=worktree_settings.merge_order,
        conflict_escalation=worktree_settings.conflict_escalation,
        cleanup_on_merge=worktree_settings.cleanup_on_merge,
        clock=self._clock,
    )

setup_group async

setup_group(*, requests)

Create workspaces for a group of agent tasks.

Rolls back all already-created workspaces if any setup fails.

Parameters:

Name Type Description Default
requests tuple[WorkspaceRequest, ...]

Workspace creation requests.

required

Returns:

Type Description
tuple[Workspace, ...]

Tuple of created workspaces.

Raises:

Type Description
WorkspaceLimitError

When the maximum number of concurrent worktrees has been reached.

WorkspaceSetupError

When git operations fail.

Source code in src/synthorg/engine/workspace/service.py
async def setup_group(
    self,
    *,
    requests: tuple[WorkspaceRequest, ...],
) -> tuple[Workspace, ...]:
    """Provision one workspace per request, all-or-nothing.

    Requests are handled one at a time, in the order given. If any
    setup raises an ``Exception``, the workspaces created so far are
    handed to the rollback helper and the original exception is
    re-raised.

    Args:
        requests: Workspace creation requests.

    Returns:
        The created workspaces, in request order.

    Raises:
        WorkspaceLimitError: When max concurrent worktrees reached.
        WorkspaceSetupError: When git operations fail.
    """
    requested = len(requests)
    logger.info(WORKSPACE_GROUP_SETUP_START, count=requested)

    created: list[Workspace] = []
    try:
        for req in requests:
            created.append(
                await self._strategy.setup_workspace(request=req),
            )
    except (MemoryError, RecursionError):
        # These propagate immediately; no rollback is attempted.
        raise
    except Exception as exc:
        # Deliberately broad: any failure -- not only the documented
        # ``WorkspaceLimitError`` / ``WorkspaceSetupError`` -- must
        # trigger rollback, otherwise an unexpected error after a
        # partial setup would leak the workspaces already created.
        logger.warning(
            WORKSPACE_GROUP_SETUP_FAILED,
            count=requested,
            created=len(created),
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        await self._rollback_workspaces(created)
        raise
    else:
        logger.info(WORKSPACE_GROUP_SETUP_COMPLETE, count=len(created))
        return tuple(created)

merge_group async

merge_group(*, workspaces)

Merge all workspaces and return an aggregated result.

Parameters:

Name Type Description Default
workspaces tuple[Workspace, ...]

Workspaces to merge.

required

Returns:

Type Description
WorkspaceGroupResult

Aggregated merge result for the group.

Raises:

Type Description
WorkspaceMergeError

When a merge operation fails fatally.

Source code in src/synthorg/engine/workspace/service.py
async def merge_group(
    self,
    *,
    workspaces: tuple[Workspace, ...],
) -> WorkspaceGroupResult:
    """Merge a group of workspaces and report the combined outcome.

    The merge itself is delegated to the merge orchestrator. This
    method only measures how long it took, using the service clock's
    monotonic reading, and wraps the per-workspace results under a
    freshly generated UUID4 group id.

    Args:
        workspaces: Workspaces to merge.

    Returns:
        Aggregated merge result for the group.

    Raises:
        WorkspaceMergeError: When a merge operation fails fatally.
    """
    started_at = self._clock.monotonic()
    per_workspace = await self._merge_orchestrator.merge_all(
        workspaces=workspaces,
    )
    duration = self._clock.monotonic() - started_at

    # A new id is minted per call, after the merge has finished.
    group_id = str(uuid4())
    return WorkspaceGroupResult(
        group_id=group_id,
        merge_results=per_workspace,
        duration_seconds=duration,
    )

teardown_group async

teardown_group(*, workspaces)

Tear down all workspaces in a group.

Uses best-effort teardown: attempts all workspaces even if some fail, then raises a combined error.

Parameters:

Name Type Description Default
workspaces tuple[Workspace, ...]

Workspaces to tear down.

required

Raises:

Type Description
WorkspaceCleanupError

When any teardown operation fails.

Source code in src/synthorg/engine/workspace/service.py
async def teardown_group(
    self,
    *,
    workspaces: tuple[Workspace, ...],
) -> None:
    """Tear down every workspace in a group, best-effort.

    Each workspace is attempted in turn. A failure on one is recorded
    and logged, and the loop carries on with the rest. After the last
    attempt, any recorded failures are reported together in a single
    error.

    Args:
        workspaces: Workspaces to tear down.

    Raises:
        WorkspaceCleanupError: When any teardown operation fails.
    """
    logger.info(WORKSPACE_GROUP_TEARDOWN_START, count=len(workspaces))

    failures: list[str] = []
    for ws in workspaces:
        try:
            await self._strategy.teardown_workspace(workspace=ws)
        except (MemoryError, RecursionError):
            # These abort the whole teardown immediately.
            raise
        except Exception as exc:
            # ``failures`` ends up inside ``WorkspaceCleanupError``,
            # which callers may log verbatim; raw ``exc`` text could
            # leak DB credentials or container ids. Record the same
            # scrubbed description that the warning log uses.
            entry = f"workspace {ws.workspace_id}: {safe_error_description(exc)}"
            failures.append(entry)
            logger.warning(
                WORKSPACE_TEARDOWN_FAILED,
                workspace_id=ws.workspace_id,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )

    logger.info(
        WORKSPACE_GROUP_TEARDOWN_COMPLETE,
        count=len(workspaces),
        failures=len(failures),
    )

    if not failures:
        return

    joined = "; ".join(failures)
    msg = f"Failed to tear down {len(failures)} workspace(s): {joined}"
    raise WorkspaceCleanupError(msg)