Skip to content

Budget

Cost tracking, budget enforcement, auto-downgrade, quota management, and CFO optimization.

Config

config

Budget configuration models.

Implements the Cost Controls section of the Operations design page: alert thresholds, per-task and per-agent limits, automatic model downgrade, and risk budget configuration.

BudgetAlertConfig pydantic-model

Bases: BaseModel

Alert threshold configuration for budget monitoring.

Thresholds are expressed as percentages of the total monthly budget. They must be strictly ordered: warn_at < critical_at < hard_stop_at.

Attributes:

Name Type Description
warn_at int

Percentage of budget that triggers a warning alert.

critical_at int

Percentage of budget that triggers a critical alert.

hard_stop_at int

Percentage of budget that triggers a hard stop.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors
  • _validate_threshold_ordering

warn_at pydantic-field

warn_at = 75

Percent of budget triggering warning

critical_at pydantic-field

critical_at = 90

Percent of budget triggering critical alert

hard_stop_at pydantic-field

hard_stop_at = 100

Percent of budget triggering hard stop

AutoDowngradeConfig pydantic-model

Bases: BaseModel

Automatic model downgrade configuration.

When enabled, models are downgraded to cheaper alternatives once budget usage exceeds threshold percent. The downgrade_map is stored as a tuple of (source_alias, target_alias) pairs to maintain immutability.

Attributes:

Name Type Description
enabled bool

Whether auto-downgrade is active.

threshold int

Budget percent that triggers downgrade.

downgrade_map tuple[tuple[NotBlankStr, NotBlankStr], ...]

Ordered pairs of (from_alias, to_alias).

boundary Literal['task_assignment']

When to apply downgrade (task_assignment only, never mid-execution per the Operations design page).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors
  • _normalize_downgrade_map
  • _validate_downgrade_map

enabled pydantic-field

enabled = False

Whether auto-downgrade is active

threshold pydantic-field

threshold = 85

Budget percent triggering downgrade

downgrade_map pydantic-field

downgrade_map = ()

Ordered pairs of (from_alias, to_alias)

boundary pydantic-field

boundary = 'task_assignment'

When to apply downgrade (task_assignment only, never mid-execution)

BudgetConfig pydantic-model

Bases: BaseModel

Top-level budget configuration for a company.

Defines the overall monthly budget, alert thresholds, per-task and per-agent spending limits, and automatic model downgrade settings.

Attributes:

Name Type Description
total_monthly float

Monthly budget limit.

alerts BudgetAlertConfig

Alert threshold configuration.

per_task_limit float

Maximum cost per task.

per_agent_daily_limit float

Maximum cost per agent per day.

auto_downgrade AutoDowngradeConfig

Automatic model downgrade configuration.

reset_day int

Day of month when budget resets (1-28, avoids month-length issues).

currency CurrencyCode

ISO 4217 currency code for display formatting.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _apply_mirrors
  • _validate_per_task_limit_within_monthly
  • _validate_per_agent_daily_limit_within_monthly

total_monthly pydantic-field

total_monthly = 100.0

Monthly budget limit

alerts pydantic-field

alerts

Alert threshold configuration

per_task_limit pydantic-field

per_task_limit = 5.0

Maximum cost per task

per_agent_daily_limit pydantic-field

per_agent_daily_limit = 10.0

Maximum cost per agent per day

auto_downgrade pydantic-field

auto_downgrade

Automatic model downgrade configuration

reset_day pydantic-field

reset_day = 1

Day of month when budget resets (1-28, avoids month-length issues)

currency pydantic-field

currency = DEFAULT_CURRENCY

ISO 4217 currency code stamped onto every new cost record and used for display formatting. SynthOrg does not convert provider costs -- provider token prices are reported in the provider-native currency (see DEFAULT_CURRENCY) and changing this setting relabels the code stamped onto subsequent records without translating any numeric values. Historical rows retain the code that was active when they were written.

risk_budget pydantic-field

risk_budget

Cumulative risk-unit action budget configuration

pte_tracking_enabled pydantic-field

pte_tracking_enabled = False

Enable Prefill Token Equivalents tracking (observability-only)

Cost Record

cost_record

Cost record model for per-API-call tracking.

Implements the Cost Tracking section of the Operations design page: every API call is tracked as an immutable cost record (append-only pattern).

CostRecord pydantic-model

Bases: BaseModel

Immutable record of a single API call's cost.

Once created, a CostRecord cannot be modified (frozen model). This enforces the append-only pattern: new records are created for each API call; existing records are never updated.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier (string reference).

task_id NotBlankStr

Task identifier (string reference).

provider NotBlankStr

LLM provider name.

model NotBlankStr

Model identifier.

input_tokens int

Input token count.

output_tokens int

Output token count.

cost float

Numeric cost of the call, denominated in currency. Every record carries its own currency so aggregators can enforce same-currency invariants without relying on a global configuration value; see currency for the accompanying ISO 4217 code.

currency CurrencyCode

ISO 4217 currency code for cost. See :class:synthorg.budget.currency.CurrencyCode and the _KNOWN_ISO4217 allowlist in synthorg.budget.currency for the accepted values; concrete code literals are deliberately not listed here so this docstring does not drift from the allowlist or privilege a specific region.

timestamp AwareDatetime

Timezone-aware timestamp of the API call.

call_category LLMCallCategory | None

Optional LLM call category (productive, coordination, system, embedding).

accuracy_effort_ratio float | None

Accuracy-effort ratio for the task this call belongs to (populated at task completion when quality signals are available, None otherwise).

latency_ms float | None

Round-trip latency in milliseconds (None if not measured).

cache_hit bool | None

Whether the provider served this call from cache.

retry_count int | None

Number of retry attempts before success (0 = first try succeeded).

retry_reason str | None

Exception type name of the last retried error.

finish_reason FinishReason | None

LLM finish reason for this call.

success bool | None

Whether the call completed without error or content filter.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_token_consistency
  • _validate_retry_consistency

agent_id pydantic-field

agent_id

Agent identifier

task_id pydantic-field

task_id

Task identifier

project_id pydantic-field

project_id = None

Project this cost belongs to

provider pydantic-field

provider

LLM provider name

model pydantic-field

model

Model identifier

input_tokens pydantic-field

input_tokens

Input token count

output_tokens pydantic-field

output_tokens

Output token count

cost pydantic-field

cost

Numeric cost of the call, denominated in currency

currency pydantic-field

currency

ISO 4217 currency code for cost

timestamp pydantic-field

timestamp

Timestamp of the API call

call_category pydantic-field

call_category = None

LLM call category (productive, coordination, system, embedding)

accuracy_effort_ratio pydantic-field

accuracy_effort_ratio = None

Accuracy-effort ratio for the task this call belongs to (populated at task completion when quality signals are available)

latency_ms pydantic-field

latency_ms = None

Round-trip latency in milliseconds

cache_hit pydantic-field

cache_hit = None

Whether the provider served this call from cache

retry_count pydantic-field

retry_count = None

Number of retry attempts before success

retry_reason pydantic-field

retry_reason = None

Exception type name of the last retried error

finish_reason pydantic-field

finish_reason = None

LLM finish reason for this call

success pydantic-field

success = None

Whether the call completed without error or content filter

claim_id pydantic-field

claim_id

Idempotency key for this billing event. Generated once at construction (UUID4 by default) so retries / JetStream redelivery / in-process tracker double-submission cannot double-bill: CostTracker.record keeps a bounded LRU of seen claim_id values and treats repeats as no-ops.

Spending Summary

spending_summary

Spending summary models for aggregated cost reporting.

Provides the aggregation data structures used by :class:~synthorg.budget.tracker.CostTracker for cost reporting and designed for consumption by the CFO agent (see Operations design page). Views of :class:~synthorg.budget.cost_record.CostRecord data are aggregated by agent, department, and time period.

PeriodSpending pydantic-model

Bases: _SpendingTotals

Spending aggregation for a specific time period.

Attributes:

Name Type Description
start datetime

Period start (inclusive).

end datetime

Period end (exclusive).

Fields:

  • total_cost (float)
  • currency (CurrencyCode | None)
  • total_input_tokens (int)
  • total_output_tokens (int)
  • record_count (int)
  • start (datetime)
  • end (datetime)

Validators:

  • _validate_currency_presence
  • _validate_period_ordering

start pydantic-field

start

Period start (inclusive)

end pydantic-field

end

Period end (exclusive)

AgentSpending pydantic-model

Bases: _SpendingTotals

Spending aggregation for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

Fields:

  • total_cost (float)
  • currency (CurrencyCode | None)
  • total_input_tokens (int)
  • total_output_tokens (int)
  • record_count (int)
  • agent_id (NotBlankStr)

Validators:

  • _validate_currency_presence

agent_id pydantic-field

agent_id

Agent identifier

DepartmentSpending pydantic-model

Bases: _SpendingTotals

Spending aggregation for a department.

Attributes:

Name Type Description
department_name NotBlankStr

Department name.

Fields:

  • total_cost (float)
  • currency (CurrencyCode | None)
  • total_input_tokens (int)
  • total_output_tokens (int)
  • record_count (int)
  • department_name (NotBlankStr)

Validators:

  • _validate_currency_presence

department_name pydantic-field

department_name

Department name

SpendingSummary pydantic-model

Bases: BaseModel

Top-level spending summary combining all aggregation dimensions.

Provides a snapshot of spending broken down by time period, agent, and department, along with budget utilization context.

Attributes:

Name Type Description
period PeriodSpending

Time-period aggregation.

by_agent tuple[AgentSpending, ...]

Per-agent spending breakdown.

by_department tuple[DepartmentSpending, ...]

Per-department spending breakdown.

budget_total_monthly float

Monthly budget for context.

budget_used_percent float

Percent of budget consumed.

alert_level BudgetAlertLevel

Current budget alert level.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_unique_agent_ids
  • _validate_unique_department_names

period pydantic-field

period

Time-period aggregation

by_agent pydantic-field

by_agent = ()

Per-agent spending breakdown

by_department pydantic-field

by_department = ()

Per-department spending breakdown

budget_total_monthly pydantic-field

budget_total_monthly = 0.0

Monthly budget for context

budget_used_percent pydantic-field

budget_used_percent = 0.0

Percent of budget consumed

alert_level pydantic-field

alert_level = NORMAL

Current budget alert level

Cost Tiers

cost_tiers

Cost tier definitions and classification.

Provides configurable metadata for cost tiers: price ranges, display properties, and model-to-tier classification. The built-in CostTier enum (synthorg.core.enums) defines the tier values; this module adds a configurable layer on top.

CostTierDefinition pydantic-model

Bases: BaseModel

Metadata for a single cost tier.

Attributes:

Name Type Description
id NotBlankStr

Unique tier identifier (e.g. "low", "custom-budget").

display_name NotBlankStr

Human-readable name.

description str

What this tier represents.

price_range_min float

Minimum cost_per_1k_total for models in this tier.

price_range_max float | None

Maximum cost_per_1k_total; None means unbounded above.

color str

Hex color for UI rendering.

icon str

Icon identifier for UI rendering.

sort_order int

Display ordering (lower = cheaper).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_price_range

id pydantic-field

id

Unique tier identifier

display_name pydantic-field

display_name

Human-readable name

description pydantic-field

description = ''

What this tier represents

price_range_min pydantic-field

price_range_min = 0.0

Minimum cost_per_1k_total in the configured currency

price_range_max pydantic-field

price_range_max = None

Maximum cost_per_1k_total in the configured currency; None = unbounded

color pydantic-field

color = '#6b7280'

Hex color for UI

icon pydantic-field

icon = 'circle'

Icon identifier for UI

sort_order pydantic-field

sort_order = 0

Display ordering (lower = cheaper)

CostTiersConfig pydantic-model

Bases: BaseModel

Configuration for cost tier definitions.

Attributes:

Name Type Description
tiers tuple[CostTierDefinition, ...]

User-defined tier overrides/additions.

include_builtin bool

Whether to merge built-in default tiers.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_unique_ids

tiers pydantic-field

tiers = ()

User-defined tier overrides/additions

include_builtin pydantic-field

include_builtin = True

Whether to merge built-in default tiers

resolve_tiers

resolve_tiers(config)

Merge built-in and user-defined tiers, sorted by sort_order.

User tiers override built-in tiers with the same ID.

Parameters:

Name Type Description Default
config CostTiersConfig

Cost tiers configuration.

required

Returns:

Type Description
tuple[CostTierDefinition, ...]

Merged and sorted tuple of tier definitions.

Source code in src/synthorg/budget/cost_tiers.py
def resolve_tiers(
    config: CostTiersConfig,
) -> tuple[CostTierDefinition, ...]:
    """Merge built-in and user-defined tiers, sorted by sort_order.

    User tiers override built-in tiers with the same ID.

    Args:
        config: Cost tiers configuration.

    Returns:
        Merged and sorted tuple of tier definitions.
    """
    if not config.include_builtin:
        result = sorted(config.tiers, key=lambda t: t.sort_order)
        logger.debug(
            BUDGET_TIER_RESOLVED,
            tier_count=len(result),
            include_builtin=False,
        )
        return tuple(result)

    # User tiers override built-in by ID
    user_ids = {t.id for t in config.tiers}
    merged: list[CostTierDefinition] = [
        t for t in BUILTIN_TIERS if t.id not in user_ids
    ]
    merged.extend(config.tiers)
    merged.sort(key=lambda t: t.sort_order)

    logger.debug(
        BUDGET_TIER_RESOLVED,
        tier_count=len(merged),
        include_builtin=True,
        overridden_ids=sorted(user_ids & {t.id for t in BUILTIN_TIERS}),
    )
    return tuple(merged)

classify_model_tier

classify_model_tier(cost_per_1k_total, tiers)

Classify a model into a cost tier based on total cost per 1k tokens.

Matches the first tier whose price range contains the given cost. Range check: min <= cost < max (or min <= cost if max is None). If tiers have overlapping ranges, the first match in iteration order wins -- callers should ensure tiers are sorted by sort_order.

Parameters:

Name Type Description Default
cost_per_1k_total float

Combined cost_per_1k_input + cost_per_1k_output.

required
tiers tuple[CostTierDefinition, ...]

Resolved tier definitions (should be sorted by sort_order).

required

Returns:

Type Description
str | None

Tier ID of the matching tier, or None if no tier matches.

Source code in src/synthorg/budget/cost_tiers.py
def classify_model_tier(
    cost_per_1k_total: float,
    tiers: tuple[CostTierDefinition, ...],
) -> str | None:
    """Classify a model into a cost tier based on total cost per 1k tokens.

    Matches the first tier whose price range contains the given cost.
    Range check: ``min <= cost < max`` (or ``min <= cost`` if max is
    ``None``).  If tiers have overlapping ranges, the first match in
    iteration order wins -- callers should ensure tiers are sorted by
    ``sort_order``.

    Args:
        cost_per_1k_total: Combined ``cost_per_1k_input +
            cost_per_1k_output``.
        tiers: Resolved tier definitions (should be sorted by
            sort_order).

    Returns:
        Tier ID of the matching tier, or ``None`` if no tier matches.
    """
    if cost_per_1k_total < 0:
        logger.warning(
            BUDGET_TIER_CLASSIFY_MISS,
            cost_per_1k_total=cost_per_1k_total,
            tier_count=len(tiers),
            reason="negative_cost",
        )
        return None

    for tier in tiers:
        if tier.price_range_max is None:
            if cost_per_1k_total >= tier.price_range_min:
                return tier.id
        elif tier.price_range_min <= cost_per_1k_total < tier.price_range_max:
            return tier.id

    logger.debug(
        BUDGET_TIER_CLASSIFY_MISS,
        cost_per_1k_total=cost_per_1k_total,
        tier_count=len(tiers),
    )
    return None

Hierarchy

hierarchy

Budget hierarchy models.

Implements the Budget Hierarchy section of the Operations design page: Company to Department to Team, with percentage-based allocation at each level.

TeamBudget pydantic-model

Bases: BaseModel

Budget allocation for a single team within a department.

Attributes:

Name Type Description
team_name NotBlankStr

Team name (string reference).

budget_percent float

Percent of department budget allocated to this team.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

team_name pydantic-field

team_name

Team name

budget_percent pydantic-field

budget_percent = 0.0

Percent of department budget

DepartmentBudget pydantic-model

Bases: BaseModel

Budget allocation for a department with nested team allocations.

Team budget percentages may sum to less than 100% to allow for an unallocated reserve within the department.

Attributes:

Name Type Description
department_name NotBlankStr

Department name (string reference).

budget_percent float

Percent of company budget allocated to this department.

teams tuple[TeamBudget, ...]

Team budget allocations within this department.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_unique_team_names
  • _validate_team_budget_sum

department_name pydantic-field

department_name

Department name

budget_percent pydantic-field

budget_percent = 0.0

Percent of company budget

teams pydantic-field

teams = ()

Team budget allocations

BudgetHierarchy pydantic-model

Bases: BaseModel

Company-wide budget hierarchy.

Maps the Company -> Department -> Team nesting from a budget allocation perspective (see Operations design page). Department budget percentages may sum to less than 100% to allow for an unallocated reserve at the company level.

Attributes:

Name Type Description
total_monthly float

Total company monthly budget in the configured currency.

departments tuple[DepartmentBudget, ...]

Department budget allocations.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_unique_department_names
  • _validate_department_budget_sum

total_monthly pydantic-field

total_monthly

Total company monthly budget in the configured currency

departments pydantic-field

departments = ()

Department budget allocations

Tracker

tracker

Real-time cost tracking service.

Provides an in-memory store with TTL-based eviction for :class:CostRecord entries and aggregation queries consumed by the CFO agent and budget monitoring.

Service layer for the cost tracking schema defined in the Operations design page. The current implementation is purely in-memory; persistence integration is planned.

ProviderUsageSummary

Bases: NamedTuple

Per-provider usage totals for a time window.

CostTracker

CostTracker(
    *,
    budget_config=None,
    department_resolver=None,
    auto_prune_threshold=_AUTO_PRUNE_THRESHOLD,
    project_cost_repo=None,
    claim_lru_capacity=_DEFAULT_CLAIM_LRU_CAPACITY,
    clock=None,
)

Bases: CostTrackerSummaryMixin

In-memory cost tracking service with TTL-based eviction.

Records :class:CostRecord entries from LLM API calls and provides aggregation queries for budget monitoring. Memory is bounded by a soft TTL-based auto-prune: when the record count exceeds auto_prune_threshold, records older than 168 hours (7 days) are removed on the next query.

Parameters:

Name Type Description Default
budget_config BudgetConfig | None

Optional budget configuration for alert level computation. When None, alert level defaults to NORMAL and budget_used_percent to 0.0.

None
department_resolver Callable[[str], str | None] | None

Optional callable mapping agent_id to a department name. When None or returning None for an agent, the agent is excluded from department aggregation.

None
auto_prune_threshold int

Maximum record count before auto-pruning is triggered on snapshot. Defaults to 100,000.

_AUTO_PRUNE_THRESHOLD

Raises:

Type Description
ValueError

If auto_prune_threshold < 1.

Source code in src/synthorg/budget/tracker.py
def __init__(  # noqa: PLR0913 -- composable construction surface; all kwargs optional
    self,
    *,
    budget_config: BudgetConfig | None = None,
    department_resolver: Callable[[str], str | None] | None = None,
    auto_prune_threshold: int = _AUTO_PRUNE_THRESHOLD,
    project_cost_repo: ProjectCostAggregateRepository | None = None,
    claim_lru_capacity: int = _DEFAULT_CLAIM_LRU_CAPACITY,
    clock: Clock | None = None,
) -> None:
    if auto_prune_threshold < 1:
        msg = f"auto_prune_threshold must be >= 1, got {auto_prune_threshold}"
        raise ValueError(msg)
    if claim_lru_capacity < 1:
        msg = f"claim_lru_capacity must be >= 1, got {claim_lru_capacity}"
        raise ValueError(msg)
    self._records: list[CostRecord] = []
    # Defer Lock construction until the first async method runs so
    # the lock binds to the live event loop, not whichever loop (if
    # any) was current when the tracker was constructed. xdist
    # workers tear down their loop between tests and a Lock bound
    # to a closed loop deadlocks the next test.
    self._lock: asyncio.Lock | None = None
    self._budget_config = budget_config
    self._department_resolver = department_resolver
    self._auto_prune_threshold = auto_prune_threshold
    self._project_cost_repo = project_cost_repo
    self._clock: Clock = clock or SystemClock()
    # Strong references to in-flight background recording tasks
    # scheduled by the cost-recording chokepoint. Owned by the
    # tracker (one per :class:`AppState`, fresh per test) so xdist
    # workers cannot leak tasks bound to a closed event loop into
    # the next test's loop. Tasks self-evict on completion via
    # ``add_done_callback(self._pending_record_tasks.discard)``.
    self._pending_record_tasks: set[asyncio.Task[None]] = set()
    # Bounded LRU of finalised claim_ids the tracker has already
    # appended. Stored as ``OrderedDict[str, None]`` so re-
    # submission moves the key to the tail in O(1) and the head
    # can be popped on capacity overflow without scanning.
    # In-flight reservations are kept in a separate set so the
    # capacity trim never evicts a claim that is still being
    # processed; mixing both states in one ``OrderedDict`` would
    # let the trim pop a still-running reservation and allow a
    # duplicate to slip past the membership check.
    self._inflight_claims: set[str] = set()
    self._seen_claims: OrderedDict[str, None] = OrderedDict()
    self._claim_lru_capacity = claim_lru_capacity
    logger.debug(
        BUDGET_TRACKER_CREATED,
        has_budget_config=budget_config is not None,
        has_department_resolver=department_resolver is not None,
        has_project_cost_repo=project_cost_repo is not None,
        claim_lru_capacity=claim_lru_capacity,
    )

budget_config property

budget_config

The optional budget configuration.

Returns:

Type Description
BudgetConfig | None

Budget config if set, else None.

record async

record(cost_record)

Append a cost record.

The in-memory append runs under _lock. After the lock is released, _update_project_aggregate is awaited to update the durable project cost aggregate when the record has a project_id and a repository is configured. Aggregate updates are best-effort: failures are logged at WARNING but do not affect the in-memory recording.

When a BudgetConfig is attached, the incoming record's currency must match budget_config.currency; mismatches raise :class:MixedCurrencyAggregationError at the ingestion boundary so downstream aggregators never see mixed-currency data in the first place.

Idempotency: a bounded LRU of accepted cost_record.claim_id values protects against double-bills from JetStream redelivery or in-process retries. Repeat submissions are no-ops and emit BUDGET_RECORD_DEDUPED at INFO. Eviction at the LRU capacity is best-effort; once a claim ages out, a re-submitted record with the same key is treated as fresh.

Parameters:

Name Type Description Default
cost_record CostRecord

Immutable cost record to store.

required

Raises:

Type Description
MixedCurrencyAggregationError

If the record's currency does not match the configured budget.currency.

Source code in src/synthorg/budget/tracker.py
async def record(self, cost_record: CostRecord) -> None:
    """Append a cost record.

    The in-memory append runs under ``_lock``.  After the lock
    is released, ``_update_project_aggregate`` is awaited to
    update the durable project cost aggregate when the record
    has a ``project_id`` and a repository is configured.
    Aggregate updates are best-effort: failures are logged at
    WARNING but do not affect the in-memory recording.

    When a ``BudgetConfig`` is attached, the incoming record's
    ``currency`` must match ``budget_config.currency``; mismatches
    raise :class:`MixedCurrencyAggregationError` at the ingestion
    boundary so downstream aggregators never see mixed-currency
    data in the first place.

    Idempotency: a bounded LRU of accepted ``cost_record.claim_id``
    values protects against double-bills from JetStream redelivery
    or in-process retries. Repeat submissions are no-ops and emit
    ``BUDGET_RECORD_DEDUPED`` at INFO. Eviction at the LRU
    capacity is best-effort; once a claim ages out, a re-submitted
    record with the same key is treated as fresh.

    Args:
        cost_record: Immutable cost record to store.

    Raises:
        MixedCurrencyAggregationError: If the record's currency
            does not match the configured ``budget.currency``.
    """
    # Currency check first -- it's synchronous, has no in-flight
    # state to roll back, and a mismatch is a hard caller-contract
    # violation that must surface BEFORE we reserve a claim_id
    # slot. Per-project same-currency invariant is enforced by
    # ``ProjectCostAggregateRepository.increment``; no tracker-
    # side pin is required there.
    if (
        self._budget_config is not None
        and cost_record.currency != self._budget_config.currency
    ):
        # Log at WARNING with full record + budget context BEFORE
        # raising so a downstream catch-and-translate cannot hide
        # repeated mismatches from operators.
        logger.warning(
            BUDGET_MIXED_CURRENCY_REJECTED,
            agent_id=cost_record.agent_id,
            task_id=cost_record.task_id,
            project_id=cost_record.project_id,
            record_currency=cost_record.currency,
            budget_currency=self._budget_config.currency,
        )
        msg = (
            f"Record currency {cost_record.currency!r} does not match "
            f"configured budget currency "
            f"{self._budget_config.currency!r}"
        )
        raise MixedCurrencyAggregationError(
            msg,
            currencies=frozenset(
                {
                    cost_record.currency,
                    self._budget_config.currency,
                }
            ),
            agent_id=cost_record.agent_id,
            task_id=cost_record.task_id,
            project_id=cost_record.project_id,
        )

    # Idempotency fast-path with in-flight reservation. ``_lock``
    # protects both ``_inflight_claims`` (set of in-flight reservations)
    # and ``_seen_claims`` (bounded LRU of finalised entries). We
    # check both states under the lock, then either dedupe (if
    # already seen / in flight) or reserve the claim in
    # ``_inflight_claims`` so a concurrent second call sees the
    # entry and dedupes immediately. Keeping in-flight separate
    # from the LRU prevents the capacity trim from popping a
    # still-running reservation, which would let a duplicate
    # slip past the membership check.
    async with self._get_lock():
        if (
            cost_record.claim_id in self._inflight_claims
            or cost_record.claim_id in self._seen_claims
        ):
            if cost_record.claim_id in self._seen_claims:
                self._seen_claims.move_to_end(cost_record.claim_id)
            logger.info(
                BUDGET_RECORD_DEDUPED,
                claim_id=cost_record.claim_id,
                agent_id=cost_record.agent_id,
                task_id=cost_record.task_id,
                provider=cost_record.provider,
                model=cost_record.model,
                cost=cost_record.cost,
            )
            return
        self._inflight_claims.add(cost_record.claim_id)

    # Run the durable aggregate update OUTSIDE the lock -- DB I/O
    # must not block concurrent in-memory readers/writers. Any
    # failure releases the in-flight reservation so a retry with
    # the same claim_id is not falsely deduped.
    try:
        await self._update_project_aggregate(cost_record)
    except BaseException:
        async with self._get_lock():
            self._inflight_claims.discard(cost_record.claim_id)
        raise

    async with self._get_lock():
        # Promote the reservation to a finalised LRU entry under
        # the lock so the membership check above never observes
        # a gap where the claim is in neither set. Eviction only
        # affects ``_seen_claims``, so still-running reservations
        # in ``_inflight_claims`` are untouched.
        self._inflight_claims.discard(cost_record.claim_id)
        self._records.append(cost_record)
        self._seen_claims[cost_record.claim_id] = None
        self._seen_claims.move_to_end(cost_record.claim_id)
        while len(self._seen_claims) > self._claim_lru_capacity:
            self._seen_claims.popitem(last=False)
        logger.info(
            BUDGET_RECORD_ADDED,
            agent_id=cost_record.agent_id,
            model=cost_record.model,
            cost=cost_record.cost,
        )

prune_expired async

prune_expired(*, now=None)

Remove records older than the 168-hour (7-day) cost window.

Call periodically from long-running services to bound memory growth.

Parameters:

Name Type Description Default
now datetime | None

Reference time. Defaults to self._clock.now() so FakeClock injection deterministically controls the eviction cutoff.

None

Returns:

Type Description
int

Number of records removed.

Source code in src/synthorg/budget/tracker.py
async def prune_expired(self, *, now: datetime | None = None) -> int:
    """Remove records older than the 168-hour (7-day) cost window.

    Call periodically from long-running services to bound
    memory growth.

    Args:
        now: Reference time.  Defaults to ``self._clock.now()`` so
            ``FakeClock`` injection deterministically controls the
            eviction cutoff.

    Returns:
        Number of records removed.
    """
    ref = now or self._clock.now()
    cutoff = ref - timedelta(hours=_COST_WINDOW_HOURS)
    async with self._get_lock():
        pruned = self._prune_before(cutoff)
        if pruned:
            logger.info(
                BUDGET_RECORDS_PRUNED,
                pruned=pruned,
                remaining=len(self._records),
            )
        return pruned

get_total_cost async

get_total_cost(*, start=None, end=None)

Sum of cost across all records, optionally filtered by time.

Parameters:

Name Type Description Default
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
float

Rounded total cost in the configured currency.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_total_cost(
    self,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> float:
    """Sum of ``cost`` across all records, optionally filtered by time.

    Args:
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Rounded total cost in the configured currency.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    query_start = time.perf_counter()
    try:
        _validate_time_range(start, end)
        logger.debug(BUDGET_TOTAL_COST_QUERIED, start=start, end=end)
        snapshot = await self._snapshot()
        filtered = _filter_records(snapshot, start=start, end=end)
        return _aggregate(filtered).cost
    finally:
        record_budget_query(
            query_type="total_cost",
            duration_sec=time.perf_counter() - query_start,
        )

get_agent_cost async

get_agent_cost(agent_id, *, start=None, end=None)

Sum of cost for a single agent, optionally filtered by time.

Parameters:

Name Type Description Default
agent_id str

Agent identifier to filter by.

required
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
float

Rounded total cost in the configured currency for the agent.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_agent_cost(
    self,
    agent_id: str,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> float:
    """Sum of ``cost`` for a single agent, optionally filtered by time.

    Args:
        agent_id: Agent identifier to filter by.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Rounded total cost in the configured currency for the agent.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    query_start = time.perf_counter()
    try:
        _validate_time_range(start, end)
        logger.debug(
            BUDGET_AGENT_COST_QUERIED,
            agent_id=agent_id,
            start=start,
            end=end,
        )
        snapshot = await self._snapshot()
        filtered = _filter_records(
            snapshot,
            agent_id=agent_id,
            start=start,
            end=end,
        )
        return _aggregate(filtered).cost
    finally:
        record_budget_query(
            query_type="agent_cost",
            duration_sec=time.perf_counter() - query_start,
        )

get_project_cost async

get_project_cost(project_id, *, start=None, end=None)

Sum of cost for a single project.

Parameters:

Name Type Description Default
project_id NotBlankStr

Project identifier to filter by.

required
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
float

Rounded total cost in the configured currency for the project.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_project_cost(
    self,
    project_id: NotBlankStr,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> float:
    """Sum of ``cost`` for a single project.

    Args:
        project_id: Project identifier to filter by.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Rounded total cost in the configured currency for the project.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    query_start = time.perf_counter()
    try:
        _validate_time_range(start, end)
        logger.debug(
            BUDGET_PROJECT_COST_QUERIED,
            project_id=project_id,
            start=start,
            end=end,
        )
        snapshot = await self._snapshot()
        filtered = _filter_records(
            snapshot,
            project_id=project_id,
            start=start,
            end=end,
        )
        return _aggregate(filtered).cost
    finally:
        record_budget_query(
            query_type="project_cost",
            duration_sec=time.perf_counter() - query_start,
        )

get_project_records async

get_project_records(project_id, *, start=None, end=None)

Return cost records for a specific project.

Parameters:

Name Type Description Default
project_id NotBlankStr

Project identifier to filter by.

required
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
tuple[CostRecord, ...]

Immutable tuple of matching cost records.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_project_records(
    self,
    project_id: NotBlankStr,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> tuple[CostRecord, ...]:
    """Return cost records for a specific project.

    Args:
        project_id: Project identifier to filter by.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Immutable tuple of matching cost records.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_PROJECT_RECORDS_QUERIED,
        project_id=project_id,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    return _filter_records(
        snapshot,
        project_id=project_id,
        start=start,
        end=end,
    )

get_record_count async

get_record_count()

Total number of recorded cost entries.

Returns:

Type Description
int

Number of cost records.

Source code in src/synthorg/budget/tracker.py
async def get_record_count(self) -> int:
    """Total number of recorded cost entries.

    Returns:
        Number of cost records.
    """
    async with self._get_lock():
        return len(self._records)

get_records async

get_records(*, agent_id=None, task_id=None, provider=None, start=None, end=None)

Return filtered cost records.

Returns an immutable snapshot of records matching the filters.

Parameters:

Name Type Description Default
agent_id str | None

Filter by agent.

None
task_id str | None

Filter by task.

None
provider NotBlankStr | None

Filter by provider name.

None
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
tuple[CostRecord, ...]

Immutable tuple of matching cost records.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_records(
    self,
    *,
    agent_id: str | None = None,
    task_id: str | None = None,
    provider: NotBlankStr | None = None,
    start: datetime | None = None,
    end: datetime | None = None,
) -> tuple[CostRecord, ...]:
    """Return filtered cost records.

    Returns an immutable snapshot of records matching the filters.

    Args:
        agent_id: Filter by agent.
        task_id: Filter by task.
        provider: Filter by provider name.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Immutable tuple of matching cost records.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_RECORDS_QUERIED,
        agent_id=agent_id,
        task_id=task_id,
        provider=provider,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    return _filter_records(
        snapshot,
        agent_id=agent_id,
        task_id=task_id,
        provider=provider,
        start=start,
        end=end,
    )

get_provider_usage async

get_provider_usage(provider_name, *, start=None, end=None)

Return aggregated token and cost totals for a provider.

Parameters:

Name Type Description Default
provider_name NotBlankStr

Provider to aggregate usage for.

required
start datetime | None

Inclusive lower bound on timestamp.

None
end datetime | None

Exclusive upper bound on timestamp.

None

Returns:

Type Description
ProviderUsageSummary

Total tokens (input + output) and total cost.

Raises:

Type Description
ValueError

If both start and end are given and start >= end.

Source code in src/synthorg/budget/tracker.py
async def get_provider_usage(
    self,
    provider_name: NotBlankStr,
    *,
    start: datetime | None = None,
    end: datetime | None = None,
) -> ProviderUsageSummary:
    """Return aggregated token and cost totals for a provider.

    Args:
        provider_name: Provider to aggregate usage for.
        start: Inclusive lower bound on ``timestamp``.
        end: Exclusive upper bound on ``timestamp``.

    Returns:
        Total tokens (input + output) and total cost.

    Raises:
        ValueError: If both *start* and *end* are given and
            ``start >= end``.
    """
    _validate_time_range(start, end)
    logger.debug(
        BUDGET_PROVIDER_USAGE_QUERIED,
        provider=provider_name,
        start=start,
        end=end,
    )
    snapshot = await self._snapshot()
    filtered = _filter_records(
        snapshot,
        provider=provider_name,
        start=start,
        end=end,
    )
    if not filtered:
        return ProviderUsageSummary(total_tokens=0, total_cost=0.0)
    agg = _aggregate(filtered)
    return ProviderUsageSummary(
        total_tokens=agg.input_tokens + agg.output_tokens,
        total_cost=agg.cost,
    )

clear

clear()

Reset all recorded cost data for test isolation.

Deliberately synchronous and does not acquire _lock. record() and _snapshot() serialise concurrent async readers/writers via async with self._lock; clear() is the test-only counterpart, called from sync test-fixture setup after the previous event loop has closed and cancelled any in-flight tasks. Under that invariant no async coroutine can race with this method, and list.clear() is a single atomic C-level operation under the GIL. Calling this method from production / async code is unsupported.

Also resets _seen_claims and _inflight_claims so a reused claim_id on a fresh test does not get falsely deduped.

Source code in src/synthorg/budget/tracker.py
def clear(self) -> None:
    """Reset all recorded cost data for test isolation.

    Deliberately synchronous and does **not** acquire ``_lock``.
    ``record()`` and ``_snapshot()`` serialise concurrent async
    readers/writers via ``async with self._lock``; ``clear()`` is
    the test-only counterpart, called from sync test-fixture
    setup *after* the previous event loop has closed and cancelled
    any in-flight tasks.  Under that invariant no async coroutine
    can race with this method, and ``list.clear()`` is a single
    atomic C-level operation under the GIL.  Calling this method
    from production / async code is unsupported.

    Also resets ``_seen_claims`` and ``_inflight_claims`` so a
    reused ``claim_id`` on a fresh test does not get falsely
    deduped.
    """
    cleared_count = len(self._records)
    self._records.clear()
    self._seen_claims.clear()
    self._inflight_claims.clear()
    logger.info(BUDGET_TRACKER_CLEARED, cleared_count=cleared_count)

track_pending_record

track_pending_record(task)

Hold a strong reference to a background recording task.

The cost-recording chokepoint schedules cost_tracker.record(...) as a background task so the user-visible provider.complete() response is never blocked on tracker I/O. asyncio's loop only keeps weak references to tasks, so without an external strong reference the loop's GC may cancel an in-flight task. This method registers the strong reference and wires a self-eviction callback so the set never grows beyond in-flight tasks.

Tracker ownership of the set means each test (which constructs its own :class:CostTracker) gets a fresh, isolated set -- leaked tasks from a prior test bound to a closed event loop cannot poison the next test's loop.

Source code in src/synthorg/budget/tracker.py
def track_pending_record(self, task: asyncio.Task[None]) -> None:
    """Hold a strong reference to a background recording task.

    The cost-recording chokepoint schedules ``cost_tracker.record(...)``
    as a background task so the user-visible ``provider.complete()``
    response is never blocked on tracker I/O. asyncio's loop only
    keeps weak references to tasks, so without an external strong
    reference the loop's GC may cancel an in-flight task. This
    method registers the strong reference and wires a self-eviction
    callback so the set never grows beyond in-flight tasks.

    Tracker ownership of the set means each test (which constructs
    its own :class:`CostTracker`) gets a fresh, isolated set --
    leaked tasks from a prior test bound to a closed event loop
    cannot poison the next test's loop.
    """
    self._pending_record_tasks.add(task)
    task.add_done_callback(self._pending_record_tasks.discard)

drain_pending_records async

drain_pending_records()

Wait for all in-flight background record tasks to settle.

Test-only utility: tests that need to observe CostTracker state immediately after a provider.complete() call can await tracker.drain_pending_records() to deterministically wait for the recording side effect.

No-op when there are no pending tasks. Recoverable failures inside the background tasks are already logged + swallowed in _record_cost_in_background (see :mod:synthorg.providers.cost_recording). :class:MemoryError and :class:RecursionError propagate so a drain invoked from a test path doesn't silently swallow interpreter-fatal signals via return_exceptions=True. :class:asyncio.CancelledError is re-raised so cancellation propagates instead of producing a misleading WARN log: a cancelled background task is the expected outcome of a graceful shutdown or a test cancelling the surrounding TaskGroup, not a regression.

Source code in src/synthorg/budget/tracker.py
async def drain_pending_records(self) -> None:
    """Wait for all in-flight background record tasks to settle.

    Test-only utility: tests that need to observe ``CostTracker``
    state immediately after a ``provider.complete()`` call can
    ``await tracker.drain_pending_records()`` to deterministically
    wait for the recording side effect.

    No-op when there are no pending tasks. Recoverable failures
    inside the background tasks are already logged + swallowed in
    ``_record_cost_in_background`` (see
    :mod:`synthorg.providers.cost_recording`).
    :class:`MemoryError` and :class:`RecursionError` propagate so a
    ``drain`` invoked from a test path doesn't silently swallow
    interpreter-fatal signals via ``return_exceptions=True``.
    :class:`asyncio.CancelledError` is re-raised so cancellation
    propagates instead of producing a misleading WARN log: a
    cancelled background task is the *expected* outcome of a
    graceful shutdown or a test cancelling the surrounding
    ``TaskGroup``, not a regression.
    """
    if not self._pending_record_tasks:
        return
    # Snapshot before awaiting: ``_pending_record_tasks`` is mutated
    # by the ``add_done_callback`` registered above, and iterating
    # the live set while it shrinks would risk skipping tasks.
    pending = tuple(self._pending_record_tasks)
    results = await asyncio.gather(*pending, return_exceptions=True)
    cancelled_count = 0
    for outcome in results:
        if isinstance(outcome, (MemoryError, RecursionError)):
            raise outcome
        if isinstance(outcome, asyncio.CancelledError):
            # Cancellation is expected during graceful shutdown;
            # count for the propagation below but don't WARN.
            cancelled_count += 1
            continue
        if isinstance(outcome, BaseException):
            # ``_record_cost_in_background`` already logs + swallows
            # recoverable failures, so reaching this branch means
            # something downstream raised without going through the
            # documented logging path. Surface defensively at WARN
            # so the regression is visible in test output rather
            # than silently dropped by ``return_exceptions=True``.
            logger.warning(
                BUDGET_PENDING_RECORD_DRAIN_UNEXPECTED,
                error_type=type(outcome).__name__,
                error=safe_error_description(outcome),
            )
    if cancelled_count:
        # Re-raise a CancelledError so the caller's surrounding
        # TaskGroup / context observes the cancellation instead of
        # silently masking it. Specific instance is not preserved
        # because the gather snapshot may hold many; one suffices
        # to propagate the signal.
        raise asyncio.CancelledError

Enforcer

enforcer

Budget enforcement service.

Composes :class:~synthorg.budget.tracker.CostTracker and :class:~synthorg.budget.config.BudgetConfig to provide pre-flight checks, in-flight budget checking, and task-boundary auto-downgrade as described in the Cost Controls section of the Operations design page.

BudgetEnforcer

BudgetEnforcer(
    *,
    budget_config,
    cost_tracker,
    model_resolver=None,
    quota_tracker=None,
    degradation_configs=None,
    risk_tracker=None,
    risk_scorer=None,
    notification_dispatcher=None,
    project_cost_repo=None,
)

Bases: BudgetEnforcerRiskMixin

Budget enforcement: pre-flight, in-flight, and auto-downgrade.

Concurrency-safe via CostTracker's asyncio.Lock. Pre-flight checks are best-effort under concurrency (TOCTOU); the in-flight checker is the true safety net.

Parameters:

Name Type Description Default
budget_config BudgetConfig

Limits and thresholds.

required
cost_tracker CostTracker

Spend queries.

required
model_resolver ModelResolver | None

Auto-downgrade alias lookup.

None
quota_tracker QuotaTracker | None

Provider-level quota enforcement.

None
degradation_configs Mapping[str, DegradationConfig] | None

Per-provider degradation strategies.

None
risk_tracker RiskTracker | None

Optional risk tracking service.

None
risk_scorer RiskScorer | None

Optional risk scoring implementation.

None
notification_dispatcher NotificationDispatcher | None

Optional notification dispatcher.

None
project_cost_repo ProjectCostAggregateRepository | None

Optional durable project cost aggregate repository for lifetime budget enforcement.

None
Source code in src/synthorg/budget/enforcer.py
def __init__(  # noqa: PLR0913
    self,
    *,
    budget_config: BudgetConfig,
    cost_tracker: CostTracker,
    model_resolver: ModelResolver | None = None,
    quota_tracker: QuotaTracker | None = None,
    degradation_configs: Mapping[str, DegradationConfig] | None = None,
    risk_tracker: RiskTracker | None = None,
    risk_scorer: RiskScorer | None = None,
    notification_dispatcher: NotificationDispatcher | None = None,
    project_cost_repo: ProjectCostAggregateRepository | None = None,
) -> None:
    self._budget_config = budget_config
    self._cost_tracker = cost_tracker
    self._model_resolver = model_resolver
    self._quota_tracker = quota_tracker
    self._notification_dispatcher = notification_dispatcher
    self._project_cost_repo = project_cost_repo
    self._degradation_configs: MappingProxyType[str, DegradationConfig] | None = (
        MappingProxyType(copy.deepcopy(dict(degradation_configs)))
        if degradation_configs is not None
        else None
    )
    self._risk_tracker = risk_tracker
    self._risk_scorer = risk_scorer
    self._background_tasks = BackgroundTaskRegistry(owner="budget.enforcer")

    if budget_config.risk_budget.enabled and (
        risk_tracker is None or risk_scorer is None
    ):
        logger.warning(
            RISK_BUDGET_ENFORCEMENT_CHECK,
            reason="risk_budget_enabled_but_missing_deps",
            has_tracker=risk_tracker is not None,
            has_scorer=risk_scorer is not None,
        )

cost_tracker property

cost_tracker

The underlying cost tracker.

risk_tracker property

risk_tracker

The optional risk tracker.

currency property

currency

The configured ISO 4217 currency code.

stop async

stop(*, timeout_sec=_DEFAULT_TIMEOUT_SEC)

Drain pending budget-notification tasks.

Mirrors :meth:ApprovalTimeoutScheduler.stop so the application shutdown path can await outstanding fire-and-forget notifications (monthly / daily exhaustion alerts) rather than dropping them. timeout_sec bounds the wait; on expiry the registry cancels stragglers and logs BACKGROUND_TASKS_DRAIN_TIMEOUT.

Source code in src/synthorg/budget/enforcer.py
async def stop(self, *, timeout_sec: float = _DEFAULT_TIMEOUT_SEC) -> None:
    """Drain pending budget-notification tasks.

    Mirrors :meth:`ApprovalTimeoutScheduler.stop` so the
    application shutdown path can await outstanding
    fire-and-forget notifications (monthly / daily exhaustion
    alerts) rather than dropping them. ``timeout_sec`` bounds
    the wait; on expiry the registry cancels stragglers and
    logs ``BACKGROUND_TASKS_DRAIN_TIMEOUT``.
    """
    await self._background_tasks.drain(timeout_sec=timeout_sec)

get_budget_utilization_pct async

get_budget_utilization_pct()

Return monthly budget utilization as a percentage (0--100+).

Returns None when disabled (total_monthly <= 0) or when the cost query fails (graceful degradation).

Source code in src/synthorg/budget/enforcer.py
async def get_budget_utilization_pct(self) -> float | None:
    """Return monthly budget utilization as a percentage (0--100+).

    Returns ``None`` when disabled (``total_monthly <= 0``) or
    when the cost query fails (graceful degradation).
    """
    cfg = self._budget_config
    if cfg.total_monthly <= 0:
        return None
    try:
        period_start = billing_period_start(cfg.reset_day)
        monthly_cost = await self._cost_tracker.get_total_cost(
            start=period_start,
        )
    except MemoryError, RecursionError:
        raise
    except Exception as exc:
        logger.error(
            BUDGET_UTILIZATION_ERROR,
            reason="falling_back_to_none",
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        return None
    else:
        pct = monthly_cost / cfg.total_monthly * 100
        logger.debug(
            BUDGET_UTILIZATION_QUERIED,
            monthly_cost=monthly_cost,
            total_monthly=cfg.total_monthly,
            utilization_pct=pct,
        )
        return pct

check_can_execute async

check_can_execute(agent_id, *, provider_name=None, estimated_tokens=0)

Pre-flight: verify monthly + daily + quota limits allow execution.

Parameters:

Name Type Description Default
agent_id str

Agent requesting execution.

required
provider_name str | None

Optional provider name for quota checks. When None, quota check is skipped.

None
estimated_tokens int

Estimated tokens for the upcoming request. Forwarded to the quota tracker for token-based checks.

0

Returns:

Type Description
PreFlightResult

Pre-flight result with effective provider info and

PreFlightResult

degradation details when applicable.

Raises:

Type Description
BudgetExhaustedError

Monthly hard stop exceeded.

DailyLimitExceededError

Agent daily limit exceeded (subclass of BudgetExhaustedError).

QuotaExhaustedError

Provider quota exhausted and degradation could not resolve.

Source code in src/synthorg/budget/enforcer.py
async def check_can_execute(
    self,
    agent_id: str,
    *,
    provider_name: str | None = None,
    estimated_tokens: int = 0,
) -> PreFlightResult:
    """Pre-flight: verify monthly + daily + quota limits allow execution.

    Args:
        agent_id: Agent requesting execution.
        provider_name: Optional provider name for quota checks.
            When ``None``, quota check is skipped.
        estimated_tokens: Estimated tokens for the upcoming request.
            Forwarded to the quota tracker for token-based checks.

    Returns:
        Pre-flight result with effective provider info and
        degradation details when applicable.

    Raises:
        BudgetExhaustedError: Monthly hard stop exceeded.
        DailyLimitExceededError: Agent daily limit exceeded
            (subclass of ``BudgetExhaustedError``).
        QuotaExhaustedError: Provider quota exhausted and
            degradation could not resolve.
    """
    cfg = self._budget_config
    degradation_result: DegradationResult | None = None

    try:
        if cfg.total_monthly > 0:
            await self._check_monthly_hard_stop(cfg, agent_id)
        await self._check_daily_limit(cfg, agent_id)

        if provider_name is not None:
            degradation_result = await self._check_provider_quota(
                agent_id,
                provider_name,
                estimated_tokens=estimated_tokens,
            )
    except BudgetExhaustedError:
        raise
    except MemoryError, RecursionError:
        raise
    except Exception as exc:
        logger.error(
            BUDGET_PREFLIGHT_ERROR,
            agent_id=agent_id,
            reason="falling_back_to_allow_execution",
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        return PreFlightResult()

    logger.debug(
        BUDGET_ENFORCEMENT_CHECK,
        agent_id=agent_id,
        result="pass",
    )
    if degradation_result is not None:
        return PreFlightResult(degradation=degradation_result)
    return PreFlightResult()

check_project_budget async

check_project_budget(project_id, project_budget)

Check project-level budget and raise if exceeded.

Returns immediately when project_budget <= 0 (enforcement disabled). Otherwise uses the durable project cost aggregate when available, providing accurate lifetime totals that survive the in-memory tracker's 168-hour retention window. Falls back to in-memory tracking when no aggregate repository is configured or when the aggregate query fails.

Parameters:

Name Type Description Default
project_id NotBlankStr

Project identifier for cost lookup.

required
project_budget float

Total project budget (from Project.budget).

required

Raises:

Type Description
ProjectBudgetExhaustedError

When project spend >= budget.

MemoryError

Re-raised unconditionally.

RecursionError

Re-raised unconditionally.

Source code in src/synthorg/budget/enforcer.py
async def check_project_budget(
    self,
    project_id: NotBlankStr,
    project_budget: float,
) -> None:
    """Check project-level budget and raise if exceeded.

    Returns immediately when ``project_budget <= 0`` (enforcement
    disabled).  Otherwise uses the durable project cost aggregate
    when available, providing accurate lifetime totals that survive
    the in-memory tracker's 168-hour retention window.  Falls back
    to in-memory tracking when no aggregate repository is configured
    or when the aggregate query fails.

    Args:
        project_id: Project identifier for cost lookup.
        project_budget: Total project budget (from Project.budget).

    Raises:
        ProjectBudgetExhaustedError: When project spend >= budget.
        MemoryError: Re-raised unconditionally.
        RecursionError: Re-raised unconditionally.
    """
    if project_budget <= 0:
        return

    project_cost = await self._get_project_cost(project_id)
    if project_cost is None:
        return

    logger.debug(
        BUDGET_PROJECT_ENFORCEMENT_CHECK,
        project_id=project_id,
        project_cost=project_cost,
        project_budget=project_budget,
    )

    if project_cost >= project_budget:
        logger.warning(
            BUDGET_PROJECT_BUDGET_EXCEEDED,
            project_id=project_id,
            project_cost=project_cost,
            project_budget=project_budget,
        )
        _fmt = format_cost
        _cur = self._budget_config.currency
        msg = (
            f"Project {project_id!r} budget exhausted: "
            f"{_fmt(project_cost, _cur)} >= "
            f"{_fmt(project_budget, _cur)}"
        )
        raise ProjectBudgetExhaustedError(
            msg,
            project_id=project_id,
            project_budget=project_budget,
            project_spent=project_cost,
        )

check_quota async

check_quota(provider_name, *, estimated_tokens=0)

Check provider quota via QuotaTracker.

Returns always-allowed when no quota tracker is configured.

Note

Unlike check_can_execute, this method does not catch unexpected exceptions from the underlying QuotaTracker. check_can_execute wraps quota checks in a try/except that falls back to allowing execution on unexpected errors (graceful degradation), but direct callers of check_quota are responsible for their own error handling.

Parameters:

Name Type Description Default
provider_name str

Provider to check.

required
estimated_tokens int

Estimated tokens for the request.

0

Returns:

Type Description
QuotaCheckResult

Quota check result.

Source code in src/synthorg/budget/enforcer.py
async def check_quota(
    self,
    provider_name: str,
    *,
    estimated_tokens: int = 0,
) -> QuotaCheckResult:
    """Check provider quota via QuotaTracker.

    Returns always-allowed when no quota tracker is configured.

    Note:
        Unlike ``check_can_execute``, this method does **not**
        catch unexpected exceptions from the underlying
        ``QuotaTracker``.  ``check_can_execute`` wraps quota
        checks in a try/except that falls back to allowing
        execution on unexpected errors (graceful degradation),
        but direct callers of ``check_quota`` are responsible
        for their own error handling.

    Args:
        provider_name: Provider to check.
        estimated_tokens: Estimated tokens for the request.

    Returns:
        Quota check result.
    """
    if self._quota_tracker is None:
        logger.debug(
            QUOTA_CHECK_ALLOWED,
            provider=provider_name,
            reason="no_quota_tracker",
        )
        return always_allowed_result(provider_name)

    return await self._quota_tracker.check_quota(
        provider_name,
        estimated_tokens=estimated_tokens,
    )

resolve_model async

resolve_model(identity)

Apply auto-downgrade at task boundary if threshold exceeded.

Returns identity unchanged when downgrade is disabled, not applicable, or lookup fails. Returns new AgentIdentity with downgraded ModelConfig otherwise.

Source code in src/synthorg/budget/enforcer.py
async def resolve_model(
    self,
    identity: AgentIdentity,
) -> AgentIdentity:
    """Apply auto-downgrade at task boundary if threshold exceeded.

    Returns identity unchanged when downgrade is disabled, not
    applicable, or lookup fails.  Returns new ``AgentIdentity``
    with downgraded ``ModelConfig`` otherwise.
    """
    cfg = self._budget_config
    downgrade = cfg.auto_downgrade

    if (
        not downgrade.enabled
        or cfg.total_monthly <= 0
        or self._model_resolver is None
    ):
        return identity

    try:
        period_start = billing_period_start(cfg.reset_day)
        monthly_cost = await self._cost_tracker.get_total_cost(
            start=period_start,
        )
    except MemoryError, RecursionError:  # builtin MemoryError (OOM)
        raise
    except Exception as exc:
        logger.error(
            BUDGET_RESOLVE_MODEL_ERROR,
            agent_id=str(identity.id),
            reason="cost_tracker_query_failed",
            error_type=type(exc).__name__,
            error=safe_error_description(exc),
        )
        return identity

    used_pct = round(
        monthly_cost / cfg.total_monthly * 100,
        BUDGET_ROUNDING_PRECISION,
    )

    if used_pct < downgrade.threshold:
        return identity

    return _apply_downgrade(
        identity,
        self._model_resolver,
        downgrade.downgrade_map,
        used_pct,
        downgrade.threshold,
    )

make_budget_checker async

make_budget_checker(task, agent_id, *, project_id=None, project_budget=0.0)

Create a sync BudgetChecker with pre-computed baselines.

Checks task limit, monthly total, agent daily limit, and optionally project budget. Baselines are snapshot-in-time (TOCTOU acceptable). Returns None when all limits are disabled.

Parameters:

Name Type Description Default
task Task

The task being executed.

required
agent_id str

Agent identifier.

required
project_id NotBlankStr | None

Optional project ID for project budget checks.

None
project_budget float

Total project budget (0 = disabled).

0.0
Source code in src/synthorg/budget/enforcer.py
async def make_budget_checker(
    self,
    task: Task,
    agent_id: str,
    *,
    project_id: NotBlankStr | None = None,
    project_budget: float = 0.0,
) -> BudgetChecker | None:
    """Create a sync BudgetChecker with pre-computed baselines.

    Checks task limit, monthly total, agent daily limit, and
    optionally project budget.  Baselines are snapshot-in-time
    (TOCTOU acceptable).  Returns ``None`` when all limits are
    disabled.

    Args:
        task: The task being executed.
        agent_id: Agent identifier.
        project_id: Optional project ID for project budget checks.
        project_budget: Total project budget (0 = disabled).
    """
    cfg = self._budget_config
    task_limit = task.budget_limit
    monthly_budget = cfg.total_monthly
    daily_limit = cfg.per_agent_daily_limit

    # All enforcement disabled.
    if (
        monthly_budget <= 0
        and task_limit <= 0
        and daily_limit <= 0
        and project_budget <= 0
    ):
        return None

    monthly_baseline, daily_baseline = await self._compute_baselines_safe(
        cfg,
        monthly_budget,
        daily_limit,
        agent_id,
    )

    project_baseline = 0.0
    if project_id is not None and project_budget > 0:
        baseline = await self._get_project_cost(
            project_id,
            error_event=BUDGET_BASELINE_ERROR,
        )
        if baseline is not None:
            project_baseline = baseline

    thresholds = _compute_thresholds(cfg, monthly_budget)

    return _build_checker_closure(
        task_limit=task_limit,
        monthly_budget=monthly_budget,
        daily_limit=daily_limit,
        monthly_baseline=monthly_baseline,
        daily_baseline=daily_baseline,
        thresholds=thresholds,
        agent_id=agent_id,
        project_budget=project_budget,
        project_baseline=project_baseline,
        project_id=project_id or None,
    )

Quota

quota

Quota and subscription models for provider cost tracking.

Defines quota windows, subscription configurations, degradation strategies, and quota check result models for providers that operate under subscription plans, local deployments, or pay-as-you-go billing.

QuotaWindow

Bases: StrEnum

Time window for quota enforcement.

QuotaLimit pydantic-model

Bases: BaseModel

A single quota limit for a time window.

Attributes:

Name Type Description
window QuotaWindow

Time window for this limit.

max_requests int

Maximum requests in the window (0 = unlimited).

max_tokens int

Maximum tokens in the window (0 = unlimited).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _at_least_one_limit

window pydantic-field

window

Time window for this limit

max_requests pydantic-field

max_requests = 0

Maximum requests in the window (0 = unlimited)

max_tokens pydantic-field

max_tokens = 0

Maximum tokens in the window (0 = unlimited)

ProviderCostModel

Bases: StrEnum

How a provider charges for usage.

Members

PER_TOKEN: Standard pay-as-you-go; cost computed from cost_per_1k_input/output. SUBSCRIPTION: Monthly flat fee; individual calls are pre-paid. LOCAL: Zero monetary cost; only hardware constraints.

SubscriptionConfig pydantic-model

Bases: BaseModel

Subscription and quota configuration for a provider.

Attributes:

Name Type Description
plan_name NotBlankStr

Name of the subscription plan.

cost_model ProviderCostModel

How the provider charges for usage.

monthly_cost float

Fixed monthly subscription fee in the configured currency.

quotas tuple[QuotaLimit, ...]

Rate/token/request limits per time window.

hardware_limits str | None

Free-text hardware constraints for local models.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_quotas_unique_windows
  • _validate_cost_model_constraints

plan_name pydantic-field

plan_name = 'pay_as_you_go'

Subscription plan name

cost_model pydantic-field

cost_model = PER_TOKEN

How the provider charges for usage

monthly_cost pydantic-field

monthly_cost = 0.0

Fixed monthly subscription fee in the configured currency

quotas pydantic-field

quotas = ()

Rate/token/request limits per time window

hardware_limits pydantic-field

hardware_limits = None

Free-text hardware constraints for local models

DegradationAction

Bases: StrEnum

Action to take when a provider's quota is exhausted.

Members

FALLBACK: Route to a fallback provider. QUEUE: Wait for quota window reset, then retry. ALERT: Raise error and alert user.

DegradationConfig pydantic-model

Bases: BaseModel

Configuration for graceful degradation when quota is exhausted.

Attributes:

Name Type Description
strategy DegradationAction

What to do when quota is exhausted.

fallback_providers tuple[NotBlankStr, ...]

Ordered fallback provider names.

queue_max_wait_seconds int

Max seconds to wait when queueing.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_fallback_providers

strategy pydantic-field

strategy = ALERT

Degradation strategy when quota exhausted

fallback_providers pydantic-field

fallback_providers = ()

Ordered fallback provider names

queue_max_wait_seconds pydantic-field

queue_max_wait_seconds = 300

Max wait seconds when queueing

QuotaSnapshot pydantic-model

Bases: BaseModel

Point-in-time snapshot of quota usage for a provider window.

Attributes:

Name Type Description
provider_name NotBlankStr

Provider this snapshot belongs to.

window QuotaWindow

Time window for this snapshot.

requests_used int

Requests consumed in this window.

requests_limit int

Maximum requests allowed (0 = unlimited).

tokens_used int

Tokens consumed in this window.

tokens_limit int

Maximum tokens allowed (0 = unlimited).

window_resets_at datetime | None

When the current window resets.

captured_at datetime

When this snapshot was captured.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

provider_name pydantic-field

provider_name

Provider name

window pydantic-field

window

Time window

requests_used pydantic-field

requests_used = 0

Requests used

requests_limit pydantic-field

requests_limit = 0

Requests limit (0 = unlimited)

tokens_used pydantic-field

tokens_used = 0

Tokens used

tokens_limit pydantic-field

tokens_limit = 0

Tokens limit (0 = unlimited)

window_resets_at pydantic-field

window_resets_at = None

When the current window resets

captured_at pydantic-field

captured_at

When snapshot was captured

requests_remaining property

requests_remaining

Remaining requests in this window.

Returns None when the limit is not enforced (unlimited). Returns 0 when fully consumed.

tokens_remaining property

tokens_remaining

Remaining tokens in this window.

Returns None when the limit is not enforced (unlimited). Returns 0 when fully consumed.

is_exhausted property

is_exhausted

Whether any enforced limit in this window is exhausted.

QuotaCheckResult pydantic-model

Bases: BaseModel

Result of a pre-flight quota check.

Attributes:

Name Type Description
allowed bool

Whether the request is allowed.

provider_name NotBlankStr

Provider that was checked.

reason str

Human-readable reason (set when denied).

exhausted_windows tuple[QuotaWindow, ...]

Which windows are exhausted (if any).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_denied_has_reason

allowed pydantic-field

allowed

Whether the request is allowed

provider_name pydantic-field

provider_name

Provider checked

reason pydantic-field

reason = ''

Reason (set when denied)

exhausted_windows pydantic-field

exhausted_windows = ()

Exhausted windows

always_allowed_result

always_allowed_result(provider_name)

Build an always-allowed QuotaCheckResult.

Source code in src/synthorg/budget/quota.py
def always_allowed_result(provider_name: NotBlankStr) -> QuotaCheckResult:
    """Build an always-allowed QuotaCheckResult."""
    return QuotaCheckResult(
        allowed=True,
        provider_name=provider_name,
    )

window_start

window_start(window, *, now=None)

Compute the UTC-aware start of the current quota window.

Parameters:

Name Type Description Default
window QuotaWindow

Which time window to compute.

required
now datetime | None

Reference timestamp. Defaults to datetime.now(UTC). Must be timezone-aware; naive datetimes are rejected.

None

Returns:

Type Description
datetime

UTC-aware datetime at the start of the current window.

Raises:

Type Description
ValueError

If now is a naive (timezone-unaware) datetime.

Source code in src/synthorg/budget/quota.py
def window_start(
    window: QuotaWindow,
    *,
    now: datetime | None = None,
) -> datetime:
    """Compute the UTC-aware start of the current quota window.

    Args:
        window: Which time window to compute.
        now: Reference timestamp. Defaults to ``datetime.now(UTC)``.
            Must be timezone-aware; naive datetimes are rejected.

    Returns:
        UTC-aware datetime at the start of the current window.

    Raises:
        ValueError: If *now* is a naive (timezone-unaware) datetime.
    """
    if now is None:
        now = datetime.now(UTC)
    elif now.tzinfo is None:
        msg = "now must be timezone-aware, got naive datetime"
        logger.warning(
            CONFIG_VALIDATION_FAILED,
            model="window_start",
            field="now",
            reason=msg,
        )
        raise ValueError(msg)
    else:
        now = now.astimezone(UTC)

    if window == QuotaWindow.PER_MINUTE:
        return datetime(
            now.year,
            now.month,
            now.day,
            now.hour,
            now.minute,
            tzinfo=UTC,
        )
    if window == QuotaWindow.PER_HOUR:
        return datetime(
            now.year,
            now.month,
            now.day,
            now.hour,
            tzinfo=UTC,
        )
    if window == QuotaWindow.PER_DAY:
        return datetime(
            now.year,
            now.month,
            now.day,
            tzinfo=UTC,
        )
    # PER_MONTH -- first day of the month
    return datetime(now.year, now.month, 1, tzinfo=UTC)

effective_cost_per_1k

effective_cost_per_1k(cost_per_1k_input, cost_per_1k_output, cost_model)

Compute effective cost per 1k tokens based on cost model.

Returns 0.0 for SUBSCRIPTION and LOCAL models (pre-paid / free). Returns cost_per_1k_input + cost_per_1k_output for PER_TOKEN.

Parameters:

Name Type Description Default
cost_per_1k_input float

Cost per 1k input tokens.

required
cost_per_1k_output float

Cost per 1k output tokens.

required
cost_model ProviderCostModel

The provider's cost model.

required

Returns:

Type Description
float

Effective cost per 1k tokens.

Source code in src/synthorg/budget/quota.py
def effective_cost_per_1k(
    cost_per_1k_input: float,
    cost_per_1k_output: float,
    cost_model: ProviderCostModel,
) -> float:
    """Compute effective cost per 1k tokens based on cost model.

    Returns 0.0 for SUBSCRIPTION and LOCAL models (pre-paid / free).
    Returns ``cost_per_1k_input + cost_per_1k_output`` for PER_TOKEN.

    Args:
        cost_per_1k_input: Cost per 1k input tokens.
        cost_per_1k_output: Cost per 1k output tokens.
        cost_model: The provider's cost model.

    Returns:
        Effective cost per 1k tokens.
    """
    if cost_model in (ProviderCostModel.SUBSCRIPTION, ProviderCostModel.LOCAL):
        return 0.0
    return cost_per_1k_input + cost_per_1k_output

quota_tracker

Quota tracking service.

Tracks per-provider request and token usage against configured quota windows. Window-based counters are rotated automatically when a window boundary is crossed.

Concurrency-safe via asyncio.Lock (same pattern as :class:~synthorg.budget.tracker.CostTracker).

QuotaTracker

QuotaTracker(*, subscriptions)

Tracks per-provider quota usage across configured time windows.

Providers without a subscription config are silently ignored (no-op on record, always allowed on check).

Note

check_quota followed by record_usage is subject to a TOCTOU gap: another coroutine could record usage between the check and the record, pushing the counter past the limit. This is by-design for the single-loop asyncio concurrency model -- the gap only exists across await points and is acceptable for quota enforcement (the in-flight budget checker is the true safety net).

Parameters:

Name Type Description Default
subscriptions Mapping[str, SubscriptionConfig]

Mapping of provider name to subscription config.

required
Source code in src/synthorg/budget/quota_tracker.py
def __init__(
    self,
    *,
    subscriptions: Mapping[str, SubscriptionConfig],
) -> None:
    self._subscriptions: dict[str, SubscriptionConfig] = copy.deepcopy(
        dict(subscriptions),
    )
    self._lock = asyncio.Lock()
    self._loop: asyncio.AbstractEventLoop | None = None
    self._usage: dict[str, dict[QuotaWindow, _WindowUsage]] = {}

    # Initialize usage tracking for providers with quotas
    for provider_name, sub_config in self._subscriptions.items():
        if sub_config.quotas:
            self._usage[provider_name] = {}
            for quota in sub_config.quotas:
                ws = window_start(quota.window)
                self._usage[provider_name][quota.window] = _WindowUsage(
                    requests=0,
                    tokens=0,
                    window_start=ws,
                )

    logger.debug(
        QUOTA_TRACKER_CREATED,
        provider_count=len(self._subscriptions),
        tracked_providers=sorted(self._usage),
    )

record_usage async

record_usage(provider_name, *, requests=1, tokens=0)

Record usage against all configured windows for a provider.

Rotates window counters if a window boundary has been crossed. Providers with no subscription config are skipped with a DEBUG log.

Parameters:

Name Type Description Default
provider_name str

Provider to record usage for.

required
requests int

Number of requests to record (must be >= 0).

1
tokens int

Number of tokens to record (must be >= 0).

0

Raises:

Type Description
ValueError

If requests or tokens is negative.

Source code in src/synthorg/budget/quota_tracker.py
async def record_usage(
    self,
    provider_name: str,
    *,
    requests: int = 1,
    tokens: int = 0,
) -> None:
    """Record usage against all configured windows for a provider.

    Rotates window counters if a window boundary has been crossed.
    Providers with no subscription config are skipped with a DEBUG log.

    Args:
        provider_name: Provider to record usage for.
        requests: Number of requests to record (must be >= 0).
        tokens: Number of tokens to record (must be >= 0).

    Raises:
        ValueError: If requests or tokens is negative.
    """
    if requests < 0:
        msg = f"requests must be non-negative, got {requests}"
        logger.warning(
            QUOTA_USAGE_SKIPPED,
            provider=provider_name,
            reason=msg,
        )
        raise ValueError(msg)
    if tokens < 0:
        msg = f"tokens must be non-negative, got {tokens}"
        logger.warning(
            QUOTA_USAGE_SKIPPED,
            provider=provider_name,
            reason=msg,
        )
        raise ValueError(msg)

    if provider_name not in self._usage:
        reason = (
            "no_quotas_configured"
            if provider_name in self._subscriptions
            else "unknown_provider"
        )
        logger.debug(
            QUOTA_USAGE_SKIPPED,
            provider=provider_name,
            reason=reason,
        )
        return

    self._capture_loop()
    async with self._lock:
        now = datetime.now(UTC)
        provider_usage = self._usage[provider_name]

        for window_type in list(provider_usage):
            current = provider_usage[window_type]
            expected_start = window_start(window_type, now=now)

            if expected_start != current.window_start:
                # Window boundary crossed -- rotate
                provider_usage[window_type] = _WindowUsage(
                    requests=requests,
                    tokens=tokens,
                    window_start=expected_start,
                )
                logger.debug(
                    QUOTA_WINDOW_ROTATED,
                    provider=provider_name,
                    window=window_type.value,
                    old_start=str(current.window_start),
                    new_start=str(expected_start),
                )
            else:
                provider_usage[window_type] = _WindowUsage(
                    requests=current.requests + requests,
                    tokens=current.tokens + tokens,
                    window_start=current.window_start,
                )

        logger.debug(
            QUOTA_USAGE_RECORDED,
            provider=provider_name,
            requests=requests,
            tokens=tokens,
        )

check_quota async

check_quota(provider_name, *, estimated_tokens=0)

Pre-flight check: can this provider handle a request?

Providers with no subscription config always return allowed.

Parameters:

Name Type Description Default
provider_name str

Provider to check.

required
estimated_tokens int

Estimated tokens for the request (must be >= 0).

0

Returns:

Type Description
QuotaCheckResult

Check result with allowed status and reason.

Raises:

Type Description
ValueError

If estimated_tokens is negative.

Source code in src/synthorg/budget/quota_tracker.py
async def check_quota(
    self,
    provider_name: str,
    *,
    estimated_tokens: int = 0,
) -> QuotaCheckResult:
    """Pre-flight check: can this provider handle a request?

    Providers with no subscription config always return allowed.

    Args:
        provider_name: Provider to check.
        estimated_tokens: Estimated tokens for the request (must be >= 0).

    Returns:
        Check result with allowed status and reason.

    Raises:
        ValueError: If estimated_tokens is negative.
    """
    if estimated_tokens < 0:
        msg = f"estimated_tokens must be non-negative, got {estimated_tokens}"
        logger.warning(
            QUOTA_CHECK_DENIED,
            provider=provider_name,
            reason=msg,
        )
        raise ValueError(msg)

    if provider_name not in self._usage:
        reason = (
            "no_quotas_configured"
            if provider_name in self._subscriptions
            else "unknown_provider"
        )
        logger.debug(
            QUOTA_CHECK_ALLOWED,
            provider=provider_name,
            reason=reason,
        )
        return QuotaCheckResult(
            allowed=True,
            provider_name=provider_name,
        )

    sub_config = self._subscriptions[provider_name]
    quota_map = {q.window: q for q in sub_config.quotas}

    self._capture_loop()
    async with self._lock:
        now = datetime.now(UTC)
        provider_usage = self._usage[provider_name]
        exhausted: list[QuotaWindow] = []
        reasons: list[str] = []

        for window_type, usage in provider_usage.items():
            expected_start = window_start(window_type, now=now)

            # If window has rotated, counters would be zero
            if expected_start != usage.window_start:
                continue

            quota = quota_map.get(window_type)
            if quota is None:
                continue

            if _is_window_exhausted(
                usage,
                quota,
                estimated_tokens,
            ):
                exhausted.append(window_type)
                reasons.append(
                    _build_exhaustion_reason(
                        provider_name,
                        window_type,
                        usage,
                        quota,
                        estimated_tokens,
                    ),
                )

    if exhausted:
        result = QuotaCheckResult(
            allowed=False,
            provider_name=provider_name,
            reason="; ".join(reasons),
            exhausted_windows=tuple(exhausted),
        )
        logger.info(
            QUOTA_CHECK_DENIED,
            provider=provider_name,
            exhausted_windows=[w.value for w in exhausted],
            reason=result.reason,
        )
        return result

    logger.debug(
        QUOTA_CHECK_ALLOWED,
        provider=provider_name,
    )
    return QuotaCheckResult(
        allowed=True,
        provider_name=provider_name,
    )

get_snapshot async

get_snapshot(provider_name, window=None)

Get current usage snapshots for a provider.

Parameters:

Name Type Description Default
provider_name str

Provider to query.

required
window QuotaWindow | None

Optional specific window to query. If None, returns all windows.

None

Returns:

Type Description
tuple[QuotaSnapshot, ...]

Tuple of quota snapshots.

Source code in src/synthorg/budget/quota_tracker.py
async def get_snapshot(
    self,
    provider_name: str,
    window: QuotaWindow | None = None,
) -> tuple[QuotaSnapshot, ...]:
    """Get current usage snapshots for a provider.

    Args:
        provider_name: Provider to query.
        window: Optional specific window to query. If ``None``,
            returns all windows.

    Returns:
        Tuple of quota snapshots.
    """
    if provider_name not in self._usage:
        reason = (
            "no_quotas_configured"
            if provider_name in self._subscriptions
            else "unknown_provider"
        )
        logger.debug(
            QUOTA_SNAPSHOT_QUERIED,
            provider=provider_name,
            snapshot_count=0,
            reason=reason,
        )
        return ()

    sub_config = self._subscriptions[provider_name]
    quota_map = {q.window: q for q in sub_config.quotas}

    self._capture_loop()
    async with self._lock:
        now = datetime.now(UTC)
        snapshots: list[QuotaSnapshot] = []
        provider_usage = self._usage[provider_name]

        for window_type, usage in provider_usage.items():
            if window is not None and window_type != window:
                continue

            quota = quota_map.get(window_type)
            if quota is None:
                continue

            expected_start = window_start(window_type, now=now)
            # If window has rotated, show zero usage
            if expected_start != usage.window_start:
                req_used = 0
                tok_used = 0
            else:
                req_used = usage.requests
                tok_used = usage.tokens

            snapshots.append(
                QuotaSnapshot(
                    provider_name=provider_name,
                    window=window_type,
                    requests_used=req_used,
                    requests_limit=quota.max_requests,
                    tokens_used=tok_used,
                    tokens_limit=quota.max_tokens,
                    window_resets_at=_window_end(
                        window_type,
                        expected_start,
                    ),
                    captured_at=now,
                ),
            )

    logger.debug(
        QUOTA_SNAPSHOT_QUERIED,
        provider=provider_name,
        snapshot_count=len(snapshots),
    )
    return tuple(snapshots)

get_all_snapshots async

get_all_snapshots()

Get usage snapshots for all tracked providers.

Note

Snapshots are collected per-provider with separate lock acquisitions, so cross-provider consistency is not guaranteed under concurrent writes.

Returns:

Type Description
dict[str, tuple[QuotaSnapshot, ...]]

Dict mapping provider name to tuple of snapshots.

Source code in src/synthorg/budget/quota_tracker.py
async def get_all_snapshots(
    self,
) -> dict[str, tuple[QuotaSnapshot, ...]]:
    """Get usage snapshots for all tracked providers.

    Note:
        Snapshots are collected per-provider with separate lock
        acquisitions, so cross-provider consistency is not guaranteed
        under concurrent writes.

    Returns:
        Dict mapping provider name to tuple of snapshots.
    """
    result: dict[str, tuple[QuotaSnapshot, ...]] = {}
    for provider_name in self._usage:
        result[provider_name] = await self.get_snapshot(provider_name)
    return result

peek_quota_available

peek_quota_available()

Synchronous snapshot of per-provider quota availability.

Reads cached counters without acquiring the lock. This is safe when called synchronously from the asyncio event loop thread (no concurrent mutations during synchronous execution) and acceptable for heuristic selection decisions where TOCTOU is tolerated. Do not call from a thread-pool executor or from outside the event loop -- that would race with record_usage.

Providers without configured quotas are excluded from the result (they are always allowed and do not need selection guidance).

Returns:

Type Description
dict[str, bool]

Dict mapping provider name to availability status.

Raises:

Type Description
RuntimeError

If called from a different event loop than the one used by this tracker's async methods, or from outside an event loop when async methods have already been called (indicating a thread-pool executor context).

Source code in src/synthorg/budget/quota_tracker.py
def peek_quota_available(self) -> dict[str, bool]:
    """Synchronous snapshot of per-provider quota availability.

    Reads cached counters **without acquiring the lock**.  This is
    safe when called synchronously from the ``asyncio`` event loop
    thread (no concurrent mutations during synchronous execution)
    and acceptable for heuristic selection decisions where TOCTOU
    is tolerated.  Do **not** call from a thread-pool executor or
    from outside the event loop -- that would race with
    ``record_usage``.

    Providers without configured quotas are excluded from the
    result (they are always allowed and do not need selection
    guidance).

    Returns:
        Dict mapping provider name to availability status.

    Raises:
        RuntimeError: If called from a different event loop than
            the one used by this tracker's async methods, or from
            outside an event loop when async methods have already
            been called (indicating a thread-pool executor context).
    """
    self._assert_loop_safe()
    now = datetime.now(UTC)
    result: dict[str, bool] = {}

    for provider_name, provider_usage in self._usage.items():
        sub_config = self._subscriptions.get(provider_name)
        if sub_config is None:
            continue
        quota_map = {q.window: q for q in sub_config.quotas}
        exhausted = False

        for window_type, usage in provider_usage.items():
            expected_start = window_start(window_type, now=now)
            if expected_start != usage.window_start:
                continue  # Window rotated -- treat as zero usage
            quota = quota_map.get(window_type)
            if quota is None:
                continue
            # Use estimated_tokens=1 so providers at exactly the
            # token cap are treated as exhausted (zero headroom).
            if _is_window_exhausted(usage, quota, estimated_tokens=1):
                exhausted = True
                break

        result[provider_name] = not exhausted

    return result

Billing

billing

Billing period computation utilities.

Pure functions for determining billing period boundaries based on a configurable reset day. Used by :class:~synthorg.budget.enforcer.BudgetEnforcer to scope cost queries to the current billing cycle.

billing_period_start

billing_period_start(reset_day, *, now=None)

Compute the UTC-aware start of the current billing period.

If now.day >= reset_day, returns current month's reset_day at 00:00 UTC. Otherwise, returns previous month's reset_day at 00:00 UTC.

Parameters:

Name Type Description Default
reset_day int

Day of month when the billing period resets (1-28).

required
now datetime | None

Reference timestamp. Defaults to datetime.now(UTC).

None

Returns:

Type Description
datetime

UTC-aware datetime at midnight on the billing period start day.

Raises:

Type Description
ValueError

If reset_day is not in [1, 28].

Source code in src/synthorg/budget/billing.py
def billing_period_start(
    reset_day: int,
    *,
    now: datetime | None = None,
) -> datetime:
    """Compute the UTC-aware start of the current billing period.

    If ``now.day >= reset_day``, returns current month's ``reset_day``
    at 00:00 UTC.  Otherwise, returns previous month's ``reset_day``
    at 00:00 UTC.

    Args:
        reset_day: Day of month when the billing period resets (1-28).
        now: Reference timestamp.  Defaults to ``datetime.now(UTC)``.

    Returns:
        UTC-aware datetime at midnight on the billing period start day.

    Raises:
        ValueError: If ``reset_day`` is not in ``[1, 28]``.
    """
    if not _RESET_DAY_MIN <= reset_day <= _RESET_DAY_MAX:
        msg = f"reset_day must be {_RESET_DAY_MIN}-{_RESET_DAY_MAX}, got {reset_day}"
        raise ValueError(msg)

    if now is None:
        now = datetime.now(UTC)

    if now.day >= reset_day:
        return datetime(now.year, now.month, reset_day, tzinfo=UTC)

    # Roll back to previous month
    if now.month == _JANUARY_MONTH:
        return datetime(now.year - 1, _DECEMBER_MONTH, reset_day, tzinfo=UTC)
    return datetime(now.year, now.month - 1, reset_day, tzinfo=UTC)

daily_period_start

daily_period_start(*, now=None)

Compute the UTC-aware start of today (midnight UTC).

Parameters:

Name Type Description Default
now datetime | None

Reference timestamp. Defaults to datetime.now(UTC).

None

Returns:

Type Description
datetime

UTC-aware datetime at midnight of the current day.

Source code in src/synthorg/budget/billing.py
def daily_period_start(*, now: datetime | None = None) -> datetime:
    """Compute the UTC-aware start of today (midnight UTC).

    Args:
        now: Reference timestamp.  Defaults to ``datetime.now(UTC)``.

    Returns:
        UTC-aware datetime at midnight of the current day.
    """
    if now is None:
        now = datetime.now(UTC)
    return datetime(now.year, now.month, now.day, tzinfo=UTC)

Optimizer

optimizer

CFO cost optimization service.

Provides spending anomaly detection, cost efficiency analysis, model downgrade recommendations, routing optimization suggestions, and operation approval decisions. Composes :class:~synthorg.budget.tracker.CostTracker and :class:~synthorg.budget.config.BudgetConfig for read-only analytical queries -- the advisory complement to :class:~synthorg.budget.enforcer.BudgetEnforcer.

Service layer backing the CFO role (see Operations design page).

CostOptimizer

CostOptimizer(*, cost_tracker, budget_config, config=None, model_resolver=None)

CFO analytical service for cost optimization.

Composes CostTracker and BudgetConfig for read-only analysis: anomaly detection, efficiency analysis, downgrade recommendations, routing optimization suggestions, and operation approval evaluation.

Parameters:

Name Type Description Default
cost_tracker CostTracker

Cost tracking service for querying spend.

required
budget_config BudgetConfig

Budget configuration for limits and thresholds.

required
config CostOptimizerConfig | None

Optimizer-specific configuration. Defaults to CostOptimizerConfig() when None.

None
model_resolver ModelResolver | None

Optional model resolver for downgrade and routing optimization recommendations.

None
Source code in src/synthorg/budget/optimizer.py
def __init__(
    self,
    *,
    cost_tracker: CostTracker,
    budget_config: BudgetConfig,
    config: CostOptimizerConfig | None = None,
    model_resolver: ModelResolver | None = None,
) -> None:
    self._cost_tracker = cost_tracker
    self._budget_config = budget_config
    self._config = config or CostOptimizerConfig()
    self._model_resolver = model_resolver
    logger.debug(
        CFO_OPTIMIZER_CREATED,
        has_model_resolver=model_resolver is not None,
        anomaly_sigma=self._config.anomaly_sigma_threshold,
    )

detect_anomalies async

detect_anomalies(*, start, end, window_count=_DEFAULT_WINDOW_COUNT)

Detect spending anomalies in the given period.

Divides [start, end) into window_count equal windows, groups records by agent, and flags agents whose last-window spending deviates significantly from their historical mean.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required
window_count int

Number of time windows to divide the period into. Must be >= 2 and <= 1000.

_DEFAULT_WINDOW_COUNT

Returns:

Type Description
AnomalyDetectionResult

Anomaly detection result with any detected anomalies.

Raises:

Type Description
ValueError

If start >= end, window_count < 2, or window_count > 1000.

Source code in src/synthorg/budget/optimizer.py
async def detect_anomalies(
    self,
    *,
    start: datetime,
    end: datetime,
    window_count: int = _DEFAULT_WINDOW_COUNT,
) -> AnomalyDetectionResult:
    """Detect spending anomalies in the given period.

    Divides ``[start, end)`` into ``window_count`` equal windows,
    groups records by agent, and flags agents whose last-window
    spending deviates significantly from their historical mean.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.
        window_count: Number of time windows to divide the period
            into.  Must be >= 2 and <= 1000.

    Returns:
        Anomaly detection result with any detected anomalies.

    Raises:
        ValueError: If ``start >= end``, ``window_count < 2``, or
            ``window_count > 1000``.
    """
    if start >= end:
        logger.warning(
            CFO_ANOMALY_SCAN_COMPLETE,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)
    if window_count < 2:  # noqa: PLR2004
        logger.warning(
            CFO_ANOMALY_SCAN_COMPLETE,
            error="window_count_below_minimum",
            window_count=window_count,
        )
        msg = f"window_count must be >= 2, got {window_count}"
        raise ValueError(msg)
    if window_count > _MAX_WINDOW_COUNT:
        logger.warning(
            CFO_ANOMALY_SCAN_COMPLETE,
            error="window_count_above_maximum",
            window_count=window_count,
        )
        msg = f"window_count must be <= {_MAX_WINDOW_COUNT}, got {window_count}"
        raise ValueError(msg)

    now = datetime.now(UTC)
    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )

    total_duration = end - start
    window_duration = total_duration / window_count
    window_starts = tuple(start + window_duration * i for i in range(window_count))

    # Pre-group records by agent for O(N+M) complexity.
    by_agent = group_by_agent(records)
    agent_ids = sorted(by_agent)
    anomalies: list[SpendingAnomaly] = []

    for agent_id in agent_ids:
        window_costs = _compute_window_costs(
            by_agent[agent_id],
            window_starts,
            window_duration,
        )
        anomaly = _detect_spike_anomaly(
            agent_id,
            window_costs,
            now,
            window_starts,
            window_duration,
            self._config,
            currency=self._budget_config.currency,
        )
        if anomaly is not None:
            logger.warning(
                CFO_ANOMALY_DETECTED,
                agent_id=agent_id,
                anomaly_type=anomaly.anomaly_type.value,
                severity=anomaly.severity.value,
                deviation_factor=anomaly.deviation_factor,
            )
            anomalies.append(anomaly)

    result = AnomalyDetectionResult(
        anomalies=tuple(anomalies),
        scan_period_start=start,
        scan_period_end=end,
        agents_scanned=len(agent_ids),
        scan_timestamp=now,
    )

    logger.info(
        CFO_ANOMALY_SCAN_COMPLETE,
        anomaly_count=len(anomalies),
        agents_scanned=len(agent_ids),
    )

    return result

analyze_efficiency async

analyze_efficiency(*, start, end)

Analyze cost efficiency of all agents in the period.

Computes cost-per-1k-tokens for each agent and rates them relative to the global average.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
EfficiencyAnalysis

Efficiency analysis with per-agent ratings.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/optimizer.py
async def analyze_efficiency(
    self,
    *,
    start: datetime,
    end: datetime,
) -> EfficiencyAnalysis:
    """Analyze cost efficiency of all agents in the period.

    Computes cost-per-1k-tokens for each agent and rates them
    relative to the global average.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Efficiency analysis with per-agent ratings.

    Raises:
        ValueError: If ``start >= end``.
    """
    if start >= end:
        logger.warning(
            CFO_EFFICIENCY_ANALYSIS_COMPLETE,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)

    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )

    result = _build_efficiency_from_records(
        records,
        start=start,
        end=end,
        threshold_factor=self._config.inefficiency_threshold_factor,
        lower_bound_factor=self._config.efficiency_lower_bound_factor,
    )

    logger.info(
        CFO_EFFICIENCY_ANALYSIS_COMPLETE,
        agent_count=len(result.agents),
        inefficient_count=result.inefficient_agent_count,
        global_avg_cost_per_1k=result.global_avg_cost_per_1k,
    )

    return result

recommend_downgrades async

recommend_downgrades(*, start, end)

Recommend model downgrades for inefficient agents.

Runs efficiency analysis and uses the model resolver and downgrade map to find cheaper alternatives.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
DowngradeAnalysis

Downgrade analysis with recommendations. Empty when no

DowngradeAnalysis

model_resolver is configured.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/optimizer.py
async def recommend_downgrades(
    self,
    *,
    start: datetime,
    end: datetime,
) -> DowngradeAnalysis:
    """Recommend model downgrades for inefficient agents.

    Runs efficiency analysis and uses the model resolver and
    downgrade map to find cheaper alternatives.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Downgrade analysis with recommendations. Empty when no
        model_resolver is configured.

    Raises:
        ValueError: If ``start >= end``.
    """
    if start >= end:
        logger.warning(
            CFO_DOWNGRADE_RECOMMENDED,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)

    if self._model_resolver is None:
        logger.warning(
            CFO_RESOLVER_MISSING,
            reason="no_model_resolver_configured",
        )
        budget_pressure = await self._compute_budget_pressure()
        return DowngradeAnalysis(
            recommendations=(),
            budget_pressure_percent=budget_pressure,
        )

    async with asyncio.TaskGroup() as tg:
        records_task = tg.create_task(
            self._cost_tracker.get_records(start=start, end=end),
        )
        pressure_task = tg.create_task(self._compute_budget_pressure())

    records = records_task.result()
    budget_pressure = pressure_task.result()

    efficiency = _build_efficiency_from_records(
        records,
        start=start,
        end=end,
        threshold_factor=self._config.inefficiency_threshold_factor,
        lower_bound_factor=self._config.efficiency_lower_bound_factor,
    )

    logger.info(
        CFO_EFFICIENCY_ANALYSIS_COMPLETE,
        agent_count=len(efficiency.agents),
        inefficient_count=efficiency.inefficient_agent_count,
        global_avg_cost_per_1k=efficiency.global_avg_cost_per_1k,
    )

    by_agent = group_by_agent(records)
    recommendations = self._build_recommendations(
        efficiency=efficiency,
        by_agent=by_agent,
    )

    return DowngradeAnalysis(
        recommendations=tuple(recommendations),
        budget_pressure_percent=budget_pressure,
    )

suggest_routing_optimizations async

suggest_routing_optimizations(*, start, end)

Suggest routing optimizations based on actual usage patterns.

Analyzes each agent's most-used model and suggests cheaper alternatives available through the model resolver, comparing by cost and context window size.

Unlike recommend_downgrades which only targets INEFFICIENT agents, this method analyzes all agents and suggests cheaper alternatives regardless of efficiency rating -- any agent that could use a cheaper model is a candidate.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required

Returns:

Type Description
RoutingOptimizationAnalysis

Routing optimization analysis with per-agent suggestions.

RoutingOptimizationAnalysis

Empty when no model_resolver is configured.

Raises:

Type Description
ValueError

If start >= end.

Source code in src/synthorg/budget/optimizer.py
async def suggest_routing_optimizations(
    self,
    *,
    start: datetime,
    end: datetime,
) -> RoutingOptimizationAnalysis:
    """Suggest routing optimizations based on actual usage patterns.

    Analyzes each agent's most-used model and suggests cheaper
    alternatives available through the model resolver, comparing by
    cost and context window size.

    Unlike ``recommend_downgrades`` which only targets INEFFICIENT
    agents, this method analyzes all agents and suggests cheaper
    alternatives regardless of efficiency rating -- any agent that
    could use a cheaper model is a candidate.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.

    Returns:
        Routing optimization analysis with per-agent suggestions.
        Empty when no model_resolver is configured.

    Raises:
        ValueError: If ``start >= end``.
    """
    if start >= end:
        logger.warning(
            CFO_ROUTING_OPTIMIZATION_COMPLETE,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)

    if self._model_resolver is None:
        logger.warning(
            CFO_RESOLVER_MISSING,
            reason="no_model_resolver_configured",
        )
        return RoutingOptimizationAnalysis(
            suggestions=(),
            analysis_period_start=start,
            analysis_period_end=end,
            agents_analyzed=0,
        )

    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )

    by_agent = group_by_agent(records)
    all_models = self._model_resolver.all_models_sorted_by_cost()
    suggestions = self._find_routing_suggestions(by_agent, all_models)

    result = RoutingOptimizationAnalysis(
        suggestions=tuple(suggestions),
        analysis_period_start=start,
        analysis_period_end=end,
        agents_analyzed=len(by_agent),
    )

    logger.info(
        CFO_ROUTING_OPTIMIZATION_COMPLETE,
        suggestion_count=len(suggestions),
        agents_analyzed=len(by_agent),
        total_savings_per_1k=result.total_estimated_savings_per_1k,
    )

    return result

evaluate_operation async

evaluate_operation(*, agent_id, estimated_cost, now=None)

Evaluate whether an operation should proceed.

Evaluates three criteria in order:

  1. Rejects negative estimated_cost immediately.
  2. Denies if the projected alert level (after adding the estimated cost) meets or exceeds the auto-deny threshold.
  3. Denies if the projected cost would exceed the hard-stop limit.
  4. Approves with optional warning conditions for high-cost operations or elevated alert levels.

When total_monthly <= 0 budget enforcement is disabled and the operation is always approved with no conditions.

Parameters:

Name Type Description Default
agent_id str

Agent requesting the operation.

required
estimated_cost float

Estimated cost of the operation. Must be >= 0.

required
now datetime | None

Reference timestamp for billing period computation. Defaults to datetime.now(UTC).

None

Returns:

Type Description
ApprovalDecision

Approval decision with reasoning.

Raises:

Type Description
ValueError

If estimated_cost is negative.

Source code in src/synthorg/budget/optimizer.py
async def evaluate_operation(
    self,
    *,
    agent_id: str,
    estimated_cost: float,
    now: datetime | None = None,
) -> ApprovalDecision:
    """Evaluate whether an operation should proceed.

    Evaluates three criteria in order:

    1. Rejects negative ``estimated_cost`` immediately.
    2. Denies if the *projected* alert level (after adding the
       estimated cost) meets or exceeds the auto-deny threshold.
    3. Denies if the projected cost would exceed the hard-stop
       limit.
    4. Approves with optional warning conditions for high-cost
       operations or elevated alert levels.

    When ``total_monthly <= 0`` budget enforcement is disabled and
    the operation is always approved with no conditions.

    Args:
        agent_id: Agent requesting the operation.
        estimated_cost: Estimated cost of the operation.  Must
            be >= 0.
        now: Reference timestamp for billing period computation.
            Defaults to ``datetime.now(UTC)``.

    Returns:
        Approval decision with reasoning.

    Raises:
        ValueError: If ``estimated_cost`` is negative.
    """
    if estimated_cost < 0:
        logger.warning(
            CFO_OPERATION_DENIED,
            agent_id=agent_id,
            estimated_cost=estimated_cost,
            reason="negative_estimated_cost",
        )
        msg = f"estimated_cost must be >= 0, got {estimated_cost}"
        raise ValueError(msg)

    cfg = self._budget_config

    if cfg.total_monthly <= 0:
        return ApprovalDecision(
            approved=True,
            reason="Budget enforcement disabled (no monthly budget)",
            budget_remaining=0.0,
            budget_used_percent=0.0,
            alert_level=BudgetAlertLevel.NORMAL,
            conditions=(),
        )

    period_start = billing_period_start(cfg.reset_day, now=now)
    monthly_cost = await self._cost_tracker.get_total_cost(
        start=period_start,
    )
    remaining = round(
        cfg.total_monthly - monthly_cost,
        BUDGET_ROUNDING_PRECISION,
    )
    used_pct = round(
        monthly_cost / cfg.total_monthly * 100,
        BUDGET_ROUNDING_PRECISION,
    )
    alert_level = _compute_alert_level(used_pct, cfg)

    # Use projected alert level (after cost) for auto-deny check.
    projected_cost = round(
        monthly_cost + estimated_cost,
        BUDGET_ROUNDING_PRECISION,
    )
    projected_pct = round(
        projected_cost / cfg.total_monthly * 100,
        BUDGET_ROUNDING_PRECISION,
    )
    projected_alert = _compute_alert_level(projected_pct, cfg)

    denial = self._check_denial(
        agent_id=agent_id,
        estimated_cost=estimated_cost,
        remaining=remaining,
        used_pct=used_pct,
        alert_level=alert_level,
        projected_cost=projected_cost,
        projected_alert=projected_alert,
    )
    if denial is not None:
        return denial

    conditions = self._build_approval_conditions(
        estimated_cost=estimated_cost,
        projected_alert=projected_alert,
        projected_pct=projected_pct,
    )

    decision = ApprovalDecision(
        approved=True,
        reason="Approved",
        budget_remaining=remaining,
        budget_used_percent=used_pct,
        alert_level=alert_level,
        conditions=conditions,
    )

    logger.info(
        CFO_APPROVAL_EVALUATED,
        agent_id=agent_id,
        approved=True,
        estimated_cost=estimated_cost,
        alert_level=alert_level.value,
        conditions_count=len(conditions),
    )

    return decision

optimizer_models

CFO / CostOptimizer domain models.

Frozen Pydantic models for anomaly detection, cost efficiency analysis, downgrade recommendations, and approval decisions. Used by :class:~synthorg.budget.optimizer.CostOptimizer and :class:~synthorg.budget.reports.ReportGenerator.

AnomalyType

Bases: StrEnum

Type of spending anomaly detected.

SUSTAINED_HIGH and RATE_INCREASE are reserved for future detection algorithms; only SPIKE is currently produced.

AnomalySeverity

Bases: StrEnum

Severity of a detected spending anomaly.

EfficiencyRating

Bases: StrEnum

Cost efficiency rating for an agent.

SpendingAnomaly pydantic-model

Bases: BaseModel

A detected spending anomaly for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent exhibiting the anomaly.

anomaly_type AnomalyType

Classification of the anomaly.

severity AnomalySeverity

Severity level of the anomaly.

description NotBlankStr

Human-readable explanation.

current_value float

Spending in the most recent window.

baseline_value float

Mean spending across historical windows.

deviation_factor float

How many standard deviations above baseline. Set to 0.0 when the baseline is zero (no historical spending).

detected_at datetime

Timestamp when the anomaly was detected.

period_start datetime

Start of the window that triggered the anomaly.

period_end datetime

End of the window that triggered the anomaly.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_period_ordering

agent_id pydantic-field

agent_id

Agent identifier

anomaly_type pydantic-field

anomaly_type

Anomaly classification

severity pydantic-field

severity

Severity level

description pydantic-field

description

Human-readable explanation

current_value pydantic-field

current_value

Spending in the most recent window

baseline_value pydantic-field

baseline_value

Mean spending across historical windows

deviation_factor pydantic-field

deviation_factor

Standard deviations above baseline

detected_at pydantic-field

detected_at

When the anomaly was detected

period_start pydantic-field

period_start

Anomalous window start

period_end pydantic-field

period_end

Anomalous window end

AnomalyDetectionResult pydantic-model

Bases: BaseModel

Result of an anomaly detection scan.

Attributes:

Name Type Description
anomalies tuple[SpendingAnomaly, ...]

Detected anomalies (may be empty).

scan_period_start datetime

Start of the scanned period.

scan_period_end datetime

End of the scanned period.

agents_scanned int

Number of unique agents in the data.

scan_timestamp datetime

When the scan was performed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_period_ordering

anomalies pydantic-field

anomalies = ()

Detected anomalies

scan_period_start pydantic-field

scan_period_start

Scanned period start

scan_period_end pydantic-field

scan_period_end

Scanned period end

agents_scanned pydantic-field

agents_scanned

Unique agents in data

scan_timestamp pydantic-field

scan_timestamp

When the scan ran

AgentEfficiency pydantic-model

Bases: BaseModel

Cost efficiency metrics for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

total_cost float

Total cost in the analysis period.

total_tokens int

Total tokens consumed (input + output).

cost_per_1k_tokens float

Cost per 1000 tokens (computed).

record_count int

Number of cost records.

efficiency_rating EfficiencyRating

Efficiency classification.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent identifier

total_cost pydantic-field

total_cost

Total cost in the analysis period

total_tokens pydantic-field

total_tokens

Total tokens consumed

record_count pydantic-field

record_count

Number of cost records

efficiency_rating pydantic-field

efficiency_rating

Efficiency classification

cost_per_1k_tokens property

cost_per_1k_tokens

Cost per 1000 tokens, derived from total_cost and total_tokens.

EfficiencyAnalysis pydantic-model

Bases: BaseModel

Result of a cost efficiency analysis.

Attributes:

Name Type Description
agents tuple[AgentEfficiency, ...]

Per-agent efficiency metrics (sorted by cost_per_1k desc).

global_avg_cost_per_1k float

Global average cost per 1000 tokens.

analysis_period_start datetime

Start of the analysis period.

analysis_period_end datetime

End of the analysis period.

inefficient_agent_count int

Number of agents rated INEFFICIENT (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_period_ordering
  • _validate_agents_sort_order

agents pydantic-field

agents = ()

Per-agent efficiency metrics

global_avg_cost_per_1k pydantic-field

global_avg_cost_per_1k

Global average cost per 1000 tokens

analysis_period_start pydantic-field

analysis_period_start

Analysis period start

analysis_period_end pydantic-field

analysis_period_end

Analysis period end

inefficient_agent_count property

inefficient_agent_count

Number of agents rated INEFFICIENT.

DowngradeRecommendation pydantic-model

Bases: BaseModel

A model downgrade recommendation for a single agent.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

current_model NotBlankStr

Currently used model identifier.

recommended_model NotBlankStr

Recommended cheaper model.

estimated_savings_per_1k float

Estimated savings per 1000 tokens.

reason NotBlankStr

Human-readable explanation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_different_models

agent_id pydantic-field

agent_id

Agent identifier

current_model pydantic-field

current_model

Current model identifier

recommended_model pydantic-field

recommended_model

Recommended cheaper model

estimated_savings_per_1k pydantic-field

estimated_savings_per_1k

Estimated savings per 1000 tokens

reason pydantic-field

reason

Human-readable explanation

DowngradeAnalysis pydantic-model

Bases: BaseModel

Result of a downgrade recommendation analysis.

Attributes:

Name Type Description
recommendations tuple[DowngradeRecommendation, ...]

Per-agent downgrade recommendations.

total_estimated_savings_per_1k float

Aggregate estimated savings per 1000 tokens across all recommendations (computed).

budget_pressure_percent float

Current budget utilization percentage.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

recommendations pydantic-field

recommendations = ()

Per-agent downgrade recommendations

budget_pressure_percent pydantic-field

budget_pressure_percent

Current budget utilization percentage

total_estimated_savings_per_1k property

total_estimated_savings_per_1k

Aggregate estimated savings per 1000 tokens.

ApprovalDecision pydantic-model

Bases: BaseModel

Result of evaluating whether an operation should proceed.

Attributes:

Name Type Description
approved bool

Whether the operation is approved.

reason NotBlankStr

Explanation for the decision.

budget_remaining float

Remaining budget in the configured currency (may be negative if over budget).

budget_used_percent float

Percentage of budget consumed.

alert_level BudgetAlertLevel

Current budget alert level.

conditions tuple[NotBlankStr, ...]

Any conditions attached to approval.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

approved pydantic-field

approved

Whether the operation is approved

reason pydantic-field

reason

Explanation for the decision

budget_remaining pydantic-field

budget_remaining

Remaining budget in the configured currency (negative when over budget)

budget_used_percent pydantic-field

budget_used_percent

Percentage of budget consumed

alert_level pydantic-field

alert_level

Current budget alert level

conditions pydantic-field

conditions = ()

Conditions attached to approval

CostOptimizerConfig pydantic-model

Bases: BaseModel

Configuration for the CostOptimizer service.

Attributes:

Name Type Description
anomaly_sigma_threshold float

Number of standard deviations above mean to flag as anomalous.

anomaly_spike_factor float

Multiplier above mean to flag as spike (independent of stddev).

inefficiency_threshold_factor float

Factor above global average cost_per_1k to flag as inefficient.

efficiency_lower_bound_factor float

Factor below global average cost_per_1k under which an agent is rated EFFICIENT. Default 0.8 means agents spending <=80% of the global average cost-per-1k are rated efficient.

approval_auto_deny_alert_level BudgetAlertLevel

Alert level at or above which operations are automatically denied.

approval_warn_threshold float

Absolute cost threshold (in the configured budget.currency) for adding a warning condition to approval. Operators running with a different currency than the default should tune this to a value meaningful in their currency. When set to 0.0, every approved operation receives a "High-cost operation" condition (effectively "always warn").

min_anomaly_windows int

Minimum number of historical windows required before anomaly detection activates.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

anomaly_sigma_threshold pydantic-field

anomaly_sigma_threshold = 2.0

Sigma threshold for anomaly detection

anomaly_spike_factor pydantic-field

anomaly_spike_factor = 3.0

Spike factor multiplier above mean

inefficiency_threshold_factor pydantic-field

inefficiency_threshold_factor = 1.5

Factor above global avg for inefficiency

efficiency_lower_bound_factor pydantic-field

efficiency_lower_bound_factor = 0.8

Factor below global avg cost_per_1k rated EFFICIENT

approval_auto_deny_alert_level pydantic-field

approval_auto_deny_alert_level = HARD_STOP

Alert level triggering auto-deny

approval_warn_threshold pydantic-field

approval_warn_threshold = 1.0

Absolute cost threshold (in configured currency) for warning condition; tune per operator currency

min_anomaly_windows pydantic-field

min_anomaly_windows = 3

Minimum historical windows for anomaly detection

RoutingSuggestion pydantic-model

Bases: BaseModel

A routing optimization suggestion for a single agent.

Suggests switching an agent's most-used model to a cheaper alternative that provides sufficient context window size.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent identifier.

current_model NotBlankStr

Currently most-used model identifier.

suggested_model NotBlankStr

Suggested cheaper alternative.

current_cost_per_1k float

Current model's total cost per 1k tokens.

suggested_cost_per_1k float

Suggested model's total cost per 1k tokens.

estimated_savings_per_1k float

Estimated savings per 1k tokens (computed).

reason NotBlankStr

Human-readable explanation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_different_models
  • _validate_savings_positive

agent_id pydantic-field

agent_id

Agent identifier

current_model pydantic-field

current_model

Current most-used model

suggested_model pydantic-field

suggested_model

Suggested cheaper model

current_cost_per_1k pydantic-field

current_cost_per_1k

Current model total cost per 1k tokens

suggested_cost_per_1k pydantic-field

suggested_cost_per_1k

Suggested model total cost per 1k tokens

reason pydantic-field

reason

Human-readable explanation

estimated_savings_per_1k property

estimated_savings_per_1k

Estimated savings per 1k tokens.

RoutingOptimizationAnalysis pydantic-model

Bases: BaseModel

Result of a routing optimization analysis.

Attributes:

Name Type Description
suggestions tuple[RoutingSuggestion, ...]

Per-agent routing optimization suggestions.

total_estimated_savings_per_1k float

Aggregate estimated savings per 1k tokens across all suggestions (computed).

analysis_period_start datetime

Start of the analysis period.

analysis_period_end datetime

End of the analysis period.

agents_analyzed int

Number of agents analyzed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_period_ordering

suggestions pydantic-field

suggestions = ()

Per-agent routing optimization suggestions

analysis_period_start pydantic-field

analysis_period_start

Analysis period start

analysis_period_end pydantic-field

analysis_period_end

Analysis period end

agents_analyzed pydantic-field

agents_analyzed

Number of agents analyzed

total_estimated_savings_per_1k property

total_estimated_savings_per_1k

Aggregate estimated savings per 1k tokens.

Reports

reports

CFO spending report generation.

Provides multi-dimensional spending reports with breakdowns by task, provider, model, and time-period comparison. Composes :class:~synthorg.budget.tracker.CostTracker and :class:~synthorg.budget.config.BudgetConfig.

Service layer backing CFO reporting (see Operations design page).

TaskSpending pydantic-model

Bases: BaseModel

Spending aggregation for a single task.

Attributes:

Name Type Description
task_id NotBlankStr

Task identifier.

total_cost float

Total cost for the task.

currency CurrencyCode | None

ISO 4217 currency code shared by every contributing record; None only when record_count == 0.

total_tokens int

Total tokens consumed (input + output).

record_count int

Number of cost records.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_currency_presence

task_id pydantic-field

task_id

Task identifier

total_cost pydantic-field

total_cost

Total cost

currency pydantic-field

currency = None

Currency shared by every contributing record

total_tokens pydantic-field

total_tokens

Total tokens consumed

record_count pydantic-field

record_count

Number of cost records

ProviderDistribution pydantic-model

Bases: BaseModel

Cost distribution for a single provider.

Attributes:

Name Type Description
provider NotBlankStr

Provider name.

total_cost float

Total cost for the provider.

currency CurrencyCode | None

ISO 4217 currency code shared by every contributing record; None only when record_count == 0.

record_count int

Number of cost records.

percentage_of_total float

Percentage of total spending.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_currency_presence

provider pydantic-field

provider

Provider name

total_cost pydantic-field

total_cost

Total cost

currency pydantic-field

currency = None

Currency shared by every contributing record

record_count pydantic-field

record_count

Number of cost records

percentage_of_total pydantic-field

percentage_of_total

Percentage of total spending

ModelDistribution pydantic-model

Bases: BaseModel

Cost distribution for a single model.

Attributes:

Name Type Description
model NotBlankStr

Model identifier.

provider NotBlankStr

Provider name.

total_cost float

Total cost for the model.

currency CurrencyCode | None

ISO 4217 currency code shared by every contributing record; None only when record_count == 0.

record_count int

Number of cost records.

percentage_of_total float

Percentage of total spending.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_currency_presence

model pydantic-field

model

Model identifier

provider pydantic-field

provider

Provider name

total_cost pydantic-field

total_cost

Total cost

currency pydantic-field

currency = None

Currency shared by every contributing record

record_count pydantic-field

record_count

Number of cost records

percentage_of_total pydantic-field

percentage_of_total

Percentage of total spending

PeriodComparison pydantic-model

Bases: BaseModel

Comparison of spending between two consecutive periods.

Attributes:

Name Type Description
current_period_cost float

Cost in the current period.

previous_period_cost float

Cost in the previous period.

cost_change float

Absolute change in cost (computed).

cost_change_percent float | None

Percentage change in cost (computed). None when previous period cost is zero.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

current_period_cost pydantic-field

current_period_cost

Current period cost

previous_period_cost pydantic-field

previous_period_cost

Previous period cost

cost_change property

cost_change

Absolute cost change (current - previous).

cost_change_percent property

cost_change_percent

Percentage cost change. None when previous period cost is zero.

SpendingReport pydantic-model

Bases: BaseModel

Multi-dimensional spending report.

Attributes:

Name Type Description
summary SpendingSummary

Overall spending summary for the period.

by_task tuple[TaskSpending, ...]

Per-task spending breakdown.

by_provider tuple[ProviderDistribution, ...]

Per-provider cost distribution.

by_model tuple[ModelDistribution, ...]

Per-model cost distribution.

period_comparison PeriodComparison | None

Comparison with previous period (optional).

top_agents_by_cost tuple[tuple[NotBlankStr, float], ...]

Top agents by cost (sorted descending).

top_tasks_by_cost tuple[tuple[NotBlankStr, float], ...]

Top tasks by cost (sorted descending).

generated_at datetime

When the report was generated.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_agent_ranking_order
  • _validate_task_ranking_order

summary pydantic-field

summary

Overall spending summary

by_task pydantic-field

by_task = ()

Per-task spending breakdown

by_provider pydantic-field

by_provider = ()

Per-provider cost distribution

by_model pydantic-field

by_model = ()

Per-model cost distribution

period_comparison pydantic-field

period_comparison = None

Comparison with previous period

top_agents_by_cost pydantic-field

top_agents_by_cost = ()

Top agents by cost (agent_id, cost)

top_tasks_by_cost pydantic-field

top_tasks_by_cost = ()

Top tasks by cost (task_id, cost)

generated_at pydantic-field

generated_at

When the report was generated

ReportGenerator

ReportGenerator(*, cost_tracker, budget_config)

Generates multi-dimensional spending reports.

Composes CostTracker and BudgetConfig to produce reports with breakdowns by task, provider, model, and period comparison.

Parameters:

Name Type Description Default
cost_tracker CostTracker

Cost tracking service for querying spend.

required
budget_config BudgetConfig

Budget configuration for context.

required
Source code in src/synthorg/budget/reports.py
def __init__(
    self,
    *,
    cost_tracker: CostTracker,
    budget_config: BudgetConfig,
) -> None:
    self._cost_tracker = cost_tracker
    self._budget_config = budget_config
    logger.debug(
        CFO_REPORT_GENERATOR_CREATED,
        has_budget_config=True,
    )

generate_report async

generate_report(*, start, end, top_n=_DEFAULT_TOP_N, include_period_comparison=True)

Generate a spending report for the given period.

Fetches records and summary concurrently; derives total_cost from the records snapshot for consistent distribution percentages.

Parameters:

Name Type Description Default
start datetime

Inclusive period start.

required
end datetime

Exclusive period end.

required
top_n int

Maximum number of top agents/tasks to include.

_DEFAULT_TOP_N
include_period_comparison bool

Whether to compute a comparison with the previous period of the same duration.

True

Returns:

Type Description
SpendingReport

Multi-dimensional spending report.

Raises:

Type Description
ValueError

If start >= end or top_n < 1.

Source code in src/synthorg/budget/reports.py
async def generate_report(
    self,
    *,
    start: datetime,
    end: datetime,
    top_n: int = _DEFAULT_TOP_N,
    include_period_comparison: bool = True,
) -> SpendingReport:
    """Generate a spending report for the given period.

    Fetches records and summary concurrently; derives ``total_cost``
    from the records snapshot for consistent distribution
    percentages.

    Args:
        start: Inclusive period start.
        end: Exclusive period end.
        top_n: Maximum number of top agents/tasks to include.
        include_period_comparison: Whether to compute a comparison
            with the previous period of the same duration.

    Returns:
        Multi-dimensional spending report.

    Raises:
        ValueError: If ``start >= end`` or ``top_n < 1``.
    """
    if start >= end:
        logger.warning(
            CFO_REPORT_VALIDATION_ERROR,
            error="start_after_end",
            start=start.isoformat(),
            end=end.isoformat(),
        )
        msg = f"start ({start.isoformat()}) must be before end ({end.isoformat()})"
        raise ValueError(msg)
    if top_n < 1:
        logger.warning(
            CFO_REPORT_VALIDATION_ERROR,
            error="top_n_below_minimum",
            top_n=top_n,
        )
        msg = f"top_n must be >= 1, got {top_n}"
        raise ValueError(msg)

    now = datetime.now(UTC)

    records = await self._cost_tracker.get_records(
        start=start,
        end=end,
    )
    self._cost_tracker._log_retention_window(start)  # noqa: SLF001
    summary = self._cost_tracker.build_summary_from_records(
        records,
        start=start,
        end=end,
    )

    # Derive total_cost from the same snapshot the summary used so
    # the percentages, period totals, and breakdowns are guaranteed
    # to agree (a second tracker read here could race a concurrent
    # ``record()`` and produce inconsistent rollups).
    assert_currencies_match(r.currency for r in records)
    total_cost = round(
        math.fsum(r.cost for r in records),
        BUDGET_ROUNDING_PRECISION,
    )
    by_task = _build_task_spendings(records)
    by_provider = _build_provider_distribution(records, total_cost)
    by_model = _build_model_distribution(records, total_cost)

    top_agents = _build_top_agents(summary, top_n)
    top_tasks = _build_top_tasks(by_task, top_n)

    period_comparison: PeriodComparison | None = None
    if include_period_comparison:
        period_comparison = await self._build_period_comparison(
            start,
            end,
            total_cost,
        )

    report = SpendingReport(
        summary=summary,
        by_task=by_task,
        by_provider=by_provider,
        by_model=by_model,
        period_comparison=period_comparison,
        top_agents_by_cost=top_agents,
        top_tasks_by_cost=top_tasks,
        generated_at=now,
    )

    logger.info(
        CFO_REPORT_GENERATED,
        total_cost=total_cost,
        task_count=len(by_task),
        provider_count=len(by_provider),
        model_count=len(by_model),
        has_comparison=period_comparison is not None,
    )

    return report

Enums

enums

Budget-specific enumerations.

BudgetAlertLevel

Bases: StrEnum

Alert severity levels for budget thresholds.

Used by :class:~synthorg.budget.spending_summary.SpendingSummary to indicate the current budget state relative to configured thresholds.

Errors

errors

Budget-layer error hierarchy.

Defines budget-specific exceptions in a leaf module with minimal intra-project imports, preventing circular dependency chains when these exceptions are needed by both the budget enforcer and the engine layer.

BudgetExhaustedError

BudgetExhaustedError(message=None)

Bases: DomainError

Budget exhaustion signal.

Used in two contexts:

  1. Raised directly by :meth:BudgetEnforcer.check_can_execute when pre-flight budget checks fail (e.g., monthly hard stop, daily limit, or provider quota exceeded).
  2. Caught by the engine layer (AgentEngine.run) and used to build an AgentRunResult with TerminationReason.BUDGET_EXHAUSTED.
Class Attributes

status_code: HTTP 402 Payment Required. error_code: BUDGET_EXHAUSTED. error_category: BUDGET_EXHAUSTED. retryable: False -- caller must adjust budget or wait for period reset. default_message: Generic message safe for user-facing responses.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

DailyLimitExceededError

DailyLimitExceededError(message=None)

Bases: BudgetExhaustedError

Per-agent daily spending limit exceeded.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

RiskBudgetExhaustedError

RiskBudgetExhaustedError(
    msg, *, agent_id=None, task_id=None, risk_units_used=0.0, risk_limit=0.0
)

Bases: BudgetExhaustedError

Raised when cumulative risk budget is exhausted.

Subclass of BudgetExhaustedError so existing engine-level catch handlers cover it transparently.

Attributes:

Name Type Description
agent_id

The agent that exceeded the limit, or None.

task_id

The task during which the limit was exceeded, or None.

risk_units_used

Cumulative risk units consumed.

risk_limit

The limit that was exceeded.

Source code in src/synthorg/budget/errors.py
def __init__(
    self,
    msg: str,
    *,
    agent_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
    risk_units_used: float = 0.0,
    risk_limit: float = 0.0,
) -> None:
    super().__init__(msg)
    self.agent_id = agent_id
    self.task_id = task_id
    self.risk_units_used = risk_units_used
    self.risk_limit = risk_limit

ProjectBudgetExhaustedError

ProjectBudgetExhaustedError(msg, *, project_id, project_budget=0.0, project_spent=0.0)

Bases: BudgetExhaustedError

Project-level budget limit exceeded.

Attributes:

Name Type Description
project_id

The project whose budget was exceeded.

project_budget

The project's total budget.

project_spent

Amount already spent on the project.

Source code in src/synthorg/budget/errors.py
def __init__(
    self,
    msg: str,
    *,
    project_id: NotBlankStr,
    project_budget: float = 0.0,
    project_spent: float = 0.0,
) -> None:
    super().__init__(msg)
    self.project_id = project_id
    self.project_budget = project_budget
    self.project_spent = project_spent

MixedCurrencyAggregationError

MixedCurrencyAggregationError(
    msg=None,
    *,
    currencies,
    agent_id=None,
    task_id=None,
    project_id=None,
    department_id=None,
)

Bases: DomainError

Raised when cost values in different currencies would be aggregated.

Cost summation, averaging, and budget checks only produce meaningful results when every contributing row carries the same currency. This error signals that the caller handed an aggregator a mix of currencies; the fix is to partition records by currency first (or apply an FX conversion -- out of scope for the initial release).

Intentionally a sibling of :class:BudgetExhaustedError, not a subclass: this is a data-integrity / caller-contract violation, not a budget-exhaustion signal, so the engine layer's BudgetExhaustedError catch block must not absorb it.

Class Attributes

status_code: HTTP 409 Conflict. error_code: MIXED_CURRENCY_AGGREGATION. error_category: CONFLICT. retryable: False -- retrying without partitioning the input produces the same error. default_message: Generic message safe for user-facing responses.

Instance Attributes

currencies: The set of distinct currency codes observed in the input. Exposed so structured logs and error envelopes can surface the conflicting codes without inspecting the offending records directly. agent_id: Optional agent identifier the aggregation targeted. task_id: Optional task identifier the aggregation targeted. project_id: Optional project identifier the aggregation targeted. department_id: Optional department identifier the aggregation targeted. Distinct from project_id so per-department rollups (DepartmentSpending) can attach the offending department name without pretending it is a project.

Source code in src/synthorg/budget/errors.py
def __init__(  # noqa: PLR0913 -- one optional id per scope dimension
    self,
    msg: str | None = None,
    *,
    currencies: frozenset[str],
    agent_id: NotBlankStr | None = None,
    task_id: NotBlankStr | None = None,
    project_id: NotBlankStr | None = None,
    department_id: NotBlankStr | None = None,
) -> None:
    if not currencies:
        detail = (
            "MixedCurrencyAggregationError requires at least one "
            "currency (or the missing-currency sentinel), got an "
            "empty set"
        )
        raise ValueError(detail)
    super().__init__(msg or self.default_message)
    self.currencies = currencies
    self.agent_id = agent_id
    self.task_id = task_id
    self.project_id = project_id
    self.department_id = department_id

QuotaExhaustedError

QuotaExhaustedError(msg, *, provider_name=None, degradation_action=None)

Bases: BudgetExhaustedError

Raised when provider quota is exhausted and unresolvable.

Covers all terminal degradation outcomes: ALERT strategy (intentional immediate raise), failed FALLBACK (no providers available or all exhausted), and failed QUEUE (wait exceeded or still exhausted after waiting).

Attributes:

Name Type Description
provider_name

The provider whose quota was exhausted, or None when not available.

degradation_action

The degradation strategy that was attempted, or None when not available.

Source code in src/synthorg/budget/errors.py
def __init__(
    self,
    msg: str,
    *,
    provider_name: NotBlankStr | None = None,
    degradation_action: DegradationAction | None = None,
) -> None:
    super().__init__(msg)
    self.provider_name = provider_name
    self.degradation_action = degradation_action