Engine

Agent orchestration, execution loops, task decomposition, routing, and parallel execution.

Agent Engine

agent_engine

Agent engine -- top-level orchestrator.

Ties together prompt construction, execution context, execution loop, tool invocation, and budget tracking into a single run() entry point.

AgentEngine

AgentEngine(
    *,
    provider,
    execution_loop=None,
    tool_registry=None,
    cost_tracker=None,
    recovery_strategy=_DEFAULT_RECOVERY_STRATEGY,
    shutdown_checker=None,
    error_taxonomy_config=None,
    budget_enforcer=None,
    security_config=None,
    approval_store=None,
    parked_context_repo=None,
    task_engine=None,
    checkpoint_repo=None,
    heartbeat_repo=None,
    checkpoint_config=None,
    coordinator=None,
    stagnation_detector=None,
    auto_loop_config=None,
    hybrid_loop_config=None,
    compaction_callback=None,
    plan_execute_config=None,
    provider_registry=None,
    tool_invocation_tracker=None,
    memory_injection_strategy=None,
    procedural_memory_config=None,
    memory_backend=None,
    config_resolver=None,
)

Top-level orchestrator for agent execution.

Builds the system prompt, creates an execution context, delegates to the configured ExecutionLoop, and returns an AgentRunResult with full metadata.

Parameters:

Name Type Description Default
provider CompletionProvider

LLM completion provider (required).

required
execution_loop ExecutionLoop | None

Static execution loop. Defaults to ReactLoop(). Mutually exclusive with auto_loop_config.

None
tool_registry ToolRegistry | None

Optional tools available to the agent.

None
cost_tracker CostTracker | None

Falls back to budget_enforcer.cost_tracker when None and budget_enforcer is provided. Must match budget_enforcer.cost_tracker if both supplied.

None
recovery_strategy RecoveryStrategy | None

Defaults to FailAndReassignStrategy.

_DEFAULT_RECOVERY_STRATEGY
shutdown_checker ShutdownChecker | None

Callback that returns True when a graceful shutdown has been requested.

None
error_taxonomy_config ErrorTaxonomyConfig | None

Post-execution error classification.

None
budget_enforcer BudgetEnforcer | None

Pre-flight checks, auto-downgrade, and enhanced in-flight budget checking.

None
security_config SecurityConfig | None

Optional security subsystem configuration.

None
approval_store ApprovalStore | None

Optional approval queue store.

None
parked_context_repo ParkedContextRepository | None

Optional repository for parking execution contexts during approval escalation.

None
task_engine TaskEngine | None

Optional centralized task engine for real-time status sync (incremental transitions at each lifecycle point, best-effort).

None
checkpoint_repo CheckpointRepository | None

Optional checkpoint repository for persisting execution state at turn boundaries. Must be paired with heartbeat_repo.

None
heartbeat_repo HeartbeatRepository | None

Optional heartbeat repository for crash detection during execution. Must be paired with checkpoint_repo.

None
checkpoint_config CheckpointConfig | None

Checkpoint tuning (interval, max size). Defaults to CheckpointConfig().

None
coordinator MultiAgentCoordinator | None

Optional multi-agent coordinator for delegated coordination via coordinate().

None
stagnation_detector StagnationDetector | None

Optional detector for repetitive tool-call patterns. Wired into the execution loop when using auto-selection or the default loop.

None
auto_loop_config AutoLoopConfig | None

Optional auto-loop selection configuration. Selects the execution loop per-task based on complexity and budget state. Mutually exclusive with execution_loop.

None
hybrid_loop_config HybridLoopConfig | None

Optional configuration for the hybrid plan+ReAct loop. Passed to build_execution_loop when auto-selection picks "hybrid".

None
compaction_callback CompactionCallback | None

Optional async callback invoked at turn boundaries to compress older conversation turns. Passed to the execution loop (both static default and auto-selected). When execution_loop is provided directly, the caller is responsible for wiring this callback into the loop.

None
plan_execute_config PlanExecuteConfig | None

Optional configuration for the plan-execute loop. Passed to build_execution_loop when auto-selection picks "plan_execute".

None
provider_registry ProviderRegistry | None

Optional registry of completion providers. Used for runtime provider CRUD and model discovery.

None
tool_invocation_tracker ToolInvocationTracker | None

Optional tracker for recording tool invocations in the activity timeline. Passed through to each ToolInvoker created by _make_tool_invoker.

None
memory_injection_strategy MemoryInjectionStrategy | None

Optional memory injection strategy. When a ToolBasedInjectionStrategy is provided, memory tools (search_memory, recall_memory) are registered in the ToolRegistry for each agent execution.

None
procedural_memory_config ProceduralMemoryConfig | None

Optional configuration for procedural memory auto-generation from agent failures. When set (and memory_backend is also provided), a proposer LLM call analyses failures and stores procedural memory entries.

None
memory_backend MemoryBackend | None

Optional memory backend for storing procedural memory entries. When omitted, procedural memory generation is silently skipped even if procedural_memory_config is set.

None
config_resolver ConfigResolver | None

Optional settings resolver for reading runtime ENGINE settings (personality trimming controls). When None, built-in defaults are used.

None
Source code in src/synthorg/engine/agent_engine.py
def __init__(  # noqa: PLR0913
    self,
    *,
    provider: CompletionProvider,
    execution_loop: ExecutionLoop | None = None,
    tool_registry: ToolRegistry | None = None,
    cost_tracker: CostTracker | None = None,
    recovery_strategy: RecoveryStrategy | None = _DEFAULT_RECOVERY_STRATEGY,
    shutdown_checker: ShutdownChecker | None = None,
    error_taxonomy_config: ErrorTaxonomyConfig | None = None,
    budget_enforcer: BudgetEnforcer | None = None,
    security_config: SecurityConfig | None = None,
    approval_store: ApprovalStore | None = None,
    parked_context_repo: ParkedContextRepository | None = None,
    task_engine: TaskEngine | None = None,
    checkpoint_repo: CheckpointRepository | None = None,
    heartbeat_repo: HeartbeatRepository | None = None,
    checkpoint_config: CheckpointConfig | None = None,
    coordinator: MultiAgentCoordinator | None = None,
    stagnation_detector: StagnationDetector | None = None,
    auto_loop_config: AutoLoopConfig | None = None,
    hybrid_loop_config: HybridLoopConfig | None = None,
    compaction_callback: CompactionCallback | None = None,
    plan_execute_config: PlanExecuteConfig | None = None,
    provider_registry: ProviderRegistry | None = None,
    tool_invocation_tracker: ToolInvocationTracker | None = None,
    memory_injection_strategy: MemoryInjectionStrategy | None = None,
    procedural_memory_config: ProceduralMemoryConfig | None = None,
    memory_backend: MemoryBackend | None = None,
    config_resolver: ConfigResolver | None = None,
) -> None:
    if execution_loop is not None and auto_loop_config is not None:
        msg = "execution_loop and auto_loop_config are mutually exclusive"
        logger.warning(
            EXECUTION_ENGINE_ERROR,
            reason=msg,
        )
        raise ValueError(msg)
    self._provider = provider
    self._provider_registry = provider_registry
    self._approval_store = approval_store
    self._parked_context_repo = parked_context_repo
    self._stagnation_detector = stagnation_detector
    self._auto_loop_config = auto_loop_config
    self._hybrid_loop_config = hybrid_loop_config
    self._compaction_callback = compaction_callback
    self._plan_execute_config = plan_execute_config
    self._approval_gate = self._make_approval_gate()
    if execution_loop is not None and (
        self._approval_gate is not None
        or self._stagnation_detector is not None
        or self._compaction_callback is not None
    ):
        logger.warning(
            APPROVAL_GATE_LOOP_WIRING_WARNING,
            note=(
                "execution_loop provided externally -- approval_gate, "
                "stagnation_detector, and compaction_callback will NOT "
                "be wired automatically. Configure the loop with "
                "approval_gate=, stagnation_detector=, and "
                "compaction_callback= explicitly."
            ),
        )
    self._loop: ExecutionLoop = execution_loop or self._make_default_loop()
    self._tool_registry = tool_registry
    self._budget_enforcer = budget_enforcer
    if (checkpoint_repo is None) != (heartbeat_repo is None):
        msg = (
            "checkpoint_repo and heartbeat_repo must both be "
            "provided or both omitted"
        )
        raise ValueError(msg)
    self._checkpoint_repo = checkpoint_repo
    self._heartbeat_repo = heartbeat_repo
    self._checkpoint_config = checkpoint_config or CheckpointConfig()
    self._cost_tracker: CostTracker | None
    if budget_enforcer is not None:
        if (
            cost_tracker is not None
            and cost_tracker is not budget_enforcer.cost_tracker
        ):
            msg = (
                "cost_tracker must match budget_enforcer.cost_tracker "
                "when budget_enforcer is provided"
            )
            raise ValueError(msg)
        self._cost_tracker = budget_enforcer.cost_tracker
    else:
        self._cost_tracker = cost_tracker
    self._security_config = security_config
    self._task_engine = task_engine
    self._recovery_strategy = recovery_strategy
    self._shutdown_checker = shutdown_checker
    self._error_taxonomy_config = error_taxonomy_config
    self._coordinator = coordinator
    self._tool_invocation_tracker = tool_invocation_tracker
    self._memory_injection_strategy = memory_injection_strategy
    self._procedural_memory_config = procedural_memory_config
    self._memory_backend = memory_backend
    self._config_resolver = config_resolver
    self._procedural_proposer: ProceduralMemoryProposer | None = None
    if (
        procedural_memory_config is not None
        and procedural_memory_config.enabled
        and memory_backend is not None
    ):
        self._procedural_proposer = ProceduralMemoryProposer(
            provider=provider,
            config=procedural_memory_config,
        )
    self._audit_log = AuditLog()
    logger.debug(
        EXECUTION_ENGINE_CREATED,
        loop_type=(
            "auto"
            if self._auto_loop_config is not None
            else self._loop.get_loop_type()
        ),
        has_tool_registry=self._tool_registry is not None,
        has_cost_tracker=self._cost_tracker is not None,
        has_budget_enforcer=self._budget_enforcer is not None,
        has_coordinator=self._coordinator is not None,
        has_compaction_callback=self._compaction_callback is not None,
        has_plan_execute_config=self._plan_execute_config is not None,
        has_hybrid_loop_config=self._hybrid_loop_config is not None,
    )

coordinator property

coordinator

Return the multi-agent coordinator, or None if not configured.

coordinate async

coordinate(context)

Delegate to the multi-agent coordinator.

Parameters:

Name Type Description Default
context CoordinationContext

Coordination context with task, agents, and config.

required

Returns:

Type Description
CoordinationResult

Coordination result with all phase outcomes.

Raises:

Type Description
ExecutionStateError

If no coordinator is configured.

CoordinationPhaseError

When a critical phase fails.

Source code in src/synthorg/engine/agent_engine.py
async def coordinate(
    self,
    context: CoordinationContext,
) -> CoordinationResult:
    """Delegate to the multi-agent coordinator.

    Args:
        context: Coordination context with task, agents, and config.

    Returns:
        Coordination result with all phase outcomes.

    Raises:
        ExecutionStateError: If no coordinator is configured.
        CoordinationPhaseError: When a critical phase fails.
    """
    if self._coordinator is None:
        msg = "No coordinator configured for multi-agent dispatch"
        logger.warning(
            EXECUTION_ENGINE_ERROR,
            error=msg,
        )
        raise ExecutionStateError(msg)
    return await self._coordinator.coordinate(context)

run async

run(
    *,
    identity,
    task,
    completion_config=None,
    max_turns=DEFAULT_MAX_TURNS,
    memory_messages=(),
    timeout_seconds=None,
    effective_autonomy=None,
)

Execute an agent on a task.

Raises:

Type Description
ExecutionStateError

If pre-flight validation fails.

ValueError

If max_turns < 1 or timeout_seconds <= 0.

MemoryError

Re-raised unconditionally (non-recoverable).

RecursionError

Re-raised unconditionally (non-recoverable).

Source code in src/synthorg/engine/agent_engine.py
async def run(  # noqa: PLR0913
    self,
    *,
    identity: AgentIdentity,
    task: Task,
    completion_config: CompletionConfig | None = None,
    max_turns: int = DEFAULT_MAX_TURNS,
    memory_messages: tuple[ChatMessage, ...] = (),
    timeout_seconds: float | None = None,
    effective_autonomy: EffectiveAutonomy | None = None,
) -> AgentRunResult:
    """Execute an agent on a task.

    Raises:
        ExecutionStateError: If pre-flight validation fails.
        ValueError: If ``max_turns < 1`` or ``timeout_seconds <= 0``.
        MemoryError: Re-raised unconditionally (non-recoverable).
        RecursionError: Re-raised unconditionally (non-recoverable).
    """
    agent_id = str(identity.id)
    task_id = task.id

    validate_run_inputs(
        agent_id=agent_id,
        task_id=task_id,
        max_turns=max_turns,
        timeout_seconds=timeout_seconds,
    )
    validate_agent(identity, agent_id)
    validate_task(task, agent_id, task_id)

    with correlation_scope(agent_id=agent_id, task_id=task_id):
        start = time.monotonic()
        ctx: AgentContext | None = None
        system_prompt: SystemPrompt | None = None
        provider: CompletionProvider = self._provider
        try:
            loop_mode = (
                "auto"
                if self._auto_loop_config is not None
                else self._loop.get_loop_type()
            )
            logger.info(
                EXECUTION_ENGINE_START,
                agent_id=agent_id,
                task_id=task_id,
                loop_type=loop_mode,
                max_turns=max_turns,
            )

            # Pre-flight budget enforcement + degradation
            if self._budget_enforcer:
                preflight = await self._budget_enforcer.check_can_execute(
                    agent_id,
                    provider_name=identity.model.provider,
                )
                provider, identity = self._apply_degradation(
                    preflight,
                    identity,
                    provider,
                )
                identity = await self._budget_enforcer.resolve_model(
                    identity,
                )

            tool_invoker = self._make_tool_invoker(
                identity,
                task_id=task_id,
                effective_autonomy=effective_autonomy,
            )
            ctx, system_prompt = await self._prepare_context(
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                max_turns=max_turns,
                memory_messages=memory_messages,
                tool_invoker=tool_invoker,
                effective_autonomy=effective_autonomy,
            )
            return await self._execute(
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                completion_config=completion_config,
                ctx=ctx,
                system_prompt=system_prompt,
                start=start,
                timeout_seconds=timeout_seconds,
                tool_invoker=tool_invoker,
                effective_autonomy=effective_autonomy,
                provider=provider,
            )
        except (MemoryError, RecursionError):
            logger.exception(
                EXECUTION_ENGINE_ERROR,
                agent_id=agent_id,
                task_id=task_id,
                error="non-recoverable error in run()",
            )
            raise
        except BudgetExhaustedError as exc:
            return self._handle_budget_error(
                exc=exc,
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                duration_seconds=time.monotonic() - start,
                ctx=ctx,
                system_prompt=system_prompt,
            )
        except Exception as exc:
            return await self._handle_fatal_error(
                exc=exc,
                identity=identity,
                task=task,
                agent_id=agent_id,
                task_id=task_id,
                duration_seconds=time.monotonic() - start,
                ctx=ctx,
                system_prompt=system_prompt,
                completion_config=completion_config,
                effective_autonomy=effective_autonomy,
                provider=provider,
            )

Execution Loop Protocol

loop_protocol

Execution loop protocol and supporting models.

Defines the ExecutionLoop protocol that the agent engine calls to run a task, along with ExecutionResult, TurnRecord, TerminationReason, and the BudgetChecker and ShutdownChecker type aliases.

BudgetChecker module-attribute

BudgetChecker = Callable[[AgentContext], bool]

Callback that returns True when the budget is exhausted.

ShutdownChecker module-attribute

ShutdownChecker = Callable[[], bool]

Callback that returns True when a graceful shutdown has been requested.
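A ShutdownChecker is just a zero-argument predicate. One common way to supply it is to bind it to a `threading.Event` set by a signal handler; the Event wiring here is an illustration, not something the engine provides:

```python
import threading
from collections.abc import Callable

ShutdownChecker = Callable[[], bool]

shutdown_event = threading.Event()
checker: ShutdownChecker = shutdown_event.is_set  # bound method satisfies the alias

assert checker() is False  # no shutdown requested yet
shutdown_event.set()       # e.g. done from a SIGTERM handler
assert checker() is True   # loop will stop at the next turn boundary
```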

TerminationReason

Bases: StrEnum

Why the execution loop terminated.

TurnRecord pydantic-model

Bases: BaseModel

Per-turn metadata recorded during execution.

Attributes:

Name Type Description
turn_number int

1-indexed turn number.

input_tokens int

Input tokens consumed this turn.

output_tokens int

Output tokens generated this turn.

total_tokens int

Sum of input and output tokens (computed).

cost_usd float

Cost in USD (base currency) for this turn.

tool_calls_made tuple[NotBlankStr, ...]

Names of tools invoked this turn.

tool_call_fingerprints tuple[NotBlankStr, ...]

Deterministic fingerprints of tool calls (name:args_hash) for stagnation detection.

finish_reason FinishReason

LLM finish reason for this turn.

call_category LLMCallCategory | None

Optional LLM call category for coordination metrics (productive, coordination, system).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

turn_number pydantic-field

turn_number

1-indexed turn number

input_tokens pydantic-field

input_tokens

Input tokens this turn

output_tokens pydantic-field

output_tokens

Output tokens this turn

cost_usd pydantic-field

cost_usd

Cost in USD (base currency) this turn

tool_calls_made pydantic-field

tool_calls_made = ()

Tool names invoked this turn

tool_call_fingerprints pydantic-field

tool_call_fingerprints = ()

Deterministic fingerprints of tool calls (name:args_hash)

finish_reason pydantic-field

finish_reason

LLM finish reason this turn

call_category pydantic-field

call_category = None

LLM call category (productive, coordination, system)

total_tokens property

total_tokens

Sum of input and output tokens.
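The `name:args_hash` fingerprint format used by `tool_call_fingerprints` can be sketched as follows. The actual hashing scheme is internal to the package; this version assumes canonical JSON plus a truncated SHA-256 digest:

```python
import hashlib
import json

def tool_call_fingerprint(name: str, args: dict[str, object]) -> str:
    # hypothetical scheme: canonical JSON of args, short SHA-256 digest
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    args_hash = hashlib.sha256(canonical.encode()).hexdigest()[:12]
    return f"{name}:{args_hash}"

# key order does not matter: the same call yields the same fingerprint,
# which is what makes the fingerprints usable for stagnation detection
a = tool_call_fingerprint("search", {"query": "rust", "limit": 3})
b = tool_call_fingerprint("search", {"limit": 3, "query": "rust"})
assert a == b and a.startswith("search:")
```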

ExecutionResult pydantic-model

ExecutionResult(**data)

Bases: BaseModel

Result returned by an execution loop.

Attributes:

Name Type Description
context AgentContext

Final agent context after execution.

termination_reason TerminationReason

Why the loop stopped.

turns tuple[TurnRecord, ...]

Per-turn metadata records.

total_tool_calls int

Total tool calls across all turns (computed).

error_message str | None

Error description when termination_reason is ERROR.

metadata dict[str, object]

Forward-compatible dict for future loop types. Note: frozen=True prevents field reassignment but not in-place mutation of the dict contents; deep-copy at system boundaries per project conventions.

Deep-copy metadata dict at construction boundary.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_error_message
Source code in src/synthorg/engine/loop_protocol.py
def __init__(self, **data: object) -> None:
    """Deep-copy metadata dict at construction boundary."""
    if "metadata" in data and isinstance(data["metadata"], dict):
        data["metadata"] = copy.deepcopy(data["metadata"])
    super().__init__(**data)
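The defensive deep copy above matters because `frozen=True` only blocks field reassignment, not mutation of a shared dict. The same pattern can be shown without pydantic, with a plain frozen dataclass standing in for ExecutionResult:

```python
import copy
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Result:
    metadata: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        # frozen blocks normal assignment, so bypass it once to store the copy
        object.__setattr__(self, "metadata", copy.deepcopy(self.metadata))

shared = {"steps": [1, 2]}
r = Result(metadata=shared)
shared["steps"].append(3)               # caller mutates its own dict...
assert r.metadata == {"steps": [1, 2]}  # ...the stored copy is unaffected
```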

context pydantic-field

context

Final agent context

termination_reason pydantic-field

termination_reason

Why the loop stopped

turns pydantic-field

turns = ()

Per-turn metadata

error_message pydantic-field

error_message = None

Error description (when reason is ERROR)

metadata pydantic-field

metadata

Forward-compatible metadata for future loop types

total_tool_calls property

total_tool_calls

Sum of tool calls from all turn records.

ExecutionLoop

Bases: Protocol

Protocol for agent execution loops.

The agent engine calls execute to run a task through the loop. Implementations decide the control flow (ReAct, Plan-and-Execute, etc.) but all return an ExecutionResult with a TerminationReason.

execute async

execute(
    *,
    context,
    provider,
    tool_invoker=None,
    budget_checker=None,
    shutdown_checker=None,
    completion_config=None,
)

Run the execution loop.

Parameters:

Name Type Description Default
context AgentContext

Initial agent context with conversation and identity.

required
provider CompletionProvider

LLM completion provider.

required
tool_invoker ToolInvoker | None

Optional tool invoker for tool execution.

None
budget_checker BudgetChecker | None

Optional callback; returns True when budget is exhausted.

None
shutdown_checker ShutdownChecker | None

Optional callback; returns True when a graceful shutdown has been requested.

None
completion_config CompletionConfig | None

Optional per-execution override for temperature/max_tokens (defaults to identity's model config).

None

Returns:

Type Description
ExecutionResult

Execution result with final context and termination reason.

Source code in src/synthorg/engine/loop_protocol.py
async def execute(  # noqa: PLR0913
    self,
    *,
    context: AgentContext,
    provider: CompletionProvider,
    tool_invoker: ToolInvoker | None = None,
    budget_checker: BudgetChecker | None = None,
    shutdown_checker: ShutdownChecker | None = None,
    completion_config: CompletionConfig | None = None,
) -> ExecutionResult:
    """Run the execution loop.

    Args:
        context: Initial agent context with conversation and identity.
        provider: LLM completion provider.
        tool_invoker: Optional tool invoker for tool execution.
        budget_checker: Optional callback; returns ``True`` when
            budget is exhausted.
        shutdown_checker: Optional callback; returns ``True`` when
            a graceful shutdown has been requested.
        completion_config: Optional per-execution override for
            temperature/max_tokens (defaults to identity's model config).

    Returns:
        Execution result with final context and termination reason.
    """
    ...

get_loop_type

get_loop_type()

Return the loop type identifier (e.g. "react").

Source code in src/synthorg/engine/loop_protocol.py
def get_loop_type(self) -> str:
    """Return the loop type identifier (e.g. ``"react"``)."""
    ...

make_budget_checker

make_budget_checker(task)

Create a budget checker if the task has a positive budget limit.

The returned callable returns True when accumulated cost meets or exceeds the limit (budget exhausted), False otherwise. Returns None when there is no positive budget limit.

Source code in src/synthorg/engine/loop_protocol.py
def make_budget_checker(task: Task) -> BudgetChecker | None:
    """Create a budget checker if the task has a positive budget limit.

    The returned callable returns ``True`` when accumulated cost meets
    or exceeds the limit (budget exhausted), ``False`` otherwise.
    Returns ``None`` when there is no positive budget limit.
    """
    if task.budget_limit <= 0:
        return None

    limit = task.budget_limit

    def _check(ctx: AgentContext) -> bool:
        return ctx.accumulated_cost.cost_usd >= limit

    return _check
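The closure behaviour is easy to exercise with minimal stand-ins for Task and AgentContext (the dataclasses below are illustrations, not the real models):

```python
from dataclasses import dataclass

@dataclass
class Cost:
    cost_usd: float

@dataclass
class Task:
    budget_limit: float

@dataclass
class AgentContext:
    accumulated_cost: Cost

def make_budget_checker(task: Task):
    if task.budget_limit <= 0:
        return None
    limit = task.budget_limit

    def _check(ctx: AgentContext) -> bool:
        return ctx.accumulated_cost.cost_usd >= limit

    return _check

check = make_budget_checker(Task(budget_limit=1.0))
assert check(AgentContext(Cost(0.50))) is False          # under budget
assert check(AgentContext(Cost(1.00))) is True           # limit reached: exhausted
assert make_budget_checker(Task(budget_limit=0.0)) is None  # no limit, no checker
```

The limit is captured once at creation, so later mutation of the task object does not change the threshold the checker enforces.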

ReAct Loop

react_loop

ReAct execution loop -- think, act, observe.

Implements the ExecutionLoop protocol using the ReAct pattern: check shutdown -> check budget -> call LLM -> record turn -> check for LLM errors -> update context -> handle completion or (check shutdown -> execute tools) -> repeat.

ReactLoop

ReactLoop(
    checkpoint_callback=None,
    *,
    approval_gate=None,
    stagnation_detector=None,
    compaction_callback=None,
)

ReAct execution loop: reason, act, observe.

The loop checks for shutdown, checks the budget, calls the LLM, checks for termination conditions, executes any requested tools, feeds results back, and repeats until the LLM signals completion, the turn limit is reached, the budget is exhausted, a shutdown is requested, or an error occurs.

Parameters:

Name Type Description Default
checkpoint_callback CheckpointCallback | None

Optional async callback invoked after each completed turn; the callback itself decides whether to persist.

None
approval_gate ApprovalGate | None

Optional gate that checks for pending escalations after tool execution and parks the agent when approval is required. None disables approval checks.

None
stagnation_detector StagnationDetector | None

Optional detector that checks for repetitive tool-call patterns and intervenes with corrective prompts or early termination. None disables stagnation detection.

None
compaction_callback CompactionCallback | None

Optional async callback invoked at turn boundaries to compress older conversation turns when the context fill level is high. None disables compaction.

None
Source code in src/synthorg/engine/react_loop.py
def __init__(
    self,
    checkpoint_callback: CheckpointCallback | None = None,
    *,
    approval_gate: ApprovalGate | None = None,
    stagnation_detector: StagnationDetector | None = None,
    compaction_callback: CompactionCallback | None = None,
) -> None:
    self._checkpoint_callback = checkpoint_callback
    self._approval_gate = approval_gate
    self._stagnation_detector = stagnation_detector
    self._compaction_callback = compaction_callback

approval_gate property

approval_gate

Return the approval gate, or None.

stagnation_detector property

stagnation_detector

Return the stagnation detector, or None.

compaction_callback property

compaction_callback

Return the compaction callback, or None.

get_loop_type

get_loop_type()

Return the loop type identifier.

Source code in src/synthorg/engine/react_loop.py
def get_loop_type(self) -> str:
    """Return the loop type identifier."""
    return "react"

execute async

execute(
    *,
    context,
    provider,
    tool_invoker=None,
    budget_checker=None,
    shutdown_checker=None,
    completion_config=None,
)

Run the ReAct loop until termination.

Parameters:

Name Type Description Default
context AgentContext

Initial agent context with conversation.

required
provider CompletionProvider

LLM completion provider.

required
tool_invoker ToolInvoker | None

Optional tool invoker for tool execution.

None
budget_checker BudgetChecker | None

Optional budget exhaustion callback.

None
shutdown_checker ShutdownChecker | None

Optional callback; returns True when a graceful shutdown has been requested.

None
completion_config CompletionConfig | None

Optional per-execution config override.

None

Returns:

Type Description
ExecutionResult

Execution result with final context and termination info.

Raises:

Type Description
MemoryError

Re-raised unconditionally (non-recoverable).

RecursionError

Re-raised unconditionally (non-recoverable).

Source code in src/synthorg/engine/react_loop.py
async def execute(  # noqa: PLR0913
    self,
    *,
    context: AgentContext,
    provider: CompletionProvider,
    tool_invoker: ToolInvoker | None = None,
    budget_checker: BudgetChecker | None = None,
    shutdown_checker: ShutdownChecker | None = None,
    completion_config: CompletionConfig | None = None,
) -> ExecutionResult:
    """Run the ReAct loop until termination.

    Args:
        context: Initial agent context with conversation.
        provider: LLM completion provider.
        tool_invoker: Optional tool invoker for tool execution.
        budget_checker: Optional budget exhaustion callback.
        shutdown_checker: Optional callback; returns ``True`` when
            a graceful shutdown has been requested.
        completion_config: Optional per-execution config override.

    Returns:
        Execution result with final context and termination info.

    Raises:
        MemoryError: Re-raised unconditionally (non-recoverable).
        RecursionError: Re-raised unconditionally (non-recoverable).
    """
    model_id, config, tool_defs, turns = self._prepare_loop(
        context, completion_config, tool_invoker
    )
    ctx = context
    corrections_injected = 0

    while ctx.has_turns_remaining:
        shutdown_result = check_shutdown(ctx, shutdown_checker, turns)
        if shutdown_result is not None:
            return shutdown_result

        budget_result = check_budget(ctx, budget_checker, turns)
        if budget_result is not None:
            return budget_result

        turn_number = ctx.turn_count + 1
        response = await call_provider(
            ctx,
            provider,
            model_id,
            tool_defs,
            config,
            turn_number,
            turns,
        )
        if isinstance(response, ExecutionResult):
            return response

        turns.append(
            make_turn_record(
                turn_number,
                response,
                call_category=LLMCallCategory.PRODUCTIVE,
            )
        )

        result = await self._process_turn_response(
            ctx,
            response,
            turn_number,
            turns,
            tool_invoker,
            shutdown_checker,
        )
        if isinstance(result, ExecutionResult):
            return result
        ctx = result

        # Stagnation detection after successful turn processing
        stag_outcome = await check_stagnation(
            ctx,
            self._stagnation_detector,
            turns,
            corrections_injected,
            execution_id=ctx.execution_id,
        )
        if isinstance(stag_outcome, ExecutionResult):
            return stag_outcome
        if isinstance(stag_outcome, tuple):
            ctx, corrections_injected = stag_outcome

        # Context compaction at turn boundaries
        compacted = await invoke_compaction(
            ctx,
            self._compaction_callback,
            turn_number,
        )
        if compacted is not None:
            ctx = compacted

    logger.info(
        EXECUTION_LOOP_TERMINATED,
        execution_id=ctx.execution_id,
        reason=TerminationReason.MAX_TURNS.value,
        turns=len(turns),
    )
    return build_result(ctx, TerminationReason.MAX_TURNS, turns)
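The control flow above reduces to a small think/act/observe skeleton. This toy version uses a scripted "LLM" and one tool, and omits everything the real loop adds (budget and shutdown checks, turn records, stagnation detection, compaction):

```python
# toy ReAct skeleton; llm maps observations to the next action dict
def react(llm, tools, max_turns: int = 5):
    observations: list[str] = []
    for _turn in range(max_turns):
        action = llm(observations)                        # think
        if action["type"] == "final":
            return action["answer"], "completed"
        result = tools[action["tool"]](**action["args"])  # act
        observations.append(result)                       # observe
    return None, "max_turns"

def scripted_llm(obs):
    if not obs:  # first turn: request a tool call
        return {"type": "tool", "tool": "add", "args": {"a": 2, "b": 3}}
    return {"type": "final", "answer": obs[-1]}  # then answer from the observation

answer, reason = react(scripted_llm, {"add": lambda a, b: str(a + b)})
assert (answer, reason) == ("5", "completed")
```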

Plan-and-Execute Loop

plan_execute_loop

Plan-and-Execute execution loop.

Implements the ExecutionLoop protocol using a two-phase approach:

1. Plan -- ask the LLM to decompose the task into ordered steps. Planning calls pass tools=None (no tool access during planning).
2. Execute -- run each step via a mini-ReAct sub-loop with tools.

Re-planning is triggered when a step fails, up to a configurable limit. When re-planning is exhausted, the loop terminates with ERROR.

PlanExecuteLoop

PlanExecuteLoop(
    config=None,
    checkpoint_callback=None,
    *,
    approval_gate=None,
    stagnation_detector=None,
    compaction_callback=None,
)

Plan-and-Execute execution loop.

Decomposes a task into steps via LLM planning, then executes each step with a mini-ReAct sub-loop. Supports re-planning on failure.

Parameters:

Name Type Description Default
config PlanExecuteConfig | None

Loop configuration. Defaults to PlanExecuteConfig().

None
checkpoint_callback CheckpointCallback | None

Optional per-turn checkpoint callback.

None
approval_gate ApprovalGate | None

Optional gate that checks for pending escalations after tool execution and parks the agent when approval is required. None disables approval checks.

None
stagnation_detector StagnationDetector | None

Optional detector that checks for repetitive tool-call patterns within each step and intervenes with corrective prompts or early termination. None disables stagnation detection.

None
compaction_callback CompactionCallback | None

Optional async callback invoked at turn boundaries to compress older conversation turns when the context fill level is high. None disables compaction.

None
Source code in src/synthorg/engine/plan_execute_loop.py
def __init__(
    self,
    config: PlanExecuteConfig | None = None,
    checkpoint_callback: CheckpointCallback | None = None,
    *,
    approval_gate: ApprovalGate | None = None,
    stagnation_detector: StagnationDetector | None = None,
    compaction_callback: CompactionCallback | None = None,
) -> None:
    self._config = config or PlanExecuteConfig()
    self._checkpoint_callback = checkpoint_callback
    self._approval_gate = approval_gate
    self._stagnation_detector = stagnation_detector
    self._compaction_callback = compaction_callback

config property

config

Return the loop configuration.

approval_gate property

approval_gate

Return the approval gate, or None.

stagnation_detector property

stagnation_detector

Return the stagnation detector, or None.

compaction_callback property

compaction_callback

Return the compaction callback, or None.

get_loop_type

get_loop_type()

Return the loop type identifier.

Source code in src/synthorg/engine/plan_execute_loop.py
def get_loop_type(self) -> str:
    """Return the loop type identifier."""
    return "plan_execute"

execute async

execute(
    *,
    context,
    provider,
    tool_invoker=None,
    budget_checker=None,
    shutdown_checker=None,
    completion_config=None,
)

Run the Plan-and-Execute loop until termination.

Parameters:

Name Type Description Default
context AgentContext

Initial agent context with conversation.

required
provider CompletionProvider

LLM completion provider.

required
tool_invoker ToolInvoker | None

Optional tool invoker for tool execution.

None
budget_checker BudgetChecker | None

Optional budget exhaustion callback.

None
shutdown_checker ShutdownChecker | None

Optional callback; returns True when a graceful shutdown has been requested.

None
completion_config CompletionConfig | None

Optional per-execution config override.

None

Returns:

Type Description
ExecutionResult

Execution result with final context and termination info.

Raises:

Type Description
MemoryError

Re-raised unconditionally (non-recoverable).

RecursionError

Re-raised unconditionally (non-recoverable).

Source code in src/synthorg/engine/plan_execute_loop.py
async def execute(  # noqa: PLR0913
    self,
    *,
    context: AgentContext,
    provider: CompletionProvider,
    tool_invoker: ToolInvoker | None = None,
    budget_checker: BudgetChecker | None = None,
    shutdown_checker: ShutdownChecker | None = None,
    completion_config: CompletionConfig | None = None,
) -> ExecutionResult:
    """Run the Plan-and-Execute loop until termination.

    Args:
        context: Initial agent context with conversation.
        provider: LLM completion provider.
        tool_invoker: Optional tool invoker for tool execution.
        budget_checker: Optional budget exhaustion callback.
        shutdown_checker: Optional callback; returns ``True`` when
            a graceful shutdown has been requested.
        completion_config: Optional per-execution config override.

    Returns:
        Execution result with final context and termination info.

    Raises:
        MemoryError: Re-raised unconditionally (non-recoverable).
        RecursionError: Re-raised unconditionally (non-recoverable).
    """
    logger.info(
        EXECUTION_LOOP_START,
        execution_id=context.execution_id,
        loop_type=self.get_loop_type(),
        max_turns=context.max_turns,
    )

    ctx = context
    default_model = ctx.identity.model.model_id
    planner_model = self._config.planner_model or default_model
    executor_model = self._config.executor_model or default_model
    default_config = completion_config or CompletionConfig(
        temperature=ctx.identity.model.temperature,
        max_tokens=ctx.identity.model.max_tokens,
    )
    tool_defs = get_tool_definitions(tool_invoker)
    turns: list[TurnRecord] = []
    all_plans: list[ExecutionPlan] = []
    replans_used = 0

    # Phase 1: Planning
    plan_result = await self._run_planning_phase(
        ctx,
        provider,
        planner_model,
        default_config,
        turns,
        shutdown_checker,
        budget_checker,
    )
    if isinstance(plan_result, ExecutionResult):
        return self._finalize(plan_result, all_plans, replans_used)
    ctx, plan = plan_result
    all_plans.append(plan)

    # Phase 2: Execute steps
    return await self._run_steps(
        ctx,
        provider,
        executor_model,
        default_config,
        tool_defs,
        tool_invoker,
        plan,
        turns,
        all_plans,
        replans_used,
        planner_model,
        budget_checker,
        shutdown_checker,
    )

Plan Models

plan_models

Data models for the Plan-and-Execute execution loop.

Defines the plan structure (steps, status, revisions) and the configuration model for the plan-execute loop.

StepStatus

Bases: StrEnum

Execution status of a plan step.

PlanStep pydantic-model

Bases: BaseModel

A single step within an execution plan.

Attributes:

Name Type Description
step_number int

1-indexed position in the plan.

description NotBlankStr

What this step should accomplish.

expected_outcome NotBlankStr

The anticipated result of this step.

status StepStatus

Current execution status of this step.

actual_outcome NotBlankStr | None

Observed result after execution (if any).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

step_number pydantic-field

step_number

1-indexed step number

description pydantic-field

description

Step description

expected_outcome pydantic-field

expected_outcome

Anticipated result of this step

status pydantic-field

status = PENDING

Current execution status

actual_outcome pydantic-field

actual_outcome = None

Observed result after execution

ExecutionPlan pydantic-model

Bases: BaseModel

An ordered sequence of plan steps for task execution.

Attributes:

Name Type Description
steps tuple[PlanStep, ...]

Ordered tuple of plan steps.

revision_number int

Plan revision counter (0 = original).

original_task_summary NotBlankStr

Brief summary of the task being planned.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_sequential_step_numbers

steps pydantic-field

steps

Ordered plan steps

revision_number pydantic-field

revision_number = 0

Plan revision counter (0 = original)

original_task_summary pydantic-field

original_task_summary

Brief summary of the task being planned
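The _validate_sequential_step_numbers validator listed below presumably enforces that step_number values run exactly 1..N in plan order. A standalone sketch of that check (an assumption about the validator's behavior, not the library source):

```python
# Standalone sketch of what _validate_sequential_step_numbers likely
# enforces: step numbers must be exactly 1..N in order.
# (Assumption -- not copied from plan_models.py.)
def validate_sequential(step_numbers: list[int]) -> None:
    for index, number in enumerate(step_numbers, start=1):
        if number != index:
            msg = f"expected step_number {index}, got {number}"
            raise ValueError(msg)
```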

PlanExecuteConfig pydantic-model

Bases: BaseModel

Configuration for the Plan-and-Execute loop.

Attributes:

Name Type Description
planner_model NotBlankStr | None

Model override for plan generation. None uses the agent's default model.

executor_model NotBlankStr | None

Model override for step execution. None uses the agent's default model.

max_replans int

Maximum number of re-planning attempts on step failure.

Config:

  • frozen: True
  • extra: forbid
  • allow_inf_nan: False

Fields:

planner_model pydantic-field

planner_model = None

Model override for plan generation (None = agent default)

executor_model pydantic-field

executor_model = None

Model override for step execution (None = agent default)

max_replans pydantic-field

max_replans = 3

Maximum number of re-planning attempts
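The failure-handling contract described above (re-plan when a step fails, at most max_replans times, then terminate with ERROR) can be sketched as a toy control loop. All names below are illustrative, not synthorg internals:

```python
# Toy sketch of the re-planning contract: on step failure, re-plan up to
# `max_replans` times; when the budget is exhausted, terminate with ERROR.
# Illustrative only -- not the PlanExecuteLoop implementation.
from typing import Callable, Sequence


def run_with_replans(
    plan: Sequence[str],
    run_step: Callable[[str], bool],
    replan: Callable[[Sequence[str]], Sequence[str]],
    max_replans: int = 3,
) -> str:
    replans_used = 0
    steps = list(plan)
    i = 0
    while i < len(steps):
        if run_step(steps[i]):
            i += 1  # step succeeded, advance to the next one
            continue
        if replans_used >= max_replans:
            return "ERROR"  # re-planning exhausted
        replans_used += 1
        steps = list(replan(steps[i:]))  # re-plan the remaining work
        i = 0
    return "COMPLETED"
```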

Execution Context

context

Agent execution context.

Wraps an AgentIdentity (frozen config) with evolving runtime state (conversation, cost, turn count, task execution) using model_copy(update=...) for cheap, immutable state transitions.

DEFAULT_MAX_TURNS module-attribute

DEFAULT_MAX_TURNS = 20

Default hard limit on LLM turns per agent execution.
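The model_copy(update=...) transition style described above has a minimal stdlib analogue in frozen dataclasses plus dataclasses.replace. The toy class below illustrates the pattern only; the real AgentContext is a pydantic model:

```python
# Minimal stdlib analogue of the frozen-model + model_copy(update=...)
# pattern: state never mutates in place; each transition returns a new
# object. (Illustration only -- the real AgentContext is a pydantic model.)
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyContext:
    turn_count: int = 0
    conversation: tuple[str, ...] = ()

    def with_message(self, msg: str) -> "ToyContext":
        # Return a new instance; the original is untouched.
        return replace(self, conversation=(*self.conversation, msg))


ctx0 = ToyContext()
ctx1 = ctx0.with_message("hello")
```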

AgentContextSnapshot pydantic-model

Bases: BaseModel

Compact frozen snapshot of an AgentContext for reporting.

Attributes:

Name Type Description
execution_id NotBlankStr

Unique execution run identifier.

agent_id NotBlankStr

Agent identifier (string form of UUID).

task_id NotBlankStr | None

Task identifier, if a task is active.

turn_count int

Number of turns completed.

accumulated_cost TokenUsage

Running cost totals.

task_status TaskStatus | None

Current task status, if a task is active.

started_at AwareDatetime

When the execution began.

snapshot_at AwareDatetime

When this snapshot was taken.

message_count int

Number of messages in the conversation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_task_pair

execution_id pydantic-field

execution_id

Unique execution identifier

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id = None

Task identifier

turn_count pydantic-field

turn_count

Turns completed

accumulated_cost pydantic-field

accumulated_cost

Running cost totals

task_status pydantic-field

task_status = None

Current task status

started_at pydantic-field

started_at

Execution start time

snapshot_at pydantic-field

snapshot_at

When snapshot was taken

message_count pydantic-field

message_count

Messages in conversation

context_fill_tokens pydantic-field

context_fill_tokens = 0

Estimated context fill tokens

context_fill_percent pydantic-field

context_fill_percent = None

Context fill percentage

AgentContext pydantic-model

Bases: BaseModel

Frozen runtime context for agent execution.

All state evolution happens via model_copy(update=...). The context tracks the conversation, accumulated cost, and optionally a TaskExecution for task-bound agent runs.

Attributes:

Name Type Description
execution_id NotBlankStr

Unique identifier for this execution run.

identity AgentIdentity

Frozen agent identity configuration.

task_execution TaskExecution | None

Current task execution state (if any).

conversation tuple[ChatMessage, ...]

Accumulated chat messages.

accumulated_cost TokenUsage

Running token usage and cost totals.

turn_count int

Number of LLM turns completed.

max_turns int

Hard limit on turns before the engine stops.

started_at AwareDatetime

When this execution began.

context_fill_tokens int

Estimated tokens currently in the full context (system prompt + conversation + tool defs).

context_capacity_tokens int | None

Model's max context window tokens, or None when unknown.

compression_metadata CompressionMetadata | None

Metadata about conversation compression, set when compaction has occurred.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

execution_id pydantic-field

execution_id

Unique execution run identifier

identity pydantic-field

identity

Frozen agent identity config

task_execution pydantic-field

task_execution = None

Current task execution state

conversation pydantic-field

conversation = ()

Accumulated conversation messages

accumulated_cost pydantic-field

accumulated_cost = ZERO_TOKEN_USAGE

Running cost totals across all turns

turn_count pydantic-field

turn_count = 0

Turns completed

max_turns pydantic-field

max_turns = DEFAULT_MAX_TURNS

Hard turn limit

started_at pydantic-field

started_at

When execution began

context_fill_tokens pydantic-field

context_fill_tokens = 0

Estimated tokens in the full context

context_capacity_tokens pydantic-field

context_capacity_tokens = None

Model's max context window tokens

compression_metadata pydantic-field

compression_metadata = None

Compression metadata when compacted

context_fill_percent property

context_fill_percent

Percentage of context window currently filled.

Returns None when context capacity is unknown.

has_turns_remaining property

has_turns_remaining

Whether the agent has turns remaining before hitting max_turns.

from_identity classmethod

from_identity(
    identity, *, task=None, max_turns=DEFAULT_MAX_TURNS, context_capacity_tokens=None
)

Create a fresh execution context from an agent identity.

Parameters:

Name Type Description Default
identity AgentIdentity

The frozen agent identity card.

required
task Task | None

Optional task to bind to this execution.

None
max_turns int

Maximum number of LLM turns allowed.

DEFAULT_MAX_TURNS
context_capacity_tokens int | None

Model's max context window tokens, or None when unknown.

None

Returns:

Type Description
AgentContext

New AgentContext ready for execution.

Source code in src/synthorg/engine/context.py
@classmethod
def from_identity(
    cls,
    identity: AgentIdentity,
    *,
    task: Task | None = None,
    max_turns: int = DEFAULT_MAX_TURNS,
    context_capacity_tokens: int | None = None,
) -> AgentContext:
    """Create a fresh execution context from an agent identity.

    Args:
        identity: The frozen agent identity card.
        task: Optional task to bind to this execution.
        max_turns: Maximum number of LLM turns allowed.
        context_capacity_tokens: Model's max context window
            tokens, or ``None`` when unknown.

    Returns:
        New ``AgentContext`` ready for execution.
    """
    task_execution = TaskExecution.from_task(task) if task is not None else None
    context = cls(
        execution_id=str(uuid4()),
        identity=identity,
        task_execution=task_execution,
        max_turns=max_turns,
        started_at=datetime.now(UTC),
        context_capacity_tokens=context_capacity_tokens,
    )
    logger.debug(
        EXECUTION_CONTEXT_CREATED,
        execution_id=context.execution_id,
        agent_id=str(identity.id),
        has_task=task is not None,
    )
    return context

with_message

with_message(msg)

Append a single message to the conversation.

Parameters:

Name Type Description Default
msg ChatMessage

The chat message to append.

required

Returns:

Type Description
AgentContext

New AgentContext with the message appended.

Source code in src/synthorg/engine/context.py
def with_message(self, msg: ChatMessage) -> AgentContext:
    """Append a single message to the conversation.

    Args:
        msg: The chat message to append.

    Returns:
        New ``AgentContext`` with the message appended.
    """
    return self.model_copy(update={"conversation": (*self.conversation, msg)})

with_turn_completed

with_turn_completed(usage, response_msg)

Record a completed turn.

Increments turn count, appends the response message, and accumulates cost on both the context and the task execution (if present).

Parameters:

Name Type Description Default
usage TokenUsage

Token usage from this turn's LLM call.

required
response_msg ChatMessage

The assistant's response message.

required

Returns:

Type Description
AgentContext

New AgentContext with updated state.

Raises:

Type Description
MaxTurnsExceededError

If max_turns has been reached.

Source code in src/synthorg/engine/context.py
def with_turn_completed(
    self,
    usage: TokenUsage,
    response_msg: ChatMessage,
) -> AgentContext:
    """Record a completed turn.

    Increments turn count, appends the response message, and
    accumulates cost on both the context and the task execution
    (if present).

    Args:
        usage: Token usage from this turn's LLM call.
        response_msg: The assistant's response message.

    Returns:
        New ``AgentContext`` with updated state.

    Raises:
        MaxTurnsExceededError: If ``max_turns`` has been reached.
    """
    if not self.has_turns_remaining:
        msg = (
            f"Agent {self.identity.id} exceeded max_turns "
            f"({self.max_turns}) for execution {self.execution_id}"
        )
        logger.error(
            EXECUTION_MAX_TURNS_EXCEEDED,
            execution_id=self.execution_id,
            agent_id=str(self.identity.id),
            max_turns=self.max_turns,
            turn_count=self.turn_count,
        )
        raise MaxTurnsExceededError(msg)
    updates: dict[str, object] = {
        "turn_count": self.turn_count + 1,
        "conversation": (*self.conversation, response_msg),
        "accumulated_cost": add_token_usage(self.accumulated_cost, usage),
    }
    if self.task_execution is not None:
        updates["task_execution"] = self.task_execution.with_cost(usage)

    result = self.model_copy(update=updates)
    logger.info(
        EXECUTION_CONTEXT_TURN,
        execution_id=self.execution_id,
        turn=result.turn_count,
        cost_usd=usage.cost_usd,
    )
    return result

with_context_fill

with_context_fill(fill_tokens)

Update the estimated context fill level.

Parameters:

Name Type Description Default
fill_tokens int

New estimated fill in tokens.

required

Returns:

Type Description
AgentContext

New AgentContext with updated fill level.

Raises:

Type Description
ValueError

If fill_tokens is negative.

Source code in src/synthorg/engine/context.py
def with_context_fill(self, fill_tokens: int) -> AgentContext:
    """Update the estimated context fill level.

    Args:
        fill_tokens: New estimated fill in tokens.

    Returns:
        New ``AgentContext`` with updated fill level.

    Raises:
        ValueError: If ``fill_tokens`` is negative.
    """
    if fill_tokens < 0:
        msg = f"fill_tokens must be >= 0, got {fill_tokens}"
        raise ValueError(msg)
    return self.model_copy(
        update={"context_fill_tokens": fill_tokens},
    )

with_compression

with_compression(metadata, compressed_conversation, fill_tokens)

Replace conversation with a compressed version.

Parameters:

Name Type Description Default
metadata CompressionMetadata

Compression metadata to attach.

required
compressed_conversation tuple[ChatMessage, ...]

The compressed message tuple.

required
fill_tokens int

Updated fill estimate after compression.

required

Returns:

Type Description
AgentContext

New AgentContext with compressed conversation.

Raises:

Type Description
ValueError

If fill_tokens is negative.

Source code in src/synthorg/engine/context.py
def with_compression(
    self,
    metadata: CompressionMetadata,
    compressed_conversation: tuple[ChatMessage, ...],
    fill_tokens: int,
) -> AgentContext:
    """Replace conversation with a compressed version.

    Args:
        metadata: Compression metadata to attach.
        compressed_conversation: The compressed message tuple.
        fill_tokens: Updated fill estimate after compression.

    Returns:
        New ``AgentContext`` with compressed conversation.

    Raises:
        ValueError: If ``fill_tokens`` is negative.
    """
    if fill_tokens < 0:
        msg = f"fill_tokens must be >= 0, got {fill_tokens}"
        raise ValueError(msg)
    return self.model_copy(
        update={
            "conversation": compressed_conversation,
            "compression_metadata": metadata,
            "context_fill_tokens": fill_tokens,
        },
    )

with_task_transition

with_task_transition(target, *, reason='')

Transition the task execution status.

Delegates to synthorg.engine.task_execution.TaskExecution.with_transition.

Parameters:

Name Type Description Default
target TaskStatus

The desired target status.

required
reason str

Optional reason for the transition.

''

Returns:

Type Description
AgentContext

New AgentContext with updated task execution.

Raises:

Type Description
ExecutionStateError

If no task execution is set.

ValueError

If the transition is invalid (from validate_transition).

Source code in src/synthorg/engine/context.py
def with_task_transition(
    self,
    target: TaskStatus,
    *,
    reason: str = "",
) -> AgentContext:
    """Transition the task execution status.

    Delegates to
    :meth:`~synthorg.engine.task_execution.TaskExecution.with_transition`.

    Args:
        target: The desired target status.
        reason: Optional reason for the transition.

    Returns:
        New ``AgentContext`` with updated task execution.

    Raises:
        ExecutionStateError: If no task execution is set.
        ValueError: If the transition is invalid (from
            ``validate_transition``).
    """
    if self.task_execution is None:
        msg = "Cannot transition task status: no task execution is set"
        logger.error(
            EXECUTION_CONTEXT_NO_TASK,
            execution_id=self.execution_id,
            agent_id=str(self.identity.id),
            target_status=target.value,
        )
        raise ExecutionStateError(msg)
    try:
        new_execution = self.task_execution.with_transition(target, reason=reason)
    except ValueError:
        logger.warning(
            EXECUTION_CONTEXT_TRANSITION_FAILED,
            execution_id=self.execution_id,
            agent_id=str(self.identity.id),
            target_status=target.value,
            current_status=self.task_execution.status.value,
        )
        raise
    return self.model_copy(update={"task_execution": new_execution})

to_snapshot

to_snapshot()

Create a compact snapshot for reporting and logging.

Returns:

Type Description
AgentContextSnapshot

Frozen AgentContextSnapshot with current state.

Source code in src/synthorg/engine/context.py
def to_snapshot(self) -> AgentContextSnapshot:
    """Create a compact snapshot for reporting and logging.

    Returns:
        Frozen ``AgentContextSnapshot`` with current state.
    """
    te = self.task_execution
    snapshot = AgentContextSnapshot(
        execution_id=self.execution_id,
        agent_id=str(self.identity.id),
        task_id=te.task.id if te is not None else None,
        turn_count=self.turn_count,
        accumulated_cost=self.accumulated_cost,
        task_status=te.status if te is not None else None,
        started_at=self.started_at,
        snapshot_at=datetime.now(UTC),
        message_count=len(self.conversation),
        context_fill_tokens=self.context_fill_tokens,
        context_fill_percent=self.context_fill_percent,
    )
    logger.debug(
        EXECUTION_CONTEXT_SNAPSHOT,
        execution_id=self.execution_id,
    )
    return snapshot

Prompt Builder

prompt

System prompt construction from agent identity and context.

Translates agent configuration (personality, skills, authority, role) into contextually rich system prompts that shape agent behavior during LLM calls.

Non-inferable principle: System prompts should contain only information that agents cannot discover by reading the codebase or environment. Tool definitions, for example, are already delivered via the LLM provider's API tools parameter, so repeating them in the system prompt would increase cost without benefit (per D22, arXiv:2602.11988). The default template therefore omits the Available Tools section. Custom templates may still reference {{ tools }} when explicitly needed.

Example:

from synthorg.engine.prompt import build_system_prompt

prompt = build_system_prompt(agent=agent_identity, task=task)
prompt.content  # rendered system prompt string

SystemPrompt pydantic-model

Bases: BaseModel

Immutable result of system prompt construction.

Attributes:

Name Type Description
content str

Full rendered prompt text.

template_version str

Version of the template that produced this prompt.

estimated_tokens int

Token estimate of the prompt content.

sections tuple[str, ...]

Names of sections included in the prompt.

metadata dict[str, str]

Agent identity metadata (agent_id, name, role, department, level, and optionally profile_tier).

personality_trim_info PersonalityTrimInfo | None

Populated when personality section was trimmed to fit the profile's token budget.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

content pydantic-field

content

Full rendered prompt text

template_version pydantic-field

template_version

Template version that produced this prompt

estimated_tokens pydantic-field

estimated_tokens

Estimated token count of prompt content

sections pydantic-field

sections

Names of sections included in the prompt

metadata pydantic-field

metadata

Agent identity metadata (string-only values; shallow-frozen)

personality_trim_info pydantic-field

personality_trim_info = None

Populated when personality section was trimmed

build_system_prompt

build_system_prompt(
    *,
    agent,
    role=None,
    task=None,
    available_tools=(),
    company=None,
    org_policies=(),
    max_tokens=None,
    custom_template=None,
    token_estimator=None,
    effective_autonomy=None,
    context_budget_indicator=None,
    currency=DEFAULT_CURRENCY,
    model_tier=None,
    personality_trimming_enabled=True,
    max_personality_tokens_override=None,
)

Build a system prompt from agent identity and optional context.

When max_tokens is provided and the prompt exceeds it, optional sections are progressively trimmed (company, task, org_policies).

Parameters:

Name Type Description Default
agent AgentIdentity

Agent identity containing personality, skills, authority.

required
role Role | None

Optional role with description and responsibilities.

None
task Task | None

Optional task context injected into the prompt.

None
available_tools tuple[ToolDefinition, ...]

Tool definitions populated into template context for custom templates only; the default template omits tools per D22 (non-inferable principle).

()
company Company | None

Opt-in. Non-inferable principle recommends omitting unless agents need org-level context they cannot discover.

None
org_policies tuple[str, ...]

Company-wide policy texts to inject into prompt.

()
max_tokens int | None

Token budget; sections are trimmed if exceeded.

None
custom_template str | None

Optional Jinja2 template string override.

None
token_estimator PromptTokenEstimator | None

Custom token estimator (defaults to char/4).

None
effective_autonomy EffectiveAutonomy | None

Resolved autonomy for the current run.

None
context_budget_indicator str | None

Formatted context budget indicator string to inject into the prompt.

None
currency str

ISO 4217 currency code for budget displays (e.g. "USD", "EUR").

DEFAULT_CURRENCY
model_tier ModelTier | None

Model capability tier for prompt profile selection. None defaults to the full (large) profile.

None
personality_trimming_enabled bool

When True (default), the personality section is progressively trimmed if it exceeds the profile's max_personality_tokens.

True
max_personality_tokens_override int | None

When set to a positive value, overrides the profile's max_personality_tokens limit. Values <= 0 are ignored (profile default is used).

None

Returns:

Type Description
SystemPrompt

Immutable SystemPrompt with rendered content and metadata.

Raises:

Type Description
PromptBuildError

If prompt construction fails.

Source code in src/synthorg/engine/prompt.py
def build_system_prompt(  # noqa: PLR0913
    *,
    agent: AgentIdentity,
    role: Role | None = None,
    task: Task | None = None,
    available_tools: tuple[ToolDefinition, ...] = (),
    company: Company | None = None,
    org_policies: tuple[str, ...] = (),
    max_tokens: int | None = None,
    custom_template: str | None = None,
    token_estimator: PromptTokenEstimator | None = None,
    effective_autonomy: EffectiveAutonomy | None = None,
    context_budget_indicator: str | None = None,
    currency: str = DEFAULT_CURRENCY,
    model_tier: ModelTier | None = None,
    personality_trimming_enabled: bool = True,
    max_personality_tokens_override: int | None = None,
) -> SystemPrompt:
    """Build a system prompt from agent identity and optional context.

    When ``max_tokens`` is provided and the prompt exceeds it, optional
    sections are progressively trimmed (company, task, org_policies).

    Args:
        agent: Agent identity containing personality, skills, authority.
        role: Optional role with description and responsibilities.
        task: Optional task context injected into the prompt.
        available_tools: Tool definitions populated into template context
            for custom templates only; the default template omits tools
            per D22 (non-inferable principle).
        company: Opt-in. Non-inferable principle recommends omitting
            unless agents need org-level context they cannot discover.
        org_policies: Company-wide policy texts to inject into prompt.
        max_tokens: Token budget; sections are trimmed if exceeded.
        custom_template: Optional Jinja2 template string override.
        token_estimator: Custom token estimator (defaults to char/4).
        effective_autonomy: Resolved autonomy for the current run.
        context_budget_indicator: Formatted context budget indicator
            string to inject into the prompt.
        currency: ISO 4217 currency code for budget displays
            (e.g. ``"USD"``, ``"EUR"``).
        model_tier: Model capability tier for prompt profile selection.
            ``None`` defaults to the full (large) profile.
        personality_trimming_enabled: When ``True`` (default), the
            personality section is progressively trimmed if it exceeds
            the profile's ``max_personality_tokens``.
        max_personality_tokens_override: When set to a positive value,
            overrides the profile's ``max_personality_tokens`` limit.
            Values ``<= 0`` are ignored (profile default is used).

    Returns:
        Immutable :class:`SystemPrompt` with rendered content and metadata.

    Raises:
        PromptBuildError: If prompt construction fails.
    """
    _validate_max_tokens(agent, max_tokens)
    _validate_org_policies(agent, org_policies)

    profile = get_prompt_profile(model_tier)
    if max_personality_tokens_override is not None:
        if max_personality_tokens_override > 0:
            profile = profile.model_copy(
                update={"max_personality_tokens": max_personality_tokens_override},
            )
        else:
            logger.warning(
                PROMPT_PROFILE_SELECTED,
                override_ignored=max_personality_tokens_override,
                reason="max_personality_tokens_override must be > 0",
            )
    logger.info(
        PROMPT_PROFILE_SELECTED,
        requested_tier=model_tier,
        selected_tier=profile.tier,
        defaulted=model_tier is None,
        personality_mode=profile.personality_mode,
        autonomy_detail_level=profile.autonomy_detail_level,
    )

    # Advisory only -- issues are logged but never block prompt construction.
    if org_policies:
        try:
            validate_policy_quality(org_policies)
        except (MemoryError, RecursionError):
            raise
        except Exception:
            logger.warning(
                PROMPT_POLICY_VALIDATION_FAILED,
                agent_id=str(agent.id),
                exc_info=True,
            )

    logger.info(
        PROMPT_BUILD_START,
        agent_id=str(agent.id),
        agent_name=agent.name,
        has_task=task is not None,
        tool_count=len(available_tools),
        has_company=company is not None,
        has_custom_template=custom_template is not None,
        model_tier=model_tier,
    )

    try:
        estimator = token_estimator or DefaultTokenEstimator()
        template_str = _resolve_template(custom_template)

        result = _render_with_trimming(
            template_str=template_str,
            agent=agent,
            role=role,
            task=task,
            available_tools=available_tools,
            company=company,
            org_policies=org_policies,
            max_tokens=max_tokens,
            estimator=estimator,
            effective_autonomy=effective_autonomy,
            context_budget_indicator=context_budget_indicator,
            currency=currency,
            profile=profile,
            trimming_enabled=personality_trimming_enabled,
        )
    except PromptBuildError:
        raise  # Already logged by inner functions.
    except (MemoryError, RecursionError):
        logger.error(
            PROMPT_BUILD_ERROR,
            agent_id=str(agent.id),
            agent_name=agent.name,
            error="non-recoverable error building prompt",
            exc_info=True,
        )
        raise
    except Exception as exc:
        logger.exception(
            PROMPT_BUILD_ERROR,
            agent_id=str(agent.id),
            agent_name=agent.name,
            error=str(exc),
        )
        detail = sanitize_message(str(exc))
        msg = f"Unexpected error building prompt for agent '{agent.name}': {detail}"
        raise PromptBuildError(msg) from exc

    return _log_and_return(agent, result)

build_error_prompt

build_error_prompt(identity, agent_id, system_prompt)

Return the existing system prompt or a minimal error placeholder.

Used by the engine when the execution pipeline fails and a SystemPrompt was never built (or was partially built).

Parameters:

Name Type Description Default
identity AgentIdentity

Agent identity for metadata.

required
agent_id str

String agent identifier.

required
system_prompt SystemPrompt | None

Previously built prompt, or None.

required

Returns:

Type Description
SystemPrompt

The existing prompt if available, else a minimal placeholder.

Source code in src/synthorg/engine/prompt.py
def build_error_prompt(
    identity: AgentIdentity,
    agent_id: str,
    system_prompt: SystemPrompt | None,
) -> SystemPrompt:
    """Return the existing system prompt or a minimal error placeholder.

    Used by the engine when the execution pipeline fails and a
    ``SystemPrompt`` was never built (or was partially built).

    Args:
        identity: Agent identity for metadata.
        agent_id: String agent identifier.
        system_prompt: Previously built prompt, or ``None``.

    Returns:
        The existing prompt if available, else a minimal placeholder.
    """
    if system_prompt is not None:
        return system_prompt
    metadata = {**_build_metadata(identity), "agent_id": agent_id}
    return SystemPrompt(
        content="",
        template_version="error",
        estimated_tokens=0,
        sections=(),
        metadata=metadata,
    )

format_task_instruction

format_task_instruction(task, *, currency=DEFAULT_CURRENCY)

Format a task into a user message for the initial conversation.

Parameters:

Name Type Description Default
task Task

Task to format.

required
currency str

ISO 4217 currency code for budget display.

DEFAULT_CURRENCY

Returns:

Type Description
str

Markdown-formatted task instruction string.

Source code in src/synthorg/engine/prompt.py
def format_task_instruction(
    task: Task,
    *,
    currency: str = DEFAULT_CURRENCY,
) -> str:
    """Format a task into a user message for the initial conversation.

    Args:
        task: Task to format.
        currency: ISO 4217 currency code for budget display.

    Returns:
        Markdown-formatted task instruction string.
    """
    parts = [f"# Task: {task.title}", "", task.description]

    if task.acceptance_criteria:
        parts.append("")
        parts.append("## Acceptance Criteria")
        parts.extend(f"- {c.description}" for c in task.acceptance_criteria)

    if task.budget_limit > 0:
        parts.append("")
        parts.append(f"**Budget limit:** {format_cost(task.budget_limit, currency)}")

    if task.deadline:
        parts.append("")
        parts.append(f"**Deadline:** {task.deadline}")

    return "\n".join(parts)

Task Execution

task_execution

Runtime task execution state.

Wraps the frozen Task config model with evolving execution state (status, cost, turn count) using model_copy(update=...) for cheap, immutable state transitions.
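The copy-on-update pattern has a direct stdlib analogue; the sketch below uses a frozen dataclass with `dataclasses.replace`, which behaves like Pydantic's `model_copy(update=...)` (the field names are illustrative, not the real `TaskExecution` schema):

```python
from dataclasses import dataclass, replace

# Frozen record evolved via replace() -- the dataclass equivalent of
# Pydantic's model_copy(update=...): cheap, immutable state transitions.
@dataclass(frozen=True)
class ExecutionState:
    status: str
    turn_count: int = 0

    def with_turn(self) -> "ExecutionState":
        # Returns a new instance; the original is never mutated.
        return replace(self, turn_count=self.turn_count + 1)

s0 = ExecutionState(status="in_progress")
s1 = s0.with_turn()
print(s0.turn_count, s1.turn_count)
```

Because every transition produces a fresh instance, earlier states remain valid snapshots for audit and rollback.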

StatusTransition pydantic-model

Bases: BaseModel

Frozen audit record for a single status transition.

Attributes:

Name Type Description
from_status TaskStatus

Status before the transition.

to_status TaskStatus

Status after the transition.

timestamp AwareDatetime

When the transition occurred (timezone-aware).

reason str

Optional human-readable reason for the transition.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

from_status pydantic-field

from_status

Status before transition

to_status pydantic-field

to_status

Status after transition

timestamp pydantic-field

timestamp

When the transition occurred

reason pydantic-field

reason = ''

Optional reason for the transition

TaskExecution pydantic-model

Bases: BaseModel

Frozen runtime wrapper around a Task for execution tracking.

All state evolution happens via model_copy(update=...). Transitions are validated explicitly via :func:~synthorg.core.task_transitions.validate_transition before the copy is made.

Attributes:

Name Type Description
task Task

Original frozen task definition.

status TaskStatus

Current execution status (starts from task.status).

transition_log tuple[StatusTransition, ...]

Audit trail of status transitions.

accumulated_cost TokenUsage

Running token usage and cost totals.

turn_count int

Number of LLM turns completed.

retry_count int

Number of previous failure-reassignment cycles.

started_at AwareDatetime | None

Set by with_transition on first entry to IN_PROGRESS (None until then).

completed_at AwareDatetime | None

When execution reached a terminal state.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

task pydantic-field

task

Original frozen task definition

status pydantic-field

status

Current execution status

transition_log pydantic-field

transition_log = ()

Audit trail of status transitions

accumulated_cost pydantic-field

accumulated_cost = ZERO_TOKEN_USAGE

Running cost totals

turn_count pydantic-field

turn_count = 0

Number of turns completed

retry_count pydantic-field

retry_count = 0

Number of previous failure-reassignment cycles

started_at pydantic-field

started_at = None

When execution entered IN_PROGRESS

completed_at pydantic-field

completed_at = None

When execution reached a terminal state

is_terminal property

is_terminal

Whether execution is in a terminal state.

from_task classmethod

from_task(task, *, retry_count=0)

Create a fresh execution from a task definition.

Parameters:

Name Type Description Default
task Task

The frozen task to wrap.

required
retry_count int

Number of previous failure-reassignment cycles.

0

Returns:

Type Description
TaskExecution

New TaskExecution with status matching the task.

Source code in src/synthorg/engine/task_execution.py
@classmethod
def from_task(
    cls,
    task: Task,
    *,
    retry_count: int = 0,
) -> TaskExecution:
    """Create a fresh execution from a task definition.

    Args:
        task: The frozen task to wrap.
        retry_count: Number of previous failure-reassignment cycles.

    Returns:
        New ``TaskExecution`` with status matching the task.
    """
    execution = cls(task=task, status=task.status, retry_count=retry_count)
    logger.debug(
        EXECUTION_TASK_CREATED,
        task_id=task.id,
        initial_status=task.status.value,
    )
    return execution

with_transition

with_transition(target, *, reason='')

Validate and apply a status transition.

Raises:

Type Description
ValueError

If the transition is invalid.

Source code in src/synthorg/engine/task_execution.py
def with_transition(
    self,
    target: TaskStatus,
    *,
    reason: str = "",
) -> TaskExecution:
    """Validate and apply a status transition.

    Raises:
        ValueError: If the transition is invalid.
    """
    try:
        validate_transition(self.status, target)
    except ValueError:
        logger.warning(
            EXECUTION_TASK_TRANSITION_FAILED,
            task_id=self.task.id,
            from_status=self.status.value,
            to_status=target.value,
            turn_count=self.turn_count,
        )
        raise
    now = datetime.now(UTC)
    transition = StatusTransition(
        from_status=self.status,
        to_status=target,
        timestamp=now,
        reason=reason,
    )
    updates = _build_transition_updates(
        self,
        target,
        transition,
        now,
    )
    result = self.model_copy(update=updates)
    logger.info(
        EXECUTION_TASK_TRANSITION,
        task_id=self.task.id,
        from_status=self.status.value,
        to_status=target.value,
        reason=reason,
    )
    return result
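The validate-then-copy shape above relies on an external transition table. A minimal sketch of such a validator (the `ALLOWED` map is hypothetical; the real rules live in `synthorg.core.task_transitions.validate_transition`):

```python
# Hypothetical transition table -- not the project's actual rule set.
ALLOWED: dict[str, set[str]] = {
    "created": {"assigned", "cancelled"},
    "assigned": {"in_progress", "cancelled"},
    "in_progress": {"completed", "failed", "blocked", "cancelled"},
}

def validate_transition(current: str, target: str) -> None:
    """Raise ValueError when the transition is not permitted."""
    if target not in ALLOWED.get(current, set()):
        raise ValueError(f"invalid transition {current!r} -> {target!r}")

validate_transition("assigned", "in_progress")  # permitted: no exception
try:
    validate_transition("created", "completed")
except ValueError as exc:
    print(exc)
```

Validating before `model_copy` guarantees that no invalid state ever exists, even transiently.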

with_cost

with_cost(usage)

Accumulate token usage and increment turn count.

Parameters:

Name Type Description Default
usage TokenUsage

Token usage from a single LLM call.

required

Returns:

Type Description
TaskExecution

New TaskExecution with updated cost and turn count.

Raises:

Type Description
ExecutionStateError

If execution is in a terminal state.

Source code in src/synthorg/engine/task_execution.py
def with_cost(self, usage: TokenUsage) -> TaskExecution:
    """Accumulate token usage and increment turn count.

    Args:
        usage: Token usage from a single LLM call.

    Returns:
        New ``TaskExecution`` with updated cost and turn count.

    Raises:
        ExecutionStateError: If execution is in a terminal state.
    """
    if self.is_terminal:
        msg = (
            f"Cannot record cost on terminal task execution "
            f"(task_id={self.task.id}, status={self.status.value})"
        )
        logger.error(
            EXECUTION_COST_ON_TERMINAL,
            task_id=self.task.id,
            status=self.status.value,
        )
        raise ExecutionStateError(msg)
    result = self.model_copy(
        update={
            "accumulated_cost": add_token_usage(self.accumulated_cost, usage),
            "turn_count": self.turn_count + 1,
        }
    )
    logger.debug(
        EXECUTION_COST_RECORDED,
        task_id=self.task.id,
        turn=result.turn_count,
        cost_usd=usage.cost_usd,
    )
    return result

to_task_snapshot

to_task_snapshot()

Return the original task with the current execution status.

Useful for persistence or reporting where a plain Task is expected.

Returns:

Type Description
Task

A copy of the original task with updated status.

Source code in src/synthorg/engine/task_execution.py
def to_task_snapshot(self) -> Task:
    """Return the original task with the current execution status.

    Useful for persistence or reporting where a plain ``Task`` is
    expected.

    Returns:
        A copy of the original task with updated status.
    """
    return self.task.model_copy(update={"status": self.status})

Parallel Execution

parallel

Parallel agent execution orchestrator.

Coordinates multiple AgentEngine.run() calls in parallel using structured concurrency (asyncio.TaskGroup), with error isolation, concurrency limits, resource locking, and progress tracking.

Inspired by the ToolInvoker.invoke_all() pattern from tools/invoker.py (TaskGroup + Semaphore + guarded execution), extended with fail-fast, progress tracking, and CancelledError handling.

ProgressCallback module-attribute

ProgressCallback = Callable[[ParallelProgress], None]

Synchronous callback invoked on progress updates.

Called directly (not awaited) from the executor's event loop; must not block. Async functions will produce un-awaited coroutines.

ParallelExecutor

ParallelExecutor(
    *, engine, shutdown_manager=None, resource_lock=None, progress_callback=None
)

Orchestrates concurrent agent execution.

Composition over inheritance -- takes an AgentEngine and coordinates concurrent run() calls.

Parameters:

Name Type Description Default
engine AgentEngine

Agent execution engine.

required
shutdown_manager ShutdownManager | None

Optional shutdown manager for task registration.

None
resource_lock ResourceLock | None

Optional resource lock for exclusive file access. Defaults to InMemoryResourceLock if any assignments declare resource claims.

None
progress_callback ProgressCallback | None

Optional synchronous callback invoked on progress updates.

None
Source code in src/synthorg/engine/parallel.py
def __init__(
    self,
    *,
    engine: AgentEngine,
    shutdown_manager: ShutdownManager | None = None,
    resource_lock: ResourceLock | None = None,
    progress_callback: ProgressCallback | None = None,
) -> None:
    self._engine = engine
    self._shutdown_manager = shutdown_manager
    self._resource_lock = resource_lock
    self._progress_callback = progress_callback

execute_group async

execute_group(group)

Execute a parallel group of agent assignments.

Parameters:

Name Type Description Default
group ParallelExecutionGroup

The execution group to run.

required

Returns:

Type Description
ParallelExecutionResult

Result with all agent outcomes.

Raises:

Type Description
ResourceConflictError

If resource claims conflict between assignments.

ParallelExecutionError

If fatal errors (MemoryError, RecursionError) occurred during execution.

Source code in src/synthorg/engine/parallel.py
async def execute_group(
    self,
    group: ParallelExecutionGroup,
) -> ParallelExecutionResult:
    """Execute a parallel group of agent assignments.

    Args:
        group: The execution group to run.

    Returns:
        Result with all agent outcomes.

    Raises:
        ResourceConflictError: If resource claims conflict between
            assignments.
        ParallelExecutionError: If fatal errors (MemoryError,
            RecursionError) occurred during execution.
    """
    start = time.monotonic()

    logger.info(
        PARALLEL_GROUP_START,
        group_id=group.group_id,
        agent_count=len(group.assignments),
        max_concurrency=group.max_concurrency,
        fail_fast=group.fail_fast,
    )

    lock = self._resolve_lock(group)
    self._validate_resource_claims(group)

    outcomes: dict[str, AgentOutcome] = {}
    fatal_errors: list[Exception] = []
    progress = _ProgressState(
        group_id=group.group_id,
        total=len(group.assignments),
    )

    task_error: Exception | None = None
    release_error: Exception | None = None
    try:
        if lock is not None:
            await self._acquire_all_locks(group, lock)
        await self._run_task_group(
            group,
            outcomes,
            fatal_errors,
            progress,
        )
    except Exception as exc:
        task_error = exc
    finally:
        if lock is not None:
            try:
                await self._release_all_locks(group, lock)
            except Exception as exc:
                logger.exception(
                    PARALLEL_LOCK_RELEASE_ERROR,
                    error="Failed to release resource locks",
                    group_id=group.group_id,
                )
                release_error = exc

    if release_error is not None:
        lock_msg = (
            f"Parallel group {group.group_id!r}: "
            "resource locks could not be released"
        )
        if task_error is not None:
            task_error.add_note(lock_msg)
        else:
            raise ParallelExecutionError(
                lock_msg,
            ) from release_error

    if task_error is not None:
        raise task_error

    result = self._build_result(
        group,
        outcomes,
        time.monotonic() - start,
    )

    logger.info(
        PARALLEL_GROUP_COMPLETE,
        group_id=group.group_id,
        succeeded=result.agents_succeeded,
        failed=result.agents_failed,
        duration_seconds=result.total_duration_seconds,
    )

    if fatal_errors:
        msg = (
            f"Parallel group {group.group_id!r} had "
            f"{len(fatal_errors)} fatal error(s)"
        )
        logger.error(
            PARALLEL_AGENT_ERROR,
            group_id=group.group_id,
            fatal_error_count=len(fatal_errors),
            error=msg,
        )
        raise ParallelExecutionError(msg) from fatal_errors[0]

    return result

Run Result

run_result

Agent run result model.

Frozen Pydantic model wrapping ExecutionResult with outer metadata from the engine layer (system prompt, wall-clock duration, agent/task IDs).

AgentRunResult pydantic-model

Bases: BaseModel

Immutable result of a complete agent engine run.

Wraps the ExecutionResult from the loop with engine-level metadata: system prompt, wall-clock duration, and agent/task IDs.

Attributes:

Name Type Description
execution_result ExecutionResult

Outcome from the execution loop.

system_prompt SystemPrompt

System prompt used for this run.

duration_seconds float

Wall-clock run time in seconds.

agent_id NotBlankStr

Agent identifier (string form of UUID).

task_id NotBlankStr | None

Task identifier (always set currently; None reserved for future taskless runs).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

execution_result pydantic-field

execution_result

Outcome from the execution loop

system_prompt pydantic-field

system_prompt

System prompt used for this run

duration_seconds pydantic-field

duration_seconds

Wall-clock run time in seconds

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id = None

Task identifier, or None for future taskless runs

produced_artifacts pydantic-field

produced_artifacts = ()

Artifacts produced during execution

termination_reason property

termination_reason

Why the execution loop terminated.

total_turns property

total_turns

Number of turns completed during execution.

total_cost_usd property

total_cost_usd

Accumulated cost from the execution context.

is_success property

is_success

True when termination reason is COMPLETED.

completion_summary property

completion_summary

Extract the last assistant message content as a work summary.

Walks the conversation in reverse to find the most recent assistant message with non-empty text content. Tool-call-only assistant messages (content is None or empty) are skipped.

Returns:

Type Description
str | None

The content string, or None if no qualifying message exists.

Metrics

metrics

Task completion metrics model.

Proxy overhead metrics for an agent run, computed from AgentRunResult data per the Operations design page.

TaskCompletionMetrics pydantic-model

Bases: BaseModel

Proxy overhead metrics for an agent run (see Operations design page).

Computed from AgentRunResult after execution to surface orchestration overhead indicators (turns, tokens, cost, duration).

Attributes:

Name Type Description
task_id NotBlankStr | None

Task identifier (None for future taskless runs).

agent_id NotBlankStr

Agent identifier (string form of UUID).

turns_per_task int

Number of LLM turns to complete the task.

tokens_per_task int

Total tokens consumed (input + output).

cost_per_task float

Total cost for the task in USD (base currency).

duration_seconds float

Wall-clock execution time in seconds.

prompt_tokens int

Estimated system prompt tokens (per-call estimate from SystemPrompt.estimated_tokens).

prompt_token_ratio float

Per-call ratio of prompt tokens to total tokens (overhead indicator, derived via @computed_field). For multi-turn runs, the actual overhead is higher because the system prompt is resent on every turn.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _cap_prompt_tokens

task_id pydantic-field

task_id = None

Task identifier

agent_id pydantic-field

agent_id

Agent identifier

turns_per_task pydantic-field

turns_per_task

Number of LLM turns to complete the task

tokens_per_task pydantic-field

tokens_per_task

Total tokens consumed (input + output)

cost_per_task pydantic-field

cost_per_task

Total cost for the task in USD (base currency)

duration_seconds pydantic-field

duration_seconds

Wall-clock execution time in seconds

prompt_tokens pydantic-field

prompt_tokens = 0

Estimated system prompt tokens

prompt_token_ratio property

prompt_token_ratio

Per-call ratio of prompt tokens to total tokens (overhead indicator).

For multi-turn runs the actual overhead is higher because the system prompt is resent on every turn.
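A sketch of the per-call ratio (the zero-total guard and the cap mirroring the `_cap_prompt_tokens` validator are assumptions about the computed field's behavior):

```python
def prompt_token_ratio(prompt_tokens: int, tokens_per_task: int) -> float:
    """Per-call prompt overhead: prompt tokens over total tokens, in [0, 1]."""
    if tokens_per_task <= 0:
        return 0.0  # assumed guard for empty runs
    # Assumed cap, mirroring the _cap_prompt_tokens validator above.
    return min(prompt_tokens, tokens_per_task) / tokens_per_task

print(prompt_token_ratio(800, 3200))  # 0.25
```

A ratio of 0.25 on a single-call run means a quarter of the spend went to the system prompt; over N turns the true overhead approaches N times the per-call prompt cost.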

from_run_result classmethod

from_run_result(result)

Build metrics from an agent run result.

Parameters:

Name Type Description Default
result AgentRunResult

The AgentRunResult to extract metrics from.

required

Returns:

Type Description
TaskCompletionMetrics

New TaskCompletionMetrics with values extracted from

TaskCompletionMetrics

the result's execution context and metadata.

Source code in src/synthorg/engine/metrics.py
@classmethod
def from_run_result(cls, result: AgentRunResult) -> TaskCompletionMetrics:
    """Build metrics from an agent run result.

    Args:
        result: The ``AgentRunResult`` to extract metrics from.

    Returns:
        New ``TaskCompletionMetrics`` with values extracted from
        the result's execution context and metadata.
    """
    accumulated = result.execution_result.context.accumulated_cost
    return cls(
        task_id=result.task_id,
        agent_id=result.agent_id,
        turns_per_task=result.total_turns,
        tokens_per_task=accumulated.total_tokens,
        cost_per_task=result.total_cost_usd,
        duration_seconds=result.duration_seconds,
        prompt_tokens=result.system_prompt.estimated_tokens,
    )

Errors

errors

Engine-layer error hierarchy.

EngineError

Bases: Exception

Base exception for all engine-layer errors.

PromptBuildError

Bases: EngineError

Raised when system prompt construction fails.

ExecutionStateError

Bases: EngineError

Raised when an execution state transition is invalid.

MaxTurnsExceededError

Bases: EngineError

Raised when turn_count reaches max_turns during execution.

Enforced by AgentContext.with_turn_completed when the hard turn limit has been reached.

LoopExecutionError

Bases: EngineError

Non-recoverable execution loop error for the engine layer.

The execution loop returns TerminationReason.ERROR internally. This exception is available for the engine layer above the loop to convert that result into a raised error when appropriate.

ParallelExecutionError

Bases: EngineError

Raised when a parallel execution group encounters a fatal error.

ResourceConflictError

Bases: EngineError

Raised when resource claims conflict between assignments.

DecompositionError

Bases: EngineError

Base exception for task decomposition failures.

DecompositionCycleError

Bases: DecompositionError

Raised when a dependency cycle is detected in the subtask graph.

DecompositionDepthError

Bases: DecompositionError

Raised when decomposition exceeds the maximum nesting depth.

TaskRoutingError

Bases: EngineError

Raised when task routing to an agent fails.

TaskAssignmentError

Bases: EngineError

Raised when task assignment fails.

NoEligibleAgentError

Bases: TaskAssignmentError

Raised when no eligible agent is found for assignment.

WorkspaceError

Bases: EngineError

Base exception for workspace isolation failures.

WorkspaceSetupError

Bases: WorkspaceError

Raised when workspace creation fails.

WorkspaceMergeError

Bases: WorkspaceError

Raised when workspace merge fails.

WorkspaceCleanupError

Bases: WorkspaceError

Raised when workspace teardown fails.

WorkspaceLimitError

Bases: WorkspaceError

Raised when the maximum number of concurrent workspaces has been reached.

TaskEngineError

Bases: EngineError

Base exception for all task engine errors.

TaskEngineNotRunningError

Bases: TaskEngineError

Raised when a mutation is submitted to a stopped task engine.

TaskEngineQueueFullError

Bases: TaskEngineError

Raised when the task engine queue is at capacity.

TaskMutationError

Bases: TaskEngineError

Raised when a task mutation fails (not found, validation, etc.).

TaskNotFoundError

Bases: TaskMutationError

Raised when a task is not found during mutation.

TaskVersionConflictError

Bases: TaskMutationError

Raised when optimistic concurrency version does not match.

TaskInternalError

Bases: TaskEngineError

Raised when a task mutation fails due to an internal engine error.

Unlike :class:TaskMutationError (which covers business-rule failures such as validation or not-found), this signals an unexpected engine fault that the caller cannot fix by changing the request. Maps to 5xx at the API layer.

This is deliberately a sibling of TaskMutationError, not a subtype, so that broad except TaskMutationError handlers do not accidentally catch internal engine faults.

CoordinationError

Bases: EngineError

Base exception for multi-agent coordination failures.

CoordinationPhaseError

CoordinationPhaseError(message, *, phase, partial_phases=())

Bases: CoordinationError

Raised when a coordination pipeline phase fails.

Carries the failing phase name and all phase results accumulated up to and including the failure, enabling partial-result inspection.

Attributes:

Name Type Description
phase

Name of the phase that failed.

partial_phases

Phase results accumulated before and including this failure.

Source code in src/synthorg/engine/errors.py
def __init__(
    self,
    message: str,
    *,
    phase: str,
    partial_phases: tuple[CoordinationPhaseResult, ...] = (),
) -> None:
    super().__init__(message)
    self.phase = phase
    self.partial_phases = partial_phases

WorkflowExecutionError

Bases: EngineError

Base exception for workflow execution failures.

WorkflowDefinitionInvalidError

Bases: WorkflowExecutionError

Raised when a workflow definition fails validation at activation time.

WorkflowConditionEvalError

Bases: WorkflowExecutionError

Raised when a condition expression cannot be evaluated.

WorkflowExecutionNotFoundError

Bases: WorkflowExecutionError

Raised when a workflow execution instance is not found.

Task Decomposition

protocol

Decomposition strategy protocol.

DecompositionStrategy

Bases: Protocol

Protocol for task decomposition strategies.

Implementations produce a DecompositionPlan from a parent task and a decomposition context. The plan describes subtask definitions and their dependency relationships.

decompose async

decompose(task, context)

Decompose a task into subtasks.

Parameters:

Name Type Description Default
task Task

The parent task to decompose.

required
context DecompositionContext

Decomposition constraints (max subtasks, depth).

required

Returns:

Type Description
DecompositionPlan

A decomposition plan with subtask definitions.

Source code in src/synthorg/engine/decomposition/protocol.py
async def decompose(
    self,
    task: Task,
    context: DecompositionContext,
) -> DecompositionPlan:
    """Decompose a task into subtasks.

    Args:
        task: The parent task to decompose.
        context: Decomposition constraints (max subtasks, depth).

    Returns:
        A decomposition plan with subtask definitions.
    """
    ...

get_strategy_name

get_strategy_name()

Return a human-readable name for this strategy.

Source code in src/synthorg/engine/decomposition/protocol.py
def get_strategy_name(self) -> str:
    """Return a human-readable name for this strategy."""
    ...

models

Decomposition domain models.

Frozen Pydantic models for subtask definitions, decomposition plans, results, status rollups, and decomposition context.

SubtaskDefinition pydantic-model

Bases: BaseModel

Definition of a single subtask within a decomposition plan.

Attributes:

Name Type Description
id NotBlankStr

Unique subtask identifier (within this decomposition).

title NotBlankStr

Short subtask title.

description NotBlankStr

Detailed subtask description.

dependencies tuple[NotBlankStr, ...]

IDs of other subtasks this one depends on.

estimated_complexity Complexity

Complexity estimate for routing.

required_skills tuple[NotBlankStr, ...]

Skill names needed for routing.

required_role NotBlankStr | None

Optional role name for routing.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_no_self_dependency

id pydantic-field

id

Unique subtask identifier

title pydantic-field

title

Short subtask title

description pydantic-field

description

Detailed subtask description

dependencies pydantic-field

dependencies = ()

IDs of subtasks this one depends on

estimated_complexity pydantic-field

estimated_complexity = MEDIUM

Complexity estimate for routing

required_skills pydantic-field

required_skills = ()

Skill names needed for routing

required_role pydantic-field

required_role = None

Optional role name for routing

DecompositionPlan pydantic-model

Bases: BaseModel

Plan describing how a parent task is decomposed into subtasks.

Validates subtask collection integrity at construction: non-empty, unique IDs, valid dependency references. Cycle detection is handled by DependencyGraph.validate() in the service layer.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the task being decomposed.

subtasks tuple[SubtaskDefinition, ...]

Ordered subtask definitions.

task_structure TaskStructure

Classified structure of the subtask graph.

coordination_topology CoordinationTopology

Selected coordination topology.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_subtasks

parent_task_id pydantic-field

parent_task_id

ID of the task being decomposed

subtasks pydantic-field

subtasks

Ordered subtask definitions

task_structure pydantic-field

task_structure = SEQUENTIAL

Classified task structure

coordination_topology pydantic-field

coordination_topology = AUTO

Selected coordination topology

DecompositionResult pydantic-model

Bases: BaseModel

Result of a complete task decomposition.

Attributes:

Name Type Description
plan DecompositionPlan

The decomposition plan that was executed.

created_tasks tuple[Task, ...]

Task objects created from subtask definitions.

dependency_edges tuple[tuple[NotBlankStr, NotBlankStr], ...]

Directed edges (from_id, to_id) in the DAG.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_plan_task_consistency

plan pydantic-field

plan

Executed decomposition plan

created_tasks pydantic-field

created_tasks

Task objects created from subtask definitions

dependency_edges pydantic-field

dependency_edges = ()

Directed edges (from_id, to_id) in the DAG

SubtaskStatusRollup pydantic-model

Bases: BaseModel

Aggregated status of subtasks for a parent task.

Tracks five explicit statuses: COMPLETED, FAILED, IN_PROGRESS, BLOCKED, and CANCELLED. Other statuses (CREATED, ASSIGNED, IN_REVIEW, INTERRUPTED) are not individually tracked; the gap between the sum of tracked counts and total accounts for these. The derived_parent_status treats any such remainder as work still pending (IN_PROGRESS).

When all subtasks are in terminal states but with a mix of completed and cancelled, derived_parent_status returns CANCELLED (some work was abandoned).

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

total int

Total number of subtasks.

completed int

Count of COMPLETED subtasks.

failed int

Count of FAILED subtasks.

in_progress int

Count of IN_PROGRESS subtasks.

blocked int

Count of BLOCKED subtasks.

cancelled int

Count of CANCELLED subtasks.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_counts

parent_task_id pydantic-field

parent_task_id

Parent task ID

total pydantic-field

total

Total subtasks

completed pydantic-field

completed

Completed subtasks

failed pydantic-field

failed

Failed subtasks

in_progress pydantic-field

in_progress

In-progress subtasks

blocked pydantic-field

blocked

Blocked subtasks

cancelled pydantic-field

cancelled

Cancelled subtasks

derived_parent_status property

derived_parent_status

Derive the parent task status from subtask statuses.
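The rollup rules above can be sketched as a plain function. This is an illustrative sketch only: the untracked-remainder and mixed completed/cancelled rules come from the description above, while the exact precedence among FAILED, BLOCKED, and IN_PROGRESS is an assumption, not the library's documented behaviour.

```python
def derive_parent_status(total, completed, failed, in_progress, blocked, cancelled):
    """Sketch of SubtaskStatusRollup.derived_parent_status (illustrative)."""
    tracked = completed + failed + in_progress + blocked + cancelled
    if tracked < total:
        return "IN_PROGRESS"   # untracked statuses count as pending work
    if in_progress > 0 or blocked > 0:
        return "IN_PROGRESS"   # assumption: non-terminal work keeps the parent open
    if failed > 0:
        return "FAILED"        # assumption: failures dominate terminal mixes
    if cancelled > 0:
        return "CANCELLED"     # mix of completed/cancelled: some work was abandoned
    return "COMPLETED"
```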

DecompositionContext pydantic-model

Bases: BaseModel

Configuration context for a decomposition operation.

Attributes:

Name Type Description
max_subtasks int

Maximum number of subtasks allowed.

max_depth int

Maximum nesting depth for recursive decomposition.

current_depth int

Current nesting depth.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_subtasks pydantic-field

max_subtasks = 10

Maximum number of subtasks allowed

max_depth pydantic-field

max_depth = 3

Maximum nesting depth

current_depth pydantic-field

current_depth = 0

Current nesting depth

service

Decomposition service.

Orchestrates strategy, classifier, DAG validation, and task creation to decompose a parent task into executable subtasks.

DecompositionService

DecompositionService(strategy, classifier)

Service orchestrating task decomposition.

Composes a decomposition strategy with a structure classifier, DAG validator, and task factory to produce executable subtasks.

Source code in src/synthorg/engine/decomposition/service.py
def __init__(
    self,
    strategy: DecompositionStrategy,
    classifier: TaskStructureClassifier,
) -> None:
    self._strategy = strategy
    self._classifier = classifier

decompose_task async

decompose_task(task, context)

Decompose a task into subtasks.

  1. Classify task structure (uses the explicit structure if set, otherwise infers it heuristically). Override the plan's structure with the classifier's result when they differ.
  2. Call strategy.decompose().
  3. Validate DAG via DependencyGraph.
  4. Create Task objects from SubtaskDefinitions.
  5. Return DecompositionResult.

Parameters:

Name Type Description Default
task Task

The parent task to decompose.

required
context DecompositionContext

Decomposition constraints.

required

Returns:

Type Description
DecompositionResult

Decomposition result with created tasks and dependency edges.

Source code in src/synthorg/engine/decomposition/service.py
async def decompose_task(
    self,
    task: Task,
    context: DecompositionContext,
) -> DecompositionResult:
    """Decompose a task into subtasks.

    1. Classify task structure (uses the explicit structure if
       set, otherwise infers it heuristically). Override the
       plan's structure with the classifier's result when they
       differ.
    2. Call strategy.decompose().
    3. Validate DAG via DependencyGraph.
    4. Create Task objects from SubtaskDefinitions.
    5. Return DecompositionResult.

    Args:
        task: The parent task to decompose.
        context: Decomposition constraints.

    Returns:
        Decomposition result with created tasks and dependency edges.
    """
    logger.info(
        DECOMPOSITION_STARTED,
        task_id=task.id,
        strategy=self._strategy.get_strategy_name(),
        current_depth=context.current_depth,
    )

    try:
        return await self._do_decompose(task, context)
    except Exception:
        logger.exception(
            DECOMPOSITION_FAILED,
            task_id=task.id,
            strategy=self._strategy.get_strategy_name(),
        )
        raise
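Step 3 above validates that the dependency edges form a DAG. The real `DependencyGraph` implementation is not shown in these docs; the following self-contained sketch illustrates the check with Kahn's algorithm over `(from_id, to_id)` edges.

```python
from collections import defaultdict, deque

def is_valid_dag(task_ids, edges):
    """Return True when (from_id, to_id) edges form a cycle-free graph."""
    indegree = {t: 0 for t in task_ids}
    adjacency = defaultdict(list)
    for src, dst in edges:
        adjacency[src].append(dst)
        indegree[dst] += 1
    queue = deque(t for t, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in adjacency[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    # Every node drains to indegree zero only when there is no cycle.
    return visited == len(task_ids)
```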

rollup_status

rollup_status(parent_task_id, subtask_statuses)

Compute status rollup for a parent task.

Parameters:

Name Type Description Default
parent_task_id NotBlankStr

The parent task identifier.

required
subtask_statuses tuple[TaskStatus, ...]

Statuses of all subtasks.

required

Returns:

Type Description
SubtaskStatusRollup

Aggregated status rollup.

Source code in src/synthorg/engine/decomposition/service.py
def rollup_status(
    self,
    parent_task_id: NotBlankStr,
    subtask_statuses: tuple[TaskStatus, ...],
) -> SubtaskStatusRollup:
    """Compute status rollup for a parent task.

    Args:
        parent_task_id: The parent task identifier.
        subtask_statuses: Statuses of all subtasks.

    Returns:
        Aggregated status rollup.
    """
    return StatusRollup.compute(parent_task_id, subtask_statuses)

Task Routing

models

Task routing domain models.

Frozen Pydantic models for routing candidates, decisions, results, and topology configuration.

RoutingCandidate pydantic-model

Bases: BaseModel

A candidate agent for a subtask with scoring details.

Attributes:

Name Type Description
agent_identity AgentIdentity

The candidate agent.

score float

Match score between 0.0 and 1.0.

matched_skills tuple[NotBlankStr, ...]

Skills that matched the subtask requirements.

reason NotBlankStr

Human-readable explanation of the score.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_identity pydantic-field

agent_identity

Candidate agent

score pydantic-field

score

Match score (0.0-1.0)

matched_skills pydantic-field

matched_skills = ()

Skills that matched subtask requirements

reason pydantic-field

reason

Explanation of score

RoutingDecision pydantic-model

Bases: BaseModel

Routing decision for a single subtask.

Attributes:

Name Type Description
subtask_id NotBlankStr

ID of the subtask being routed.

selected_candidate RoutingCandidate

The chosen agent candidate.

alternatives tuple[RoutingCandidate, ...]

Other candidates considered.

topology CoordinationTopology

Coordination topology for this subtask.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_selected_not_in_alternatives

subtask_id pydantic-field

subtask_id

Subtask being routed

selected_candidate pydantic-field

selected_candidate

Chosen agent candidate

alternatives pydantic-field

alternatives = ()

Other candidates considered

topology pydantic-field

topology

Coordination topology for this subtask

RoutingResult pydantic-model

Bases: BaseModel

Result of routing all subtasks in a decomposition.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

decisions tuple[RoutingDecision, ...]

Routing decisions for routable subtasks.

unroutable tuple[NotBlankStr, ...]

IDs of subtasks with no matching agent.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_subtask_ids

parent_task_id pydantic-field

parent_task_id

Parent task ID

decisions pydantic-field

decisions = ()

Routing decisions

unroutable pydantic-field

unroutable = ()

Subtask IDs with no matching agent

AutoTopologyConfig pydantic-model

Bases: BaseModel

Configuration for automatic topology selection.

Attributes:

Name Type Description
sequential_override CoordinationTopology

Topology for sequential structures.

parallel_default CoordinationTopology

Topology for parallel structures.

mixed_default CoordinationTopology

Topology for mixed structures.

parallel_artifact_threshold int

Artifact count above which parallel tasks use decentralized topology.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_no_auto_defaults

sequential_override pydantic-field

sequential_override = SAS

Topology for sequential structures

parallel_default pydantic-field

parallel_default = CENTRALIZED

Topology for parallel structures

mixed_default pydantic-field

mixed_default = CONTEXT_DEPENDENT

Topology for mixed structures

parallel_artifact_threshold pydantic-field

parallel_artifact_threshold = 4

Artifact count threshold for decentralized topology
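The four fields above suggest a selection rule of roughly the following shape. This is a hedged sketch, not the real `TopologySelector`: the function name, the string topology values, and the strict-inequality reading of "above which" are assumptions.

```python
def select_topology(structure, artifact_count, *,
                    sequential_override="SAS",
                    parallel_default="CENTRALIZED",
                    mixed_default="CONTEXT_DEPENDENT",
                    parallel_artifact_threshold=4):
    """Illustrative topology pick driven by AutoTopologyConfig-style fields."""
    if structure == "SEQUENTIAL":
        return sequential_override
    if structure == "PARALLEL":
        if artifact_count > parallel_artifact_threshold:
            return "DECENTRALIZED"   # high-artifact parallel work
        return parallel_default
    return mixed_default             # mixed structures
```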

service

Task routing service.

Routes decomposed subtasks to appropriate agents based on scoring, then selects coordination topology.

TaskRoutingService

TaskRoutingService(scorer, topology_selector)

Routes subtasks to agents based on skill matching.

For each subtask in a decomposition result, scores all available agents and selects the best match. Subtasks with no viable candidate are reported as unroutable.

Source code in src/synthorg/engine/routing/service.py
def __init__(
    self,
    scorer: AgentTaskScorer,
    topology_selector: TopologySelector,
) -> None:
    self._scorer = scorer
    self._topology_selector = topology_selector

route

route(decomposition_result, available_agents, parent_task)

Route all subtasks to appropriate agents.

For each subtask:

  1. Score all available agents.
  2. Select the best candidate (highest score >= min_score).
  3. Select topology from parent task override or plan structure.
  4. Report unroutable subtasks.

Parameters:

Name Type Description Default
decomposition_result DecompositionResult

The decomposition to route.

required
available_agents tuple[AgentIdentity, ...]

Pool of agents to consider.

required
parent_task Task

The parent task (for topology selection).

required

Returns:

Type Description
RoutingResult

Routing result with decisions and unroutable subtask IDs.

Source code in src/synthorg/engine/routing/service.py
def route(
    self,
    decomposition_result: DecompositionResult,
    available_agents: tuple[AgentIdentity, ...],
    parent_task: Task,
) -> RoutingResult:
    """Route all subtasks to appropriate agents.

    For each subtask:
    1. Score all available agents.
    2. Select the best candidate (highest score >= min_score).
    3. Select topology from parent task override or plan structure.
    4. Report unroutable subtasks.

    Args:
        decomposition_result: The decomposition to route.
        available_agents: Pool of agents to consider.
        parent_task: The parent task (for topology selection).

    Returns:
        Routing result with decisions and unroutable subtask IDs.
    """
    plan = decomposition_result.plan

    if parent_task.id != plan.parent_task_id:
        msg = (
            f"parent_task.id {parent_task.id!r} does not "
            f"match plan.parent_task_id "
            f"{plan.parent_task_id!r}"
        )
        logger.warning(
            TASK_ROUTING_FAILED,
            parent_task_id=parent_task.id,
            plan_parent_task_id=plan.parent_task_id,
            error=msg,
        )
        raise ValueError(msg)

    logger.info(
        TASK_ROUTING_STARTED,
        parent_task_id=plan.parent_task_id,
        subtask_count=len(plan.subtasks),
        agent_count=len(available_agents),
    )

    if not available_agents:
        logger.warning(
            TASK_ROUTING_NO_AGENTS,
            parent_task_id=plan.parent_task_id,
            subtask_count=len(plan.subtasks),
        )
        return RoutingResult(
            parent_task_id=plan.parent_task_id,
            unroutable=tuple(s.id for s in plan.subtasks),
        )

    try:
        return self._do_route(decomposition_result, available_agents, parent_task)
    except Exception:
        logger.exception(
            TASK_ROUTING_FAILED,
            parent_task_id=plan.parent_task_id,
        )
        raise
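Step 2 of the routing flow (pick the highest-scoring candidate, subject to a minimum threshold, else mark the subtask unroutable) reduces to a small selection rule. The function name and the `(agent, score)` pair shape here are illustrative, not the library's API.

```python
def pick_best(scored, min_score=0.1):
    """Return the (agent, score) pair with the highest eligible score.

    Returns None when no candidate meets min_score, i.e. the
    subtask would be reported as unroutable.
    """
    eligible = [c for c in scored if c[1] >= min_score]
    if not eligible:
        return None
    return max(eligible, key=lambda c: c[1])
```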

Task Assignment

protocol

Task assignment strategy protocol.

Defines the pluggable interface for assignment strategies.

TaskAssignmentStrategy

Bases: Protocol

Protocol for task assignment strategies.

Implementations must be synchronous (pure computation, no I/O) and return an AssignmentResult with the selected agent and ranked alternatives. TaskAssignmentService calls assign() synchronously -- async implementations will NOT work correctly.

Error signaling contract:

  • ManualAssignmentStrategy raises NoEligibleAgentError when the designated agent is not found or not ACTIVE, and TaskAssignmentError when task.assigned_to is None.
  • Scoring-based strategies (RoleBasedAssignmentStrategy, LoadBalancedAssignmentStrategy, CostOptimizedAssignmentStrategy, HierarchicalAssignmentStrategy, AuctionAssignmentStrategy) return AssignmentResult(selected=None, ...) when no agent meets the minimum score threshold.

TaskAssignmentService propagates both patterns: it re-raises TaskAssignmentError (including its subclass NoEligibleAgentError) and logs a warning when result.selected is None, returning the result to the caller for handling.

name property

name

Strategy name identifier.

assign

assign(request)

Assign a task to an agent based on the strategy's algorithm.

Parameters:

Name Type Description Default
request AssignmentRequest

The assignment request with task and agent pool.

required

Returns:

Type Description
AssignmentResult

Assignment result with selected agent and alternatives. selected may be None when no eligible agent is found (scoring strategies) -- callers must check this.

Raises:

Type Description
TaskAssignmentError

When preconditions are violated (e.g. missing assigned_to for manual strategy).

NoEligibleAgentError

When the designated agent cannot be found or is not ACTIVE (manual strategy only).

Source code in src/synthorg/engine/assignment/protocol.py
def assign(
    self,
    request: AssignmentRequest,
) -> AssignmentResult:
    """Assign a task to an agent based on the strategy's algorithm.

    Args:
        request: The assignment request with task and agent pool.

    Returns:
        Assignment result with selected agent and alternatives.
        ``selected`` may be ``None`` when no eligible agent is
        found (scoring strategies) -- callers must check this.

    Raises:
        TaskAssignmentError: When preconditions are violated
            (e.g. missing ``assigned_to`` for manual strategy).
        NoEligibleAgentError: When the designated agent cannot
            be found or is not ACTIVE (manual strategy only).
    """
    ...

models

Task assignment domain models.

Frozen Pydantic models for assignment requests, results, agent workloads, and assignment candidates.

AgentWorkload pydantic-model

Bases: BaseModel

Snapshot of an agent's current workload.

Attributes:

Name Type Description
agent_id NotBlankStr

Unique agent identifier.

active_task_count int

Number of tasks currently in progress.

total_cost_usd float

Total cost incurred by this agent in USD (base currency).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent identifier

active_task_count pydantic-field

active_task_count

Number of tasks currently in progress

total_cost_usd pydantic-field

total_cost_usd = 0.0

Total cost incurred by this agent in USD (base currency)

AssignmentCandidate pydantic-model

Bases: BaseModel

A candidate agent for task assignment with scoring details.

Attributes:

Name Type Description
agent_identity AgentIdentity

The candidate agent.

score float

Match score between 0.0 and 1.0.

matched_skills tuple[NotBlankStr, ...]

Skills that matched the assignment requirements.

reason NotBlankStr

Human-readable explanation of the score.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_identity pydantic-field

agent_identity

Candidate agent

score pydantic-field

score

Match score (0.0-1.0)

matched_skills pydantic-field

matched_skills = ()

Skills that matched assignment requirements

reason pydantic-field

reason

Explanation of score

AssignmentRequest pydantic-model

Bases: BaseModel

Request for task assignment to an agent.

The required_skills and required_role fields live here (not on Task) so that scoring strategies can evaluate agent-task fit without modifying the Task model.

Attributes:

Name Type Description
task Task

The task to assign.

available_agents tuple[AgentIdentity, ...]

Pool of agents to consider (must be non-empty, unique by agent id).

workloads tuple[AgentWorkload, ...]

Current workload snapshots per agent (unique by agent_id).

min_score float

Minimum score threshold for eligibility.

required_skills tuple[NotBlankStr, ...]

Skill names needed for scoring.

required_role NotBlankStr | None

Optional role name for scoring.

max_concurrent_tasks int | None

Maximum concurrent tasks per agent. Agents at or above this limit are excluded from scoring. None disables the limit. Corresponds to TaskAssignmentConfig.max_concurrent_tasks_per_agent.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_collections

task pydantic-field

task

The task to assign

available_agents pydantic-field

available_agents

Pool of agents to consider

workloads pydantic-field

workloads = ()

Current workload snapshots per agent

min_score pydantic-field

min_score = 0.1

Minimum score threshold for eligibility

required_skills pydantic-field

required_skills = ()

Skill names needed for scoring

required_role pydantic-field

required_role = None

Optional role name for scoring

max_concurrent_tasks pydantic-field

max_concurrent_tasks = None

Maximum concurrent tasks per agent. Agents at or above this limit are excluded from scoring. None = no limit.
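The `max_concurrent_tasks` exclusion rule described above (agents at or above the limit are dropped before scoring, `None` disables the limit) can be sketched as a simple pre-filter. The function name and the `(agent_id, active_task_count)` pair shape are illustrative.

```python
def eligible_agents(workloads, max_concurrent_tasks=None):
    """Drop agents at or above the concurrency limit before scoring."""
    if max_concurrent_tasks is None:
        return list(workloads)   # None disables the limit
    return [w for w in workloads if w[1] < max_concurrent_tasks]
```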

AssignmentResult pydantic-model

Bases: BaseModel

Result of a task assignment operation.

Attributes:

Name Type Description
task_id NotBlankStr

ID of the task that was assigned.

strategy_used NotBlankStr

Name of the strategy that produced this result.

selected AssignmentCandidate | None

The selected candidate (None if no viable agent).

alternatives tuple[AssignmentCandidate, ...]

Other candidates considered, ranked by score.

reason NotBlankStr

Human-readable explanation of the assignment decision.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_selected_not_in_alternatives

task_id pydantic-field

task_id

Task identifier

strategy_used pydantic-field

strategy_used

Name of the strategy used

selected pydantic-field

selected = None

Selected candidate (None if no viable agent)

alternatives pydantic-field

alternatives = ()

Other candidates considered, ranked by score

reason pydantic-field

reason

Explanation of decision

service

Task assignment service.

Orchestrates task assignment by delegating to a pluggable TaskAssignmentStrategy with logging and validation.

TaskAssignmentService

TaskAssignmentService(strategy)

Orchestrates task assignment via a pluggable strategy.

Validates task status before delegating to the strategy. Does NOT mutate the task -- callers are responsible for any subsequent status transitions.

Source code in src/synthorg/engine/assignment/service.py
def __init__(self, strategy: TaskAssignmentStrategy) -> None:
    self._strategy = strategy

assign

assign(request)

Assign a task to an agent using the configured strategy.

Parameters:

Name Type Description Default
request AssignmentRequest

The assignment request.

required

Returns:

Type Description
AssignmentResult

Assignment result from the strategy.

Raises:

Type Description
TaskAssignmentError

If the task status is not eligible for assignment.

Source code in src/synthorg/engine/assignment/service.py
def assign(self, request: AssignmentRequest) -> AssignmentResult:
    """Assign a task to an agent using the configured strategy.

    Args:
        request: The assignment request.

    Returns:
        Assignment result from the strategy.

    Raises:
        TaskAssignmentError: If the task status is not eligible
            for assignment.
    """
    task = request.task

    if task.status not in _ASSIGNABLE_STATUSES:
        msg = (
            f"Task {task.id!r} has status {task.status.value!r}, "
            f"expected one of "
            f"{sorted(s.value for s in _ASSIGNABLE_STATUSES)}"
        )
        logger.warning(
            TASK_ASSIGNMENT_FAILED,
            task_id=task.id,
            status=task.status.value,
            error=msg,
        )
        raise TaskAssignmentError(msg)

    logger.info(
        TASK_ASSIGNMENT_STARTED,
        task_id=task.id,
        strategy=self._strategy.name,
        agent_count=len(request.available_agents),
    )

    try:
        result = self._strategy.assign(request)
    except TaskAssignmentError:
        raise  # already logged by the strategy
    except Exception:
        logger.exception(
            TASK_ASSIGNMENT_FAILED,
            task_id=task.id,
            strategy=self._strategy.name,
        )
        raise

    if result.selected is not None:
        logger.info(
            TASK_ASSIGNMENT_AGENT_SELECTED,
            task_id=task.id,
            agent_name=result.selected.agent_identity.name,
            score=result.selected.score,
            strategy=result.strategy_used,
        )
    else:
        logger.warning(
            TASK_ASSIGNMENT_NO_ELIGIBLE,
            task_id=task.id,
            strategy=self._strategy.name,
            reason=result.reason,
        )

    logger.info(
        TASK_ASSIGNMENT_COMPLETE,
        task_id=task.id,
        strategy=result.strategy_used,
        selected=result.selected is not None,
        alternatives=len(result.alternatives),
    )

    return result

Error Classification

models

Classification result models for the error taxonomy pipeline.

Defines severity levels, individual error findings, and aggregated classification results produced by the detection pipeline.

ErrorSeverity

Bases: StrEnum

Severity level for a detected coordination error.

ErrorFinding pydantic-model

Bases: BaseModel

A single coordination error detected during classification.

Attributes:

Name Type Description
category ErrorCategory

The error category from the taxonomy.

severity ErrorSeverity

Severity level of the finding.

description NotBlankStr

Human-readable description of the error.

evidence tuple[NotBlankStr, ...]

Supporting evidence extracted from the conversation.

turn_range tuple[int, int] | None

(start, end) 0-based index range where the error was observed, or None if the error cannot be attributed to specific positions. For conversation-based detectors this is the message index; for turn-based detectors this is the index into the turns tuple.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_turn_range

category pydantic-field

category

Error taxonomy category

severity pydantic-field

severity

Severity level

description pydantic-field

description

Error description

evidence pydantic-field

evidence = ()

Supporting evidence from conversation

turn_range pydantic-field

turn_range = None

0-based index range (start, end) where error was observed. For conversation-based detectors this is the message index in the conversation tuple; for turn-based detectors this is the index into the turns tuple.

ClassificationResult pydantic-model

Bases: BaseModel

Aggregated result from the error classification pipeline.

Attributes:

Name Type Description
execution_id NotBlankStr

Unique identifier for the execution run.

agent_id NotBlankStr

Agent that was executing.

task_id NotBlankStr

Task being executed.

categories_checked tuple[ErrorCategory, ...]

Which error categories were checked.

findings tuple[ErrorFinding, ...]

All detected error findings.

classified_at AwareDatetime

Timestamp when classification completed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_findings_match_categories

execution_id pydantic-field

execution_id

Execution run identifier

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id

Task identifier

categories_checked pydantic-field

categories_checked

Categories that were checked

findings pydantic-field

findings = ()

Detected error findings

classified_at pydantic-field

classified_at

Classification timestamp

finding_count property

finding_count

Total number of detected findings.

has_findings property

has_findings

Whether any error findings were detected.

pipeline

Error classification pipeline.

Orchestrates the detection of coordination errors from an execution result using the configured error taxonomy. The pipeline never raises ordinary exceptions -- they are caught and logged; only non-recoverable errors (MemoryError, RecursionError) are re-raised.

classify_execution_errors async

classify_execution_errors(execution_result, agent_id, task_id, *, config)

Classify coordination errors from an execution result.

Returns None when the taxonomy is disabled. Never raises ordinary exceptions -- they are caught and logged as CLASSIFICATION_ERROR; non-recoverable errors (MemoryError, RecursionError) are re-raised.

The function is async for compatibility with the engine's async execution pipeline; current detectors run synchronously.

Parameters:

Name Type Description Default
execution_result ExecutionResult

The completed execution result to analyse.

required
agent_id NotBlankStr

Agent that executed the task.

required
task_id NotBlankStr

Task that was executed.

required
config ErrorTaxonomyConfig

Error taxonomy configuration controlling which categories to check.

required

Returns:

Type Description
ClassificationResult | None

Classification result with findings, or None if disabled.

Source code in src/synthorg/engine/classification/pipeline.py
async def classify_execution_errors(
    execution_result: ExecutionResult,
    agent_id: NotBlankStr,
    task_id: NotBlankStr,
    *,
    config: ErrorTaxonomyConfig,
) -> ClassificationResult | None:
    """Classify coordination errors from an execution result.

    Returns ``None`` when the taxonomy is disabled.  Never raises
    ordinary exceptions -- they are caught and logged as
    ``CLASSIFICATION_ERROR``; non-recoverable errors
    (``MemoryError``, ``RecursionError``) are re-raised.

    The function is async for compatibility with the engine's async
    execution pipeline; current detectors run synchronously.

    Args:
        execution_result: The completed execution result to analyse.
        agent_id: Agent that executed the task.
        task_id: Task that was executed.
        config: Error taxonomy configuration controlling which
            categories to check.

    Returns:
        Classification result with findings, or ``None`` if disabled.
    """
    if not config.enabled:
        logger.debug(
            CLASSIFICATION_SKIPPED,
            agent_id=agent_id,
            task_id=task_id,
            reason="error taxonomy disabled",
        )
        return None

    execution_id = execution_result.context.execution_id
    logger.info(
        CLASSIFICATION_START,
        agent_id=agent_id,
        task_id=task_id,
        execution_id=execution_id,
        categories=tuple(c.value for c in config.categories),
    )

    try:
        return _run_detectors(
            execution_result,
            agent_id,
            task_id,
            execution_id=execution_id,
            config=config,
        )
    except (MemoryError, RecursionError):
        logger.error(
            CLASSIFICATION_ERROR,
            agent_id=agent_id,
            task_id=task_id,
            error="non-recoverable error in classification",
            exc_info=True,
        )
        raise
    except Exception as exc:
        logger.exception(
            CLASSIFICATION_ERROR,
            agent_id=agent_id,
            task_id=task_id,
            error=f"{type(exc).__name__}: {exc}",
        )
        return None
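The error-handling idiom in the pipeline above (swallow ordinary exceptions into a `None` result, let non-recoverable ones propagate) can be isolated as a small decorator. This is a self-contained sketch of the pattern, not code from the library; the real pipeline also logs each swallowed exception.

```python
def swallowing(fn):
    """Turn ordinary exceptions into a None result; let fatal ones propagate."""
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except (MemoryError, RecursionError):
            raise          # never mask non-recoverable failures
        except Exception:
            return None    # logged in the real pipeline, then swallowed
    return wrapper
```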

Workspace Isolation

protocol

Workspace isolation strategy protocol.

WorkspaceIsolationStrategy

Bases: Protocol

Protocol for workspace isolation strategies.

Implementations provide the ability to create, merge, and tear down isolated workspaces for concurrent agent execution.

setup_workspace async

setup_workspace(*, request)

Create an isolated workspace for an agent task.

Parameters:

Name Type Description Default
request WorkspaceRequest

Workspace creation request.

required

Returns:

Type Description
Workspace

The created workspace.

Raises:

Type Description
WorkspaceLimitError

When max concurrent worktrees reached.

WorkspaceSetupError

When git operations fail.

Source code in src/synthorg/engine/workspace/protocol.py
async def setup_workspace(
    self,
    *,
    request: WorkspaceRequest,
) -> Workspace:
    """Create an isolated workspace for an agent task.

    Args:
        request: Workspace creation request.

    Returns:
        The created workspace.

    Raises:
        WorkspaceLimitError: When max concurrent worktrees reached.
        WorkspaceSetupError: When git operations fail.
    """
    ...

teardown_workspace async

teardown_workspace(*, workspace)

Remove an isolated workspace and clean up resources.

Parameters:

Name Type Description Default
workspace Workspace

The workspace to tear down.

required

Raises:

Type Description
WorkspaceCleanupError

When git cleanup operations fail.

Source code in src/synthorg/engine/workspace/protocol.py
async def teardown_workspace(
    self,
    *,
    workspace: Workspace,
) -> None:
    """Remove an isolated workspace and clean up resources.

    Args:
        workspace: The workspace to tear down.

    Raises:
        WorkspaceCleanupError: When git cleanup operations fail.
    """
    ...

merge_workspace async

merge_workspace(*, workspace)

Merge a workspace branch back into the base branch.

Merge conflicts are returned as a MergeResult with success=False rather than raised as exceptions.

Parameters:

Name Type Description Default
workspace Workspace

The workspace to merge.

required

Returns:

Type Description
MergeResult

The merge result with conflict details if any.

Raises:

Type Description
WorkspaceMergeError

When checkout or merge abort fails.

Source code in src/synthorg/engine/workspace/protocol.py
async def merge_workspace(
    self,
    *,
    workspace: Workspace,
) -> MergeResult:
    """Merge a workspace branch back into the base branch.

    Merge conflicts are returned as a ``MergeResult`` with
    ``success=False`` rather than raised as exceptions.

    Args:
        workspace: The workspace to merge.

    Returns:
        The merge result with conflict details if any.

    Raises:
        WorkspaceMergeError: When checkout or merge abort fails.
    """
    ...

list_active_workspaces async

list_active_workspaces()

Return all currently active workspaces.

Returns:

Type Description
tuple[Workspace, ...]

Tuple of active workspaces.

Source code in src/synthorg/engine/workspace/protocol.py
async def list_active_workspaces(self) -> tuple[Workspace, ...]:
    """Return all currently active workspaces.

    Returns:
        Tuple of active workspaces.
    """
    ...

get_strategy_type

get_strategy_type()

Return the strategy type identifier.

Returns:

Type Description
str

Strategy type name.

Source code in src/synthorg/engine/workspace/protocol.py
def get_strategy_type(self) -> str:
    """Return the strategy type identifier.

    Returns:
        Strategy type name.
    """
    ...

models

Workspace isolation domain models.

WorkspaceRequest pydantic-model

Bases: BaseModel

Request to create an isolated workspace for an agent task.

Attributes:

Name Type Description
task_id NotBlankStr

Identifier of the task requiring isolation.

agent_id NotBlankStr

Identifier of the agent that will work in the workspace.

base_branch NotBlankStr

Git branch to branch from.

file_scope tuple[NotBlankStr, ...]

Optional file path hints for the workspace.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

task_id pydantic-field

task_id

Task requiring isolation

agent_id pydantic-field

agent_id

Agent working in workspace

base_branch pydantic-field

base_branch = 'main'

Git branch to branch from

file_scope pydantic-field

file_scope = ()

Optional file path hints

Workspace pydantic-model

Bases: BaseModel

An active isolated workspace backed by a git worktree.

Attributes:

Name Type Description
workspace_id NotBlankStr

Unique identifier for this workspace.

task_id NotBlankStr

Task this workspace serves.

agent_id NotBlankStr

Agent operating in this workspace.

branch_name NotBlankStr

Git branch created for this workspace.

worktree_path NotBlankStr

Filesystem path to the worktree directory.

base_branch NotBlankStr

Branch this workspace was created from.

created_at datetime

Timestamp of workspace creation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

workspace_id pydantic-field

workspace_id

Unique workspace ID

task_id pydantic-field

task_id

Task this workspace serves

agent_id pydantic-field

agent_id

Agent operating in workspace

branch_name pydantic-field

branch_name

Git branch for this workspace

worktree_path pydantic-field

worktree_path

Filesystem path to worktree

base_branch pydantic-field

base_branch

Branch workspace was created from

created_at pydantic-field

created_at

Workspace creation timestamp

MergeConflict pydantic-model

Bases: BaseModel

A single merge conflict detected during workspace merge.

Attributes:

Name Type Description
file_path NotBlankStr

Path of the conflicting file.

conflict_type ConflictType

Type of conflict (e.g. textual, semantic).

ours_content str

Content from the base branch side.

theirs_content str

Content from the workspace branch side.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_semantic_description

file_path pydantic-field

file_path

Conflicting file path

conflict_type pydantic-field

conflict_type

Type of conflict detected during merge

ours_content pydantic-field

ours_content = ''

Base branch content

theirs_content pydantic-field

theirs_content = ''

Workspace branch content

description pydantic-field

description = ''

Human-readable description of the conflict

MergeResult pydantic-model

Bases: BaseModel

Result of merging a single workspace branch back.

Attributes:

Name Type Description
workspace_id NotBlankStr

Workspace that was merged.

branch_name NotBlankStr

Branch that was merged.

success bool

Whether the merge completed without conflicts.

conflicts tuple[MergeConflict, ...]

Any textual conflicts encountered during merge.

escalation ConflictEscalation | None

Escalation strategy applied, if any.

merged_commit_sha NotBlankStr | None

SHA of the merge commit, if successful.

duration_seconds float

Time taken for the merge operation.

semantic_conflicts tuple[MergeConflict, ...]

Semantic conflicts detected after merge.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_success_consistency

workspace_id pydantic-field

workspace_id

Merged workspace ID

branch_name pydantic-field

branch_name

Merged branch name

success pydantic-field

success

Whether merge succeeded

conflicts pydantic-field

conflicts = ()

Conflicts encountered

escalation pydantic-field

escalation = None

Escalation strategy applied

merged_commit_sha pydantic-field

merged_commit_sha = None

Merge commit SHA if successful

duration_seconds pydantic-field

duration_seconds

Merge duration in seconds

semantic_conflicts pydantic-field

semantic_conflicts = ()

Semantic conflicts detected after successful merge

WorkspaceGroupResult pydantic-model

Bases: BaseModel

Aggregated result of merging a group of workspaces.

Attributes:

Name Type Description
group_id NotBlankStr

Identifier for this merge group.

merge_results tuple[MergeResult, ...]

Individual merge results for each workspace.

duration_seconds float

Total time for the group merge operation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

group_id pydantic-field

group_id

Merge group identifier

merge_results pydantic-field

merge_results = ()

Individual merge results

duration_seconds pydantic-field

duration_seconds

Total merge duration in seconds

all_merged property

all_merged

Return True only if every workspace merged without conflict.

total_conflicts property

total_conflicts

Sum of conflicts from all merge results.

total_semantic_conflicts property

total_semantic_conflicts

Sum of semantic conflicts from all merge results.
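The three properties above are simple aggregations over `merge_results`. A minimal sketch of that logic, using stub types in place of the real pydantic models (the `all_merged` rule is inferred from its docstring: every result succeeded and reported no conflicts):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MergeResultStub:
    success: bool
    conflicts: tuple = ()
    semantic_conflicts: tuple = ()


@dataclass(frozen=True)
class GroupResultSketch:
    merge_results: tuple

    @property
    def all_merged(self) -> bool:
        # True only when every workspace merged and none reported conflicts.
        return all(r.success and not r.conflicts for r in self.merge_results)

    @property
    def total_conflicts(self) -> int:
        return sum(len(r.conflicts) for r in self.merge_results)

    @property
    def total_semantic_conflicts(self) -> int:
        return sum(len(r.semantic_conflicts) for r in self.merge_results)


group = GroupResultSketch(
    merge_results=(
        MergeResultStub(success=True),
        MergeResultStub(success=True, conflicts=("src/app.py",)),
    )
)
print(group.all_merged, group.total_conflicts)  # False 1
```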

service

Workspace isolation service.

High-level service that coordinates workspace lifecycle: setup, merge, and teardown for groups of agent workspaces.

WorkspaceIsolationService

WorkspaceIsolationService(*, strategy, config)

Service for managing workspace isolation lifecycle.

Coordinates creating, merging, and tearing down workspaces for groups of concurrent agent tasks.

Parameters:

Name Type Description Default
strategy WorkspaceIsolationStrategy

Workspace isolation strategy implementation.

required
config WorkspaceIsolationConfig

Workspace isolation configuration.

required
Source code in src/synthorg/engine/workspace/service.py
def __init__(
    self,
    *,
    strategy: WorkspaceIsolationStrategy,
    config: WorkspaceIsolationConfig,
) -> None:
    self._strategy = strategy
    self._config = config
    pw = config.planner_worktrees
    self._merge_orchestrator = MergeOrchestrator(
        strategy=strategy,
        merge_order=pw.merge_order,
        conflict_escalation=pw.conflict_escalation,
        cleanup_on_merge=pw.cleanup_on_merge,
    )

setup_group async

setup_group(*, requests)

Create workspaces for a group of agent tasks.

Rolls back all already-created workspaces if any setup fails.

Parameters:

Name Type Description Default
requests tuple[WorkspaceRequest, ...]

Workspace creation requests.

required

Returns:

Type Description
tuple[Workspace, ...]

Tuple of created workspaces.

Raises:

Type Description
WorkspaceLimitError

When max concurrent worktrees reached.

WorkspaceSetupError

When git operations fail.

Source code in src/synthorg/engine/workspace/service.py
async def setup_group(
    self,
    *,
    requests: tuple[WorkspaceRequest, ...],
) -> tuple[Workspace, ...]:
    """Create workspaces for a group of agent tasks.

    Rolls back all already-created workspaces if any setup fails.

    Args:
        requests: Workspace creation requests.

    Returns:
        Tuple of created workspaces.

    Raises:
        WorkspaceLimitError: When max concurrent worktrees reached.
        WorkspaceSetupError: When git operations fail.
    """
    logger.info(
        WORKSPACE_GROUP_SETUP_START,
        count=len(requests),
    )

    workspaces: list[Workspace] = []
    try:
        for request in requests:
            ws = await self._strategy.setup_workspace(
                request=request,
            )
            workspaces.append(ws)
    except (WorkspaceLimitError, WorkspaceSetupError) as exc:
        logger.warning(
            WORKSPACE_GROUP_SETUP_FAILED,
            count=len(requests),
            created=len(workspaces),
            error=str(exc),
        )
        await self._rollback_workspaces(workspaces)
        raise

    logger.info(
        WORKSPACE_GROUP_SETUP_COMPLETE,
        count=len(workspaces),
    )
    return tuple(workspaces)
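The rollback behavior above makes group setup all-or-nothing: a failure partway through undoes every workspace created so far before re-raising. The generic pattern can be sketched as follows; `setup` and `rollback` are hypothetical callables standing in for the strategy's `setup_workspace` and the service's `_rollback_workspaces`, and the reverse-order rollback is an assumption of this sketch.

```python
import asyncio


class SetupError(Exception):
    """Stand-in for WorkspaceSetupError / WorkspaceLimitError."""


async def setup_group(requests, setup, rollback):
    """All-or-nothing group setup: roll back partial progress on failure."""
    created = []
    try:
        for request in requests:
            created.append(await setup(request))
    except SetupError:
        # Undo what was created so the group is never left half-built.
        for ws in reversed(created):
            await rollback(ws)
        raise
    return tuple(created)


async def demo():
    log = []

    async def setup(r):
        if r == "bad":
            raise SetupError(r)
        log.append(("setup", r))
        return r

    async def rollback(ws):
        log.append(("rollback", ws))

    try:
        await setup_group(("a", "b", "bad"), setup, rollback)
    except SetupError:
        pass
    return log


events = asyncio.run(demo())
print(events)
```

The third request fails, so the two workspaces already created are rolled back and the original exception propagates to the caller.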

merge_group async

merge_group(*, workspaces)

Merge all workspaces and return aggregated result.

Parameters:

Name Type Description Default
workspaces tuple[Workspace, ...]

Workspaces to merge.

required

Returns:

Type Description
WorkspaceGroupResult

Aggregated merge result for the group.

Raises:

Type Description
WorkspaceMergeError

When a merge operation fails fatally.

Source code in src/synthorg/engine/workspace/service.py
async def merge_group(
    self,
    *,
    workspaces: tuple[Workspace, ...],
) -> WorkspaceGroupResult:
    """Merge all workspaces and return aggregated result.

    Args:
        workspaces: Workspaces to merge.

    Returns:
        Aggregated merge result for the group.

    Raises:
        WorkspaceMergeError: When a merge operation fails fatally.
    """
    start = time.monotonic()
    merge_results = await self._merge_orchestrator.merge_all(
        workspaces=workspaces,
    )
    elapsed = time.monotonic() - start

    return WorkspaceGroupResult(
        group_id=str(uuid4()),
        merge_results=merge_results,
        duration_seconds=elapsed,
    )

teardown_group async

teardown_group(*, workspaces)

Tear down all workspaces in a group.

Teardown is best-effort: every workspace is attempted even if some fail, and if any teardown failed, a single combined error is raised afterwards.

Parameters:

Name Type Description Default
workspaces tuple[Workspace, ...]

Workspaces to tear down.

required

Raises:

Type Description
WorkspaceCleanupError

When any teardown operation fails.

Source code in src/synthorg/engine/workspace/service.py
async def teardown_group(
    self,
    *,
    workspaces: tuple[Workspace, ...],
) -> None:
    """Tear down all workspaces in a group.

    Uses best-effort teardown: attempts all workspaces even if
    some fail, then raises a combined error.

    Args:
        workspaces: Workspaces to tear down.

    Raises:
        WorkspaceCleanupError: When any teardown operation fails.
    """
    logger.info(
        WORKSPACE_GROUP_TEARDOWN_START,
        count=len(workspaces),
    )

    errors: list[str] = []
    for workspace in workspaces:
        try:
            await self._strategy.teardown_workspace(
                workspace=workspace,
            )
        except Exception as exc:
            errors.append(
                f"workspace {workspace.workspace_id}: {exc}",
            )
            logger.warning(
                WORKSPACE_TEARDOWN_FAILED,
                workspace_id=workspace.workspace_id,
                error=str(exc),
            )

    logger.info(
        WORKSPACE_GROUP_TEARDOWN_COMPLETE,
        count=len(workspaces),
        failures=len(errors),
    )

    if errors:
        msg = f"Failed to tear down {len(errors)} workspace(s): {'; '.join(errors)}"
        raise WorkspaceCleanupError(msg)
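The best-effort pattern above can be reduced to its essentials: collect per-workspace failures instead of stopping at the first one, then raise one combined error. This is a self-contained sketch; `teardown` is a hypothetical callable standing in for the strategy's `teardown_workspace`.

```python
import asyncio


class CleanupError(Exception):
    """Stand-in for WorkspaceCleanupError."""


async def teardown_group(workspaces, teardown):
    """Best-effort teardown: attempt every workspace, then report all failures."""
    errors: list[str] = []
    for ws in workspaces:
        try:
            await teardown(ws)
        except Exception as exc:
            # Record the failure but keep going so later workspaces
            # still get a teardown attempt.
            errors.append(f"workspace {ws}: {exc}")
    if errors:
        msg = f"Failed to tear down {len(errors)} workspace(s): {'; '.join(errors)}"
        raise CleanupError(msg)


attempted = []


async def flaky(ws):
    attempted.append(ws)
    if ws == "ws-2":
        raise RuntimeError("worktree locked")


message = ""
try:
    asyncio.run(teardown_group(("ws-1", "ws-2", "ws-3"), flaky))
except CleanupError as exc:
    message = str(exc)

print(message)
```

Even though `ws-2` fails, `ws-3` is still torn down, and the final error names every workspace that could not be cleaned up.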