Providers

LLM provider abstraction -- protocol, base class, drivers, capabilities, routing, and resilience.

Protocol

protocol

Typed protocol for completion providers.

The engine and tests type-hint against CompletionProvider for loose coupling. Concrete adapters and test doubles satisfy it structurally.

CompletionProvider

Bases: Protocol

Structural interface every LLM provider adapter must satisfy.

Defines three async methods: complete for non-streaming chat completion, stream for streaming completion, and get_model_capabilities for capability metadata lookup.

complete async

complete(messages, model, *, tools=None, config=None)

Execute a non-streaming chat completion.

Parameters:

  • messages (list[ChatMessage], required) -- Conversation history.
  • model (str, required) -- Model identifier to use.
  • tools (list[ToolDefinition] | None, default None) -- Available tools for function calling.
  • config (CompletionConfig | None, default None) -- Optional completion parameters.

Returns:

  • CompletionResponse -- The full completion response.

Source code in src/synthorg/providers/protocol.py
async def complete(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> CompletionResponse:
    """Execute a non-streaming chat completion.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        The full completion response.
    """
    ...

stream async

stream(messages, model, *, tools=None, config=None)

Execute a streaming chat completion.

Parameters:

  • messages (list[ChatMessage], required) -- Conversation history.
  • model (str, required) -- Model identifier to use.
  • tools (list[ToolDefinition] | None, default None) -- Available tools for function calling.
  • config (CompletionConfig | None, default None) -- Optional completion parameters.

Returns:

  • AsyncIterator[StreamChunk] -- Async iterator of stream chunks.

Source code in src/synthorg/providers/protocol.py
async def stream(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> AsyncIterator[StreamChunk]:
    """Execute a streaming chat completion.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        Async iterator of stream chunks.
    """
    ...

get_model_capabilities async

get_model_capabilities(model)

Return capability metadata for the given model.

Parameters:

  • model (str, required) -- Model identifier.

Returns:

  • ModelCapabilities -- Static capability and cost information.

Source code in src/synthorg/providers/protocol.py
async def get_model_capabilities(self, model: str) -> ModelCapabilities:
    """Return capability metadata for the given model.

    Args:
        model: Model identifier.

    Returns:
        Static capability and cost information.
    """
    ...
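Because CompletionProvider is a typing.Protocol, adapters and test doubles need no inheritance -- matching the method shapes is enough. A minimal sketch with simplified stand-in types (the real models live in synthorg.providers.models; the CompletionResponse below is an illustrative assumption, not the real class):

```python
import asyncio
from typing import Any, Protocol, runtime_checkable


class CompletionResponse:
    """Stand-in for the real response model (illustrative only)."""

    def __init__(self, content: str) -> None:
        self.content = content


@runtime_checkable
class CompletionProvider(Protocol):
    """Simplified mirror of the protocol above (non-streaming part only)."""

    async def complete(
        self,
        messages: list[Any],
        model: str,
        *,
        tools: Any = None,
        config: Any = None,
    ) -> CompletionResponse: ...


class FakeProvider:
    """Test double: no inheritance -- it satisfies the protocol structurally."""

    async def complete(
        self,
        messages: list[Any],
        model: str,
        *,
        tools: Any = None,
        config: Any = None,
    ) -> CompletionResponse:
        return CompletionResponse(content=f"echo:{model}")


provider = FakeProvider()
assert isinstance(provider, CompletionProvider)  # runtime structural check
resp = asyncio.run(provider.complete([], "test-model"))
print(resp.content)  # echo:test-model
```

This is why the engine and tests can type-hint against CompletionProvider without importing any concrete adapter.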

Base Provider

base

Abstract base class for completion providers.

Concrete adapters subclass BaseCompletionProvider and implement the _do_* hooks. The base class handles input validation, automatic retry, rate limiting, and provides a cost-computation helper.

BaseCompletionProvider

BaseCompletionProvider(*, retry_handler=None, rate_limiter=None)

Bases: ABC

Shared base for all completion provider adapters.

Subclasses implement three hooks:

  • _do_complete -- raw non-streaming call
  • _do_stream -- raw streaming call
  • _do_get_model_capabilities -- capability lookup

The public methods validate inputs before delegating to hooks. When a retry_handler and/or rate_limiter are provided, calls are automatically wrapped with retry and rate-limiting logic. A static compute_cost helper is available for subclasses to build TokenUsage records from raw token counts.

Parameters:

  • retry_handler (RetryHandler | None, default None) -- Optional retry handler for transient errors.
  • rate_limiter (RateLimiter | None, default None) -- Optional client-side rate limiter.

Source code in src/synthorg/providers/base.py
def __init__(
    self,
    *,
    retry_handler: RetryHandler | None = None,
    rate_limiter: RateLimiter | None = None,
) -> None:
    self._retry_handler = retry_handler
    self._rate_limiter = rate_limiter
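The validate-then-delegate contract described above can be sketched with a toy base class (the real hook signatures, retry wrapping, and rate limiting live in base.py; the minimal version below is an illustrative assumption, not the actual implementation):

```python
import asyncio
from abc import ABC, abstractmethod


class MiniBaseProvider(ABC):
    """Toy template-method base: public method validates, hook does the work."""

    async def complete(self, messages: list[str], model: str) -> str:
        # Input validation happens once, in the base class.
        if not messages:
            raise ValueError("messages must not be empty")
        if not model.strip():
            raise ValueError("model must not be blank")
        return await self._do_complete(messages, model)

    @abstractmethod
    async def _do_complete(self, messages: list[str], model: str) -> str:
        """Raw provider call -- the only thing subclasses implement."""


class EchoProvider(MiniBaseProvider):
    async def _do_complete(self, messages: list[str], model: str) -> str:
        return f"{model}: {messages[-1]}"


out = asyncio.run(EchoProvider().complete(["hi"], "echo-1"))
print(out)  # echo-1: hi
```

Concrete adapters only write the `_do_*` hooks; validation and resilience stay centralized in the base class.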

complete async

complete(messages, model, *, tools=None, config=None)

Validate inputs, delegate to _do_complete.

Applies rate limiting and retry automatically when configured.

Parameters:

  • messages (list[ChatMessage], required) -- Conversation history.
  • model (str, required) -- Model identifier to use.
  • tools (list[ToolDefinition] | None, default None) -- Available tools for function calling.
  • config (CompletionConfig | None, default None) -- Optional completion parameters.

Returns:

  • CompletionResponse -- The completion response.

Raises:

  • InvalidRequestError -- If messages are empty or model is blank.
  • RetryExhaustedError -- If all retries are exhausted.

Source code in src/synthorg/providers/base.py
async def complete(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> CompletionResponse:
    """Validate inputs, delegate to ``_do_complete``.

    Applies rate limiting and retry automatically when configured.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        The completion response.

    Raises:
        InvalidRequestError: If messages are empty or model is blank.
        RetryExhaustedError: If all retries are exhausted.
    """
    self._validate_messages(messages)
    self._validate_model(model)
    logger.debug(
        PROVIDER_CALL_START,
        model=model,
        message_count=len(messages),
    )

    async def _attempt() -> CompletionResponse:
        return await self._rate_limited_call(
            self._do_complete,
            messages,
            model,
            tools=tools,
            config=config,
        )

    try:
        result = await self._resilient_execute(_attempt)
    except Exception:
        logger.error(PROVIDER_CALL_ERROR, model=model, exc_info=True)
        raise
    logger.debug(
        PROVIDER_CALL_SUCCESS,
        model=model,
    )
    return result

stream async

stream(messages, model, *, tools=None, config=None)

Validate inputs, delegate to _do_stream.

Only the initial connection setup is retried; mid-stream errors are not retried.

Parameters:

  • messages (list[ChatMessage], required) -- Conversation history.
  • model (str, required) -- Model identifier to use.
  • tools (list[ToolDefinition] | None, default None) -- Available tools for function calling.
  • config (CompletionConfig | None, default None) -- Optional completion parameters.

Returns:

  • AsyncIterator[StreamChunk] -- Async iterator of stream chunks.

Raises:

  • InvalidRequestError -- If messages are empty or model is blank.
  • RetryExhaustedError -- If all retries are exhausted.

Source code in src/synthorg/providers/base.py
async def stream(
    self,
    messages: list[ChatMessage],
    model: str,
    *,
    tools: list[ToolDefinition] | None = None,
    config: CompletionConfig | None = None,
) -> AsyncIterator[StreamChunk]:
    """Validate inputs, delegate to ``_do_stream``.

    Only the initial connection setup is retried; mid-stream errors
    are not retried.

    Args:
        messages: Conversation history.
        model: Model identifier to use.
        tools: Available tools for function calling.
        config: Optional completion parameters.

    Returns:
        Async iterator of stream chunks.

    Raises:
        InvalidRequestError: If messages are empty or model is blank.
        RetryExhaustedError: If all retries are exhausted.
    """
    self._validate_messages(messages)
    self._validate_model(model)
    logger.debug(
        PROVIDER_STREAM_START,
        model=model,
        message_count=len(messages),
    )

    async def _attempt() -> AsyncIterator[StreamChunk]:
        return await self._rate_limited_call(
            self._do_stream,
            messages,
            model,
            tools=tools,
            config=config,
        )

    try:
        return await self._resilient_execute(_attempt)
    except Exception:
        logger.error(PROVIDER_CALL_ERROR, model=model, exc_info=True)
        raise

get_model_capabilities async

get_model_capabilities(model)

Validate model identifier, delegate to _do_get_model_capabilities.

Parameters:

  • model (str, required) -- Model identifier.

Returns:

  • ModelCapabilities -- Static capability and cost information.

Raises:

  • InvalidRequestError -- If model is blank.

Source code in src/synthorg/providers/base.py
async def get_model_capabilities(self, model: str) -> ModelCapabilities:
    """Validate model identifier, delegate to ``_do_get_model_capabilities``.

    Args:
        model: Model identifier.

    Returns:
        Static capability and cost information.

    Raises:
        InvalidRequestError: If model is blank.
    """
    self._validate_model(model)
    return await self._do_get_model_capabilities(model)

compute_cost staticmethod

compute_cost(input_tokens, output_tokens, *, cost_per_1k_input, cost_per_1k_output)

Build a TokenUsage from raw token counts and per-1k rates.

Parameters:

  • input_tokens (int, required) -- Number of input tokens (must be >= 0).
  • output_tokens (int, required) -- Number of output tokens (must be >= 0).
  • cost_per_1k_input (float, required) -- Cost per 1,000 input tokens in USD (base currency; finite and >= 0).
  • cost_per_1k_output (float, required) -- Cost per 1,000 output tokens in USD (base currency; finite and >= 0).

Returns:

  • TokenUsage -- Populated TokenUsage with computed cost.

Raises:

  • InvalidRequestError -- If any parameter is negative or non-finite.

Source code in src/synthorg/providers/base.py
@staticmethod
def compute_cost(
    input_tokens: int,
    output_tokens: int,
    *,
    cost_per_1k_input: float,
    cost_per_1k_output: float,
) -> TokenUsage:
    """Build a ``TokenUsage`` from raw token counts and per-1k rates.

    Args:
        input_tokens: Number of input tokens (must be >= 0).
        output_tokens: Number of output tokens (must be >= 0).
        cost_per_1k_input: Cost per 1,000 input tokens in USD
            (base currency; finite and >= 0).
        cost_per_1k_output: Cost per 1,000 output tokens in USD
            (base currency; finite and >= 0).

    Returns:
        Populated ``TokenUsage`` with computed cost.

    Raises:
        InvalidRequestError: If any parameter is negative or
            non-finite.
    """
    if input_tokens < 0:
        msg = "input_tokens must be non-negative"
        raise InvalidRequestError(
            msg,
            context={"input_tokens": input_tokens},
        )
    if output_tokens < 0:
        msg = "output_tokens must be non-negative"
        raise InvalidRequestError(
            msg,
            context={"output_tokens": output_tokens},
        )
    if cost_per_1k_input < 0 or not math.isfinite(cost_per_1k_input):
        msg = "cost_per_1k_input must be a finite non-negative number"
        raise InvalidRequestError(
            msg,
            context={"cost_per_1k_input": cost_per_1k_input},
        )
    if cost_per_1k_output < 0 or not math.isfinite(cost_per_1k_output):
        msg = "cost_per_1k_output must be a finite non-negative number"
        raise InvalidRequestError(
            msg,
            context={"cost_per_1k_output": cost_per_1k_output},
        )
    cost = (input_tokens / 1000) * cost_per_1k_input + (
        output_tokens / 1000
    ) * cost_per_1k_output
    return TokenUsage(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        cost_usd=round(cost, BUDGET_ROUNDING_PRECISION),
    )
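The cost arithmetic reduces to per-1k rates times token counts in thousands. A standalone sketch mirroring the formula above (the rates shown and the rounding precision of 6 are illustrative assumptions, not real prices or the actual BUDGET_ROUNDING_PRECISION value):

```python
def cost_usd(
    input_tokens: int,
    output_tokens: int,
    cost_per_1k_input: float,
    cost_per_1k_output: float,
) -> float:
    # Same formula as compute_cost above: (tokens / 1000) * per-1k rate.
    cost = (input_tokens / 1000) * cost_per_1k_input + (
        output_tokens / 1000
    ) * cost_per_1k_output
    return round(cost, 6)  # rounding precision assumed to be 6 here


# 2,000 prompt tokens at $0.003/1k plus 500 completion tokens at $0.015/1k:
print(cost_usd(2000, 500, 0.003, 0.015))  # 0.0135
```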

Models

models

Provider-layer domain models for chat completion requests and responses.

ZERO_TOKEN_USAGE module-attribute

ZERO_TOKEN_USAGE = TokenUsage(input_tokens=0, output_tokens=0, cost_usd=0.0)

Additive identity for TokenUsage.

TokenUsage pydantic-model

Bases: BaseModel

Token counts and cost for a single completion call.

This is the lightweight provider-layer record. The budget layer's synthorg.budget.CostRecord adds agent/task context around it.

Attributes:

  • input_tokens (int) -- Number of input (prompt) tokens.
  • output_tokens (int) -- Number of output (completion) tokens.
  • total_tokens (int) -- Sum of input and output tokens (computed).
  • cost_usd (float) -- Estimated cost in USD (base currency) for this call.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • input_tokens -- Input token count
  • output_tokens -- Output token count
  • cost_usd -- Estimated cost in USD (base currency)

total_tokens property

total_tokens

Sum of input and output tokens.

ToolDefinition pydantic-model

Bases: BaseModel

Schema for a tool the model can invoke.

Uses raw JSON Schema for parameters_schema because every LLM provider consumes it natively.

Note

The parameters_schema dict is shallowly frozen by Pydantic's frozen=True -- field reassignment is prevented but nested contents can still be mutated in place. BaseTool.to_definition() provides a deep-copied schema, and ToolInvoker deep-copies arguments at the execution boundary, so no additional caller-side copying is needed for standard tool/provider workflows. Direct consumers outside these paths should deep-copy if they intend to modify the schema. See the tech stack page (docs/architecture/tech-stack.md).

Attributes:

  • name (NotBlankStr) -- Tool name.
  • description (str) -- Human-readable description of the tool.
  • parameters_schema (dict[str, Any]) -- JSON Schema dict describing the tool parameters.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • name -- Tool name
  • description (default '') -- Tool description
  • parameters_schema -- JSON Schema for tool parameters
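The shallow-freezing caveat from the note above can be reproduced with a plain frozen dataclass, which behaves analogously to Pydantic's frozen=True for dict fields (ToolDef below is a simplified stand-in, not the real ToolDefinition):

```python
import copy
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class ToolDef:
    """Simplified stand-in for ToolDefinition (illustrative only)."""

    name: str
    parameters_schema: dict[str, Any] = field(default_factory=dict)


td = ToolDef(name="search", parameters_schema={"type": "object", "properties": {}})

# Field reassignment is blocked by the frozen model...
try:
    td.name = "other"  # type: ignore[misc]
except Exception as exc:
    print(type(exc).__name__)  # FrozenInstanceError

# ...but nested dict contents are still mutable in place:
td.parameters_schema["properties"]["q"] = {"type": "string"}
print("q" in td.parameters_schema["properties"])  # True

# Defensive consumers deep-copy before modifying, as the note advises:
safe = copy.deepcopy(td.parameters_schema)
safe["properties"]["limit"] = {"type": "integer"}
print("limit" in td.parameters_schema["properties"])  # False
```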

ToolCall pydantic-model

Bases: BaseModel

A tool invocation requested by the model.

Note

The arguments dict is shallowly frozen by Pydantic's frozen=True -- field reassignment is prevented but nested contents can still be mutated in place. The ToolInvoker deep-copies arguments before passing them to tool implementations. See the tech stack page (docs/architecture/tech-stack.md).

Attributes:

  • id (NotBlankStr) -- Provider-assigned tool call identifier.
  • name (NotBlankStr) -- Name of the tool to invoke.
  • arguments (dict[str, Any]) -- Parsed arguments dict.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • id -- Tool call identifier
  • name -- Tool name
  • arguments -- Tool arguments

ToolResult pydantic-model

Bases: BaseModel

Result of executing a tool call, sent back to the model.

Attributes:

  • tool_call_id (NotBlankStr) -- The ToolCall.id this result corresponds to.
  • content (str) -- String content returned by the tool.
  • is_error (bool) -- Whether the tool execution failed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • tool_call_id -- Matching tool call ID
  • content -- Tool output content
  • is_error (default False) -- Whether tool errored

ChatMessage pydantic-model

Bases: BaseModel

A single message in a chat completion conversation.

Attributes:

  • role (MessageRole) -- Message role (system, user, assistant, tool).
  • content (str | None) -- Text content of the message.
  • tool_calls (tuple[ToolCall, ...]) -- Tool calls requested by the assistant (assistant only).
  • tool_result (ToolResult | None) -- Result of a tool execution (tool role only).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • role -- Message role
  • content (default None) -- Text content
  • tool_calls (default ()) -- Tool calls (assistant messages only)
  • tool_result (default None) -- Tool result (tool messages only)

Validators:

  • _validate_role_constraints

CompletionConfig pydantic-model

Bases: BaseModel

Optional parameters for a completion request.

All fields are optional -- the provider fills in defaults.

Attributes:

  • temperature (float | None) -- Sampling temperature (0.0-2.0). Actual valid range may vary by provider.
  • max_tokens (int | None) -- Maximum tokens to generate.
  • stop_sequences (tuple[str, ...]) -- Sequences that stop generation.
  • top_p (float | None) -- Nucleus sampling threshold.
  • timeout (float | None) -- Request timeout in seconds.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • temperature (default None) -- Sampling temperature
  • max_tokens (default None) -- Maximum tokens to generate
  • stop_sequences (default ()) -- Stop sequences
  • top_p (default None) -- Nucleus sampling threshold
  • timeout (default None) -- Request timeout in seconds

CompletionResponse pydantic-model

Bases: BaseModel

Result of a non-streaming completion call.

Attributes:

  • content (str | None) -- Generated text content (may be None for tool-use-only responses).
  • tool_calls (tuple[ToolCall, ...]) -- Tool calls the model wants to execute.
  • finish_reason (FinishReason) -- Why the model stopped generating.
  • usage (TokenUsage) -- Token usage and cost breakdown.
  • model (NotBlankStr) -- Model identifier that served the request.
  • provider_request_id (str | None) -- Provider-assigned request ID for debugging.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • content (default None) -- Generated text
  • tool_calls (default ()) -- Requested tool calls
  • finish_reason -- Reason generation stopped
  • usage -- Token usage breakdown
  • model -- Model that served the request
  • provider_request_id (default None) -- Provider request ID

Validators:

  • _validate_has_output

StreamChunk pydantic-model

Bases: BaseModel

A single chunk from a streaming completion response.

The event_type discriminator determines which optional fields are populated.

Attributes:

  • event_type (StreamEventType) -- Type of stream event.
  • content (str | None) -- Text delta (for content_delta).
  • tool_call_delta (ToolCall | None) -- Tool call received during streaming (for tool_call_delta).
  • usage (TokenUsage | None) -- Final token usage (for usage event).
  • error_message (str | None) -- Error description (for error event).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • event_type -- Stream event type
  • content (default None) -- Text delta
  • tool_call_delta (default None) -- Tool call received during streaming
  • usage (default None) -- Final token usage
  • error_message (default None) -- Error description

Validators:

  • _validate_event_fields

add_token_usage

add_token_usage(a, b)

Create a new TokenUsage with summed token counts and cost.

Parameters:

  • a (TokenUsage, required) -- First usage record.
  • b (TokenUsage, required) -- Second usage record.

Returns:

  • TokenUsage -- New TokenUsage with summed token counts and cost (total_tokens is computed automatically).

Source code in src/synthorg/providers/models.py
def add_token_usage(a: TokenUsage, b: TokenUsage) -> TokenUsage:
    """Create a new ``TokenUsage`` with summed token counts and cost.

    Args:
        a: First usage record.
        b: Second usage record.

    Returns:
        New ``TokenUsage`` with summed token counts and cost
        (``total_tokens`` is computed automatically).
    """
    return TokenUsage(
        input_tokens=a.input_tokens + b.input_tokens,
        output_tokens=a.output_tokens + b.output_tokens,
        cost_usd=a.cost_usd + b.cost_usd,
    )
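Because ZERO_TOKEN_USAGE is an additive identity, per-call usage records fold safely even over an empty sequence. A sketch with a simplified stand-in for TokenUsage (the real model is a frozen Pydantic model with a computed total_tokens):

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Usage:
    """Simplified stand-in for TokenUsage (illustrative only)."""

    input_tokens: int = 0
    output_tokens: int = 0
    cost_usd: float = 0.0


ZERO = Usage()  # additive identity, like ZERO_TOKEN_USAGE


def add(a: Usage, b: Usage) -> Usage:
    # Same shape as add_token_usage above: sum counts and cost.
    return Usage(
        input_tokens=a.input_tokens + b.input_tokens,
        output_tokens=a.output_tokens + b.output_tokens,
        cost_usd=a.cost_usd + b.cost_usd,
    )


calls = [Usage(100, 20, 0.001), Usage(250, 80, 0.004)]
total = reduce(add, calls, ZERO)  # the identity makes empty sequences safe
print(total.input_tokens, total.output_tokens)  # 350 100
```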

Enums

enums

Provider-layer enumerations.

AuthType

Bases: StrEnum

Authentication type for an LLM provider.

MessageRole

Bases: StrEnum

Role of a message participant in a chat completion.

FinishReason

Bases: StrEnum

Reason the model stopped generating tokens.

StreamEventType

Bases: StrEnum

Discriminator for streaming response chunks.

Errors

errors

Provider error hierarchy.

Every provider error carries an is_retryable flag so retry logic can decide whether to attempt again without inspecting concrete exception types.

ProviderError

ProviderError(message, *, context=None)

Bases: Exception

Base exception for all provider-layer errors.

Attributes:

  • message -- Human-readable error description.
  • context (MappingProxyType[str, Any]) -- Immutable metadata about the error (provider, model, etc.).
  • is_retryable (bool) -- Whether the caller should retry the request.

Note

When converted to string, sensitive context keys (api_key, token, secret, password, authorization) are automatically redacted regardless of casing.

Initialize a provider error.

Parameters:

  • message (str, required) -- Human-readable error description.
  • context (dict[str, Any] | None, default None) -- Arbitrary metadata about the error. Stored as an immutable mapping; defaults to empty if not provided.

Source code in src/synthorg/providers/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a provider error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

__str__

__str__()

Format error with optional context metadata.

Sensitive keys (api_key, token, etc.) are redacted to prevent accidental secret leakage in logs and tracebacks.

Source code in src/synthorg/providers/errors.py
def __str__(self) -> str:
    """Format error with optional context metadata.

    Sensitive keys (api_key, token, etc.) are redacted to prevent
    accidental secret leakage in logs and tracebacks.
    """
    if self.context:
        ctx = ", ".join(
            f"{k}='***'" if _is_sensitive_key(k) else f"{k}={v!r}"
            for k, v in self.context.items()
        )
        return f"{self.message} ({ctx})"
    return self.message

AuthenticationError

AuthenticationError(message, *, context=None)

Bases: ProviderError

Invalid or missing API credentials.


RateLimitError

RateLimitError(message, *, retry_after=None, context=None)

Bases: ProviderError

Provider rate limit exceeded.

Initialize a rate limit error.

Parameters:

  • message (str, required) -- Human-readable error description.
  • retry_after (float | None, default None) -- Seconds to wait before retrying, if provided by the provider.
  • context (dict[str, Any] | None, default None) -- Arbitrary metadata about the error.

Source code in src/synthorg/providers/errors.py
def __init__(
    self,
    message: str,
    *,
    retry_after: float | None = None,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a rate limit error.

    Args:
        message: Human-readable error description.
        retry_after: Seconds to wait before retrying, if provided
            by the provider.
        context: Arbitrary metadata about the error.
    """
    if retry_after is not None and (
        retry_after < 0 or not math.isfinite(retry_after)
    ):
        msg = "retry_after must be a finite non-negative number"
        raise ValueError(msg)
    self.retry_after = retry_after
    super().__init__(message, context=context)

ModelNotFoundError

ModelNotFoundError(message, *, context=None)

Bases: ProviderError

Requested model does not exist or is not available.


InvalidRequestError

InvalidRequestError(message, *, context=None)

Bases: ProviderError

Malformed request (bad parameters, too many tokens, etc.).


ContentFilterError

ContentFilterError(message, *, context=None)

Bases: ProviderError

Request or response blocked by the provider's content filter.


ProviderTimeoutError

ProviderTimeoutError(message, *, context=None)

Bases: ProviderError

Request timed out waiting for provider response.


ProviderConnectionError

ProviderConnectionError(message, *, context=None)

Bases: ProviderError

Network-level failure connecting to the provider.


ProviderInternalError

ProviderInternalError(message, *, context=None)

Bases: ProviderError

Provider returned a server-side error (5xx).


DriverNotRegisteredError

DriverNotRegisteredError(message, *, context=None)

Bases: ProviderError

Requested provider driver is not registered in the registry.


DriverAlreadyRegisteredError

DriverAlreadyRegisteredError(message, *, context=None)

Bases: ProviderError

A driver with this name is already registered.

Reserved for future use if the registry gains mutable operations (add/remove after construction). Not currently raised.


DriverFactoryNotFoundError

DriverFactoryNotFoundError(message, *, context=None)

Bases: ProviderError

No factory found for the requested driver type string.


ProviderAlreadyExistsError

ProviderAlreadyExistsError(message, *, context=None)

Bases: ProviderError

A provider with this name already exists.


ProviderNotFoundError

ProviderNotFoundError(message, *, context=None)

Bases: ProviderError

A provider with this name does not exist.

Source code in src/synthorg/providers/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a provider error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

ProviderValidationError

ProviderValidationError(message, *, context=None)

Bases: ProviderError

Provider configuration failed validation.

Source code in src/synthorg/providers/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a provider error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

Capabilities

capabilities

Model capability descriptors for provider routing decisions.

ModelCapabilities pydantic-model

Bases: BaseModel

Static capability and cost metadata for a single LLM model.

Used by the routing layer to decide which model handles a request based on required features (tools, vision, streaming) and cost.

Attributes:

Name Type Description
model_id NotBlankStr

Provider model identifier (e.g. "example-large-001").

provider NotBlankStr

Provider name (e.g. "example-provider").

max_context_tokens int

Maximum context window size in tokens.

max_output_tokens int

Maximum output tokens per request.

supports_tools bool

Whether the model supports tool/function calling.

supports_vision bool

Whether the model accepts image inputs.

supports_streaming bool

Whether the model supports streaming responses.

supports_streaming_tool_calls bool

Whether tool calls can be streamed.

supports_system_messages bool

Whether system messages are accepted.

cost_per_1k_input float

Cost per 1,000 input tokens in USD.

cost_per_1k_output float

Cost per 1,000 output tokens in USD.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_cross_field_constraints

model_id pydantic-field

model_id

Model identifier

provider pydantic-field

provider

Provider name

max_context_tokens pydantic-field

max_context_tokens

Max context window tokens

max_output_tokens pydantic-field

max_output_tokens

Max output tokens per request

supports_tools pydantic-field

supports_tools = False

Supports tool calling

supports_vision pydantic-field

supports_vision = False

Supports image inputs

supports_streaming pydantic-field

supports_streaming = True

Supports streaming responses

supports_streaming_tool_calls pydantic-field

supports_streaming_tool_calls = False

Supports streaming tool calls

supports_system_messages pydantic-field

supports_system_messages = True

Supports system messages

cost_per_1k_input pydantic-field

cost_per_1k_input

Cost per 1k input tokens in USD

cost_per_1k_output pydantic-field

cost_per_1k_output

Cost per 1k output tokens in USD
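To illustrate how the routing layer uses these flags, here is a minimal sketch with a plain dataclass standing in for `ModelCapabilities` (the field subset and model names are invented for the example): filter on a required feature, then rank the survivors by combined per-1k cost.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Caps:  # illustrative stand-in for ModelCapabilities
    model_id: str
    supports_tools: bool = False
    supports_vision: bool = False
    cost_per_1k_input: float = 0.0
    cost_per_1k_output: float = 0.0

models = [
    Caps("example-small-001", supports_tools=True,
         cost_per_1k_input=0.001, cost_per_1k_output=0.002),
    Caps("example-large-001", supports_tools=True, supports_vision=True,
         cost_per_1k_input=0.01, cost_per_1k_output=0.03),
]

# Require tool calling, then pick the cheapest by combined per-1k cost.
eligible = [m for m in models if m.supports_tools]
cheapest = min(eligible, key=lambda m: m.cost_per_1k_input + m.cost_per_1k_output)
```

Both models support tools here, so cost breaks the tie and the small model wins.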

Registry

registry

Provider registry -- the Employment Agency.

Maps provider names to concrete BaseCompletionProvider driver instances. Built from config via from_config, which reads each provider's driver field to select the appropriate factory.

ProviderRegistry

ProviderRegistry(drivers)

Immutable registry of named provider drivers.

Use from_config to build a registry from a config dict, or construct directly with a pre-built mapping.

Examples:

Build from config::

registry = ProviderRegistry.from_config(
    root_config.providers,
)
driver = registry.get("example-provider")
response = await driver.complete(messages, "medium")

Check membership::

if "example-provider" in registry:
    ...

Initialize with a name -> driver mapping.

Parameters:

Name Type Description Default
drivers dict[str, BaseCompletionProvider]

Mutable dict of provider name to driver instance. The registry takes ownership and freezes a copy.

required
Source code in src/synthorg/providers/registry.py
def __init__(
    self,
    drivers: dict[str, BaseCompletionProvider],
) -> None:
    """Initialize with a name -> driver mapping.

    Args:
        drivers: Mutable dict of provider name to driver instance.
            The registry takes ownership and freezes a copy.
    """
    self._drivers: MappingProxyType[str, BaseCompletionProvider] = MappingProxyType(
        dict(drivers)
    )

get

get(name)

Look up a driver by provider name.

Parameters:

Name Type Description Default
name str

Provider name (e.g. "example-provider").

required

Returns:

Type Description
BaseCompletionProvider

The registered driver instance.

Raises:

Type Description
DriverNotRegisteredError

If no driver is registered.

Source code in src/synthorg/providers/registry.py
def get(self, name: str) -> BaseCompletionProvider:
    """Look up a driver by provider name.

    Args:
        name: Provider name (e.g. ``"example-provider"``).

    Returns:
        The registered driver instance.

    Raises:
        DriverNotRegisteredError: If no driver is registered.
    """
    driver = self._drivers.get(name)
    if driver is None:
        available = sorted(self._drivers) or ["(none)"]
        logger.error(
            PROVIDER_DRIVER_NOT_REGISTERED,
            name=name,
            available=available,
        )
        msg = (
            f"Provider {name!r} is not registered. "
            f"Available providers: {', '.join(available)}"
        )
        raise DriverNotRegisteredError(
            msg,
            context={"provider": name},
        )
    return driver

list_providers

list_providers()

Return sorted tuple of registered provider names.

Source code in src/synthorg/providers/registry.py
def list_providers(self) -> tuple[str, ...]:
    """Return sorted tuple of registered provider names."""
    return tuple(sorted(self._drivers))

__contains__

__contains__(name)

Check whether a provider name is registered.

Source code in src/synthorg/providers/registry.py
def __contains__(self, name: object) -> bool:
    """Check whether a provider name is registered."""
    try:
        return name in self._drivers
    except TypeError:
        return False

__len__

__len__()

Return the number of registered providers.

Source code in src/synthorg/providers/registry.py
def __len__(self) -> int:
    """Return the number of registered providers."""
    return len(self._drivers)

from_config classmethod

from_config(providers, *, factory_overrides=None)

Build a registry from a provider config dict.

For each provider, reads the driver field to select a factory. The factory is called with (provider_name, config) to produce a driver instance.

Parameters:

Name Type Description Default
providers dict[str, ProviderConfig]

Provider config dict (key = provider name).

required
factory_overrides dict[str, object] | None

Optional driver-type -> factory mapping for testing or native SDK swaps.

None

Returns:

Type Description
Self

A new ProviderRegistry with all providers registered.

Raises:

Type Description
DriverFactoryNotFoundError

If a provider's driver does not match any known factory.

Source code in src/synthorg/providers/registry.py
@classmethod
def from_config(
    cls,
    providers: dict[str, ProviderConfig],
    *,
    factory_overrides: dict[str, object] | None = None,
) -> Self:
    """Build a registry from a provider config dict.

    For each provider, reads the ``driver`` field to select a
    factory.  The factory is called with
    ``(provider_name, config)`` to produce a driver instance.

    Args:
        providers: Provider config dict (key = provider name).
        factory_overrides: Optional driver-type -> factory
            mapping for testing or native SDK swaps.

    Returns:
        A new ``ProviderRegistry`` with all providers registered.

    Raises:
        DriverFactoryNotFoundError: If a provider's ``driver``
            does not match any known factory.
    """
    from .drivers.litellm_driver import (  # noqa: PLC0415
        LiteLLMDriver,
    )

    defaults: dict[str, type[BaseCompletionProvider]] = {
        "litellm": LiteLLMDriver,
    }
    overrides = factory_overrides or {}
    drivers: dict[str, BaseCompletionProvider] = {}

    for name, config in providers.items():
        driver = _build_driver(
            name,
            config,
            defaults,
            overrides,
        )
        drivers[name] = driver

    logger.info(
        PROVIDER_REGISTRY_BUILT,
        provider_count=len(drivers),
        providers=sorted(drivers),
    )
    return cls(drivers)
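A sketch of how `factory_overrides` can substitute a stub driver in tests. `StubDriver`, the `"fake"` driver-type string, and the `build_drivers` helper are assumptions for illustration; the helper only mirrors the documented contract that each factory is called with `(provider_name, config)` to produce a driver instance.

```python
class StubDriver:
    """Test double satisfying the factory call contract."""

    def __init__(self, provider_name, config):
        self.provider_name = provider_name
        self.config = config

def build_drivers(providers, overrides):
    # Mirrors from_config: look up the factory by each provider's
    # driver field, then call it with (provider_name, config).
    return {
        name: overrides[cfg["driver"]](name, cfg)
        for name, cfg in providers.items()
    }

providers = {"example-provider": {"driver": "fake"}}
drivers = build_drivers(providers, {"fake": StubDriver})
```

In real code the same effect comes from `ProviderRegistry.from_config(providers, factory_overrides={"fake": StubDriver})`.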

LiteLLM Driver

litellm_driver

LiteLLM-backed completion driver.

Wraps litellm.acompletion behind the BaseCompletionProvider contract, mapping between domain models and LiteLLM's chat-completion API.

LiteLLMDriver

LiteLLMDriver(provider_name, config)

Bases: BaseCompletionProvider

Completion driver backed by LiteLLM.

Uses litellm.acompletion for both streaming and non-streaming calls. Model identifiers are prefixed with the LiteLLM routing key (litellm_provider if set, otherwise the provider name -- e.g. example-provider/example-medium-001) so LiteLLM routes to the correct backend.

Parameters:

Name Type Description Default
provider_name str

Provider key from config (e.g. "example-provider").

required
config ProviderConfig

Provider configuration including API key, base URL, and model definitions.

required

Raises:

Type Description
ProviderError

All LiteLLM exceptions are mapped to the ProviderError hierarchy via _map_exception.

Source code in src/synthorg/providers/drivers/litellm_driver.py
def __init__(
    self,
    provider_name: str,
    config: ProviderConfig,
) -> None:
    retry_handler = (
        RetryHandler(config.retry) if config.retry.max_retries > 0 else None
    )
    rate_limiter = RateLimiter(
        config.rate_limiter,
        provider_name=provider_name,
    )
    super().__init__(
        retry_handler=retry_handler,
        rate_limiter=rate_limiter if rate_limiter.is_enabled else None,
    )
    self._provider_name = provider_name
    self._config = config
    self._model_lookup: MappingProxyType[str, ProviderModelConfig] = (
        MappingProxyType(self._build_model_lookup(config.models))
    )
    self._routing_key = config.litellm_provider or provider_name
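The routing-key prefixing described above can be sketched as follows. `prefixed_model` is a hypothetical helper; the real driver builds the prefix internally from `litellm_provider` (falling back to the provider name) before handing the model string to LiteLLM.

```python
def prefixed_model(routing_key: str, model_id: str) -> str:
    """Join routing key and model id the way LiteLLM expects."""
    return f"{routing_key}/{model_id}"

# litellm_provider unset in config -> fall back to the provider name,
# matching `config.litellm_provider or provider_name` above.
routing_key = None or "example-provider"
full = prefixed_model(routing_key, "example-medium-001")
assert full == "example-provider/example-medium-001"
```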

Routing

models

Domain models for the routing engine.

ResolvedModel pydantic-model

Bases: BaseModel

A fully resolved model reference.

Attributes:

Name Type Description
provider_name NotBlankStr

Provider that owns this model (e.g. "acme-provider").

model_id NotBlankStr

Concrete model identifier (e.g. "acme-large-001").

alias NotBlankStr | None

Short alias used in routing rules, if any.

cost_per_1k_input float

Cost per 1,000 input tokens in USD (base currency).

cost_per_1k_output float

Cost per 1,000 output tokens in USD (base currency).

max_context int

Maximum context window size in tokens.

estimated_latency_ms int | None

Estimated median latency in milliseconds.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

provider_name pydantic-field

provider_name

Provider name

model_id pydantic-field

model_id

Model identifier

alias pydantic-field

alias = None

Short alias

cost_per_1k_input pydantic-field

cost_per_1k_input = 0.0

Cost per 1k input tokens in USD (base currency)

cost_per_1k_output pydantic-field

cost_per_1k_output = 0.0

Cost per 1k output tokens in USD (base currency)

max_context pydantic-field

max_context = 200000

Maximum context window size in tokens

estimated_latency_ms pydantic-field

estimated_latency_ms = None

Estimated median latency in milliseconds

total_cost_per_1k property

total_cost_per_1k

Total cost per 1,000 tokens (input + output).
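As a worked example, `total_cost_per_1k` is simply the sum of the two per-1k rates (the figures below are invented):

```python
cost_per_1k_input = 0.003   # USD per 1,000 input tokens
cost_per_1k_output = 0.015  # USD per 1,000 output tokens

# The property just adds the two rates: roughly 0.018 USD here.
total_cost_per_1k = cost_per_1k_input + cost_per_1k_output
```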

RoutingRequest pydantic-model

Bases: BaseModel

Inputs to a routing decision.

Not all fields are used by every strategy:

  • ManualStrategy requires model_override.
  • RoleBasedStrategy requires agent_level.
  • CostAwareStrategy uses task_type and remaining_budget.
  • FastestStrategy uses task_type and remaining_budget.
  • SmartStrategy uses all fields in priority order.

Attributes:

Name Type Description
agent_level SeniorityLevel | None

Seniority level of the requesting agent.

task_type NotBlankStr | None

Task type label (e.g. "development").

model_override NotBlankStr | None

Explicit model reference for manual routing.

remaining_budget float | None

Per-request cost ceiling. Compared against each model's total_cost_per_1k (i.e. cost_per_1k_input + cost_per_1k_output) to filter models that exceed this threshold. This is not a total session budget -- use the budget module for that.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_level pydantic-field

agent_level = None

Seniority level of the requesting agent

task_type pydantic-field

task_type = None

Task type label

model_override pydantic-field

model_override = None

Explicit model reference for manual routing

remaining_budget pydantic-field

remaining_budget = None

Per-request cost ceiling in USD (base currency), compared against model total_cost_per_1k. Not a total session budget.

RoutingDecision pydantic-model

Bases: BaseModel

Output of a routing decision.

Attributes:

Name Type Description
resolved_model ResolvedModel

The chosen model.

strategy_used NotBlankStr

Name of the strategy that produced this decision.

reason NotBlankStr

Human-readable explanation.

fallbacks_tried tuple[str, ...]

Model refs that were tried before the final choice.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

resolved_model pydantic-field

resolved_model

The chosen model

strategy_used pydantic-field

strategy_used

Strategy name

reason pydantic-field

reason

Human-readable explanation

fallbacks_tried pydantic-field

fallbacks_tried = ()

Model refs tried before the final choice

router

Model router -- main entry point for routing decisions.

Constructed from RoutingConfig and a provider config dict. Delegates to strategy implementations.

ModelRouter

ModelRouter(routing_config, providers, *, selector=None)

Route requests to the appropriate LLM model.

Examples:

Build from config::

router = ModelRouter(
    routing_config=root_config.routing,
    providers=root_config.providers,
)
decision = router.route(
    RoutingRequest(agent_level=SeniorityLevel.SENIOR),
)

Initialize the router.

Parameters:

Name Type Description Default
routing_config RoutingConfig

Routing configuration (strategy, rules, fallback).

required
providers dict[str, ProviderConfig]

Provider configurations keyed by provider name.

required
selector ModelCandidateSelector | None

Optional candidate selector for multi-provider model resolution. Defaults to QuotaAwareSelector().

None

Raises:

Type Description
UnknownStrategyError

If the configured strategy is not recognized.

Source code in src/synthorg/providers/routing/router.py
def __init__(
    self,
    routing_config: RoutingConfig,
    providers: dict[str, ProviderConfig],
    *,
    selector: ModelCandidateSelector | None = None,
) -> None:
    """Initialize the router.

    Args:
        routing_config: Routing configuration (strategy, rules, fallback).
        providers: Provider configurations keyed by provider name.
        selector: Optional candidate selector for multi-provider
            model resolution.  Defaults to ``QuotaAwareSelector()``.

    Raises:
        UnknownStrategyError: If the configured strategy is not recognized.
    """
    self._config = routing_config
    self._resolver = ModelResolver.from_config(
        providers,
        selector=selector,
    )

    strategy_name = routing_config.strategy
    strategy = STRATEGY_MAP.get(strategy_name)
    if strategy is None:
        logger.error(
            ROUTING_STRATEGY_UNKNOWN,
            strategy=strategy_name,
            available=sorted(STRATEGY_MAP),
        )
        msg = (
            f"Unknown routing strategy {strategy_name!r}. "
            f"Available: {sorted(STRATEGY_MAP)}"
        )
        raise UnknownStrategyError(
            msg,
            context={"strategy": strategy_name},
        )
    self._strategy = strategy

    logger.info(
        ROUTING_ROUTER_BUILT,
        strategy_configured=strategy_name,
        strategy=self._strategy.name,
        rule_count=len(routing_config.rules),
        fallback_count=len(routing_config.fallback_chain),
    )

resolver property

resolver

Return the underlying model resolver.

strategy_name property

strategy_name

Return the active strategy name.

route

route(request)

Route a request to a model.

Parameters:

Name Type Description Default
request RoutingRequest

Routing inputs.

required

Returns:

Type Description
RoutingDecision

A routing decision with the chosen model.

Raises:

Type Description
ModelResolutionError

If a required model cannot be found.

NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/router.py
def route(self, request: RoutingRequest) -> RoutingDecision:
    """Route a request to a model.

    Args:
        request: Routing inputs.

    Returns:
        A routing decision with the chosen model.

    Raises:
        ModelResolutionError: If a required model cannot be found.
        NoAvailableModelError: If all candidates are exhausted.
    """
    try:
        decision = self._strategy.select(
            request,
            self._config,
            self._resolver,
        )
    except RoutingError as exc:
        logger.warning(
            ROUTING_SELECTION_FAILED,
            strategy=self._strategy.name,
            error_type=type(exc).__name__,
            error=str(exc),
            agent_level=(
                request.agent_level.value
                if request.agent_level is not None
                else None
            ),
            task_type=request.task_type,
            model_override=request.model_override,
        )
        raise
    logger.info(
        ROUTING_DECISION_MADE,
        strategy=decision.strategy_used,
        provider=decision.resolved_model.provider_name,
        model=decision.resolved_model.model_id,
        reason=decision.reason,
        fallbacks_tried=decision.fallbacks_tried,
    )
    return decision

strategies

Routing strategies -- stateless implementations of RoutingStrategy.

Each strategy selects a model given a RoutingRequest, a RoutingConfig, and a ModelResolver. Strategies are stateless singletons registered in a module-level mapping.

STRATEGY_MAP module-attribute

STRATEGY_MAP = MappingProxyType(
    {
        STRATEGY_NAME_MANUAL: _MANUAL,
        STRATEGY_NAME_ROLE_BASED: _ROLE_BASED,
        STRATEGY_NAME_COST_AWARE: _COST_AWARE,
        STRATEGY_NAME_FASTEST: _FASTEST,
        STRATEGY_NAME_SMART: _SMART,
        STRATEGY_NAME_CHEAPEST: _COST_AWARE,
    }
)

Maps config strategy names to singleton instances.

RoutingStrategy

Bases: Protocol

Protocol for model routing strategies.

name property

name

Strategy name (matches config value).

select

select(request, config, resolver)

Select a model for the given request.

Parameters:

Name Type Description Default
request RoutingRequest

Routing inputs (agent level, task type, etc.).

required
config RoutingConfig

Routing configuration (rules, fallback chain).

required
resolver ModelResolver

Model resolver for alias/ID lookup.

required

Returns:

Type Description
RoutingDecision

A routing decision with the chosen model.

Raises:

Type Description
ModelResolutionError

If the requested model cannot be found.

NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select a model for the given request.

    Args:
        request: Routing inputs (agent level, task type, etc.).
        config: Routing configuration (rules, fallback chain).
        resolver: Model resolver for alias/ID lookup.

    Returns:
        A routing decision with the chosen model.

    Raises:
        ModelResolutionError: If the requested model cannot be found.
        NoAvailableModelError: If all candidates are exhausted.
    """
    ...

ManualStrategy

Resolve an explicit model override.

Requires request.model_override to be set.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select the explicitly requested model.

Raises:

Type Description
ModelResolutionError

If model_override is not set or the model cannot be resolved.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,  # noqa: ARG002
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select the explicitly requested model.

    Raises:
        ModelResolutionError: If ``model_override`` is not set or
            the model cannot be resolved.
    """
    if request.model_override is None:
        logger.warning(
            ROUTING_NO_RULE_MATCHED,
            strategy=self.name,
            reason="model_override not set",
        )
        msg = "ManualStrategy requires model_override to be set"
        raise ModelResolutionError(msg)

    model = resolver.resolve(request.model_override)
    return RoutingDecision(
        resolved_model=model,
        strategy_used=self.name,
        reason=f"Explicit override: {request.model_override}",
    )

RoleBasedStrategy

Select model based on agent seniority level.

Matches the first routing rule where rule.role_level equals request.agent_level. If no rule matches, uses the seniority catalog's typical_model_tier as a fallback lookup.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select model based on role level.

Raises:

Type Description
ModelResolutionError

If no agent_level is set.

NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select model based on role level.

    Raises:
        ModelResolutionError: If no agent_level is set.
        NoAvailableModelError: If all candidates are exhausted.
    """
    level = self._require_level(request)
    return (
        self._try_rule_match(level, config, resolver)
        or self._try_seniority(level, config, resolver)
        or self._raise_no_available(level, config)
    )

CostAwareStrategy

Select the cheapest model, optionally respecting a budget.

Matches task_type rules first, then falls back to the cheapest model from the resolver.

name property

name

Return strategy name.

select

select(request, config, resolver)

Select the cheapest available model.

Raises:

Type Description
NoAvailableModelError

If no models are registered.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select the cheapest available model.

    Raises:
        NoAvailableModelError: If no models are registered.
    """
    decision = _try_task_type_rules(
        request,
        config,
        resolver,
        self.name,
    )
    if decision is not None:
        if _within_budget(decision.resolved_model, request.remaining_budget):
            return decision
        logger.info(
            ROUTING_BUDGET_EXCEEDED,
            model=decision.resolved_model.model_id,
            cost=decision.resolved_model.total_cost_per_1k,
            remaining_budget=request.remaining_budget,
            source="task_type_rule_budget_check",
            strategy=self.name,
        )

    # Pick cheapest
    model, budget_exceeded = _cheapest_within_budget(
        resolver,
        request.remaining_budget,
    )
    reason = f"Cheapest available: {model.model_id}"
    if budget_exceeded:
        reason += " (all models exceed remaining budget)"
    return RoutingDecision(
        resolved_model=model,
        strategy_used=self.name,
        reason=reason,
    )

FastestStrategy

Select the fastest model, optionally respecting a budget.

Matches task_type rules first, then falls back to the fastest model from the resolver. When no models have latency data, delegates to cheapest (lower-cost models are typically smaller and faster, making cost a reasonable proxy).

name property

name

Return strategy name.

select

select(request, config, resolver)

Select the fastest available model.

Raises:

Type Description
NoAvailableModelError

If no models are registered.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select the fastest available model.

    Raises:
        NoAvailableModelError: If no models are registered.
    """
    skipped_task_rule: str | None = None
    decision = _try_task_type_rules(
        request,
        config,
        resolver,
        self.name,
    )
    if decision is not None:
        if _within_budget(decision.resolved_model, request.remaining_budget):
            return decision
        skipped_task_rule = decision.resolved_model.model_id
        logger.info(
            ROUTING_BUDGET_EXCEEDED,
            model=decision.resolved_model.model_id,
            cost=decision.resolved_model.total_cost_per_1k,
            remaining_budget=request.remaining_budget,
            source="task_type_rule_budget_check",
            strategy=self.name,
        )

    # Pick fastest
    model, budget_exceeded = _fastest_within_budget(
        resolver,
        request.remaining_budget,
    )
    # _fastest_within_budget may delegate to cheapest when no latency data
    basis = (
        "fastest"
        if model.estimated_latency_ms is not None
        else "cheapest (no latency data)"
    )
    reason = f"Selected by {basis}: {model.model_id}"
    if budget_exceeded:
        reason += " (all models exceed remaining budget)"
    if skipped_task_rule is not None:
        reason += f" (task-type rule {skipped_task_rule!r} exceeded budget)"
    return RoutingDecision(
        resolved_model=model,
        strategy_used=self.name,
        reason=reason,
    )

SmartStrategy

Combined strategy with priority-based signal merging.

Priority order: model_override > task_type rules > role_level rules > seniority default > cheapest available (budget-aware) > global fallback_chain > exhausted.
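This priority order is implemented as a short-circuiting `or` cascade: each helper returns a decision or `None`, and the first non-`None` result wins. A stripped-down sketch with stub helpers (the helper names and return values are invented for illustration):

```python
def try_override(req):
    return None                    # no explicit model_override set

def try_task_rules(req):
    return None                    # no task_type rule matched

def try_role_rules(req):
    return "role-matched-model"    # role_level rule matched

request = {"agent_level": "senior"}

# First non-None helper result wins; later helpers never run.
decision = (
    try_override(request)
    or try_task_rules(request)
    or try_role_rules(request)
)
```

Because `or` short-circuits, the cheaper fallback lookups are only evaluated when every higher-priority signal fails to produce a decision.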

name property

name

Return strategy name.

select

select(request, config, resolver)

Select a model using all available signals.

Raises:

Type Description
NoAvailableModelError

If all candidates are exhausted.

Source code in src/synthorg/providers/routing/strategies.py
def select(
    self,
    request: RoutingRequest,
    config: RoutingConfig,
    resolver: ModelResolver,
) -> RoutingDecision:
    """Select a model using all available signals.

    Raises:
        NoAvailableModelError: If all candidates are exhausted.
    """
    return (
        self._try_override(request, resolver)
        or _try_task_type_rules(
            request,
            config,
            resolver,
            self.name,
        )
        or _try_role_rules(
            request,
            config,
            resolver,
            self.name,
        )
        or _try_seniority_default(
            request,
            resolver,
            self.name,
        )
        or self._try_cheapest(request, resolver)
        or self._try_global_chain(config, resolver)
        or self._raise_exhausted()
    )

Resilience

retry

Retry handler with exponential backoff and jitter.

RetryHandler

RetryHandler(config)

Wraps async callables with retry logic.

Retries transient errors (is_retryable=True) using exponential backoff with optional jitter. Non-retryable errors raise immediately. After exhausting max_retries, raises RetryExhaustedError.

Parameters:

Name Type Description Default
config RetryConfig

Retry configuration.

required
Source code in src/synthorg/providers/resilience/retry.py
def __init__(self, config: RetryConfig) -> None:
    self._config = config

execute async

execute(func)

Execute func with retry on transient errors.

Parameters:

Name Type Description Default
func Callable[[], Coroutine[object, object, T]]

Zero-argument async callable to execute.

required

Returns:

Type Description
T

The return value of func.

Raises:

Type Description
RetryExhaustedError

If all retries are exhausted.

ProviderError

If the error is non-retryable.

Source code in src/synthorg/providers/resilience/retry.py
async def execute(
    self,
    func: Callable[[], Coroutine[object, object, T]],
) -> T:
    """Execute *func* with retry on transient errors.

    Args:
        func: Zero-argument async callable to execute.

    Returns:
        The return value of *func*.

    Raises:
        RetryExhaustedError: If all retries are exhausted.
        ProviderError: If the error is non-retryable.
    """
    last_error: ProviderError | None = None

    for attempt in range(1 + self._config.max_retries):
        try:
            return await func()
        except ProviderError as exc:
            last_error = self._handle_retryable_error(exc)
            if last_error is None:
                raise
            if attempt >= self._config.max_retries:
                break
            delay = self._compute_delay(attempt, exc)
            logger.info(
                PROVIDER_RETRY_ATTEMPT,
                attempt=attempt + 1,
                max_retries=self._config.max_retries,
                delay=delay,
                error_type=type(exc).__name__,
            )
            await asyncio.sleep(delay)
        except Exception:
            logger.warning(
                PROVIDER_CALL_ERROR,
                reason="unexpected_non_provider_error",
                exc_info=True,
            )
            raise

    if last_error is None:
        msg = "RetryHandler reached exhaustion with no recorded error"
        raise RuntimeError(msg)
    logger.warning(
        PROVIDER_RETRY_EXHAUSTED,
        max_retries=self._config.max_retries,
        error_type=type(last_error).__name__,
    )
    raise RetryExhaustedError(last_error) from last_error
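The delay schedule is exponential with jitter. Below is a sketch of one common variant (full jitter); the base, cap, and jitter style here are assumptions for illustration, since the real `_compute_delay` reads its parameters from `RetryConfig` and may also honor a provider `retry_after` hint.

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter (illustrative only)."""
    # Double the window each attempt, but never exceed the cap.
    raw = min(cap, base * (2 ** attempt))
    # Full jitter: draw uniformly from [0, raw] to spread retries apart.
    return random.uniform(0.0, raw)

# Attempt 0..3 draw from windows bounded by 1, 2, 4, 8 seconds.
delays = [backoff_delay(a) for a in range(4)]
```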

rate_limiter

Client-side rate limiter with RPM and concurrency controls.

RateLimiter

RateLimiter(config, *, provider_name)

Client-side rate limiter with RPM tracking and concurrency control.

Uses a sliding window for RPM tracking and an asyncio semaphore for concurrency limiting. Supports pause-until from provider retry_after hints.

Parameters:

Name Type Description Default
config RateLimiterConfig

Rate limiter configuration.

required
provider_name str

Provider name for logging context.

required
Source code in src/synthorg/providers/resilience/rate_limiter.py
def __init__(
    self,
    config: RateLimiterConfig,
    *,
    provider_name: str,
) -> None:
    self._config = config
    self._provider_name = provider_name
    self._semaphore: asyncio.Semaphore | None = (
        asyncio.Semaphore(config.max_concurrent)
        if config.max_concurrent > 0
        else None
    )
    self._request_timestamps: list[float] = []
    self._pause_until: float = 0.0
    self._rpm_lock: asyncio.Lock = asyncio.Lock()

is_enabled property

is_enabled

Whether any rate limiting is active.

acquire async

acquire()

Wait for an available slot.

Blocks until both the RPM window and concurrency semaphore allow a new request. Also respects any active pause.

Source code in src/synthorg/providers/resilience/rate_limiter.py
async def acquire(self) -> None:
    """Wait for an available slot.

    Blocks until both the RPM window and concurrency semaphore
    allow a new request.  Also respects any active pause.
    """
    if not self.is_enabled and self._pause_until <= time.monotonic():
        return

    # Respect pause-until from retry_after.
    # Re-check in a loop in case pause() extends _pause_until while sleeping.
    while True:
        now = time.monotonic()
        remaining = self._pause_until - now
        if remaining <= 0:
            break
        logger.info(
            PROVIDER_RATE_LIMITER_THROTTLED,
            provider=self._provider_name,
            wait_seconds=round(remaining, 2),
            reason="pause_active",
        )
        await asyncio.sleep(remaining)

    # RPM sliding window
    if self._config.max_requests_per_minute > 0:
        await self._wait_for_rpm_slot()

    # Concurrency semaphore
    if self._semaphore is not None:
        await self._semaphore.acquire()

release

release()

Release a concurrency slot.

Source code in src/synthorg/providers/resilience/rate_limiter.py
def release(self) -> None:
    """Release a concurrency slot."""
    if self._semaphore is not None:
        self._semaphore.release()
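`acquire()` and `release()` pair naturally with `try`/`finally` so a concurrency slot is returned even when the wrapped call raises. A sketch of that pattern (the `limited` helper and the `_SemaphoreLimiter` stub are illustrative stand-ins; the real `RateLimiter` layers RPM tracking and pause handling on top of the semaphore):

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def limited(limiter):
    # Pair acquire() with release() so the slot is freed even on error.
    await limiter.acquire()
    try:
        yield
    finally:
        limiter.release()


class _SemaphoreLimiter:
    """Minimal stand-in exposing the same acquire()/release() surface."""

    def __init__(self, max_concurrent: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)

    async def acquire(self) -> None:
        await self._sem.acquire()

    def release(self) -> None:
        self._sem.release()


async def main() -> int:
    limiter = _SemaphoreLimiter(max_concurrent=2)
    active = 0
    peak = 0

    async def call(i: int) -> None:
        nonlocal active, peak
        async with limited(limiter):
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated provider round trip
            active -= 1

    await asyncio.gather(*(call(i) for i in range(5)))
    return peak


peak = asyncio.run(main())
```

Five concurrent calls against a limit of two never run more than two at once.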

pause

pause(seconds)

Block new requests for seconds.

Called when a RateLimitError with retry_after is received. When called multiple times, the latest pause-until deadline wins; a shorter pause never shrinks an existing one.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `seconds` | `float` | Duration to pause in seconds. Must be finite and non-negative. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `seconds` is negative or not finite. |

Source code in src/synthorg/providers/resilience/rate_limiter.py
def pause(self, seconds: float) -> None:
    """Block new requests for *seconds*.

    Called when a ``RateLimitError`` with ``retry_after`` is received.
    Multiple calls take the latest pause-until if it extends further.

    Args:
        seconds: Duration to pause in seconds.  Must be finite and
            non-negative.

    Raises:
        ValueError: If *seconds* is negative or not finite.
    """
    if not math.isfinite(seconds) or seconds < 0:
        msg = f"pause seconds must be a finite non-negative number, got {seconds!r}"
        raise ValueError(msg)
    new_until = time.monotonic() + seconds
    if new_until > self._pause_until:
        self._pause_until = new_until
        logger.info(
            PROVIDER_RATE_LIMITER_PAUSED,
            provider=self._provider_name,
            pause_seconds=round(seconds, 2),
        )
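The latest-deadline-wins rule can be exercised in isolation with a trimmed-down mirror of the pause bookkeeping (a hypothetical `PauseWindow` class, with logging and the provider name stripped out):

```python
import math
import time


class PauseWindow:
    """Mirror of the pause-until bookkeeping: later deadlines win."""

    def __init__(self) -> None:
        self._pause_until = 0.0

    def pause(self, seconds: float) -> None:
        if not math.isfinite(seconds) or seconds < 0:
            msg = f"pause seconds must be a finite non-negative number, got {seconds!r}"
            raise ValueError(msg)
        new_until = time.monotonic() + seconds
        # A shorter pause never shrinks an already-longer deadline.
        if new_until > self._pause_until:
            self._pause_until = new_until

    def remaining(self) -> float:
        return max(0.0, self._pause_until - time.monotonic())
```

Pausing for 10 seconds and then for 2 leaves the 10-second deadline in place, since the second call ends earlier than the active deadline.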

errors

Resilience-specific error types.

RetryExhaustedError

RetryExhaustedError(original_error)

Bases: ProviderError

All retry attempts exhausted for a retryable error.

Raised by RetryHandler when max_retries is reached. The engine layer catches this to trigger fallback chains.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `original_error` | `ProviderError` | The last retryable error that was raised. |

Initialize with the original error that exhausted retries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `original_error` | `ProviderError` | The last retryable `ProviderError`. | *required* |
Source code in src/synthorg/providers/resilience/errors.py
def __init__(
    self,
    original_error: ProviderError,
) -> None:
    """Initialize with the original error that exhausted retries.

    Args:
        original_error: The last retryable ``ProviderError``.
    """
    self.original_error = original_error
    super().__init__(
        f"Retry exhausted after error: {original_error.message}",
        context=dict(original_error.context),
    )