
Providers

LLM provider abstraction: protocol, base class, drivers, capabilities, routing, and resilience.

Protocol

protocol

Typed protocol for completion providers.

The engine and tests type-hint against CompletionProvider for loose coupling. Concrete adapters and test doubles satisfy it structurally.

CompletionProvider

Bases: Protocol

Structural interface every LLM provider adapter must satisfy.

Defines four async methods: complete for non-streaming chat completion, stream for streaming completion, get_model_capabilities for a single-model capability lookup, and batch_get_capabilities for many-model capability lookup with per-model graceful degradation.
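Structural typing means an adapter or test double does not need to inherit from the protocol. The sketch below illustrates this with a hypothetical, trimmed-down protocol (`CompletionLike`, one method, plain `str` types); it is not the real `CompletionProvider` signature.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class CompletionLike(Protocol):
    """Hypothetical, trimmed-down stand-in for CompletionProvider."""

    async def complete(self, messages: list[str], model: str) -> str: ...


class EchoProvider:
    """Test double: no inheritance, it only matches the method shape."""

    async def complete(self, messages: list[str], model: str) -> str:
        return f"{model}: {messages[-1]}"


async def run(provider: CompletionLike) -> str:
    # Callers type-hint against the protocol, not a concrete adapter.
    return await provider.complete(["hello"], "example-model")


result = asyncio.run(run(EchoProvider()))
```

Because `EchoProvider` has a matching `complete` method, a type checker accepts it wherever `CompletionLike` is expected.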

complete async

complete(messages, model, *, tools=None, config=None)

Execute a non-streaming chat completion.

Parameters:

Name Type Description Default
messages list[ChatMessage]

Conversation history.

required
model str

Model identifier to use.

required
tools list[ToolDefinition] | None

Available tools for function calling.

None
config CompletionConfig | None

Optional completion parameters.

None

Returns:

Type Description
CompletionResponse

The full completion response.

Source code in src/synthorg/providers/protocol.py
async def complete(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> CompletionResponse:
    """Execute a non-streaming chat completion.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        The full completion response.
    """
    ...

stream async

stream(messages, model, *, tools=None, config=None)

Execute a streaming chat completion.

Parameters:

Name Type Description Default
messages list[ChatMessage]

Conversation history.

required
model str

Model identifier to use.

required
tools list[ToolDefinition] | None

Available tools for function calling.

None
config CompletionConfig | None

Optional completion parameters.

None

Returns:

Type Description
AsyncIterator[StreamChunk]

Async iterator of stream chunks.

Source code in src/synthorg/providers/protocol.py
async def stream(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> AsyncIterator[StreamChunk]:
    """Execute a streaming chat completion.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        Async iterator of stream chunks.
    """
    ...

get_model_capabilities async

get_model_capabilities(model)

Return capability metadata for the given model.

Parameters:

Name Type Description Default
model str

Model identifier.

required

Returns:

Type Description
ModelCapabilities

Static capability and cost information.

Source code in src/synthorg/providers/protocol.py
async def get_model_capabilities(self, model: str) -> ModelCapabilities:
    """Return capability metadata for the given model.

    Args:
        model: Model identifier.

    Returns:
        Static capability and cost information.
    """
    ...

batch_get_capabilities async

batch_get_capabilities(models)

Return capability metadata for many models in one call.

Failures degrade per-model: models whose lookup fails surface as None entries so callers preserve graceful per-model fallback. The returned mapping keys are exactly the input models tuple.

Parameters:

Name Type Description Default
models tuple[str, ...]

Tuple of model identifiers to look up.

required

Returns:

Type Description
Mapping[str, ModelCapabilities | None]

Mapping from model id to capabilities (or None on failure).

Source code in src/synthorg/providers/protocol.py
async def batch_get_capabilities(
    self,
    models: tuple[str, ...],
) -> Mapping[str, ModelCapabilities | None]:
    """Return capability metadata for many models in one call.

    Failures degrade per-model: models whose lookup fails surface as
    ``None`` entries so callers preserve graceful per-model fallback.
    The returned mapping keys are exactly the input ``models`` tuple.

    Args:
        models: Tuple of model identifiers to look up.

    Returns:
        Mapping from model id to capabilities (or ``None`` on failure).
    """
    ...

Base Provider

base

Abstract base class for completion providers.

Concrete adapters subclass BaseCompletionProvider and implement the _do_* hooks. The base class handles input validation, automatic retry, rate limiting, and provides a cost-computation helper.

BaseCompletionProvider

BaseCompletionProvider(*, retry_handler=None, rate_limiter=None, clock=None)

Bases: ABC

Shared base for all completion provider adapters.

Subclasses implement three hooks:

  • _do_complete -- raw non-streaming call
  • _do_stream -- raw streaming call
  • _do_get_model_capabilities -- capability lookup

The public methods validate inputs before delegating to hooks. When a retry_handler and/or rate_limiter are provided, calls are automatically wrapped with retry and rate-limiting logic. A static compute_cost helper is available for subclasses to build TokenUsage records from raw token counts.
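The validate-then-delegate split can be pictured with a miniature version of the pattern. Everything in the sketch is hypothetical: `SketchBaseProvider` has a single hook, uses plain `str` types, and raises `ValueError` where the real class raises `InvalidRequestError`. Retry and rate limiting are omitted.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchBaseProvider(ABC):
    """Hypothetical miniature of the validate-then-delegate pattern."""

    async def complete(self, messages: list[str], model: str) -> str:
        # Public method: validate first, then hand off to the hook.
        if not messages:
            raise ValueError("messages must not be empty")
        if not model.strip():
            raise ValueError("model must not be blank")
        return await self._do_complete(messages, model)

    @abstractmethod
    async def _do_complete(self, messages: list[str], model: str) -> str:
        """Raw provider call; subclasses implement only this."""


class UppercaseProvider(SketchBaseProvider):
    async def _do_complete(self, messages: list[str], model: str) -> str:
        return messages[-1].upper()


reply = asyncio.run(UppercaseProvider().complete(["ping"], "example-model"))
```

Subclasses never repeat the validation, because callers only reach the hook through the public method.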

Parameters:

Name Type Description Default
retry_handler RetryHandler | None

Optional retry handler for transient errors.

None
rate_limiter RateLimiter | None

Optional client-side rate limiter.

None
clock Clock | None

Optional injectable synthorg.core.clock.Clock used for latency measurement. Defaults to SystemClock; tests inject FakeClock to drive virtual time without wall-clock waits.

None
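As an illustration of why the clock is injectable, the sketch below measures latency against a fake clock. The class is a hypothetical stand-in: only `monotonic()` appears in the source shown on this page, and `advance()` is an assumed helper, so the real `FakeClock` may expose a different interface.

```python
class FakeClock:
    """Hypothetical test clock; only monotonic() mirrors the source."""

    def __init__(self) -> None:
        self._now = 0.0

    def monotonic(self) -> float:
        return self._now

    def advance(self, seconds: float) -> None:
        # Assumed helper: move virtual time forward without sleeping.
        self._now += seconds


clock = FakeClock()
t_start = clock.monotonic()
clock.advance(0.25)  # pretend the provider call took 250 ms
latency_ms = (clock.monotonic() - t_start) * 1000
```

The latency calculation has the same shape as the one in `complete`, but the test finishes without waiting on the wall clock.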
Source code in src/synthorg/providers/base.py
def __init__(
    self,
    *,
    retry_handler: RetryHandler | None = None,
    rate_limiter: RateLimiter | None = None,
    clock: Clock | None = None,
) -> None:
    self._retry_handler = retry_handler
    self._rate_limiter = rate_limiter
    self._clock: Clock = clock if clock is not None else SystemClock()

complete async

complete(messages, model, *, tools=None, config=None)

Validate inputs, delegate to _do_complete.

Applies rate limiting and retry automatically when configured.

Parameters:

Name Type Description Default
messages list[ChatMessage]

Conversation history.

required
model str

Model identifier to use.

required
tools list[ToolDefinition] | None

Available tools for function calling.

None
config CompletionConfig | None

Optional completion parameters.

None

Returns:

Type Description
CompletionResponse

The completion response.

Raises:

Type Description
InvalidRequestError

If messages are empty or model is blank.

RetryExhaustedError

If all retries are exhausted.

Source code in src/synthorg/providers/base.py
async def complete(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> CompletionResponse:
    """Validate inputs, delegate to ``_do_complete``.

    Applies rate limiting and retry automatically when configured.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        The completion response.

    Raises:
        InvalidRequestError: If messages are empty or model is blank.
        RetryExhaustedError: If all retries are exhausted.
    """
    self._validate_messages(messages)
    self._validate_model(model)
    logger.debug(
        PROVIDER_CALL_START,
        model=model,
        message_count=len(messages),
    )

    async def _attempt() -> CompletionResponse:
        return await self._rate_limited_call(
            self._do_complete,
            messages,
            model,
            tools=tools,
            config=config,
        )

    from .resilience.retry import RetryResult  # noqa: PLC0415, TC001

    # Per-call child span under whatever parent span the caller
    # owns (typically ``agent.execution`` from AgentEngine).
    # ``record_exception=False`` and ``set_status_on_exception=False``
    # opt out of the auto-instrumentation that would otherwise
    # stamp the unscrubbed ``str(exc)`` into the span; we set
    # ``exception.message`` via ``safe_error_description`` instead
    # so attacker-controlled provider error strings are scrubbed
    # before reaching the OTLP exporter.
    provider_label = self._provider_label()
    span_attributes: dict[str, Any] = {
        "provider.name": provider_label,
        "provider.model": model,
        "provider.message_count": len(messages),
        "provider.tool_count": len(tools) if tools else 0,
    }
    with _tracer.start_as_current_span(
        "provider.complete",
        attributes=span_attributes,
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        t_start = self._clock.monotonic()
        retry_info: RetryResult[CompletionResponse] | None = None
        try:
            if self._retry_handler is not None:
                retry_info = await self._retry_handler.execute(_attempt)
                result = retry_info.value
            else:
                result = await _attempt()
        except MemoryError, RecursionError:
            raise
        except Exception as exc:
            latency_ms = (
                self._clock.monotonic() - t_start
            ) * _MILLISECONDS_PER_SECOND
            # ``logger.exception`` (what TRY400 suggests) would
            # attach a traceback whose serialized frame-locals can
            # leak provider credentials (API keys in headers,
            # connection URLs with user:pass). Use ``logger.error``
            # with the structured ``error_type`` + scrubbed
            # ``error`` fields instead.
            logger.error(
                PROVIDER_CALL_ERROR,
                model=model,
                latency_ms=latency_ms,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            span.set_attribute("exception.type", type(exc).__name__)
            span.set_attribute(
                "exception.message",
                safe_error_description(exc),
            )
            span.set_attribute("provider.latency_ms", latency_ms)
            # ``set_status_on_exception=False`` opts out of the
            # auto-instrumentation that would have stamped an
            # un-scrubbed ``str(exc)`` into the span status, so the
            # ERROR status must be set manually here. The scrubbed
            # error description is exposed via ``exception.message``
            # above; ``Status.description`` is intentionally left
            # unset so the OTLP exporter never carries the raw
            # provider string.
            span.set_status(Status(StatusCode.ERROR))
            record_provider_error(
                provider=provider_label,
                model=model,
                error_class=classify_provider_error(exc),
            )
            raise
        latency_ms = (self._clock.monotonic() - t_start) * _MILLISECONDS_PER_SECOND
        span.set_attribute("provider.latency_ms", latency_ms)
        if retry_info is not None:
            span.set_attribute(
                "provider.retry_count",
                max(0, retry_info.attempt_count - 1),
            )

    metadata: dict[str, object] = {"_synthorg_latency_ms": latency_ms}
    if retry_info is not None:
        metadata["_synthorg_retry_count"] = max(
            0,
            retry_info.attempt_count - 1,
        )
        if retry_info.retry_reason is not None:
            metadata["_synthorg_retry_reason"] = retry_info.retry_reason

    merged_metadata = dict(result.provider_metadata or {})
    merged_metadata.update(metadata)
    result = result.model_copy(update={"provider_metadata": merged_metadata})
    logger.debug(
        PROVIDER_CALL_SUCCESS,
        model=model,
    )

    # Cost recording chokepoint: when a ``cost_recording_scope`` is
    # open in the current asyncio task, emit a CostRecord. Sites
    # without a scope (probes, tests, and the engine path which
    # records via ``record_execution_costs`` post-execution) see no
    # change. Recording errors are logged and swallowed inside the
    # helper -- never surface to the caller.
    ctx = current_cost_context()
    if ctx is not None:
        await emit_cost_record_from_context(
            ctx,
            result,
            model=model,
            provider=self._provider_label(),
        )
    return result

stream async

stream(messages, model, *, tools=None, config=None)

Validate inputs, delegate to _do_stream.

Only the initial connection setup is retried; mid-stream errors are not retried.

Note

Unlike complete, stream does not fire the cost-recording chokepoint. Streaming responses surface usage as a terminal StreamEventType.USAGE chunk, so the recording logic would have to consume the iterator to extract token counts, conflating cost recording with the stream-consumption contract. Until streaming becomes a mainstream LLM call path in this codebase, callers using stream() are responsible for emitting their own CostRecord from the final usage chunk. No call site in the current diff uses stream() for paid LLM work.

Parameters:

Name Type Description Default
messages list[ChatMessage]

Conversation history.

required
model str

Model identifier to use.

required
tools list[ToolDefinition] | None

Available tools for function calling.

None
config CompletionConfig | None

Optional completion parameters.

None

Returns:

Type Description
AsyncIterator[StreamChunk]

Async iterator of stream chunks.

Raises:

Type Description
InvalidRequestError

If messages are empty or model is blank.

RetryExhaustedError

If all retries are exhausted.

Source code in src/synthorg/providers/base.py
async def stream(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> AsyncIterator[StreamChunk]:
    """Validate inputs, delegate to ``_do_stream``.

    Only the initial connection setup is retried; mid-stream errors
    are not retried.

    .. note::

        Unlike :meth:`complete`, ``stream`` does **not** fire the
        cost-recording chokepoint.  Streaming responses surface
        usage as a terminal ``StreamEventType.USAGE`` chunk, so the
        recording logic would have to consume the iterator to
        extract token counts -- conflating cost recording with the
        stream-consumption contract.  Until streaming becomes a
        mainstream LLM call path in this codebase, callers using
        ``stream()`` are responsible for emitting their own
        ``CostRecord`` from the final usage chunk.  No call site in
        the current diff uses ``stream()`` for paid LLM work.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        Async iterator of stream chunks.

    Raises:
        InvalidRequestError: If messages are empty or model is blank.
        RetryExhaustedError: If all retries are exhausted.
    """
    self._validate_messages(messages)
    self._validate_model(model)
    logger.debug(
        PROVIDER_STREAM_START,
        model=model,
        message_count=len(messages),
    )

    async def _attempt() -> AsyncIterator[StreamChunk]:
        return await self._rate_limited_call(
            self._do_stream,
            messages,
            model,
            tools=tools,
            config=config,
        )

    try:
        return await self._resilient_execute(_attempt)
    except MemoryError, RecursionError:
        raise
    except Exception as exc:
        # See the ``complete`` sibling handler; ``logger.error``
        # + scrubbed fields instead of ``logger.exception``
        # prevents traceback frame-locals from leaking provider
        # credentials.
        logger.error(
            PROVIDER_CALL_ERROR,
            model=model,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        record_provider_error(
            provider=self._provider_label(),
            model=model,
            error_class=classify_provider_error(exc),
        )
        raise

get_model_capabilities async

get_model_capabilities(model)

Validate model identifier, delegate to _do_get_model_capabilities.

Capability lookups go through the same retry handler and rate limiter as complete() / stream() so the contract "all provider calls go through BaseCompletionProvider" stays honest for any future driver whose _do_get_model_capabilities does network I/O. Same budget as completions: capability lookups consume a rate-limiter slot and are retried on retryable errors.

Parameters:

Name Type Description Default
model str

Model identifier.

required

Returns:

Type Description
ModelCapabilities

Static capability and cost information.

Raises:

Type Description
InvalidRequestError

If model is blank.

RetryExhaustedError

If all retries are exhausted.

Source code in src/synthorg/providers/base.py
async def get_model_capabilities(self, model: str) -> ModelCapabilities:
    """Validate model identifier, delegate to ``_do_get_model_capabilities``.

    Capability lookups go through the same retry handler and rate
    limiter as ``complete()`` / ``stream()`` so the contract "all
    provider calls go through BaseCompletionProvider" stays honest
    for any future driver whose ``_do_get_model_capabilities``
    does network I/O.  Same budget as completions: capability
    lookups consume a rate-limiter slot and are retried on
    retryable errors.

    Args:
        model: Model identifier.

    Returns:
        Static capability and cost information.

    Raises:
        InvalidRequestError: If model is blank.
        RetryExhaustedError: If all retries are exhausted.
    """
    self._validate_model(model)

    async def _attempt() -> ModelCapabilities:
        return await self._rate_limited_call(
            self._do_get_model_capabilities,
            model,
        )

    try:
        return await self._resilient_execute(_attempt)
    except MemoryError, RecursionError:
        raise
    except Exception as exc:
        # ``logger.exception`` would attach a traceback whose
        # frame-locals can leak provider credentials; use
        # ``logger.error`` with the structured ``error_type`` +
        # scrubbed ``error`` fields, mirroring ``complete()`` /
        # ``stream()``. ``record_provider_error`` keeps the
        # provider-error metric in sync with the other call paths
        # so dashboards do not under-count capability failures.
        logger.error(
            PROVIDER_CALL_ERROR,
            model=model,
            phase="get_model_capabilities",
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        record_provider_error(
            provider=self._provider_label(),
            model=model,
            error_class=classify_provider_error(exc),
        )
        raise

batch_get_capabilities async

batch_get_capabilities(models)

Fan out capability lookups across many models in parallel.

The default implementation runs get_model_capabilities per model concurrently via asyncio.TaskGroup.

Per-model classification errors (model-not-found, validation, non-retryable provider errors) degrade to None entries so a single bad model id does not poison the whole batch.

RetryExhaustedError propagates: retry exhaustion is a signal that the provider is unhealthy, not a per-model classification issue. Surfacing it lets the caller decide whether to fail the whole list-models request or retry the batch later. MemoryError and RecursionError also propagate unchanged.

Subclasses that expose a cheaper bulk source (e.g. a static preset catalog) should override this to avoid the per-model round trip.

Source code in src/synthorg/providers/base.py
async def batch_get_capabilities(
    self,
    models: tuple[str, ...],
) -> Mapping[str, ModelCapabilities | None]:
    """Fan out capability lookups across many models in parallel.

    The default implementation runs ``get_model_capabilities`` per
    model concurrently via :class:`asyncio.TaskGroup`.

    Per-model classification errors (model-not-found, validation,
    non-retryable provider errors) degrade to ``None`` entries so a
    single bad model id does not poison the whole batch.

    ``RetryExhaustedError`` propagates: retry exhaustion is a
    signal that the provider is unhealthy, not a per-model
    classification issue. Surfacing it lets the caller decide
    whether to fail the whole list-models request or retry the
    batch later. ``MemoryError`` and ``RecursionError`` also
    propagate unchanged.

    Subclasses that expose a cheaper bulk source (e.g. a static
    preset catalog) should override this to avoid the per-model
    round trip.
    """
    if not models:
        return {}

    async def _one(m: str) -> tuple[str, ModelCapabilities | None]:
        try:
            return m, await self.get_model_capabilities(m)
        except MemoryError, RecursionError, RetryExhaustedError:
            raise
        except Exception as exc:
            logger.warning(
                PROVIDER_BATCH_CAPABILITIES_PARTIAL,
                model=m,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            return m, None

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(_one(m)) for m in models]
    return dict(t.result() for t in tasks)

compute_cost staticmethod

compute_cost(input_tokens, output_tokens, *, cost_per_1k_input, cost_per_1k_output)

Build a TokenUsage from raw token counts and per-1k rates.

Parameters:

Name Type Description Default
input_tokens int

Number of input tokens (must be >= 0).

required
output_tokens int

Number of output tokens (must be >= 0).

required
cost_per_1k_input float

Cost per 1,000 input tokens in the configured currency (finite and >= 0).

required
cost_per_1k_output float

Cost per 1,000 output tokens in the configured currency (finite and >= 0).

required

Returns:

Type Description
TokenUsage

Populated TokenUsage with computed cost.

Raises:

Type Description
InvalidRequestError

If any parameter is negative or non-finite.
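As a worked instance of the per-1k formula, the sketch below reproduces the arithmetic outside the class. `sketch_cost` and `ROUNDING_PRECISION` are hypothetical: the real `BUDGET_ROUNDING_PRECISION` value is not shown on this page, and `ValueError` stands in for `InvalidRequestError`.

```python
import math

# Assumed value; the real BUDGET_ROUNDING_PRECISION is defined elsewhere.
ROUNDING_PRECISION = 10


def sketch_cost(
    input_tokens: int,
    output_tokens: int,
    *,
    cost_per_1k_input: float,
    cost_per_1k_output: float,
) -> float:
    """Mirror the per-1k formula; ValueError stands in for InvalidRequestError."""
    if input_tokens < 0 or output_tokens < 0:
        raise ValueError("token counts must be non-negative")
    for rate in (cost_per_1k_input, cost_per_1k_output):
        # NaN fails the isfinite check even though `nan < 0` is False.
        if rate < 0 or not math.isfinite(rate):
            raise ValueError("rates must be finite and non-negative")
    cost = (input_tokens / 1000) * cost_per_1k_input + (
        output_tokens / 1000
    ) * cost_per_1k_output
    return round(cost, ROUNDING_PRECISION)


# 1.5 thousand input tokens and 0.5 thousand output tokens.
cost = sketch_cost(1500, 500, cost_per_1k_input=0.003, cost_per_1k_output=0.015)
```

With these inputs the terms are 1.5 × 0.003 and 0.5 × 0.015, which sum to roughly 0.012 in the configured currency.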

Source code in src/synthorg/providers/base.py
@staticmethod
def compute_cost(
    input_tokens: int,
    output_tokens: int,
    *,
    cost_per_1k_input: float,
    cost_per_1k_output: float,
) -> TokenUsage:
    """Build a ``TokenUsage`` from raw token counts and per-1k rates.

    Args:
        input_tokens: Number of input tokens (must be >= 0).
        output_tokens: Number of output tokens (must be >= 0).
        cost_per_1k_input: Cost per 1,000 input tokens in the configured
            currency (finite and >= 0).
        cost_per_1k_output: Cost per 1,000 output tokens in the configured
            currency (finite and >= 0).

    Returns:
        Populated ``TokenUsage`` with computed cost.

    Raises:
        InvalidRequestError: If any parameter is negative or
            non-finite.
    """
    if input_tokens < 0:
        msg = "input_tokens must be non-negative"
        raise InvalidRequestError(
            msg,
            context={"input_tokens": input_tokens},
        )
    if output_tokens < 0:
        msg = "output_tokens must be non-negative"
        raise InvalidRequestError(
            msg,
            context={"output_tokens": output_tokens},
        )
    if cost_per_1k_input < 0 or not math.isfinite(cost_per_1k_input):
        msg = "cost_per_1k_input must be a finite non-negative number"
        raise InvalidRequestError(
            msg,
            context={"cost_per_1k_input": cost_per_1k_input},
        )
    if cost_per_1k_output < 0 or not math.isfinite(cost_per_1k_output):
        msg = "cost_per_1k_output must be a finite non-negative number"
        raise InvalidRequestError(
            msg,
            context={"cost_per_1k_output": cost_per_1k_output},
        )
    cost = (input_tokens / 1000) * cost_per_1k_input + (
        output_tokens / 1000
    ) * cost_per_1k_output
    return TokenUsage(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        cost=round(cost, BUDGET_ROUNDING_PRECISION),
    )

Models

models

Provider-layer domain models for chat completion requests and responses.

ZERO_TOKEN_USAGE module-attribute

ZERO_TOKEN_USAGE = TokenUsage(input_tokens=0, output_tokens=0, cost=0.0)

Additive identity for TokenUsage.

TokenUsage pydantic-model

Bases: BaseModel

Token counts and cost for a single completion call.

This is the lightweight provider-layer record. The budget layer's synthorg.budget.CostRecord adds agent/task context around it.

Attributes:

Name Type Description
input_tokens int

Number of input (prompt) tokens.

output_tokens int

Number of output (completion) tokens.

total_tokens int

Sum of input and output tokens (computed).

cost float

Estimated cost in the configured currency for this call.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

input_tokens pydantic-field

input_tokens

Input token count

output_tokens pydantic-field

output_tokens

Output token count

cost pydantic-field

cost

Estimated cost in the configured currency

total_tokens property

total_tokens

Sum of input and output tokens.

ToolDefinition pydantic-model

Bases: BaseModel

Schema for a tool the model can invoke.

Uses raw JSON Schema for parameters_schema because every LLM provider consumes it natively.

Note

The parameters_schema dict is shallowly frozen by Pydantic's frozen=True -- field reassignment is prevented but nested contents can still be mutated in place. BaseTool.to_definition() provides a deep-copied schema, and ToolInvoker deep-copies arguments at the execution boundary, so no additional caller-side copying is needed for standard tool/provider workflows. Direct consumers outside these paths should deep-copy if they intend to modify the schema. See the tech stack page (docs/architecture/tech-stack.md).
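The shallow-freeze behavior can be demonstrated without Pydantic. The sketch uses a frozen dataclass as a stand-in, which blocks field reassignment but not nested mutation; `FrozenTool` is hypothetical and is not the real `ToolDefinition`.

```python
import copy
from dataclasses import FrozenInstanceError, dataclass, field
from typing import Any


@dataclass(frozen=True)
class FrozenTool:
    """Stand-in for a frozen model; not the real ToolDefinition."""

    name: str
    parameters_schema: dict[str, Any] = field(default_factory=dict)


tool = FrozenTool("search", {"type": "object", "properties": {}})

try:
    tool.name = "other"  # field reassignment is blocked
    reassigned = True
except FrozenInstanceError:
    reassigned = False

# Nested contents are still mutable in place.
tool.parameters_schema["properties"]["q"] = {"type": "string"}

# A deep copy can be edited without touching the shared schema.
private = copy.deepcopy(tool.parameters_schema)
private["properties"]["limit"] = {"type": "integer"}
```

Consumers outside the standard tool/provider paths would take the `deepcopy` route before modifying a schema.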

Attributes:

Name Type Description
name NotBlankStr

Tool name.

description str

Human-readable description of the tool.

parameters_schema dict[str, Any]

JSON Schema dict describing the tool parameters.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_l1_name_matches

name pydantic-field

name

Tool name

description pydantic-field

description = ''

Tool description

parameters_schema pydantic-field

parameters_schema

JSON Schema for tool parameters

l1_metadata pydantic-field

l1_metadata = None

L1 always-in-context summary

l2_body pydantic-field

l2_body = None

L2 on-demand instruction body

l3_resources pydantic-field

l3_resources = ()

L3 explicit-request resources

ToolCall pydantic-model

Bases: BaseModel

A tool invocation requested by the model.

Note

The arguments dict is shallowly frozen by Pydantic's frozen=True -- field reassignment is prevented but nested contents can still be mutated in place. The ToolInvoker deep-copies arguments before passing them to tool implementations. See the tech stack page (docs/architecture/tech-stack.md).

Attributes:

Name Type Description
id NotBlankStr

Provider-assigned tool call identifier.

name NotBlankStr

Name of the tool to invoke.

arguments dict[str, Any]

Parsed arguments dict.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

id pydantic-field

id

Tool call identifier

name pydantic-field

name

Tool name

arguments pydantic-field

arguments

Tool arguments

ToolResult pydantic-model

Bases: BaseModel

Result of executing a tool call, sent back to the model.

Attributes:

Name Type Description
tool_call_id NotBlankStr

The ToolCall.id this result corresponds to.

content str

String content returned by the tool.

is_error bool

Whether the tool execution failed.

is_timeout bool

Whether the tool execution timed out specifically (a stricter form of is_error). Lets the metric layer distinguish a tool that hit its time budget from one that returned a deterministic error so dashboards don't conflate the two.
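The relationship between the two flags can be sketched as an invariant check. The rule below is an assumption inferred from the name `_validate_timeout_implies_error`; `SketchToolResult` is a hypothetical dataclass, and the real model is a Pydantic validator whose exact behavior is not shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchToolResult:
    """Hypothetical stand-in for ToolResult."""

    tool_call_id: str
    content: str
    is_error: bool = False
    is_timeout: bool = False

    def __post_init__(self) -> None:
        # Assumed rule, inferred from the validator's name.
        if self.is_timeout and not self.is_error:
            raise ValueError("is_timeout=True requires is_error=True")


ok = SketchToolResult("call-1", "timed out", is_error=True, is_timeout=True)

try:
    SketchToolResult("call-2", "timed out", is_timeout=True)
    rejected = False
except ValueError:
    rejected = True
```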

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_timeout_implies_error

tool_call_id pydantic-field

tool_call_id

Matching tool call ID

content pydantic-field

content

Tool output content

is_error pydantic-field

is_error = False

Whether tool errored

is_timeout pydantic-field

is_timeout = False

Whether tool errored due to timeout specifically

ChatMessage pydantic-model

Bases: BaseModel

A single message in a chat completion conversation.

Attributes:

Name Type Description
role MessageRole

Message role (system, user, assistant, tool).

content str | None

Text content of the message.

tool_calls tuple[ToolCall, ...]

Tool calls requested by the assistant (assistant only).

tool_result ToolResult | None

Result of a tool execution (tool role only).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_role_constraints

role pydantic-field

role

Message role

content pydantic-field

content = None

Text content

tool_calls pydantic-field

tool_calls = ()

Tool calls (assistant messages only)

tool_result pydantic-field

tool_result = None

Tool result (tool messages only)

CompletionConfig pydantic-model

Bases: BaseModel

Optional parameters for a completion request.

All fields are optional -- the provider fills in defaults.

Attributes:

Name Type Description
temperature float | None

Sampling temperature (0.0-2.0). Actual valid range may vary by provider.

max_tokens int | None

Maximum tokens to generate.

stop_sequences tuple[str, ...]

Sequences that stop generation.

top_p float

Nucleus sampling threshold.

timeout float | None

Request timeout in seconds.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

temperature pydantic-field

temperature = None

Sampling temperature

max_tokens pydantic-field

max_tokens = None

Maximum tokens to generate

stop_sequences pydantic-field

stop_sequences = ()

Stop sequences

top_p pydantic-field

top_p = 1.0

Nucleus-sampling threshold. Defaults to 1.0 (full distribution, no truncation) so every completion call has an explicit deterministic value without each site having to repeat it. Override when the prompt class needs a custom value alongside temperature.

timeout pydantic-field

timeout = None

Request timeout in seconds

CompletionResponse pydantic-model

Bases: BaseModel

Result of a non-streaming completion call.

Attributes:

Name Type Description
content str | None

Generated text content (may be None for tool-use-only responses).

tool_calls tuple[ToolCall, ...]

Tool calls the model wants to execute.

finish_reason FinishReason

Why the model stopped generating.

usage TokenUsage

Token usage and cost breakdown.

model NotBlankStr

Model identifier that served the request.

provider_request_id str | None

Provider-assigned request ID for debugging.

provider_metadata dict[str, object]

Provider metadata injected by the base class (_synthorg_* keys for latency, retry count, retry reason).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_has_output

content pydantic-field

content = None

Generated text

tool_calls pydantic-field

tool_calls = ()

Requested tool calls

finish_reason pydantic-field

finish_reason

Reason generation stopped

usage pydantic-field

usage

Token usage breakdown

model pydantic-field

model

Model that served the request

provider_request_id pydantic-field

provider_request_id = None

Provider request ID

provider_metadata pydantic-field

provider_metadata

Provider metadata injected by the base class (_synthorg_* keys).

StreamChunk pydantic-model

Bases: BaseModel

A single chunk from a streaming completion response.

The event_type discriminator determines which optional fields are populated.

Attributes:

Name Type Description
event_type StreamEventType

Type of stream event.

content str | None

Text delta (for content_delta).

tool_call_delta ToolCall | None

Tool call received during streaming (for tool_call_delta).

usage TokenUsage | None

Final token usage (for usage event).

error_message str | None

Error description (for error event).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_event_fields

event_type pydantic-field

event_type

Stream event type

content pydantic-field

content = None

Text delta

tool_call_delta pydantic-field

tool_call_delta = None

Tool call received during streaming

usage pydantic-field

usage = None

Final token usage

error_message pydantic-field

error_message = None

Error description
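Because event_type decides which optional fields are populated, a consumer usually dispatches on it. The following is a minimal sketch using stdlib stand-ins rather than the real StreamChunk and StreamEventType; the enum member names and values are assumptions inferred from the field descriptions above, and collect_text is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class EventType(str, Enum):
    # Hypothetical members; the real StreamEventType values are not shown here.
    CONTENT_DELTA = "content_delta"
    USAGE = "usage"
    ERROR = "error"


@dataclass(frozen=True)
class Chunk:
    """Stand-in for StreamChunk with only the fields this sketch needs."""

    event_type: EventType
    content: str | None = None
    error_message: str | None = None


def collect_text(chunks: list[Chunk]) -> str:
    """Join content deltas in order, raising on the first error chunk."""
    parts: list[str] = []
    for chunk in chunks:
        if chunk.event_type is EventType.CONTENT_DELTA and chunk.content:
            parts.append(chunk.content)
        elif chunk.event_type is EventType.ERROR:
            raise RuntimeError(chunk.error_message or "stream error")
        # Other event types (e.g. usage) carry no text and are skipped.
    return "".join(parts)
```

Chunks whose event type carries no text are ignored here; a real consumer would likely also record the final usage chunk.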

add_token_usage

add_token_usage(a, b)

Create a new TokenUsage with summed token counts and cost.

Parameters:

Name Type Description Default
a TokenUsage

First usage record.

required
b TokenUsage

Second usage record.

required

Returns:

Type Description
TokenUsage

New TokenUsage with summed token counts and cost (total_tokens is computed automatically).

Source code in src/synthorg/providers/models.py
def add_token_usage(a: TokenUsage, b: TokenUsage) -> TokenUsage:
    """Create a new ``TokenUsage`` with summed token counts and cost.

    Args:
        a: First usage record.
        b: Second usage record.

    Returns:
        New ``TokenUsage`` with summed token counts and cost
        (``total_tokens`` is computed automatically).
    """
    return TokenUsage(
        input_tokens=a.input_tokens + b.input_tokens,
        output_tokens=a.output_tokens + b.output_tokens,
        cost=a.cost + b.cost,
    )
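Since add_token_usage is a plain binary function, it can be folded over any number of records. A minimal sketch, assuming a stdlib dataclass stand-in for TokenUsage (the real model is a Pydantic class whose definition is not shown here) and a hypothetical sum_usage helper:

```python
from __future__ import annotations

from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class TokenUsage:
    """Stand-in: total_tokens is derived from the two counts, not stored."""

    input_tokens: int
    output_tokens: int
    cost: float

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens


def add_token_usage(a: TokenUsage, b: TokenUsage) -> TokenUsage:
    """Same arithmetic as the function documented above."""
    return TokenUsage(
        input_tokens=a.input_tokens + b.input_tokens,
        output_tokens=a.output_tokens + b.output_tokens,
        cost=a.cost + b.cost,
    )


def sum_usage(records: list[TokenUsage]) -> TokenUsage:
    """Fold many usage records into one, starting from a zero record."""
    zero = TokenUsage(input_tokens=0, output_tokens=0, cost=0.0)
    return reduce(add_token_usage, records, zero)
```

Starting the fold from a zero record means an empty list yields an all-zero TokenUsage instead of raising.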

Enums

enums

Provider-layer enumerations.

AuthType

Bases: StrEnum

Authentication type for an LLM provider.

MessageRole

Bases: StrEnum

Role of a message participant in a chat completion.

FinishReason

Bases: StrEnum

Reason the model stopped generating tokens.

StreamEventType

Bases: StrEnum

Discriminator for streaming response chunks.

Errors

errors

Provider error hierarchy.

Every provider error carries an is_retryable flag so retry logic can decide whether to attempt again without inspecting concrete exception types.
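A retry loop built on that flag might look like the sketch below. The classes are stand-ins, not the real hierarchy: which concrete errors are retryable, and the flag's default on the base class, are assumptions made for illustration, as is the call_with_retry helper.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


class ProviderError(Exception):
    """Stand-in base class; the flag defaults to non-retryable here."""

    is_retryable: bool = False


class TransientProviderError(ProviderError):
    """Hypothetical subclass that opts in to retries."""

    is_retryable = True


def call_with_retry(fn: Callable[[], T], max_attempts: int = 3) -> T:
    """Call fn, retrying only errors that flag themselves as retryable."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except ProviderError as exc:
            # Decide from the flag alone; no isinstance checks on subclasses.
            if not exc.is_retryable or attempt == max_attempts:
                raise
    msg = "max_attempts must be at least 1"
    raise ValueError(msg)
```

The loop never names a concrete subclass, so new error types can join the hierarchy without touching the retry code.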

ProviderErrorLabel module-attribute

ProviderErrorLabel = Literal[
    "rate_limit",
    "timeout",
    "connection",
    "internal",
    "invalid_request",
    "auth",
    "content_filter",
    "not_found",
    "other",
]

Bounded Prometheus label value returned by classify_provider_error.

Kept in lockstep with synthorg.observability.prometheus_labels.VALID_PROVIDER_ERROR_CLASSES by the record helper; updating either requires updating both.

ProviderLifecycleConflictError

ProviderLifecycleConflictError(message=None)

Bases: ConflictError

Raised when ProviderHealthProber.start() is called after a timed-out stop.

Mirrors BackupUnrestartableError -- a stuck drain leaves the prober's loop alive on the original instance, so the canonical lifecycle pattern marks the prober unrestartable rather than layering a second loop on top of an orphan task.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

ProviderError

ProviderError(message, *, context=None)

Bases: DomainError

Base exception for all provider-layer errors.

Attributes:

Name Type Description
message

Human-readable error description.

context MappingProxyType[str, Any]

Immutable metadata about the error (provider, model, etc.).

is_retryable bool

Whether the caller should retry the request.

Class Attributes

  • status_code: HTTP 502 Bad Gateway (upstream failure).
  • error_code: RFC 9457 error code; subclasses override.
  • error_category: PROVIDER_ERROR.
  • retryable: Alias of is_retryable for the exception handler.
  • default_message: Generic message safe for 5xx scrubbing.

Note

When converted to string, sensitive context keys (api_key, token, secret, password, authorization) are automatically redacted regardless of casing.

Initialize a provider error.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
context dict[str, Any] | None

Arbitrary metadata about the error. Stored as an immutable mapping; defaults to empty if not provided.

None

Source code in src/synthorg/providers/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a provider error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    # Deep-copy so nested lists/dicts in ``context`` cannot be
    # mutated by the caller after the exception is raised /
    # logged: ``MappingProxyType`` only freezes the outer mapping.
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        copy.deepcopy(dict(context)) if context else {},
    )
    super().__init__(message)

__str__

__str__()

Format error with optional context metadata.

Sensitive keys (api_key, token, etc.) are redacted to prevent accidental secret leakage in logs and tracebacks.

Source code in src/synthorg/providers/errors.py
def __str__(self) -> str:
    """Format error with optional context metadata.

    Sensitive keys (api_key, token, etc.) are redacted to prevent
    accidental secret leakage in logs and tracebacks.
    """
    if self.context:
        ctx = ", ".join(
            f"{k}='***'" if _is_sensitive_key(k) else f"{k}={v!r}"
            for k, v in self.context.items()
        )
        return f"{self.message} ({ctx})"
    return self.message
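The _is_sensitive_key helper is not shown on this page. The sketch below assumes it performs a case-insensitive substring match against the key names listed in the note above; the real rule may differ (for example, exact matching). Both is_sensitive_key and format_error are hypothetical stand-ins.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any

# Key fragments taken from the note above; the matching rule is an assumption.
_SENSITIVE_MARKERS = ("api_key", "token", "secret", "password", "authorization")


def is_sensitive_key(key: str) -> bool:
    """Assumed rule: case-insensitive substring match on the key name."""
    lowered = key.lower()
    return any(marker in lowered for marker in _SENSITIVE_MARKERS)


def format_error(message: str, context: Mapping[str, Any]) -> str:
    """Mirror the __str__ formatting above, redacting sensitive values."""
    if context:
        ctx = ", ".join(
            f"{k}='***'" if is_sensitive_key(k) else f"{k}={v!r}"
            for k, v in context.items()
        )
        return f"{message} ({ctx})"
    return message
```

Under these assumptions, format_error("bad key", {"provider": "example-provider", "API_KEY": "sk-1"}) renders the provider name but replaces the key value with '***'.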

AuthenticationError

AuthenticationError(message, *, context=None)

Bases: ProviderError

Invalid or missing API credentials.

RateLimitError

RateLimitError(message, *, retry_after=None, context=None)

Bases: ProviderError

Provider rate limit exceeded.

Initialize a rate limit error.

Parameters:

Name Type Description Default
message str

Human-readable error description.

required
retry_after float | None

Seconds to wait before retrying, if provided by the provider.

None
context dict[str, Any] | None

Arbitrary metadata about the error.

None

Source code in src/synthorg/providers/errors.py
def __init__(
    self,
    message: str,
    *,
    retry_after: float | None = None,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a rate limit error.

    Args:
        message: Human-readable error description.
        retry_after: Seconds to wait before retrying, if provided
            by the provider.
        context: Arbitrary metadata about the error.
    """
    if retry_after is not None and (
        retry_after < 0 or not math.isfinite(retry_after)
    ):
        msg = "retry_after must be a finite non-negative number"
        raise ValueError(msg)
    self.retry_after = retry_after
    super().__init__(message, context=context)
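A caller can prefer the provider's retry_after hint and fall back to its own backoff when the hint is absent. This is a sketch of one possible policy, not the retry logic used by the library; backoff_delay and its base and cap parameters are hypothetical.

```python
from __future__ import annotations

import math


def backoff_delay(
    attempt: int,
    retry_after: float | None,
    *,
    base: float = 0.5,
    cap: float = 30.0,
) -> float:
    """Return seconds to wait before the next attempt (attempt starts at 0)."""
    if retry_after is not None:
        # Same validity rule as the constructor above: finite and non-negative.
        if retry_after < 0 or not math.isfinite(retry_after):
            msg = "retry_after must be a finite non-negative number"
            raise ValueError(msg)
        return retry_after
    # No hint from the provider: capped exponential backoff.
    return min(base * 2**attempt, cap)
```

The provider hint is returned uncapped on the assumption that retrying sooner than the provider asked would only trigger another rate-limit response.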

ModelNotFoundError

ModelNotFoundError(message, *, context=None)

Bases: ProviderError

Requested model does not exist or is not available.

InvalidRequestError

InvalidRequestError(message, *, context=None)

Bases: ProviderError

Malformed request (bad parameters, too many tokens, etc.).

The HTTP status stays at 422 (the provider rejected the request as invalid), but the RFC 9457 error_category is PROVIDER_ERROR to match the 7xxx error_code prefix -- the underlying signal originates from the upstream provider, not from local boundary validation. DomainError.__init_subclass__ enforces the prefix-vs-category alignment at class-definition time.

ContentFilterError

ContentFilterError(message, *, context=None)

Bases: ProviderError

Request or response blocked by the provider's content filter.

Same prefix-vs-category alignment fix as InvalidRequestError: the 7xxx error_code keeps its semantic PROVIDER_ERROR category; HTTP status stays 422.

ProviderTimeoutError

ProviderTimeoutError(message, *, context=None)

Bases: ProviderError

Request timed out waiting for provider response.

ProviderConnectionError

ProviderConnectionError(message, *, context=None)

Bases: ProviderError

Network-level failure connecting to the provider.

ProviderInternalError

ProviderInternalError(message, *, context=None)

Bases: ProviderError

Provider returned a server-side error (5xx).

DriverNotRegisteredError

DriverNotRegisteredError(message, *, context=None)

Bases: ProviderError

Requested provider driver is not registered in the registry.

DriverAlreadyRegisteredError

DriverAlreadyRegisteredError(message, *, context=None)

Bases: ProviderError

A driver with this name is already registered.

Reserved for future use if the registry gains mutable operations (add/remove after construction). Not currently raised.

DriverFactoryNotFoundError

DriverFactoryNotFoundError(message, *, context=None)

Bases: ProviderError

No factory found for the requested driver type string.

ProviderAlreadyExistsError

ProviderAlreadyExistsError(message, *, context=None)

Bases: ProviderError

A provider with this name already exists.

409 Conflict: provider name uniqueness violation, not a 502 upstream failure. Override the parent's 502 default so the domain handler maps directly without a controller-level catch + re-raise as ConflictError.

ProviderNotFoundError

ProviderNotFoundError(message, *, context=None)

Bases: ProviderError

A provider with this name does not exist.

404 Not Found: provider does not exist locally, not a 502 upstream failure. Override the parent's 502 default so the domain handler maps directly.

ProviderModelNotFoundError

ProviderModelNotFoundError(message, *, context=None)

Bases: ProviderError

A model identifier does not exist on the provider.

404 Not Found: distinct from ProviderNotFoundError (the whole provider is missing) and from ProviderValidationError (the request shape is wrong). Lets the API controller route missing-model errors to HTTP 404 without parsing free-form validation text.

ProviderValidationError

ProviderValidationError(message, *, context=None)

Bases: ProviderError

Provider configuration failed validation.

422 Unprocessable Entity: input shape is wrong, not a 502 upstream failure. Override the parent's 502 default so the domain handler maps directly.

classify_provider_error

classify_provider_error(exc)

Classify exc into one of nine bounded Prometheus label values.

Falls back to "other" for any exception not in the direct canonical map, which guarantees the label set in VALID_PROVIDER_ERROR_CLASSES stays finite even as driver implementations add new error types.

Uses a direct-type lookup first (cheapest), then falls back to isinstance for the hierarchy so subclasses of the canonical provider-error types are bucketed with their parents. Any ProviderError subclass that is not in the direct map (e.g. DriverNotRegisteredError, ProviderValidationError) and unknown (non-ProviderError) exception types both resolve to "other"; the Prometheus label set therefore stays bounded regardless of what the provider driver raises.

Returns:

Type Description
ProviderErrorLabel

One of the ProviderErrorLabel literal values; the return type gives static guarantees to callers (e.g. the Prometheus collector's record_provider_error) that only allowlisted labels flow through.

Source code in src/synthorg/providers/errors.py
def classify_provider_error(exc: BaseException) -> ProviderErrorLabel:
    """Classify *exc* into one of nine bounded Prometheus label values.

    Falls back to ``"other"`` for any exception not in the direct
    canonical map, which guarantees the label set in
    :data:`VALID_PROVIDER_ERROR_CLASSES` stays finite even as driver
    implementations add new error types.

    Uses a direct-type lookup first (cheapest), then falls back to
    ``isinstance`` for the hierarchy so subclasses of the canonical
    provider-error types are bucketed with their parents.  Any
    ``ProviderError`` subclass that is not in the direct map (e.g.
    ``DriverNotRegisteredError``, ``ProviderValidationError``) and
    unknown (non-``ProviderError``) exception types both resolve to
    ``"other"``; the Prometheus label set therefore stays bounded
    regardless of what the provider driver raises.

    Returns:
        One of the :data:`ProviderErrorLabel` literal values; the
        return type gives static guarantees to callers (e.g. the
        Prometheus collector's ``record_provider_error``) that only
        allowlisted labels flow through.
    """
    exc_type = type(exc)
    direct = _ERROR_CLASS_MAP.get(exc_type)
    if direct is not None:
        return direct
    for cls, label in _ERROR_CLASS_MAP.items():
        if isinstance(exc, cls):
            return label
    return "other"
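The two-stage lookup can be exercised with a small stand-in hierarchy. The contents of _ERROR_CLASS_MAP are not shown on this page, so the two entries below are assumptions, and QuotaExceededError is a hypothetical driver-specific subclass used to trigger the isinstance fallback.

```python
class ProviderError(Exception):
    """Stand-in for the real base class."""


class RateLimitError(ProviderError):
    pass


class ProviderTimeoutError(ProviderError):
    pass


class DriverNotRegisteredError(ProviderError):
    pass


class QuotaExceededError(RateLimitError):
    """Hypothetical subclass a driver might add later."""


# Assumed subset of the canonical map; the real contents are not shown here.
_ERROR_CLASS_MAP: dict[type[BaseException], str] = {
    RateLimitError: "rate_limit",
    ProviderTimeoutError: "timeout",
}


def classify(exc: BaseException) -> str:
    """Exact-type lookup first, then isinstance, then the 'other' bucket."""
    direct = _ERROR_CLASS_MAP.get(type(exc))
    if direct is not None:
        return direct
    for cls, label in _ERROR_CLASS_MAP.items():
        if isinstance(exc, cls):
            return label
    return "other"
```

In this sketch QuotaExceededError lands in "rate_limit" through the isinstance pass, while DriverNotRegisteredError and a plain ValueError both fall through to "other".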

Capabilities

capabilities

Model capability descriptors for provider routing decisions.

ModelCapabilities pydantic-model

Bases: BaseModel

Static capability and cost metadata for a single LLM model.

Used by the routing layer to decide which model handles a request based on required features (tools, vision, streaming) and cost.

Attributes:

Name Type Description
model_id NotBlankStr

Provider model identifier (e.g. "example-large-001").

provider NotBlankStr

Provider name (e.g. "example-provider").

max_context_tokens int

Maximum context window size in tokens.

max_output_tokens int

Maximum output tokens per request.

supports_tools bool

Whether the model supports tool/function calling.

supports_vision bool

Whether the model accepts image inputs.

supports_streaming bool

Whether the model supports streaming responses.

supports_streaming_tool_calls bool

Whether tool calls can be streamed.

supports_system_messages bool

Whether system messages are accepted.

cost_per_1k_input float

Cost per 1 000 input tokens, denominated in the currency declared by the source that populated this capability record -- either the provider preset (whose prices the operator is responsible for keeping aligned with budget.currency) or the upstream model database. The routing layer compares values across models from the same source only; cross-source comparisons require the operator to reconcile currencies.

cost_per_1k_output float

Cost per 1 000 output tokens, same currency semantics as cost_per_1k_input.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_cross_field_constraints

model_id pydantic-field

model_id

Model identifier

provider pydantic-field

provider

Provider name

max_context_tokens pydantic-field

max_context_tokens

Max context window tokens

max_output_tokens pydantic-field

max_output_tokens

Max output tokens per request

supports_tools pydantic-field

supports_tools = False

Supports tool calling

supports_vision pydantic-field

supports_vision = False

Supports image inputs

supports_streaming pydantic-field

supports_streaming = True

Supports streaming responses

supports_streaming_tool_calls pydantic-field

supports_streaming_tool_calls = False

Supports streaming tool calls

supports_system_messages pydantic-field

supports_system_messages = True

Supports system messages

cost_per_1k_input pydantic-field

cost_per_1k_input

Cost per 1k input tokens in the pricing currency declared by the capability source (provider preset or upstream model database); operators must keep preset pricing aligned with budget.currency

cost_per_1k_output pydantic-field

cost_per_1k_output

Cost per 1k output tokens; same currency semantics as cost_per_1k_input
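As an illustration of how capability flags and cost fields could drive a routing decision, the sketch below filters by required features and picks the lowest combined cost. The routing layer's actual strategy is not described here; Caps is a trimmed stdlib stand-in for ModelCapabilities and cheapest is a hypothetical helper. It assumes all candidates come from the same pricing source, in line with the currency caveat above.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Caps:
    """Trimmed stand-in for ModelCapabilities."""

    model_id: str
    supports_tools: bool
    supports_vision: bool
    cost_per_1k_input: float
    cost_per_1k_output: float


def cheapest(
    models: list[Caps],
    *,
    need_tools: bool = False,
    need_vision: bool = False,
) -> Caps | None:
    """Return the lowest-cost model meeting the requirements, or None."""
    eligible = [
        m
        for m in models
        if (m.supports_tools or not need_tools)
        and (m.supports_vision or not need_vision)
    ]
    # Combined input + output price is a simplification chosen for this sketch.
    return min(
        eligible,
        key=lambda m: m.cost_per_1k_input + m.cost_per_1k_output,
        default=None,
    )
```

Returning None when nothing qualifies leaves the choice between raising and degrading to the caller.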

Registry

registry

Provider registry -- the Employment Agency.

Maps provider names to concrete BaseCompletionProvider driver instances. Built from config via from_config, which reads each provider's driver field to select the appropriate factory.

ProviderRegistry

ProviderRegistry(drivers, *, cassette_session=None)

Immutable registry of named provider drivers.

Use from_config to build a registry from a config dict, or construct directly with a pre-built mapping.

Examples:

Build from config:

registry = ProviderRegistry.from_config(
    root_config.providers,
)
driver = registry.get("example-provider")
response = await driver.complete(messages, "medium")

Check membership:

if "example-provider" in registry:
    ...

Initialize with a name -> driver mapping.

Parameters:

Name Type Description Default
drivers dict[str, BaseCompletionProvider]

Mutable dict of provider name to driver instance. The registry takes ownership and freezes a copy.

required
cassette_session CassetteSession | None

The shared cassette session when the cassette seam is active, else None. Exposed so an app shutdown hook can emit the session-flushed event; data durability does not depend on it (the session persists after every recorded interaction).

None

Source code in src/synthorg/providers/registry.py
def __init__(
    self,
    drivers: dict[str, BaseCompletionProvider],
    *,
    cassette_session: CassetteSession | None = None,
) -> None:
    """Initialize with a name -> driver mapping.

    Args:
        drivers: Mutable dict of provider name to driver instance.
            The registry takes ownership and freezes a copy.
        cassette_session: The shared cassette session when the
            cassette seam is active, else ``None``. Exposed so an
            app shutdown hook can emit the session-flushed event;
            data durability does not depend on it (the session
            persists after every recorded interaction).
    """
    self._drivers: MappingProxyType[str, BaseCompletionProvider] = MappingProxyType(
        dict(drivers)
    )
    self._cassette_session = cassette_session

cassette_session property

cassette_session

The active cassette session, or None when inert.

get

get(name)

Look up a driver by provider name.

Parameters:

Name Type Description Default
name str

Provider name (e.g. "example-provider").

required

Returns:

Type Description
BaseCompletionProvider

The registered driver instance.

Raises:

Type Description
DriverNotRegisteredError

If no driver is registered.

Source code in src/synthorg/providers/registry.py
def get(self, name: str) -> BaseCompletionProvider:
    """Look up a driver by provider name.

    Args:
        name: Provider name (e.g. ``"example-provider"``).

    Returns:
        The registered driver instance.

    Raises:
        DriverNotRegisteredError: If no driver is registered.
    """
    driver = self._drivers.get(name)
    if driver is None:
        available = sorted(self._drivers) or ["(none)"]
        logger.error(
            PROVIDER_DRIVER_NOT_REGISTERED,
            name=name,
            available=available,
        )
        msg = (
            f"Provider {name!r} is not registered. "
            f"Available providers: {', '.join(available)}"
        )
        raise DriverNotRegisteredError(
            msg,
            context={"provider": name},
        )
    return driver

list_providers

list_providers()

Return sorted tuple of registered provider names.

Source code in src/synthorg/providers/registry.py
def list_providers(self) -> tuple[str, ...]:
    """Return sorted tuple of registered provider names."""
    return tuple(sorted(self._drivers))

__contains__

__contains__(name)

Check whether a provider name is registered.

Source code in src/synthorg/providers/registry.py
def __contains__(self, name: object) -> bool:
    """Check whether a provider name is registered."""
    try:
        return name in self._drivers
    except TypeError:
        return False

__len__

__len__()

Return the number of registered providers.

Source code in src/synthorg/providers/registry.py
def __len__(self) -> int:
    """Return the number of registered providers."""
    return len(self._drivers)
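The copy-then-freeze behaviour and the tolerant membership check can be seen in isolation with a stripped-down stand-in. MiniRegistry below is hypothetical: it stores plain objects instead of drivers and raises LookupError where the real registry raises DriverNotRegisteredError.

```python
from __future__ import annotations

from types import MappingProxyType


class MiniRegistry:
    """Stand-in registry holding a read-only copy of the mapping."""

    def __init__(self, drivers: dict[str, object]) -> None:
        # Copy first, then wrap: later changes to the caller's dict are not seen.
        self._drivers: MappingProxyType[str, object] = MappingProxyType(
            dict(drivers)
        )

    def get(self, name: str) -> object:
        driver = self._drivers.get(name)
        if driver is None:
            available = sorted(self._drivers) or ["(none)"]
            msg = (
                f"Provider {name!r} is not registered. "
                f"Available providers: {', '.join(available)}"
            )
            raise LookupError(msg)
        return driver

    def __contains__(self, name: object) -> bool:
        try:
            return name in self._drivers
        except TypeError:
            # Unhashable keys (e.g. a list) cannot be registered names.
            return False

    def __len__(self) -> int:
        return len(self._drivers)
```

Adding a key to the source dict after construction does not change the registry, and a check such as ["x"] in registry returns False instead of raising TypeError.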

from_config classmethod

from_config(providers, *, factory_overrides=None, cassette=None)

Build a registry from a provider config dict.

For each provider, reads the driver field to select a factory. The factory is called with (provider_name, config) to produce a driver instance.

When cassette is active every driver is wrapped in a CassetteCompletionProvider sharing one session -- the single provider-layer chokepoint, so no consumer (engine, coordinator, judge, runtime builder) can bypass record/replay. In replay mode the inner driver is not built at all: no factory is called, so a pure replay run constructs no real provider.

Parameters:

Name Type Description Default
providers Mapping[str, ProviderConfig]

Provider config dict (key = provider name).

required
factory_overrides dict[str, object] | None

Optional driver-type -> factory mapping for testing or native SDK swaps.

None
cassette CassetteConfig | None

Cassette configuration; None or off leaves the registry holding the concrete drivers unchanged.

None

Returns:

Type Description
Self

A new ProviderRegistry with all providers registered.

Raises:

Type Description
DriverFactoryNotFoundError

If a provider's driver does not match any known factory.

Source code in src/synthorg/providers/registry.py
@classmethod
def from_config(
    cls,
    providers: Mapping[str, ProviderConfig],
    *,
    factory_overrides: dict[str, object] | None = None,
    cassette: CassetteConfig | None = None,
) -> Self:
    """Build a registry from a provider config dict.

    For each provider, reads the ``driver`` field to select a
    factory.  The factory is called with
    ``(provider_name, config)`` to produce a driver instance.

    When ``cassette`` is active every driver is wrapped in a
    :class:`CassetteCompletionProvider` sharing one session -- the
    single provider-layer chokepoint, so no consumer (engine,
    coordinator, judge, runtime builder) can bypass record/replay.
    In replay mode the inner driver is **not built at all**: no
    factory is called, so a pure replay run constructs no real
    provider.

    Args:
        providers: Provider config dict (key = provider name).
        factory_overrides: Optional driver-type -> factory
            mapping for testing or native SDK swaps.
        cassette: Cassette configuration; ``None`` or ``off``
            leaves the registry holding the concrete drivers
            unchanged.

    Returns:
        A new ``ProviderRegistry`` with all providers registered.

    Raises:
        DriverFactoryNotFoundError: If a provider's ``driver``
            does not match any known factory.
    """
    overrides = factory_overrides or {}

    if cassette is not None and cassette.is_active:
        from .cassette import CassetteMode  # noqa: PLC0415

        if cassette.mode is CassetteMode.REPLAY:
            # Pure replay builds no inner driver, so no concrete
            # driver factory is ever called. Skip importing the
            # driver SDKs entirely: ``litellm`` is an optional
            # dependency a replay-only environment need not have,
            # and importing it here would break the pure-replay
            # contract for no benefit.
            return cls._build_cassette_registry(
                providers,
                {},
                overrides,
                cassette,
            )

    from .drivers.litellm_driver import (  # noqa: PLC0415
        LiteLLMDriver,
    )
    from .drivers.scripted import ScriptedDriver  # noqa: PLC0415

    defaults: dict[str, type[BaseCompletionProvider]] = {
        "litellm": LiteLLMDriver,
        "scripted": ScriptedDriver,
    }

    if cassette is not None and cassette.is_active:
        return cls._build_cassette_registry(
            providers,
            defaults,
            overrides,
            cassette,
        )

    drivers: dict[str, BaseCompletionProvider] = {}
    for name, config in providers.items():
        drivers[name] = _build_driver(name, config, defaults, overrides)

    logger.info(
        PROVIDER_REGISTRY_BUILT,
        provider_count=len(drivers),
        providers=sorted(drivers),
    )
    return cls(drivers)

LiteLLM Driver

litellm_driver

LiteLLM-backed completion driver.

Wraps litellm.acompletion behind the BaseCompletionProvider contract, mapping between domain models and LiteLLM's chat-completion API.

LiteLLMDriver

LiteLLMDriver(provider_name, config, *, connection_catalog=None, clock=None)

Bases: BaseCompletionProvider

Completion driver backed by LiteLLM.

Uses litellm.acompletion for both streaming and non-streaming calls. Model identifiers are prefixed with the LiteLLM routing key (litellm_provider if set, otherwise the provider name -- e.g. example-provider/example-medium-001) so LiteLLM routes to the correct backend.

Parameters:

Name Type Description Default
provider_name str

Provider key from config (e.g. "example-provider").

required
config ProviderConfig

Provider configuration including API key, base URL, and model definitions.

required

Raises:

Type Description
ProviderError

All LiteLLM exceptions are mapped to the ProviderError hierarchy via _map_exception.

Source code in src/synthorg/providers/drivers/litellm_driver.py
def __init__(
    self,
    provider_name: str,
    config: ProviderConfig,
    *,
    connection_catalog: Any | None = None,
    clock: Clock | None = None,
) -> None:
    retry_handler = (
        RetryHandler(config.retry) if config.retry.max_retries > 0 else None
    )
    rate_limiter = RateLimiter(
        config.rate_limiter,
        provider_name=provider_name,
    )
    super().__init__(
        retry_handler=retry_handler,
        rate_limiter=rate_limiter if rate_limiter.is_enabled else None,
    )
    self._provider_name = provider_name
    self._config = config
    self._connection_catalog = connection_catalog
    self._clock: Clock = clock if clock is not None else SystemClock()
    self._resolved_credentials: dict[str, str] | None = None
    # Cached credentials expire after ``_CREDENTIAL_CACHE_TTL`` so
    # rotating/OAuth tokens are re-fetched from the catalog
    # periodically instead of being pinned for the lifetime of
    # the driver. The TTL is intentionally coarse -- it is a safety
    # net for rotation, not a token-refresh mechanism.
    self._credentials_cached_at: float | None = None
    self._model_lookup: MappingProxyType[str, ProviderModelConfig] = (
        MappingProxyType(self._build_model_lookup(config.models))
    )
    self._routing_key = config.litellm_provider or provider_name

batch_get_capabilities async

batch_get_capabilities(models)

Resolve capabilities for many models in a single tight loop.

Overrides the base implementation: each capability is built from the static preset catalog plus a LiteLLM model-info lookup, all of which is synchronous and in-process. Per-model failures (unknown ids, validation errors) collapse to None entries; MemoryError and RecursionError propagate.

Source code in src/synthorg/providers/drivers/litellm_driver.py
async def batch_get_capabilities(
    self,
    models: tuple[str, ...],
) -> Mapping[str, ModelCapabilities | None]:
    """Resolve capabilities for many models in a single tight loop.

    Overrides the base implementation: each capability is built
    from the static preset catalog plus a LiteLLM model-info
    lookup, all of which is synchronous and in-process. Per-model
    failures (unknown ids, validation errors) collapse to ``None``
    entries; ``MemoryError`` and ``RecursionError`` propagate.
    """
    results: dict[str, ModelCapabilities | None] = {}
    for model in models:
        # Skip _resolve_model() because its miss path emits
        # PROVIDER_MODEL_NOT_FOUND at ERROR; an expected partial
        # miss in a batch lookup must not be recorded as a failed
        # request. Read the lookup directly and degrade silently to
        # ``None`` (the partial-failure event is reserved for real
        # capability-build errors below).
        model_config = self._model_lookup.get(model)
        if model_config is None:
            results[model] = None
            continue
        try:
            results[model] = self._build_capabilities(model_config)
        except (MemoryError, RecursionError):
            raise
        except Exception as exc:
            logger.warning(
                PROVIDER_BATCH_CAPABILITIES_PARTIAL,
                provider=self._provider_name,
                model=model,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            results[model] = None
    # Return a read-only view so callers cannot mutate the batch
    # snapshot in place (matches the immutability pattern used by
    # ``_model_lookup`` itself).
    return MappingProxyType(results)

Routing

models

Domain models for the routing engine.

ResolvedModel pydantic-model

Bases: BaseModel

A fully resolved model reference.

Attributes:

Name Type Description
provider_name NotBlankStr

Provider that owns this model (e.g. "acme-provider").

model_id NotBlankStr

Concrete model identifier (e.g. "acme-large-001").

alias NotBlankStr | None

Short alias used in routing rules, if any.

cost_per_1k_input float

Cost per 1,000 input tokens in the configured currency.

cost_per_1k_output float

Cost per 1,000 output tokens in the configured currency.

max_context int

Maximum context window size in tokens.

estimated_latency_ms int | None

Estimated median latency in milliseconds.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

provider_name pydantic-field

provider_name

Provider name

model_id pydantic-field

model_id

Model identifier

alias pydantic-field

alias = None

Short alias

cost_per_1k_input pydantic-field

cost_per_1k_input = 0.0

Cost per 1k input tokens in the configured currency

cost_per_1k_output pydantic-field

cost_per_1k_output = 0.0

Cost per 1k output tokens in the configured currency

max_context pydantic-field

max_context = 200000

Maximum context window size in tokens

estimated_latency_ms pydantic-field

estimated_latency_ms = None

Estimated median latency in milliseconds

total_cost_per_1k property

total_cost_per_1k

Total cost per 1,000 tokens (input + output).

RoutingRequest pydantic-model

Bases: BaseModel

Inputs to a routing decision.

Not all fields are used by every strategy:

  • ManualStrategy requires model_override.
  • RoleBasedStrategy requires agent_level.
  • CostAwareStrategy uses task_type and remaining_budget.
  • FastestStrategy uses task_type and remaining_budget.
  • SmartStrategy uses all fields in priority order.

Attributes:

Name Type Description
agent_level SeniorityLevel | None

Seniority level of the requesting agent.

task_type NotBlankStr | None

Task type label (e.g. "development").

model_override NotBlankStr | None

Explicit model reference for manual routing.

remaining_budget float | None

Per-request cost ceiling. Compared against each model's total_cost_per_1k (i.e. cost_per_1k_input + cost_per_1k_output) to filter models that exceed this threshold. This is not a total session budget -- use the budget module for that.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_level pydantic-field

agent_level = None

Seniority level of the requesting agent

task_type pydantic-field

task_type = None

Task type label

model_override pydantic-field

model_override = None

Explicit model reference for manual routing

remaining_budget pydantic-field

remaining_budget = None

Per-request cost ceiling in the configured currency, compared against model total_cost_per_1k. Not a total session budget.

RoutingDecision pydantic-model

Bases: BaseModel

Output of a routing decision.

Attributes:

Name Type Description
resolved_model ResolvedModel

The chosen model.

strategy_used NotBlankStr

Name of the strategy that produced this decision.

reason NotBlankStr

Human-readable explanation.

fallbacks_tried tuple[str, ...]

Model refs that were tried before the final choice.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

resolved_model pydantic-field

resolved_model

The chosen model

strategy_used pydantic-field

strategy_used

Strategy name

reason pydantic-field

reason

Human-readable explanation

fallbacks_tried pydantic-field

fallbacks_tried = ()

Model refs tried before the final choice

router

Model router -- main entry point for routing decisions.

Constructed from RoutingConfig and a provider config dict. Delegates to strategy implementations.

ModelRouter

ModelRouter(routing_config, providers, *, selector=None)

Route requests to the appropriate LLM model.

Examples:

Build from config:

router = ModelRouter(
    routing_config=root_config.routing,
    providers=root_config.providers,
)
decision = router.route(
    RoutingRequest(agent_level=SeniorityLevel.SENIOR),
)

Initialize the router.

Parameters:

Name Type Description Default
routing_config RoutingConfig

Routing configuration (strategy, rules, fallback).

required
providers dict[str, ProviderConfig]

Provider configurations keyed by provider name.

required
selector ModelCandidateSelector | None

Optional candidate selector for multi-provider model resolution. Defaults to QuotaAwareSelector().

None

Raises:

Type Description
UnknownRoutingStrategyError

If the configured strategy is not recognized.

Source code in src/synthorg/providers/routing/router.py
def __init__(
    self,
    routing_config: RoutingConfig,
    providers: dict[str, ProviderConfig],
    *,
    selector: ModelCandidateSelector | None = None,
) -> None:
    """Initialize the router.

    Args:
        routing_config: Routing configuration (strategy, rules, fallback).
        providers: Provider configurations keyed by provider name.
        selector: Optional candidate selector for multi-provider
            model resolution.  Defaults to ``QuotaAwareSelector()``.

    Raises:
        UnknownRoutingStrategyError: If the configured strategy is not recognized.
    """
    self._config = routing_config
    self._resolver = ModelResolver.from_config(
        providers,
        selector=selector,
    )

    strategy_name = routing_config.strategy
    strategy = STRATEGY_MAP.get(strategy_name)
    if strategy is None:
        logger.error(
            ROUTING_STRATEGY_UNKNOWN,
            strategy=strategy_name,
            available=sorted(STRATEGY_MAP),
        )
        msg = (
            f"Unknown routing strategy {strategy_name!r}. "
            f"Available: {sorted(STRATEGY_MAP)}"
        )
        raise UnknownRoutingStrategyError(
            msg,
            context={"strategy": strategy_name},
        )
    self._strategy = strategy

    logger.info(
        ROUTING_ROUTER_BUILT,
        strategy_configured=strategy_name,
        strategy=self._strategy.name,
        rule_count=len(routing_config.rules),
        fallback_count=len(routing_config.fallback_chain),
    )

resolver property

resolver

Return the underlying model resolver.

strategy_name property

strategy_name

Return the active strategy name.

route

route(request)

Route a request to a model.

Parameters:

Name Type Description Default
request RoutingRequest

Routing inputs.

required

Returns:

Type Description
RoutingDecision

A routing decision with the chosen model.

Raises:

Type Description
ModelResolutionError

If a required model cannot be found.

NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/router.py
def route(self, request: RoutingRequest) -> RoutingDecision:
    """Route a request to a model.

    Args:
        request: Routing inputs.

    Returns:
        A routing decision with the chosen model.

    Raises:
        ModelResolutionError: If a required model cannot be found.
        NoAvailableModelError: If all candidates are exhausted.
    """
    try:
        decision = self._strategy.select(
            request,
            self._config,
            self._resolver,
        )
    except RoutingError as exc:
        logger.warning(
            ROUTING_SELECTION_FAILED,
            strategy=self._strategy.name,
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
            agent_level=(
                request.agent_level.value
                if request.agent_level is not None
                else None
            ),
            task_type=request.task_type,
            model_override=request.model_override,
        )
        raise
    logger.info(
        ROUTING_DECISION_MADE,
        strategy=decision.strategy_used,
        provider=decision.resolved_model.provider_name,
        model=decision.resolved_model.model_id,
        reason=decision.reason,
        fallbacks_tried=decision.fallbacks_tried,
    )
    return decision

strategies

Routing strategies -- stateless implementations of RoutingStrategy.

Each strategy selects a model given a RoutingRequest, a RoutingConfig, and a ModelResolver. Strategies are stateless singletons registered in a module-level mapping.

STRATEGY_MAP module-attribute

STRATEGY_MAP = MappingProxyType(
    {
        STRATEGY_NAME_MANUAL: _MANUAL,
        STRATEGY_NAME_ROLE_BASED: _ROLE_BASED,
        STRATEGY_NAME_COST_AWARE: _COST_AWARE,
        STRATEGY_NAME_FASTEST: _FASTEST,
        STRATEGY_NAME_SMART: _SMART,
        STRATEGY_NAME_CHEAPEST: _COST_AWARE,
    }
)

Maps config strategy names to singleton instances.

RoutingStrategy

Bases: Protocol

Protocol for model routing strategies.

name property

name

Strategy name (matches config value).

select

select(request, config, resolver)

Select a model for the given request.

Parameters:

Name Type Description Default
request RoutingRequest

Routing inputs (agent level, task type, etc.).

required
config RoutingConfig

Routing configuration (rules, fallback chain).

required
resolver ModelResolver

Model resolver for alias/ID lookup.

required

Returns:

Type Description
RoutingDecision

A routing decision with the chosen model.

Raises:

Type Description
ModelResolutionError

If the requested model cannot be found.

NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select a model for the given request.

    Args:
        request: Routing inputs (agent level, task type, etc.).
        config: Routing configuration (rules, fallback chain).
        resolver: Model resolver for alias/ID lookup.

    Returns:
        A routing decision with the chosen model.

    Raises:
        ModelResolutionError: If the requested model cannot be found.
        NoAvailableModelError: If all candidates are exhausted.
    """
    ...

ManualStrategy

Resolve an explicit model override.

Requires request.model_override to be set.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select the explicitly requested model.

Raises:

Type Description
ModelResolutionError

If model_override is not set or the model cannot be resolved.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,  # noqa: ARG002
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select the explicitly requested model.

    Raises:
        ModelResolutionError: If ``model_override`` is not set or
            the model cannot be resolved.
    """
    if request.model_override is None:
        logger.warning(
            ROUTING_NO_RULE_MATCHED,
            strategy=self.name,
            reason="model_override not set",
        )
        msg = "ManualStrategy requires model_override to be set"
        raise ModelResolutionError(msg)

    model = resolver.resolve(request.model_override)
    return RoutingDecision(
        resolved_model=model,
        strategy_used=self.name,
        reason=f"Explicit override: {request.model_override}",
    )

RoleBasedStrategy

Select model based on agent seniority level.

Matches the first routing rule where rule.role_level equals request.agent_level. If no rule matches, uses the seniority catalog's typical_model_tier as a fallback lookup.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select model based on role level.

Raises:

Type Description
ModelResolutionError

If no agent_level is set.

NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select model based on role level.

    Raises:
        ModelResolutionError: If no agent_level is set.
        NoAvailableModelError: If all candidates are exhausted.
    """
    level = self._require_level(request)
    return (
        self._try_rule_match(level, config, resolver)
        or self._try_seniority(level, config, resolver)
        or self._raise_no_available(level, config)
    )

CostAwareStrategy

Select the cheapest model, optionally respecting a budget.

Matches task_type rules first, then falls back to the cheapest model from the resolver.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select the cheapest available model.

Raises:

Type Description
NoAvailableModelError

If no models are registered.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select the cheapest available model.

    Raises:
        NoAvailableModelError: If no models are registered.
    """
    decision = _try_task_type_rules(
        request,
        config,
        resolver,
        self.name,
    )
    if decision is not None:
        if _within_budget(decision.resolved_model, request.remaining_budget):
            return decision
        logger.info(
            ROUTING_BUDGET_EXCEEDED,
            model=decision.resolved_model.model_id,
            cost=decision.resolved_model.total_cost_per_1k,
            remaining_budget=request.remaining_budget,
            source="task_type_rule_budget_check",
            strategy=self.name,
        )

    # Pick cheapest
    model, budget_exceeded = _cheapest_within_budget(
        resolver,
        request.remaining_budget,
    )
    reason = f"Cheapest available: {model.model_id}"
    if budget_exceeded:
        reason += " (all models exceed remaining budget)"
    return RoutingDecision(
        resolved_model=model,
        strategy_used=self.name,
        reason=reason,
    )

FastestStrategy

Select the fastest model, optionally respecting a budget.

Matches task_type rules first, then falls back to the fastest model from the resolver. When no models have latency data, delegates to cheapest (lower-cost models are typically smaller and faster, making cost a reasonable proxy).

name property

name

Return strategy name.

select

select(request, config, resolver)

Select the fastest available model.

Raises:

Type Description
NoAvailableModelError

If no models are registered.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select the fastest available model.

    Raises:
        NoAvailableModelError: If no models are registered.
    """
    skipped_task_rule: str | None = None
    decision = _try_task_type_rules(
        request,
        config,
        resolver,
        self.name,
    )
    if decision is not None:
        if _within_budget(decision.resolved_model, request.remaining_budget):
            return decision
        skipped_task_rule = decision.resolved_model.model_id
        logger.info(
            ROUTING_BUDGET_EXCEEDED,
            model=decision.resolved_model.model_id,
            cost=decision.resolved_model.total_cost_per_1k,
            remaining_budget=request.remaining_budget,
            source="task_type_rule_budget_check",
            strategy=self.name,
        )

    # Pick fastest
    model, budget_exceeded = _fastest_within_budget(
        resolver,
        request.remaining_budget,
    )
    # _fastest_within_budget may delegate to cheapest when no latency data
    basis = (
        "fastest"
        if model.estimated_latency_ms is not None
        else "cheapest (no latency data)"
    )
    reason = f"Selected by {basis}: {model.model_id}"
    if budget_exceeded:
        reason += " (all models exceed remaining budget)"
    if skipped_task_rule is not None:
        reason += f" (task-type rule {skipped_task_rule!r} exceeded budget)"
    return RoutingDecision(
        resolved_model=model,
        strategy_used=self.name,
        reason=reason,
    )

SmartStrategy

Combined strategy with priority-based signal merging.

Priority order: model_override > task_type rules > role_level rules > seniority default > cheapest available (budget-aware) > global fallback_chain > exhausted.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select a model using all available signals.

Raises:

Type Description
NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select a model using all available signals.

    Raises:
        NoAvailableModelError: If all candidates are exhausted.
    """
    return (
        self._try_override(request, resolver)
        or _try_task_type_rules(
            request,
            config,
            resolver,
            self.name,
        )
        or _try_role_rules(
            request,
            config,
            resolver,
            self.name,
        )
        or _try_seniority_default(
            request,
            resolver,
            self.name,
        )
        or self._try_cheapest(request, resolver)
        or self._try_global_chain(config, resolver)
        or self._raise_exhausted()
    )

Resilience

retry

Retry handler with exponential backoff and jitter.

RetryResult dataclass

RetryResult(value, attempt_count, retry_reason)

Bases: Generic[T]

Immutable result of a retry-wrapped execution.

Returned by RetryHandler.execute so callers get per-invocation retry metadata without shared mutable state.

Attributes:

Name Type Description
value T

The return value of the wrapped callable.

attempt_count int

Number of attempts made (1 = no retry).

retry_reason str | None

Exception type name if a retry occurred.

RetryHandler

RetryHandler(config)

Wraps async callables with retry logic.

Retries transient errors (is_retryable=True) using exponential backoff with optional jitter. Non-retryable errors raise immediately. After exhausting max_retries, raises RetryExhaustedError.

Parameters:

Name Type Description Default
config RetryConfig

Retry configuration.

required

Source code in src/synthorg/providers/resilience/retry.py
def __init__(self, config: RetryConfig) -> None:
    self._config = config

execute async

execute(func)

Execute func with retry on transient errors.

Parameters:

Name Type Description Default
func Callable[[], Coroutine[object, object, T]]

Zero-argument async callable to execute.

required

Returns:

Type Description
RetryResult[T]

A RetryResult containing the return value and per-invocation retry metadata.

Raises:

Type Description
RetryExhaustedError

If all retries are exhausted.

ProviderError

If the error is non-retryable.

Source code in src/synthorg/providers/resilience/retry.py
async def execute(
    self,
    func: Callable[[], Coroutine[object, object, T]],
) -> RetryResult[T]:
    """Execute *func* with retry on transient errors.

    Args:
        func: Zero-argument async callable to execute.

    Returns:
        A :class:`RetryResult` containing the return value and
        per-invocation retry metadata.

    Raises:
        RetryExhaustedError: If all retries are exhausted.
        ProviderError: If the error is non-retryable.
    """
    attempt_count = 0
    retry_reason: str | None = None
    last_error: ProviderError | None = None

    for attempt in range(1 + self._config.max_retries):
        attempt_count = attempt + 1
        try:
            value = await func()
            return RetryResult(
                value=value,
                attempt_count=attempt_count,
                retry_reason=retry_reason,
            )
        except ProviderError as exc:
            last_error = self._handle_retryable_error(exc)
            if last_error is None:
                raise
            retry_reason = type(exc).__name__
            if attempt >= self._config.max_retries:
                break
            delay = self._compute_delay(attempt, exc)
            logger.info(
                PROVIDER_RETRY_ATTEMPT,
                attempt=attempt + 1,
                max_retries=self._config.max_retries,
                delay=delay,
                error_type=type(exc).__name__,
            )
            await asyncio.sleep(delay)
        except Exception:
            logger.warning(
                PROVIDER_CALL_ERROR,
                reason="unexpected_non_provider_error",
            )
            raise

    if last_error is None:
        msg = "RetryHandler reached exhaustion with no recorded error"
        raise RuntimeError(msg)
    logger.warning(
        PROVIDER_RETRY_EXHAUSTED,
        max_retries=self._config.max_retries,
        error_type=type(last_error).__name__,
    )
    raise RetryExhaustedError(last_error) from last_error

rate_limiter

Client-side rate limiter with RPM and concurrency controls.

RateLimiter

RateLimiter(config, *, provider_name, clock=None)

Client-side rate limiter with RPM tracking and concurrency control.

Uses a sliding window for RPM tracking and an asyncio semaphore for concurrency limiting. Supports pause-until from provider retry_after hints.

Parameters:

Name Type Description Default
config RateLimiterConfig

Rate limiter configuration.

required
provider_name str

Provider name for logging context.

required
clock Clock | None

Time source for RPM-window timestamps and pause-until tracking. Defaults to SystemClock; tests inject FakeClock for deterministic window expiry.

None

Source code in src/synthorg/providers/resilience/rate_limiter.py
def __init__(
    self,
    config: RateLimiterConfig,
    *,
    provider_name: str,
    clock: Clock | None = None,
) -> None:
    self._config = config
    self._provider_name = provider_name
    self._clock: Clock = clock if clock is not None else SystemClock()
    self._semaphore: asyncio.Semaphore | None = (
        asyncio.Semaphore(config.max_concurrent)
        if config.max_concurrent > 0
        else None
    )
    self._request_timestamps: list[float] = []
    self._pause_until: float = 0.0
    self._rpm_lock: asyncio.Lock = asyncio.Lock()

is_enabled property

is_enabled

Whether any rate limiting is active.

acquire async

acquire()

Wait for an available slot.

Blocks until both the RPM window and concurrency semaphore allow a new request. Also respects any active pause.

Source code in src/synthorg/providers/resilience/rate_limiter.py
async def acquire(self) -> None:
    """Wait for an available slot.

    Blocks until both the RPM window and concurrency semaphore
    allow a new request.  Also respects any active pause.
    """
    if not self.is_enabled and self._pause_until <= self._clock.monotonic():
        return

    # Respect pause-until from retry_after.
    # Re-check in a loop in case pause() extends _pause_until while sleeping.
    # lint-allow: long-running-loop-kill-switch -- per-call retry-wait.
    while True:
        now = self._clock.monotonic()
        remaining = self._pause_until - now
        if remaining <= 0:
            break
        logger.info(
            PROVIDER_RATE_LIMITER_THROTTLED,
            provider=self._provider_name,
            wait_seconds=round(remaining, 2),
            reason="pause_active",
        )
        try:
            await self._clock.sleep(remaining)
        except asyncio.CancelledError:
            # Surface pause-interrupted cancellation in the audit
            # trail before re-raising so an oncall debugging a
            # truncated request can see the rate limiter was the
            # site that absorbed the cancel signal.
            # Error path: WARNING per CLAUDE.md "All error paths
            # must log at WARNING or ERROR with context before
            # raising". Cancellation is the negative-control path
            # for this loop, not a normal info-level event.
            logger.warning(
                PROVIDER_RATE_LIMITER_CANCELLED,
                provider=self._provider_name,
                wait_seconds=round(remaining, 2),
                reason="pause_active",
            )
            raise

    # RPM sliding window
    if self._config.max_requests_per_minute > 0:
        await self._wait_for_rpm_slot()

    # Concurrency semaphore
    if self._semaphore is not None:
        await self._semaphore.acquire()
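
The `_wait_for_rpm_slot` helper is not shown on this page. As a rough illustration of what an RPM sliding window usually involves, here is a minimal sketch under stated assumptions: the function name `rpm_wait_seconds`, the 60-second `WINDOW_SECONDS` constant, and the prune-then-check logic are all hypothetical and may differ from the real helper.

```python
WINDOW_SECONDS = 60.0  # assumed window length for "requests per minute"


def rpm_wait_seconds(timestamps: list[float], now: float, max_rpm: int) -> float:
    """Hypothetical sliding-window check, not the library's implementation.

    Returns 0.0 and records the request when a slot is free; otherwise
    returns how long to wait until the oldest request leaves the window.
    Mutates ``timestamps`` in place.
    """
    cutoff = now - WINDOW_SECONDS
    # Drop requests that have already fallen out of the window.
    timestamps[:] = [t for t in timestamps if t > cutoff]
    if len(timestamps) < max_rpm:
        timestamps.append(now)
        return 0.0
    # The window is full: the next slot opens when the oldest entry expires.
    return timestamps[0] + WINDOW_SECONDS - now


if __name__ == "__main__":
    history: list[float] = []
    for moment in (0.0, 10.0, 20.0, 60.0):
        print(moment, rpm_wait_seconds(history, moment, max_rpm=2))
```

A caller would sleep for the returned duration and check again, presumably while holding a lock such as `_rpm_lock` so that concurrent tasks do not claim the same slot.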

release

release()

Release a concurrency slot.

Source code in src/synthorg/providers/resilience/rate_limiter.py
def release(self) -> None:
    """Release a concurrency slot."""
    if self._semaphore is not None:
        self._semaphore.release()
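
Because `acquire()` takes a concurrency slot and `release()` returns it, callers would normally pair them in a `try`/`finally`. The sketch below illustrates that pairing with a hypothetical stand-in class, `SketchLimiter`, that models only the semaphore part of the limiter; `call_provider` and `run_demo` are likewise illustrative names, not part of the library.

```python
import asyncio


class SketchLimiter:
    """Hypothetical stand-in modeling only the concurrency semaphore."""

    def __init__(self, max_concurrent: int) -> None:
        # As in the constructor above: no semaphore when the limit is 0.
        self._semaphore = (
            asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None
        )

    async def acquire(self) -> None:
        if self._semaphore is not None:
            await self._semaphore.acquire()

    def release(self) -> None:
        if self._semaphore is not None:
            self._semaphore.release()


async def call_provider(
    limiter: SketchLimiter, active: list[int], peak: list[int]
) -> None:
    await limiter.acquire()
    active[0] += 1
    peak[0] = max(peak[0], active[0])
    try:
        await asyncio.sleep(0)  # placeholder for the real provider call
    finally:
        # Always hand the slot back, even if the call raised or was cancelled.
        active[0] -= 1
        limiter.release()


async def run_demo(max_concurrent: int, calls: int) -> int:
    """Run ``calls`` concurrent requests and return the peak concurrency."""
    limiter = SketchLimiter(max_concurrent)
    active, peak = [0], [0]
    await asyncio.gather(
        *(call_provider(limiter, active, peak) for _ in range(calls))
    )
    return peak[0]


if __name__ == "__main__":
    print(asyncio.run(run_demo(max_concurrent=2, calls=5)))
```

Skipping the `finally` would leak a slot on every failed call, and the limiter would eventually stop admitting requests.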

pause

pause(seconds)

Block new requests for seconds.

Called when a RateLimitError carrying retry_after is received. Repeated calls can only extend the pause: a new deadline replaces the current one only if it is later.

Parameters:

Name Type Description Default
seconds float

Duration to pause in seconds. Must be finite and non-negative.

required

Raises:

Type Description
ValueError

If seconds is negative or not finite.

Source code in src/synthorg/providers/resilience/rate_limiter.py
def pause(self, seconds: float) -> None:
    """Block new requests for *seconds*.

    Called when a ``RateLimitError`` with ``retry_after`` is received.
    Multiple calls take the latest pause-until if it extends further.

    Args:
        seconds: Duration to pause in seconds.  Must be finite and
            non-negative.

    Raises:
        ValueError: If *seconds* is negative or not finite.
    """
    if not math.isfinite(seconds) or seconds < 0:
        msg = f"pause seconds must be a finite non-negative number, got {seconds!r}"
        raise ValueError(msg)
    new_until = self._clock.monotonic() + seconds
    if new_until > self._pause_until:
        self._pause_until = new_until
        logger.info(
            PROVIDER_RATE_LIMITER_PAUSED,
            provider=self._provider_name,
            pause_seconds=round(seconds, 2),
        )
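
To make the extend-only behaviour concrete, the following sketch reproduces the deadline arithmetic from `pause()` against a manually advanced clock. `FakeClock` and `PauseState` are hypothetical stand-ins written for this example; the real limiter also logs and takes an injected `Clock`.

```python
import math


class FakeClock:
    """Hypothetical manual clock; only monotonic() is modeled."""

    def __init__(self) -> None:
        self.now = 0.0

    def monotonic(self) -> float:
        return self.now


class PauseState:
    """Stand-in holding just the pause deadline, not the real limiter."""

    def __init__(self, clock: FakeClock) -> None:
        self._clock = clock
        self.pause_until = 0.0

    def pause(self, seconds: float) -> None:
        # Same validation as the source above: reject NaN, inf, negatives.
        if not math.isfinite(seconds) or seconds < 0:
            msg = f"pause seconds must be a finite non-negative number, got {seconds!r}"
            raise ValueError(msg)
        new_until = self._clock.monotonic() + seconds
        # Only move the deadline forward; a shorter pause is ignored.
        if new_until > self.pause_until:
            self.pause_until = new_until

    def remaining(self) -> float:
        return max(0.0, self.pause_until - self._clock.monotonic())


if __name__ == "__main__":
    clock = FakeClock()
    state = PauseState(clock)
    state.pause(10.0)   # deadline is now t=10
    clock.now = 4.0
    state.pause(2.0)    # would end at t=6, earlier than t=10: ignored
    print(state.pause_until, state.remaining())
    state.pause(30.0)   # would end at t=34, later than t=10: accepted
    print(state.pause_until)
```

A shorter `retry_after` arriving from a second failed request therefore never cuts an existing pause short.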

errors

Resilience-specific error types.

RetryExhaustedError

RetryExhaustedError(original_error)

Bases: ProviderError

All retry attempts exhausted for a retryable error.

Raised by RetryHandler when max_retries is reached. The engine layer catches this to trigger fallback chains.

Attributes:

Name Type Description
original_error

The last retryable error that was raised.

Initialize with the original error that exhausted retries.

Parameters:

Name Type Description Default
original_error ProviderError

The last retryable ProviderError.

required

Source code in src/synthorg/providers/resilience/errors.py
def __init__(
    self,
    original_error: ProviderError,
) -> None:
    """Initialize with the original error that exhausted retries.

    Args:
        original_error: The last retryable ``ProviderError``.
    """
    self.original_error = original_error
    super().__init__(
        f"Retry exhausted after error: {original_error.message}",
        context=dict(original_error.context),
    )