Skip to content

API Layer

Litestar REST + WebSocket API -- controllers, authentication, guards, and channels.

App

app

Litestar application factory.

Creates and configures the Litestar application with all controllers, middleware, exception handlers, plugins, and lifecycle hooks (startup/shutdown).

create_app

create_app(
    *,
    config=None,
    persistence=None,
    message_bus=None,
    cost_tracker=None,
    approval_store=None,
    auth_service=None,
    task_engine=None,
    coordinator=None,
    agent_registry=None,
    meeting_orchestrator=None,
    meeting_scheduler=None,
    performance_tracker=None,
    settings_service=None,
    provider_registry=None,
    provider_health_tracker=None,
    tool_invocation_tracker=None,
    delegation_record_store=None,
    artifact_storage=None,
)

Create and configure the Litestar application.

All parameters are optional for testing -- provide fakes via keyword arguments. Services not explicitly provided are auto-wired from config and environment variables.

Parameters:

Name Type Description Default
config RootConfig | None

Root company configuration.

None
persistence PersistenceBackend | None

Persistence backend.

None
message_bus MessageBus | None

Internal message bus.

None
cost_tracker CostTracker | None

Cost tracking service.

None
approval_store ApprovalStore | None

Approval queue store.

None
auth_service AuthService | None

Pre-built auth service (for testing).

None
task_engine TaskEngine | None

Centralized task state engine.

None
coordinator MultiAgentCoordinator | None

Multi-agent coordinator.

None
agent_registry AgentRegistryService | None

Agent registry service.

None
meeting_orchestrator MeetingOrchestrator | None

Meeting orchestrator.

None
meeting_scheduler MeetingScheduler | None

Meeting scheduler.

None
performance_tracker PerformanceTracker | None

Performance tracking service.

None
settings_service SettingsService | None

Settings service for runtime config.

None
provider_registry ProviderRegistry | None

Provider registry.

None
provider_health_tracker ProviderHealthTracker | None

Provider health tracking service.

None
tool_invocation_tracker ToolInvocationTracker | None

Tool invocation tracking service.

None
delegation_record_store DelegationRecordStore | None

Delegation record store.

None
artifact_storage ArtifactStorageBackend | None

Artifact storage backend.

None

Returns:

Type Description
Litestar

Configured Litestar application.

Source code in src/synthorg/api/app.py
def create_app(  # noqa: PLR0913, PLR0915
    *,
    config: RootConfig | None = None,
    persistence: PersistenceBackend | None = None,
    message_bus: MessageBus | None = None,
    cost_tracker: CostTracker | None = None,
    approval_store: ApprovalStore | None = None,
    auth_service: AuthService | None = None,
    task_engine: TaskEngine | None = None,
    coordinator: MultiAgentCoordinator | None = None,
    agent_registry: AgentRegistryService | None = None,
    meeting_orchestrator: MeetingOrchestrator | None = None,
    meeting_scheduler: MeetingScheduler | None = None,
    performance_tracker: PerformanceTracker | None = None,
    settings_service: SettingsService | None = None,
    provider_registry: ProviderRegistry | None = None,
    provider_health_tracker: ProviderHealthTracker | None = None,
    tool_invocation_tracker: ToolInvocationTracker | None = None,
    delegation_record_store: DelegationRecordStore | None = None,
    artifact_storage: ArtifactStorageBackend | None = None,
) -> Litestar:
    """Create and configure the Litestar application.

    All parameters are optional for testing -- provide fakes via
    keyword arguments.  Services not explicitly provided are
    auto-wired from config and environment variables.

    Injected dependencies are only replaced by defaults when they are
    ``None``; a caller-supplied object is always kept, even if it
    happens to evaluate as falsy (e.g. an empty store).

    Args:
        config: Root company configuration.
        persistence: Persistence backend.
        message_bus: Internal message bus.
        cost_tracker: Cost tracking service.
        approval_store: Approval queue store.
        auth_service: Pre-built auth service (for testing).
        task_engine: Centralized task state engine.
        coordinator: Multi-agent coordinator.
        agent_registry: Agent registry service.
        meeting_orchestrator: Meeting orchestrator.
        meeting_scheduler: Meeting scheduler.
        performance_tracker: Performance tracking service.
        settings_service: Settings service for runtime config.
        provider_registry: Provider registry.
        provider_health_tracker: Provider health tracking service.
        tool_invocation_tracker: Tool invocation tracking service.
        delegation_record_store: Delegation record store.
        artifact_storage: Artifact storage backend.

    Returns:
        Configured Litestar application.

    Raises:
        Exception: Re-raised unchanged when the logging pipeline cannot
            be bootstrapped, or when the persistence backend cannot be
            created from ``SYNTHORG_DB_PATH``.
    """
    # Explicit None check (not truthiness) so an injected config is
    # never silently swapped for the default.
    effective_config = (
        config if config is not None else RootConfig(company_name="default")
    )

    # Activate the structured logging pipeline before any
    # other setup so that auto-wiring, persistence, and bus logs all
    # flow through the configured sinks.  Respects SYNTHORG_LOG_DIR
    # env var for Docker log directory override.
    try:
        effective_config = _bootstrap_app_logging(effective_config)
    except Exception as exc:
        # Logging itself is unavailable here, so report on stderr
        # before propagating the failure.
        print(  # noqa: T201
            f"CRITICAL: Failed to initialise logging pipeline: {exc}. "
            "Check SYNTHORG_LOG_DIR, SYNTHORG_LOG_LEVEL, and the "
            "'logging' section of your config file.",
            file=sys.stderr,
            flush=True,
        )
        raise

    api_config = effective_config.api

    # Resolve runtime paths for backup service wiring.
    resolved_db_path: Path | None = None
    resolved_config_path_str = (os.environ.get("SYNTHORG_CONFIG_PATH") or "").strip()
    resolved_config_path: Path | None = (
        Path(resolved_config_path_str) if resolved_config_path_str else None
    )

    # Auto-wire persistence from SYNTHORG_DB_PATH env var (set by CLI
    # compose template).  The startup lifecycle handles connect() +
    # migrate() + auth service creation.
    if persistence is None:
        db_path = (os.environ.get("SYNTHORG_DB_PATH") or "").strip()
        if db_path:
            resolved_db_path = Path(db_path)
            try:
                persistence = create_backend(
                    PersistenceConfig(sqlite=SQLiteConfig(path=db_path)),
                )
            except Exception:
                logger.exception(
                    API_APP_STARTUP,
                    error="Failed to create persistence backend from env",
                )
                raise
            # Log only the file name, not the full path.
            logger.info(
                API_APP_STARTUP,
                note="Auto-wired SQLite persistence from SYNTHORG_DB_PATH",
                db_name=resolved_db_path.name,
            )
            # Auto-wire artifact storage from the same data directory.
            if artifact_storage is None:
                artifact_storage = FileSystemArtifactStorage(
                    data_dir=resolved_db_path.parent,
                )
                logger.info(
                    API_APP_STARTUP,
                    note="Auto-wired filesystem artifact storage",
                )

    # ── Phase 1 auto-wire: services that don't need connected persistence ──
    phase1 = auto_wire_phase1(
        effective_config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        cost_tracker=cost_tracker,
        task_engine=task_engine,
        provider_registry=provider_registry,
        provider_health_tracker=provider_health_tracker,
    )
    message_bus = phase1.message_bus
    cost_tracker = phase1.cost_tracker
    task_engine = phase1.task_engine
    provider_registry = phase1.provider_registry
    provider_health_tracker = phase1.provider_health_tracker

    # ── Meeting auto-wire: orchestrator + scheduler (Phase 1 level) ──
    meeting_wire = auto_wire_meetings(
        effective_config=effective_config,
        meeting_orchestrator=meeting_orchestrator,
        meeting_scheduler=meeting_scheduler,
        agent_registry=agent_registry,
    )
    meeting_orchestrator = meeting_wire.meeting_orchestrator
    meeting_scheduler = meeting_wire.meeting_scheduler
    ceremony_scheduler = meeting_wire.ceremony_scheduler

    channels_plugin = create_channels_plugin()
    expire_callback = _make_expire_callback(channels_plugin)
    # Explicit None check: an injected store must be kept even if it
    # evaluates as falsy (e.g. an empty container-like store).
    effective_approval_store = (
        approval_store
        if approval_store is not None
        else ApprovalStore(on_expire=expire_callback)
    )

    # Wire meeting event publisher to the meetings WS channel.
    if meeting_scheduler is not None and meeting_scheduler._event_publisher is None:  # noqa: SLF001
        meeting_scheduler._event_publisher = _make_meeting_publisher(  # noqa: SLF001
            channels_plugin,
        )

    # Auto-wire performance tracker with composite quality strategy
    # when not explicitly injected (production path).
    # TODO(#1061): pass effective_config.performance once RootConfig
    # has a PerformanceConfig field -- currently uses defaults.
    if performance_tracker is None:
        performance_tracker = _build_performance_tracker(
            cost_tracker=cost_tracker,
            provider_registry=provider_registry,
        )

    app_state = AppState(
        config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        cost_tracker=cost_tracker,
        approval_store=effective_approval_store,
        auth_service=auth_service,
        task_engine=task_engine,
        coordinator=coordinator,
        agent_registry=agent_registry,
        meeting_orchestrator=meeting_orchestrator,
        meeting_scheduler=meeting_scheduler,
        ceremony_scheduler=ceremony_scheduler,
        performance_tracker=performance_tracker,
        settings_service=settings_service,
        provider_registry=provider_registry,
        provider_health_tracker=provider_health_tracker,
        tool_invocation_tracker=tool_invocation_tracker,
        delegation_record_store=delegation_record_store,
        artifact_storage=artifact_storage,
        startup_time=time.monotonic(),
    )

    # The bus-to-WebSocket bridge only exists when a message bus does.
    bridge = (
        MessageBusBridge(message_bus, channels_plugin)
        if message_bus is not None
        else None
    )
    backup_service = build_backup_service(
        effective_config,
        resolved_db_path=resolved_db_path,
        resolved_config_path=resolved_config_path,
    )
    settings_dispatcher = _build_settings_dispatcher(
        message_bus,
        settings_service,
        effective_config,
        app_state,
        backup_service,
    )
    plugins: list[ChannelsPlugin] = [channels_plugin]
    middleware = _build_middleware(api_config)

    api_router = Router(
        path=api_config.api_prefix,
        route_handlers=[*ALL_CONTROLLERS, ws_handler],
        guards=[require_password_changed],
    )

    # Phase 2 auto-wiring flag: persistence being non-None is the
    # enabling condition -- SettingsService needs connected persistence
    # and is created in on_startup after _init_persistence().
    _should_auto_wire = settings_service is None and persistence is not None

    # Review gate service -- transitions tasks from IN_REVIEW on approval.
    review_gate_service = ReviewGateService(task_engine=task_engine)
    app_state.set_review_gate_service(review_gate_service)

    # Approval timeout scheduler -- None here; auto-creation from
    # settings at startup is not yet wired.  Pass explicitly via the
    # lifecycle when a TimeoutChecker is available.
    approval_timeout_scheduler: ApprovalTimeoutScheduler | None = None

    startup, shutdown = _build_lifecycle(
        persistence,
        message_bus,
        bridge,
        settings_dispatcher,
        task_engine,
        meeting_scheduler,
        backup_service,
        approval_timeout_scheduler,
        app_state,
        should_auto_wire_settings=_should_auto_wire,
        effective_config=effective_config,
    )

    return Litestar(
        route_handlers=[api_router],
        # Disable Litestar's built-in logging config to preserve the
        # structlog multi-file-sink pipeline set up by
        # _bootstrap_app_logging() above.  Without this, Litestar calls
        # dictConfig() at startup which triggers _clearExistingHandlers
        # and replaces structlog's file sinks with a stdlib
        # queue_listener, causing all runtime logs to go only to Docker
        # stdout.
        logging_config=None,
        state=State({"app_state": app_state}),
        cors_config=CORSConfig(
            allow_origins=list(api_config.cors.allowed_origins),
            allow_methods=list(api_config.cors.allow_methods),  # type: ignore[arg-type]
            allow_headers=list(api_config.cors.allow_headers),
            allow_credentials=api_config.cors.allow_credentials,
        ),
        compression_config=CompressionConfig(
            backend="brotli",
            minimum_size=1000,
        ),
        # Must be >= artifact API max payload (50 MB) so endpoint-level
        # validation can enforce exact storage limits.
        request_max_body_size=52_428_800,  # 50 MB
        before_send=[security_headers_hook],
        middleware=middleware,
        plugins=plugins,
        exception_handlers=dict(EXCEPTION_HANDLERS),  # type: ignore[arg-type]
        openapi_config=OpenAPIConfig(
            title="SynthOrg API",
            version=__version__,
            path="/docs",
            render_plugins=[
                ScalarRenderPlugin(path="/api"),
            ],
        ),
        on_startup=startup,
        on_shutdown=shutdown,
    )

Config

config

API configuration models.

Frozen Pydantic models for CORS, rate limiting, server, authentication, and the top-level ApiConfig that aggregates them all.

CorsConfig pydantic-model

Bases: BaseModel

CORS configuration for the API.

Attributes:

Name Type Description
allowed_origins tuple[str, ...]

Origins permitted to make cross-origin requests.

allow_methods tuple[str, ...]

HTTP methods permitted in cross-origin requests.

allow_headers tuple[str, ...]

Headers permitted in cross-origin requests.

allow_credentials bool

Whether credentials (cookies, auth) are allowed in cross-origin requests.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_wildcard_credentials

allowed_origins pydantic-field

allowed_origins = ('http://localhost:5173',)

Origins permitted to make cross-origin requests

allow_methods pydantic-field

allow_methods = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS')

HTTP methods permitted in cross-origin requests

allow_headers pydantic-field

allow_headers = ('Content-Type', 'Authorization')

Headers permitted in cross-origin requests

allow_credentials pydantic-field

allow_credentials = False

Whether credentials are allowed

RateLimitTimeUnit

Bases: StrEnum

Valid time windows for rate limiting.

RateLimitConfig pydantic-model

Bases: BaseModel

API rate limiting configuration.

Maps to Litestar's built-in RateLimitConfig middleware.

Attributes:

Name Type Description
max_requests int

Maximum requests per time window.

time_unit RateLimitTimeUnit

Time window (second, minute, hour, day).

exclude_paths tuple[str, ...]

Paths excluded from rate limiting.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

max_requests pydantic-field

max_requests = 100

Maximum requests per time window

time_unit pydantic-field

time_unit = MINUTE

Time window (second, minute, hour, day)

exclude_paths pydantic-field

exclude_paths = ('/api/v1/health',)

Paths excluded from rate limiting

ServerConfig pydantic-model

Bases: BaseModel

Uvicorn server configuration.

Attributes:

Name Type Description
host str

Bind address.

port int

Bind port.

reload bool

Enable auto-reload for development.

workers int

Number of worker processes.

ws_ping_interval float

WebSocket ping interval in seconds (0 to disable).

ws_ping_timeout float

WebSocket pong timeout in seconds.

ssl_certfile str | None

Path to SSL certificate file (PEM format).

ssl_keyfile str | None

Path to SSL private key file (PEM format).

ssl_ca_certs str | None

Path to CA bundle for client cert verification.

trusted_proxies tuple[str, ...]

IP addresses/CIDRs trusted as reverse proxies for X-Forwarded-For/X-Forwarded-Proto header processing.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _normalize_empty_tls
  • _validate_tls_pair

host pydantic-field

host = '127.0.0.1'

Bind address

port pydantic-field

port = 3001

Bind port

reload pydantic-field

reload = False

Enable auto-reload for development

workers pydantic-field

workers = 1

Number of worker processes

ws_ping_interval pydantic-field

ws_ping_interval = 20.0

WebSocket ping interval in seconds (0 to disable)

ws_ping_timeout pydantic-field

ws_ping_timeout = 20.0

WebSocket pong timeout in seconds

ssl_certfile pydantic-field

ssl_certfile = None

Path to SSL certificate file (PEM format)

ssl_keyfile pydantic-field

ssl_keyfile = None

Path to SSL private key file (PEM format)

ssl_ca_certs pydantic-field

ssl_ca_certs = None

Path to CA bundle for client certificate verification

trusted_proxies pydantic-field

trusted_proxies = ()

IP addresses/CIDRs trusted as reverse proxies for X-Forwarded-For/X-Forwarded-Proto header processing

ApiConfig pydantic-model

Bases: BaseModel

Top-level API configuration aggregating all sub-configs.

Attributes:

Name Type Description
cors CorsConfig

CORS configuration.

rate_limit RateLimitConfig

Rate limiting configuration.

server ServerConfig

Uvicorn server configuration.

auth AuthConfig

Authentication configuration.

api_prefix NotBlankStr

URL prefix for all API routes.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

cors pydantic-field

cors

CORS configuration

rate_limit pydantic-field

rate_limit

Rate limiting configuration

server pydantic-field

server

Uvicorn server configuration

auth pydantic-field

auth

Authentication configuration

api_prefix pydantic-field

api_prefix = '/api/v1'

URL prefix for all API routes

DTOs

dto

Request/response DTOs and envelope models.

Response envelopes wrap all API responses in a consistent structure. Request DTOs define write-operation payloads (separate from domain models because they omit server-generated fields).

ErrorDetail pydantic-model

Bases: BaseModel

Structured error metadata (RFC 9457).

Self-contained so agents can parse it without referencing the parent envelope.

Attributes:

Name Type Description
detail NotBlankStr

Human-readable occurrence-specific explanation.

error_code ErrorCode

Machine-readable error code (by convention, 4-digit category-grouped; see ErrorCode).

error_category ErrorCategory

High-level error category.

retryable bool

Whether the client should retry the request.

retry_after int | None

Seconds to wait before retrying (None when not applicable).

instance NotBlankStr

Request correlation ID for log tracing.

title NotBlankStr

Static per-category title (e.g. "Authentication Error").

type NotBlankStr

Documentation URI for the error category.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_retry_after_consistency

ProblemDetail pydantic-model

Bases: BaseModel

Bare RFC 9457 application/problem+json response body.

Returned when the client sends Accept: application/problem+json.

Attributes:

Name Type Description
type NotBlankStr

Documentation URI for the error category.

title NotBlankStr

Static per-category title.

status int

HTTP status code.

detail NotBlankStr

Human-readable occurrence-specific explanation.

instance NotBlankStr

Request correlation ID for log tracing.

error_code ErrorCode

Machine-readable 4-digit error code.

error_category ErrorCategory

High-level error category.

retryable bool

Whether the client should retry the request.

retry_after int | None

Seconds to wait before retrying (None when not applicable).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_retry_after_consistency

ApiResponse pydantic-model

Bases: BaseModel

Standard API response envelope.

Attributes:

Name Type Description
data T | None

Response payload (None on error).

error str | None

Error message (None on success).

error_detail ErrorDetail | None

Structured error metadata (None on success).

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • data (T | None)
  • error (str | None)
  • error_detail (ErrorDetail | None)

Validators:

  • _validate_error_detail_consistency

success property

success

Whether the request succeeded (derived from error).

PaginationMeta pydantic-model

Bases: BaseModel

Pagination metadata for list responses.

Attributes:

Name Type Description
total int

Total number of items matching the query.

offset int

Starting offset of the returned page.

limit int

Maximum items per page.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

total pydantic-field

total

Total matching items

offset pydantic-field

offset

Starting offset

limit pydantic-field

limit

Maximum items per page

PaginatedResponse pydantic-model

Bases: BaseModel

Paginated API response envelope.

Attributes:

Name Type Description
data tuple[T, ...]

Page of items.

error str | None

Error message (None on success).

error_detail ErrorDetail | None

Structured error metadata (None on success).

pagination PaginationMeta

Pagination metadata.

degraded_sources tuple[NotBlankStr, ...]

Data sources that failed gracefully, resulting in partial data. Empty when all sources responded normally.

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_error_detail_consistency

degraded_sources pydantic-field

degraded_sources = ()

Data sources that failed gracefully (partial data)

success property

success

Whether the request succeeded (derived from error).

CreateArtifactRequest pydantic-model

Bases: BaseModel

Payload for creating a new artifact.

Attributes:

Name Type Description
type ArtifactType

Artifact type (code, tests, documentation).

path NotBlankStr

Logical file/directory path of the artifact.

task_id NotBlankStr

ID of the originating task.

created_by NotBlankStr

Agent ID of the creator.

description str

Human-readable description.

content_type str

MIME content type (empty if no content stored).

project_id NotBlankStr | None

Optional project ID to link the artifact to.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

CreateProjectRequest pydantic-model

Bases: BaseModel

Payload for creating a new project.

Attributes:

Name Type Description
name NotBlankStr

Project display name.

description str

Detailed project description.

team tuple[NotBlankStr, ...]

Agent IDs assigned to the project.

lead NotBlankStr | None

Agent ID of the project lead.

deadline str | None

Optional deadline (ISO 8601 string).

budget float

Total budget in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_request

CreateTaskRequest pydantic-model

Bases: BaseModel

Payload for creating a new task.

Attributes:

Name Type Description
title NotBlankStr

Short task title.

description NotBlankStr

Detailed task description.

type TaskType

Task work type.

priority Priority

Task priority level.

project NotBlankStr

Project ID.

created_by NotBlankStr

Agent name of the creator.

assigned_to NotBlankStr | None

Optional assignee agent ID.

estimated_complexity Complexity

Complexity estimate.

budget_limit float

Maximum spend in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

UpdateTaskRequest pydantic-model

Bases: BaseModel

Payload for updating task fields.

All fields are optional -- only provided fields are updated.

Attributes:

Name Type Description
title NotBlankStr | None

New title.

description NotBlankStr | None

New description.

priority Priority | None

New priority.

assigned_to NotBlankStr | None

New assignee.

budget_limit float | None

New budget limit.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard

TransitionTaskRequest pydantic-model

Bases: BaseModel

Payload for a task status transition.

Attributes:

Name Type Description
target_status TaskStatus

The desired target status.

assigned_to NotBlankStr | None

Optional assignee override for the transition.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

target_status pydantic-field

target_status

Desired target status

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard

CancelTaskRequest pydantic-model

Bases: BaseModel

Payload for cancelling a task.

Attributes:

Name Type Description
reason NotBlankStr

Reason for cancellation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

reason pydantic-field

reason

Reason for cancellation

CreateApprovalRequest pydantic-model

Bases: BaseModel

Payload for creating a new approval item.

Attributes:

Name Type Description
action_type NotBlankStr

Kind of action requiring approval (category:action format).

title NotBlankStr

Short summary.

description NotBlankStr

Detailed explanation.

risk_level ApprovalRiskLevel

Assessed risk level.

ttl_seconds int | None

Optional time-to-live in seconds (min 60, max 604 800 = 7 days).

task_id NotBlankStr | None

Optional associated task.

metadata dict[str, str]

Additional key-value pairs.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_action_type_format → action_type
  • _validate_metadata_bounds

ApproveRequest pydantic-model

Bases: BaseModel

Payload for approving an approval item.

Attributes:

Name Type Description
comment NotBlankStr | None

Optional comment explaining the approval.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

RejectRequest pydantic-model

Bases: BaseModel

Payload for rejecting an approval item.

Attributes:

Name Type Description
reason NotBlankStr

Mandatory reason for rejection.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

CoordinateTaskRequest pydantic-model

Bases: BaseModel

Payload for triggering multi-agent coordination on a task.

Attributes:

Name Type Description
agent_names tuple[NotBlankStr, ...] | None

Agent names to coordinate with (None = all active). When provided, must be non-empty and unique.

max_subtasks int

Maximum subtasks for decomposition.

max_concurrency_per_wave int | None

Override for max concurrency per wave.

fail_fast bool | None

Override for fail-fast behaviour (None = use section config default).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • agent_names (tuple[NotBlankStr, ...] | None)
  • max_subtasks (int)
  • max_concurrency_per_wave (int | None)
  • fail_fast (bool | None)

Validators:

  • _validate_unique_agent_names

agent_names pydantic-field

agent_names = None

Agent names to coordinate with (None = all active)

CoordinationPhaseResponse pydantic-model

Bases: BaseModel

Response model for a single coordination phase.

Attributes:

Name Type Description
phase NotBlankStr

Phase name.

success bool

Whether the phase completed successfully.

duration_seconds float

Wall-clock duration of the phase.

error NotBlankStr | None

Error description if the phase failed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_success_error_consistency

CoordinationResultResponse pydantic-model

Bases: BaseModel

Response model for a complete coordination run.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

topology NotBlankStr

Resolved coordination topology.

total_duration_seconds float

Total wall-clock duration.

total_cost_usd float

Total cost across all waves.

phases tuple[CoordinationPhaseResponse, ...]

Phase results in execution order.

wave_count int

Number of execution waves.

is_success bool

Whether all phases succeeded (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

currency pydantic-field

currency = DEFAULT_CURRENCY

ISO 4217 currency code

is_success property

is_success

True when every phase completed successfully.

CreateFromPresetRequest pydantic-model

Bases: BaseModel

Payload for creating a provider from a preset.

Attributes:

Name Type Description
preset_name NotBlankStr

Name of the preset to create from.

name NotBlankStr

Unique provider name (2-64 chars, lowercase + hyphens).

auth_type AuthType | None

Override the preset's default auth type (optional).

subscription_token NotBlankStr | None

Bearer token for subscription-based auth.

tos_accepted bool

Whether the user accepted the subscription ToS.

base_url NotBlankStr | None

Override the preset's default base URL (optional).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_name → name
  • _validate_base_url → base_url

CreateProviderRequest pydantic-model

Bases: BaseModel

Payload for creating a new provider.

Attributes:

Name Type Description
name NotBlankStr

Unique provider name (2-64 chars, lowercase + hyphens).

driver NotBlankStr

Driver backend name (default "litellm").

litellm_provider NotBlankStr | None

LiteLLM routing identifier override.

auth_type AuthType

Authentication mechanism for this provider.

api_key NotBlankStr | None

API key credential (optional, depends on auth_type).

subscription_token NotBlankStr | None

Bearer token for subscription-based auth.

tos_accepted bool

Whether the user accepted the subscription ToS.

base_url NotBlankStr | None

Provider API base URL.

models tuple[ProviderModelConfig, ...]

Pre-configured model definitions.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_name → name
  • _validate_base_url → base_url

DiscoverModelsResponse pydantic-model

Bases: BaseModel

Result of provider model auto-discovery.

Attributes:

Name Type Description
discovered_models tuple[ProviderModelConfig, ...]

Models found on the provider endpoint.

provider_name NotBlankStr

Name of the provider that was queried.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

ProbePresetRequest pydantic-model

Bases: BaseModel

Request to probe a preset's candidate URLs for reachability.

Attributes:

Name Type Description
preset_name NotBlankStr

Preset identifier to probe.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

ProbePresetResponse pydantic-model

Bases: BaseModel

Result of probing a preset's candidate URLs.

Attributes:

Name Type Description
url NotBlankStr | None

The first reachable base URL, or None if none responded.

model_count int

Number of models discovered at the URL.

candidates_tried int

Number of candidate URLs attempted.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • url (NotBlankStr | None)
  • model_count (int)
  • candidates_tried (int)

ProviderResponse pydantic-model

Bases: BaseModel

Safe provider config for API responses -- secrets stripped.

Non-secret auth fields are included for frontend edit form UX. Boolean has_* indicators signal credential presence without exposing values.

Attributes:

Name Type Description
driver NotBlankStr

Driver backend name.

litellm_provider NotBlankStr | None

LiteLLM routing identifier override.

auth_type AuthType

Authentication mechanism.

base_url NotBlankStr | None

Provider API base URL.

models tuple[ProviderModelConfig, ...]

Configured model definitions.

has_api_key bool

Whether an API key is set.

has_oauth_credentials bool

Whether OAuth credentials are configured.

has_custom_header bool

Whether a custom auth header is configured.

has_subscription_token bool

Whether a subscription token is set.

tos_accepted_at str | None

ISO timestamp of ToS acceptance (or None).

preset_name NotBlankStr | None

Preset used to create this provider (if any).

supports_model_pull bool

Whether pulling models is supported.

supports_model_delete bool

Whether deleting models is supported.

supports_model_config bool

Whether per-model config is supported.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

TestConnectionRequest pydantic-model

Bases: BaseModel

Payload for testing a provider connection.

Attributes:

Name Type Description
model NotBlankStr | None

Model to test (defaults to first model in config).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

TestConnectionResponse pydantic-model

Bases: BaseModel

Result of a provider connection test.

Attributes:

Name Type Description
success bool

Whether the connection test succeeded.

latency_ms float | None

Round-trip latency in milliseconds.

error NotBlankStr | None

Error message on failure.

model_tested NotBlankStr | None

Model ID that was tested.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_success_error_consistency

UpdateProviderRequest pydantic-model

Bases: BaseModel

Payload for updating a provider (partial update).

All fields are optional -- only provided fields are updated. tos_accepted: only True re-stamps the timestamp; False and None are no-ops (cannot be retracted).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_base_url → base_url
  • _validate_credential_clear_consistency

CreateWorkflowDefinitionRequest pydantic-model

Bases: BaseModel

Payload for creating a new workflow definition.

Attributes:

Name Type Description
name NotBlankStr

Workflow name.

description str

Optional description.

workflow_type WorkflowType

Target execution topology.

nodes tuple[dict[str, object], ...]

Nodes in the workflow graph (serialized as dicts).

edges tuple[dict[str, object], ...]

Edges connecting nodes (serialized as dicts).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

name pydantic-field

name

Workflow name

description pydantic-field

description = ''

Description

workflow_type pydantic-field

workflow_type

Target execution topology

nodes pydantic-field

nodes

Workflow nodes

edges pydantic-field

edges

Workflow edges

UpdateWorkflowDefinitionRequest pydantic-model

Bases: BaseModel

Payload for updating an existing workflow definition.

All fields are optional -- only provided fields are updated.

Attributes:

Name Type Description
name NotBlankStr | None

New name.

description str | None

New description.

workflow_type WorkflowType | None

New workflow type.

nodes tuple[dict[str, object], ...] | None

New nodes.

edges tuple[dict[str, object], ...] | None

New edges.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • name (NotBlankStr | None)
  • description (str | None)
  • workflow_type (WorkflowType | None)
  • nodes (tuple[dict[str, object], ...] | None)
  • edges (tuple[dict[str, object], ...] | None)
  • expected_version (int | None)

expected_version pydantic-field

expected_version = None

Optimistic concurrency guard

ActivateWorkflowRequest pydantic-model

Bases: BaseModel

Request body for activating a workflow definition.

Attributes:

Name Type Description
project NotBlankStr

Project ID for all created tasks.

context dict[str, str | int | float | bool | None]

Runtime context for condition expression evaluation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

project pydantic-field

project

Project ID for created tasks

context pydantic-field

context

Runtime context for condition evaluation

to_provider_response

to_provider_response(config)

Convert a ProviderConfig to a safe ProviderResponse.

Strips all secrets and provides boolean credential indicators. Resolves local model management capabilities from the preset when preset_name is set.

Parameters:

Name Type Description Default
config ProviderConfig

Provider configuration (may contain secrets).

required

Returns:

Type Description
ProviderResponse

Safe response DTO with secrets stripped.

Source code in src/synthorg/api/dto_providers.py
def to_provider_response(config: ProviderConfig) -> ProviderResponse:
    """Build the secret-free ``ProviderResponse`` for a ``ProviderConfig``.

    Secret values are never copied; their presence is reported through
    the boolean ``has_*`` indicators instead.  Local model management
    capabilities are looked up on the preset when ``preset_name`` is
    set, and reported as ``False`` otherwise.

    Args:
        config: Provider configuration (may contain secrets).

    Returns:
        Safe response DTO with secrets stripped.
    """
    from synthorg.providers.presets import get_preset  # noqa: PLC0415

    # Serialise the ToS acceptance timestamp only when one was recorded.
    accepted_at = config.tos_accepted_at
    tos_str = None if accepted_at is None else accepted_at.isoformat()

    preset = get_preset(config.preset_name) if config.preset_name else None

    # Capabilities come from the preset; no preset means none are supported.
    if preset:
        can_pull = preset.supports_model_pull
        can_delete = preset.supports_model_delete
        can_configure = preset.supports_model_config
    else:
        can_pull = False
        can_delete = False
        can_configure = False

    # OAuth counts as configured only when all three parts are present.
    oauth_ready = (
        config.oauth_client_id is not None
        and config.oauth_client_secret is not None
        and config.oauth_token_url is not None
    )
    # A custom header needs both its name and its value.
    header_ready = (
        config.custom_header_name is not None
        and config.custom_header_value is not None
    )

    return ProviderResponse(
        driver=config.driver,
        litellm_provider=config.litellm_provider,
        auth_type=config.auth_type,
        base_url=config.base_url,
        models=config.models,
        has_api_key=config.api_key is not None,
        has_oauth_credentials=oauth_ready,
        has_custom_header=header_ready,
        has_subscription_token=config.subscription_token is not None,
        tos_accepted_at=tos_str,
        oauth_token_url=config.oauth_token_url,
        oauth_client_id=config.oauth_client_id,
        oauth_scope=config.oauth_scope,
        custom_header_name=config.custom_header_name,
        preset_name=config.preset_name,
        supports_model_pull=can_pull,
        supports_model_delete=can_delete,
        supports_model_config=can_configure,
    )

Errors

errors

API error hierarchy and RFC 9457 error taxonomy.

All API-specific errors inherit from ApiError so callers can catch the entire family with a single except clause.

ErrorCategory and ErrorCode provide machine-readable error metadata for structured error responses (RFC 9457).

ErrorCategory

Bases: StrEnum

High-level error category for structured error responses.

Values are lowercase strings suitable for JSON serialization.

ErrorCode

Bases: IntEnum

Machine-readable error codes (4-digit, category-grouped).

First digit encodes the category: 1xxx = auth, 2xxx = validation, 3xxx = not_found, 4xxx = conflict, 5xxx = rate_limit, 6xxx = budget_exhausted, 7xxx = provider_error, 8xxx = internal.

ApiError

ApiError(message=None, *, status_code=500)

Bases: Exception

Base exception for API-layer errors.

Class Attributes

  • default_message: Fallback error message used when none is provided and for 5xx response scrubbing.
  • error_category: RFC 9457 error category.
  • error_code: RFC 9457 machine-readable error code.
  • retryable: Whether the client should retry the request.

Instance Attributes

status_code: HTTP status code (set via __init__, fixed per subclass).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None, *, status_code: int = 500) -> None:
    """Store the HTTP status and initialise the exception message.

    Args:
        message: Error text; a falsy value (``None`` or ``""``) falls
            back to ``default_message``.
        status_code: HTTP status code recorded on the instance.
    """
    text = message if message else self.default_message
    super().__init__(text)
    self.status_code = status_code

__init_subclass__

__init_subclass__(**kwargs)

Validate error_code/error_category consistency at class creation.

Source code in src/synthorg/api/errors.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Reject subclasses whose error_code and error_category disagree.

    The thousands digit of ``error_code`` is looked up in
    ``_CODE_CATEGORY_PREFIX``; prefixes without a mapping are not
    checked.

    Raises:
        TypeError: If the mapped category differs from the subclass's
            ``error_category``.
    """
    super().__init_subclass__(**kwargs)
    prefix = cls.error_code.value // 1000
    expected = _CODE_CATEGORY_PREFIX.get(prefix)
    # Nothing to enforce for unmapped prefixes or matching categories.
    if expected is None or cls.error_category == expected:
        return
    msg = (
        f"{cls.__name__}: error_code {cls.error_code.name} "
        f"(prefix {prefix}) implies category {expected.name}, "
        f"but error_category is {cls.error_category.name}"
    )
    raise TypeError(msg)

NotFoundError

NotFoundError(message=None)

Bases: ApiError

Raised when a requested resource does not exist (404).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 404."""
    super().__init__(message, status_code=404)

ApiValidationError

ApiValidationError(message=None)

Bases: ApiError

Raised when request data fails validation (422).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 422."""
    super().__init__(message, status_code=422)

ConflictError

ConflictError(message=None)

Bases: ApiError

Raised when a resource conflict occurs (409).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 409."""
    super().__init__(message, status_code=409)

VersionConflictError

VersionConflictError(message=None)

Bases: ApiError

Raised when an ETag/If-Match version check fails (409).

Provides optimistic concurrency control; currently used only by the settings endpoints.

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 409."""
    super().__init__(message, status_code=409)

ForbiddenError

ForbiddenError(message=None)

Bases: ApiError

Raised when access is denied (403).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 403."""
    super().__init__(message, status_code=403)

SessionRevokedError

SessionRevokedError(message=None)

Bases: ApiError

Raised when a revoked session token is used (401).

Gives clients a distinct error code (SESSION_REVOKED) so they can show a "you were logged out" message instead of a generic auth failure.

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 401."""
    super().__init__(message, status_code=401)

UnauthorizedError

UnauthorizedError(message=None)

Bases: ApiError

Raised when authentication is required or invalid (401).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 401."""
    super().__init__(message, status_code=401)

ServiceUnavailableError

ServiceUnavailableError(message=None)

Bases: ApiError

Raised when a required service is not configured (503).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    """Initialise the error, forwarding ``message`` with HTTP status 503."""
    super().__init__(message, status_code=503)

category_title

category_title(cat)

Return the RFC 9457 title for a category.

Parameters:

Name Type Description Default
cat ErrorCategory

Error category.

required

Returns:

Type Description
str

Human-readable title string.

Source code in src/synthorg/api/errors.py
def category_title(cat: ErrorCategory) -> str:
    """Look up the RFC 9457 ``title`` registered for an error category.

    Args:
        cat: Error category.

    Returns:
        Human-readable title string.
    """
    title = CATEGORY_TITLES[cat]
    return title

category_type_uri

category_type_uri(cat)

Return the RFC 9457 type URI for a category.

Parameters:

Name Type Description Default
cat ErrorCategory

Error category.

required

Returns:

Type Description
str

Documentation URI with fragment anchor for the error category.

Source code in src/synthorg/api/errors.py
def category_type_uri(cat: ErrorCategory) -> str:
    """Build the RFC 9457 ``type`` URI for an error category.

    Args:
        cat: Error category.

    Returns:
        Documentation URI with fragment anchor for the error category.
    """
    # The category's string value is used as the fragment anchor.
    anchor = cat.value
    return f"{_ERROR_DOCS_BASE}#{anchor}"

Guards

guards

Route guards for access control.

Guards read the authenticated user identity from connection.user (populated by the auth middleware) and check role-based permissions.

The require_roles factory creates guards for arbitrary role sets. Pre-built constants cover common patterns:

require_ceo              -- CEO only
require_ceo_or_manager   -- CEO or Manager
require_approval_roles   -- CEO, Manager, or Board Member

require_ceo module-attribute

require_ceo = require_roles(CEO)

Guard allowing only the CEO role.

require_ceo_or_manager module-attribute

require_ceo_or_manager = require_roles(CEO, MANAGER)

Guard allowing CEO or Manager roles.

require_approval_roles module-attribute

require_approval_roles = require_roles(CEO, MANAGER, BOARD_MEMBER)

Guard allowing roles that can approve or reject actions.

HumanRole

Bases: StrEnum

Recognised human roles for access control.

has_write_role

has_write_role(role)

Return True if the role grants write access.

Use this for inline role checks instead of importing _WRITE_ROLES directly. The write set includes CEO, Manager, and Pair Programmer.

Source code in src/synthorg/api/guards.py
def has_write_role(role: HumanRole) -> bool:
    """Report whether ``role`` belongs to the write-capable role set.

    Prefer this helper for inline role checks over importing
    ``_WRITE_ROLES`` directly.  The write set includes CEO, Manager,
    and Pair Programmer.
    """
    can_write = role in _WRITE_ROLES
    return can_write

require_write_access

require_write_access(connection, _)

Guard that allows only write-capable human roles.

Checks connection.user.role for ceo, manager, or pair_programmer. Board members are excluded (they may only observe and approve). The system role is intentionally excluded -- use require_roles() with the desired roles for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_write_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Guard that admits only write-capable human roles.

    The role read from the connection must be ``ceo``, ``manager``,
    or ``pair_programmer``.  Board members are excluded (they may
    only observe and approve).  The ``system`` role is intentionally
    excluded -- use ``require_roles()`` with the desired roles for
    endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    role = _get_role(connection)
    if role in _WRITE_ROLES:
        return

    # Record the denial before rejecting the request.
    logger.warning(
        API_GUARD_DENIED,
        guard="require_write_access",
        role=role,
        path=str(connection.url.path),
    )
    raise PermissionDeniedException(detail="Write access denied")

require_read_access

require_read_access(connection, _)

Guard that allows all human roles (excludes SYSTEM).

Checks connection.user.role for any human role including observer and board_member. The internal system role is excluded -- use require_roles() for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_read_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Guard that admits every human role (excludes SYSTEM).

    The role read from the connection may be any human role,
    including ``observer`` and ``board_member``.  The internal
    ``system`` role is excluded -- use ``require_roles()`` for
    endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    role = _get_role(connection)
    if role in _READ_ROLES:
        return

    # Record the denial before rejecting the request.
    logger.warning(
        API_GUARD_DENIED,
        guard="require_read_access",
        role=role,
        path=str(connection.url.path),
    )
    raise PermissionDeniedException(detail="Read access denied")

require_roles

require_roles(*roles)

Create a guard that allows only the specified roles.

Parameters:

Name Type Description Default
*roles HumanRole

One or more HumanRole members to permit.

()

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Raises:

Type Description
ValueError

If no roles are provided.

Source code in src/synthorg/api/guards.py
def require_roles(
    *roles: HumanRole,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Build a guard that admits only the given roles.

    Args:
        *roles: One or more ``HumanRole`` members to permit.

    Returns:
        A guard function compatible with Litestar's guard protocol.

    Raises:
        ValueError: If no roles are provided.
    """
    if not roles:
        msg = "require_roles() requires at least one role"
        raise ValueError(msg)

    allowed = frozenset(roles)
    # Sorted role values give the guard a deterministic name, used both
    # in denial logs and as the function's __name__/__qualname__.
    label = ",".join(sorted(r.value for r in allowed))
    guard_name = f"require_roles({label})"

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        role = _get_role(connection)
        if role in allowed:
            return
        logger.warning(
            API_GUARD_DENIED,
            guard=guard_name,
            role=role,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Access denied")

    guard.__name__ = guard_name
    guard.__qualname__ = guard_name
    return guard

Middleware

middleware

Request middleware and before-send hooks.

Provides ASGI middleware for request logging, and a before_send hook that injects security headers (CSP, CORP, HSTS, Cache-Control, etc.) into every HTTP response -- including exception-handler and unmatched-route (404/405) responses.

Why before_send instead of ASGI middleware? Litestar's before_send hook wraps the ASGI send callback at the outermost layer (before the middleware stack), so it fires for all responses. By contrast, user-defined ASGI middleware only runs for matched routes -- 404 and 405 responses from the router bypass it.

RequestLoggingMiddleware

RequestLoggingMiddleware(app)

ASGI middleware that logs request start and completion.

Uses time.perf_counter() for high-resolution duration measurement. Only logs HTTP requests (non-HTTP scopes like WebSocket and lifespan are passed through without logging).

Source code in src/synthorg/api/middleware.py
def __init__(self, app: ASGIApp) -> None:
    """Store the wrapped ASGI application that requests are forwarded to."""
    self.app = app

__call__ async

__call__(scope, receive, send)

Process an ASGI request, logging start and completion.

Source code in src/synthorg/api/middleware.py
async def __call__(
    self,
    scope: Scope,
    receive: Receive,
    send: Send,
) -> None:
    """Forward an ASGI request, logging its start and completion.

    Non-HTTP scopes are passed straight through.  For HTTP requests a
    correlation ID is bound, the response status is captured from the
    ``http.response.start`` message, and completion is logged (and the
    correlation IDs cleared) even when the wrapped app raises.
    """
    if scope["type"] != ScopeType.HTTP:
        await self.app(scope, receive, send)
        return

    request: Request[Any, Any, Any] = Request(scope)
    method = request.method
    path = str(request.url.path)

    bind_correlation_id(request_id=generate_correlation_id())
    logger.info(API_REQUEST_STARTED, method=method, path=path)
    started_at = time.perf_counter()

    # Stays None when no response start message is ever sent.
    status_code: int | None = None

    async def capture_send(message: Any) -> None:
        nonlocal status_code
        is_response_start = (
            isinstance(message, dict)
            and message.get("type") == "http.response.start"
        )
        if is_response_start:
            raw_status = message.get("status")
            if raw_status is not None:
                status_code = raw_status
            else:
                # A start message without a status is logged and
                # recorded as a 500.
                logger.warning(
                    API_ASGI_MISSING_STATUS,
                    type=message.get("type"),
                )
                status_code = 500
        await send(message)  # pyright: ignore[reportArgumentType]

    try:
        await self.app(scope, receive, capture_send)
    finally:
        elapsed_s = time.perf_counter() - started_at
        duration_ms = round(elapsed_s * 1000, 2)
        _log_request_completion(method, path, status_code, duration_ms)
        clear_correlation_ids()

security_headers_hook async

security_headers_hook(message, scope)

Inject security headers into every HTTP response.

Registered as a Litestar before_send hook so it fires for all HTTP responses -- successful, exception-handler, and router-level 404/405.

Adds static security headers (CORP, HSTS, X-Content-Type-Options, etc.) and path-aware Content-Security-Policy (strict for API, relaxed for /docs/ to allow Scalar UI resources) and Cache-Control (no-store for API, public, max-age=300 for /docs/ since it serves public, non-user-specific content).

Uses __setitem__ (not add) so that if any handler or middleware already set a header, the known-good value overwrites it rather than creating a duplicate.

Parameters:

Name Type Description Default
message Message

ASGI message dict (only http.response.start is processed).

required
scope Scope

ASGI connection scope.

required
Source code in src/synthorg/api/middleware.py
async def security_headers_hook(message: Message, scope: Scope) -> None:
    """Write security headers onto every HTTP response start message.

    Registered as a Litestar ``before_send`` hook so it fires for
    **all** HTTP responses -- successful, exception-handler, and
    router-level 404/405.

    Sets the static security headers (CORP, HSTS,
    X-Content-Type-Options, etc.), then a path-aware
    Content-Security-Policy (strict for API, relaxed for ``/docs/``
    to allow Scalar UI resources) and Cache-Control (``no-store`` for
    API, ``public, max-age=300`` for ``/docs/`` since it serves
    public, non-user-specific content).

    Headers are assigned with ``__setitem__`` (not ``add``), so a
    value already set by a handler or middleware is overwritten with
    the known-good value rather than duplicated.

    Args:
        message: ASGI message dict (only ``http.response.start``
            is processed).
        scope: ASGI connection scope.
    """
    if (
        scope.get("type") != ScopeType.HTTP
        or message.get("type") != "http.response.start"
    ):
        return

    headers = MutableScopeHeaders.from_message(message)

    # Static security headers -- overwrite to prevent duplicates.
    for header_name, header_value in _SECURITY_HEADERS.items():
        headers[header_name] = header_value

    # Path-aware headers
    path: str = scope.get("path", "")
    is_docs = path == "/docs" or path.startswith("/docs/")

    if is_docs:
        headers["Content-Security-Policy"] = _DOCS_CSP
        # Relax COOP for /docs -- Scalar UI may open cross-origin popups
        # for OAuth/API proxy features via proxy.scalar.com.
        # same-origin-allow-popups: allows the page to open popups but
        # blocks cross-origin pages from retaining an opener reference,
        # preventing XS-Leak side-channel attacks via window.opener.
        headers["Cross-Origin-Opener-Policy"] = "same-origin-allow-popups"
        # Allow brief caching for docs -- public, non-user-specific content.
        headers["Cache-Control"] = _DOCS_CACHE_CONTROL
    else:
        headers["Content-Security-Policy"] = _API_CSP

Pagination

pagination

In-memory pagination helper.

Applies offset/limit slicing to tuples and produces PaginationMeta for the response envelope.

PaginationOffset module-attribute

PaginationOffset = Annotated[int, Parameter(ge=0, description='Pagination offset')]

Query parameter type for pagination offset (>= 0).

PaginationLimit module-attribute

PaginationLimit = Annotated[int, Parameter(ge=1, le=MAX_LIMIT, description="Page size")]

Query parameter type for pagination limit (1-200).

paginate

paginate(items, *, offset, limit, total=None)

Slice a tuple and produce pagination metadata.

Clamps offset to [0, len(items)] and limit to [1, MAX_LIMIT] as a safety net.

Parameters:

Name Type Description Default
items tuple[T, ...]

Full collection to paginate.

required
offset int

Zero-based starting index.

required
limit int

Maximum items to return.

required
total int | None

True total count when items has been truncated upstream (e.g. by a safety cap). Defaults to len(items).

None

Returns:

Type Description
tuple[tuple[T, ...], PaginationMeta]

A tuple of (page_items, pagination_meta).

Source code in src/synthorg/api/pagination.py
def paginate[T](
    items: tuple[T, ...],
    *,
    offset: int,
    limit: int,
    total: int | None = None,
) -> tuple[tuple[T, ...], PaginationMeta]:
    """Slice a tuple and produce pagination metadata.

    Clamps ``offset`` to ``[0, len(items)]`` and ``limit`` to
    ``[1, MAX_LIMIT]`` as a safety net.

    Args:
        items: Full collection to paginate.
        offset: Zero-based starting index.
        limit: Maximum items to return.
        total: True total count when *items* has been truncated
            upstream (e.g. by a safety cap).  Defaults to
            ``len(items)``.

    Returns:
        A tuple of (page_items, pagination_meta).
    """
    effective_total = total if total is not None else len(items)
    offset = max(0, min(offset, len(items)))
    limit = max(1, min(limit, MAX_LIMIT))
    page = items[offset : offset + limit]
    meta = PaginationMeta(
        total=effective_total,
        offset=offset,
        limit=limit,
    )
    return page, meta

WebSocket Models

ws_models

WebSocket event models for real-time feeds.

Defines event types and the WsEvent payload that is serialised to JSON and pushed to WebSocket subscribers.

WsEventType

Bases: StrEnum

Types of real-time WebSocket events.

WsEvent pydantic-model

Bases: BaseModel

A real-time event pushed over WebSocket.

Callers must not mutate the payload dict after construction -- the dict is a mutable reference inside a frozen model.

Attributes:

Name Type Description
event_type WsEventType

Classification of the event.

channel NotBlankStr

Target channel name.

timestamp AwareDatetime

When the event occurred.

payload dict[str, object]

Event-specific data.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

event_type pydantic-field

event_type

Event classification

channel pydantic-field

channel

Target channel name

timestamp pydantic-field

timestamp

When the event occurred

payload pydantic-field

payload

Event-specific data

Auth

config

Authentication configuration.

AuthConfig pydantic-model

Bases: BaseModel

JWT and authentication configuration.

The jwt_secret is resolved at application startup via a priority chain:

  1. SYNTHORG_JWT_SECRET environment variable (for multi-instance deployments sharing a common secret).
  2. Previously persisted secret in the settings table.
  3. Auto-generate a new secret and persist it for future runs.

At construction time the secret may be empty -- it is populated before the first request is served.

Attributes:

Name Type Description
jwt_secret str

HMAC signing key for JWT tokens and API key hashing (resolved at startup, repr-hidden). Rotating this invalidates all stored API key hashes.

jwt_algorithm Literal['HS256', 'HS384', 'HS512']

JWT signing algorithm (HMAC family only).

jwt_expiry_minutes int

Token lifetime in minutes.

min_password_length int

Minimum password length for setup/change.

exclude_paths tuple[str, ...] | None

Regex patterns for URL paths excluded from the auth middleware (auto-derived from the API prefix when None).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_secret_length

jwt_secret pydantic-field

jwt_secret = ''

JWT signing secret (resolved at startup). Also used as the HMAC key for API key hash computation -- rotating this secret invalidates all stored API key hashes.

jwt_algorithm pydantic-field

jwt_algorithm = 'HS256'

JWT signing algorithm (HMAC family)

jwt_expiry_minutes pydantic-field

jwt_expiry_minutes = 1440

Token lifetime in minutes (default 24h)

min_password_length pydantic-field

min_password_length = 12

Minimum password length for setup and password change

exclude_paths pydantic-field

exclude_paths = None

Regex patterns for paths excluded from authentication. When None (default), paths are auto-derived from the API prefix (health, auth/setup, auth/login, docs, scalar UI). Use ^ to anchor at the start of the path and add $ when an exact match (rather than a prefix match) is required.

with_secret

with_secret(secret)

Return a copy with the JWT secret set.

Parameters:

Name Type Description Default
secret str

Resolved JWT signing secret.

required

Returns:

Type Description
AuthConfig

New AuthConfig with the secret populated.

Raises:

Type Description
ValueError

If the secret is too short.

Source code in src/synthorg/api/auth/config.py
def with_secret(self, secret: str) -> AuthConfig:
    """Produce a copy of this config carrying the given JWT secret.

    Args:
        secret: Resolved JWT signing secret.

    Returns:
        New ``AuthConfig`` with the secret populated.

    Raises:
        ValueError: If the secret is too short.
    """
    # Validate before copying so no copy is made for a rejected secret.
    _require_valid_secret(secret)
    updated = self.model_copy(update={"jwt_secret": secret})
    return updated

models

Authentication domain models.

AuthMethod

Bases: StrEnum

Authentication method used for a request.

User pydantic-model

Bases: BaseModel

Persisted user account.

Attributes:

Name Type Description
id NotBlankStr

Unique user identifier.

username NotBlankStr

Login username.

password_hash str

Argon2id hash (excluded from repr).

role HumanRole

Access control role.

must_change_password bool

Whether the user must change password.

created_at AwareDatetime

Account creation timestamp.

updated_at AwareDatetime

Last modification timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • id (NotBlankStr)
  • username (NotBlankStr)
  • password_hash (str)
  • role (HumanRole)
  • must_change_password (bool)
  • created_at (AwareDatetime)
  • updated_at (AwareDatetime)

ApiKey pydantic-model

Bases: BaseModel

Persisted API key (hash-only storage).

Attributes:

Name Type Description
id NotBlankStr

Unique key identifier (UUID).

key_hash NotBlankStr

HMAC-SHA256 hex digest of the raw key.

name NotBlankStr

Human-readable label.

role HumanRole

Access control role.

user_id NotBlankStr

Owner user ID.

created_at AwareDatetime

Key creation timestamp (timezone-aware).

expires_at AwareDatetime | None

Optional expiry timestamp (timezone-aware).

revoked bool

Whether the key has been revoked.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

AuthenticatedUser pydantic-model

Bases: BaseModel

Lightweight identity attached to connection.user.

Populated by the auth middleware after successful authentication.

Attributes:

Name Type Description
user_id NotBlankStr

User's unique identifier.

username NotBlankStr

User's login name.

role HumanRole

Access control role.

auth_method AuthMethod

How the user authenticated.

must_change_password bool

Whether forced password change is pending.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

service

Authentication service -- password hashing, JWT ops, API key hashing.

SecretNotConfiguredError

Bases: RuntimeError

Raised when the JWT secret is required but not configured.

AuthService

AuthService(config)

Immutable authentication operations.

Parameters:

Name Type Description Default
config AuthConfig

Authentication configuration (carries JWT secret).

required
Source code in src/synthorg/api/auth/service.py
def __init__(self, config: AuthConfig) -> None:
    """Keep a reference to the authentication configuration."""
    self._config = config

hash_password

hash_password(password)

Hash a password with Argon2id.

Parameters:

Name Type Description Default
password str

Plaintext password.

required

Returns:

Type Description
str

Argon2id hash string.

Source code in src/synthorg/api/auth/service.py
def hash_password(self, password: str) -> str:
    """Produce an Argon2id hash for a plaintext password.

    Args:
        password: Plaintext password.

    Returns:
        Argon2id hash string.
    """
    # Delegates to the shared module-level hasher instance.
    hashed = _hasher.hash(password)
    return hashed

verify_password

verify_password(password, password_hash)

Verify a password against an Argon2id hash.

Parameters:

Name Type Description Default
password str

Plaintext password to check.

required
password_hash str

Stored Argon2id hash.

required

Returns:

Type Description
bool

True if the password matches.

Raises:

Type Description
VerificationError

On non-mismatch verification failures (e.g. unsupported parameters).

InvalidHashError

If the stored hash is corrupted or malformed (data integrity issue).

Source code in src/synthorg/api/auth/service.py
def verify_password(self, password: str, password_hash: str) -> bool:
    """Check a plaintext password against a stored Argon2id hash.

    A plain mismatch is reported as ``False``; every other failure
    is logged and re-raised unchanged so callers can tell a wrong
    password apart from a broken hash.

    Args:
        password: Plaintext password to check.
        password_hash: Stored Argon2id hash.

    Returns:
        ``True`` if the password matches.

    Raises:
        argon2.exceptions.VerificationError: On non-mismatch
            verification failures (e.g. unsupported parameters).
        argon2.exceptions.InvalidHashError: If the stored hash
            is corrupted or malformed (data integrity issue).
    """
    try:
        # Note the argument order: stored hash first, candidate second.
        matched = _hasher.verify(password_hash, password)
    except argon2.exceptions.VerifyMismatchError:
        # Wrong password is an expected outcome, not an error.
        matched = False
    except argon2.exceptions.VerificationError:
        logger.warning(
            API_AUTH_FAILED,
            reason="hash_verification_error",
            exc_info=True,
        )
        raise
    except argon2.exceptions.InvalidHashError:
        logger.error(
            API_AUTH_FAILED,
            reason="invalid_hash_data_corruption",
            exc_info=True,
        )
        raise
    return matched

hash_password_async async

hash_password_async(password)

Hash a password with Argon2id in a thread executor.

Offloads the CPU-intensive hashing to avoid blocking the event loop.

Parameters:

Name Type Description Default
password str

Plaintext password.

required

Returns:

Type Description
str

Argon2id hash string.

Source code in src/synthorg/api/auth/service.py
async def hash_password_async(self, password: str) -> str:
    """Hash a password with Argon2id without blocking the event loop.

    The synchronous ``hash_password`` is CPU-intensive, so it is
    run in the running loop's default executor.

    Args:
        password: Plaintext password.

    Returns:
        Argon2id hash string.
    """
    running_loop = asyncio.get_running_loop()
    # ``None`` selects the loop's default executor.
    pending = running_loop.run_in_executor(None, self.hash_password, password)
    return await pending

verify_password_async async

verify_password_async(password, password_hash)

Verify a password against an Argon2id hash in a thread executor.

Offloads the CPU-intensive verification to avoid blocking the event loop.

Parameters:

Name Type Description Default
password str

Plaintext password to check.

required
password_hash str

Stored Argon2id hash.

required

Returns:

Type Description
bool

True if the password matches.

Source code in src/synthorg/api/auth/service.py
async def verify_password_async(
    self,
    password: str,
    password_hash: str,
) -> bool:
    """Verify a password against an Argon2id hash off the event loop.

    The synchronous ``verify_password`` is CPU-intensive, so it is
    run in the running loop's default executor.

    Args:
        password: Plaintext password to check.
        password_hash: Stored Argon2id hash.

    Returns:
        ``True`` if the password matches.
    """
    running_loop = asyncio.get_running_loop()
    # ``None`` selects the loop's default executor.
    pending = running_loop.run_in_executor(
        None,
        self.verify_password,
        password,
        password_hash,
    )
    return await pending

create_token

create_token(user)

Create a JWT for the given user.

The token includes a pwd_sig claim -- a 16-character truncated SHA-256 of the stored password hash. This is plain SHA-256, not HMAC -- the password hash is already a high-entropy Argon2id output, and the claim is protected by the JWT signature. The auth middleware validates this claim on every request so that tokens issued before a password change are automatically rejected.

A jti (JWT ID) claim is included for per-token session tracking and revocation.

Parameters:

Name Type Description Default
user User

Authenticated user.

required

Returns:

Type Description
tuple[str, int, str]

Tuple of (encoded JWT, expiry seconds, session ID).

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

Source code in src/synthorg/api/auth/service.py
def create_token(
    self,
    user: User,
) -> tuple[str, int, str]:
    """Issue a signed JWT for an authenticated user.

    Besides the identity claims, the token carries:

    * ``pwd_sig`` -- the first 16 hex characters of the SHA-256 of
      the stored password hash.  Plain SHA-256 (not HMAC) is used
      because the input is already a high-entropy Argon2id output
      and the claim is covered by the JWT signature.  The auth
      middleware checks this claim on every request, so tokens
      issued before a password change are rejected automatically.
    * ``jti`` -- a fresh JWT ID used for per-token session
      tracking and revocation.

    Args:
        user: Authenticated user.

    Returns:
        Tuple of (encoded JWT, expiry seconds, session ID).

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
    """
    signing_key = self._require_secret("create_token")

    issued_at = datetime.now(UTC)
    ttl_seconds = self._config.jwt_expiry_minutes * 60
    jti = uuid.uuid4().hex

    password_digest = hashlib.sha256(user.password_hash.encode()).hexdigest()
    password_signature = password_digest[:16]

    # Key order is kept stable so the serialized payload is unchanged.
    claims: dict[str, Any] = {
        "sub": user.id,
        "username": user.username,
        "role": user.role.value,
        "must_change_password": user.must_change_password,
        "pwd_sig": password_signature,
        "jti": jti,
        "iat": issued_at,
        "exp": issued_at + timedelta(seconds=ttl_seconds),
    }
    encoded = jwt.encode(
        claims,
        signing_key,
        algorithm=self._config.jwt_algorithm,
    )
    return encoded, ttl_seconds, jti

decode_token

decode_token(token)

Decode and validate a JWT.

Audience (aud) verification is intentionally disabled here (verify_aud=False) because audience validation is performed per-role in the auth middleware's _resolve_jwt_user. System-user tokens require aud=synthorg-backend; regular user tokens omit aud.

Parameters:

Name Type Description Default
token str

Encoded JWT string.

required

Returns:

Type Description
dict[str, Any]

Decoded claims dictionary.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

InvalidTokenError

If the token is invalid or expired.

Source code in src/synthorg/api/auth/service.py
def decode_token(self, token: str) -> dict[str, Any]:
    """Decode a JWT and validate its signature and required claims.

    The ``exp``, ``iat``, ``sub`` and ``jti`` claims must be present.
    Audience (``aud``) verification is deliberately switched off
    here (``verify_aud=False``): audience validation happens
    per-role in the auth middleware's ``_resolve_jwt_user``.
    System-user tokens require ``aud=synthorg-backend``; regular
    user tokens omit ``aud``.

    Args:
        token: Encoded JWT string.

    Returns:
        Decoded claims dictionary.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
        jwt.InvalidTokenError: If the token is invalid or expired.
    """
    signing_key = self._require_secret("decode_token")
    required_claims = ["exp", "iat", "sub", "jti"]
    decode_options = {"require": required_claims, "verify_aud": False}
    # Only the configured algorithm is accepted.
    accepted_algorithms = [self._config.jwt_algorithm]
    claims = jwt.decode(
        token,
        signing_key,
        algorithms=accepted_algorithms,
        options=decode_options,
    )
    return claims

hash_api_key

hash_api_key(raw_key)

Compute HMAC-SHA256 hex digest of a raw API key.

Uses the server-side JWT secret as the HMAC key so that an attacker with read access to stored hashes cannot brute-force API keys offline.

Parameters:

Name Type Description Default
raw_key str

The plaintext API key.

required

Returns:

Type Description
str

Lowercase hex digest.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

Source code in src/synthorg/api/auth/service.py
def hash_api_key(self, raw_key: str) -> str:
    """Return the HMAC-SHA256 hex digest of a raw API key.

    The server-side JWT secret is the HMAC key, so an attacker
    with read access to stored hashes cannot brute-force API
    keys offline.

    Args:
        raw_key: The plaintext API key.

    Returns:
        Lowercase hex digest.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
    """
    mac_key = self._require_secret("hash_api_key").encode()
    message = raw_key.encode()
    mac = hmac.digest(mac_key, message, "sha256")
    return mac.hex()

generate_api_key staticmethod

generate_api_key()

Generate a cryptographically secure API key.

Returns:

Type Description
str

URL-safe base64 string (43 chars).

Source code in src/synthorg/api/auth/service.py
@staticmethod
def generate_api_key() -> str:
    """Generate a cryptographically secure API key.

    Returns:
        URL-safe base64 string (43 chars).
    """
    return secrets.token_urlsafe(32)

middleware

JWT + API key authentication middleware.

ApiAuthMiddleware

Bases: AbstractAuthenticationMiddleware

Authenticate requests via JWT or API key.

Reads Authorization: Bearer <token> from the request. Tokens containing a dot (.) are treated exclusively as JWTs. Tokens without dots are tried as API keys via HMAC-SHA256 hash lookup.

Requires an auth_service and a persistence backend on app.state["app_state"].

authenticate_request async

authenticate_request(connection)

Validate the Authorization header.

Parameters:

Name Type Description Default
connection ASGIConnection[Any, Any, Any, Any]

Incoming ASGI connection.

required

Returns:

Type Description
AuthenticationResult

AuthenticationResult with AuthenticatedUser.

Raises:

Type Description
NotAuthorizedException

If authentication fails.

Source code in src/synthorg/api/auth/middleware.py
async def authenticate_request(
    self,
    connection: ASGIConnection[Any, Any, Any, Any],
) -> AuthenticationResult:
    """Authenticate a request from its Authorization header.

    A token containing ``.`` is handled only as a JWT; any other
    token is handled as an API key.  There is no fallback from one
    scheme to the other.

    Args:
        connection: Incoming ASGI connection.

    Returns:
        AuthenticationResult with AuthenticatedUser.

    Raises:
        NotAuthorizedException: If authentication fails.
    """
    bearer = _validate_auth_header(connection)
    state = connection.app.state["app_state"]
    service: AuthService = state.auth_service
    request_path = str(connection.url.path)

    # Pick the resolver and its failure message up front so the
    # success/failure handling below is written only once.
    if "." in bearer:
        resolve = _try_jwt_auth
        failure_detail = "Invalid JWT token"
    else:
        resolve = _try_api_key_auth
        failure_detail = "Invalid credentials"

    identity = await resolve(
        bearer,
        service,
        state,
        request_path,
    )
    if identity is None:
        raise NotAuthorizedException(detail=failure_detail)
    return AuthenticationResult(user=identity, auth=bearer)

create_auth_middleware_class

create_auth_middleware_class(auth_config)

Create a middleware class with excluded paths baked in.

Litestar's AbstractAuthenticationMiddleware.__init__ takes exclude as a parameter (default None). We create a subclass whose __init__ forwards the configured exclude list to super().__init__.

The middleware is restricted to ScopeType.HTTP only -- WebSocket connections use ticket-based auth handled entirely inside the WS handler (see controllers/ws.py).

Parameters:

Name Type Description Default
auth_config AuthConfig

Auth configuration with exclude_paths.

required

Returns:

Type Description
type[ApiAuthMiddleware]

Middleware class ready for use in the Litestar middleware stack.

Source code in src/synthorg/api/auth/middleware.py
def create_auth_middleware_class(
    auth_config: AuthConfig,
) -> type[ApiAuthMiddleware]:
    """Build an auth middleware class with its exclude paths pre-set.

    Litestar's ``AbstractAuthenticationMiddleware.__init__`` accepts
    ``exclude`` as a parameter (default ``None``).  The returned
    subclass has an ``__init__`` that forwards the configured
    exclude list to ``super().__init__``.

    The middleware is limited to ``ScopeType.HTTP`` --
    WebSocket connections use ticket-based auth handled entirely
    inside the WS handler (see ``controllers/ws.py``).

    Args:
        auth_config: Auth configuration with exclude_paths.

    Returns:
        Middleware class ready for use in the Litestar middleware stack.
    """
    # An empty or missing configuration is forwarded as ``None``.
    if auth_config.exclude_paths:
        excluded = list(auth_config.exclude_paths)
    else:
        excluded = None

    class ConfiguredAuthMiddleware(ApiAuthMiddleware):
        """Auth middleware with pre-configured exclude paths."""

        def __init__(self, app: Any) -> None:
            http_only = {ScopeType.HTTP}
            super().__init__(app, exclude=excluded, scopes=http_only)

    return ConfiguredAuthMiddleware