Skip to content

Budget

Cost tracking, budget enforcement, auto-downgrade, quota management, and CFO optimization.

Config

config

Budget configuration models.

Implements the Cost Controls section of the Operations design page: alert thresholds, per-task and per-agent limits, automatic model downgrade, and risk budget configuration.

BudgetAlertConfig pydantic-model

Bases: BaseModel

Alert threshold configuration for budget monitoring.

Thresholds are expressed as percentages of the total monthly budget. They must be strictly ordered: warn_at < critical_at < hard_stop_at.

Attributes:

Name Type Description
warn_at int

Percentage of budget that triggers a warning alert.

critical_at int

Percentage of budget that triggers a critical alert.

hard_stop_at int

Percentage of budget that triggers a hard stop.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_threshold_ordering

warn_at pydantic-field

warn_at = 75

Percent of budget triggering warning

critical_at pydantic-field

critical_at = 90

Percent of budget triggering critical alert

hard_stop_at pydantic-field

hard_stop_at = 100

Percent of budget triggering hard stop

AutoDowngradeConfig pydantic-model

Bases: BaseModel

Automatic model downgrade configuration.

When enabled, models are downgraded to cheaper alternatives once budget usage exceeds threshold percent. The downgrade_map is stored as a tuple of (source_alias, target_alias) pairs to maintain immutability.

Attributes:

Name Type Description
enabled bool

Whether auto-downgrade is active.

threshold int

Budget percent that triggers downgrade.

downgrade_map tuple[tuple[NotBlankStr, NotBlankStr], ...]

Ordered pairs of (from_alias, to_alias).

boundary Literal['task_assignment']

When to apply downgrade (task_assignment only, never mid-execution per the Operations design page).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _normalize_downgrade_map
  • _validate_downgrade_map

enabled pydantic-field

enabled = False

Whether auto-downgrade is active

threshold pydantic-field

threshold = 85

Budget percent triggering downgrade

downgrade_map pydantic-field

downgrade_map = ()

Ordered pairs of (from_alias, to_alias)

boundary pydantic-field

boundary = 'task_assignment'

When to apply downgrade (task_assignment only, never mid-execution)

BudgetConfig pydantic-model

Bases: BaseModel

Top-level budget configuration for a company.

Defines the overall monthly budget, alert thresholds, per-task and per-agent spending limits, and automatic model downgrade settings.

Attributes:

Name Type Description
total_monthly float

Monthly budget limit.

alerts BudgetAlertConfig

Alert threshold configuration.

per_task_limit float

Maximum cost per task.

per_agent_daily_limit float

Maximum cost per agent per day.

auto_downgrade AutoDowngradeConfig

Automatic model downgrade configuration.

reset_day int

Day of month when budget resets (1-28, avoids month-length issues).

currency str

ISO 4217 currency code for display formatting.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_per_task_limit_within_monthly
  • _validate_per_agent_daily_limit_within_monthly

total_monthly pydantic-field

total_monthly = 100.0

Monthly budget limit

alerts pydantic-field

alerts

Alert threshold configuration

per_task_limit pydantic-field

per_task_limit = 5.0

Maximum cost per task

per_agent_daily_limit pydantic-field

per_agent_daily_limit = 10.0

Maximum cost per agent per day

auto_downgrade pydantic-field

auto_downgrade

Automatic model downgrade configuration

reset_day pydantic-field

reset_day = 1

Day of month when budget resets (1-28, avoids month-length issues)

currency pydantic-field

currency = 'EUR'

ISO 4217 currency code for display formatting

risk_budget pydantic-field

risk_budget

Cumulative risk-unit action budget configuration

Cost Record

cost_record

Cost record model for per-API-call tracking.

Implements the Cost Tracking section of the Operations design page: every API call is tracked as an immutable cost record (append-only pattern).

CostRecord pydantic-model

Bases: BaseModel

Immutable record of a single API call's cost.

Once created, a CostRecord cannot be modified (frozen model). This enforces the append-only pattern: new records are created for each API call; existing records are never updated.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier (string reference).

task_id NotBlankStr

Task identifier (string reference).

provider NotBlankStr

LLM provider name.

model NotBlankStr

Model identifier.

input_tokens int

Input token count.

output_tokens int

Output token count.

cost_usd float

Cost in USD (base currency).

timestamp AwareDatetime

Timezone-aware timestamp of the API call.

call_category LLMCallCategory | None

Optional LLM call category for coordination metrics (productive, coordination, system).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_token_consistency

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id

Task identifier

provider pydantic-field

provider

LLM provider name

model pydantic-field

model

Model identifier

input_tokens pydantic-field

input_tokens

Input token count

output_tokens pydantic-field

output_tokens

Output token count

cost_usd pydantic-field

cost_usd

Cost in USD (base currency)

timestamp pydantic-field

timestamp

Timestamp of the API call

call_category pydantic-field

call_category = None

LLM call category (productive, coordination, system)

Spending Summary

spending_summary

Spending summary models for aggregated cost reporting.

Provides the aggregation data structures used by :class:~synthorg.budget.tracker.CostTracker for cost reporting and designed for consumption by the CFO agent (see Operations design page). Views of :class:~synthorg.budget.cost_record.CostRecord data are aggregated by agent, department, and time period.

PeriodSpending pydantic-model

Bases: _SpendingTotals

Spending aggregation for a specific time period.

Attributes:

Name Type Description
start datetime

Period start (inclusive).

end datetime

Period end (exclusive).

Fields:

  • total_cost_usd (float)
  • total_input_tokens (int)
  • total_output_tokens (int)
  • record_count (int)
  • start (datetime)
  • end (datetime)

Validators:

  • _validate_period_ordering

start pydantic-field

start

Period start (inclusive)

end pydantic-field

end

Period end (exclusive)

AgentSpending pydantic-model

Bases: _SpendingTotals

Spending aggregation for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

Fields:

  • total_cost_usd (float)
  • total_input_tokens (int)
  • total_output_tokens (int)
  • record_count (int)
  • agent_id (NotBlankStr)

agent_id pydantic-field

agent_id

Agent identifier

DepartmentSpending pydantic-model

Bases: _SpendingTotals

Spending aggregation for a department.

Attributes:

Name Type Description
department_name NotBlankStr

Department name.

Fields:

department_name pydantic-field

department_name

Department name

SpendingSummary pydantic-model

Bases: BaseModel

Top-level spending summary combining all aggregation dimensions.

Provides a snapshot of spending broken down by time period, agent, and department, along with budget utilization context.

Attributes:

Name Type Description
period PeriodSpending

Time-period aggregation.

by_agent tuple[AgentSpending, ...]

Per-agent spending breakdown.

by_department tuple[DepartmentSpending, ...]

Per-department spending breakdown.

budget_total_monthly float

Monthly budget for context.

budget_used_percent float

Percent of budget consumed.

alert_level BudgetAlertLevel

Current budget alert level.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_agent_ids
  • _validate_unique_department_names

period pydantic-field

period

Time-period aggregation

by_agent pydantic-field

by_agent = ()

Per-agent spending breakdown

by_department pydantic-field

by_department = ()

Per-department spending breakdown

budget_total_monthly pydantic-field

budget_total_monthly = 0.0

Monthly budget for context

budget_used_percent pydantic-field

budget_used_percent = 0.0

Percent of budget consumed

alert_level pydantic-field

alert_level = NORMAL

Current budget alert level

Cost Tiers

cost_tiers

Cost tier definitions and classification.

Provides configurable metadata for cost tiers: price ranges, display properties, and model-to-tier classification. The built-in CostTier enum (synthorg.core.enums) defines the tier values; this module adds a configurable layer on top.

CostTierDefinition pydantic-model

Bases: BaseModel

Metadata for a single cost tier.

Attributes:

Name Type Description
id NotBlankStr

Unique tier identifier (e.g. "low", "custom-budget").

display_name NotBlankStr

Human-readable name.

description str

What this tier represents.

price_range_min float

Minimum cost_per_1k_total for models in this tier.

price_range_max float | None

Maximum cost_per_1k_total; None means unbounded above.

color str

Hex color for UI rendering.

icon str

Icon identifier for UI rendering.

sort_order int

Display ordering (lower = cheaper).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_price_range

id pydantic-field

id

Unique tier identifier

display_name pydantic-field

display_name

Human-readable name

description pydantic-field

description = ''

What this tier represents

price_range_min pydantic-field

price_range_min = 0.0

Minimum cost_per_1k_total in USD (base currency)

price_range_max pydantic-field

price_range_max = None

Maximum cost_per_1k_total in USD (base currency); None = unbounded

color pydantic-field

color = '#6b7280'

Hex color for UI

icon pydantic-field

icon = 'circle'

Icon identifier for UI

sort_order pydantic-field

sort_order = 0

Display ordering (lower = cheaper)

CostTiersConfig pydantic-model

Bases: BaseModel

Configuration for cost tier definitions.

Attributes:

Name Type Description
tiers tuple[CostTierDefinition, ...]

User-defined tier overrides/additions.

include_builtin bool

Whether to merge built-in default tiers.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_ids

tiers pydantic-field

tiers = ()

User-defined tier overrides/additions

include_builtin pydantic-field

include_builtin = True

Whether to merge built-in default tiers

resolve_tiers

resolve_tiers(config)

Merge built-in and user-defined tiers, sorted by sort_order.

User tiers override built-in tiers with the same ID.

Parameters:

Name Type Description Default
config CostTiersConfig

Cost tiers configuration.

required

Returns:

Type Description
tuple[CostTierDefinition, ...]

Merged and sorted tuple of tier definitions.

Source code in src/synthorg/budget/cost_tiers.py
def resolve_tiers(
    config: CostTiersConfig,
) -> tuple[CostTierDefinition, ...]:
    """Merge built-in and user-defined tiers, sorted by sort_order.

    User tiers override built-in tiers with the same ID.

    Args:
        config: Cost tiers configuration.

    Returns:
        Merged and sorted tuple of tier definitions.
    """
    if not config.include_builtin:
        result = sorted(config.tiers, key=lambda t: t.sort_order)
        logger.debug(
            BUDGET_TIER_RESOLVED,
            tier_count=len(result),
            include_builtin=False,
        )
        return tuple(result)

    # User tiers override built-in by ID
    user_ids = {t.id for t in config.tiers}
    merged: list[CostTierDefinition] = [
        t for t in BUILTIN_TIERS if t.id not in user_ids
    ]
    merged.extend(config.tiers)
    merged.sort(key=lambda t: t.sort_order)

    logger.debug(
        BUDGET_TIER_RESOLVED,
        tier_count=len(merged),
        include_builtin=True,
        overridden_ids=sorted(user_ids & {t.id for t in BUILTIN_TIERS}),
    )
    return tuple(merged)

classify_model_tier

classify_model_tier(cost_per_1k_total, tiers)

Classify a model into a cost tier based on total cost per 1k tokens.

Matches the first tier whose price range contains the given cost. Range check: min <= cost < max (or min <= cost if max is None). If tiers have overlapping ranges, the first match in iteration order wins -- callers should ensure tiers are sorted by sort_order.

Parameters:

Name Type Description Default
cost_per_1k_total float

Combined cost_per_1k_input + cost_per_1k_output.

required
tiers tuple[CostTierDefinition, ...]

Resolved tier definitions (should be sorted by sort_order).

required

Returns:

Type Description
str | None

Tier ID of the matching tier, or None if no tier matches.

Source code in src/synthorg/budget/cost_tiers.py
def classify_model_tier(
    cost_per_1k_total: float,
    tiers: tuple[CostTierDefinition, ...],
) -> str | None:
    """Classify a model into a cost tier based on total cost per 1k tokens.

    Matches the first tier whose price range contains the given cost.
    Range check: ``min <= cost < max`` (or ``min <= cost`` if max is
    ``None``).  If tiers have overlapping ranges, the first match in
    iteration order wins -- callers should ensure tiers are sorted by
    ``sort_order``.

    Args:
        cost_per_1k_total: Combined ``cost_per_1k_input +
            cost_per_1k_output``.
        tiers: Resolved tier definitions (should be sorted by
            sort_order).

    Returns:
        Tier ID of the matching tier, or ``None`` if no tier matches.
    """
    if cost_per_1k_total < 0:
        logger.warning(
            BUDGET_TIER_CLASSIFY_MISS,
            cost_per_1k_total=cost_per_1k_total,
            tier_count=len(tiers),
            reason="negative_cost",
        )
        return None

    for tier in tiers:
        if tier.price_range_max is None:
            if cost_per_1k_total >= tier.price_range_min:
                return tier.id
        elif tier.price_range_min <= cost_per_1k_total < tier.price_range_max:
            return tier.id

    logger.debug(
        BUDGET_TIER_CLASSIFY_MISS,
        cost_per_1k_total=cost_per_1k_total,
        tier_count=len(tiers),
    )
    return None

Hierarchy

hierarchy

Budget hierarchy models.

Implements the Budget Hierarchy section of the Operations design page: Company to Department to Team, with percentage-based allocation at each level.

TeamBudget pydantic-model

Bases: BaseModel

Budget allocation for a single team within a department.

Attributes:

Name Type Description
team_name NotBlankStr

Team name (string reference).

budget_percent float

Percent of department budget allocated to this team.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

team_name pydantic-field

team_name

Team name

budget_percent pydantic-field

budget_percent = 0.0

Percent of department budget

DepartmentBudget pydantic-model

Bases: BaseModel

Budget allocation for a department with nested team allocations.

Team budget percentages may sum to less than 100% to allow for an unallocated reserve within the department.

Attributes:

Name Type Description
department_name NotBlankStr

Department name (string reference).

budget_percent float

Percent of company budget allocated to this department.

teams tuple[TeamBudget, ...]

Team budget allocations within this department.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_team_names
  • _validate_team_budget_sum

department_name pydantic-field

department_name

Department name

budget_percent pydantic-field

budget_percent = 0.0

Percent of company budget

teams pydantic-field

teams = ()

Team budget allocations

BudgetHierarchy pydantic-model

Bases: BaseModel

Company-wide budget hierarchy.

Maps the Company -> Department -> Team nesting from a budget allocation perspective (see Operations design page). Department budget percentages may sum to less than 100% to allow for an unallocated reserve at the company level.

Attributes:

Name Type Description
total_monthly float

Total company monthly budget in USD (base currency).

departments tuple[DepartmentBudget, ...]

Department budget allocations.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_department_names
  • _validate_department_budget_sum

total_monthly pydantic-field

total_monthly

Total company monthly budget in USD (base currency)

departments pydantic-field

departments = ()

Department budget allocations

Tracker

tracker

Real-time cost tracking service.

Provides an in-memory store with TTL-based eviction for :class:CostRecord entries and aggregation queries consumed by the CFO agent and budget monitoring.

Service layer for the cost tracking schema defined in the Operations design page. The current implementation is purely in-memory; persistence integration is planned.

ProviderUsageSummary

Bases: NamedTuple

Per-provider usage totals for a time window.

CostTracker

CostTracker(
    *,
    budget_config=None,
    department_resolver=None,
    auto_prune_threshold=_AUTO_PRUNE_THRESHOLD,
)

In-memory cost tracking service with TTL-based eviction.

Records :class:CostRecord entries from LLM API calls and provides aggregation queries for budget monitoring. Memory is bounded by a soft TTL-based auto-prune: when the record count exceeds auto_prune_threshold, records older than 168 hours (7 days) are removed on the next query.

Parameters:

Name Type Description Default
budget_config BudgetConfig | None

Optional budget configuration for alert level computation. When None, alert level defaults to NORMAL and budget_used_percent to 0.0.

None
department_resolver Callable[[str], str | None] | None

Optional callable mapping agent_id to a department name. When None or returning None for an agent, the agent is excluded from department aggregation.

None
auto_prune_threshold int

Maximum record count before auto-pruning is triggered on snapshot. Defaults to 100,000.

_AUTO_PRUNE_THRESHOLD

Raises:

Type Description
ValueError

If auto_prune_threshold < 1.

Source code in src/synthorg/budget/tracker.py
def __init__(
    self,
    *,
    budget_config: BudgetConfig | None = None,
    department_resolver: Callable[[str], str | None] | None = None,
    auto_prune_threshold: int = _AUTO_PRUNE_THRESHOLD,
) -> None:
    if auto_prune_threshold < 1:
        msg = f"auto_prune_threshold must be >= 1, got {auto_prune_threshold}"
        raise ValueError(msg)
    self._records: list[CostRecord] = []
    self._lock: asyncio.Lock = asyncio.Lock()
    self._budget_config = budget_config
    self._department_resolver = department_resolver
    self._auto_prune_threshold = auto_prune_threshold
    logger.debug(
        BUDGET_TRACKER_CREATED,
        has_budget_config=budget_config is not None,
        has_department_resolver=department_resolver is not None,
    )

budget_config property

budget_config

The optional budget configuration.

Returns:

Type Description
BudgetConfig | None

Budget config if set, else None.

record async

record(cost_record)

Append a cost record.

Parameters:

Name Type Description Default
cost_record CostRecord

Immutable cost record to store.

required
Source code in src/synthorg/budget/tracker.py
async def record(self, cost_record: CostRecord) -> None:
    """Append a cost record.

    Args:
        cost_record: Immutable cost record to store.
    """
    async with self._lock:
        self._records.append(cost_record)
        logger.info(
            BUDGET_RECORD_ADDED,
            agent_id=cost_record.agent_id,
            model=cost_record.model,
            cost_usd=cost_record.cost_usd,
        )

prune_expired async

prune_expired(*, now=None)

Remove records older than the 168-hour (7-day) cost window.

Call periodically from long-running services to bound memory growth.

Parameters:

Name Type Description Default
now datetime | None

Reference time. Defaults to current UTC time.

None

Returns:

Type Description
int

Number of records removed.

Source code in src/synthorg/budget/tracker.py
async def prune_expired(self, *, now: datetime | None = None) -> int:
    """Remove records older than the 168-hour (7-day) cost window.

    Call periodically from long-running services to bound
    memory growth.

    Args:
        now: Reference time.  Defaults to current UTC time.

    Returns:
        Number of records removed.
    """
    ref = now or datetime.now(UTC)
    cutoff = ref - timedelta(hours=_COST_WINDOW_HOURS)
    async with self._lock:
        pruned = self._prune_before(cutoff)
        if pruned:
            logger.info(
                BUDGET_RECORDS_PRUNED,
                pruned=pruned,
                remaining=len(self._records),
            )
        return pruned

get_total_cost async

get_total_cost(*, start=None, end=None)

Sum of cost_usd across all records, optionally filtered by time.

Parameters:

Name Type Description Default
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
float

Rounded total cost in USD (base currency).

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_total_cost(
    self,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> float:
    """Sum of ``cost_usd`` across all records, optionally filtered by time.

    Args:
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Rounded total cost in USD (base currency).

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(BUDGET_TOTAL_COST_QUERIED, start=start, end=end)
    snapshot = await self._snapshot()
    filtered = _filter_records(snapshot, start=start, end=end)
    return _aggregate(filtered).cost

get_agent_cost async

get_agent_cost(agent_id, *, start=None, end=None)

Sum of cost_usd for a single agent, optionally filtered by time.

Parameters:

Name Type Description Default
agent_id str

Agent identifier to filter by.

required
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
float

Rounded total cost in USD (base currency) for the agent.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_agent_cost(
    self,
    agent_id: str,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> float:
    """Sum of ``cost_usd`` for a single agent, optionally filtered by time.

    Args:
        agent_id: Agent identifier to filter by.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Rounded total cost in USD (base currency) for the agent.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_AGENT_COST_QUERIED,
        agent_id=agent_id,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    filtered = _filter_records(
        snapshot,
        agent_id=agent_id,
        start=start,
        end=end,
    )
    return _aggregate(filtered).cost

get_record_count async

get_record_count()

Total number of recorded cost entries.

Returns:

Type Description
int

Number of cost records.

Source code in src/synthorg/budget/tracker.py
async def get_record_count(self) -> int:
    """Total number of recorded cost entries.

    Returns:
        Number of cost records.
    """
    async with self._lock:
        return len(self._records)

get_records async

get_records(*, agent_id=None, task_id=None, provider=None, start=None, end=None)

Return filtered cost records.

Returns an immutable snapshot of records matching the filters.

Parameters:

Name Type Description Default
agent_id str | None

Filter by agent.

None
task_id str | None

Filter by task.

None
provider NotBlankStr | None

Filter by provider name.

None
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
tuple[CostRecord, ...]

Immutable tuple of matching cost records.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_records(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
    provider: NotBlankStr | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
) -> tuple[CostRecord, ...]:
    """Return filtered cost records.

    Returns an immutable snapshot of records matching the filters.

    Args:
        agent_id: Filter by agent.
        task_id: Filter by task.
        provider: Filter by provider name.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Immutable tuple of matching cost records.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_RECORDS_QUERIED,
        agent_id=agent_id,
        task_id=task_id,
        provider=provider,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    return _filter_records(
        snapshot,
        agent_id=agent_id,
        task_id=task_id,
        provider=provider,
        start=start,
        end=end,
    )

get_provider_usage async

get_provider_usage(provider_name, *, start=None, end=None)

Return aggregated token and cost totals for a provider.

Parameters:

Name Type Description Default
provider_name NotBlankStr

Provider to aggregate usage for.

required
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
ProviderUsageSummary

Total tokens (input + output) and total cost.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_provider_usage(
    self,
    provider_name: NotBlankStr,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> ProviderUsageSummary:
    """Return aggregated token and cost totals for a provider.

    Args:
        provider_name: Provider to aggregate usage for.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Total tokens (input + output) and total cost.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_PROVIDER_USAGE_QUERIED,
        provider=provider_name,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    filtered = _filter_records(
        snapshot,
        provider=provider_name,
        start=start,
        end=end,
    )
    if not filtered:
        return ProviderUsageSummary(total_tokens=0, total_cost=0.0)
    agg = _aggregate(filtered)
    return ProviderUsageSummary(
        total_tokens=agg.input_tokens + agg.output_tokens,
        total_cost=agg.cost,
    )

build_summary async

build_summary(*, start, end)

Build a spending summary for the given period.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
SpendingSummary

Aggregated spending summary with breakdowns and alert level.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/tracker.py
async def build_summary(
    self,
    *,
    start: datetime,
    end: datetime,
) -> SpendingSummary:
    """Build a spending summary for the given period.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Aggregated spending summary with breakdowns and alert level.

    Raises:
        ValueError: If ``start >= end``.
    """
    _validate_time_range(start, end)
    retention_cutoff = datetime.now(UTC) - timedelta(
        hours=_COST_WINDOW_HOURS,
    )
    if start < retention_cutoff:
        logger.warning(
            BUDGET_QUERY_EXCEEDS_RETENTION,
            requested_start=start.isoformat(),
            retention_cutoff=retention_cutoff.isoformat(),
            retention_hours=_COST_WINDOW_HOURS,
        )
    snapshot = await self._snapshot()
    filtered = _filter_records(snapshot, start=start, end=end)
    totals = _aggregate(filtered)

    agent_spendings = _build_agent_spendings(filtered)
    dept_spendings = self._build_dept_spendings(agent_spendings)
    budget_monthly, used_pct, alert = self._build_budget_context(
        totals.cost,
    )

    summary = SpendingSummary(
        period=PeriodSpending(
            start=start,
            end=end,
            total_cost_usd=totals.cost,
            total_input_tokens=totals.input_tokens,
            total_output_tokens=totals.output_tokens,
            record_count=totals.record_count,
        ),
        by_agent=tuple(agent_spendings),
        by_department=tuple(dept_spendings),
        budget_total_monthly=budget_monthly,
        budget_used_percent=used_pct,
        alert_level=alert,
    )

    logger.info(
        BUDGET_SUMMARY_BUILT,
        total_cost_usd=totals.cost,
        record_count=totals.record_count,
        agent_count=len(agent_spendings),
        department_count=len(dept_spendings),
        alert_level=alert.value,
    )

    return summary

get_category_breakdown async

get_category_breakdown(*, agent_id=None, task_id=None, start=None, end=None)

Build a per-category cost breakdown.

Parameters:

Name Type Description Default
agent_id str | None

Filter by agent.

None
task_id str | None

Filter by task.

None
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
CategoryBreakdown

Category breakdown of cost, tokens, and call counts.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_category_breakdown(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
) -> CategoryBreakdown:
    """Build a per-category cost breakdown.

    Args:
        agent_id: Filter by agent.
        task_id: Filter by task.
        start: Inclusive lower bound on timestamp.
        end: Exclusive upper bound on timestamp.

    Returns:
        Category breakdown of cost, tokens, and call counts.

    Raises:
        ValueError: If ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_CATEGORY_BREAKDOWN_QUERIED,
        agent_id=agent_id,
        task_id=task_id,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    filtered = _filter_records(
        snapshot,
        agent_id=agent_id,
        task_id=task_id,
        start=start,
        end=end,
    )
    return build_category_breakdown(filtered)

get_orchestration_ratio async

get_orchestration_ratio(
    *, agent_id=None, task_id=None, start=None, end=None, thresholds=None
)

Compute the orchestration overhead ratio.

Parameters:

Name Type Description Default
agent_id str | None

Filter by agent.

None
task_id str | None

Filter by task.

None
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None
thresholds OrchestrationAlertThresholds | None

Optional custom alert thresholds.

None

Returns:

Type Description
OrchestrationRatio

Orchestration ratio with alert level.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_orchestration_ratio(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
    thresholds: OrchestrationAlertThresholds | None = None,
) -> OrchestrationRatio:
    """Compute the orchestration overhead ratio.

    Args:
        agent_id: Filter by agent.
        task_id: Filter by task.
        start: Inclusive lower bound on timestamp.
        end: Exclusive upper bound on timestamp.
        thresholds: Optional custom alert thresholds.

    Returns:
        Orchestration ratio with alert level.

    Raises:
        ValueError: If ``start >= end``.
    """
    breakdown = await self.get_category_breakdown(
        agent_id=agent_id,
        task_id=task_id,
        start=start,
        end=end,
    )
    result = compute_orchestration_ratio(
        breakdown,
        thresholds=thresholds,
    )
    logger.debug(
        BUDGET_ORCHESTRATION_RATIO_QUERIED,
        agent_id=agent_id,
        task_id=task_id,
        ratio=result.ratio,
        alert_level=result.alert_level.value,
    )
    if result.alert_level != OrchestrationAlertLevel.NORMAL:
        logger.warning(
            BUDGET_ORCHESTRATION_RATIO_ALERT,
            agent_id=agent_id,
            task_id=task_id,
            ratio=result.ratio,
            alert_level=result.alert_level.value,
        )
    return result

Enforcer

enforcer

Budget enforcement service.

Composes :class:~synthorg.budget.tracker.CostTracker and :class:~synthorg.budget.config.BudgetConfig to provide pre-flight checks, in-flight budget checking, and task-boundary auto-downgrade as described in the Cost Controls section of the Operations design page.

BudgetEnforcer

BudgetEnforcer(
    *,
    budget_config,
    cost_tracker,
    model_resolver=None,
    quota_tracker=None,
    degradation_configs=None,
    risk_tracker=None,
    risk_scorer=None,
)

Budget enforcement: pre-flight, in-flight, and auto-downgrade.

Concurrency-safe via CostTracker's asyncio.Lock. Pre-flight checks are best-effort under concurrency (TOCTOU); the in-flight checker is the true safety net.

Parameters:

Name Type Description Default
budget_config BudgetConfig

Limits and thresholds.

required
cost_tracker CostTracker

Spend queries.

required
model_resolver ModelResolver | None

Auto-downgrade alias lookup.

None
quota_tracker QuotaTracker | None

Provider-level quota enforcement.

None
degradation_configs Mapping[str, DegradationConfig] | None

Per-provider degradation strategies.

None
risk_tracker RiskTracker | None

Optional risk tracking service.

None
risk_scorer RiskScorer | None

Optional risk scoring implementation.

None
Source code in src/synthorg/budget/enforcer.py
def __init__(  # noqa: PLR0913
    self,
    *,
    budget_config: BudgetConfig,
    cost_tracker: CostTracker,
    model_resolver: ModelResolver | None = None,
    quota_tracker: QuotaTracker | None = None,
    degradation_configs: Mapping[str, DegradationConfig] | None = None,
    risk_tracker: RiskTracker | None = None,
    risk_scorer: RiskScorer | None = None,
) -> None:
    self._budget_config = budget_config
    self._cost_tracker = cost_tracker
    self._model_resolver = model_resolver
    self._quota_tracker = quota_tracker
    self._degradation_configs: MappingProxyType[str, DegradationConfig] | None = (
        MappingProxyType(copy.deepcopy(dict(degradation_configs)))
        if degradation_configs is not None
        else None
    )
    self._risk_tracker = risk_tracker
    self._risk_scorer = risk_scorer

    if budget_config.risk_budget.enabled and (
        risk_tracker is None or risk_scorer is None
    ):
        logger.warning(
            RISK_BUDGET_ENFORCEMENT_CHECK,
            reason="risk_budget_enabled_but_missing_deps",
            has_tracker=risk_tracker is not None,
            has_scorer=risk_scorer is not None,
        )

cost_tracker property

cost_tracker

The underlying cost tracker.

risk_tracker property

risk_tracker

The optional risk tracker.

currency property

currency

The configured ISO 4217 currency code.

get_budget_utilization_pct async

get_budget_utilization_pct()

Return monthly budget utilization as a percentage (0--100+).

Returns None when disabled (total_monthly <= 0) or when the cost query fails (graceful degradation).

Source code in src/synthorg/budget/enforcer.py
async def get_budget_utilization_pct(self) -> float | None:
    """Return monthly budget utilization as a percentage (0--100+).

    Returns ``None`` when disabled (``total_monthly <= 0``) or
    when the cost query fails (graceful degradation).
    """
    cfg = self._budget_config
    if cfg.total_monthly <= 0:
        return None
    try:
        period_start = billing_period_start(cfg.reset_day)
        monthly_cost = await self._cost_tracker.get_total_cost(
            start=period_start,
        )
    except MemoryError, RecursionError:
        raise
    except Exception:
        logger.exception(
            BUDGET_UTILIZATION_ERROR,
            reason="falling_back_to_none",
        )
        return None
    else:
        pct = monthly_cost / cfg.total_monthly * 100
        logger.debug(
            BUDGET_UTILIZATION_QUERIED,
            monthly_cost=monthly_cost,
            total_monthly=cfg.total_monthly,
            utilization_pct=pct,
        )
        return pct

check_can_execute async

check_can_execute(agent_id, *, provider_name=None, estimated_tokens=0)

Pre-flight: verify monthly + daily + quota limits allow execution.

Parameters:

Name Type Description Default
agent_id str

Agent requesting execution.

required
provider_name str | None

Optional provider name for quota checks. When None, quota check is skipped.

None
estimated_tokens int

Estimated tokens for the upcoming request. Forwarded to the quota tracker for token-based checks.

0

Returns:

Type Description
PreFlightResult

Pre-flight result with effective provider info and

PreFlightResult

degradation details when applicable.

Raises:

Type Description
BudgetExhaustedError

Monthly hard stop exceeded.

DailyLimitExceededError

Agent daily limit exceeded (subclass of BudgetExhaustedError).

QuotaExhaustedError

Provider quota exhausted and degradation could not resolve.

Source code in src/synthorg/budget/enforcer.py
async def check_can_execute(
    self,
    agent_id: str,
    *,
    provider_name: str | None = None,
    estimated_tokens: int = 0,
) -> PreFlightResult:
    """Pre-flight: verify monthly + daily + quota limits allow execution.

    Args:
        agent_id: Agent requesting execution.
        provider_name: Optional provider name for quota checks.
            When ``None``, quota check is skipped.
        estimated_tokens: Estimated tokens for the upcoming request.
            Forwarded to the quota tracker for token-based checks.

    Returns:
        Pre-flight result with effective provider info and
        degradation details when applicable.

    Raises:
        BudgetExhaustedError: Monthly hard stop exceeded.
        DailyLimitExceededError: Agent daily limit exceeded
            (subclass of ``BudgetExhaustedError``).
        QuotaExhaustedError: Provider quota exhausted and
            degradation could not resolve.
    """
    cfg = self._budget_config
    degradation_result: DegradationResult | None = None

    try:
        if cfg.total_monthly > 0:
            await self._check_monthly_hard_stop(cfg, agent_id)
        await self._check_daily_limit(cfg, agent_id)

        if provider_name is not None:
            degradation_result = await self._check_provider_quota(
                agent_id,
                provider_name,
                estimated_tokens=estimated_tokens,
            )
    except BudgetExhaustedError:
        raise
    except MemoryError, RecursionError:
        raise
    except Exception:
        logger.exception(
            BUDGET_PREFLIGHT_ERROR,
            agent_id=agent_id,
            reason="falling_back_to_allow_execution",
        )
        return PreFlightResult()

    logger.debug(
        BUDGET_ENFORCEMENT_CHECK,
        agent_id=agent_id,
        result="pass",
    )
    if degradation_result is not None:
        return PreFlightResult(degradation=degradation_result)
    return PreFlightResult()

check_quota async

check_quota(provider_name, *, estimated_tokens=0)

Check provider quota via QuotaTracker.

Returns always-allowed when no quota tracker is configured.

Note

Unlike check_can_execute, this method does not catch unexpected exceptions from the underlying QuotaTracker. check_can_execute wraps quota checks in a try/except that falls back to allowing execution on unexpected errors (graceful degradation), but direct callers of check_quota are responsible for their own error handling.

Parameters:

Name Type Description Default
provider_name str

Provider to check.

required
estimated_tokens int

Estimated tokens for the request.

0

Returns:

Type Description
QuotaCheckResult

Quota check result.

Source code in src/synthorg/budget/enforcer.py
async def check_quota(
    self,
    provider_name: str,
    *,
    estimated_tokens: int = 0,
) -> QuotaCheckResult:
    """Check provider quota via QuotaTracker.

    Returns always-allowed when no quota tracker is configured.

    Note:
        Unlike ``check_can_execute``, this method does **not**
        catch unexpected exceptions from the underlying
        ``QuotaTracker``.  ``check_can_execute`` wraps quota
        checks in a try/except that falls back to allowing
        execution on unexpected errors (graceful degradation),
        but direct callers of ``check_quota`` are responsible
        for their own error handling.

    Args:
        provider_name: Provider to check.
        estimated_tokens: Estimated tokens for the request.

    Returns:
        Quota check result.
    """
    if self._quota_tracker is None:
        logger.debug(
            QUOTA_CHECK_ALLOWED,
            provider=provider_name,
            reason="no_quota_tracker",
        )
        return always_allowed_result(provider_name)

    return await self._quota_tracker.check_quota(
        provider_name,
        estimated_tokens=estimated_tokens,
    )

resolve_model async

resolve_model(identity)

Apply auto-downgrade at task boundary if threshold exceeded.

Returns identity unchanged when downgrade is disabled, not applicable, or lookup fails. Returns new AgentIdentity with downgraded ModelConfig otherwise.

Source code in src/synthorg/budget/enforcer.py
async def resolve_model(
    self,
    identity: AgentIdentity,
) -> AgentIdentity:
    """Apply auto-downgrade at task boundary if threshold exceeded.

    Returns identity unchanged when downgrade is disabled, not
    applicable, or lookup fails.  Returns new ``AgentIdentity``
    with downgraded ``ModelConfig`` otherwise.
    """
    cfg = self._budget_config
    downgrade = cfg.auto_downgrade

    if (
        not downgrade.enabled
        or cfg.total_monthly <= 0
        or self._model_resolver is None
    ):
        return identity

    try:
        period_start = billing_period_start(cfg.reset_day)
        monthly_cost = await self._cost_tracker.get_total_cost(
            start=period_start,
        )
    except MemoryError, RecursionError:  # builtin MemoryError (OOM)
        raise
    except Exception:
        logger.exception(
            BUDGET_RESOLVE_MODEL_ERROR,
            agent_id=str(identity.id),
            reason="cost_tracker_query_failed",
        )
        return identity

    used_pct = round(
        monthly_cost / cfg.total_monthly * 100,
        BUDGET_ROUNDING_PRECISION,
    )

    if used_pct < downgrade.threshold:
        return identity

    return _apply_downgrade(
        identity,
        self._model_resolver,
        downgrade.downgrade_map,
        used_pct,
        downgrade.threshold,
    )

make_budget_checker async

make_budget_checker(task, agent_id)

Create a sync BudgetChecker with pre-computed baselines.

Checks task limit, monthly total, and agent daily limit. Baselines are snapshot-in-time (TOCTOU acceptable). Returns None when all limits are disabled.

Source code in src/synthorg/budget/enforcer.py
async def make_budget_checker(
    self,
    task: Task,
    agent_id: str,
) -> BudgetChecker | None:
    """Create a sync BudgetChecker with pre-computed baselines.

    Checks task limit, monthly total, and agent daily limit.
    Baselines are snapshot-in-time (TOCTOU acceptable).
    Returns ``None`` when all limits are disabled.
    """
    cfg = self._budget_config
    task_limit = task.budget_limit
    monthly_budget = cfg.total_monthly
    daily_limit = cfg.per_agent_daily_limit

    # All enforcement disabled -- monthly, task, and daily all off.
    if monthly_budget <= 0 and task_limit <= 0 and daily_limit <= 0:
        return None

    monthly_baseline, daily_baseline = await self._compute_baselines_safe(
        cfg,
        monthly_budget,
        daily_limit,
        agent_id,
    )

    thresholds = _compute_thresholds(cfg, monthly_budget)

    return _build_checker_closure(
        task_limit=task_limit,
        monthly_budget=monthly_budget,
        daily_limit=daily_limit,
        monthly_baseline=monthly_baseline,
        daily_baseline=daily_baseline,
        thresholds=thresholds,
        agent_id=agent_id,
    )

check_risk_budget async

check_risk_budget(agent_id, task_id, action_type)

Pre-flight risk budget check.

Checks per-task, per-agent daily, and total daily risk limits including the projected risk of the pending action.

Pre-flight checks are best-effort under concurrency (TOCTOU). See class docstring.

Raises:

Type Description
RiskBudgetExhaustedError

When a risk limit is exceeded and enforcement is active.

Source code in src/synthorg/budget/enforcer.py
async def check_risk_budget(
    self,
    agent_id: str,
    task_id: str,
    action_type: str,
) -> RiskCheckResult:
    """Pre-flight risk budget check.

    Checks per-task, per-agent daily, and total daily risk limits
    including the projected risk of the pending action.

    Pre-flight checks are best-effort under concurrency (TOCTOU).
    See class docstring.

    Raises:
        RiskBudgetExhaustedError: When a risk limit is exceeded
            and enforcement is active.
    """
    risk_cfg = self._budget_config.risk_budget
    if not risk_cfg.enabled or self._risk_tracker is None:
        return RiskCheckResult()

    logger.debug(
        RISK_BUDGET_ENFORCEMENT_CHECK,
        agent_id=agent_id,
        task_id=task_id,
        action_type=action_type,
    )

    try:
        # Score the pending action for projected enforcement.
        projected = 0.0
        if self._risk_scorer is not None:
            projected = self._risk_scorer.score(action_type).risk_units

        day_start = daily_period_start()
        t = self._risk_tracker
        checks = (
            (
                risk_cfg.per_task_risk_limit,
                partial(t.get_task_risk, task_id),
                RISK_BUDGET_TASK_LIMIT_EXCEEDED,
                "Per-task",
            ),
            (
                risk_cfg.per_agent_daily_risk_limit,
                partial(t.get_agent_risk, agent_id, start=day_start),
                RISK_BUDGET_DAILY_LIMIT_EXCEEDED,
                "Per-agent daily",
            ),
            (
                risk_cfg.total_daily_risk_limit,
                partial(t.get_total_risk, start=day_start),
                RISK_BUDGET_LIMIT_EXCEEDED,
                "Total daily",
            ),
        )
        for limit, get_risk, event, label in checks:
            self._enforce_risk_limit(
                limit,
                await get_risk(),
                projected,
                event,
                label,
                agent_id,
                task_id,
            )
    except MemoryError, RecursionError:
        raise
    except RiskBudgetExhaustedError:
        raise
    except Exception:
        logger.exception(
            RISK_BUDGET_ENFORCEMENT_CHECK,
            agent_id=agent_id,
            task_id=task_id,
            reason="risk_check_error",
        )

    return RiskCheckResult(risk_units=projected)

record_risk async

record_risk(agent_id, task_id, action_type)

Score and record a risk entry for the given action.

Returns None when risk budgets are disabled, no tracker is configured, or no scorer is available.

Source code in src/synthorg/budget/enforcer.py
async def record_risk(
    self,
    agent_id: str,
    task_id: str,
    action_type: str,
) -> RiskRecord | None:
    """Score and record a risk entry for the given action.

    Returns ``None`` when risk budgets are disabled, no tracker
    is configured, or no scorer is available.
    """
    from synthorg.budget.risk_record import (  # noqa: PLC0415
        RiskRecord as _RiskRecord,
    )

    risk_cfg = self._budget_config.risk_budget
    if (
        not risk_cfg.enabled
        or self._risk_tracker is None
        or self._risk_scorer is None
    ):
        return None

    try:
        score = self._risk_scorer.score(action_type)
        record = _RiskRecord(
            agent_id=agent_id,
            task_id=task_id,
            action_type=action_type,
            risk_score=score,
            risk_units=score.risk_units,
            timestamp=datetime.now(UTC),
        )
        await self._risk_tracker.record(record)
    except MemoryError, RecursionError:
        raise
    except Exception:
        logger.exception(
            RISK_BUDGET_RECORD_FAILED,
            agent_id=agent_id,
            action_type=action_type,
        )
        return None
    logger.info(
        RISK_BUDGET_RECORD_ADDED,
        agent_id=agent_id,
        task_id=task_id,
        action_type=action_type,
        risk_units=score.risk_units,
    )
    return record

Quota

quota

Quota and subscription models for provider cost tracking.

Defines quota windows, subscription configurations, degradation strategies, and quota check result models for providers that operate under subscription plans, local deployments, or pay-as-you-go billing.

QuotaWindow

Bases: StrEnum

Time window for quota enforcement.

QuotaLimit pydantic-model

Bases: BaseModel

A single quota limit for a time window.

Attributes:

Name Type Description
window QuotaWindow

Time window for this limit.

max_requests int

Maximum requests in the window (0 = unlimited).

max_tokens int

Maximum tokens in the window (0 = unlimited).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _at_least_one_limit

window pydantic-field

window

Time window for this limit

max_requests pydantic-field

max_requests = 0

Maximum requests in the window (0 = unlimited)

max_tokens pydantic-field

max_tokens = 0

Maximum tokens in the window (0 = unlimited)

ProviderCostModel

Bases: StrEnum

How a provider charges for usage.

Members

PER_TOKEN: Standard pay-as-you-go; cost computed from cost_per_1k_input/output. SUBSCRIPTION: Monthly flat fee; individual calls are pre-paid. LOCAL: Zero monetary cost; only hardware constraints.

SubscriptionConfig pydantic-model

Bases: BaseModel

Subscription and quota configuration for a provider.

Attributes:

Name Type Description
plan_name NotBlankStr

Name of the subscription plan.

cost_model ProviderCostModel

How the provider charges for usage.

monthly_cost float

Fixed monthly subscription fee in USD (base currency).

quotas tuple[QuotaLimit, ...]

Rate/token/request limits per time window.

hardware_limits str | None

Free-text hardware constraints for local models.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_quotas_unique_windows
  • _validate_cost_model_constraints

plan_name pydantic-field

plan_name = 'pay_as_you_go'

Subscription plan name

cost_model pydantic-field

cost_model = PER_TOKEN

How the provider charges for usage

monthly_cost pydantic-field

monthly_cost = 0.0

Fixed monthly subscription fee in USD (base currency)

quotas pydantic-field

quotas = ()

Rate/token/request limits per time window

hardware_limits pydantic-field

hardware_limits = None

Free-text hardware constraints for local models

DegradationAction

Bases: StrEnum

Action to take when a provider's quota is exhausted.

Members

FALLBACK: Route to a fallback provider. QUEUE: Wait for quota window reset, then retry. ALERT: Raise error and alert user.

DegradationConfig pydantic-model

Bases: BaseModel

Configuration for graceful degradation when quota is exhausted.

Attributes:

Name Type Description
strategy DegradationAction

What to do when quota is exhausted.

fallback_providers tuple[NotBlankStr, ...]

Ordered fallback provider names.

queue_max_wait_seconds int

Max seconds to wait when queueing.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_fallback_providers

strategy pydantic-field

strategy = ALERT

Degradation strategy when quota exhausted

fallback_providers pydantic-field

fallback_providers = ()

Ordered fallback provider names

queue_max_wait_seconds pydantic-field

queue_max_wait_seconds = 300

Max wait seconds when queueing

QuotaSnapshot pydantic-model

Bases: BaseModel

Point-in-time snapshot of quota usage for a provider window.

Attributes:

Name Type Description
provider_name NotBlankStr

Provider this snapshot belongs to.

window QuotaWindow

Time window for this snapshot.

requests_used int

Requests consumed in this window.

requests_limit int

Maximum requests allowed (0 = unlimited).

tokens_used int

Tokens consumed in this window.

tokens_limit int

Maximum tokens allowed (0 = unlimited).

window_resets_at datetime | None

When the current window resets.

captured_at datetime

When this snapshot was captured.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

provider_name pydantic-field

provider_name

Provider name

window pydantic-field

window

Time window

requests_used pydantic-field

requests_used = 0

Requests used

requests_limit pydantic-field

requests_limit = 0

Requests limit (0 = unlimited)

tokens_used pydantic-field

tokens_used = 0

Tokens used

tokens_limit pydantic-field

tokens_limit = 0

Tokens limit (0 = unlimited)

window_resets_at pydantic-field

window_resets_at = None

When the current window resets

captured_at pydantic-field

captured_at

When snapshot was captured

requests_remaining property

requests_remaining

Remaining requests in this window.

Returns None when the limit is not enforced (unlimited). Returns 0 when fully consumed.

tokens_remaining property

tokens_remaining

Remaining tokens in this window.

Returns None when the limit is not enforced (unlimited). Returns 0 when fully consumed.

is_exhausted property

is_exhausted

Whether any enforced limit in this window is exhausted.

QuotaCheckResult pydantic-model

Bases: BaseModel

Result of a pre-flight quota check.

Attributes:

Name Type Description
allowed bool

Whether the request is allowed.

provider_name NotBlankStr

Provider that was checked.

reason str

Human-readable reason (set when denied).

exhausted_windows tuple[QuotaWindow, ...]

Which windows are exhausted (if any).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_denied_has_reason

allowed pydantic-field

allowed

Whether the request is allowed

provider_name pydantic-field

provider_name

Provider checked

reason pydantic-field

reason = ''

Reason (set when denied)

exhausted_windows pydantic-field

exhausted_windows = ()

Exhausted windows

always_allowed_result

always_allowed_result(provider_name)

Build an always-allowed QuotaCheckResult.

Source code in src/synthorg/budget/quota.py
def always_allowed_result(provider_name: NotBlankStr) -> QuotaCheckResult:
    """Build an always-allowed QuotaCheckResult."""
    return QuotaCheckResult(
        allowed=True,
        provider_name=provider_name,
    )

window_start

window_start(window, *, now=None)

Compute the UTC-aware start of the current quota window.

Parameters:

Name Type Description Default
window QuotaWindow

Which time window to compute.

required
now datetime | None

Reference timestamp. Defaults to datetime.now(UTC). Must be timezone-aware; naive datetimes are rejected.

None

Returns:

Type Description
datetime

UTC-aware datetime at the start of the current window.

Raises:

Type Description
ValueError

If now is a naive (timezone-unaware) datetime.

Source code in src/synthorg/budget/quota.py
def window_start(
    window: QuotaWindow,
    *,
    now: datetime | None = None,
) -> datetime:
    """Compute the UTC-aware start of the current quota window.

    Args:
        window: Which time window to compute.
        now: Reference timestamp. Defaults to ``datetime.now(UTC)``.
            Must be timezone-aware; naive datetimes are rejected.

    Returns:
        UTC-aware datetime at the start of the current window.

    Raises:
        ValueError: If *now* is a naive (timezone-unaware) datetime.
    """
    if now is None:
        now = datetime.now(UTC)
    elif now.tzinfo is None:
        msg = "now must be timezone-aware, got naive datetime"
        logger.warning(
            CONFIG_VALIDATION_FAILED,
            model="window_start",
            field="now",
            reason=msg,
        )
        raise ValueError(msg)
    else:
        now = now.astimezone(UTC)

    if window == QuotaWindow.PER_MINUTE:
        return datetime(
            now.year,
            now.month,
            now.day,
            now.hour,
            now.minute,
            tzinfo=UTC,
        )
    if window == QuotaWindow.PER_HOUR:
        return datetime(
            now.year,
            now.month,
            now.day,
            now.hour,
            tzinfo=UTC,
        )
    if window == QuotaWindow.PER_DAY:
        return datetime(
            now.year,
            now.month,
            now.day,
            tzinfo=UTC,
        )
    # PER_MONTH -- first day of the month
    return datetime(now.year, now.month, 1, tzinfo=UTC)

effective_cost_per_1k

effective_cost_per_1k(cost_per_1k_input, cost_per_1k_output, cost_model)

Compute effective cost per 1k tokens based on cost model.

Returns 0.0 for SUBSCRIPTION and LOCAL models (pre-paid / free). Returns cost_per_1k_input + cost_per_1k_output for PER_TOKEN.

Parameters:

Name Type Description Default
cost_per_1k_input float

Cost per 1k input tokens.

required
cost_per_1k_output float

Cost per 1k output tokens.

required
cost_model ProviderCostModel

The provider's cost model.

required

Returns:

Type Description
float

Effective cost per 1k tokens.

Source code in src/synthorg/budget/quota.py
def effective_cost_per_1k(
    cost_per_1k_input: float,
    cost_per_1k_output: float,
    cost_model: ProviderCostModel,
) -> float:
    """Compute effective cost per 1k tokens based on cost model.

    Returns 0.0 for SUBSCRIPTION and LOCAL models (pre-paid / free).
    Returns ``cost_per_1k_input + cost_per_1k_output`` for PER_TOKEN.

    Args:
        cost_per_1k_input: Cost per 1k input tokens.
        cost_per_1k_output: Cost per 1k output tokens.
        cost_model: The provider's cost model.

    Returns:
        Effective cost per 1k tokens.
    """
    if cost_model in (ProviderCostModel.SUBSCRIPTION, ProviderCostModel.LOCAL):
        return 0.0
    return cost_per_1k_input + cost_per_1k_output

quota_tracker

Quota tracking service.

Tracks per-provider request and token usage against configured quota windows. Window-based counters are rotated automatically when a window boundary is crossed.

Concurrency-safe via asyncio.Lock (same pattern as :class:~synthorg.budget.tracker.CostTracker).

QuotaTracker

QuotaTracker(*, subscriptions)

Tracks per-provider quota usage across configured time windows.

Providers without a subscription config are silently ignored (no-op on record, always allowed on check).

Note

check_quota followed by record_usage is subject to a TOCTOU gap: another coroutine could record usage between the check and the record, pushing the counter past the limit. This is by-design for the single-loop asyncio concurrency model -- the gap only exists across await points and is acceptable for quota enforcement (the in-flight budget checker is the true safety net).

Parameters:

Name Type Description Default
subscriptions Mapping[str, SubscriptionConfig]

Mapping of provider name to subscription config.

required
Source code in src/synthorg/budget/quota_tracker.py
def __init__(
    self,
    *,
    subscriptions: Mapping[str, SubscriptionConfig],
) -> None:
    self._subscriptions: dict[str, SubscriptionConfig] = copy.deepcopy(
        dict(subscriptions),
    )
    self._lock = asyncio.Lock()
    self._loop: asyncio.AbstractEventLoop | None = None
    self._usage: dict[str, dict[QuotaWindow, _WindowUsage]] = {}

    # Initialize usage tracking for providers with quotas
    for provider_name, sub_config in self._subscriptions.items():
        if sub_config.quotas:
            self._usage[provider_name] = {}
            for quota in sub_config.quotas:
                ws = window_start(quota.window)
                self._usage[provider_name][quota.window] = _WindowUsage(
                    requests=0,
                    tokens=0,
                    window_start=ws,
                )

    logger.debug(
        QUOTA_TRACKER_CREATED,
        provider_count=len(self._subscriptions),
        tracked_providers=sorted(self._usage),
    )

record_usage async

record_usage(provider_name, *, requests=1, tokens=0)

Record usage against all configured windows for a provider.

Rotates window counters if a window boundary has been crossed. Providers with no subscription config are skipped with a DEBUG log.

Parameters:

Name Type Description Default
provider_name str

Provider to record usage for.

required
requests int

Number of requests to record (must be >= 0).

1
tokens int

Number of tokens to record (must be >= 0).

0

Raises:

Type Description
ValueError

If requests or tokens is negative.

Source code in src/synthorg/budget/quota_tracker.py
async def record_usage(
    self,
    provider_name: str,
    *,
    requests: int = 1,
    tokens: int = 0,
) -> None:
    """Record usage against all configured windows for a provider.

    Rotates window counters if a window boundary has been crossed.
    Providers with no subscription config are skipped with a DEBUG log.

    Args:
        provider_name: Provider to record usage for.
        requests: Number of requests to record (must be >= 0).
        tokens: Number of tokens to record (must be >= 0).

    Raises:
        ValueError: If requests or tokens is negative.
    """
    if requests < 0:
        msg = f"requests must be non-negative, got {requests}"
        logger.warning(
            QUOTA_USAGE_SKIPPED,
            provider=provider_name,
            reason=msg,
        )
        raise ValueError(msg)
    if tokens < 0:
        msg = f"tokens must be non-negative, got {tokens}"
        logger.warning(
            QUOTA_USAGE_SKIPPED,
            provider=provider_name,
            reason=msg,
        )
        raise ValueError(msg)

    if provider_name not in self._usage:
        reason = (
            "no_quotas_configured"
            if provider_name in self._subscriptions
            else "unknown_provider"
        )
        logger.debug(
            QUOTA_USAGE_SKIPPED,
            provider=provider_name,
            reason=reason,
        )
        return

    self._capture_loop()
    async with self._lock:
        now = datetime.now(UTC)
        provider_usage = self._usage[provider_name]

        for window_type in list(provider_usage):
            current = provider_usage[window_type]
            expected_start = window_start(window_type, now=now)

            if expected_start != current.window_start:
                # Window boundary crossed -- rotate
                provider_usage[window_type] = _WindowUsage(
                    requests=requests,
                    tokens=tokens,
                    window_start=expected_start,
                )
                logger.debug(
                    QUOTA_WINDOW_ROTATED,
                    provider=provider_name,
                    window=window_type.value,
                    old_start=str(current.window_start),
                    new_start=str(expected_start),
                )
            else:
                provider_usage[window_type] = _WindowUsage(
                    requests=current.requests + requests,
                    tokens=current.tokens + tokens,
                    window_start=current.window_start,
                )

        logger.debug(
            QUOTA_USAGE_RECORDED,
            provider=provider_name,
            requests=requests,
            tokens=tokens,
        )

check_quota async

check_quota(provider_name, *, estimated_tokens=0)

Pre-flight check: can this provider handle a request?

Providers with no subscription config always return allowed.

Parameters:

Name Type Description Default
provider_name str

Provider to check.

required
estimated_tokens int

Estimated tokens for the request (must be >= 0).

0

Returns:

Type Description
QuotaCheckResult

Check result with allowed status and reason.

Raises:

Type Description
ValueError

If estimated_tokens is negative.

Source code in src/synthorg/budget/quota_tracker.py
async def check_quota(
    self,
    provider_name: str,
    *,
    estimated_tokens: int = 0,
) -> QuotaCheckResult:
    """Pre-flight check: can this provider handle a request?

    Providers with no subscription config always return allowed.

    Args:
        provider_name: Provider to check.
        estimated_tokens: Estimated tokens for the request (must be >= 0).

    Returns:
        Check result with allowed status and reason.

    Raises:
        ValueError: If estimated_tokens is negative.
    """
    if estimated_tokens < 0:
        msg = f"estimated_tokens must be non-negative, got {estimated_tokens}"
        logger.warning(
            QUOTA_CHECK_DENIED,
            provider=provider_name,
            reason=msg,
        )
        raise ValueError(msg)

    if provider_name not in self._usage:
        reason = (
            "no_quotas_configured"
            if provider_name in self._subscriptions
            else "unknown_provider"
        )
        logger.debug(
            QUOTA_CHECK_ALLOWED,
            provider=provider_name,
            reason=reason,
        )
        return QuotaCheckResult(
            allowed=True,
            provider_name=provider_name,
        )

    sub_config = self._subscriptions[provider_name]
    quota_map = {q.window: q for q in sub_config.quotas}

    self._capture_loop()
    async with self._lock:
        now = datetime.now(UTC)
        provider_usage = self._usage[provider_name]
        exhausted: list[QuotaWindow] = []
        reasons: list[str] = []

        for window_type, usage in provider_usage.items():
            expected_start = window_start(window_type, now=now)

            # If window has rotated, counters would be zero
            if expected_start != usage.window_start:
                continue

            quota = quota_map.get(window_type)
            if quota is None:
                continue

            if _is_window_exhausted(
                usage,
                quota,
                estimated_tokens,
            ):
                exhausted.append(window_type)
                reasons.append(
                    _build_exhaustion_reason(
                        provider_name,
                        window_type,
                        usage,
                        quota,
                        estimated_tokens,
                    ),
                )

    if exhausted:
        result = QuotaCheckResult(
            allowed=False,
            provider_name=provider_name,
            reason="; ".join(reasons),
            exhausted_windows=tuple(exhausted),
        )
        logger.info(
            QUOTA_CHECK_DENIED,
            provider=provider_name,
            exhausted_windows=[w.value for w in exhausted],
            reason=result.reason,
        )
        return result

    logger.debug(
        QUOTA_CHECK_ALLOWED,
        provider=provider_name,
    )
    return QuotaCheckResult(
        allowed=True,
        provider_name=provider_name,
    )

get_snapshot async

get_snapshot(provider_name, window=None)

Get current usage snapshots for a provider.

Parameters:

Name Type Description Default
provider_name str

Provider to query.

required
window QuotaWindow | None

Optional specific window to query. If None, returns all windows.

None

Returns:

Type Description
tuple[QuotaSnapshot, ...]

Tuple of quota snapshots.

Source code in src/synthorg/budget/quota_tracker.py
async def get_snapshot(
    self,
    provider_name: str,
    window: QuotaWindow | None = None,
) -> tuple[QuotaSnapshot, ...]:
    """Get current usage snapshots for a provider.

    Args:
        provider_name: Provider to query.
        window: Optional specific window to query. If ``None``,
            returns all windows.

    Returns:
        Tuple of quota snapshots.
    """
    if provider_name not in self._usage:
        reason = (
            "no_quotas_configured"
            if provider_name in self._subscriptions
            else "unknown_provider"
        )
        logger.debug(
            QUOTA_SNAPSHOT_QUERIED,
            provider=provider_name,
            snapshot_count=0,
            reason=reason,
        )
        return ()

    sub_config = self._subscriptions[provider_name]
    quota_map = {q.window: q for q in sub_config.quotas}

    self._capture_loop()
    async with self._lock:
        now = datetime.now(UTC)
        snapshots: list[QuotaSnapshot] = []
        provider_usage = self._usage[provider_name]

        for window_type, usage in provider_usage.items():
            if window is not None and window_type != window:
                continue

            quota = quota_map.get(window_type)
            if quota is None:
                continue

            expected_start = window_start(window_type, now=now)
            # If window has rotated, show zero usage
            if expected_start != usage.window_start:
                req_used = 0
                tok_used = 0
            else:
                req_used = usage.requests
                tok_used = usage.tokens

            snapshots.append(
                QuotaSnapshot(
                    provider_name=provider_name,
                    window=window_type,
                    requests_used=req_used,
                    requests_limit=quota.max_requests,
                    tokens_used=tok_used,
                    tokens_limit=quota.max_tokens,
                    window_resets_at=_window_end(
                        window_type,
                        expected_start,
                    ),
                    captured_at=now,
                ),
            )

    logger.debug(
        QUOTA_SNAPSHOT_QUERIED,
        provider=provider_name,
        snapshot_count=len(snapshots),
    )
    return tuple(snapshots)

get_all_snapshots async

get_all_snapshots()

Get usage snapshots for all tracked providers.

Note

Snapshots are collected per-provider with separate lock acquisitions, so cross-provider consistency is not guaranteed under concurrent writes.

Returns:

Type Description
dict[str, tuple[QuotaSnapshot, ...]]

Dict mapping provider name to tuple of snapshots.

Source code in src/synthorg/budget/quota_tracker.py
async def get_all_snapshots(
    self,
) -> dict[str, tuple[QuotaSnapshot, ...]]:
    """Get usage snapshots for all tracked providers.

    Note:
        Snapshots are collected per-provider with separate lock
        acquisitions, so cross-provider consistency is not guaranteed
        under concurrent writes.

    Returns:
        Dict mapping provider name to tuple of snapshots.
    """
    result: dict[str, tuple[QuotaSnapshot, ...]] = {}
    for provider_name in self._usage:
        result[provider_name] = await self.get_snapshot(provider_name)
    return result

peek_quota_available

peek_quota_available()

Synchronous snapshot of per-provider quota availability.

Reads cached counters without acquiring the lock. This is safe when called synchronously from the asyncio event loop thread (no concurrent mutations during synchronous execution) and acceptable for heuristic selection decisions where TOCTOU is tolerated. Do not call from a thread-pool executor or from outside the event loop -- that would race with record_usage.

Providers without configured quotas are excluded from the result (they are always allowed and do not need selection guidance).

Returns:

Type Description
dict[str, bool]

Dict mapping provider name to availability status.

Raises:

Type Description
RuntimeError

If called from a different event loop than the one used by this tracker's async methods, or from outside an event loop when async methods have already been called (indicating a thread-pool executor context).

Source code in src/synthorg/budget/quota_tracker.py
def peek_quota_available(self) -> dict[str, bool]:
    """Synchronous snapshot of per-provider quota availability.

    Reads cached counters **without acquiring the lock**.  This is
    safe when called synchronously from the ``asyncio`` event loop
    thread (no concurrent mutations during synchronous execution)
    and acceptable for heuristic selection decisions where TOCTOU
    is tolerated.  Do **not** call from a thread-pool executor or
    from outside the event loop -- that would race with
    ``record_usage``.

    Providers without configured quotas are excluded from the
    result (they are always allowed and do not need selection
    guidance).

    Returns:
        Dict mapping provider name to availability status.

    Raises:
        RuntimeError: If called from a different event loop than
            the one used by this tracker's async methods, or from
            outside an event loop when async methods have already
            been called (indicating a thread-pool executor context).
    """
    self._assert_loop_safe()
    now = datetime.now(UTC)
    result: dict[str, bool] = {}

    for provider_name, provider_usage in self._usage.items():
        sub_config = self._subscriptions.get(provider_name)
        if sub_config is None:
            continue
        quota_map = {q.window: q for q in sub_config.quotas}
        exhausted = False

        for window_type, usage in provider_usage.items():
            expected_start = window_start(window_type, now=now)
            if expected_start != usage.window_start:
                continue  # Window rotated -- treat as zero usage
            quota = quota_map.get(window_type)
            if quota is None:
                continue
            # Use estimated_tokens=1 so providers at exactly the
            # token cap are treated as exhausted (zero headroom).
            if _is_window_exhausted(usage, quota, estimated_tokens=1):
                exhausted = True
                break

        result[provider_name] = not exhausted

    return result

Billing

billing

Billing period computation utilities.

Pure functions for determining billing period boundaries based on a configurable reset day. Used by :class:~synthorg.budget.enforcer.BudgetEnforcer to scope cost queries to the current billing cycle.

billing_period_start

billing_period_start(reset_day, *, now=None)

Compute the UTC-aware start of the current billing period.

If now.day >= reset_day, returns current month's reset_day at 00:00 UTC. Otherwise, returns previous month's reset_day at 00:00 UTC.

Parameters:

Name Type Description Default
reset_day int

Day of month when the billing period resets (1-28).

required
now datetime | None

Reference timestamp. Defaults to datetime.now(UTC).

None

Returns:

Type Description
datetime

UTC-aware datetime at midnight on the billing period start day.

Raises:

Type Description
ValueError

If reset_day is not in [1, 28].

Source code in src/synthorg/budget/billing.py
def billing_period_start(
    reset_day: int,
    *,
    now: datetime | None = None,
) -> datetime:
    """Compute the UTC-aware start of the current billing period.

    If ``now.day >= reset_day``, returns current month's ``reset_day``
    at 00:00 UTC.  Otherwise, returns previous month's ``reset_day``
    at 00:00 UTC.

    Args:
        reset_day: Day of month when the billing period resets (1-28).
        now: Reference timestamp.  Defaults to ``datetime.now(UTC)``.

    Returns:
        UTC-aware datetime at midnight on the billing period start day.

    Raises:
        ValueError: If ``reset_day`` is not in ``[1, 28]``.
    """
    if not 1 <= reset_day <= 28:  # noqa: PLR2004
        msg = f"reset_day must be 1-28, got {reset_day}"
        raise ValueError(msg)

    if now is None:
        now = datetime.now(UTC)

    if now.day >= reset_day:
        return datetime(now.year, now.month, reset_day, tzinfo=UTC)

    # Roll back to previous month
    if now.month == 1:
        return datetime(now.year - 1, 12, reset_day, tzinfo=UTC)
    return datetime(now.year, now.month - 1, reset_day, tzinfo=UTC)

daily_period_start

daily_period_start(*, now=None)

Compute the UTC-aware start of today (midnight UTC).

Parameters:

Name Type Description Default
now datetime | None

Reference timestamp. Defaults to datetime.now(UTC).

None

Returns:

Type Description
datetime

UTC-aware datetime at midnight of the current day.

Source code in src/synthorg/budget/billing.py
def daily_period_start(*, now: datetime | None = None) -> datetime:
    """Compute the UTC-aware start of today (midnight UTC).

    Args:
        now: Reference timestamp.  Defaults to ``datetime.now(UTC)``.

    Returns:
        UTC-aware datetime at midnight of the current day.
    """
    if now is None:
        now = datetime.now(UTC)
    return datetime(now.year, now.month, now.day, tzinfo=UTC)

Optimizer

optimizer

CFO cost optimization service.

Provides spending anomaly detection, cost efficiency analysis, model downgrade recommendations, routing optimization suggestions, and operation approval decisions. Composes :class:~synthorg.budget.tracker.CostTracker and :class:~synthorg.budget.config.BudgetConfig for read-only analytical queries -- the advisory complement to :class:~synthorg.budget.enforcer.BudgetEnforcer.

Service layer backing the CFO role (see Operations design page).

CostOptimizer

CostOptimizer(*, cost_tracker, budget_config, config=None, model_resolver=None)

CFO analytical service for cost optimization.

Composes CostTracker and BudgetConfig for read-only analysis: anomaly detection, efficiency analysis, downgrade recommendations, routing optimization suggestions, and operation approval evaluation.

Parameters:

Name Type Description Default
cost_tracker CostTracker

Cost tracking service for querying spend.

required
budget_config BudgetConfig

Budget configuration for limits and thresholds.

required
config CostOptimizerConfig | None

Optimizer-specific configuration. Defaults to CostOptimizerConfig() when None.

None
model_resolver ModelResolver | None

Optional model resolver for downgrade and routing optimization recommendations.

None
Source code in src/synthorg/budget/optimizer.py
def __init__(
    self,
    *,
    cost_tracker: CostTracker,
    budget_config: BudgetConfig,
    config: CostOptimizerConfig | None = None,
    model_resolver: ModelResolver | None = None,
) -> None:
    self._cost_tracker = cost_tracker
    self._budget_config = budget_config
    self._config = config or CostOptimizerConfig()
    self._model_resolver = model_resolver
    logger.debug(
        CFO_OPTIMIZER_CREATED,
        has_model_resolver=model_resolver is not None,
        anomaly_sigma=self._config.anomaly_sigma_threshold,
    )

detect_anomalies async

detect_anomalies(*, start, end, window_count=5)

Detect spending anomalies in the given period.

Divides [start, end) into window_count equal windows, groups records by agent, and flags agents whose last-window spending deviates significantly from their historical mean.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required
window_count int

Number of time windows to divide the period into. Must be >= 2 and <= 1000.

5

Returns:

Type Description
AnomalyDetectionResult

Anomaly detection result with any detected anomalies.

Raises:

Type Description
ValueError

If start >= end, window_count < 2, or window_count > 1000.

Source code in src/synthorg/budget/optimizer.py
async def detect_anomalies(
    self,
    *,
    start: datetime,
    end: datetime,
    window_count: int = 5,
) -> AnomalyDetectionResult:
    """Detect spending anomalies in the given period.

    Divides ``[start, end)`` into ``window_count`` equal windows,
    groups records by agent, and flags agents whose last-window
    spending deviates significantly from their historical mean.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.
        window_count: Number of time windows to divide the period
            into.  Must be >= 2 and <= 1000.

    Returns:
        Anomaly detection result with any detected anomalies.

    Raises:
        ValueError: If ``start >= end``, ``window_count < 2``, or
            ``window_count > 1000``.
    """
    if start >= end:
        logger.warning(
            CFO_ANOMALY_SCAN_COMPLETE,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)
    if window_count < 2:  # noqa: PLR2004
        logger.warning(
            CFO_ANOMALY_SCAN_COMPLETE,
            error="window_count_below_minimum",
            window_count=window_count,
        )
        msg = f"window_count must be >= 2, got {window_count}"
        raise ValueError(msg)
    if window_count > _MAX_WINDOW_COUNT:
        logger.warning(
            CFO_ANOMALY_SCAN_COMPLETE,
            error="window_count_above_maximum",
            window_count=window_count,
        )
        msg = f"window_count must be <= {_MAX_WINDOW_COUNT}, got {window_count}"
        raise ValueError(msg)

    now = datetime.now(UTC)
    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )

    total_duration = end - start
    window_duration = total_duration / window_count
    window_starts = tuple(start + window_duration * i for i in range(window_count))

    # Pre-group records by agent for O(N+M) complexity (#8)
    by_agent = _group_records_by_agent(records)
    agent_ids = sorted(by_agent)
    anomalies: list[SpendingAnomaly] = []

    for agent_id in agent_ids:
        window_costs = _compute_window_costs(
            by_agent[agent_id],
            window_starts,
            window_duration,
        )
        anomaly = _detect_spike_anomaly(
            agent_id,
            window_costs,
            now,
            window_starts,
            window_duration,
            self._config,
            currency=self._budget_config.currency,
        )
        if anomaly is not None:
            logger.warning(
                CFO_ANOMALY_DETECTED,
                agent_id=agent_id,
                anomaly_type=anomaly.anomaly_type.value,
                severity=anomaly.severity.value,
                deviation_factor=anomaly.deviation_factor,
            )
            anomalies.append(anomaly)

    result = AnomalyDetectionResult(
        anomalies=tuple(anomalies),
        scan_period_start=start,
        scan_period_end=end,
        agents_scanned=len(agent_ids),
        scan_timestamp=now,
    )

    logger.info(
        CFO_ANOMALY_SCAN_COMPLETE,
        anomaly_count=len(anomalies),
        agents_scanned=len(agent_ids),
    )

    return result

analyze_efficiency async

analyze_efficiency(*, start, end)

Analyze cost efficiency of all agents in the period.

Computes cost-per-1k-tokens for each agent and rates them relative to the global average.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
EfficiencyAnalysis

Efficiency analysis with per-agent ratings.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/optimizer.py
async def analyze_efficiency(
    self,
    *,
    start: datetime,
    end: datetime,
) -> EfficiencyAnalysis:
    """Analyze cost efficiency of all agents in the period.

    Computes cost-per-1k-tokens for each agent and rates them
    relative to the global average.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Efficiency analysis with per-agent ratings.

    Raises:
        ValueError: If ``start >= end``.
    """
    if start >= end:
        logger.warning(
            CFO_EFFICIENCY_ANALYSIS_COMPLETE,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)

    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )

    result = _build_efficiency_from_records(
        records,
        start=start,
        end=end,
        threshold_factor=self._config.inefficiency_threshold_factor,
    )

    logger.info(
        CFO_EFFICIENCY_ANALYSIS_COMPLETE,
        agent_count=len(result.agents),
        inefficient_count=result.inefficient_agent_count,
        global_avg_cost_per_1k=result.global_avg_cost_per_1k,
    )

    return result

recommend_downgrades async

recommend_downgrades(*, start, end)

Recommend model downgrades for inefficient agents.

Runs efficiency analysis and uses the model resolver and downgrade map to find cheaper alternatives.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
DowngradeAnalysis

Downgrade analysis with recommendations. Empty when no

DowngradeAnalysis

model_resolver is configured.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/optimizer.py
async def recommend_downgrades(
    self,
    *,
    start: datetime,
    end: datetime,
) -> DowngradeAnalysis:
    """Recommend model downgrades for inefficient agents.

    Runs efficiency analysis and uses the model resolver and
    downgrade map to find cheaper alternatives.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Downgrade analysis with recommendations. Empty when no
        model_resolver is configured.

    Raises:
        ValueError: If ``start >= end``.
    """
    if start >= end:
        logger.warning(
            CFO_DOWNGRADE_RECOMMENDED,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)

    if self._model_resolver is None:
        logger.warning(
            CFO_RESOLVER_MISSING,
            reason="no_model_resolver_configured",
        )
        budget_pressure = await self._compute_budget_pressure()
        return DowngradeAnalysis(
            recommendations=(),
            budget_pressure_percent=budget_pressure,
        )

    async with asyncio.TaskGroup() as tg:
        records_task = tg.create_task(
            self._cost_tracker.get_records(start=start, end=end),
        )
        pressure_task = tg.create_task(self._compute_budget_pressure())

    records = records_task.result()
    budget_pressure = pressure_task.result()

    efficiency = _build_efficiency_from_records(
        records,
        start=start,
        end=end,
        threshold_factor=self._config.inefficiency_threshold_factor,
    )

    logger.info(
        CFO_EFFICIENCY_ANALYSIS_COMPLETE,
        agent_count=len(efficiency.agents),
        inefficient_count=efficiency.inefficient_agent_count,
        global_avg_cost_per_1k=efficiency.global_avg_cost_per_1k,
    )

    by_agent = _group_records_by_agent(records)
    recommendations = self._build_recommendations(
        efficiency=efficiency,
        by_agent=by_agent,
    )

    return DowngradeAnalysis(
        recommendations=tuple(recommendations),
        budget_pressure_percent=budget_pressure,
    )

suggest_routing_optimizations async

suggest_routing_optimizations(*, start, end)

Suggest routing optimizations based on actual usage patterns.

Analyzes each agent's most-used model and suggests cheaper alternatives available through the model resolver, comparing by cost and context window size.

Unlike recommend_downgrades which only targets INEFFICIENT agents, this method analyzes all agents and suggests cheaper alternatives regardless of efficiency rating -- any agent that could use a cheaper model is a candidate.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
RoutingOptimizationAnalysis

Routing optimization analysis with per-agent suggestions.

RoutingOptimizationAnalysis

Empty when no model_resolver is configured.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/optimizer.py
async def suggest_routing_optimizations(
    self,
    *,
    start: datetime,
    end: datetime,
) -> RoutingOptimizationAnalysis:
    """Suggest routing optimizations based on actual usage patterns.

    Analyzes each agent's most-used model and suggests cheaper
    alternatives available through the model resolver, comparing by
    cost and context window size.

    Unlike ``recommend_downgrades`` which only targets INEFFICIENT
    agents, this method analyzes all agents and suggests cheaper
    alternatives regardless of efficiency rating -- any agent that
    could use a cheaper model is a candidate.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Routing optimization analysis with per-agent suggestions.
        Empty when no model_resolver is configured.

    Raises:
        ValueError: If ``start >= end``.
    """
    if start >= end:
        logger.warning(
            CFO_ROUTING_OPTIMIZATION_COMPLETE,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)

    if self._model_resolver is None:
        logger.warning(
            CFO_RESOLVER_MISSING,
            reason="no_model_resolver_configured",
        )
        return RoutingOptimizationAnalysis(
            suggestions=(),
            analysis_period_start=start,
            analysis_period_end=end,
            agents_analyzed=0,
        )

    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )

    by_agent = _group_records_by_agent(records)
    all_models = self._model_resolver.all_models_sorted_by_cost()
    suggestions = self._find_routing_suggestions(by_agent, all_models)

    result = RoutingOptimizationAnalysis(
        suggestions=tuple(suggestions),
        analysis_period_start=start,
        analysis_period_end=end,
        agents_analyzed=len(by_agent),
    )

    logger.info(
        CFO_ROUTING_OPTIMIZATION_COMPLETE,
        suggestion_count=len(suggestions),
        agents_analyzed=len(by_agent),
        total_savings_per_1k=result.total_estimated_savings_per_1k,
    )

    return result

evaluate_operation async

evaluate_operation(*, agent_id, estimated_cost_usd, now=None)

Evaluate whether an operation should proceed.

Evaluates three criteria in order:

  1. Rejects negative estimated_cost_usd immediately.
  2. Denies if the projected alert level (after adding the estimated cost) meets or exceeds the auto-deny threshold.
  3. Denies if the projected cost would exceed the hard-stop limit.
  4. Approves with optional warning conditions for high-cost operations or elevated alert levels.

When total_monthly <= 0 budget enforcement is disabled and the operation is always approved with no conditions.

Parameters:

Name Type Description Default
agent_id str

Agent requesting the operation.

required
estimated_cost_usd float

Estimated cost of the operation. Must be >= 0.

required
now datetime | None

Reference timestamp for billing period computation. Defaults to datetime.now(UTC).

None

Returns:

Type Description
ApprovalDecision

Approval decision with reasoning.

Raises:

Type Description
ValueError

If estimated_cost_usd is negative.

Source code in src/synthorg/budget/optimizer.py
async def evaluate_operation(
    self,
    *,
    agent_id: str,
    estimated_cost_usd: float,
    now: datetime | None = None,
) -> ApprovalDecision:
    """Evaluate whether an operation should proceed.

    Evaluates three criteria in order:

    1. Rejects negative ``estimated_cost_usd`` immediately.
    2. Denies if the *projected* alert level (after adding the
       estimated cost) meets or exceeds the auto-deny threshold.
    3. Denies if the projected cost would exceed the hard-stop
       limit.
    4. Approves with optional warning conditions for high-cost
       operations or elevated alert levels.

    When ``total_monthly <= 0`` budget enforcement is disabled and
    the operation is always approved with no conditions.

    Args:
        agent_id: Agent requesting the operation.
        estimated_cost_usd: Estimated cost of the operation.  Must
            be >= 0.
        now: Reference timestamp for billing period computation.
            Defaults to ``datetime.now(UTC)``.

    Returns:
        Approval decision with reasoning.

    Raises:
        ValueError: If ``estimated_cost_usd`` is negative.
    """
    if estimated_cost_usd < 0:
        logger.warning(
            CFO_OPERATION_DENIED,
            agent_id=agent_id,
            estimated_cost=estimated_cost_usd,
            reason="negative_estimated_cost",
        )
        msg = f"estimated_cost_usd must be >= 0, got {estimated_cost_usd}"
        raise ValueError(msg)

    cfg = self._budget_config

    if cfg.total_monthly <= 0:
        return ApprovalDecision(
            approved=True,
            reason="Budget enforcement disabled (no monthly budget)",
            budget_remaining_usd=0.0,
            budget_used_percent=0.0,
            alert_level=BudgetAlertLevel.NORMAL,
            conditions=(),
        )

    period_start = billing_period_start(cfg.reset_day, now=now)
    monthly_cost = await self._cost_tracker.get_total_cost(
        start=period_start,
    )
    remaining = round(
        cfg.total_monthly - monthly_cost,
        BUDGET_ROUNDING_PRECISION,
    )
    used_pct = round(
        monthly_cost / cfg.total_monthly * 100,
        BUDGET_ROUNDING_PRECISION,
    )
    alert_level = _compute_alert_level(used_pct, cfg)

    # Use projected alert level (after cost) for auto-deny check (#11)
    projected_cost = round(
        monthly_cost + estimated_cost_usd,
        BUDGET_ROUNDING_PRECISION,
    )
    projected_pct = round(
        projected_cost / cfg.total_monthly * 100,
        BUDGET_ROUNDING_PRECISION,
    )
    projected_alert = _compute_alert_level(projected_pct, cfg)

    denial = self._check_denial(
        agent_id=agent_id,
        estimated_cost_usd=estimated_cost_usd,
        remaining=remaining,
        used_pct=used_pct,
        alert_level=alert_level,
        projected_cost=projected_cost,
        projected_alert=projected_alert,
    )
    if denial is not None:
        return denial

    conditions = self._build_approval_conditions(
        estimated_cost_usd=estimated_cost_usd,
        projected_alert=projected_alert,
        projected_pct=projected_pct,
    )

    decision = ApprovalDecision(
        approved=True,
        reason="Approved",
        budget_remaining_usd=remaining,
        budget_used_percent=used_pct,
        alert_level=alert_level,
        conditions=conditions,
    )

    logger.info(
        CFO_APPROVAL_EVALUATED,
        agent_id=agent_id,
        approved=True,
        estimated_cost=estimated_cost_usd,
        alert_level=alert_level.value,
        conditions_count=len(conditions),
    )

    return decision

optimizer_models

CFO / CostOptimizer domain models.

Frozen Pydantic models for anomaly detection, cost efficiency analysis, downgrade recommendations, and approval decisions. Used by :class:~synthorg.budget.optimizer.CostOptimizer and :class:~synthorg.budget.reports.ReportGenerator.

AnomalyType

Bases: StrEnum

Type of spending anomaly detected.

SUSTAINED_HIGH and RATE_INCREASE are reserved for future detection algorithms; only SPIKE is currently produced.

AnomalySeverity

Bases: StrEnum

Severity of a detected spending anomaly.

EfficiencyRating

Bases: StrEnum

Cost efficiency rating for an agent.

SpendingAnomaly pydantic-model

Bases: BaseModel

A detected spending anomaly for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent exhibiting the anomaly.

anomaly_type AnomalyType

Classification of the anomaly.

severity AnomalySeverity

Severity level of the anomaly.

description NotBlankStr

Human-readable explanation.

current_value float

Spending in the most recent window.

baseline_value float

Mean spending across historical windows.

deviation_factor float

How many standard deviations above baseline. Set to 0.0 when the baseline is zero (no historical spending).

detected_at datetime

Timestamp when the anomaly was detected.

period_start datetime

Start of the window that triggered the anomaly.

period_end datetime

End of the window that triggered the anomaly.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_period_ordering

agent_id pydantic-field

agent_id

Agent identifier

anomaly_type pydantic-field

anomaly_type

Anomaly classification

severity pydantic-field

severity

Severity level

description pydantic-field

description

Human-readable explanation

current_value pydantic-field

current_value

Spending in the most recent window

baseline_value pydantic-field

baseline_value

Mean spending across historical windows

deviation_factor pydantic-field

deviation_factor

Standard deviations above baseline

detected_at pydantic-field

detected_at

When the anomaly was detected

period_start pydantic-field

period_start

Anomalous window start

period_end pydantic-field

period_end

Anomalous window end

AnomalyDetectionResult pydantic-model

Bases: BaseModel

Result of an anomaly detection scan.

Attributes:

Name Type Description
anomalies tuple[SpendingAnomaly, ...]

Detected anomalies (may be empty).

scan_period_start datetime

Start of the scanned period.

scan_period_end datetime

End of the scanned period.

agents_scanned int

Number of unique agents in the data.

scan_timestamp datetime

When the scan was performed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_period_ordering

anomalies pydantic-field

anomalies = ()

Detected anomalies

scan_period_start pydantic-field

scan_period_start

Scanned period start

scan_period_end pydantic-field

scan_period_end

Scanned period end

agents_scanned pydantic-field

agents_scanned

Unique agents in data

scan_timestamp pydantic-field

scan_timestamp

When the scan ran

AgentEfficiency pydantic-model

Bases: BaseModel

Cost efficiency metrics for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

total_cost_usd float

Total cost in the analysis period.

total_tokens int

Total tokens consumed (input + output).

cost_per_1k_tokens float

Cost per 1000 tokens (computed).

record_count int

Number of cost records.

efficiency_rating EfficiencyRating

Efficiency classification.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent identifier

total_cost_usd pydantic-field

total_cost_usd

Total cost in the analysis period

total_tokens pydantic-field

total_tokens

Total tokens consumed

record_count pydantic-field

record_count

Number of cost records

efficiency_rating pydantic-field

efficiency_rating

Efficiency classification

cost_per_1k_tokens property

cost_per_1k_tokens

Cost per 1000 tokens, derived from total_cost and total_tokens.

EfficiencyAnalysis pydantic-model

Bases: BaseModel

Result of a cost efficiency analysis.

Attributes:

Name Type Description
agents tuple[AgentEfficiency, ...]

Per-agent efficiency metrics (sorted by cost_per_1k desc).

global_avg_cost_per_1k float

Global average cost per 1000 tokens.

analysis_period_start datetime

Start of the analysis period.

analysis_period_end datetime

End of the analysis period.

inefficient_agent_count int

Number of agents rated INEFFICIENT (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_period_ordering
  • _validate_agents_sort_order

agents pydantic-field

agents = ()

Per-agent efficiency metrics

global_avg_cost_per_1k pydantic-field

global_avg_cost_per_1k

Global average cost per 1000 tokens

analysis_period_start pydantic-field

analysis_period_start

Analysis period start

analysis_period_end pydantic-field

analysis_period_end

Analysis period end

inefficient_agent_count property

inefficient_agent_count

Number of agents rated INEFFICIENT.

DowngradeRecommendation pydantic-model

Bases: BaseModel

A model downgrade recommendation for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

current_model NotBlankStr

Currently used model identifier.

recommended_model NotBlankStr

Recommended cheaper model.

estimated_savings_per_1k float

Estimated savings per 1000 tokens.

reason NotBlankStr

Human-readable explanation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_different_models

agent_id pydantic-field

agent_id

Agent identifier

current_model pydantic-field

current_model

Current model identifier

recommended_model pydantic-field

recommended_model

Recommended cheaper model

estimated_savings_per_1k pydantic-field

estimated_savings_per_1k

Estimated savings per 1000 tokens

reason pydantic-field

reason

Human-readable explanation

DowngradeAnalysis pydantic-model

Bases: BaseModel

Result of a downgrade recommendation analysis.

Attributes:

Name Type Description
recommendations tuple[DowngradeRecommendation, ...]

Per-agent downgrade recommendations.

total_estimated_savings_per_1k float

Aggregate estimated savings per 1000 tokens across all recommendations (computed).

budget_pressure_percent float

Current budget utilization percentage.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

recommendations pydantic-field

recommendations = ()

Per-agent downgrade recommendations

budget_pressure_percent pydantic-field

budget_pressure_percent

Current budget utilization percentage

total_estimated_savings_per_1k property

total_estimated_savings_per_1k

Aggregate estimated savings per 1000 tokens.

ApprovalDecision pydantic-model

Bases: BaseModel

Result of evaluating whether an operation should proceed.

Attributes:

Name Type Description
approved bool

Whether the operation is approved.

reason NotBlankStr

Explanation for the decision.

budget_remaining_usd float

Remaining budget in USD (base currency) (may be negative if over budget).

budget_used_percent float

Percentage of budget consumed.

alert_level BudgetAlertLevel

Current budget alert level.

conditions tuple[NotBlankStr, ...]

Any conditions attached to approval.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

approved pydantic-field

approved

Whether the operation is approved

reason pydantic-field

reason

Explanation for the decision

budget_remaining_usd pydantic-field

budget_remaining_usd

Remaining budget in USD (base currency) (negative when over budget)

budget_used_percent pydantic-field

budget_used_percent

Percentage of budget consumed

alert_level pydantic-field

alert_level

Current budget alert level

conditions pydantic-field

conditions = ()

Conditions attached to approval

CostOptimizerConfig pydantic-model

Bases: BaseModel

Configuration for the CostOptimizer service.

Attributes:

Name Type Description
anomaly_sigma_threshold float

Number of standard deviations above mean to flag as anomalous.

anomaly_spike_factor float

Multiplier above mean to flag as spike (independent of stddev).

inefficiency_threshold_factor float

Factor above global average cost_per_1k to flag as inefficient.

approval_auto_deny_alert_level BudgetAlertLevel

Alert level at or above which operations are automatically denied.

approval_warn_threshold_usd float

Cost threshold for adding a warning condition to approval. When set to 0.0, every approved operation receives a "High-cost operation" condition (effectively "always warn").

min_anomaly_windows int

Minimum number of historical windows required before anomaly detection activates.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

anomaly_sigma_threshold pydantic-field

anomaly_sigma_threshold = 2.0

Sigma threshold for anomaly detection

anomaly_spike_factor pydantic-field

anomaly_spike_factor = 3.0

Spike factor multiplier above mean

inefficiency_threshold_factor pydantic-field

inefficiency_threshold_factor = 1.5

Factor above global avg for inefficiency

approval_auto_deny_alert_level pydantic-field

approval_auto_deny_alert_level = HARD_STOP

Alert level triggering auto-deny

approval_warn_threshold_usd pydantic-field

approval_warn_threshold_usd = 1.0

Cost threshold for warning condition

min_anomaly_windows pydantic-field

min_anomaly_windows = 3

Minimum historical windows for anomaly detection

RoutingSuggestion pydantic-model

Bases: BaseModel

A routing optimization suggestion for a single agent.

Suggests switching an agent's most-used model to a cheaper alternative that provides sufficient context window size.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

current_model NotBlankStr

Currently most-used model identifier.

suggested_model NotBlankStr

Suggested cheaper alternative.

current_cost_per_1k float

Current model's total cost per 1k tokens.

suggested_cost_per_1k float

Suggested model's total cost per 1k tokens.

estimated_savings_per_1k float

Estimated savings per 1k tokens (computed).

reason NotBlankStr

Human-readable explanation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_different_models
  • _validate_savings_positive

agent_id pydantic-field

agent_id

Agent identifier

current_model pydantic-field

current_model

Current most-used model

suggested_model pydantic-field

suggested_model

Suggested cheaper model

current_cost_per_1k pydantic-field

current_cost_per_1k

Current model total cost per 1k tokens

suggested_cost_per_1k pydantic-field

suggested_cost_per_1k

Suggested model total cost per 1k tokens

reason pydantic-field

reason

Human-readable explanation

estimated_savings_per_1k property

estimated_savings_per_1k

Estimated savings per 1k tokens.

RoutingOptimizationAnalysis pydantic-model

Bases: BaseModel

Result of a routing optimization analysis.

Attributes:

Name Type Description
suggestions tuple[RoutingSuggestion, ...]

Per-agent routing optimization suggestions.

total_estimated_savings_per_1k float

Aggregate estimated savings per 1k tokens across all suggestions (computed).

analysis_period_start datetime

Start of the analysis period.

analysis_period_end datetime

End of the analysis period.

agents_analyzed int

Number of agents analyzed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_period_ordering

suggestions pydantic-field

suggestions = ()

Per-agent routing optimization suggestions

analysis_period_start pydantic-field

analysis_period_start

Analysis period start

analysis_period_end pydantic-field

analysis_period_end

Analysis period end

agents_analyzed pydantic-field

agents_analyzed

Number of agents analyzed

total_estimated_savings_per_1k property

total_estimated_savings_per_1k

Aggregate estimated savings per 1k tokens.

Reports

reports

CFO spending report generation.

Provides multi-dimensional spending reports with breakdowns by task, provider, model, and time-period comparison. Composes :class:~synthorg.budget.tracker.CostTracker and :class:~synthorg.budget.config.BudgetConfig.

Service layer backing CFO reporting (see Operations design page).

TaskSpending pydantic-model

Bases: BaseModel

Spending aggregation for a single task.

Attributes:

Name Type Description
task_id NotBlankStr

Task identifier.

total_cost_usd float

Total cost for the task.

total_tokens int

Total tokens consumed (input + output).

record_count int

Number of cost records.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

task_id pydantic-field

task_id

Task identifier

total_cost_usd pydantic-field

total_cost_usd

Total cost

total_tokens pydantic-field

total_tokens

Total tokens consumed

record_count pydantic-field

record_count

Number of cost records

ProviderDistribution pydantic-model

Bases: BaseModel

Cost distribution for a single provider.

Attributes:

Name Type Description
provider NotBlankStr

Provider name.

total_cost_usd float

Total cost for the provider.

record_count int

Number of cost records.

percentage_of_total float

Percentage of total spending.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

provider pydantic-field

provider

Provider name

total_cost_usd pydantic-field

total_cost_usd

Total cost

record_count pydantic-field

record_count

Number of cost records

percentage_of_total pydantic-field

percentage_of_total

Percentage of total spending

ModelDistribution pydantic-model

Bases: BaseModel

Cost distribution for a single model.

Attributes:

Name Type Description
model NotBlankStr

Model identifier.

provider NotBlankStr

Provider name.

total_cost_usd float

Total cost for the model.

record_count int

Number of cost records.

percentage_of_total float

Percentage of total spending.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

model pydantic-field

model

Model identifier

provider pydantic-field

provider

Provider name

total_cost_usd pydantic-field

total_cost_usd

Total cost

record_count pydantic-field

record_count

Number of cost records

percentage_of_total pydantic-field

percentage_of_total

Percentage of total spending

PeriodComparison pydantic-model

Bases: BaseModel

Comparison of spending between two consecutive periods.

Attributes:

Name Type Description
current_period_cost float

Cost in the current period.

previous_period_cost float

Cost in the previous period.

cost_change_usd float

Absolute change in cost (computed).

cost_change_percent float | None

Percentage change in cost (computed). None when previous period cost is zero.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

current_period_cost pydantic-field

current_period_cost

Current period cost

previous_period_cost pydantic-field

previous_period_cost

Previous period cost

cost_change_usd property

cost_change_usd

Absolute cost change (current - previous).

cost_change_percent property

cost_change_percent

Percentage cost change. None when previous period cost is zero.

SpendingReport pydantic-model

Bases: BaseModel

Multi-dimensional spending report.

Attributes:

Name Type Description
summary SpendingSummary

Overall spending summary for the period.

by_task tuple[TaskSpending, ...]

Per-task spending breakdown.

by_provider tuple[ProviderDistribution, ...]

Per-provider cost distribution.

by_model tuple[ModelDistribution, ...]

Per-model cost distribution.

period_comparison PeriodComparison | None

Comparison with previous period (optional).

top_agents_by_cost tuple[tuple[NotBlankStr, float], ...]

Top agents by cost (sorted descending).

top_tasks_by_cost tuple[tuple[NotBlankStr, float], ...]

Top tasks by cost (sorted descending).

generated_at datetime

When the report was generated.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_agent_ranking_order
  • _validate_task_ranking_order

summary pydantic-field

summary

Overall spending summary

by_task pydantic-field

by_task = ()

Per-task spending breakdown

by_provider pydantic-field

by_provider = ()

Per-provider cost distribution

by_model pydantic-field

by_model = ()

Per-model cost distribution

period_comparison pydantic-field

period_comparison = None

Comparison with previous period

top_agents_by_cost pydantic-field

top_agents_by_cost = ()

Top agents by cost (agent_id, cost_usd)

top_tasks_by_cost pydantic-field

top_tasks_by_cost = ()

Top tasks by cost (task_id, cost_usd)

generated_at pydantic-field

generated_at

When the report was generated

ReportGenerator

ReportGenerator(*, cost_tracker, budget_config)

Generates multi-dimensional spending reports.

Composes CostTracker and BudgetConfig to produce reports with breakdowns by task, provider, model, and period comparison.

Parameters:

Name Type Description Default
cost_tracker CostTracker

Cost tracking service for querying spend.

required
budget_config BudgetConfig

Budget configuration for context.

required
Source code in src/synthorg/budget/reports.py
def __init__(
    self,
    *,
    cost_tracker: CostTracker,
    budget_config: BudgetConfig,
) -> None:
    self._cost_tracker = cost_tracker
    self._budget_config = budget_config
    logger.debug(
        CFO_REPORT_GENERATOR_CREATED,
        has_budget_config=True,
    )

generate_report async

generate_report(*, start, end, top_n=10, include_period_comparison=True)

Generate a spending report for the given period.

Fetches records and summary concurrently; derives total_cost from the records snapshot for consistent distribution percentages.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required
top_n int

Maximum number of top agents/tasks to include.

10
include_period_comparison bool

Whether to compute a comparison with the previous period of the same duration.

True

Returns:

Type Description
SpendingReport

Multi-dimensional spending report.

Raises:

Type Description
ValueError

If start >= end or top_n < 1.

Source code in src/synthorg/budget/reports.py
async def generate_report(
    self,
    *,
    start: datetime,
    end: datetime,
    top_n: int = 10,
    include_period_comparison: bool = True,
) -> SpendingReport:
    """Generate a spending report for the given period.

    Fetches records and summary concurrently; derives ``total_cost``
    from the records snapshot for consistent distribution
    percentages.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.
        top_n: Maximum number of top agents/tasks to include.
        include_period_comparison: Whether to compute a comparison
            with the previous period of the same duration.

    Returns:
        Multi-dimensional spending report.

    Raises:
        ValueError: If ``start >= end`` or ``top_n < 1``.
    """
    if start >= end:
        logger.warning(
            CFO_REPORT_VALIDATION_ERROR,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)
    if top_n < 1:
        logger.warning(
            CFO_REPORT_VALIDATION_ERROR,
            error="top_n_below_minimum",
            top_n=top_n,
        )
        msg = f"top_n must be >= 1, got {top_n}"
        raise ValueError(msg)

    now = datetime.now(UTC)

    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )
    summary = await self._cost_tracker.build_summary(
        start=start,
        end=end,
    )

    # Derive total_cost from records for consistent percentages
    total_cost = round(
        math.fsum(r.cost_usd for r in records),
        BUDGET_ROUNDING_PRECISION,
    )
    by_task = _build_task_spendings(records)
    by_provider = _build_provider_distribution(records, total_cost)
    by_model = _build_model_distribution(records, total_cost)

    top_agents = _build_top_agents(summary, top_n)
    top_tasks = _build_top_tasks(by_task, top_n)

    period_comparison: PeriodComparison | None = None
    if include_period_comparison:
        period_comparison = await self._build_period_comparison(
            start,
            end,
            total_cost,
        )

    report = SpendingReport(
        summary=summary,
        by_task=by_task,
        by_provider=by_provider,
        by_model=by_model,
        period_comparison=period_comparison,
        top_agents_by_cost=top_agents,
        top_tasks_by_cost=top_tasks,
        generated_at=now,
    )

    logger.info(
        CFO_REPORT_GENERATED,
        total_cost_usd=total_cost,
        task_count=len(by_task),
        provider_count=len(by_provider),
        model_count=len(by_model),
        has_comparison=period_comparison is not None,
    )

    return report

Enums

enums

Budget-specific enumerations.

BudgetAlertLevel

Bases: StrEnum

Alert severity levels for budget thresholds.

Used by :class:~synthorg.budget.spending_summary.SpendingSummary to indicate the current budget state relative to configured thresholds.

Errors

errors

Budget-layer error hierarchy.

Defines budget-specific exceptions in a leaf module with minimal intra-project imports, preventing circular dependency chains when these exceptions are needed by both the budget enforcer and the engine layer.

BudgetExhaustedError

Bases: Exception

Budget exhaustion signal.

Used in two contexts:

  1. Raised directly by :meth:BudgetEnforcer.check_can_execute when pre-flight budget checks fail (e.g., monthly hard stop, daily limit, or provider quota exceeded).
  2. Caught by the engine layer (AgentEngine.run) and used to build an AgentRunResult with TerminationReason.BUDGET_EXHAUSTED.

DailyLimitExceededError

Bases: BudgetExhaustedError

Per-agent daily spending limit exceeded.

RiskBudgetExhaustedError

RiskBudgetExhaustedError(
    msg, *, agent_id=None, task_id=None, risk_units_used=0.0, risk_limit=0.0
)

Bases: BudgetExhaustedError

Raised when cumulative risk budget is exhausted.

Subclass of BudgetExhaustedError so existing engine-level catch handlers cover it transparently.

Attributes:

Name Type Description
agent_id

The agent that exceeded the limit, or None.

task_id

The task during which the limit was exceeded, or None.

risk_units_used

Cumulative risk units consumed.

risk_limit

The limit that was exceeded.

Source code in src/synthorg/budget/errors.py
def __init__(
    self,
    msg: str,
    *,
    agent_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
    risk_units_used: float = 0.0,
    risk_limit: float = 0.0,
) -> None:
    super().__init__(msg)
    self.agent_id = agent_id
    self.task_id = task_id
    self.risk_units_used = risk_units_used
    self.risk_limit = risk_limit

QuotaExhaustedError

QuotaExhaustedError(msg, *, provider_name=None, degradation_action=None)

Bases: BudgetExhaustedError

Raised when provider quota is exhausted and unresolvable.

Covers all terminal degradation outcomes: ALERT strategy (intentional immediate raise), failed FALLBACK (no providers available or all exhausted), and failed QUEUE (wait exceeded or still exhausted after waiting).

Attributes:

Name Type Description
provider_name

The provider whose quota was exhausted, or None when not available.

degradation_action

The degradation strategy that was attempted, or None when not available.

Source code in src/synthorg/budget/errors.py
def __init__(
    self,
    msg: str,
    *,
    provider_name: NotBlankStr | None = None,
    degradation_action: DegradationAction | None = None,
) -> None:
    super().__init__(msg)
    self.provider_name = provider_name
    self.degradation_action = degradation_action