API Layer

Litestar REST + WebSocket API -- controllers, authentication, guards, and channels.

App

app

Litestar application factory.

Creates and configures the Litestar application with all controllers, middleware, exception handlers, plugins, and lifecycle hooks (startup/shutdown).

create_app

create_app(
    *,
    config=None,
    persistence=None,
    message_bus=None,
    cost_tracker=None,
    approval_store=None,
    auth_service=None,
    task_engine=None,
    coordinator=None,
    agent_registry=None,
    meeting_orchestrator=None,
    meeting_scheduler=None,
    performance_tracker=None,
    settings_service=None,
    provider_registry=None,
    provider_health_tracker=None,
    tool_invocation_tracker=None,
    delegation_record_store=None,
    artifact_storage=None,
    audit_log=None,
    trust_service=None,
    coordination_metrics_store=None,
    training_service=None,
    event_stream_hub=None,
    interrupt_store=None,
    _skip_lifecycle_shutdown=False,
)

Create and configure the Litestar application.

All parameters are optional for testing -- provide fakes via keyword arguments. Services not explicitly provided are auto-wired from config and environment variables.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config` | `RootConfig \| None` | Root company configuration. | `None` |
| `persistence` | `PersistenceBackend \| None` | Persistence backend. | `None` |
| `message_bus` | `MessageBus \| None` | Internal message bus. | `None` |
| `cost_tracker` | `CostTracker \| None` | Cost tracking service. | `None` |
| `approval_store` | `ApprovalStoreProtocol \| None` | Approval queue store. | `None` |
| `auth_service` | `AuthService \| None` | Pre-built auth service (for testing). | `None` |
| `task_engine` | `TaskEngine \| None` | Centralized task state engine. | `None` |
| `coordinator` | `MultiAgentCoordinator \| None` | Multi-agent coordinator. | `None` |
| `agent_registry` | `AgentRegistryService \| None` | Agent registry service. | `None` |
| `meeting_orchestrator` | `MeetingOrchestrator \| None` | Meeting orchestrator. | `None` |
| `meeting_scheduler` | `MeetingScheduler \| None` | Meeting scheduler. | `None` |
| `performance_tracker` | `PerformanceTracker \| None` | Performance tracking service. | `None` |
| `settings_service` | `SettingsService \| None` | Settings service for runtime config. | `None` |
| `provider_registry` | `ProviderRegistry \| None` | Provider registry. | `None` |
| `provider_health_tracker` | `ProviderHealthTracker \| None` | Provider health tracking service. | `None` |
| `tool_invocation_tracker` | `ToolInvocationTracker \| None` | Tool invocation tracking service. | `None` |
| `delegation_record_store` | `DelegationRecordStore \| None` | Delegation record store. | `None` |
| `artifact_storage` | `ArtifactStorageBackend \| None` | Artifact storage backend. | `None` |
| `audit_log` | `AuditLog \| None` | Pre-built audit log (auto-wired if `None`). | `None` |
| `trust_service` | `TrustService \| None` | Pre-built trust service. | `None` |
| `coordination_metrics_store` | `CoordinationMetricsStore \| None` | Pre-built metrics store (auto-wired if `None`). | `None` |
| `training_service` | `TrainingService \| None` | Pre-built training service (auto-wired in startup if `None` and dependencies are available). | `None` |
| `event_stream_hub` | `EventStreamHub \| None` | Pre-built event stream hub (auto-created if `None`). | `None` |
| `interrupt_store` | `InterruptStore \| None` | Pre-built interrupt store (auto-created if `None`). | `None` |
| `_skip_lifecycle_shutdown` | `bool` | Test-only flag. When `True`, the Litestar app is built with an empty `on_shutdown` list so the lifespan exit is a no-op. Used by the session-scoped test fixture in `tests/unit/api/conftest.py` to reuse the same app across tests without tearing down the task engine, message bus, and persistence between each one. Never use in production: shutdown hooks perform critical cleanup (task-engine drain, persistence disconnect, health prober stop, etc.). | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Litestar` | Configured Litestar application. |
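One auto-wiring rule worth calling out: when `persistence` is not injected, the factory reads the CLI-provided environment variables, and Postgres (`SYNTHORG_DATABASE_URL`) takes precedence over SQLite (`SYNTHORG_DB_PATH`) so a half-converted deployment with both variables set never silently falls back to SQLite. A reduced sketch of that selection rule (the `choose_backend` helper is illustrative, not part of the package):

```python
def choose_backend(env: dict[str, str]) -> str:
    # Mirrors create_app's precedence: SYNTHORG_DATABASE_URL (postgres)
    # wins over SYNTHORG_DB_PATH (sqlite); empty/whitespace counts as unset.
    db_url = (env.get("SYNTHORG_DATABASE_URL") or "").strip()
    db_path = (env.get("SYNTHORG_DB_PATH") or "").strip()
    if db_url:
        return "postgres"
    if db_path:
        return "sqlite"
    return "unset"


# Both set: postgres wins, no silent SQLite fallback.
assert choose_backend({
    "SYNTHORG_DATABASE_URL": "postgresql://user:pass@host:5432/db",
    "SYNTHORG_DB_PATH": "/data/synthorg.db",
}) == "postgres"
assert choose_backend({"SYNTHORG_DB_PATH": "/data/synthorg.db"}) == "sqlite"
assert choose_backend({}) == "unset"
```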

Source code in src/synthorg/api/app.py
def create_app(  # noqa: C901, PLR0912, PLR0913, PLR0915
    *,
    config: RootConfig | None = None,
    persistence: PersistenceBackend | None = None,
    message_bus: MessageBus | None = None,
    cost_tracker: CostTracker | None = None,
    approval_store: ApprovalStoreProtocol | None = None,
    auth_service: AuthService | None = None,
    task_engine: TaskEngine | None = None,
    coordinator: MultiAgentCoordinator | None = None,
    agent_registry: AgentRegistryService | None = None,
    meeting_orchestrator: MeetingOrchestrator | None = None,
    meeting_scheduler: MeetingScheduler | None = None,
    performance_tracker: PerformanceTracker | None = None,
    settings_service: SettingsService | None = None,
    provider_registry: ProviderRegistry | None = None,
    provider_health_tracker: ProviderHealthTracker | None = None,
    tool_invocation_tracker: ToolInvocationTracker | None = None,
    delegation_record_store: DelegationRecordStore | None = None,
    artifact_storage: ArtifactStorageBackend | None = None,
    audit_log: AuditLog | None = None,
    trust_service: TrustService | None = None,
    coordination_metrics_store: CoordinationMetricsStore | None = None,
    training_service: TrainingService | None = None,
    event_stream_hub: EventStreamHub | None = None,
    interrupt_store: InterruptStore | None = None,
    _skip_lifecycle_shutdown: bool = False,
) -> Litestar:
    """Create and configure the Litestar application.

    All parameters are optional for testing -- provide fakes via
    keyword arguments.  Services not explicitly provided are
    auto-wired from config and environment variables.

    Args:
        config: Root company configuration.
        persistence: Persistence backend.
        message_bus: Internal message bus.
        cost_tracker: Cost tracking service.
        approval_store: Approval queue store.
        auth_service: Pre-built auth service (for testing).
        task_engine: Centralized task state engine.
        coordinator: Multi-agent coordinator.
        agent_registry: Agent registry service.
        meeting_orchestrator: Meeting orchestrator.
        meeting_scheduler: Meeting scheduler.
        performance_tracker: Performance tracking service.
        settings_service: Settings service for runtime config.
        provider_registry: Provider registry.
        provider_health_tracker: Provider health tracking service.
        tool_invocation_tracker: Tool invocation tracking service.
        delegation_record_store: Delegation record store.
        artifact_storage: Artifact storage backend.
        audit_log: Pre-built audit log (auto-wired if None).
        trust_service: Pre-built trust service.
        coordination_metrics_store: Pre-built metrics store
            (auto-wired if None).
        training_service: Pre-built training service (auto-wired
            in startup if None and dependencies are available).
        event_stream_hub: Pre-built event stream hub (auto-created
            if None).
        interrupt_store: Pre-built interrupt store (auto-created
            if None).
        _skip_lifecycle_shutdown: Test-only flag.  When ``True``, the
            Litestar app is built with an empty ``on_shutdown`` list so
            the lifespan exit is a no-op.  Used by the session-scoped
            test fixture in ``tests/unit/api/conftest.py`` to reuse the
            same app across tests without tearing down the task engine,
            message bus, and persistence between each one.  Never use
            in production: shutdown hooks perform critical cleanup
            (task-engine drain, persistence disconnect, health prober
            stop, etc.).

    Returns:
        Configured Litestar application.
    """
    effective_config = config or RootConfig(company_name="default")

    # Activate the structured logging pipeline before any
    # other setup so that auto-wiring, persistence, and bus logs all
    # flow through the configured sinks.  Respects SYNTHORG_LOG_DIR
    # env var for Docker log directory override.
    try:
        effective_config = _bootstrap_app_logging(effective_config)
    except Exception as exc:
        print(  # noqa: T201
            f"CRITICAL: Failed to initialise logging pipeline: {exc}. "
            "Check SYNTHORG_LOG_DIR, SYNTHORG_LOG_LEVEL, and the "
            "'logging' section of your config file.",
            file=sys.stderr,
            flush=True,
        )
        raise

    api_config = effective_config.api

    # Resolve runtime paths for backup service wiring.
    resolved_db_path: Path | None = None
    resolved_config_path_str = (os.environ.get("SYNTHORG_CONFIG_PATH") or "").strip()
    resolved_config_path: Path | None = (
        Path(resolved_config_path_str) if resolved_config_path_str else None
    )

    # Read persistence env vars unconditionally so downstream code
    # (e.g. the secret-backend gate below) can still observe which
    # environment choice won, even when ``persistence`` was injected
    # by the caller rather than auto-wired here.
    db_url = (os.environ.get("SYNTHORG_DATABASE_URL") or "").strip()
    db_path = (os.environ.get("SYNTHORG_DB_PATH") or "").strip()

    # Auto-wire persistence from CLI-provided env vars. The CLI compose
    # template sets ONE of these per init choice:
    #   * SYNTHORG_DATABASE_URL=postgresql://user:pass@host:port/db   (postgres)
    #   * SYNTHORG_DB_PATH=/data/synthorg.db                          (sqlite)
    # Postgres takes precedence so a half-converted state (both env
    # vars present) does not silently fall back to SQLite. The startup
    # lifecycle handles connect() + migrate() + auth service creation.
    if persistence is None:
        if db_url:
            try:
                pg_config = _postgres_config_from_url(db_url)
                persistence = create_backend(
                    PersistenceConfig(backend="postgres", postgres=pg_config),
                )
            except (MemoryError, RecursionError):
                raise
            except Exception:
                logger.exception(
                    API_APP_STARTUP,
                    error="Postgres persistence creation failed",
                )
                raise
            logger.info(
                API_APP_STARTUP,
                note="Auto-wired Postgres persistence from SYNTHORG_DATABASE_URL",
                host=pg_config.host,
                database=pg_config.database,
            )
            # Postgres has no on-disk artifact directory tied to the DB
            # path, so default artifact storage to /data (the standard
            # data volume in the CLI compose template) when not set.
            if artifact_storage is None:
                artifact_dir_str = _resolve_artifact_dir_env()
                artifact_storage = FileSystemArtifactStorage(
                    data_dir=Path(artifact_dir_str),
                )
                logger.info(
                    API_APP_STARTUP,
                    note="Auto-wired filesystem artifact storage (postgres mode)",
                    data_dir=artifact_dir_str,
                )
        elif db_path:
            resolved_db_path = Path(db_path)
            try:
                persistence = create_backend(
                    PersistenceConfig(sqlite=SQLiteConfig(path=db_path)),
                )
            except (MemoryError, RecursionError):
                raise
            except Exception:
                logger.exception(
                    API_APP_STARTUP,
                    error="Failed to create persistence backend from env",
                )
                raise
            logger.info(
                API_APP_STARTUP,
                note="Auto-wired SQLite persistence from SYNTHORG_DB_PATH",
                db_name=Path(db_path).name,
            )
            # Auto-wire artifact storage from the same data directory.
            if artifact_storage is None:
                artifact_storage = FileSystemArtifactStorage(
                    data_dir=resolved_db_path.parent,
                )
                logger.info(
                    API_APP_STARTUP,
                    note="Auto-wired filesystem artifact storage",
                )

    # ── Phase 1 auto-wire: services that don't need connected persistence ──
    phase1 = auto_wire_phase1(
        effective_config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        cost_tracker=cost_tracker,
        task_engine=task_engine,
        provider_registry=provider_registry,
        provider_health_tracker=provider_health_tracker,
    )
    message_bus = phase1.message_bus
    cost_tracker = phase1.cost_tracker
    task_engine = phase1.task_engine
    provider_registry = phase1.provider_registry
    provider_health_tracker = phase1.provider_health_tracker
    distributed_task_queue = phase1.distributed_task_queue

    # ── Meeting auto-wire: orchestrator + scheduler (Phase 1 level) ──
    meeting_wire = auto_wire_meetings(
        effective_config=effective_config,
        meeting_orchestrator=meeting_orchestrator,
        meeting_scheduler=meeting_scheduler,
        agent_registry=agent_registry,
        provider_registry=provider_registry,
    )
    meeting_orchestrator = meeting_wire.meeting_orchestrator
    meeting_scheduler = meeting_wire.meeting_scheduler
    ceremony_scheduler = meeting_wire.ceremony_scheduler

    channels_plugin = create_channels_plugin()
    expire_callback = _make_expire_callback(channels_plugin)
    effective_approval_store: ApprovalStoreProtocol = (
        approval_store
        if approval_store is not None
        else ApprovalStore(on_expire=expire_callback)
    )

    # Wire meeting event publisher to the meetings WS channel.
    if meeting_scheduler is not None and meeting_scheduler._event_publisher is None:  # noqa: SLF001
        meeting_scheduler._event_publisher = _make_meeting_publisher(  # noqa: SLF001
            channels_plugin,
        )

    # Auto-wire performance tracker with composite quality strategy
    # when not explicitly injected (production path).
    if performance_tracker is None:
        performance_tracker = _build_performance_tracker(
            cost_tracker=cost_tracker,
            provider_registry=provider_registry,
            perf_config=effective_config.performance,
        )

    # Construct the agent registry without versioning here.  The versioning
    # service requires a *connected* persistence backend, but
    # ``persistence.identity_versions`` is only available after
    # ``persistence.connect()`` runs inside ``_safe_startup()``.  The
    # registry is auto-wired with ``VersioningService[AgentIdentity]`` from
    # the startup hook (see ``on_startup`` in ``_build_lifecycle``)
    # so every register/update/evolve call produces an audited
    # ``VersionSnapshot`` in production.
    if agent_registry is None:
        agent_registry = AgentRegistryService()
        logger.info(API_SERVICE_AUTO_WIRED, service="agent_registry")

    notification_dispatcher = build_notification_dispatcher(
        effective_config.notifications,
    )

    # ── Integration services auto-wire ──────────────────────────────────
    integrations = auto_wire_integrations(
        effective_config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        api_config=api_config,
        ceremony_scheduler=ceremony_scheduler,
        db_url=db_url,
        resolved_db_path=resolved_db_path,
    )
    connection_catalog = integrations.connection_catalog
    oauth_token_manager = integrations.oauth_token_manager
    health_prober_service = integrations.health_prober_service
    tunnel_provider = integrations.tunnel_provider
    webhook_event_bridge = integrations.webhook_event_bridge
    mcp_catalog_service = integrations.mcp_catalog_service
    mcp_installations_repo = integrations.mcp_installations_repo

    # Auto-wire control-plane services when not injected.
    if audit_log is None:
        audit_log = AuditLog()
    if coordination_metrics_store is None:
        coordination_metrics_store = CoordinationMetricsStore()
    if trust_service is None:
        trust_service = _build_default_trust_service()

    app_state = AppState(
        config=effective_config,
        persistence=persistence,
        message_bus=message_bus,
        cost_tracker=cost_tracker,
        approval_store=effective_approval_store,
        auth_service=auth_service,
        task_engine=task_engine,
        coordinator=coordinator,
        agent_registry=agent_registry,
        meeting_orchestrator=meeting_orchestrator,
        meeting_scheduler=meeting_scheduler,
        ceremony_scheduler=ceremony_scheduler,
        performance_tracker=performance_tracker,
        settings_service=settings_service,
        provider_registry=provider_registry,
        provider_health_tracker=provider_health_tracker,
        tool_invocation_tracker=tool_invocation_tracker,
        delegation_record_store=delegation_record_store,
        artifact_storage=artifact_storage,
        notification_dispatcher=notification_dispatcher,
        audit_log=audit_log,
        trust_service=trust_service,
        coordination_metrics_store=coordination_metrics_store,
        event_stream_hub=event_stream_hub or EventStreamHub(),
        interrupt_store=interrupt_store or InterruptStore(),
        connection_catalog=connection_catalog,
        oauth_token_manager=oauth_token_manager,
        health_prober_service=health_prober_service,
        tunnel_provider=tunnel_provider,
        webhook_event_bridge=webhook_event_bridge,
        mcp_catalog_service=mcp_catalog_service,
        mcp_installations_repo=mcp_installations_repo,
        training_service=training_service,
        startup_time=time.monotonic(),
    )
    if distributed_task_queue is not None:
        app_state.set_distributed_task_queue(distributed_task_queue)

    # Human escalation approval queue (#1418).  Builds the pluggable
    # store + processor + Future registry and attaches them to
    # ``AppState`` so the escalations controller and the
    # ``HumanEscalationResolver`` share a single instance.
    escalation_config = effective_config.communication.conflict_resolution.escalation
    _escalation_store = build_escalation_queue_store(
        escalation_config,
        persistence,
    )
    app_state.set_escalation_store(_escalation_store)
    app_state.set_escalation_processor(build_decision_processor(escalation_config))
    _escalation_registry = PendingFuturesRegistry()
    app_state.set_escalation_registry(_escalation_registry)
    app_state.set_escalation_sweeper(
        EscalationExpirationSweeper(
            _escalation_store,
            interval_seconds=escalation_config.sweeper_interval_seconds,
        ),
    )
    # Cross-instance wake-up subscriber (#1418).  No-op unless the
    # queue backend is Postgres and ``cross_instance_notify`` is
    # enabled; otherwise the sweeper and per-resolver timeout cover
    # eventual consistency on their own.
    app_state.set_escalation_notify_subscriber(
        build_escalation_notify_subscriber(
            escalation_config,
            _escalation_store,
            _escalation_registry,
        ),
    )

    bridge = (
        MessageBusBridge(
            message_bus,
            channels_plugin,
            config_resolver=(
                app_state.config_resolver if app_state.has_config_resolver else None
            ),
        )
        if message_bus is not None
        else None
    )
    backup_service = build_backup_service(
        effective_config,
        resolved_db_path=resolved_db_path,
        resolved_config_path=resolved_config_path,
    )
    settings_dispatcher = _build_settings_dispatcher(
        message_bus,
        settings_service,
        effective_config,
        app_state,
        backup_service,
    )
    plugins: list[ChannelsPlugin] = [channels_plugin]
    middleware = _build_middleware(
        api_config,
        a2a_enabled=effective_config.a2a.enabled,
    )

    # Integration controllers add ~20 routes (~0.7s of Litestar
    # registration per create_app). Skip them entirely when the
    # integrations subsystem is disabled, so unit tests that do not
    # exercise integration endpoints pay no registration cost.
    #
    # When enabled, gate each controller by its own collaborators
    # instead of a single boolean. ``MCPCatalogController`` only
    # needs ``mcp_catalog_service``; ``WebhooksController`` needs a
    # bus; ``TunnelController`` needs ``tunnel_provider``. A single
    # global gate either under-exposes controllers that are ready
    # or over-exposes ones whose dependencies failed to auto-wire.
    integration_controllers: tuple[type[Controller], ...] = ()
    if effective_config.integrations.enabled:
        from synthorg.api.controllers.connections import (  # noqa: PLC0415
            ConnectionsController,
        )
        from synthorg.api.controllers.integration_health import (  # noqa: PLC0415
            IntegrationHealthController,
        )
        from synthorg.api.controllers.mcp_catalog import (  # noqa: PLC0415
            MCPCatalogController,
        )
        from synthorg.api.controllers.oauth import OAuthController  # noqa: PLC0415
        from synthorg.api.controllers.tunnel import (  # noqa: PLC0415
            TunnelController,
        )
        from synthorg.api.controllers.webhooks import (  # noqa: PLC0415
            WebhooksController,
        )

        controller_readiness: tuple[
            tuple[type[Controller], tuple[tuple[str, object], ...]], ...
        ] = (
            (
                ConnectionsController,
                (("connection_catalog", connection_catalog),),
            ),
            (
                IntegrationHealthController,
                (("connection_catalog", connection_catalog),),
            ),
            (
                OAuthController,
                (
                    ("connection_catalog", connection_catalog),
                    ("persistence", persistence),
                ),
            ),
            (
                WebhooksController,
                (
                    ("connection_catalog", connection_catalog),
                    ("message_bus", message_bus),
                ),
            ),
            (
                MCPCatalogController,
                (("mcp_catalog_service", mcp_catalog_service),),
            ),
            (
                TunnelController,
                (("tunnel_provider", tunnel_provider),),
            ),
        )
        ready: list[type[Controller]] = []
        for controller_cls, deps in controller_readiness:
            missing = [name for name, value in deps if value is None]
            if missing:
                logger.warning(
                    API_APP_STARTUP,
                    note="skipping integration controller (missing deps)",
                    controller=controller_cls.__name__,
                    missing=missing,
                )
                continue
            ready.append(controller_cls)
        integration_controllers = tuple(ready)

    # ── A2A gateway auto-wire ─────────────────────────────────────
    a2a_controllers: tuple[type[Controller], ...] = ()
    a2a_root_controllers: tuple[type[Controller], ...] = ()
    if effective_config.a2a.enabled:
        try:
            from synthorg.a2a.agent_card import (  # noqa: PLC0415
                AgentCardBuilder,
            )
            from synthorg.a2a.models import A2AAuthSchemeInfo  # noqa: PLC0415
            from synthorg.a2a.well_known import (  # noqa: PLC0415
                WellKnownAgentCardController,
            )

            auth_schemes = (
                A2AAuthSchemeInfo(
                    scheme=str(
                        effective_config.a2a.auth.inbound_scheme,
                    ),
                ),
            )
            card_builder = AgentCardBuilder(
                default_auth_schemes=auth_schemes,
            )
            app_state.set_a2a_card_builder(card_builder)
            a2a_root_controllers = (WellKnownAgentCardController,)

            # Outbound client + JSON-RPC gateway need the connection
            # catalog and integrations enabled.
            if effective_config.integrations.enabled and connection_catalog is not None:
                import httpx  # noqa: PLC0415

                from synthorg.a2a.client import A2AClient  # noqa: PLC0415
                from synthorg.a2a.gateway import (  # noqa: PLC0415
                    A2AGatewayController,
                )
                from synthorg.a2a.peer_registry import (  # noqa: PLC0415
                    PeerRegistry,
                )

                peer_registry = PeerRegistry()
                a2a_http_client = httpx.AsyncClient(
                    timeout=effective_config.a2a.client_timeout_seconds
                )
                from synthorg.tools.network_validator import (  # noqa: PLC0415
                    NetworkPolicy,
                )

                a2a_network_policy = NetworkPolicy()
                a2a_client = A2AClient(
                    connection_catalog,
                    network_validator=a2a_network_policy,
                    http_client=a2a_http_client,
                    timeout_seconds=effective_config.a2a.client_timeout_seconds,
                )

                app_state.set_a2a_peer_registry(peer_registry)
                app_state.set_a2a_client(a2a_client)
                a2a_controllers = (A2AGatewayController,)

            logger.info(
                API_SERVICE_AUTO_WIRED,
                service="a2a_gateway",
            )
        except (MemoryError, RecursionError):
            raise
        except Exception:
            logger.warning(
                API_APP_STARTUP,
                error="A2A gateway auto-wire failed (non-fatal)",
                exc_info=True,
            )

    api_router = Router(
        path=api_config.api_prefix,
        route_handlers=[
            *BASE_CONTROLLERS,
            *integration_controllers,
            *a2a_controllers,
            ws_handler,
        ],
        guards=[require_password_changed],
    )

    # Phase 2 auto-wiring flag: persistence being non-None is the
    # enabling condition -- SettingsService needs connected persistence
    # and is created in on_startup after _init_persistence().
    _should_auto_wire = settings_service is None and persistence is not None

    # Review gate service -- transitions tasks from IN_REVIEW on approval.
    # Needs ``task_engine`` for self-review enforcement (preflight) and
    # state transitions; ``persistence`` is OPTIONAL and only used for
    # the auditable decisions drop-box.  Construct the service whenever
    # ``task_engine`` exists so the fail-fast self-review / missing-task
    # preflight still runs in task-engine-only deployments; decision
    # recording gracefully degrades to a WARNING-level no-op when
    # persistence is absent.
    if task_engine is not None:
        review_gate_service = ReviewGateService(
            task_engine=task_engine,
            persistence=persistence,
        )
        app_state.set_review_gate_service(review_gate_service)

    # Approval timeout scheduler -- None here; auto-creation from
    # settings at startup is not yet wired.  Pass explicitly via the
    # lifecycle when a TimeoutChecker is available.
    approval_timeout_scheduler: ApprovalTimeoutScheduler | None = None

    startup, shutdown = _build_lifecycle(
        persistence,
        message_bus,
        bridge,
        settings_dispatcher,
        task_engine,
        meeting_scheduler,
        backup_service,
        approval_timeout_scheduler,
        app_state,
        should_auto_wire_settings=_should_auto_wire,
        effective_config=effective_config,
    )

    # Project telemetry: build collector (reads SYNTHORG_TELEMETRY env for
    # opt-in, defaults to disabled). Attach to app_state so the health
    # endpoint can report the state, and hook start()/shutdown() into the
    # Litestar lifespan. Telemetry is SynthOrg-owned and silent on
    # failure: a broken reporter falls back to noop and never affects
    # the app.
    #
    # Shutdown is appended (runs LAST), not prepended: critical
    # infrastructure (task engine drain, persistence disconnect, bus
    # stop) must complete first so the session-summary event emitted
    # by ``telemetry_collector.shutdown`` reflects final state, and so
    # a hanging Logfire flush never blocks cleanup of load-bearing
    # resources.
    telemetry_collector = _build_telemetry_collector(effective_config.telemetry)
    app_state.set_telemetry_collector(telemetry_collector)
    startup = [*startup, telemetry_collector.start]
    shutdown = [*shutdown, telemetry_collector.shutdown]

    if _skip_lifecycle_shutdown:
        shutdown = []

    # Per-operation rate limiter.  Layered on top of the global
    # two-tier limiter; read from app state by ``per_op_rate_limit``
    # guards.  The store is built unconditionally so that operators who
    # toggle ``api.per_op_rate_limit_enabled`` at runtime (the setting
    # is marked runtime-editable) do not land on a wired-but-uncapped
    # request path; the config's ``enabled`` flag short-circuits the
    # guard when disabled.  Store construction is cheap (empty dicts +
    # per-key locks materialise lazily on first acquire).
    per_op_rate_limit_store: SlidingWindowStore = build_sliding_window_store(
        api_config.per_op_rate_limit,
    )
    app_state.set_per_op_rate_limit_config(api_config.per_op_rate_limit)
    # Honour ``_skip_lifecycle_shutdown`` so tests that share an
    # app across multiple lifespans do not tear down the store
    # (and its background GC) on the first teardown.
    if not _skip_lifecycle_shutdown:
        shutdown = [*shutdown, per_op_rate_limit_store.close]

    # Per-operation inflight-concurrency limiter.
    # Layered on top of the sliding-window per-op limiter; caps
    # simultaneous long-running requests per (operation, subject).
    # Enforced by ``PerOpConcurrencyMiddleware`` registered in the
    # middleware stack.  Built unconditionally (same rationale as the
    # sliding-window store): runtime toggling of
    # ``api.per_op_concurrency_enabled`` must not encounter a missing
    # store.  The middleware short-circuits when
    # ``config.enabled`` is False without ever touching the store.
    per_op_inflight_store: InflightStore = build_inflight_store(
        api_config.per_op_concurrency,
    )
    app_state.set_per_op_concurrency_config(api_config.per_op_concurrency)
    if not _skip_lifecycle_shutdown:
        shutdown = [*shutdown, per_op_inflight_store.close]

    return Litestar(
        route_handlers=[api_router, *a2a_root_controllers],
        # Disable Litestar's built-in logging config to preserve the
        # structlog multi-file-sink pipeline set up by
        # _bootstrap_app_logging() above.  Without this, Litestar calls
        # dictConfig() at startup which triggers _clearExistingHandlers
        # and replaces structlog's file sinks with a stdlib
        # queue_listener, causing all runtime logs to go only to Docker
        # stdout.
        logging_config=None,
        state=State(
            {
                "app_state": app_state,
                "per_op_rate_limit_store": per_op_rate_limit_store,
                "per_op_rate_limit_config": api_config.per_op_rate_limit,
                # Inflight-concurrency state used by
                # ``PerOpConcurrencyMiddleware``; mirrors the
                # sliding-window store's wiring.
                "per_op_inflight_store": per_op_inflight_store,
                "per_op_inflight_config": api_config.per_op_concurrency,
                # Mirrors the global limiter's trusted-proxy set so the
                # per-op guard extracts the same "real" client IP behind
                # reverse proxies instead of bucketing all traffic by
                # the proxy's IP.  The raw frozenset is kept for
                # diagnostic reads; the parsed tuple beside it is what
                # the guards consult per-request.
                "per_op_trusted_proxies": frozenset(
                    api_config.server.trusted_proxies,
                ),
                "per_op_trusted_networks": parse_trusted_networks(
                    frozenset(api_config.server.trusted_proxies),
                ),
            },
        ),
        cors_config=CORSConfig(
            allow_origins=list(api_config.cors.allowed_origins),
            allow_methods=list(api_config.cors.allow_methods),  # type: ignore[arg-type]
            allow_headers=list(api_config.cors.allow_headers),
            allow_credentials=api_config.cors.allow_credentials,
        ),
        compression_config=CompressionConfig(
            backend="brotli",
            minimum_size=api_config.server.compression_minimum_size_bytes,
        ),
        # Must be >= artifact API max payload (50 MB) so endpoint-level
        # validation can enforce exact storage limits.
        request_max_body_size=api_config.server.request_max_body_size_bytes,
        before_send=[security_headers_hook],
        middleware=middleware,
        plugins=plugins,
        exception_handlers=dict(EXCEPTION_HANDLERS),  # type: ignore[arg-type]
        openapi_config=OpenAPIConfig(
            title="SynthOrg API",
            version=__version__,
            path="/docs",
            render_plugins=[
                ScalarRenderPlugin(path="/api"),
            ],
        ),
        on_startup=startup,
        on_shutdown=shutdown,
    )

Config

config

API configuration models.

Frozen Pydantic models for CORS, rate limiting, server, authentication, and the top-level ApiConfig that aggregates them all.

CorsConfig pydantic-model

Bases: BaseModel

CORS configuration for the API.

Attributes:

Name Type Description
allowed_origins tuple[str, ...]

Origins permitted to make cross-origin requests.

allow_methods tuple[str, ...]

HTTP methods permitted in cross-origin requests.

allow_headers tuple[str, ...]

Headers permitted in cross-origin requests.

allow_credentials bool

Whether credentials (cookies, auth) are allowed in cross-origin requests.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_wildcard_credentials

allowed_origins pydantic-field

allowed_origins = ('http://localhost:5173',)

Origins permitted to make cross-origin requests

allow_methods pydantic-field

allow_methods = ('GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS')

HTTP methods permitted in cross-origin requests

allow_headers pydantic-field

allow_headers = ('Content-Type', 'Authorization', 'X-CSRF-Token')

Headers permitted in cross-origin requests

allow_credentials pydantic-field

allow_credentials = True

Whether credentials (cookies) are allowed
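The `_validate_wildcard_credentials` validator presumably guards the standard CORS restriction that a wildcard origin cannot be combined with credentialed requests (browsers reject such responses). A standalone sketch of that rule, not the actual validator:

```python
def check_wildcard_credentials(
    allowed_origins: tuple[str, ...],
    allow_credentials: bool,
) -> None:
    # Per the Fetch/CORS spec, Access-Control-Allow-Origin: * cannot be
    # combined with Access-Control-Allow-Credentials: true; browsers
    # refuse to expose such a response to the page.
    if "*" in allowed_origins and allow_credentials:
        raise ValueError(
            "allow_credentials=True is incompatible with a '*' origin",
        )

# The defaults (explicit origin + credentials) pass:
check_wildcard_credentials(("http://localhost:5173",), True)
```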

RateLimitTimeUnit

Bases: StrEnum

Valid time windows for rate limiting.

RateLimitConfig pydantic-model

Bases: BaseModel

API rate limiting configuration.

Three tiers stacked around the auth middleware:

  • IP floor (outermost, un-gated): keyed by client IP, applies to every request -- including ones the auth middleware rejects with 401. Guards against flood attacks that burn auth-validation cycles on protected endpoints with forged tokens.
  • Unauthenticated (middle, only when scope["user"] is None): keyed by client IP, aggressive cap on brute-force against login/setup/logout.
  • Authenticated (innermost, only when scope["user"] is set): keyed by user ID, generous cap for normal dashboard use.

Keying authenticated limits by user ID instead of IP prevents multi-user deployments behind a shared gateway or NAT from collectively exhausting a single per-IP budget.
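The tier choice described above can be sketched as follows (hypothetical helper; the real middleware reads the ASGI scope directly, and the IP floor is applied unconditionally before this choice is made):

```python
def pick_tier(scope: dict) -> tuple[str, str]:
    """Return (tier, bucket_key) for a request scope.

    Only picks between the two user-gated tiers; the outermost IP
    floor has already counted the request by this point.
    """
    user = scope.get("user")
    client_ip = scope.get("client_ip", "unknown")
    if user is None:
        # Aggressive brute-force cap, keyed by client IP.
        return ("unauth", client_ip)
    # Generous per-user budget, keyed by user ID so users behind a
    # shared NAT do not exhaust one another's allowance.
    return ("auth", user["id"])
```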

Attributes:

Name Type Description
floor_max_requests int

Maximum total requests per time window (by IP) across the whole API. Catches traffic that auth_middleware rejects before the unauth tier sees it.

unauth_max_requests int

Maximum unauthenticated requests per time window (by IP).

auth_max_requests int

Maximum authenticated requests per time window (by user ID).

time_unit RateLimitTimeUnit

Time window (second, minute, hour, day).

exclude_paths tuple[str, ...]

Paths excluded from rate limiting.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_floor_above_user_tiers
  • _reject_legacy_max_requests

floor_max_requests pydantic-field

floor_max_requests = 10000

Maximum total requests per time window (by IP) across the whole API, including requests rejected by the auth middleware. Defense-in-depth against floods of invalid auth attempts on protected endpoints. The floor wraps both user-gated tiers in the middleware stack, so it must be >= auth_max_requests AND >= unauth_max_requests -- a lower floor would silently cap either the authenticated per-user budget or the unauthenticated per-IP budget below its documented value (especially behind a shared NAT where many users share one IP). Enforced by :meth:_validate_floor_above_user_tiers.

unauth_max_requests pydantic-field

unauth_max_requests = 20

Maximum unauthenticated requests per time window (by IP)

auth_max_requests pydantic-field

auth_max_requests = 6000

Maximum authenticated requests per time window (by user ID)

time_unit pydantic-field

time_unit = MINUTE

Time window (second, minute, hour, day)

exclude_paths pydantic-field

exclude_paths = ('/api/v1/health',)

Paths excluded from rate limiting

max_rpm_default pydantic-field

max_rpm_default = 60

Fallback requests-per-minute applied to per-connection coordinators when the catalog does not provide a limiter (mirrors the api.max_rpm_default setting; restart required)

ServerConfig pydantic-model

Bases: BaseModel

Uvicorn server configuration.

Attributes:

Name Type Description
host str

Bind address.

port int

Bind port.

reload bool

Enable auto-reload for development.

workers int

Number of worker processes.

ws_ping_interval float

WebSocket ping interval in seconds (0 to disable).

ws_ping_timeout float

WebSocket pong timeout in seconds.

ssl_certfile str | None

Path to SSL certificate file (PEM format).

ssl_keyfile str | None

Path to SSL private key file (PEM format).

ssl_ca_certs str | None

Path to CA bundle for client cert verification.

trusted_proxies tuple[str, ...]

IP addresses/CIDRs trusted as reverse proxies for X-Forwarded-For/X-Forwarded-Proto header processing.

compression_minimum_size_bytes int

Minimum response body size in bytes before brotli compression kicks in. Mirrors the api.compression_minimum_size_bytes setting (restart required); the API startup hook resolves the current value and threads it in here so operator tuning via the settings database takes effect on next boot.

request_max_body_size_bytes int

Maximum accepted HTTP request body size in bytes. Mirrors the api.request_max_body_size_bytes setting (restart required); populated the same way as compression_minimum_size_bytes.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _normalize_empty_tls
  • _validate_tls_pair

host pydantic-field

host = '127.0.0.1'

Bind address

port pydantic-field

port = 3001

Bind port

reload pydantic-field

reload = False

Enable auto-reload for development

workers pydantic-field

workers = 1

Number of worker processes

ws_ping_interval pydantic-field

ws_ping_interval = 20.0

WebSocket ping interval in seconds (0 to disable)

ws_ping_timeout pydantic-field

ws_ping_timeout = 20.0

WebSocket pong timeout in seconds

ssl_certfile pydantic-field

ssl_certfile = None

Path to SSL certificate file (PEM format)

ssl_keyfile pydantic-field

ssl_keyfile = None

Path to SSL private key file (PEM format)

ssl_ca_certs pydantic-field

ssl_ca_certs = None

Path to CA bundle for client certificate verification

trusted_proxies pydantic-field

trusted_proxies = ()

IP addresses/CIDRs trusted as reverse proxies for X-Forwarded-For/Proto header processing

compression_minimum_size_bytes pydantic-field

compression_minimum_size_bytes = 1000

Minimum response body size in bytes before brotli compression is applied (mirrors the api.compression_minimum_size_bytes setting; restart required)

request_max_body_size_bytes pydantic-field

request_max_body_size_bytes = 52428800

Maximum accepted HTTP request body size in bytes; the default 52428800 is 50 MiB (mirrors the api.request_max_body_size_bytes setting; restart required)

ApiConfig pydantic-model

Bases: BaseModel

Top-level API configuration aggregating all sub-configs.

Attributes:

Name Type Description
cors CorsConfig

CORS configuration.

rate_limit RateLimitConfig

Global three-tier rate limiting configuration (IP floor un-gated, unauthenticated by IP, authenticated by user ID).

per_op_rate_limit PerOpRateLimitConfig

Per-operation throttling configuration (layered on top of the global three-tier limiter).

per_op_concurrency PerOpConcurrencyConfig

Per-operation inflight concurrency capping (layered on top of the sliding-window per-op limiter; caps simultaneous long-running requests per operation per subject).

server ServerConfig

Uvicorn server configuration.

auth AuthConfig

Authentication configuration.

api_prefix NotBlankStr

URL prefix for all API routes.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

cors pydantic-field

cors

CORS configuration

rate_limit pydantic-field

rate_limit

Global three-tier rate limiting configuration: un-gated IP floor, unauthenticated by IP, authenticated by user ID

per_op_rate_limit pydantic-field

per_op_rate_limit

Per-operation throttling (layered on the global limiter)

per_op_concurrency pydantic-field

per_op_concurrency

Per-operation inflight concurrency capping (layered on the sliding-window per-op limiter; caps simultaneous long-running requests per (operation, subject))

server pydantic-field

server

Uvicorn server configuration

auth pydantic-field

auth

Authentication configuration

api_prefix pydantic-field

api_prefix = '/api/v1'

URL prefix for all API routes
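Putting the defaults above together, a minimal configuration expressed as a plain dict (for illustration only; the real models are frozen Pydantic classes validated on construction):

```python
api_config = {
    "cors": {
        "allowed_origins": ["http://localhost:5173"],
        "allow_credentials": True,
    },
    "rate_limit": {
        "floor_max_requests": 10000,   # must cover both tiers below
        "unauth_max_requests": 20,
        "auth_max_requests": 6000,
        "time_unit": "minute",
    },
    "server": {
        "host": "127.0.0.1",
        "port": 3001,
        "request_max_body_size_bytes": 50 * 1024 * 1024,  # 50 MiB
    },
    "api_prefix": "/api/v1",
}
```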

DTOs

dto

Request/response DTOs and envelope models.

Response envelopes wrap all API responses in a consistent structure. Request DTOs define write-operation payloads (separate from domain models because they omit server-generated fields).

ErrorDetail pydantic-model

Bases: BaseModel

Structured error metadata (RFC 9457).

Self-contained so agents can parse it without referencing the parent envelope.

Attributes:

Name Type Description
detail NotBlankStr

Human-readable occurrence-specific explanation.

error_code ErrorCode

Machine-readable error code (by convention, 4-digit category-grouped; see ErrorCode).

error_category ErrorCategory

High-level error category.

retryable bool

Whether the client should retry the request.

retry_after int | None

Seconds to wait before retrying (None when not applicable).

instance NotBlankStr

Request correlation ID for log tracing.

title NotBlankStr

Static per-category title (e.g. "Authentication Error").

type NotBlankStr

Documentation URI for the error category.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_retry_after_consistency

retry_after pydantic-field

retry_after = None

Seconds to wait before retrying (null when not applicable).

ProblemDetail pydantic-model

Bases: BaseModel

Bare RFC 9457 application/problem+json response body.

Returned when the client sends Accept: application/problem+json.

Attributes:

Name Type Description
type NotBlankStr

Documentation URI for the error category.

title NotBlankStr

Static per-category title.

status int

HTTP status code.

detail NotBlankStr

Human-readable occurrence-specific explanation.

instance NotBlankStr

Request correlation ID for log tracing.

error_code ErrorCode

Machine-readable 4-digit error code.

error_category ErrorCategory

High-level error category.

retryable bool

Whether the client should retry the request.

retry_after int | None

Seconds to wait before retrying (None when not applicable).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_retry_after_consistency

retry_after pydantic-field

retry_after = None

Seconds to wait before retrying (null when not applicable).

ApiResponse pydantic-model

Bases: BaseModel

Standard API response envelope.

Attributes:

Name Type Description
data T | None

Response payload (None on error).

error str | None

Error message (None on success).

error_detail ErrorDetail | None

Structured error metadata (None on success).

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • data (T | None)
  • error (str | None)
  • error_detail (ErrorDetail | None)

Validators:

  • _validate_error_detail_consistency

success property

success

Whether the request succeeded (derived from error).

PaginationMeta pydantic-model

Bases: BaseModel

Pagination metadata for list responses.

Attributes:

Name Type Description
total int

Total number of items matching the query.

offset int

Starting offset of the returned page.

limit int

Maximum items per page.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

total pydantic-field

total

Total matching items

offset pydantic-field

offset

Starting offset

limit pydantic-field

limit

Maximum items per page

PaginatedResponse pydantic-model

Bases: BaseModel

Paginated API response envelope.

Attributes:

Name Type Description
data tuple[T, ...]

Page of items.

error str | None

Error message (None on success).

error_detail ErrorDetail | None

Structured error metadata (None on success).

pagination PaginationMeta

Pagination metadata.

degraded_sources tuple[NotBlankStr, ...]

Data sources that failed gracefully, resulting in partial data. Empty when all sources responded normally.

success bool

Whether the request succeeded (computed from error).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_error_detail_consistency

degraded_sources pydantic-field

degraded_sources = ()

Data sources that failed gracefully (partial data)

success property

success

Whether the request succeeded (derived from error).
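How a handler might assemble this envelope shape (hypothetical helper using plain dicts instead of the Pydantic models):

```python
def paginate(items: list, offset: int, limit: int) -> dict:
    # Mirrors the envelope: a page of items plus total/offset/limit so
    # clients can compute "has more" as offset + len(page) < total.
    page = items[offset:offset + limit]
    return {
        "data": page,
        "pagination": {"total": len(items), "offset": offset, "limit": limit},
        "degraded_sources": (),  # empty when all sources responded
    }

resp = paginate(list(range(95)), offset=90, limit=10)
# The final page is short: only 5 of the requested 10 items remain.
```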

CreateArtifactRequest pydantic-model

Bases: BaseModel

Payload for creating a new artifact.

Attributes:

Name Type Description
type ArtifactType

Artifact type (code, tests, documentation).

path NotBlankStr

Logical file/directory path of the artifact.

task_id NotBlankStr

ID of the originating task.

created_by NotBlankStr

Agent ID of the creator.

description str

Human-readable description.

content_type str

MIME content type (empty if no content stored).

project_id NotBlankStr | None

Optional project ID to link the artifact to.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

type pydantic-field

type

Artifact category (code, tests, documentation).

path pydantic-field

path

File path or artifact identifier within the workspace.

task_id pydantic-field

task_id

Originating task identifier.

created_by pydantic-field

created_by

Agent identifier of the artifact creator.

description pydantic-field

description = ''

Human-readable artifact description.

content_type pydantic-field

content_type = ''

MIME type of the artifact content (empty when no content is stored).

project_id pydantic-field

project_id = None

Optional project identifier to link the artifact to.

CreateProjectRequest pydantic-model

Bases: BaseModel

Payload for creating a new project.

Attributes:

Name Type Description
name NotBlankStr

Project display name.

description str

Detailed project description.

team tuple[NotBlankStr, ...]

Agent IDs assigned to the project.

lead NotBlankStr | None

Agent ID of the project lead.

deadline str | None

Optional deadline (ISO 8601 string).

budget float

Total budget in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_request

CreateTaskRequest pydantic-model

Bases: BaseModel

Payload for creating a new task.

Attributes:

Name Type Description
title NotBlankStr

Short task title.

description NotBlankStr

Detailed task description.

type TaskType

Task work type.

priority Priority

Task priority level.

project NotBlankStr

Project ID.

created_by NotBlankStr

Agent name of the creator.

assigned_to NotBlankStr | None

Optional assignee agent ID.

estimated_complexity Complexity

Complexity estimate.

budget_limit float

Maximum spend in base currency.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

UpdateTaskRequest pydantic-model

Bases: BaseModel

Payload for updating task fields.

All fields are optional -- only provided fields are updated.

Attributes:

Name Type Description
title NotBlankStr | None

New title.

description NotBlankStr | None

New description.

priority Priority | None

New priority.

assigned_to NotBlankStr | None

New assignee.

budget_limit float | None

New budget limit.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard
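The partial-update plus optimistic-concurrency contract can be sketched as follows (hypothetical helper; the real logic lives in the task engine):

```python
def apply_update(task: dict, patch: dict) -> dict:
    # Optimistic concurrency: reject the write when the caller's
    # snapshot is stale (expected_version set but no longer current).
    expected = patch.get("expected_version")
    if expected is not None and expected != task["version"]:
        raise ValueError("version conflict: task was modified concurrently")
    # Partial update: only non-None provided fields overwrite the task.
    changes = {
        k: v for k, v in patch.items()
        if k != "expected_version" and v is not None
    }
    return {**task, **changes, "version": task["version"] + 1}
```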

TransitionTaskRequest pydantic-model

Bases: BaseModel

Payload for a task status transition.

Attributes:

Name Type Description
target_status TaskStatus

The desired target status.

assigned_to NotBlankStr | None

Optional assignee override for the transition.

expected_version int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

target_status pydantic-field

target_status

Desired target status

expected_version pydantic-field

expected_version = None

Optimistic concurrency version guard

CancelTaskRequest pydantic-model

Bases: BaseModel

Payload for cancelling a task.

Attributes:

Name Type Description
reason NotBlankStr

Reason for cancellation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

reason pydantic-field

reason

Reason for cancellation

CreateApprovalRequest pydantic-model

Bases: BaseModel

Payload for creating a new approval item.

Attributes:

Name Type Description
action_type NotBlankStr

Kind of action requiring approval (category:action format).

title NotBlankStr

Short summary.

description NotBlankStr

Detailed explanation.

risk_level ApprovalRiskLevel

Assessed risk level.

ttl_seconds int | None

Optional time-to-live in seconds (min 60, max 604800 = 7 days).

task_id NotBlankStr | None

Optional associated task.

metadata dict[str, str]

Additional key-value pairs.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_action_type_format
  • _validate_metadata_bounds

action_type pydantic-field

action_type

Kind of action requiring approval in category:action format.

title pydantic-field

title

Short human-readable summary of the approval.

description pydantic-field

description

Detailed explanation of the action and why it requires approval.

risk_level pydantic-field

risk_level

Assessed risk level for the action.

ttl_seconds pydantic-field

ttl_seconds = None

Optional time-to-live in seconds before the approval auto-expires (minimum 60, maximum 604800 = 7 days).

task_id pydantic-field

task_id = None

Optional associated task identifier.
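A sketch of the `category:action` format check (the exact character set accepted by `_validate_action_type_format` is an assumption here):

```python
import re

# Assumed charset: lowercase alphanumerics and underscores on both sides.
ACTION_TYPE = re.compile(r"[a-z0-9_]+:[a-z0-9_]+")


def validate_action_type(value: str) -> str:
    # The API requires "category:action", e.g. "deploy:production".
    if not ACTION_TYPE.fullmatch(value):
        raise ValueError(
            f"action_type must be 'category:action', got {value!r}",
        )
    return value
```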

ApproveRequest pydantic-model

Bases: BaseModel

Payload for approving an approval item.

Attributes:

Name Type Description
comment NotBlankStr | None

Optional comment explaining the approval.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

RejectRequest pydantic-model

Bases: BaseModel

Payload for rejecting an approval item.

Attributes:

Name Type Description
reason NotBlankStr

Mandatory reason for rejection.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

CoordinateTaskRequest pydantic-model

Bases: BaseModel

Payload for triggering multi-agent coordination on a task.

Attributes:

Name Type Description
agent_names tuple[NotBlankStr, ...] | None

Agent names to coordinate with (None = all active). When provided, must be non-empty and unique.

max_subtasks int

Maximum subtasks for decomposition.

max_concurrency_per_wave int | None

Override for max concurrency per wave.

fail_fast bool | None

Override for fail-fast behaviour (None = use section config default).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • agent_names (tuple[NotBlankStr, ...] | None)
  • max_subtasks (int)
  • max_concurrency_per_wave (int | None)
  • fail_fast (bool | None)

Validators:

  • _validate_unique_agent_names

agent_names pydantic-field

agent_names = None

Agent names to coordinate with (None = all active)

CoordinationPhaseResponse pydantic-model

Bases: BaseModel

Response model for a single coordination phase.

Attributes:

Name Type Description
phase NotBlankStr

Phase name.

success bool

Whether the phase completed successfully.

duration_seconds float

Wall-clock duration of the phase.

error NotBlankStr | None

Error description if the phase failed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_success_error_consistency

CoordinationResultResponse pydantic-model

Bases: BaseModel

Response model for a complete coordination run.

Attributes:

Name Type Description
parent_task_id NotBlankStr

ID of the parent task.

topology NotBlankStr

Resolved coordination topology.

total_duration_seconds float

Total wall-clock duration.

total_cost float

Total cost across all waves.

phases tuple[CoordinationPhaseResponse, ...]

Phase results in execution order.

wave_count int

Number of execution waves.

is_success bool

Whether all phases succeeded (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

currency pydantic-field

currency = DEFAULT_CURRENCY

ISO 4217 currency code

is_success property

is_success

True when every phase completed successfully.

CreateFromPresetRequest pydantic-model

Bases: BaseModel

Payload for creating a provider from a preset.

Attributes:

Name Type Description
preset_name NotBlankStr

Name of the preset to create from.

name NotBlankStr

Unique provider name (2-64 chars, lowercase + hyphens).

auth_type AuthType | None

Override the preset's default auth type (optional).

subscription_token NotBlankStr | None

Bearer token for subscription-based auth.

tos_accepted bool

Whether the user accepted the subscription ToS.

base_url NotBlankStr | None

Override the preset's default base URL (optional).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_name
  • _validate_base_url

CreateProviderRequest pydantic-model

Bases: BaseModel

Payload for creating a new provider.

Attributes:

Name Type Description
name NotBlankStr

Unique provider name (2-64 chars, lowercase + hyphens).

driver NotBlankStr

Driver backend name (default "litellm").

litellm_provider NotBlankStr | None

LiteLLM routing identifier override.

auth_type AuthType

Authentication mechanism for this provider.

api_key NotBlankStr | None

API key credential (optional, depends on auth_type).

subscription_token NotBlankStr | None

Bearer token for subscription-based auth.

tos_accepted bool

Whether the user accepted the subscription ToS.

base_url NotBlankStr | None

Provider API base URL.

models tuple[ProviderModelConfig, ...]

Pre-configured model definitions.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_name
  • _validate_base_url

DiscoverModelsResponse pydantic-model

Bases: BaseModel

Result of provider model auto-discovery.

Attributes:

Name Type Description
discovered_models tuple[ProviderModelConfig, ...]

Models found on the provider endpoint.

provider_name NotBlankStr

Name of the provider that was queried.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

ProbePresetRequest pydantic-model

Bases: BaseModel

Request to probe a preset's candidate URLs for reachability.

Attributes:

Name Type Description
preset_name NotBlankStr

Preset identifier to probe.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

ProbePresetResponse pydantic-model

Bases: BaseModel

Result of probing a preset's candidate URLs.

Attributes:

Name Type Description
url NotBlankStr | None

The first reachable base URL, or None if none responded.

model_count int

Number of models discovered at the URL.

candidates_tried int

Number of candidate URLs attempted.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • url (NotBlankStr | None)
  • model_count (int)
  • candidates_tried (int)

ProviderResponse pydantic-model

Bases: BaseModel

Safe provider config for API responses -- secrets stripped.

Non-secret auth fields are included for frontend edit form UX. Boolean has_* indicators signal credential presence without exposing values.

Attributes:

Name Type Description
driver NotBlankStr

Driver backend name.

litellm_provider NotBlankStr | None

LiteLLM routing identifier override.

auth_type AuthType

Authentication mechanism.

base_url NotBlankStr | None

Provider API base URL.

models tuple[ProviderModelConfig, ...]

Configured model definitions.

has_api_key bool

Whether an API key is set.

has_oauth_credentials bool

Whether OAuth credentials are configured.

has_custom_header bool

Whether a custom auth header is configured.

has_subscription_token bool

Whether a subscription token is set.

tos_accepted_at str | None

ISO timestamp of ToS acceptance (or None).

preset_name NotBlankStr | None

Preset used to create this provider (if any).

supports_model_pull bool

Whether pulling models is supported.

supports_model_delete bool

Whether deleting models is supported.

supports_model_config bool

Whether per-model config is supported.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

TestConnectionRequest pydantic-model

Bases: BaseModel

Payload for testing a provider connection.

Attributes:

Name Type Description
model NotBlankStr | None

Model to test (defaults to first model in config).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

TestConnectionResponse pydantic-model

Bases: BaseModel

Result of a provider connection test.

Attributes:

Name Type Description
success bool

Whether the connection test succeeded.

latency_ms float | None

Round-trip latency in milliseconds.

error NotBlankStr | None

Error message on failure.

model_tested NotBlankStr | None

Model ID that was tested.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_success_error_consistency

UpdateProviderRequest pydantic-model

Bases: BaseModel

Payload for updating a provider (partial update).

All fields are optional -- only provided fields are updated. tos_accepted: only True re-stamps the timestamp; False and None are no-ops (cannot be retracted).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_base_url
  • _validate_credential_clear_consistency

ActivateWorkflowRequest pydantic-model

Bases: BaseModel

Request body for activating a workflow definition.

Attributes:

Name Type Description
project NotBlankStr

Project ID for all created tasks.

context dict[str, str | int | float | bool | None]

Runtime context for condition expression evaluation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

project pydantic-field

project

Project ID for created tasks

context pydantic-field

context

Runtime context for condition evaluation

BlueprintInfoResponse pydantic-model

Bases: BaseModel

Response body for a single workflow blueprint entry.

Attributes:

Name Type Description
name NotBlankStr

Blueprint identifier.

display_name NotBlankStr

Human-readable name.

description str

Short description.

source Literal['builtin', 'user']

Origin of the blueprint.

tags tuple[NotBlankStr, ...]

Categorization tags.

workflow_type WorkflowType

Target execution topology.

node_count int

Number of nodes in the graph.

edge_count int

Number of edges in the graph.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

name pydantic-field

name

Blueprint identifier

display_name pydantic-field

display_name

Human-readable name

description pydantic-field

description = ''

Short description

source pydantic-field

source

Origin: builtin or user

tags pydantic-field

tags = ()

Tags

workflow_type pydantic-field

workflow_type

Target workflow type

node_count pydantic-field

node_count

Number of nodes

edge_count pydantic-field

edge_count

Number of edges

CreateFromBlueprintRequest pydantic-model

Bases: BaseModel

Request body for creating a workflow from a blueprint.

Attributes:

Name Type Description
blueprint_name NotBlankStr

Name of the blueprint to instantiate.

name NotBlankStr | None

Optional name override (defaults to blueprint display_name).

description str | None

Optional description override.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

blueprint_name pydantic-field

blueprint_name

Blueprint to instantiate

name pydantic-field

name = None

Workflow name override

description pydantic-field

description = None

Description override

CreateWorkflowDefinitionRequest pydantic-model

Bases: BaseModel

Payload for creating a new workflow definition.

Attributes:

Name Type Description
name NotBlankStr

Workflow name.

description str

Optional description.

workflow_type WorkflowType

Target execution topology.

nodes tuple[dict[str, object], ...]

Nodes in the workflow graph (serialized as dicts).

edges tuple[dict[str, object], ...]

Edges connecting nodes (serialized as dicts).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

name pydantic-field

name

Workflow name

description pydantic-field

description = ''

Description

workflow_type pydantic-field

workflow_type

Target execution topology

version pydantic-field

version = '1.0.0'

Semver version string

inputs pydantic-field

inputs = ()

Typed input declarations

outputs pydantic-field

outputs = ()

Typed output declarations

is_subworkflow pydantic-field

is_subworkflow = False

Whether this definition is a reusable subworkflow

nodes pydantic-field

nodes

Workflow nodes

edges pydantic-field

edges

Workflow edges

RollbackWorkflowRequest pydantic-model

Bases: BaseModel

Request body for rolling back a workflow to a previous version.

Attributes:

Name Type Description
target_version int

Snapshot version number to restore content from (monotonic counter in the workflow_definition_versions table).

expected_revision int

Current definition revision for optimistic concurrency on the live workflow_definitions row.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

target_version pydantic-field

target_version

Snapshot version to rollback to

expected_revision pydantic-field

expected_revision

Optimistic concurrency guard on the definition revision
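
The interplay between target_version and expected_revision can be sketched with a minimal stand-in for the store (the names here are illustrative, not the real persistence API):

```python
class RevisionConflict(Exception):
    """Raised when expected_revision no longer matches the live row."""


def rollback(live_revision: int, *, target_version: int, expected_revision: int) -> int:
    """Restore content from snapshot target_version, guarded by expected_revision.

    Returns the new revision on success.
    """
    if expected_revision != live_revision:
        # Another writer bumped the revision since the client last read it.
        raise RevisionConflict(
            f"expected revision {expected_revision}, found {live_revision}"
        )
    # ... copy snapshot `target_version` content onto the live row ...
    return live_revision + 1


new_rev = rollback(7, target_version=3, expected_revision=7)  # -> 8
```

A stale expected_revision raises instead of silently overwriting concurrent edits, which is what surfaces as a 409 at the API layer.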

UpdateWorkflowDefinitionRequest pydantic-model

Bases: BaseModel

Payload for updating an existing workflow definition.

All fields are optional -- only provided fields are updated.

Attributes:

Name Type Description
name NotBlankStr | None

New name.

description str | None

New description.

workflow_type WorkflowType | None

New workflow type.

version NotBlankStr | None

New semver version string.

inputs tuple[WorkflowIODeclarationRequest, ...] | None

New typed input contract.

outputs tuple[WorkflowIODeclarationRequest, ...] | None

New typed output contract.

is_subworkflow bool | None

New publishing flag.

nodes tuple[dict[str, object], ...] | None

New nodes.

edges tuple[dict[str, object], ...] | None

New edges.

expected_revision int | None

Optimistic concurrency guard.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

version pydantic-field

version = None

Semver string override

expected_revision pydantic-field

expected_revision = None

Optimistic concurrency guard (revision counter)

WorkflowIODeclarationRequest pydantic-model

Bases: BaseModel

Typed input/output declaration for workflow creation/update.

Field names mirror WorkflowIODeclaration so that model_validate pass-through works without renaming.

Attributes:

Name Type Description
name NotBlankStr

Identifier for this input or output.

type WorkflowValueType

The declared data type.

required bool

Whether this declaration is mandatory.

default object

Default when not required (must be None when required).

description str

Human-readable description.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_default_with_required

name pydantic-field

name

Declaration name

type pydantic-field

type

Declared data type

required pydantic-field

required = True

Whether mandatory

default pydantic-field

default = None

Default value

description pydantic-field

description = ''

Human-readable description
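
The _validate_default_with_required invariant (a default is only meaningful when required is False) can be illustrated without Pydantic as a plain dataclass check -- a sketch of the rule, not the actual model:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IODeclaration:
    """Simplified stand-in for WorkflowIODeclarationRequest."""

    name: str
    type: str
    required: bool = True
    default: object = None

    def __post_init__(self) -> None:
        # Mirror the model's validator: a required declaration
        # must not carry a default value.
        if self.required and self.default is not None:
            raise ValueError(f"{self.name}: default must be None when required")


IODeclaration("query", "string")                               # ok
IODeclaration("limit", "integer", required=False, default=10)  # ok
```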

RollbackAgentIdentityRequest pydantic-model

Bases: BaseModel

Request body for rolling back an agent identity to a previous version.

Attributes:

Name Type Description
target_version int

Snapshot version number to restore content from (monotonic counter in the agent_identity_versions table).

reason NotBlankStr | None

Optional human-readable justification recorded alongside the evolution event for audit purposes.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

target_version pydantic-field

target_version

Snapshot version to rollback to

reason pydantic-field

reason = None

Optional rollback justification for the audit trail

to_provider_response

to_provider_response(config)

Convert a ProviderConfig to a safe ProviderResponse.

Strips all secrets and provides boolean credential indicators. Resolves local model management capabilities from the preset when preset_name is set.

Parameters:

Name Type Description Default
config ProviderConfig

Provider configuration (may contain secrets).

required

Returns:

Type Description
ProviderResponse

Safe response DTO with secrets stripped.

Source code in src/synthorg/api/dto_providers.py
def to_provider_response(config: ProviderConfig) -> ProviderResponse:
    """Convert a ProviderConfig to a safe ProviderResponse.

    Strips all secrets and provides boolean credential indicators.
    Resolves local model management capabilities from the preset
    when ``preset_name`` is set.

    Args:
        config: Provider configuration (may contain secrets).

    Returns:
        Safe response DTO with secrets stripped.
    """
    from synthorg.providers.presets import get_preset  # noqa: PLC0415

    tos_str = (
        config.tos_accepted_at.isoformat()
        if config.tos_accepted_at is not None
        else None
    )
    preset = get_preset(config.preset_name) if config.preset_name else None
    return ProviderResponse(
        driver=config.driver,
        litellm_provider=config.litellm_provider,
        auth_type=config.auth_type,
        base_url=config.base_url,
        models=config.models,
        has_api_key=config.api_key is not None,
        has_oauth_credentials=(
            config.oauth_client_id is not None
            and config.oauth_client_secret is not None
            and config.oauth_token_url is not None
        ),
        has_custom_header=(
            config.custom_header_name is not None
            and config.custom_header_value is not None
        ),
        has_subscription_token=config.subscription_token is not None,
        tos_accepted_at=tos_str,
        oauth_token_url=config.oauth_token_url,
        oauth_client_id=config.oauth_client_id,
        oauth_scope=config.oauth_scope,
        custom_header_name=config.custom_header_name,
        preset_name=config.preset_name,
        supports_model_pull=preset.supports_model_pull if preset else False,
        supports_model_delete=preset.supports_model_delete if preset else False,
        supports_model_config=preset.supports_model_config if preset else False,
    )

Errors

errors

API error hierarchy and RFC 9457 error taxonomy.

All API-specific errors inherit from ApiError so callers can catch the entire family with a single except clause.

ErrorCategory and ErrorCode provide machine-readable error metadata for structured error responses (RFC 9457).

ErrorCategory

Bases: StrEnum

High-level error category for structured error responses.

Values are lowercase strings suitable for JSON serialization.

ErrorCode

Bases: IntEnum

Machine-readable error codes (4-digit, category-grouped).

First digit encodes the category: 1xxx = auth, 2xxx = validation, 3xxx = not_found, 4xxx = conflict, 5xxx = rate_limit, 6xxx = budget_exhausted, 7xxx = provider_error, 8xxx = internal.

ApiError

ApiError(message=None, *, status_code=500)

Bases: Exception

Base exception for API-layer errors.

Class Attributes

  • default_message -- Fallback error message used when none is provided and for 5xx response scrubbing.
  • error_category -- RFC 9457 error category.
  • error_code -- RFC 9457 machine-readable error code.
  • retryable -- Whether the client should retry the request.

Instance Attributes

status_code: HTTP status code (set via __init__, fixed per subclass).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None, *, status_code: int = 500) -> None:
    super().__init__(message or self.default_message)
    self.status_code = status_code

__init_subclass__

__init_subclass__(**kwargs)

Validate error_code/error_category consistency at class creation.

Source code in src/synthorg/api/errors.py
def __init_subclass__(cls, **kwargs: object) -> None:
    """Validate error_code/error_category consistency at class creation."""
    super().__init_subclass__(**kwargs)
    prefix = cls.error_code.value // 1000
    expected = _CODE_CATEGORY_PREFIX.get(prefix)
    if expected is not None and cls.error_category != expected:
        msg = (
            f"{cls.__name__}: error_code {cls.error_code.name} "
            f"(prefix {prefix}) implies category {expected.name}, "
            f"but error_category is {cls.error_category.name}"
        )
        raise TypeError(msg)

NotFoundError

NotFoundError(message=None)

Bases: ApiError

Raised when a requested resource does not exist (404).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=404)

ApiValidationError

ApiValidationError(message=None)

Bases: ApiError

Raised when request data fails validation (422).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=422)

ConflictError

ConflictError(message=None)

Bases: ApiError

Raised when a resource conflict occurs (409).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=409)

VersionConflictError

VersionConflictError(message=None)

Bases: ApiError

Raised when an ETag/If-Match version check fails (409).

Used for ETag/If-Match optimistic concurrency checks -- currently on settings endpoints.

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=409)

ForbiddenError

ForbiddenError(message=None)

Bases: ApiError

Raised when access is denied (403).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=403)

SessionRevokedError

SessionRevokedError(message=None)

Bases: ApiError

Raised when a revoked session token is used (401).

Gives clients a distinct error code (SESSION_REVOKED) so they can show a "you were logged out" message instead of a generic auth failure.

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=401)

UnauthorizedError

UnauthorizedError(message=None)

Bases: ApiError

Raised when authentication is required or invalid (401).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=401)

AccountLockedError

AccountLockedError(message=None, *, retry_after=0)

Bases: ApiError

Raised when login is blocked by account lockout (429).

Uses HTTP 429 (Too Many Requests) with an optional Retry-After header indicating when the lockout expires.

Source code in src/synthorg/api/errors.py
def __init__(
    self,
    message: str | None = None,
    *,
    retry_after: int = 0,
) -> None:
    super().__init__(message, status_code=429)
    self.retry_after = max(0, int(retry_after))

ServiceUnavailableError

ServiceUnavailableError(message=None)

Bases: ApiError

Raised when a required service is not configured (503).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=503)

ArtifactTooLargeApiError

ArtifactTooLargeApiError(message=None)

Bases: ApiError

Raised when an artifact upload exceeds the size limit (413).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=413)

ArtifactStorageFullApiError

ArtifactStorageFullApiError(message=None)

Bases: ApiError

Raised when the artifact storage backend is full (507).

Source code in src/synthorg/api/errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message, status_code=507)

PerOperationRateLimitError

PerOperationRateLimitError(message=None, *, retry_after=1)

Bases: ApiError

Raised when a per-operation rate limit is exceeded (429).

Produced by the per_op_rate_limit guards in synthorg.api.rate_limits.guard. Flows through handle_api_error to produce an RFC 9457 response with Retry-After set.

Source code in src/synthorg/api/errors.py
def __init__(
    self,
    message: str | None = None,
    *,
    retry_after: int = 1,
) -> None:
    super().__init__(message, status_code=429)
    self.retry_after = max(1, int(retry_after))

ConcurrencyLimitExceededError

ConcurrencyLimitExceededError(message=None, *, retry_after=1)

Bases: PerOperationRateLimitError

Raised when a per-operation concurrency (inflight) cap is hit (429).

Produced by the PerOpConcurrencyMiddleware when a user already has max_inflight requests running for the guarded operation. Inherits from PerOperationRateLimitError so the existing 429 / Retry-After / RFC 9457 handling applies unchanged. A distinct error_code lets clients discriminate concurrency denials ("you already have one running") from window denials ("try again after the bucket refills").

Source code in src/synthorg/api/errors.py
def __init__(
    self,
    message: str | None = None,
    *,
    retry_after: int = 1,
) -> None:
    super().__init__(message, status_code=429)
    self.retry_after = max(1, int(retry_after))

category_title

category_title(cat)

Return the RFC 9457 title for a category.

Parameters:

Name Type Description Default
cat ErrorCategory

Error category.

required

Returns:

Type Description
str

Human-readable title string.

Source code in src/synthorg/api/errors.py
def category_title(cat: ErrorCategory) -> str:
    """Return the RFC 9457 ``title`` for a category.

    Args:
        cat: Error category.

    Returns:
        Human-readable title string.
    """
    return CATEGORY_TITLES[cat]

category_type_uri

category_type_uri(cat)

Return the RFC 9457 type URI for a category.

Parameters:

Name Type Description Default
cat ErrorCategory

Error category.

required

Returns:

Type Description
str

Documentation URI with fragment anchor for the error category.

Source code in src/synthorg/api/errors.py
def category_type_uri(cat: ErrorCategory) -> str:
    """Return the RFC 9457 ``type`` URI for a category.

    Args:
        cat: Error category.

    Returns:
        Documentation URI with fragment anchor for the error category.
    """
    return f"{_ERROR_DOCS_BASE}#{cat.value}"

Guards

guards

Route guards for access control.

Guards read the authenticated user identity from connection.user (populated by the auth middleware) and check role-based permissions.

The require_roles factory creates guards for arbitrary role sets. Pre-built constants cover common patterns:

require_ceo              -- CEO only
require_ceo_or_manager   -- CEO or Manager
require_approval_roles   -- CEO, Manager, or Board Member

require_ceo module-attribute

require_ceo = require_roles(CEO)

Guard allowing only the CEO role.

require_ceo_or_manager module-attribute

require_ceo_or_manager = require_roles(CEO, MANAGER)

Guard allowing CEO or Manager roles.

require_approval_roles module-attribute

require_approval_roles = require_roles(CEO, MANAGER, BOARD_MEMBER)

Guard allowing roles that can approve or reject actions.

HumanRole

Bases: StrEnum

Recognised human roles for access control.

has_write_role

has_write_role(role)

Return True if the role grants write access.

Use this for inline role checks instead of importing _WRITE_ROLES directly. The write set includes CEO, Manager, and Pair Programmer.

Source code in src/synthorg/api/guards.py
def has_write_role(role: HumanRole) -> bool:
    """Return True if the role grants write access.

    Use this for inline role checks instead of importing ``_WRITE_ROLES``
    directly.  The write set includes CEO, Manager, and Pair Programmer.
    """
    return role in _WRITE_ROLES

require_write_access

require_write_access(connection, _)

Guard that allows only write-capable human roles.

Checks connection.user.role for ceo, manager, or pair_programmer. Board members are excluded (they may only observe and approve). The system role is intentionally excluded -- use require_roles() with the desired roles for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_write_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Guard that allows only write-capable human roles.

    Checks ``connection.user.role`` for ``ceo``, ``manager``,
    or ``pair_programmer``.  Board members are excluded (they
    may only observe and approve).  The ``system`` role is
    intentionally excluded -- use ``require_roles()`` with the
    desired roles for endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    role = _get_role(connection)
    if role not in _WRITE_ROLES:
        logger.warning(
            API_GUARD_DENIED,
            guard="require_write_access",
            role=role,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Write access denied")

require_read_access

require_read_access(connection, _)

Guard that allows all human roles (excludes SYSTEM).

Checks connection.user.role for any human role including observer and board_member. The internal system role is excluded -- use require_roles() for endpoints the CLI needs to reach.

Parameters:

Name Type Description Default
connection ASGIConnection

The incoming connection.

required
_ object

Route handler (unused).

required

Raises:

Type Description
PermissionDeniedException

If the role is not permitted.

Source code in src/synthorg/api/guards.py
def require_read_access(
    connection: ASGIConnection,  # type: ignore[type-arg]
    _: object,
) -> None:
    """Guard that allows all human roles (excludes SYSTEM).

    Checks ``connection.user.role`` for any human role
    including ``observer`` and ``board_member``.  The internal
    ``system`` role is excluded -- use ``require_roles()`` for
    endpoints the CLI needs to reach.

    Args:
        connection: The incoming connection.
        _: Route handler (unused).

    Raises:
        PermissionDeniedException: If the role is not permitted.
    """
    role = _get_role(connection)
    if role not in _READ_ROLES:
        logger.warning(
            API_GUARD_DENIED,
            guard="require_read_access",
            role=role,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Read access denied")

require_roles

require_roles(*roles)

Create a guard that allows only the specified roles.

Parameters:

Name Type Description Default
*roles HumanRole

One or more HumanRole members to permit.

()

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Raises:

Type Description
ValueError

If no roles are provided.

Source code in src/synthorg/api/guards.py
def require_roles(
    *roles: HumanRole,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Create a guard that allows only the specified roles.

    Args:
        *roles: One or more ``HumanRole`` members to permit.

    Returns:
        A guard function compatible with Litestar's guard protocol.

    Raises:
        ValueError: If no roles are provided.
    """
    if not roles:
        msg = "require_roles() requires at least one role"
        raise ValueError(msg)

    allowed = frozenset(roles)
    label = ",".join(sorted(r.value for r in allowed))

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        role = _get_role(connection)
        if role not in allowed:
            logger.warning(
                API_GUARD_DENIED,
                guard=f"require_roles({label})",
                role=role,
                path=str(connection.url.path),
            )
            raise PermissionDeniedException(detail="Access denied")

    guard.__name__ = f"require_roles({label})"
    guard.__qualname__ = f"require_roles({label})"
    return guard

require_org_mutation

require_org_mutation(department_param=None)

Guard factory for org config mutations.

Access is granted if the user has one of:

  • OrgRole.OWNER -- always allowed
  • OrgRole.EDITOR -- always allowed
  • OrgRole.DEPARTMENT_ADMIN -- allowed only when the target department (read from the path parameter named department_param) is in the user's scoped_departments

If the user has no org_roles (empty tuple), falls back to the existing HumanRole write-access check for backward compatibility with pre-#1082 installations.

Parameters:

Name Type Description Default
department_param str | None

Path parameter name containing the target department (e.g. "name"). None skips department scope checking (company-level endpoints).

None

Returns:

Type Description
Callable[[ASGIConnection, object], None]

A guard function compatible with Litestar's guard protocol.

Source code in src/synthorg/api/guards.py
def require_org_mutation(
    department_param: str | None = None,
) -> Callable[[ASGIConnection, object], None]:  # type: ignore[type-arg]
    """Guard factory for org config mutations.

    Access is granted if the user has one of:

    - ``OrgRole.OWNER`` -- always allowed
    - ``OrgRole.EDITOR`` -- always allowed
    - ``OrgRole.DEPARTMENT_ADMIN`` -- allowed only when the
      target department (read from the path parameter named
      *department_param*) is in the user's ``scoped_departments``

    If the user has no ``org_roles`` (empty tuple), falls back to
    the existing ``HumanRole`` write-access check for backward
    compatibility with pre-#1082 installations.

    Args:
        department_param: Path parameter name containing the target
            department (e.g. ``"name"``).  ``None`` skips department
            scope checking (company-level endpoints).

    Returns:
        A guard function compatible with Litestar's guard protocol.
    """

    def guard(
        connection: ASGIConnection,  # type: ignore[type-arg]
        _: object,
    ) -> None:
        org_roles = _get_org_roles(connection)

        # Backward compat: if no org_roles set, fall back to HumanRole
        if not org_roles:
            role = _get_role(connection)
            if role in _WRITE_ROLES:
                return
            logger.warning(
                API_GUARD_DENIED,
                guard="require_org_mutation(fallback)",
                role=role,
                path=str(connection.url.path),
            )
            raise PermissionDeniedException(detail="Write access denied")

        # Owner and editor always allowed
        if _ORG_ROLE_OWNER in org_roles or _ORG_ROLE_EDITOR in org_roles:
            return

        # Department admin: check scope
        if _ORG_ROLE_DEPARTMENT_ADMIN in org_roles:
            if department_param is None:
                # Company-level endpoint -- dept_admin cannot modify
                logger.warning(
                    API_GUARD_DENIED,
                    guard="require_org_mutation(dept_admin_no_scope)",
                    path=str(connection.url.path),
                )
                raise PermissionDeniedException(
                    detail="Department admins cannot modify company-level settings",
                )
            target_dept = connection.path_params.get(department_param, "")
            scoped = _get_scoped_departments(connection)
            if target_dept.lower() in (d.lower() for d in scoped):
                return
            logger.warning(
                API_GUARD_DENIED,
                guard="require_org_mutation(dept_admin_out_of_scope)",
                target_department=target_dept,
                scoped_departments=scoped,
                path=str(connection.url.path),
            )
            raise PermissionDeniedException(
                detail=f"Department admin access denied for {target_dept!r}",
            )

        # Viewer or unrecognised role
        logger.warning(
            API_GUARD_DENIED,
            guard="require_org_mutation(insufficient_org_role)",
            org_roles=org_roles,
            path=str(connection.url.path),
        )
        raise PermissionDeniedException(detail="Org mutation access denied")

    guard.__name__ = "require_org_mutation"
    guard.__qualname__ = "require_org_mutation"
    return guard

Middleware

middleware

Request middleware and before-send hooks.

Provides ASGI middleware for request logging, and a before_send hook that injects security headers (CSP, CORP, HSTS, Cache-Control, etc.) into every HTTP response -- including exception-handler and unmatched-route (404/405) responses.

Why before_send instead of ASGI middleware? Litestar's before_send hook wraps the ASGI send callback at the outermost layer (before the middleware stack), so it fires for all responses. By contrast, user-defined ASGI middleware only runs for matched routes -- 404 and 405 responses from the router bypass it.

RequestLoggingMiddleware

RequestLoggingMiddleware(app)

ASGI middleware that logs request start and completion.

Uses time.perf_counter() for high-resolution duration measurement. Only logs HTTP requests (non-HTTP scopes like WebSocket and lifespan are passed through without logging).

Source code in src/synthorg/api/middleware.py
def __init__(self, app: ASGIApp) -> None:
    self.app = app

__call__ async

__call__(scope, receive, send)

Process an ASGI request, logging start and completion.

Source code in src/synthorg/api/middleware.py
async def __call__(
    self,
    scope: Scope,
    receive: Receive,
    send: Send,
) -> None:
    """Process an ASGI request, logging start and completion."""
    if scope["type"] != ScopeType.HTTP:
        await self.app(scope, receive, send)
        return

    request: Request[Any, Any, Any] = Request(scope)
    method = request.method
    path = str(request.url.path)

    bind_correlation_id(request_id=generate_correlation_id())
    logger.info(API_REQUEST_STARTED, method=method, path=path)
    start = time.perf_counter()

    status_code: int | None = None
    original_send = send

    async def capture_send(message: Any) -> None:
        nonlocal status_code
        if (
            isinstance(message, dict)
            and message.get("type") == "http.response.start"
        ):
            raw_status = message.get("status")
            if raw_status is None:
                logger.warning(
                    API_ASGI_MISSING_STATUS,
                    type=message.get("type"),
                )
                status_code = 500
            else:
                status_code = raw_status
        await original_send(message)  # pyright: ignore[reportArgumentType]

    try:
        await self.app(scope, receive, capture_send)
    finally:
        elapsed_sec = time.perf_counter() - start
        duration_ms = round(elapsed_sec * 1000, 2)
        _log_request_completion(method, path, status_code, duration_ms)
        _record_request_metric(scope, method, status_code, elapsed_sec)
        clear_correlation_ids()

security_headers_hook async

security_headers_hook(message, scope)

Inject security headers into every HTTP response.

Registered as a Litestar before_send hook so it fires for all HTTP responses -- successful, exception-handler, and router-level 404/405.

Adds static security headers (CORP, HSTS, X-Content-Type-Options, etc.), a path-aware Content-Security-Policy (strict for the API, relaxed for /docs/ to allow Scalar UI resources), and path-aware Cache-Control (no-store for the API; public, max-age=300 for /docs/, which serves public, non-user-specific content).

Uses __setitem__ (not add) so that if any handler or middleware already set a header, the known-good value overwrites it rather than creating a duplicate.

Parameters:

Name Type Description Default
message Message

ASGI message dict (only http.response.start is processed).

required
scope Scope

ASGI connection scope.

required
Source code in src/synthorg/api/middleware.py
async def security_headers_hook(message: Message, scope: Scope) -> None:
    """Inject security headers into every HTTP response.

    Registered as a Litestar ``before_send`` hook so it fires for
    **all** HTTP responses -- successful, exception-handler, and
    router-level 404/405.

    Adds static security headers (CORP, HSTS, X-Content-Type-Options,
    etc.) and path-aware Content-Security-Policy (strict for API,
    relaxed for ``/docs/`` to allow Scalar UI resources) and
    Cache-Control (``no-store`` for API, ``public, max-age=300``
    for ``/docs/`` since it serves public, non-user-specific content).

    Uses ``__setitem__`` (not ``add``) so that if any handler or
    middleware already set a header, the known-good value overwrites
    it rather than creating a duplicate.

    Args:
        message: ASGI message dict (only ``http.response.start``
            is processed).
        scope: ASGI connection scope.
    """
    if scope.get("type") != ScopeType.HTTP:
        return
    if message.get("type") != "http.response.start":
        return

    headers = MutableScopeHeaders.from_message(message)

    # Static security headers -- overwrite to prevent duplicates.
    for name, value in _SECURITY_HEADERS.items():
        headers[name] = value

    # Path-aware headers
    path: str = scope.get("path", "")
    is_docs = path == "/docs" or path.startswith("/docs/")
    headers["Content-Security-Policy"] = _DOCS_CSP if is_docs else _API_CSP

    # Relax COOP for /docs -- Scalar UI may open cross-origin popups
    # for OAuth/API proxy features via proxy.scalar.com.
    # same-origin-allow-popups: allows the page to open popups but
    # blocks cross-origin pages from retaining an opener reference,
    # preventing XS-Leak side-channel attacks via window.opener.
    # Allow brief caching for docs -- public, non-user-specific content.
    if is_docs:
        headers["Cross-Origin-Opener-Policy"] = "same-origin-allow-popups"
        headers["Cache-Control"] = _DOCS_CACHE_CONTROL

Pagination

pagination

In-memory pagination helper.

Applies offset/limit slicing to tuples and produces PaginationMeta for the response envelope.

PaginationOffset module-attribute

PaginationOffset = Annotated[int, Parameter(ge=0, description='Pagination offset')]

Query parameter type for pagination offset (>= 0).

PaginationLimit module-attribute

PaginationLimit = Annotated[int, Parameter(ge=1, le=MAX_LIMIT, description="Page size")]

Query parameter type for pagination limit (1-200).

paginate

paginate(items, *, offset, limit, total=None)

Slice a tuple and produce pagination metadata.

Clamps offset to [0, len(items)] and limit to [1, MAX_LIMIT] as a safety net.

Parameters:

Name Type Description Default
items tuple[T, ...]

Full collection to paginate.

required
offset int

Zero-based starting index.

required
limit int

Maximum items to return.

required
total int | None

True total count when items has been truncated upstream (e.g. by a safety cap). Defaults to len(items).

None

Returns:

Type Description
tuple[tuple[T, ...], PaginationMeta]

A tuple of (page_items, pagination_meta).

Source code in src/synthorg/api/pagination.py
def paginate[T](
    items: tuple[T, ...],
    *,
    offset: int,
    limit: int,
    total: int | None = None,
) -> tuple[tuple[T, ...], PaginationMeta]:
    """Slice a tuple and produce pagination metadata.

    Clamps ``offset`` to ``[0, len(items)]`` and ``limit`` to
    ``[1, MAX_LIMIT]`` as a safety net.

    Args:
        items: Full collection to paginate.
        offset: Zero-based starting index.
        limit: Maximum items to return.
        total: True total count when *items* has been truncated
            upstream (e.g. by a safety cap).  Defaults to
            ``len(items)``.

    Returns:
        A tuple of (page_items, pagination_meta).
    """
    effective_total = total if total is not None else len(items)
    offset = max(0, min(offset, len(items)))
    limit = max(1, min(limit, MAX_LIMIT))
    page = items[offset : offset + limit]
    meta = PaginationMeta(
        total=effective_total,
        offset=offset,
        limit=limit,
    )
    return page, meta
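The clamping behaviour can be seen in a self-contained sketch. `PaginationMeta` is stood in for by a plain dict here, and `MAX_LIMIT` is assumed to be 200 (matching the documented 1-200 range for `PaginationLimit`):

```python
MAX_LIMIT = 200  # assumed value, per the PaginationLimit bounds above

def paginate(items, *, offset, limit, total=None):
    effective_total = total if total is not None else len(items)
    offset = max(0, min(offset, len(items)))   # clamp into [0, len(items)]
    limit = max(1, min(limit, MAX_LIMIT))      # clamp into [1, MAX_LIMIT]
    page = items[offset : offset + limit]
    return page, {"total": effective_total, "offset": offset, "limit": limit}

items = tuple(range(10))

# Normal page: offset 4, limit 3 -> elements 4..6
page, meta = paginate(items, offset=4, limit=3)
# page == (4, 5, 6); meta["total"] == 10

# An out-of-range offset is clamped, yielding an empty page rather than an error
page, meta = paginate(items, offset=99, limit=3)
# page == (); meta["offset"] == 10
```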

WebSocket Models

ws_models

WebSocket event models for real-time feeds.

Defines event types and the WsEvent payload that is serialised to JSON and pushed to WebSocket subscribers.

WsEventType

Bases: StrEnum

Types of real-time WebSocket events.

WsEvent pydantic-model

Bases: BaseModel

A real-time event pushed over WebSocket.

Callers must not mutate the payload dict after construction -- the dict is a mutable reference inside a frozen model.

Attributes:

Name Type Description
version int

Wire-protocol version. Clients MUST ignore events whose version they do not understand. Bump only when introducing a breaking change to WsEvent -- coordinate with the WS_PROTOCOL_VERSION constant in web/src/utils/constants.ts.

event_type WsEventType

Classification of the event.

channel NotBlankStr

Target channel name.

timestamp AwareDatetime

When the event occurred.

payload dict[str, object]

Event-specific data.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _deep_copy_payload

version pydantic-field

version = WS_PROTOCOL_VERSION

WS wire-protocol version (clients ignore unknown)

event_type pydantic-field

event_type

Event classification

channel pydantic-field

channel

Target channel name

timestamp pydantic-field

timestamp

When the event occurred

payload pydantic-field

payload

Event-specific data

Auth

config

Authentication configuration.

AuthConfig pydantic-model

Bases: BaseModel

JWT and authentication configuration.

The jwt_secret is resolved at application startup via a priority chain:

  1. SYNTHORG_JWT_SECRET environment variable (for multi-instance deployments sharing a common secret).
  2. Previously persisted secret in the settings table.
  3. Auto-generate a new secret and persist it for future runs.

At construction time the secret may be empty -- it is populated before the first request is served.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_secret_length
  • _validate_refresh_expiry
  • _validate_cookie_settings

jwt_secret pydantic-field

jwt_secret = ''

JWT signing secret (resolved at startup). Also used as the HMAC key for API key hash computation -- rotating this secret invalidates all stored API key hashes.

jwt_algorithm pydantic-field

jwt_algorithm = 'HS256'

JWT signing algorithm (HMAC family)

jwt_expiry_minutes pydantic-field

jwt_expiry_minutes = 1440

Token lifetime in minutes (default 24h)

min_password_length pydantic-field

min_password_length = 12

Minimum password length for setup and password change

exclude_paths pydantic-field

exclude_paths = None

Regex patterns for paths excluded from authentication. When None (default), paths are auto-derived from the API prefix (health, auth/setup, auth/login, docs, scalar UI). Use ^ to anchor at the start of the path and add $ when an exact match (rather than a prefix match) is required.
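A small sketch of how anchoring affects matching; the patterns below are illustrative only (the field defaults to None, in which case paths are auto-derived):

```python
import re

# Hypothetical exclude list: $ forces an exact match, an unanchored
# pattern matches any path sharing the prefix.
exclude_paths = [r"^/api/health$", r"^/api/docs"]

def is_excluded(path: str) -> bool:
    return any(re.match(pattern, path) for pattern in exclude_paths)

# "/api/health" matches exactly; "/api/health/live" does not (pattern ends
# with $); "/api/docs/openapi.json" matches the unanchored prefix pattern.
```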

cookie_name pydantic-field

cookie_name = DEFAULT_COOKIE_NAME

Session cookie name

cookie_secure pydantic-field

cookie_secure = True

Secure flag on session cookies (HTTPS-only)

cookie_samesite pydantic-field

cookie_samesite = 'strict'

SameSite attribute for session cookies

cookie_path pydantic-field

cookie_path = '/api'

Path scope for the session cookie (HttpOnly)

csrf_cookie_path pydantic-field

csrf_cookie_path = '/'

Path scope for the CSRF cookie (non-HttpOnly). Defaults to / so document.cookie in JavaScript can read it from any SPA route; scoping it under /api (like the session cookie) would hide it from code running on application pages, breaking the double-submit pattern.

cookie_domain pydantic-field

cookie_domain = None

Domain for session cookies (None = current host)

csrf_cookie_name pydantic-field

csrf_cookie_name = DEFAULT_CSRF_COOKIE_NAME

CSRF token cookie name (non-HttpOnly, JS-readable)

csrf_header_name pydantic-field

csrf_header_name = DEFAULT_CSRF_HEADER_NAME

Header name for CSRF token submission

max_concurrent_sessions pydantic-field

max_concurrent_sessions = 5

Max concurrent sessions per user (0 = unlimited)

jwt_refresh_enabled pydantic-field

jwt_refresh_enabled = False

Enable refresh token rotation

jwt_refresh_expiry_minutes pydantic-field

jwt_refresh_expiry_minutes = 10080

Refresh token lifetime in minutes (default 7 days)

refresh_cookie_name pydantic-field

refresh_cookie_name = DEFAULT_REFRESH_COOKIE_NAME

Refresh token cookie name

refresh_cookie_path pydantic-field

refresh_cookie_path = DEFAULT_REFRESH_COOKIE_PATH

Path scope for refresh token cookie (narrow)

lockout_threshold pydantic-field

lockout_threshold = 10

Failed login attempts before account lockout

lockout_window_minutes pydantic-field

lockout_window_minutes = 15

Sliding window for counting failed attempts

lockout_duration_minutes pydantic-field

lockout_duration_minutes = 15

Auto-unlock duration after lockout

with_secret

with_secret(secret)

Return a copy with the JWT secret set.

Parameters:

Name Type Description Default
secret str

Resolved JWT signing secret.

required

Returns:

Type Description
AuthConfig

New AuthConfig with the secret populated.

Raises:

Type Description
ValueError

If the secret is too short.

Source code in src/synthorg/api/auth/config.py
def with_secret(self, secret: str) -> AuthConfig:
    """Return a copy with the JWT secret set.

    Args:
        secret: Resolved JWT signing secret.

    Returns:
        New ``AuthConfig`` with the secret populated.

    Raises:
        ValueError: If the secret is too short.
    """
    _require_valid_secret(secret)
    return self.model_copy(update={"jwt_secret": secret})

models

Authentication domain models.

AuthMethod

Bases: StrEnum

Authentication method used for a request.

OrgRole

Bases: StrEnum

Permission-level role for org configuration access.

Orthogonal to HumanRole (operational persona). HumanRole controls who you are in the org simulation; OrgRole controls what you can do to the org config.

User pydantic-model

Bases: BaseModel

Persisted user account.

Attributes:

Name Type Description
id NotBlankStr

Unique user identifier.

username NotBlankStr

Login username.

password_hash str

Argon2id hash (excluded from repr).

role HumanRole

Access control role.

must_change_password bool

Whether the user must change password.

org_roles tuple[OrgRole, ...]

Permission-level roles for org config access.

scoped_departments tuple[NotBlankStr, ...]

Departments accessible to dept admins.

created_at AwareDatetime

Account creation timestamp.

updated_at AwareDatetime

Last modification timestamp.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_scoped_departments

ApiKey pydantic-model

Bases: BaseModel

Persisted API key (hash-only storage).

Attributes:

Name Type Description
id NotBlankStr

Unique key identifier (UUID).

key_hash NotBlankStr

HMAC-SHA256 hex digest of the raw key.

name NotBlankStr

Human-readable label.

role HumanRole

Access control role.

user_id NotBlankStr

Owner user ID.

created_at AwareDatetime

Key creation timestamp (timezone-aware).

expires_at AwareDatetime | None

Optional expiry timestamp (timezone-aware).

revoked bool

Whether the key has been revoked.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

AuthenticatedUser pydantic-model

Bases: BaseModel

Lightweight identity attached to connection.user.

Populated by the auth middleware after successful authentication.

Attributes:

Name Type Description
user_id NotBlankStr

User's unique identifier.

username NotBlankStr

User's login name.

role HumanRole

Access control role.

auth_method AuthMethod

How the user authenticated.

must_change_password bool

Whether forced password change is pending.

org_roles tuple[OrgRole, ...]

Permission-level roles for org config access.

scoped_departments tuple[NotBlankStr, ...]

Departments accessible to dept admins.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_scoped_departments

service

Authentication service -- password hashing, JWT ops, API key hashing.

SecretNotConfiguredError

Bases: RuntimeError

Raised when the JWT secret is required but not configured.

AuthService

AuthService(config)

Immutable authentication operations.

Parameters:

Name Type Description Default
config AuthConfig

Authentication configuration (carries JWT secret).

required
Source code in src/synthorg/api/auth/service.py
def __init__(self, config: AuthConfig) -> None:
    self._config = config

hash_password

hash_password(password)

Hash a password with Argon2id.

Parameters:

Name Type Description Default
password str

Plaintext password.

required

Returns:

Type Description
str

Argon2id hash string.

Source code in src/synthorg/api/auth/service.py
def hash_password(self, password: str) -> str:
    """Hash a password with Argon2id.

    Args:
        password: Plaintext password.

    Returns:
        Argon2id hash string.
    """
    return _hasher.hash(password)

verify_password

verify_password(password, password_hash)

Verify a password against an Argon2id hash.

Parameters:

Name Type Description Default
password str

Plaintext password to check.

required
password_hash str

Stored Argon2id hash.

required

Returns:

Type Description
bool

True if the password matches.

Raises:

Type Description
VerificationError

On non-mismatch verification failures (e.g. unsupported parameters).

InvalidHashError

If the stored hash is corrupted or malformed (data integrity issue).

Source code in src/synthorg/api/auth/service.py
def verify_password(self, password: str, password_hash: str) -> bool:
    """Verify a password against an Argon2id hash.

    Args:
        password: Plaintext password to check.
        password_hash: Stored Argon2id hash.

    Returns:
        ``True`` if the password matches.

    Raises:
        argon2.exceptions.VerificationError: On non-mismatch
            verification failures (e.g. unsupported parameters).
        argon2.exceptions.InvalidHashError: If the stored hash
            is corrupted or malformed (data integrity issue).
    """
    try:
        return _hasher.verify(password_hash, password)
    except argon2.exceptions.VerifyMismatchError:
        return False
    except argon2.exceptions.VerificationError:
        logger.warning(
            API_AUTH_FAILED,
            reason="hash_verification_error",
            exc_info=True,
        )
        raise
    except argon2.exceptions.InvalidHashError:
        logger.error(
            API_AUTH_FAILED,
            reason="invalid_hash_data_corruption",
            exc_info=True,
        )
        raise

hash_password_async async

hash_password_async(password)

Hash a password with Argon2id in a thread executor.

Offloads the CPU-intensive hashing to avoid blocking the event loop.

Parameters:

Name Type Description Default
password str

Plaintext password.

required

Returns:

Type Description
str

Argon2id hash string.

Source code in src/synthorg/api/auth/service.py
async def hash_password_async(self, password: str) -> str:
    """Hash a password with Argon2id in a thread executor.

    Offloads the CPU-intensive hashing to avoid blocking the
    event loop.

    Args:
        password: Plaintext password.

    Returns:
        Argon2id hash string.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self.hash_password, password)

verify_password_async async

verify_password_async(password, password_hash)

Verify a password against an Argon2id hash in a thread executor.

Offloads the CPU-intensive verification to avoid blocking the event loop.

Parameters:

Name Type Description Default
password str

Plaintext password to check.

required
password_hash str

Stored Argon2id hash.

required

Returns:

Type Description
bool

True if the password matches.

Source code in src/synthorg/api/auth/service.py
async def verify_password_async(
    self,
    password: str,
    password_hash: str,
) -> bool:
    """Verify a password against an Argon2id hash in a thread executor.

    Offloads the CPU-intensive verification to avoid blocking the
    event loop.

    Args:
        password: Plaintext password to check.
        password_hash: Stored Argon2id hash.

    Returns:
        ``True`` if the password matches.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, self.verify_password, password, password_hash
    )

create_token

create_token(user)

Create a JWT for the given user.

The token includes a pwd_sig claim -- a 16-character truncated SHA-256 of the stored password hash. This is plain SHA-256, not HMAC -- the password hash is already a high-entropy Argon2id output, and the claim is protected by the JWT signature. The auth middleware validates this claim on every request so that tokens issued before a password change are automatically rejected.

A jti (JWT ID) claim is included for per-token session tracking and revocation.

Parameters:

Name Type Description Default
user User

Authenticated user.

required

Returns:

Type Description
tuple[str, int, str]

Tuple of (encoded JWT, expiry seconds, session ID).

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

Source code in src/synthorg/api/auth/service.py
def create_token(
    self,
    user: User,
) -> tuple[str, int, str]:
    """Create a JWT for the given user.

    The token includes a ``pwd_sig`` claim -- a 16-character
    truncated SHA-256 of the stored password hash.  This is
    plain SHA-256, not HMAC -- the password hash is already a
    high-entropy Argon2id output, and the claim is protected
    by the JWT signature.  The auth middleware validates this
    claim on every request so that tokens issued before a
    password change are automatically rejected.

    A ``jti`` (JWT ID) claim is included for per-token session
    tracking and revocation.

    Args:
        user: Authenticated user.

    Returns:
        Tuple of (encoded JWT, expiry seconds, session ID).

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
    """
    secret = self._require_secret("create_token")
    now = datetime.now(UTC)
    expiry_seconds = self._config.jwt_expiry_minutes * 60
    session_id = uuid.uuid4().hex
    pwd_sig = hashlib.sha256(
        user.password_hash.encode(),
    ).hexdigest()[:16]
    payload: dict[str, Any] = {
        "sub": user.id,
        "username": user.username,
        "role": user.role.value,
        "must_change_password": user.must_change_password,
        "pwd_sig": pwd_sig,
        "jti": session_id,
        "iat": now,
        "exp": now + timedelta(seconds=expiry_seconds),
    }
    token = jwt.encode(
        payload,
        secret,
        algorithm=self._config.jwt_algorithm,
    )
    return token, expiry_seconds, session_id
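The pwd_sig mechanism in isolation: because the claim is derived from the stored password hash, any password change produces a new signature, and tokens carrying the old one fail middleware validation. A minimal sketch (the `pwd_sig` helper name is illustrative; the computation mirrors the source above):

```python
import hashlib

def pwd_sig(password_hash: str) -> str:
    # 16-character truncated SHA-256 of the stored Argon2id hash, as in
    # create_token above.  Plain SHA-256 suffices: the input is already
    # high-entropy and the claim is protected by the JWT signature.
    return hashlib.sha256(password_hash.encode()).hexdigest()[:16]

old_sig = pwd_sig("$argon2id$v=19$old-example-hash")
new_sig = pwd_sig("$argon2id$v=19$new-example-hash")
# old_sig != new_sig -> tokens minted before the password change carry a
# stale pwd_sig and are rejected by the auth middleware.
```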

decode_token

decode_token(token)

Decode and validate a JWT.

Audience (aud) verification is intentionally disabled here (verify_aud=False) because audience validation is performed per-role in the auth middleware's _resolve_jwt_user. System-user tokens require aud=synthorg-backend; regular user tokens omit aud.

Parameters:

Name Type Description Default
token str

Encoded JWT string.

required

Returns:

Type Description
dict[str, Any]

Decoded claims dictionary.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

InvalidTokenError

If the token is invalid or expired.

Source code in src/synthorg/api/auth/service.py
def decode_token(self, token: str) -> dict[str, Any]:
    """Decode and validate a JWT.

    Audience (``aud``) verification is intentionally disabled
    here (``verify_aud=False``) because audience validation is
    performed per-role in the auth middleware's
    ``_resolve_jwt_user``.  System-user tokens require
    ``aud=synthorg-backend``; regular user tokens omit ``aud``.

    Args:
        token: Encoded JWT string.

    Returns:
        Decoded claims dictionary.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
        jwt.InvalidTokenError: If the token is invalid or expired.
    """
    secret = self._require_secret("decode_token")
    return jwt.decode(
        token,
        secret,
        algorithms=[self._config.jwt_algorithm],
        options={"require": ["exp", "iat", "sub", "jti"], "verify_aud": False},
    )

hash_api_key

hash_api_key(raw_key)

Compute HMAC-SHA256 hex digest of a raw API key.

Uses the server-side JWT secret as the HMAC key so that an attacker with read access to stored hashes cannot brute-force API keys offline.

Parameters:

Name Type Description Default
raw_key str

The plaintext API key.

required

Returns:

Type Description
str

Lowercase hex digest.

Raises:

Type Description
SecretNotConfiguredError

If the JWT secret is empty.

Source code in src/synthorg/api/auth/service.py
def hash_api_key(self, raw_key: str) -> str:
    """Compute HMAC-SHA256 hex digest of a raw API key.

    Uses the server-side JWT secret as the HMAC key so that
    an attacker with read access to stored hashes cannot
    brute-force API keys offline.

    Args:
        raw_key: The plaintext API key.

    Returns:
        Lowercase hex digest.

    Raises:
        SecretNotConfiguredError: If the JWT secret is empty.
    """
    secret = self._require_secret("hash_api_key")
    return hmac.digest(
        secret.encode(),
        raw_key.encode(),
        "sha256",
    ).hex()

generate_api_key staticmethod

generate_api_key()

Generate a cryptographically secure API key.

Returns:

Type Description
str

URL-safe base64 string (43 chars).

Source code in src/synthorg/api/auth/service.py
@staticmethod
def generate_api_key() -> str:
    """Generate a cryptographically secure API key.

    Returns:
        URL-safe base64 string (43 chars).
    """
    return secrets.token_urlsafe(32)
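The generate/hash pair in isolation, as a stdlib-only sketch. Unlike the real `hash_api_key` method, the server secret is passed explicitly here for illustration:

```python
import hmac
import secrets

def generate_api_key() -> str:
    # 32 random bytes -> 43-character URL-safe base64 string (no padding).
    return secrets.token_urlsafe(32)

def hash_api_key(raw_key: str, server_secret: str) -> str:
    # Keying the digest with the server-side secret means a leaked table
    # of stored hashes cannot be brute-forced offline without that secret.
    return hmac.digest(server_secret.encode(), raw_key.encode(), "sha256").hex()

key = generate_api_key()
digest = hash_api_key(key, "example-secret")
# len(key) == 43; digest is a 64-character lowercase hex string
```

As the jwt_secret field notes above warn, rotating the secret changes every digest, invalidating all stored API key hashes.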

middleware

JWT + API key authentication middleware.

ApiAuthMiddleware

Bases: AbstractAuthenticationMiddleware

Authenticate requests via cookie, JWT header, or API key.

Authentication priority:

  1. Session cookie -- HttpOnly cookie set by login/setup. Primary auth path for browser sessions.
  2. Authorization header -- Bearer <token>. Tokens with dots are JWTs (system user CLI tokens). Tokens without dots are API keys (HMAC-SHA256 lookup).

Requires auth_service and the persistence backend on app.state["app_state"].
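The dot heuristic for routing bearer tokens can be sketched in one function (the name is illustrative). It is reliable because JWTs are always three dot-separated segments, while API keys from `secrets.token_urlsafe` never contain a dot:

```python
def classify_bearer_token(token: str) -> str:
    # JWT: header.payload.signature; API key: dot-free URL-safe base64.
    return "jwt" if "." in token else "api_key"

# classify_bearer_token("eyJhbGc.eyJzdWIi.c2ln") -> "jwt"
# classify_bearer_token("Fz9kQw1x")              -> "api_key"
```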

authenticate_request async

authenticate_request(connection)

Validate the session cookie or Authorization header.

Tries the session cookie first. Falls back to the Authorization header for API keys and system user JWTs.

Parameters:

Name Type Description Default
connection ASGIConnection[Any, Any, Any, Any]

Incoming ASGI connection.

required

Returns:

Type Description
AuthenticationResult

AuthenticationResult with AuthenticatedUser.

Raises:

Type Description
NotAuthorizedException

If authentication fails.

Source code in src/synthorg/api/auth/middleware.py
async def authenticate_request(
    self,
    connection: ASGIConnection[Any, Any, Any, Any],
) -> AuthenticationResult:
    """Validate the session cookie or Authorization header.

    Tries the session cookie first.  Falls back to the
    Authorization header for API keys and system user JWTs.

    Args:
        connection: Incoming ASGI connection.

    Returns:
        AuthenticationResult with AuthenticatedUser.

    Raises:
        NotAuthorizedException: If authentication fails.
    """
    app_state = connection.app.state["app_state"]
    auth_service: AuthService = app_state.auth_service
    path = str(connection.url.path)

    # 1. Try session cookie (primary path for browser sessions)
    cookie_name = _get_cookie_name(app_state)
    session_cookie = connection.cookies.get(cookie_name)
    if session_cookie and "." in session_cookie:
        user = await _try_jwt_auth(
            session_cookie,
            auth_service,
            app_state,
            path,
        )
        if user is not None:
            logger.debug(
                API_AUTH_COOKIE_USED,
                user_id=user.user_id,
                path=path,
            )
            return AuthenticationResult(user=user, auth=session_cookie)

    if session_cookie:
        logger.warning(
            API_AUTH_FAILED,
            reason="cookie_jwt_invalid",
            path=path,
        )

    # 2. Fall back to Authorization header (API keys, system user)
    auth_header = connection.headers.get("authorization")
    if not auth_header:
        if session_cookie:
            # Cookie was present but invalid
            raise NotAuthorizedException(
                detail="Invalid session cookie",
            )
        logger.warning(
            API_AUTH_FAILED,
            reason="missing_authentication",
            path=path,
        )
        raise NotAuthorizedException(
            detail="Missing authentication",
        )

    token = _extract_bearer_token(auth_header)
    if token is None:
        logger.warning(
            API_AUTH_FAILED,
            reason="invalid_scheme",
            path=path,
        )
        raise NotAuthorizedException(
            detail="Invalid authorization scheme",
        )

    if "." in token:
        user = await _try_jwt_auth(
            token,
            auth_service,
            app_state,
            path,
        )
        if user is not None:
            return AuthenticationResult(user=user, auth=token)
        raise NotAuthorizedException(detail="Invalid JWT token")

    user = await _try_api_key_auth(
        token,
        auth_service,
        app_state,
        path,
    )
    if user is not None:
        return AuthenticationResult(user=user, auth=token)
    raise NotAuthorizedException(detail="Invalid credentials")

create_auth_middleware_class

create_auth_middleware_class(auth_config)

Create a middleware class with excluded paths baked in.

Litestar's AbstractAuthenticationMiddleware.__init__ takes exclude as a parameter (default None). We create a subclass whose __init__ forwards the configured exclude list to super().__init__.

The middleware is restricted to ScopeType.HTTP only -- WebSocket connections use ticket-based auth handled entirely inside the WS handler (see controllers/ws.py).

Parameters:

Name Type Description Default
auth_config AuthConfig

Auth configuration with exclude_paths.

required

Returns:

Type Description
type[ApiAuthMiddleware]

Middleware class ready for use in the Litestar middleware stack.

Source code in src/synthorg/api/auth/middleware.py
def create_auth_middleware_class(
    auth_config: AuthConfig,
) -> type[ApiAuthMiddleware]:
    """Create a middleware class with excluded paths baked in.

    Litestar's ``AbstractAuthenticationMiddleware.__init__`` takes
    ``exclude`` as a parameter (default ``None``).  We create a
    subclass whose ``__init__`` forwards the configured exclude
    list to ``super().__init__``.

    The middleware is restricted to ``ScopeType.HTTP`` only --
    WebSocket connections use ticket-based auth handled entirely
    inside the WS handler (see ``controllers/ws.py``).

    Args:
        auth_config: Auth configuration with exclude_paths.

    Returns:
        Middleware class ready for use in the Litestar middleware stack.
    """
    exclude_paths = (
        list(auth_config.exclude_paths) if auth_config.exclude_paths else None
    )

    class ConfiguredAuthMiddleware(ApiAuthMiddleware):
        """Auth middleware with pre-configured exclude paths."""

        def __init__(self, app: Any) -> None:
            super().__init__(
                app,
                exclude=exclude_paths,
                scopes={ScopeType.HTTP},
            )

    return ConfiguredAuthMiddleware