Skip to content

Observability

Structured logging, event constants, correlation tracking, and log sinks.

Logger

observability

Observability module for structured logging and correlation tracking.

Provides:

  • Structured logging via structlog with stdlib bridge
  • Log configuration with console and file sinks
  • Sensitive field sanitization
  • Correlation ID tracking via context variables

.. note::

Call :func:`configure_logging` once at application startup to
initialise the logging pipeline.  Use :func:`get_logger` in all
modules to obtain a bound structured logger.

get_logger

get_logger(name, **initial_bindings)

Get a structured logger bound to the given name.

Thin wrapper over :func:structlog.get_logger that ensures consistent logger creation across the codebase.

Usage::

from synthorg.observability import get_logger

logger = get_logger(__name__)
logger.info("something happened", key="value")

Parameters:

Name Type Description Default
name str

Logger name, typically __name__.

required
**initial_bindings Any

Key-value pairs bound to every log entry.

{}

Returns:

Type Description
BoundLogger

A bound structlog logger.

Source code in src/synthorg/observability/_logger.py
def get_logger(name: str, **initial_bindings: Any) -> structlog.stdlib.BoundLogger:
    """Get a structured logger bound to the given name.

    Thin wrapper over :func:`structlog.get_logger` that ensures
    consistent logger creation across the codebase.

    Usage::

        from synthorg.observability import get_logger

        logger = get_logger(__name__)
        logger.info("something happened", key="value")

    Args:
        name: Logger name, typically ``__name__``.
        **initial_bindings: Key-value pairs bound to every log entry.

    Returns:
        A bound structlog logger.
    """
    return structlog.get_logger(name, **initial_bindings)  # type: ignore[no-any-return]

Config

config

Observability configuration models.

Frozen Pydantic models for log sinks, rotation, and top-level logging configuration. All models are immutable and validated on construction.

.. note::

``DEFAULT_SINKS`` provides the standard eleven-sink layout described
in the design spec (console + ten file sinks).

RotationConfig pydantic-model

Bases: BaseModel

Log file rotation configuration.

Attributes:

Name Type Description
strategy RotationStrategy

Rotation mechanism to use.

max_bytes int

Maximum file size in bytes before rotation. Only used when strategy is :attr:RotationStrategy.BUILTIN.

backup_count int

Number of rotated backup files to keep.

compress_rotated bool

Whether to gzip-compress rotated backup files. Only supported with builtin rotation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _reject_compress_with_external

strategy pydantic-field

strategy = BUILTIN

Rotation mechanism

max_bytes pydantic-field

max_bytes = 10 * 1024 * 1024

Maximum file size in bytes before rotation

backup_count pydantic-field

backup_count = 5

Number of rotated backup files to keep

compress_rotated pydantic-field

compress_rotated = False

Gzip-compress rotated backup files

SinkConfig pydantic-model

Bases: BaseModel

Configuration for a single log output destination.

Attributes:

Name Type Description
sink_type SinkType

Where to send log output.

level LogLevel

Minimum log level for this sink.

file_path str | None

Relative path for FILE sinks (within log_dir).

rotation RotationConfig | None

Rotation settings for FILE sinks.

json_format bool

Whether to format output as JSON.

syslog_host str | None

Hostname for SYSLOG sinks.

syslog_port int

Port for SYSLOG sinks.

syslog_facility SyslogFacility

Syslog facility code.

syslog_protocol SyslogProtocol

Transport protocol (TCP or UDP).

http_url str | None

Endpoint URL for HTTP sinks.

http_headers tuple[tuple[str, str], ...]

Extra HTTP headers as (name, value) pairs.

http_batch_size int

Records per HTTP POST batch.

http_flush_interval_seconds float

Seconds between automatic flushes.

http_timeout_seconds float

HTTP request timeout.

http_max_retries int

Retry count on HTTP failure.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_sink_type_fields

sink_type pydantic-field

sink_type

Log output destination type

level pydantic-field

level = INFO

Minimum log level for this sink

file_path pydantic-field

file_path = None

Relative path for FILE sinks (within log_dir)

rotation pydantic-field

rotation = None

Rotation settings for FILE sinks

json_format pydantic-field

json_format = True

Whether to format output as JSON

syslog_host pydantic-field

syslog_host = None

Hostname for SYSLOG sinks

syslog_port pydantic-field

syslog_port = 514

Port for SYSLOG sinks

syslog_facility pydantic-field

syslog_facility = USER

Syslog facility code

syslog_protocol pydantic-field

syslog_protocol = UDP

Transport protocol (TCP or UDP)

http_url pydantic-field

http_url = None

Endpoint URL for HTTP sinks

http_headers pydantic-field

http_headers = ()

Extra HTTP headers as (name, value) pairs

http_batch_size pydantic-field

http_batch_size = 100

Records per HTTP POST batch

http_flush_interval_seconds pydantic-field

http_flush_interval_seconds = 5.0

Seconds between automatic flushes

http_timeout_seconds pydantic-field

http_timeout_seconds = 10.0

HTTP request timeout in seconds

http_max_retries pydantic-field

http_max_retries = 3

Retry count on HTTP failure

otlp_endpoint pydantic-field

otlp_endpoint = None

OTLP collector endpoint URL

otlp_protocol pydantic-field

otlp_protocol = HTTP_JSON

OTLP transport protocol

otlp_headers pydantic-field

otlp_headers = ()

Extra OTLP headers as (name, value) pairs

otlp_export_interval_seconds pydantic-field

otlp_export_interval_seconds = 5.0

Seconds between OTLP export batches

otlp_batch_size pydantic-field

otlp_batch_size = _DEFAULT_OTLP_BATCH_SIZE

Records per OTLP export batch

otlp_timeout_seconds pydantic-field

otlp_timeout_seconds = _DEFAULT_OTLP_TIMEOUT

HTTP request timeout in seconds for OTLP export

ContainerLogShippingConfig pydantic-model

Bases: BaseModel

Configuration for shipping container logs to the observability stack.

Controls whether sandbox and sidecar container logs are collected and shipped through the structlog pipeline after execution.

Attributes:

Name Type Description
enabled bool

Whether container log shipping is active.

ship_raw_logs bool

Whether to include raw stdout/stderr/sidecar payloads in shipped events (security-sensitive).

collection_timeout_seconds float

Timeout for collecting container logs.

max_log_bytes int

Total byte budget across all shipped fields per execution (stdout + stderr + sidecar logs combined).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

enabled pydantic-field

enabled = True

Whether to ship collected container logs

ship_raw_logs pydantic-field

ship_raw_logs = False

Include raw stdout/stderr/sidecar payloads in shipped events. When False, only metadata (sizes, counts, timing) is shipped. Enable only in trusted environments -- raw output may contain secrets that bypass key-name-based redaction.

collection_timeout_seconds pydantic-field

collection_timeout_seconds = 5.0

Timeout for log collection from containers

max_log_bytes pydantic-field

max_log_bytes = 10 * 1024 * 1024

Total byte budget per execution across all shipped fields

LogConfig pydantic-model

Bases: BaseModel

Top-level logging configuration.

Attributes:

Name Type Description
root_level LogLevel

Root logger level (handlers filter individually).

logger_levels tuple[tuple[NotBlankStr, LogLevel], ...]

Per-logger level overrides as (name, level) pairs.

sinks tuple[SinkConfig, ...]

Tuple of sink configurations.

enable_correlation bool

Whether to enable correlation ID tracking.

log_dir NotBlankStr

Directory for log files.

console_level str

Optional override for the console sink's log level, distinct from root_level. Empty string means "use the per-sink level / root_level default". Mirrors the observability.log_level_console registry entry; the console-override applier reads DB > env > YAML (this field)

unset.

container_log_shipping ContainerLogShippingConfig

Container log shipping configuration.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_at_least_one_sink
  • _validate_no_duplicate_logger_names
  • _validate_no_duplicate_file_paths
  • _validate_no_duplicate_syslog_endpoints
  • _validate_no_duplicate_http_urls
  • _validate_log_dir_safe

root_level pydantic-field

root_level = INFO

Root logger level. Defaults to INFO so HTTP log sinks do not leak verbose payloads or burn bandwidth on sampled streams; set to DEBUG explicitly (settings: observability.root_level) when operators need the full event stream. Per-logger overrides still force DEBUG on synthorg.engine / synthorg.memory so agent traces stay detailed.

logger_levels pydantic-field

logger_levels = ()

Per-logger level overrides as (name, level) pairs

sinks pydantic-field

sinks

Log output destinations

enable_correlation pydantic-field

enable_correlation = True

Whether to enable correlation ID tracking

log_dir pydantic-field

log_dir = 'logs'

Directory for log files

console_level pydantic-field

console_level = ''

Optional console-sink level override (mutable); empty string means use the per-sink / root level. The applier resolves DB > env (SYNTHORG_LOG_LEVEL) > YAML (this field) > unset through the observability.log_level_console registry entry.

container_log_shipping pydantic-field

container_log_shipping

Container log shipping configuration

Correlation

correlation

Correlation ID management for structured logging.

Uses structlog's contextvars integration for async-safe context propagation across agent actions, tasks, and API requests.

.. note::

All binding functions are safe to call from both sync and async
code because Python's :mod:`contextvars` is natively async-aware.

generate_correlation_id

generate_correlation_id()

Generate a new correlation ID.

Returns:

Type Description
str

A UUID4 string suitable for use as a correlation identifier.

Source code in src/synthorg/observability/correlation.py
def generate_correlation_id() -> str:
    """Generate a new correlation ID.

    Returns:
        A UUID4 string suitable for use as a correlation identifier.
    """
    return str(uuid.uuid4())

bind_correlation_id

bind_correlation_id(*, request_id=None, task_id=None, agent_id=None)

Bind correlation IDs to the current context.

Only non-None values are bound. Existing bindings for unspecified keys are left unchanged.

Parameters:

Name Type Description Default
request_id str | None

Request correlation identifier.

None
task_id str | None

Task correlation identifier.

None
agent_id str | None

Agent correlation identifier.

None
Source code in src/synthorg/observability/correlation.py
def bind_correlation_id(
    *,
    request_id: str | None = None,
    task_id: str | None = None,
    agent_id: str | None = None,
) -> None:
    """Bind correlation IDs to the current context.

    Only non-``None`` values are bound.  Existing bindings for
    unspecified keys are left unchanged.

    Args:
        request_id: Request correlation identifier.
        task_id: Task correlation identifier.
        agent_id: Agent correlation identifier.
    """
    bindings = _build_bindings(request_id, task_id, agent_id)
    if bindings:
        structlog.contextvars.bind_contextvars(**bindings)

unbind_correlation_id

unbind_correlation_id(*, request_id=False, task_id=False, agent_id=False)

Remove specific correlation IDs from the current context.

Parameters:

Name Type Description Default
request_id bool

Whether to unbind the request_id key.

False
task_id bool

Whether to unbind the task_id key.

False
agent_id bool

Whether to unbind the agent_id key.

False
Source code in src/synthorg/observability/correlation.py
def unbind_correlation_id(
    *,
    request_id: bool = False,
    task_id: bool = False,
    agent_id: bool = False,
) -> None:
    """Remove specific correlation IDs from the current context.

    Args:
        request_id: Whether to unbind the ``request_id`` key.
        task_id: Whether to unbind the ``task_id`` key.
        agent_id: Whether to unbind the ``agent_id`` key.
    """
    keys: list[str] = []
    if request_id:
        keys.append("request_id")
    if task_id:
        keys.append("task_id")
    if agent_id:
        keys.append("agent_id")
    if keys:
        structlog.contextvars.unbind_contextvars(*keys)

clear_correlation_ids

clear_correlation_ids()

Remove all correlation IDs from the current context.

Unbinds request_id, task_id, and agent_id. Other context variables are preserved.

Source code in src/synthorg/observability/correlation.py
def clear_correlation_ids() -> None:
    """Remove all correlation IDs from the current context.

    Unbinds ``request_id``, ``task_id``, and ``agent_id``.  Other
    context variables are preserved.
    """
    structlog.contextvars.unbind_contextvars(
        "request_id",
        "task_id",
        "agent_id",
    )

correlation_scope

correlation_scope(*, request_id=None, task_id=None, agent_id=None)

Scoped correlation binding that restores prior values on exit.

Uses structlog's bound_contextvars to save and restore any pre-existing correlation IDs, making this safe for nested execution contexts (e.g. hierarchical agent delegation).

Parameters:

Name Type Description Default
request_id str | None

Request correlation identifier to bind.

None
task_id str | None

Task correlation identifier to bind.

None
agent_id str | None

Agent correlation identifier to bind.

None
Source code in src/synthorg/observability/correlation.py
@contextmanager
def correlation_scope(
    *,
    request_id: str | None = None,
    task_id: str | None = None,
    agent_id: str | None = None,
) -> Iterator[None]:
    """Scoped correlation binding that restores prior values on exit.

    Uses structlog's ``bound_contextvars`` to save and restore any
    pre-existing correlation IDs, making this safe for nested
    execution contexts (e.g. hierarchical agent delegation).

    Args:
        request_id: Request correlation identifier to bind.
        task_id: Task correlation identifier to bind.
        agent_id: Agent correlation identifier to bind.
    """
    bindings = _build_bindings(request_id, task_id, agent_id)
    if bindings:
        with structlog.contextvars.bound_contextvars(**bindings):
            yield
    else:
        yield

with_correlation

with_correlation(*, request_id=None, task_id=None, agent_id=None)

Decorator that binds correlation IDs for a function's duration.

Correlation IDs are bound before the function executes and unbound after it returns or raises. Only non-None IDs are managed.

Note

This decorator is for synchronous functions only. Applying it to an async def function raises :exc:TypeError. For async functions, use :func:with_correlation_async instead.

Parameters:

Name Type Description Default
request_id str | None

Request correlation identifier to bind.

None
task_id str | None

Task correlation identifier to bind.

None
agent_id str | None

Agent correlation identifier to bind.

None

Returns:

Type Description
Callable[[Callable[_P, _T]], Callable[_P, _T]]

A decorator that manages correlation ID lifecycle.

Raises:

Type Description
TypeError

If the decorated function is a coroutine function.

Source code in src/synthorg/observability/correlation.py
def with_correlation(
    *,
    request_id: str | None = None,
    task_id: str | None = None,
    agent_id: str | None = None,
) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    """Decorator that binds correlation IDs for a function's duration.

    Correlation IDs are bound before the function executes and unbound
    after it returns or raises.  Only non-``None`` IDs are managed.

    Note:
        This decorator is for **synchronous** functions only.  Applying
        it to an ``async def`` function raises :exc:`TypeError`.  For
        async functions, use :func:`with_correlation_async` instead.

    Args:
        request_id: Request correlation identifier to bind.
        task_id: Task correlation identifier to bind.
        agent_id: Agent correlation identifier to bind.

    Returns:
        A decorator that manages correlation ID lifecycle.

    Raises:
        TypeError: If the decorated function is a coroutine function.
    """

    def decorator(func: Callable[_P, _T]) -> Callable[_P, _T]:
        if inspect.iscoroutinefunction(func):
            msg = (
                "with_correlation() does not support async functions. "
                "Use with_correlation_async() instead."
            )
            logger.warning(
                CORRELATION_SYNC_DECORATOR_MISUSE,
                function=func.__qualname__,
            )
            raise TypeError(msg)

        @functools.wraps(func)
        def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
            bindings = _build_bindings(request_id, task_id, agent_id)
            with structlog.contextvars.bound_contextvars(**bindings):
                return func(*args, **kwargs)

        return wrapper

    return decorator

with_correlation_async

with_correlation_async(*, request_id=None, task_id=None, agent_id=None)

Decorator that binds correlation IDs for an async function's duration.

Correlation IDs are bound before the coroutine executes and unbound after it returns or raises. Only non-None IDs are managed.

Note

This decorator is for async functions only. Applying it to a synchronous function raises :exc:TypeError. For sync functions use :func:with_correlation.

Parameters:

Name Type Description Default
request_id str | None

Request correlation identifier to bind.

None
task_id str | None

Task correlation identifier to bind.

None
agent_id str | None

Agent correlation identifier to bind.

None

Returns:

Type Description
Callable[[Callable[_P, Coroutine[object, object, _T]]], Callable[_P, Coroutine[object, object, _T]]]

A decorator that manages correlation ID lifecycle for async

Callable[[Callable[_P, Coroutine[object, object, _T]]], Callable[_P, Coroutine[object, object, _T]]]

functions.

Raises:

Type Description
TypeError

If the decorated function is not a coroutine function.

Source code in src/synthorg/observability/correlation.py
def with_correlation_async(
    *,
    request_id: str | None = None,
    task_id: str | None = None,
    agent_id: str | None = None,
) -> Callable[
    [Callable[_P, Coroutine[object, object, _T]]],
    Callable[_P, Coroutine[object, object, _T]],
]:
    """Decorator that binds correlation IDs for an async function's duration.

    Correlation IDs are bound before the coroutine executes and unbound
    after it returns or raises.  Only non-``None`` IDs are managed.

    Note:
        This decorator is for **async** functions only.  Applying it to
        a synchronous function raises :exc:`TypeError`.  For sync
        functions use :func:`with_correlation`.

    Args:
        request_id: Request correlation identifier to bind.
        task_id: Task correlation identifier to bind.
        agent_id: Agent correlation identifier to bind.

    Returns:
        A decorator that manages correlation ID lifecycle for async
        functions.

    Raises:
        TypeError: If the decorated function is not a coroutine function.
    """

    def decorator(
        func: Callable[_P, Coroutine[object, object, _T]],
    ) -> Callable[_P, Coroutine[object, object, _T]]:
        if not inspect.iscoroutinefunction(func):
            msg = (
                "with_correlation_async() requires an async function. "
                "Use with_correlation() for synchronous functions."
            )
            logger.warning(
                CORRELATION_ASYNC_DECORATOR_MISUSE,
                function=func.__qualname__,
            )
            raise TypeError(msg)

        @functools.wraps(func)
        async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _T:
            bindings = _build_bindings(request_id, task_id, agent_id)
            with structlog.contextvars.bound_contextvars(**bindings):
                return await func(*args, **kwargs)

        return wrapper

    return decorator

Setup

setup

Logging system setup and configuration.

Provides the idempotent :func:configure_logging entry point that wires structlog processors, stdlib handlers, and per-logger levels.

configure_logging

configure_logging(config=None, *, routing_overrides=None)

Configure the structured logging system.

Sets up structlog processor chains, stdlib handlers, and per-logger levels. This function is idempotent -- calling it multiple times replaces the previous configuration without duplicating handlers.

Respects the SYNTHORG_LOG_LEVEL env var to override the console sink level (useful for Docker deployments).

Parameters:

Name Type Description Default
config LogConfig | None

Logging configuration. When None, uses sensible defaults with all standard sinks.

None
routing_overrides Mapping[str, tuple[str, ...]] | None

Optional extra logger-name routing entries (e.g. from custom sinks) merged with the default SINK_ROUTING table.

None

Raises:

Type Description
RuntimeError

If a critical sink fails to initialise.

Source code in src/synthorg/observability/setup.py
def configure_logging(
    config: LogConfig | None = None,
    *,
    routing_overrides: Mapping[str, tuple[str, ...]] | None = None,
) -> None:
    """Configure the structured logging system.

    Sets up structlog processor chains, stdlib handlers, and per-logger
    levels.  This function is **idempotent** -- calling it multiple times
    replaces the previous configuration without duplicating handlers.

    Respects the ``SYNTHORG_LOG_LEVEL`` env var to override the console
    sink level (useful for Docker deployments).

    Args:
        config: Logging configuration.  When ``None``, uses sensible
            defaults with all standard sinks.
        routing_overrides: Optional extra logger-name routing entries
            (e.g. from custom sinks) merged with the default
            ``SINK_ROUTING`` table.

    Raises:
        RuntimeError: If a critical sink fails to initialise.
    """
    if config is None:
        config = LogConfig(sinks=DEFAULT_SINKS)

    config = _apply_console_level_override(config)

    # 1. Reset structlog to a clean state
    structlog.reset_defaults()

    # 2. Clear existing stdlib root handlers
    root_logger = logging.getLogger()
    _clear_root_handlers(root_logger)

    # 3. Set root logger level from config
    root_logger.setLevel(config.root_level.value)

    # 4. Build shared processor chain (foreign pre-chain)
    shared = _build_shared_processors(
        enable_correlation=config.enable_correlation,
    )

    # 5. Configure structlog main chain
    _configure_structlog(enable_correlation=config.enable_correlation)

    # 6. Build and attach handlers for each sink
    _attach_handlers(
        config,
        root_logger,
        shared,
        routing_overrides=routing_overrides,
    )

    # 7. Tame third-party loggers (clear duplicate handlers, set defaults)
    _tame_third_party_loggers()

    # 8. Apply per-logger levels (after taming so user overrides take precedence)
    _apply_logger_levels(config)

Processors

processors

Custom structlog processors for the observability pipeline.

sanitize_sensitive_fields

sanitize_sensitive_fields(logger, method_name, event_dict)

Redact values of keys matching sensitive patterns.

Returns a new dict rather than mutating the original event dict, following the project's immutability convention. Redaction is applied recursively to nested dicts, lists, and tuples.

Parameters:

Name Type Description Default
logger Any

The wrapped logger object (unused, required by structlog).

required
method_name str

The name of the log method called (unused).

required
event_dict MutableMapping[str, Any]

The event dictionary to process.

required

Returns:

Type Description
Mapping[str, Any]

A new event dict with sensitive values replaced by

Mapping[str, Any]

**REDACTED** at all nesting depths.

Source code in src/synthorg/observability/processors.py
def sanitize_sensitive_fields(
    logger: Any,  # noqa: ARG001
    method_name: str,  # noqa: ARG001
    event_dict: MutableMapping[str, Any],
) -> Mapping[str, Any]:
    """Redact values of keys matching sensitive patterns.

    Returns a new dict rather than mutating the original event dict,
    following the project's immutability convention.  Redaction is
    applied recursively to nested dicts, lists, and tuples.

    Args:
        logger: The wrapped logger object (unused, required by structlog).
        method_name: The name of the log method called (unused).
        event_dict: The event dictionary to process.

    Returns:
        A new event dict with sensitive values replaced by
        ``**REDACTED**`` at all nesting depths.
    """
    return {
        key: (
            _REDACTED
            if isinstance(key, str) and _SENSITIVE_PATTERN.search(key)
            else _redact_value(value)
        )
        for key, value in event_dict.items()
    }

scrub_event_fields

scrub_event_fields(logger, method_name, event_dict)

Deep-scrub credential patterns out of every string value.

Belt-and-braces defence against the error=str(exc) leak vector: even when a caller embeds a stringified exception (or response body) that carries client_secret=..., "access_token":"...", Authorization: Bearer ..., or raw Fernet ciphertext, this processor rewrites the string so those substrings are masked before the renderer sees them.

Runs after sanitize_sensitive_fields so keys that the field-name scrubber already replaced with **REDACTED** stay redacted.

Robustness contract: this processor runs on every log record. If _scrub_value raises (e.g. a corrupted object whose repr blows up, or a pathological recursive structure), we return the original event dict unchanged rather than letting the exception propagate and abort the caller's log call. Losing scrubbing on one event is preferable to silencing the entire logging pipeline at the moment of crisis.

Parameters:

Name Type Description Default
logger Any

The wrapped logger object (unused, required by structlog).

required
method_name str

The name of the log method called (unused).

required
event_dict MutableMapping[str, Any]

The event dictionary to process.

required

Returns:

Type Description
Mapping[str, Any]

A new event dict with every string value scrubbed via

Mapping[str, Any]

func:synthorg.observability.redaction.scrub_secret_tokens,

Mapping[str, Any]

or the original dict if the scrub itself fails.

Source code in src/synthorg/observability/processors.py
def scrub_event_fields(
    logger: Any,  # noqa: ARG001
    method_name: str,  # noqa: ARG001
    event_dict: MutableMapping[str, Any],
) -> Mapping[str, Any]:
    """Deep-scrub credential patterns out of every string value.

    Belt-and-braces defence against the ``error=str(exc)`` leak
    vector: even when a caller embeds a stringified exception (or
    response body) that carries ``client_secret=...``,
    ``"access_token":"..."``, ``Authorization: Bearer ...``, or raw
    Fernet ciphertext, this processor rewrites the string so those
    substrings are masked before the renderer sees them.

    Runs *after* ``sanitize_sensitive_fields`` so keys that the
    field-name scrubber already replaced with ``**REDACTED**`` stay
    redacted.

    **Robustness contract**: this processor runs on every log record.
    If ``_scrub_value`` raises (e.g. a corrupted object whose ``repr``
    blows up, or a pathological recursive structure), we return the
    *original* event dict unchanged rather than letting the exception
    propagate and abort the caller's log call. Losing scrubbing on one
    event is preferable to silencing the entire logging pipeline at the
    moment of crisis.

    Args:
        logger: The wrapped logger object (unused, required by structlog).
        method_name: The name of the log method called (unused).
        event_dict: The event dictionary to process.

    Returns:
        A new event dict with every string value scrubbed via
        :func:`synthorg.observability.redaction.scrub_secret_tokens`,
        or the original dict if the scrub itself fails.
    """
    try:
        return {key: _scrub_value(value) for key, value in event_dict.items()}
    except MemoryError, RecursionError:
        # Interpreter-fatal errors must propagate per project convention
        # -- swallowing them here would hide exactly the class of failures
        # the rest of the codebase relies on for surfacing catastrophic
        # state.
        raise
    except Exception as exc:
        # Fail open: pass the event through unscrubbed rather than drop
        # the log line entirely.  Still safer than crashing the log
        # pipeline -- ``sanitize_sensitive_fields`` (which ran just
        # before us) has already redacted known-sensitive *field names*.
        # We write to ``sys.stderr`` directly (never via ``logger``) so
        # operators notice the scrub regression without triggering a
        # recursive log-through-logger failure. ``processors.py`` is
        # not on the ``print()`` allowlist, so we use the raw stream
        # write instead of ``print(file=sys.stderr)``.
        sys.stderr.write(
            f"WARNING: scrub_event_fields failed; event passed unscrubbed: "
            f"{type(exc).__name__}\n",
        )
        sys.stderr.flush()
        return event_dict

Sinks

sinks

Log handler factory for building stdlib handlers from sink config.

Translates :class:~synthorg.observability.config.SinkConfig instances into fully configured :class:logging.Handler objects with the appropriate structlog :class:~structlog.stdlib.ProcessorFormatter.

build_handler

build_handler(sink, log_dir, foreign_pre_chain, *, routing=None)

Build a stdlib logging handler from a sink configuration.

For CONSOLE sinks a :class:logging.StreamHandler writing to stderr is created. For FILE sinks see :func:_build_file_handler. For SYSLOG and HTTP sinks, dedicated handler builders are used.

Note: SYSLOG and HTTP sinks are built and returned by dedicated handler modules; they do not participate in logger-name routing.

Parameters:

Name Type Description Default
sink SinkConfig

The sink configuration describing the handler to build.

required
log_dir Path

Base directory for log files.

required
foreign_pre_chain list[Any]

Processor chain for stdlib-originated logs.

required
routing Mapping[str, tuple[str, ...]] | None

Optional routing table to use instead of the module-level SINK_ROUTING. When None, the default routing is used.

None

Returns:

Type Description
Handler

A configured :class:logging.Handler with formatter attached.

Source code in src/synthorg/observability/sinks.py
def build_handler(
    sink: SinkConfig,
    log_dir: Path,
    foreign_pre_chain: list[Any],
    *,
    routing: Mapping[str, tuple[str, ...]] | None = None,
) -> logging.Handler:
    """Build a stdlib logging handler from a sink configuration.

    For ``CONSOLE`` sinks a :class:`logging.StreamHandler` writing to
    ``stderr`` is created.  For ``FILE`` sinks see
    :func:`_build_file_handler`.  For ``SYSLOG`` and ``HTTP`` sinks,
    dedicated handler builders are used.

    Note: SYSLOG and HTTP sinks are built and returned by dedicated
    handler modules; they do not participate in logger-name routing.

    Args:
        sink: The sink configuration describing the handler to build.
        log_dir: Base directory for log files.
        foreign_pre_chain: Processor chain for stdlib-originated logs.
        routing: Optional routing table to use instead of the
            module-level ``SINK_ROUTING``.  When ``None``, the
            default routing is used.

    Returns:
        A configured :class:`logging.Handler` with formatter attached.
    """
    effective_routing = routing if routing is not None else SINK_ROUTING

    handler: logging.Handler
    match sink.sink_type:
        case SinkType.CONSOLE:
            handler = logging.StreamHandler(sys.stderr)
        case SinkType.FILE:
            handler = _build_file_handler(sink, log_dir)
        case SinkType.SYSLOG:
            from synthorg.observability.syslog_handler import (  # noqa: PLC0415
                build_syslog_handler,
            )

            return build_syslog_handler(sink, foreign_pre_chain)
        case SinkType.HTTP:
            from synthorg.observability.http_handler import (  # noqa: PLC0415
                build_http_handler,
            )

            return build_http_handler(sink, foreign_pre_chain)
        case SinkType.PROMETHEUS:
            # Prometheus is pull-based (scrape endpoint), not a log handler.
            # Return a no-op handler -- the /metrics controller serves metrics.
            handler = logging.NullHandler()
            handler.setLevel(sink.level.value)
            return handler
        case SinkType.OTLP:
            from synthorg.observability.otlp_handler import (  # noqa: PLC0415
                build_otlp_handler,
            )

            return build_otlp_handler(sink, foreign_pre_chain)
        case _:  # pragma: no cover
            msg = f"Unsupported sink type: {sink.sink_type}"  # type: ignore[unreachable]
            raise ValueError(msg)

    _attach_formatter_and_routing(
        handler,
        sink,
        foreign_pre_chain,
        effective_routing,
    )
    return handler

Sink Config Builder

sink_config_builder

Build a LogConfig from DEFAULT_SINKS + runtime overrides + custom sinks.

Pure-function module that merges static defaults with runtime settings to produce a validated :class:LogConfig suitable for :func:configure_logging.

The two JSON inputs come from SettingsService settings:

  • sink_overrides: JSON object keyed by sink identifier (__console__ for the console sink, file path for file sinks). Each value is an object with optional fields: enabled, level, json_format, rotation.
  • custom_sinks: JSON array of objects, each describing a new sink (file, syslog, or http). File sinks require file_path; syslog sinks require syslog_host; HTTP sinks require http_url. All types accept optional level.

SinkBuildResult dataclass

SinkBuildResult(config, routing_overrides)

Result of building a LogConfig from settings.

Attributes:

Name Type Description
config LogConfig

The fully validated logging configuration.

routing_overrides MappingProxyType[str, tuple[str, ...]]

Custom sink routing entries keyed by file_path, mapping to logger name prefix tuples.

build_log_config_from_settings

build_log_config_from_settings(
    *,
    root_level,
    enable_correlation,
    sink_overrides_json,
    custom_sinks_json,
    log_dir="logs",
)

Merge DEFAULT_SINKS with runtime overrides and custom sinks.

Parameters:

Name Type Description Default
root_level LogLevel

Root logger level.

required
enable_correlation bool

Whether to enable correlation ID tracking.

required
sink_overrides_json str

JSON object of per-sink overrides.

required
custom_sinks_json str

JSON array of custom sink definitions.

required
log_dir str

Directory for log files.

'logs'

Returns:

Name Type Description
A SinkBuildResult

class:SinkBuildResult containing the validated

SinkBuildResult

class:LogConfig and any routing overrides for custom sinks.

Raises:

Type Description
ValueError

On invalid JSON, validation failures, or attempts to disable the console sink.

Source code in src/synthorg/observability/sink_config_builder.py
def build_log_config_from_settings(
    *,
    root_level: LogLevel,
    enable_correlation: bool,
    sink_overrides_json: str,
    custom_sinks_json: str,
    log_dir: str = "logs",
) -> SinkBuildResult:
    """Merge DEFAULT_SINKS with runtime overrides and custom sinks.

    Args:
        root_level: Root logger level.
        enable_correlation: Whether to enable correlation ID tracking.
        sink_overrides_json: JSON object of per-sink overrides.
        custom_sinks_json: JSON array of custom sink definitions.
        log_dir: Directory for log files.

    Returns:
        A :class:`SinkBuildResult` containing the validated
        :class:`LogConfig` and any routing overrides for custom sinks.

    Raises:
        ValueError: On invalid JSON, validation failures, or
            attempts to disable the console sink.
    """
    overrides = _parse_sink_overrides(sink_overrides_json)
    custom_entries = _parse_custom_sinks(custom_sinks_json)

    merged = _merge_default_sinks(overrides)
    routing = _process_custom_entries(custom_entries, merged)

    config = LogConfig(
        root_level=root_level,
        enable_correlation=enable_correlation,
        sinks=tuple(merged),
        log_dir=log_dir,
    )
    return SinkBuildResult(config=config, routing_overrides=routing)

Enums

enums

Observability-specific enumerations.

LogLevel

Bases: StrEnum

Standard log severity levels.

Values match Python's stdlib logging level names for seamless integration between structlog and the logging module.

RotationStrategy

Bases: StrEnum

Log file rotation strategies.

Attributes:

Name Type Description
BUILTIN

Size-based rotation via RotatingFileHandler.

EXTERNAL

Watched rotation via WatchedFileHandler (logrotate).

SinkType

Bases: StrEnum

Log output destination types.

Attributes:

Name Type Description
CONSOLE

Write to stderr via StreamHandler.

FILE

Write to a log file with optional rotation.

SYSLOG

Ship structured JSON to a syslog endpoint.

HTTP

POST JSON log batches to an HTTP endpoint.

PROMETHEUS

Prometheus metrics scrape endpoint (pull-based).

OTLP

OpenTelemetry Protocol log/trace exporter (push-based).

OtlpProtocol

Bases: StrEnum

OpenTelemetry Protocol transport.

Attributes:

Name Type Description
HTTP_JSON

HTTP with JSON encoding (the only implemented transport).

GRPC

gRPC transport (not implemented; rejected at handler init).

SyslogFacility

Bases: StrEnum

Syslog facility codes.

Maps to logging.handlers.SysLogHandler.LOG_* constants.

SyslogProtocol

Bases: StrEnum

Syslog transport protocol.

Attributes:

Name Type Description
TCP

Reliable delivery via socket.SOCK_STREAM.

UDP

Lightweight delivery via socket.SOCK_DGRAM.

Events

events

Per-domain event name constants for observability.

All event names follow a dotted domain.subject[.qualifier] convention and are used as the first positional argument to structured log calls::

from synthorg.observability.events.config import CONFIG_LOADED

logger.info(CONFIG_LOADED, config_path=path)

Import constants from their domain module directly (e.g. events.provider, events.budget, events.tool).