Security

Security subsystem -- rule engine, trust strategies, autonomy levels, output scanning, and timeout policies.

Protocol

protocol

SecurityInterceptionStrategy protocol.

Defines the async interface that the ToolInvoker calls for pre-tool security checks and post-tool output scanning.

SecurityInterceptionStrategy

Bases: Protocol

Protocol for the security interception layer.

The ToolInvoker calls evaluate_pre_tool before execution and scan_output after execution. Implementations may be sync-backed (rule engine) or async (future LLM fallback).

evaluate_pre_tool async

evaluate_pre_tool(context)

Evaluate a tool invocation before execution.

Parameters:

Name Type Description Default
context SecurityContext

The tool call's security context.

required

Returns:

Type Description
SecurityVerdict

A verdict: allow, deny, or escalate.

Source code in src/synthorg/security/protocol.py
async def evaluate_pre_tool(
    self,
    context: SecurityContext,
) -> SecurityVerdict:
    """Evaluate a tool invocation before execution.

    Args:
        context: The tool call's security context.

    Returns:
        A verdict: allow, deny, or escalate.
    """
    ...

scan_output async

scan_output(context, output)

Scan tool output for sensitive data after execution.

Parameters:

Name Type Description Default
context SecurityContext

The tool call's security context.

required
output str

The tool's output string.

required

Returns:

Type Description
OutputScanResult

An OutputScanResult with findings and optional redaction.

Source code in src/synthorg/security/protocol.py
async def scan_output(
    self,
    context: SecurityContext,
    output: str,
) -> OutputScanResult:
    """Scan tool output for sensitive data after execution.

    Args:
        context: The tool call's security context.
        output: The tool's output string.

    Returns:
        An ``OutputScanResult`` with findings and optional redaction.
    """
    ...
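A minimal sketch of a conforming strategy, using simplified stand-in types rather than the real pydantic models (the names mirror the protocol above; the stand-in fields and the rule logic are purely illustrative):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Protocol

# Simplified stand-ins for the real pydantic models (illustration only).
@dataclass(frozen=True)
class SecurityContext:
    tool_name: str
    action_type: str
    arguments: dict[str, Any] = field(default_factory=dict)

@dataclass(frozen=True)
class SecurityVerdict:
    verdict: str  # "allow" | "deny" | "escalate"
    reason: str

@dataclass(frozen=True)
class OutputScanResult:
    has_sensitive_data: bool = False
    findings: tuple[str, ...] = ()

class SecurityInterceptionStrategy(Protocol):
    async def evaluate_pre_tool(self, context: SecurityContext) -> SecurityVerdict: ...
    async def scan_output(self, context: SecurityContext, output: str) -> OutputScanResult: ...

class DenyShellStrategy:
    """Toy strategy: deny anything under the shell category."""

    async def evaluate_pre_tool(self, context: SecurityContext) -> SecurityVerdict:
        if context.action_type.startswith("shell:"):
            return SecurityVerdict(verdict="deny", reason="shell actions blocked")
        return SecurityVerdict(verdict="allow", reason="no rule matched")

    async def scan_output(self, context: SecurityContext, output: str) -> OutputScanResult:
        if "password" in output.lower():
            return OutputScanResult(has_sensitive_data=True, findings=("password keyword",))
        return OutputScanResult()

ctx = SecurityContext(tool_name="run_shell", action_type="shell:exec")
print(asyncio.run(DenyShellStrategy().evaluate_pre_tool(ctx)).verdict)  # deny
```

Because the protocol is structural (`typing.Protocol`), `DenyShellStrategy` needs no base class; any object with matching async methods satisfies it.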

Config

config

Security configuration models.

Defines SecurityConfig (the top-level security configuration), RuleEngineConfig, SecurityPolicyRule, and OutputScanPolicyType for output scan response policy selection.

SecurityEnforcementMode

Bases: StrEnum

Security enforcement mode for the SecOps service.

Controls whether security verdicts are enforced, logged only (shadow mode for calibration), or fully disabled.

Members

ACTIVE: Full enforcement -- verdicts are applied as-is.
SHADOW: Shadow mode -- full evaluation pipeline runs and audit entries are recorded, but blocking verdicts (DENY, ESCALATE) are converted to ALLOW. Used for pre-deployment calibration of risk budgets.
DISABLED: Security subsystem is disabled -- no evaluation, always ALLOW.
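The shadow-mode conversion described above reduces to a small decision step. A self-contained sketch (simplified enums; the real code uses `StrEnum` and carries full verdict objects):

```python
from enum import Enum

class SecurityEnforcementMode(str, Enum):  # real code uses StrEnum (3.11+)
    ACTIVE = "active"
    SHADOW = "shadow"
    DISABLED = "disabled"

class SecurityVerdictType(str, Enum):
    ALLOW = "allow"
    DENY = "deny"
    ESCALATE = "escalate"

def apply_enforcement(mode: SecurityEnforcementMode,
                      verdict: SecurityVerdictType) -> SecurityVerdictType:
    """Shadow mode runs the full pipeline but converts blocking
    verdicts (DENY, ESCALATE) to ALLOW; ACTIVE passes them through."""
    if (mode is SecurityEnforcementMode.SHADOW
            and verdict is not SecurityVerdictType.ALLOW):
        return SecurityVerdictType.ALLOW
    return verdict

print(apply_enforcement(SecurityEnforcementMode.SHADOW,
                        SecurityVerdictType.DENY).value)  # allow
```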

OutputScanPolicyType

Bases: StrEnum

Declarative output scan policy selection.

Used in SecurityConfig to select the output scan response policy at config time. Runtime constructor injection is also supported for full flexibility.

Members

REDACT: Return redacted content (scanner-level redaction).
WITHHOLD: Clear redacted content, forcing fail-closed.
LOG_ONLY: Log findings but pass output through.
AUTONOMY_TIERED: Delegate based on effective autonomy level (default -- falls back to REDACT when no autonomy is configured).

VerdictReasonVisibility

Bases: StrEnum

Controls how much of the LLM evaluator's reason is visible to agents.

Attributes:

Name Type Description
FULL

Return the full LLM reason to the agent.

GENERIC

Return a generic denial/escalation message.

CATEGORY

Return verdict type and risk level only.

ArgumentTruncationStrategy

Bases: StrEnum

How to truncate large tool arguments for the LLM security prompt.

Attributes:

Name Type Description
WHOLE_STRING

Truncate the serialized JSON at a character limit.

PER_VALUE

Truncate each argument value individually before serialization, preserving all key names.

KEYS_AND_VALUES

Include all keys with individually capped values (explicit about key preservation).

LlmFallbackErrorPolicy

Bases: StrEnum

What to do when the LLM security evaluation fails.

Attributes:

Name Type Description
USE_RULE_VERDICT

Fall back to the original rule engine verdict.

ESCALATE

Send the action to the human approval queue.

DENY

Deny the action (fail-closed).

LlmFallbackConfig pydantic-model

Bases: BaseModel

Configuration for LLM-based security evaluation fallback.

When enabled, actions that the rule engine cannot classify (no rule matched, low confidence) are routed to an LLM from a different provider family for cross-validation.

Attributes:

Name Type Description
enabled bool

Whether LLM fallback is active.

model NotBlankStr | None

Explicit model ID for security evaluation. When None, the evaluator picks the first model from the selected provider (cross-family preferred, same-family fallback).

timeout_seconds float

Maximum time for the LLM call.

max_input_tokens int

Token budget cap for security eval prompts.

on_error LlmFallbackErrorPolicy

Policy when the LLM call fails.

reason_visibility VerdictReasonVisibility

How much of the LLM reason is visible to the evaluated agent.

argument_truncation ArgumentTruncationStrategy

Strategy for truncating large tool arguments in the LLM prompt.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

SecurityPolicyRule pydantic-model

Bases: BaseModel

A single configurable security policy rule.

Attributes:

Name Type Description
name NotBlankStr

Rule name (used in matched_rules lists).

description str

Human-readable description.

action_types tuple[str, ...]

Action types this rule applies to (category:action).

verdict SecurityVerdictType

Verdict to return when rule matches.

risk_level ApprovalRiskLevel

Risk level to assign.

enabled bool

Whether this rule is active.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _check_action_type_format

RuleEngineConfig pydantic-model

Bases: BaseModel

Configuration for the synchronous rule engine.

Attributes:

Name Type Description
credential_patterns_enabled bool

Detect credentials in arguments.

data_leak_detection_enabled bool

Detect sensitive file paths / PII.

destructive_op_detection_enabled bool

Detect destructive operations.

path_traversal_detection_enabled bool

Detect path traversal attacks.

max_argument_length int

Maximum argument string length for scanning.

custom_allow_bypasses_detectors bool

When True, custom ALLOW policies are placed before detectors, allowing them to short-circuit security scanning. When False (default), custom policies are placed after all detectors so security scanning always runs first.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • credential_patterns_enabled (bool)
  • data_leak_detection_enabled (bool)
  • destructive_op_detection_enabled (bool)
  • path_traversal_detection_enabled (bool)
  • max_argument_length (int)
  • custom_allow_bypasses_detectors (bool)

SecurityConfig pydantic-model

Bases: BaseModel

Top-level security configuration.

Attributes:

Name Type Description
enabled bool

Master switch for the security subsystem.

enforcement_mode SecurityEnforcementMode

Security enforcement mode (active/shadow/disabled).

rule_engine RuleEngineConfig

Rule engine configuration.

llm_fallback LlmFallbackConfig

LLM-based fallback for uncertain evaluations.

audit_enabled bool

Whether to record audit entries.

post_tool_scanning_enabled bool

Scan tool output for secrets.

hard_deny_action_types tuple[str, ...]

Action types always denied.

auto_approve_action_types tuple[str, ...]

Action types always approved.

output_scan_policy_type OutputScanPolicyType

Output scan response policy (default: AUTONOMY_TIERED).

custom_policies tuple[SecurityPolicyRule, ...]

User-defined policy rules.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _check_disjoint_action_types
  • _check_unique_custom_policy_names
  • _check_no_allow_or_escalate_bypass

enforcement_mode pydantic-field

enforcement_mode = ACTIVE

Security enforcement mode (active/shadow/disabled)
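A hypothetical construction of this model might look as follows. Field names come from the attribute table above; the import path and the exact string values accepted for `verdict` and `risk_level` are assumptions:

```python
from synthorg.security.config import (  # assumed import path
    OutputScanPolicyType,
    RuleEngineConfig,
    SecurityConfig,
    SecurityEnforcementMode,
    SecurityPolicyRule,
)

config = SecurityConfig(
    enabled=True,
    enforcement_mode=SecurityEnforcementMode.SHADOW,  # calibrate before ACTIVE
    rule_engine=RuleEngineConfig(),
    hard_deny_action_types=("shell:exec",),
    output_scan_policy_type=OutputScanPolicyType.REDACT,
    custom_policies=(
        SecurityPolicyRule(
            name="deny-db-drop",
            description="Never drop tables",
            action_types=("db:drop",),
            verdict="deny",
            risk_level="critical",
        ),
    ),
)
```

Because the model is frozen, a loaded configuration cannot be mutated after construction; build a new instance to change settings.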

Models

models

Security domain models.

Defines the value objects used by the SecOps service: security verdicts, evaluation contexts, audit entries, and output scan results.

ScanOutcome

Bases: StrEnum

Outcome of an output scan policy decision.

Tracks what the scanner/policy did with the output so that downstream consumers (e.g. ToolInvoker) can distinguish intentional withholding from scanner failure.

Attributes:

Name Type Description
CLEAN

No sensitive data detected (default).

REDACTED

Sensitive data found, redacted content available.

WITHHELD

Content intentionally withheld by policy.

LOG_ONLY

Findings discarded by policy, original content passed through. Always emitted with has_sensitive_data=False because the policy resets the result -- the audit log (written by SecOpsService before the policy runs) is the source of truth for what was actually detected.

EvaluationConfidence

Bases: StrEnum

Confidence level of a security evaluation.

Indicates whether the verdict came from a matched rule (high confidence) or the fallback risk classifier (low confidence). Low-confidence verdicts may trigger LLM-based re-evaluation.

Attributes:

Name Type Description
HIGH

A specific security rule matched and produced the verdict.

LOW

No rule matched; verdict came from fallback risk classification.

SecurityVerdictType

Bases: StrEnum

Security verdict constants.

Three possible outcomes of a security evaluation: the tool call is allowed, denied, or escalated for human approval.

SecurityVerdict pydantic-model

Bases: BaseModel

Result of a security evaluation.

Attributes:

Name Type Description
verdict SecurityVerdictType

One of allow, deny, escalate.

reason NotBlankStr

Human-readable explanation.

risk_level ApprovalRiskLevel

Assessed risk level for the action.

confidence EvaluationConfidence

Whether a rule matched (HIGH) or the verdict came from fallback risk classification (LOW).

matched_rules tuple[NotBlankStr, ...]

Names of rules that triggered.

evaluated_at AwareDatetime

Timestamp of evaluation.

evaluation_duration_ms float

How long the evaluation took.

approval_id NotBlankStr | None

Set only when verdict is escalate.

agent_visible_reason NotBlankStr | None

Reason string visible to the evaluated agent. When None, reason is used. Set by the LLM evaluator based on VerdictReasonVisibility config.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _check_approval_id

SecurityContext pydantic-model

Bases: BaseModel

Context passed to the security evaluator before tool execution.

Attributes:

Name Type Description
tool_name NotBlankStr

Name of the tool being invoked.

tool_category ToolCategory

Tool's category for access-level gating.

action_type str

Two-level category:action type string.

arguments dict[str, Any]

Tool call arguments for inspection.

agent_id NotBlankStr | None

ID of the agent requesting the tool.

task_id NotBlankStr | None

ID of the task being executed.

agent_provider_name NotBlankStr | None

Name of the provider the agent is currently using. Used by the LLM security evaluator for cross-family model selection.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _check_action_type_format

AuditEntry pydantic-model

Bases: BaseModel

Immutable record of a security evaluation for the audit log.

Attributes:

Name Type Description
id NotBlankStr

Unique entry identifier.

timestamp AwareDatetime

When the evaluation occurred.

agent_id NotBlankStr | None

Agent that requested the tool.

task_id NotBlankStr | None

Task being executed.

tool_name NotBlankStr

Tool that was evaluated.

tool_category ToolCategory

Tool category.

action_type str

Action type string.

arguments_hash _HEX_SHA256

SHA-256 hex digest of serialized arguments.

verdict AuditVerdictStr

One of SecurityVerdictType values (allow/deny/escalate) for pre-tool evaluations, or 'output_scan' for post-tool output scan entries.

risk_level ApprovalRiskLevel

Assessed risk level.

reason NotBlankStr

Explanation of the verdict.

matched_rules tuple[NotBlankStr, ...]

Rules that triggered.

evaluation_duration_ms float

Duration of evaluation.

confidence EvaluationConfidence

Confidence level of the evaluation source.

approval_id NotBlankStr | None

Set when verdict is escalate.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

OutputScanResult pydantic-model

Bases: BaseModel

Result of scanning tool output for sensitive data.

Attributes:

Name Type Description
has_sensitive_data bool

Whether sensitive data was detected.

findings tuple[NotBlankStr, ...]

Descriptions of findings.

redacted_content str | None

Content with sensitive data replaced, or None.

outcome ScanOutcome

What the scanner/policy did with the output. Allows downstream consumers to distinguish intentional withholding from scanner failure.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • has_sensitive_data (bool)
  • findings (tuple[NotBlankStr, ...])
  • redacted_content (str | None)
  • outcome (ScanOutcome)

Validators:

  • _check_consistency

Service

service

SecOps service -- the security meta-agent.

Coordinates the rule engine, audit log, output scanner, output scan response policy, and approval store into a single SecurityInterceptionStrategy implementation that the ToolInvoker calls.

SecOpsService

SecOpsService(
    *,
    config,
    rule_engine,
    audit_log,
    output_scanner,
    approval_store=None,
    effective_autonomy=None,
    risk_classifier=None,
    output_scan_policy=None,
    llm_evaluator=None,
)

Implements SecurityInterceptionStrategy.

Coordinates the rule engine, audit log, output scanner, output scan response policy, and optional approval store. Enforces security policies, scans for sensitive data, and records audit entries.

On ESCALATE: creates an ApprovalItem in the ApprovalStore and returns the verdict with approval_id set.

Initialize the SecOps service.

Parameters:

Name Type Description Default
config SecurityConfig

Security configuration.

required
rule_engine RuleEngine

The synchronous rule engine.

required
audit_log AuditLog

Audit log for recording evaluations.

required
output_scanner OutputScanner

Post-tool output scanner.

required
approval_store ApprovalStore | None

Optional store for escalation items.

None
effective_autonomy EffectiveAutonomy | None

Resolved autonomy for the current run. When provided, autonomy routing is applied after the rule engine -- never bypassing security detectors.

None
risk_classifier RiskTierClassifier | None

Optional classifier for determining action risk levels in autonomy escalations. Defaults to HIGH when absent (fail-safe).

None
output_scan_policy OutputScanResponsePolicy | None

Policy applied to scan results before returning. When None, a default policy is built from config.output_scan_policy_type via the factory. Pass an explicit instance to override.

None
llm_evaluator LlmSecurityEvaluator | None

Optional LLM-based security evaluator for uncertain verdicts (EvaluationConfidence.LOW). When provided and config.llm_fallback.enabled is True, low-confidence verdicts are re-evaluated by an LLM from a different provider family.

None
Source code in src/synthorg/security/service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    config: SecurityConfig,
    rule_engine: RuleEngine,
    audit_log: AuditLog,
    output_scanner: OutputScanner,
    approval_store: ApprovalStore | None = None,
    effective_autonomy: EffectiveAutonomy | None = None,
    risk_classifier: RiskTierClassifier | None = None,
    output_scan_policy: OutputScanResponsePolicy | None = None,
    llm_evaluator: LlmSecurityEvaluator | None = None,
) -> None:
    """Initialize the SecOps service.

    Args:
        config: Security configuration.
        rule_engine: The synchronous rule engine.
        audit_log: Audit log for recording evaluations.
        output_scanner: Post-tool output scanner.
        approval_store: Optional store for escalation items.
        effective_autonomy: Resolved autonomy for the current run.
            When provided, autonomy routing is applied *after*
            the rule engine -- never bypassing security detectors.
        risk_classifier: Optional classifier for determining action
            risk levels in autonomy escalations.  Defaults to HIGH
            when absent (fail-safe).
        output_scan_policy: Policy applied to scan results before
            returning.  When ``None``, a default policy is built
            from ``config.output_scan_policy_type`` via the
            factory.  Pass an explicit instance to override.
        llm_evaluator: Optional LLM-based security evaluator for
            uncertain verdicts (``EvaluationConfidence.LOW``).
            When provided and ``config.llm_fallback.enabled`` is
            ``True``, low-confidence verdicts are re-evaluated
            by an LLM from a different provider family.
    """
    self._config = config
    self._rule_engine = rule_engine
    self._audit_log = audit_log
    self._output_scanner = output_scanner
    self._approval_store = approval_store
    self._effective_autonomy = effective_autonomy
    self._risk_classifier = risk_classifier
    self._llm_evaluator = llm_evaluator
    self._output_scan_policy: OutputScanResponsePolicy = (
        output_scan_policy
        if output_scan_policy is not None
        else build_output_scan_policy(
            config.output_scan_policy_type,
            effective_autonomy=effective_autonomy,
        )
    )

evaluate_pre_tool async

evaluate_pre_tool(context)

Evaluate a tool invocation before execution.

Steps
  1. Check disabled/DISABLED enforcement mode.
  2. Run rule engine.
  3. LLM fallback for uncertain evaluations.
  4. Autonomy augmentation (tighten only).
  5. If ESCALATE, create approval item or convert to DENY.
  6. Record audit entry.
  7. Apply shadow mode conversion if applicable.
  8. Return verdict.
Source code in src/synthorg/security/service.py
async def evaluate_pre_tool(
    self,
    context: SecurityContext,
) -> SecurityVerdict:
    """Evaluate a tool invocation before execution.

    Steps:
        1. Check disabled/DISABLED enforcement mode.
        2. Run rule engine.
        3. LLM fallback for uncertain evaluations.
        4. Autonomy augmentation (tighten only).
        5. If ESCALATE, create approval item or convert to DENY.
        6. Record audit entry.
        7. Apply shadow mode conversion if applicable.
        8. Return verdict.
    """
    if (
        not self._config.enabled
        or self._config.enforcement_mode == SecurityEnforcementMode.DISABLED
    ):
        logger.warning(SECURITY_DISABLED, tool_name=context.tool_name)
        verdict = SecurityVerdict(
            verdict=SecurityVerdictType.ALLOW,
            reason="Security subsystem disabled",
            risk_level=ApprovalRiskLevel.LOW,
            evaluated_at=datetime.now(UTC),
            evaluation_duration_ms=0.0,
        )
        if self._config.audit_enabled:
            self._record_audit(context, verdict)
        return verdict

    logger.info(
        SECURITY_EVALUATE_START,
        tool_name=context.tool_name,
        action_type=context.action_type,
        agent_id=context.agent_id,
    )

    # Always run the rule engine first -- security detectors must
    # never be bypassed, regardless of autonomy configuration.
    try:
        verdict = self._rule_engine.evaluate(context)
    except (MemoryError, RecursionError):
        raise
    except Exception:
        logger.exception(
            SECURITY_INTERCEPTOR_ERROR,
            tool_name=context.tool_name,
            note="Rule engine evaluation failed (fail-closed)",
        )
        verdict = SecurityVerdict(
            verdict=SecurityVerdictType.DENY,
            reason="Rule engine evaluation failed (fail-closed)",
            risk_level=ApprovalRiskLevel.CRITICAL,
            evaluated_at=datetime.now(UTC),
            evaluation_duration_ms=0.0,
        )

    # LLM fallback for uncertain evaluations (~5% of cases).
    verdict = await self._maybe_llm_fallback(context, verdict)

    # Apply autonomy augmentation *after* the rule engine (and
    # optional LLM fallback).  Autonomy can only add stricter
    # requirements (ALLOW -> ESCALATE), never weaken a DENY or
    # ESCALATE from security detectors.
    verdict = self._apply_autonomy_augmentation(context, verdict)

    # Handle escalation.
    if verdict.verdict == SecurityVerdictType.ESCALATE:
        verdict = await self._handle_escalation(context, verdict)

    # Record audit.
    if self._config.audit_enabled:
        self._record_audit(context, verdict)

    # Shadow mode: log blocking verdicts but return ALLOW.
    # Only replace non-ALLOW verdicts to preserve legitimate
    # ALLOW reasons in the audit trail.
    if (
        self._config.enforcement_mode == SecurityEnforcementMode.SHADOW
        and verdict.verdict != SecurityVerdictType.ALLOW
    ):
        logger.warning(
            SECURITY_SHADOW_WOULD_BLOCK,
            tool_name=context.tool_name,
            original_verdict=verdict.verdict.value,
            risk_level=verdict.risk_level.value,
            reason=verdict.reason,
        )
        verdict = SecurityVerdict(
            verdict=SecurityVerdictType.ALLOW,
            reason=f"Shadow mode (original: {verdict.verdict.value})",
            risk_level=verdict.risk_level,
            confidence=verdict.confidence,
            matched_rules=verdict.matched_rules,
            evaluated_at=verdict.evaluated_at,
            evaluation_duration_ms=verdict.evaluation_duration_ms,
        )

    # Log verdict.
    event = {
        SecurityVerdictType.ALLOW: SECURITY_VERDICT_ALLOW,
        SecurityVerdictType.DENY: SECURITY_VERDICT_DENY,
        SecurityVerdictType.ESCALATE: SECURITY_VERDICT_ESCALATE,
    }.get(verdict.verdict, SECURITY_EVALUATE_COMPLETE)
    logger.info(
        event,
        tool_name=context.tool_name,
        verdict=verdict.verdict.value,
        risk_level=verdict.risk_level.value,
    )

    return verdict
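The error handling in the rule-engine step follows a reusable fail-closed pattern: re-raise unrecoverable interpreter errors, convert everything else into a deny. A self-contained sketch (plain dicts stand in for the real verdict model):

```python
def evaluate_fail_closed(evaluate, context):
    """Run an evaluator; any failure other than interpreter-level
    errors becomes a deny verdict (fail-closed)."""
    try:
        return evaluate(context)
    except (MemoryError, RecursionError):
        # Unrecoverable interpreter errors must propagate.
        raise
    except Exception:
        return {"verdict": "deny", "reason": "evaluation failed (fail-closed)"}

def broken_engine(_context):
    raise RuntimeError("pattern compile error")

print(evaluate_fail_closed(broken_engine, {})["verdict"])  # deny
```

The key property is that a buggy or crashing detector can never silently turn into an ALLOW.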

scan_output async

scan_output(context, output)

Scan tool output for sensitive data.

Steps
  1. Delegate to the output scanner.
  2. Record an audit entry if sensitive data is found.
  3. Apply the output scan response policy to transform the result before returning.
Source code in src/synthorg/security/service.py
async def scan_output(
    self,
    context: SecurityContext,
    output: str,
) -> OutputScanResult:
    """Scan tool output for sensitive data.

    Steps:
        1. Delegate to the output scanner.
        2. Record an audit entry if sensitive data is found.
        3. Apply the output scan response policy to transform
           the result before returning.
    """
    if not self._config.post_tool_scanning_enabled:
        logger.debug(
            SECURITY_EVALUATE_COMPLETE,
            note="output scanning disabled",
            tool_name=context.tool_name,
        )
        return OutputScanResult()

    result = self._output_scanner.scan(output)

    if result.has_sensitive_data and self._config.audit_enabled:
        entry = AuditEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(UTC),
            agent_id=context.agent_id,
            task_id=context.task_id,
            tool_name=context.tool_name,
            tool_category=context.tool_category,
            action_type=context.action_type,
            arguments_hash=_hash_arguments(context.arguments),
            verdict=OUTPUT_SCAN_VERDICT,
            risk_level=ApprovalRiskLevel.HIGH,
            reason=("Sensitive data in output: " + ", ".join(result.findings)),
            evaluation_duration_ms=0.0,
        )
        try:
            self._audit_log.record(entry)
        except (MemoryError, RecursionError):
            raise
        except Exception:
            logger.exception(
                SECURITY_AUDIT_RECORD_ERROR,
                tool_name=context.tool_name,
                note="Output scan audit recording failed",
            )

    # Apply the output scan response policy.  On failure, fall back
    # to the raw scan result which already has scanner-level redaction
    # applied (pattern matches replaced with [REDACTED]), so the
    # fallback is still reasonably safe even if the intended policy
    # (e.g. WithholdPolicy) would have been stricter.
    policy_name = getattr(self._output_scan_policy, "name", "<unknown>")
    try:
        result = self._output_scan_policy.apply(result, context)
    except (MemoryError, RecursionError):
        raise
    except Exception:
        logger.exception(
            SECURITY_INTERCEPTOR_ERROR,
            tool_name=context.tool_name,
            policy=policy_name,
            fallback_outcome=result.outcome.value,
            note="Output scan policy application failed "
            "-- returning raw scan result "
            "(may be less strict than intended policy)",
        )

    return result

Action Types

action_types

Action type taxonomy -- categories, registry, and validation.

Provides ActionTypeCategory for the 11 top-level categories and ActionTypeRegistry for validating built-in and custom action types, expanding category shortcuts, and querying the taxonomy.

ActionTypeCategory

Bases: StrEnum

Top-level action type category prefixes.

ActionTypeRegistry

ActionTypeRegistry(*, custom_types=frozenset())

Validates built-in and custom action types.

Supports category expansion (e.g. "code" → all code:* types) and registration of custom action types at runtime.

Access the full set of registered types via all_types().

Initialize with optional custom types.

Parameters:

Name Type Description Default
custom_types frozenset[str]

Additional action type strings to register.

frozenset()

Raises:

Type Description
ValueError

If any custom type lacks a category:action format.

Source code in src/synthorg/security/action_types.py
def __init__(
    self,
    *,
    custom_types: frozenset[str] = frozenset(),
) -> None:
    """Initialize with optional custom types.

    Args:
        custom_types: Additional action type strings to register.

    Raises:
        ValueError: If any custom type lacks a ``category:action`` format.
    """
    for ct in custom_types:
        if ct.count(":") != 1 or ct.startswith(":") or ct.endswith(":"):
            msg = (
                f"Custom action type {ct!r} must use "
                "'category:action' format (exactly one ':')"
            )
            logger.warning(SECURITY_ACTION_TYPE_INVALID, error=msg)
            raise ValueError(msg)
    self._custom_types = custom_types
    self._all_types = _BUILTIN_TYPES | custom_types
    logger.debug(
        SECURITY_CONFIG_LOADED,
        builtin_count=len(_BUILTIN_TYPES),
        custom_count=len(custom_types),
    )

is_registered

is_registered(action_type)

Check if an action type is known (built-in or custom).

Source code in src/synthorg/security/action_types.py
def is_registered(self, action_type: str) -> bool:
    """Check if an action type is known (built-in or custom)."""
    return action_type in self._all_types

validate

validate(action_type)

Validate that an action type is registered.

Parameters:

Name Type Description Default
action_type str

The action type string to check.

required

Raises:

Type Description
ValueError

If the action type is not registered.

Source code in src/synthorg/security/action_types.py
def validate(self, action_type: str) -> None:
    """Validate that an action type is registered.

    Args:
        action_type: The action type string to check.

    Raises:
        ValueError: If the action type is not registered.
    """
    if not self.is_registered(action_type):
        msg = f"Unknown action type: {action_type!r}"
        logger.warning(SECURITY_ACTION_TYPE_INVALID, error=msg)
        raise ValueError(msg)

expand_category

expand_category(category)

Expand a category prefix into all matching action types.

Parameters:

Name Type Description Default
category str

A category prefix (e.g. "code").

required

Returns:

Type Description
frozenset[str]

All action types under that category. Returns an empty frozenset if the category has no built-in types (custom types under unknown categories are included).

Source code in src/synthorg/security/action_types.py
def expand_category(self, category: str) -> frozenset[str]:
    """Expand a category prefix into all matching action types.

    Args:
        category: A category prefix (e.g. ``"code"``).

    Returns:
        All action types under that category. Returns an empty
        frozenset if the category has no built-in types (custom
        types under unknown categories are included).
    """
    builtin = _CATEGORY_MAP.get(category, frozenset())
    custom = frozenset(
        ct for ct in self._custom_types if ct.split(":")[0] == category
    )
    return builtin | custom
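The expansion behavior combines built-in and custom types under a shared prefix. A minimal re-implementation of the documented logic (the built-in type set here is illustrative, not the real taxonomy):

```python
# Illustrative built-in taxonomy; the real _CATEGORY_MAP is larger.
_CATEGORY_MAP = {
    "code": frozenset({"code:read", "code:write"}),
    "net": frozenset({"net:fetch"}),
}

def expand_category(category: str,
                    custom_types: frozenset[str] = frozenset()) -> frozenset[str]:
    builtin = _CATEGORY_MAP.get(category, frozenset())
    # Custom types contribute whenever their prefix matches, even for
    # categories with no built-in entries.
    custom = frozenset(ct for ct in custom_types if ct.split(":")[0] == category)
    return builtin | custom

print(sorted(expand_category("code", frozenset({"code:lint", "db:drop"}))))
# ['code:lint', 'code:read', 'code:write']
```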

get_category staticmethod

get_category(action_type)

Extract the category prefix from an action type string.

Parameters:

Name Type Description Default
action_type str

A category:action string.

required

Returns:

Type Description
str

The category prefix before the first :.

Raises:

Type Description
ValueError

If the string does not contain :.

Source code in src/synthorg/security/action_types.py
@staticmethod
def get_category(action_type: str) -> str:
    """Extract the category prefix from an action type string.

    Args:
        action_type: A ``category:action`` string.

    Returns:
        The category prefix before the first ``:``.

    Raises:
        ValueError: If the string does not contain ``:``.
    """
    if action_type.count(":") != 1:
        msg = (
            f"Action type {action_type!r} must use 'category:action' "
            "format (exactly one ':')"
        )
        raise ValueError(msg)
    category, action = action_type.split(":")
    if not category or not action:
        msg = (
            f"Action type {action_type!r} must have non-empty "
            "category and action parts"
        )
        raise ValueError(msg)
    return category

all_types

all_types()

Return all registered action types (built-in + custom).

Source code in src/synthorg/security/action_types.py
def all_types(self) -> frozenset[str]:
    """Return all registered action types (built-in + custom)."""
    return self._all_types

Audit

audit

Append-only in-memory audit log for security evaluations.

AuditLog

AuditLog(*, max_entries=100000)

Append-only in-memory security audit log.

Thread-safety is not needed because the framework runs on a single event loop. When max_entries is exceeded, the oldest entries are evicted with a warning.

Future: backed by PersistenceBackend (see Memory design page).

Initialize the audit log.

Parameters:

Name Type Description Default
max_entries int

Maximum entries before oldest are evicted.

100000

Raises:

Type Description
ValueError

If max_entries < 1.

Source code in src/synthorg/security/audit.py
def __init__(self, *, max_entries: int = 100_000) -> None:
    """Initialize the audit log.

    Args:
        max_entries: Maximum entries before oldest are evicted.

    Raises:
        ValueError: If *max_entries* < 1.
    """
    if max_entries < 1:
        msg = f"max_entries must be >= 1, got {max_entries}"
        logger.warning(
            SECURITY_AUDIT_CONFIG_ERROR,
            error=msg,
        )
        raise ValueError(msg)
    self._max_entries = max_entries
    self._entries: deque[AuditEntry] = deque(maxlen=max_entries)
    self._total_recorded: int = 0

total_recorded property

total_recorded

Total entries ever recorded (including evicted).

entries property

entries

Return all entries as a tuple (oldest first).

record

record(entry)

Append an audit entry.

Parameters:

Name Type Description Default
entry AuditEntry

The audit entry to record.

required
Source code in src/synthorg/security/audit.py
def record(self, entry: AuditEntry) -> None:
    """Append an audit entry.

    Args:
        entry: The audit entry to record.
    """
    if len(self._entries) >= self._max_entries:
        logger.warning(
            SECURITY_AUDIT_EVICTION,
            max_entries=self._max_entries,
            total_recorded=self._total_recorded,
            note="oldest entry evicted to make room",
        )
    self._entries.append(entry)
    self._total_recorded += 1
    logger.debug(
        SECURITY_AUDIT_RECORDED,
        audit_id=entry.id,
        tool_name=entry.tool_name,
        verdict=entry.verdict,
    )

query

query(
    *,
    agent_id=None,
    tool_name=None,
    verdict=None,
    risk_level=None,
    since=None,
    limit=100,
)

Query audit entries with optional filters.

Filters are AND-combined. Results are returned newest-first, up to limit entries.

Parameters:

Name Type Description Default
agent_id str | None

Filter by agent ID.

None
tool_name str | None

Filter by tool name.

None
verdict str | None

Filter by verdict string.

None
risk_level ApprovalRiskLevel | None

Filter by risk level.

None
since AwareDatetime | None

Entries before this datetime are excluded.

None
limit int

Maximum results to return (must be >= 1).

100

Returns:

Type Description
tuple[AuditEntry, ...]

Tuple of matching entries, newest first.

Raises:

Type Description
ValueError

If limit < 1.

Source code in src/synthorg/security/audit.py
def query(  # noqa: PLR0913
    self,
    *,
    agent_id: str | None = None,
    tool_name: str | None = None,
    verdict: str | None = None,
    risk_level: ApprovalRiskLevel | None = None,
    since: AwareDatetime | None = None,
    limit: int = 100,
) -> tuple[AuditEntry, ...]:
    """Query audit entries with optional filters.

    Filters are AND-combined.  Results are returned newest-first,
    up to *limit* entries.

    Args:
        agent_id: Filter by agent ID.
        tool_name: Filter by tool name.
        verdict: Filter by verdict string.
        risk_level: Filter by risk level.
        since: Entries before this datetime are excluded.
        limit: Maximum results to return (must be >= 1).

    Returns:
        Tuple of matching entries, newest first.

    Raises:
        ValueError: If *limit* < 1.
    """
    if limit < 1:
        msg = f"limit must be >= 1, got {limit}"
        logger.warning(
            SECURITY_AUDIT_CONFIG_ERROR,
            error=msg,
        )
        raise ValueError(msg)
    results: list[AuditEntry] = []
    for entry in reversed(self._entries):
        if agent_id is not None and entry.agent_id != agent_id:
            continue
        if tool_name is not None and entry.tool_name != tool_name:
            continue
        if verdict is not None and entry.verdict != verdict:
            continue
        if risk_level is not None and entry.risk_level != risk_level:
            continue
        if since is not None and entry.timestamp < since:
            continue
        results.append(entry)
        if len(results) >= limit:
            break
    return tuple(results)

count

count()

Return the number of entries in the log.

Source code in src/synthorg/security/audit.py
def count(self) -> int:
    """Return the number of entries in the log."""
    return len(self._entries)

Output Scanner

output_scanner

Output scanner -- post-tool output scanning for sensitive data.

Reuses credential patterns from credential_detector and PII patterns from data_leak_detector to scan tool output for sensitive data. Always logs findings at WARNING.

OutputScanner

Scans tool output for sensitive data and optionally redacts it.

scan

scan(output)

Scan output text for sensitive patterns.

Detection runs on the original output. Redaction builds a separate redacted copy by applying substitutions in order.

Parameters:

Name Type Description Default
output str

The tool's output string.

required

Returns:

Type Description
OutputScanResult

An OutputScanResult with findings and optional redacted content.

Source code in src/synthorg/security/output_scanner.py
def scan(self, output: str) -> OutputScanResult:
    """Scan output text for sensitive patterns.

    Detection runs on the original output.  Redaction builds
    a separate redacted copy by applying substitutions in order.

    Args:
        output: The tool's output string.

    Returns:
        An ``OutputScanResult`` with findings and optional
        redacted content.
    """
    logger.debug(
        SECURITY_OUTPUT_SCAN_START,
        output_length=len(output),
    )
    findings: list[str] = []
    redacted = output

    for pattern_name, pattern in _OUTPUT_PATTERNS:
        if pattern.search(output):
            findings.append(pattern_name)
            logger.warning(
                SECURITY_OUTPUT_SCAN_FINDING,
                finding=pattern_name,
            )
            redacted = pattern.sub(_REDACTED, redacted)

    if not findings:
        return OutputScanResult()

    return OutputScanResult(
        has_sensitive_data=True,
        findings=tuple(sorted(set(findings))),
        redacted_content=redacted,
        outcome=ScanOutcome.REDACTED,
    )
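The detect-then-redact flow above can be illustrated standalone. The patterns below are placeholders (the real `_OUTPUT_PATTERNS` tuple comes from `credential_detector` and `data_leak_detector`), but the control flow matches the listing: detection runs on the original text while redaction accumulates on a copy.

```python
import re

# Placeholder patterns -- the real _OUTPUT_PATTERNS live in the module.
_PATTERNS = (
    ("aws_access_key", re.compile(r"AKIA[0-9A-Z]{16}")),
    ("email", re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")),
)
_REDACTED = "[REDACTED]"


def scan(output: str) -> tuple[tuple[str, ...], str]:
    """Return (sorted unique findings, redacted copy) of *output*."""
    findings: list[str] = []
    redacted = output
    for name, pattern in _PATTERNS:
        if pattern.search(output):  # detect against the ORIGINAL text
            findings.append(name)
            redacted = pattern.sub(_REDACTED, redacted)  # redact a copy
    return tuple(sorted(set(findings))), redacted


findings, redacted = scan("key=AKIAABCDEFGHIJKLMNOP contact admin@example.com")
```

Searching the original (rather than the progressively redacted copy) ensures that one pattern's substitution cannot mask another pattern's match.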

output_scan_policy

Output scan response policies.

Pluggable strategies that transform OutputScanResult after the output scanner runs. Each policy decides how to handle detected sensitive data -- redact, withhold, log-only, or delegate based on autonomy level.

OutputScanResponsePolicy

Bases: Protocol

Protocol for output scan response policies.

Implementations decide how to transform an OutputScanResult before it is returned to the invoker.

Implementations are expected to be stateless / immutable -- the AutonomyTieredPolicy stores policy instances by reference (shallow copy) and wraps the mapping as read-only.

name property

name

Policy name identifier.

apply

apply(scan_result, context)

Apply the policy to a scan result.

Parameters:

Name Type Description Default
scan_result OutputScanResult

Result from the output scanner.

required
context SecurityContext

Security context of the tool invocation.

required

Returns:

Type Description
OutputScanResult

Transformed scan result.

Source code in src/synthorg/security/output_scan_policy.py
def apply(
    self,
    scan_result: OutputScanResult,
    context: SecurityContext,
) -> OutputScanResult:
    """Apply the policy to a scan result.

    Args:
        scan_result: Result from the output scanner.
        context: Security context of the tool invocation.

    Returns:
        Transformed scan result.
    """
    ...

RedactPolicy

Return scan result as-is (redacted content preserved).

This is the default policy -- the scanner's redaction stands.

name property

name

Policy name identifier.

apply

apply(scan_result, context)

Pass through the scan result unchanged.

Parameters:

Name Type Description Default
scan_result OutputScanResult

Result from the output scanner.

required
context SecurityContext

Security context (unused).

required

Returns:

Type Description
OutputScanResult

The original scan result.

Source code in src/synthorg/security/output_scan_policy.py
def apply(
    self,
    scan_result: OutputScanResult,
    context: SecurityContext,  # noqa: ARG002
) -> OutputScanResult:
    """Pass through the scan result unchanged.

    Args:
        scan_result: Result from the output scanner.
        context: Security context (unused).

    Returns:
        The original scan result.
    """
    logger.debug(
        SECURITY_OUTPUT_SCAN_POLICY_APPLIED,
        policy="redact",
        has_sensitive_data=scan_result.has_sensitive_data,
    )
    return scan_result

WithholdPolicy

Clear redacted content when sensitive data is found.

Sets ScanOutcome.WITHHELD so the invoker returns a dedicated "withheld by policy" error -- no partial data is returned. This is distinct from the fail-closed path used for scanner errors. The findings tuple is deliberately preserved so that audit consumers can categorise what was detected without seeing the actual content.

name property

name

Policy name identifier.

apply

apply(scan_result, context)

Clear redacted content on sensitive results.

Parameters:

Name Type Description Default
scan_result OutputScanResult

Result from the output scanner.

required
context SecurityContext

Security context (unused).

required

Returns:

Type Description
OutputScanResult

Scan result with redacted_content cleared if sensitive.

Source code in src/synthorg/security/output_scan_policy.py
def apply(
    self,
    scan_result: OutputScanResult,
    context: SecurityContext,  # noqa: ARG002
) -> OutputScanResult:
    """Clear redacted content on sensitive results.

    Args:
        scan_result: Result from the output scanner.
        context: Security context (unused).

    Returns:
        Scan result with ``redacted_content`` cleared if sensitive.
    """
    logger.debug(
        SECURITY_OUTPUT_SCAN_POLICY_APPLIED,
        policy="withhold",
        has_sensitive_data=scan_result.has_sensitive_data,
    )
    if not scan_result.has_sensitive_data:
        return scan_result
    return scan_result.model_copy(
        update={"redacted_content": None, "outcome": ScanOutcome.WITHHELD},
    )

LogOnlyPolicy

Discard scan findings, returning a clean result.

The caller should treat the original tool output as unmodified. Suitable for audit-only mode or high-trust agents where output scanning is informational rather than enforced. The audit entry written by SecOpsService.scan_output before this policy runs preserves the original findings.

name property

name

Policy name identifier.

apply

apply(scan_result, context)

Return a clean OutputScanResult regardless of findings.

Suppresses enforcement while preserving the audit log entry written by SecOpsService.scan_output.

Parameters:

Name Type Description Default
scan_result OutputScanResult

Result from the output scanner.

required
context SecurityContext

Security context of the tool invocation.

required

Returns:

Type Description
OutputScanResult

Clean OutputScanResult with has_sensitive_data=False.

Source code in src/synthorg/security/output_scan_policy.py
def apply(
    self,
    scan_result: OutputScanResult,
    context: SecurityContext,
) -> OutputScanResult:
    """Return a clean ``OutputScanResult`` regardless of findings.

    Suppresses enforcement while preserving the audit log entry
    written by ``SecOpsService.scan_output``.

    Args:
        scan_result: Result from the output scanner.
        context: Security context of the tool invocation.

    Returns:
        Clean ``OutputScanResult`` with ``has_sensitive_data=False``.
    """
    if scan_result.has_sensitive_data:
        logger.warning(
            SECURITY_OUTPUT_SCAN_POLICY_APPLIED,
            policy="log_only",
            has_sensitive_data=True,
            findings=scan_result.findings,
            tool_name=context.tool_name,
            agent_id=context.agent_id,
            note="Sensitive data detected but passed through by log_only policy",
        )
        return OutputScanResult(outcome=ScanOutcome.LOG_ONLY)
    logger.debug(
        SECURITY_OUTPUT_SCAN_POLICY_APPLIED,
        policy="log_only",
        has_sensitive_data=False,
    )
    return OutputScanResult()

AutonomyTieredPolicy

AutonomyTieredPolicy(*, effective_autonomy, policy_map=None)

Delegate to sub-policies based on the effective autonomy level.

Uses a configurable mapping from AutonomyLevel to a concrete policy. Falls back to RedactPolicy when no autonomy is set or when the autonomy level has no entry in the policy map.

Initialize with autonomy and optional policy map.

Parameters:

Name Type Description Default
effective_autonomy EffectiveAutonomy | None

Resolved autonomy for the current run.

required
policy_map Mapping[AutonomyLevel, OutputScanResponsePolicy] | None

Mapping from autonomy level to policy. Uses defaults when None.

None
Source code in src/synthorg/security/output_scan_policy.py
def __init__(
    self,
    *,
    effective_autonomy: EffectiveAutonomy | None,
    policy_map: Mapping[AutonomyLevel, OutputScanResponsePolicy] | None = None,
) -> None:
    """Initialize with autonomy and optional policy map.

    Args:
        effective_autonomy: Resolved autonomy for the current run.
        policy_map: Mapping from autonomy level to policy. Uses
            defaults when ``None``.
    """
    self._effective_autonomy = effective_autonomy
    raw = policy_map if policy_map is not None else _DEFAULT_AUTONOMY_POLICY_MAP
    # Shallow copy decouples from the caller's mapping; MappingProxyType
    # prevents mutation.  Policy instances are treated as immutable /
    # stateless, so deep-copying is unnecessary and would break callers
    # passing non-copyable policies (mocks, policies with resources).
    self._policy_map: Mapping[AutonomyLevel, OutputScanResponsePolicy] = (
        MappingProxyType(dict(raw))
    )
    self._fallback: OutputScanResponsePolicy = RedactPolicy()

name property

name

Policy name identifier.

apply

apply(scan_result, context)

Delegate to the sub-policy for the current autonomy level.

Parameters:

Name Type Description Default
scan_result OutputScanResult

Result from the output scanner.

required
context SecurityContext

Security context of the tool invocation.

required

Returns:

Type Description
OutputScanResult

Transformed scan result from the delegated policy.

Source code in src/synthorg/security/output_scan_policy.py
def apply(
    self,
    scan_result: OutputScanResult,
    context: SecurityContext,
) -> OutputScanResult:
    """Delegate to the sub-policy for the current autonomy level.

    Args:
        scan_result: Result from the output scanner.
        context: Security context of the tool invocation.

    Returns:
        Transformed scan result from the delegated policy.
    """
    if self._effective_autonomy is None:
        delegate = self._fallback
        autonomy_level = None
    else:
        autonomy_level = self._effective_autonomy.level
        mapped = self._policy_map.get(autonomy_level)
        if mapped is not None:
            delegate = mapped
        else:
            delegate = self._fallback
            logger.warning(
                SECURITY_OUTPUT_SCAN_POLICY_APPLIED,
                policy="autonomy_tiered",
                autonomy_level=autonomy_level.value,
                fallback_to=self._fallback.name,
                note="No policy mapped for autonomy level -- falling back",
            )

    logger.debug(
        SECURITY_OUTPUT_SCAN_POLICY_APPLIED,
        policy="autonomy_tiered",
        delegate=delegate.name,
        autonomy_level=(
            autonomy_level.value if autonomy_level is not None else None
        ),
    )
    return delegate.apply(scan_result, context)

Rules Engine

protocol

SecurityRule protocol -- interface for synchronous rule evaluators.

SecurityRule

Bases: Protocol

Protocol for a single security rule.

Rules are evaluated synchronously in a chain. Returning a SecurityVerdict means the rule matched; returning None passes through to the next rule.

name property

name

Human-readable rule name.

evaluate

evaluate(context)

Evaluate the rule against a security context.

Parameters:

Name Type Description Default
context SecurityContext

The tool invocation context.

required

Returns:

Type Description
SecurityVerdict | None

A verdict if the rule matched, or None to pass through.

Source code in src/synthorg/security/rules/protocol.py
def evaluate(
    self,
    context: SecurityContext,
) -> SecurityVerdict | None:
    """Evaluate the rule against a security context.

    Args:
        context: The tool invocation context.

    Returns:
        A verdict if the rule matched, or ``None`` to pass through.
    """
    ...

engine

Rule engine -- evaluates security rules in order.

RuleEngine

RuleEngine(*, rules, risk_classifier, config)

Evaluates security rules in a defined order.

Rules are run sequentially. The first DENY or ESCALATE verdict wins. If no rule triggers, the engine returns ALLOW with a risk level from the RiskClassifier.

The evaluation order is determined solely by the rules tuple passed at construction. The recommended (but not enforced) order is:

1. Policy validator (fast path: hard deny / auto approve)
2. Credential detector
3. Path traversal detector
4. Destructive operation detector
5. Data leak detector

An ALLOW from the policy validator (auto-approve) does NOT short-circuit remaining detection rules. Only DENY/ESCALATE from the policy validator is a hard exit. This ensures that auto-approved action types are still scanned for credentials, path traversal, etc.

All rules are synchronous -- the engine itself is synchronous.

Initialize the rule engine.

Parameters:

Name Type Description Default
rules tuple[SecurityRule, ...]

Ordered tuple of rules to evaluate.

required
risk_classifier RiskClassifier

Fallback risk classifier.

required
config RuleEngineConfig

Rule engine configuration.

required
Source code in src/synthorg/security/rules/engine.py
def __init__(
    self,
    *,
    rules: tuple[SecurityRule, ...],
    risk_classifier: RiskClassifier,
    config: RuleEngineConfig,
) -> None:
    """Initialize the rule engine.

    Args:
        rules: Ordered tuple of rules to evaluate.
        risk_classifier: Fallback risk classifier.
        config: Rule engine configuration.
    """
    self._rules = rules
    self._risk_classifier = risk_classifier
    self._config = config

evaluate

evaluate(context)

Run all rules in order, returning the final verdict.

Individual rule failures are caught and logged. A failing rule results in DENY (fail-closed) for that rule.

Parameters:

Name Type Description Default
context SecurityContext

The tool invocation security context.

required

Returns:

Type Description
SecurityVerdict

A SecurityVerdict -- DENY/ESCALATE from the first matching rule, or ALLOW with risk from the classifier.

Source code in src/synthorg/security/rules/engine.py
def evaluate(self, context: SecurityContext) -> SecurityVerdict:
    """Run all rules in order, returning the final verdict.

    Individual rule failures are caught and logged.  A failing
    rule results in DENY (fail-closed) for that rule.

    Args:
        context: The tool invocation security context.

    Returns:
        A ``SecurityVerdict`` -- DENY/ESCALATE from the first
        matching rule, or ALLOW with risk from the classifier.
    """
    start = time.monotonic()
    soft_allow: SecurityVerdict | None = None

    for rule in self._rules:
        verdict = self._safe_evaluate(rule, context)
        if verdict is None:
            continue

        duration_ms = (time.monotonic() - start) * 1000

        # Soft-allow rules (e.g. policy_validator auto-approve)
        # record their verdict but do NOT short-circuit.
        if (
            verdict.verdict == SecurityVerdictType.ALLOW
            and rule.name in _SOFT_ALLOW_RULES
        ):
            soft_allow = verdict.model_copy(
                update={"evaluation_duration_ms": duration_ms},
            )
            continue

        # DENY / ESCALATE / hard ALLOW → return immediately.
        logger.debug(
            SECURITY_RULE_MATCHED,
            rule_name=rule.name,
            verdict=verdict.verdict.value,
            tool_name=context.tool_name,
        )
        return verdict.model_copy(
            update={"evaluation_duration_ms": duration_ms},
        )

    # No rule returned DENY/ESCALATE.
    duration_ms = (time.monotonic() - start) * 1000

    # If a soft-allow was recorded, use it.
    if soft_allow is not None:
        logger.debug(
            SECURITY_EVALUATE_COMPLETE,
            tool_name=context.tool_name,
            duration_ms=duration_ms,
        )
        return soft_allow.model_copy(
            update={"evaluation_duration_ms": duration_ms},
        )

    # Fallback: ALLOW with risk from classifier.
    # Low confidence -- no rule matched, only risk-classified.
    # This is the ~5% of cases where LLM fallback may trigger.
    risk = self._risk_classifier.classify(context.action_type)
    logger.debug(
        SECURITY_VERDICT_ALLOW,
        tool_name=context.tool_name,
        risk_level=risk.value,
        confidence=EvaluationConfidence.LOW.value,
    )
    logger.debug(
        SECURITY_EVALUATE_COMPLETE,
        tool_name=context.tool_name,
        duration_ms=duration_ms,
    )
    return SecurityVerdict(
        verdict=SecurityVerdictType.ALLOW,
        reason="No security rule triggered",
        risk_level=risk,
        confidence=EvaluationConfidence.LOW,
        evaluated_at=datetime.now(UTC),
        evaluation_duration_ms=duration_ms,
    )
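The first-match chain with soft-allow semantics can be reduced to a few lines. Rules are modelled here as `(name, callable)` pairs returning a verdict string or `None`; the rule names and verdict strings are illustrative stand-ins for `SecurityRule` and `SecurityVerdict`.

```python
# Soft-allow rules record their ALLOW but do NOT stop the chain.
SOFT_ALLOW_RULES = {"policy_validator"}


def evaluate(rules, context: str) -> str:
    """Illustrative mirror of RuleEngine.evaluate's control flow."""
    soft_allow = None
    for name, rule in rules:
        verdict = rule(context)
        if verdict is None:
            continue  # rule did not match, try the next one
        if verdict == "allow" and name in SOFT_ALLOW_RULES:
            soft_allow = verdict  # remember, but keep scanning
            continue
        return verdict  # DENY / ESCALATE / hard ALLOW wins immediately
    return soft_allow or "allow"  # fallback: risk-classified allow


rules = (
    ("policy_validator", lambda c: "allow"),  # auto-approve (soft)
    ("credential_detector", lambda c: "deny" if "secret" in c else None),
)
```

With these rules, an auto-approved context containing `"secret"` is still denied, illustrating why a soft ALLOW must not short-circuit the detection rules.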

risk_classifier

Risk classifier -- maps action types to default risk levels.

RiskClassifier

RiskClassifier(*, custom_risk_map=None)

Maps action types to default risk levels.

Used by the rule engine when no specific rule triggers, to assign a baseline risk level based on the action type.

Initialize with optional custom risk overrides.

Parameters:

Name Type Description Default
custom_risk_map dict[str, ApprovalRiskLevel] | None

Additional or overriding risk mappings.

None
Source code in src/synthorg/security/rules/risk_classifier.py
def __init__(
    self,
    *,
    custom_risk_map: dict[str, ApprovalRiskLevel] | None = None,
) -> None:
    """Initialize with optional custom risk overrides.

    Args:
        custom_risk_map: Additional or overriding risk mappings.
    """
    if custom_risk_map:
        merged = dict(_DEFAULT_RISK_MAP)
        merged.update(custom_risk_map)
        self._risk_map = MappingProxyType(merged)
    else:
        self._risk_map = _DEFAULT_RISK_MAP

classify

classify(action_type)

Return the risk level for an action type.

Falls back to HIGH for unknown action types (fail-safe per DESIGN_SPEC D19).

Parameters:

Name Type Description Default
action_type str

The category:action string.

required

Returns:

Type Description
ApprovalRiskLevel

The assessed risk level.

Source code in src/synthorg/security/rules/risk_classifier.py
def classify(self, action_type: str) -> ApprovalRiskLevel:
    """Return the risk level for an action type.

    Falls back to ``HIGH`` for unknown action types (fail-safe per
    DESIGN_SPEC D19).

    Args:
        action_type: The ``category:action`` string.

    Returns:
        The assessed risk level.
    """
    result = self._risk_map.get(action_type)
    if result is None:
        logger.warning(
            SECURITY_RISK_FALLBACK,
            action_type=action_type,
            fallback="HIGH",
        )
        return ApprovalRiskLevel.HIGH
    return result
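The fail-safe fallback can be shown in miniature. `Risk` below is an illustrative subset of `ApprovalRiskLevel`, and the map contains a single placeholder entry; the real `_DEFAULT_RISK_MAP` covers the built-in action types.

```python
from enum import Enum
from types import MappingProxyType


class Risk(Enum):
    """Illustrative subset of ApprovalRiskLevel."""
    LOW = "low"
    HIGH = "high"


_DEFAULT_RISK_MAP = MappingProxyType({"file:read": Risk.LOW})


def classify(action_type: str, risk_map=_DEFAULT_RISK_MAP) -> Risk:
    # Unknown action types fail safe to HIGH rather than defaulting low.
    return risk_map.get(action_type, Risk.HIGH)
```

An unknown action type thus escalates scrutiny by default instead of silently passing as low risk.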

Trust

protocol

Trust strategy protocol.

Defines the pluggable interface for progressive trust strategies. All trust strategies must implement this protocol.

TrustStrategy

Bases: Protocol

Protocol for progressive trust evaluation strategies.

Implementations compute trust evaluations from agent performance data and maintain per-agent trust state.

name property

name

Strategy name identifier.

evaluate async

evaluate(*, agent_id, current_state, snapshot)

Evaluate an agent's trust level.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
current_state TrustState

Current trust state.

required
snapshot AgentPerformanceSnapshot

Agent performance snapshot.

required

Returns:

Type Description
TrustEvaluationResult

Evaluation result with recommended level.

Source code in src/synthorg/security/trust/protocol.py
async def evaluate(
    self,
    *,
    agent_id: NotBlankStr,
    current_state: TrustState,
    snapshot: AgentPerformanceSnapshot,
) -> TrustEvaluationResult:
    """Evaluate an agent's trust level.

    Args:
        agent_id: Agent to evaluate.
        current_state: Current trust state.
        snapshot: Agent performance snapshot.

    Returns:
        Evaluation result with recommended level.
    """
    ...

initial_state

initial_state(*, agent_id)

Create the initial trust state for a newly registered agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
TrustState

Initial trust state.

Source code in src/synthorg/security/trust/protocol.py
def initial_state(self, *, agent_id: NotBlankStr) -> TrustState:
    """Create the initial trust state for a newly registered agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Initial trust state.
    """
    ...

config

Trust configuration models.

Defines TrustConfig and strategy-specific sub-configs for progressive trust evaluation.

TrustThreshold pydantic-model

Bases: BaseModel

Threshold for a trust level transition.

Attributes:

Name Type Description
score float

Minimum score to trigger promotion.

requires_human_approval bool

Whether human approval is required.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

score pydantic-field

score

Minimum score

requires_human_approval pydantic-field

requires_human_approval = False

Whether human approval is required

WeightedTrustWeights pydantic-model

Bases: BaseModel

Weights for the weighted trust score computation.

Weights must sum to 1.0.

Attributes:

Name Type Description
task_difficulty float

Weight for task difficulty factor.

completion_rate float

Weight for completion rate factor.

error_rate float

Weight for error rate factor (inverse).

human_feedback float

Weight for human feedback factor.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_weights_sum

task_difficulty pydantic-field

task_difficulty = 0.3

Weight for task difficulty

completion_rate pydantic-field

completion_rate = 0.25

Weight for completion rate

error_rate pydantic-field

error_rate = 0.25

Weight for error rate (inverse)

human_feedback pydantic-field

human_feedback = 0.2

Weight for human feedback
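The page documents only the weights (which must sum to 1.0) and that error rate contributes inversely; the exact scoring formula is not shown here. The sketch below is one plausible linear combination consistent with those constraints, not the module's actual computation.

```python
# Default weights from the documentation; must sum to 1.0.
WEIGHTS = {
    "task_difficulty": 0.3,
    "completion_rate": 0.25,
    "error_rate": 0.25,
    "human_feedback": 0.2,
}


def trust_score(
    task_difficulty: float,
    completion_rate: float,
    error_rate: float,
    human_feedback: float,
) -> float:
    """Hypothetical weighted combination (all factors in [0, 1])."""
    return (
        WEIGHTS["task_difficulty"] * task_difficulty
        + WEIGHTS["completion_rate"] * completion_rate
        + WEIGHTS["error_rate"] * (1.0 - error_rate)  # inverse: fewer errors, higher score
        + WEIGHTS["human_feedback"] * human_feedback
    )
```

A perfect agent (difficulty 1.0, completion 1.0, error rate 0.0, feedback 1.0) scores 1.0 under this combination, since the weights sum to 1.0.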

CategoryTrustCriteria pydantic-model

Bases: BaseModel

Promotion criteria for a single tool category.

Attributes:

Name Type Description
tasks_completed int

Minimum tasks completed in this category.

quality_score_min float

Minimum quality score.

requires_human_approval bool

Whether human approval is required.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

tasks_completed pydantic-field

tasks_completed = 10

Minimum tasks completed

quality_score_min pydantic-field

quality_score_min = 7.0

Minimum quality score

requires_human_approval pydantic-field

requires_human_approval = False

Whether human approval is required

MilestoneCriteria pydantic-model

Bases: BaseModel

Criteria for a milestone-based trust promotion.

Attributes:

Name Type Description
tasks_completed int

Minimum tasks completed.

quality_score_min float

Minimum quality score.

time_active_days int

Minimum days active.

auto_promote bool

Whether to auto-promote without human approval.

clean_history_days int

Days of clean (error-free) history required.

requires_human_approval bool

Whether human approval is required.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_approval_flags

tasks_completed pydantic-field

tasks_completed = 5

Minimum tasks completed

quality_score_min pydantic-field

quality_score_min = 7.0

Minimum quality score

time_active_days pydantic-field

time_active_days = 0

Minimum days active

auto_promote pydantic-field

auto_promote = True

Whether to auto-promote

clean_history_days pydantic-field

clean_history_days = 0

Days of clean history required

requires_human_approval pydantic-field

requires_human_approval = False

Whether human approval is required
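A milestone gate built from the documented defaults might be checked as follows. The `meets_milestone` helper is hypothetical; the actual promotion logic lives in the milestone strategy and additionally handles `auto_promote`, `clean_history_days`, and the approval flags.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Milestone:
    """Mirrors the documented MilestoneCriteria defaults (illustrative)."""
    tasks_completed: int = 5
    quality_score_min: float = 7.0
    time_active_days: int = 0


def meets_milestone(m: Milestone, *, tasks: int, quality: float, days_active: int) -> bool:
    # Hypothetical check: all thresholds must be met simultaneously.
    return (
        tasks >= m.tasks_completed
        and quality >= m.quality_score_min
        and days_active >= m.time_active_days
    )
```

With the defaults, an agent needs at least 5 completed tasks at quality 7.0 or better before it is even a candidate for promotion.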

ReVerificationConfig pydantic-model

Bases: BaseModel

Configuration for periodic trust re-verification.

Attributes:

Name Type Description
enabled bool

Whether re-verification is enabled.

interval_days int

Days between re-verifications.

decay_on_idle_days int

Demote one level after this many idle days.

decay_on_error_rate float

Demote if error rate exceeds this threshold.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

enabled pydantic-field

enabled = False

Whether re-verification is enabled

interval_days pydantic-field

interval_days = 90

Days between re-verifications

decay_on_idle_days pydantic-field

decay_on_idle_days = 30

Idle days before trust decay

decay_on_error_rate pydantic-field

decay_on_error_rate = 0.15

Error rate threshold for decay

TrustConfig pydantic-model

Bases: BaseModel

Top-level trust configuration.

Attributes:

Name Type Description
strategy TrustStrategyType

Trust strategy type.

initial_level ToolAccessLevel

Default initial trust level for new agents.

weights WeightedTrustWeights

Weights for the weighted strategy.

promotion_thresholds dict[str, TrustThreshold]

Thresholds for trust level transitions.

initial_category_levels dict[str, ToolAccessLevel]

Initial per-category levels (per_category).

category_criteria dict[str, dict[str, CategoryTrustCriteria]]

Per-category promotion criteria (per_category).

milestones dict[str, MilestoneCriteria]

Milestone criteria (used by milestone strategy).

re_verification ReVerificationConfig

Re-verification configuration (used by milestone strategy).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_elevated_gate
  • _validate_strategy_specific_fields
  • _validate_category_criteria_coverage

strategy pydantic-field

strategy = DISABLED

Trust strategy type

initial_level pydantic-field

initial_level = STANDARD

Default initial trust level

weights pydantic-field

weights

Weights for weighted strategy

promotion_thresholds pydantic-field

promotion_thresholds

Thresholds for trust level transitions

initial_category_levels pydantic-field

initial_category_levels

Initial per-category trust levels

category_criteria pydantic-field

category_criteria

Per-category promotion criteria

milestones pydantic-field

milestones

Milestone criteria for trust transitions

re_verification pydantic-field

re_verification

Re-verification configuration

models

Trust domain models.

Frozen Pydantic models for trust state, change records, and evaluation results used by the progressive trust system.

TrustState pydantic-model

Bases: BaseModel

Current trust state for an agent.

Attributes:

Name Type Description
agent_id NotBlankStr

The agent this trust state belongs to.

global_level ToolAccessLevel

Current global trust/access level.

created_at AwareDatetime | None

When trust tracking was initialized for this agent.

category_levels dict[str, ToolAccessLevel]

Per-category trust levels (per_category strategy).

trust_score float | None

Weighted trust score (weighted strategy).

last_evaluated_at AwareDatetime | None

When trust was last evaluated.

last_promoted_at AwareDatetime | None

When trust level was last promoted.

last_decay_check_at AwareDatetime | None

When decay was last checked.

milestone_progress dict[str, int | float]

Milestone tracking data (milestone strategy).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent identifier

global_level pydantic-field

global_level = SANDBOXED

Current global trust level

created_at pydantic-field

created_at = None

When trust tracking was initialized for this agent

category_levels pydantic-field

category_levels

Per-category trust levels

trust_score pydantic-field

trust_score = None

Weighted trust score

last_evaluated_at pydantic-field

last_evaluated_at = None

When trust was last evaluated

last_promoted_at pydantic-field

last_promoted_at = None

When trust level was last promoted

last_decay_check_at pydantic-field

last_decay_check_at = None

When decay was last checked

milestone_progress pydantic-field

milestone_progress

Milestone tracking data
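Because the model is frozen, trust state is never mutated in place; the service produces updated copies via `model_copy(update=...)`. A minimal stdlib sketch of that pattern, using a frozen dataclass as a stand-in for the real Pydantic model (field names from this page; everything else illustrative):

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import Optional

# Stdlib stand-in for the frozen Pydantic TrustState model.
@dataclass(frozen=True)
class TrustStateSketch:
    agent_id: str
    global_level: str = "sandboxed"
    created_at: Optional[datetime] = None

state = TrustStateSketch(agent_id="agent-1")
# Frozen models are never mutated; every update produces a new instance,
# mirroring Pydantic's state.model_copy(update={...}).
initialized = replace(state, created_at=datetime.now(timezone.utc))
```

This is the same copy-on-write flow `initialize_agent` uses when it stamps `created_at` on the strategy's initial state.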

TrustChangeRecord pydantic-model

Bases: BaseModel

Record of a trust level change for audit purposes.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent whose trust changed.

old_level ToolAccessLevel

Previous trust level.

new_level ToolAccessLevel

New trust level.

category NotBlankStr | None

Tool category (None for global changes).

reason TrustChangeReason

Reason for the change.

timestamp AwareDatetime

When the change occurred.

approval_id NotBlankStr | None

Approval item ID if human-approved.

details str

Human-readable details.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent whose trust changed

old_level pydantic-field

old_level

Previous trust level

new_level pydantic-field

new_level

New trust level

category pydantic-field

category = None

Tool category (None for global changes)

reason pydantic-field

reason

Reason for the change

timestamp pydantic-field

timestamp

When the change occurred

approval_id pydantic-field

approval_id = None

Approval item ID if human-approved

details pydantic-field

details = ''

Human-readable details

TrustEvaluationResult pydantic-model

Bases: BaseModel

Result of a trust evaluation by a strategy.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent evaluated.

recommended_level ToolAccessLevel

Recommended trust level.

current_level ToolAccessLevel

Current trust level.

requires_human_approval bool

Whether human approval is needed.

score float | None

Trust score (strategy-dependent).

details str

Human-readable explanation.

strategy_name NotBlankStr

Name of the strategy that produced this result.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent evaluated

recommended_level pydantic-field

recommended_level

Recommended trust level

current_level pydantic-field

current_level

Current trust level

requires_human_approval pydantic-field

requires_human_approval = False

Whether human approval is needed

score pydantic-field

score = None

Trust score

details pydantic-field

details = ''

Human-readable explanation

strategy_name pydantic-field

strategy_name

Strategy that produced this result

should_change property

should_change

Whether the trust level should change.
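The property's source isn't reproduced on this page; presumably it compares the recommendation against the current level. A minimal sketch under that assumption:

```python
from dataclasses import dataclass

# Simplified stand-in for the frozen Pydantic TrustEvaluationResult model.
@dataclass(frozen=True)
class EvalResultSketch:
    recommended_level: str
    current_level: str

    @property
    def should_change(self) -> bool:
        # Assumption: a change is warranted exactly when the
        # recommendation differs from the current level.
        return self.recommended_level != self.current_level

steady = EvalResultSketch(recommended_level="standard", current_level="standard")
promotion = EvalResultSketch(recommended_level="standard", current_level="sandboxed")
```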

service

Trust service orchestrator.

Central service for managing progressive trust state, evaluation, and trust level changes for agents.

TrustService

TrustService(*, strategy, config, approval_store=None)

Orchestrates progressive trust evaluation and state management.

Delegates trust evaluation to a pluggable strategy, manages per-agent trust state, and enforces the security invariant that a standard-to-elevated promotion always requires human approval.

Parameters:

Name Type Description Default
strategy TrustStrategy

Trust evaluation strategy.

required
config TrustConfig

Trust configuration.

required
approval_store ApprovalStore | None

Optional approval store for human approval gates.

None
Source code in src/synthorg/security/trust/service.py
def __init__(
    self,
    *,
    strategy: TrustStrategy,
    config: TrustConfig,
    approval_store: ApprovalStore | None = None,
) -> None:
    self._strategy = strategy
    self._config = config
    self._approval_store = approval_store
    self._trust_states: dict[str, TrustState] = {}
    self._change_history: dict[str, list[TrustChangeRecord]] = {}

initialize_agent

initialize_agent(agent_id)

Create initial trust state for a new agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
TrustState

Initial trust state with created_at timestamp.

Source code in src/synthorg/security/trust/service.py
def initialize_agent(self, agent_id: NotBlankStr) -> TrustState:
    """Create initial trust state for a new agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Initial trust state with created_at timestamp.
    """
    now = datetime.now(UTC)
    state = self._strategy.initial_state(agent_id=agent_id)
    state = state.model_copy(update={"created_at": now})
    self._trust_states[str(agent_id)] = state
    self._change_history.setdefault(str(agent_id), [])

    logger.info(
        TRUST_INITIALIZED,
        agent_id=agent_id,
        level=state.global_level.value,
    )
    return state

evaluate_agent async

evaluate_agent(agent_id, snapshot)

Evaluate an agent's trust level.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
snapshot AgentPerformanceSnapshot

Agent performance snapshot.

required

Returns:

Type Description
TrustEvaluationResult

Evaluation result with recommended level.

Raises:

Type Description
TrustEvaluationError

If agent not initialized or evaluation fails.

Source code in src/synthorg/security/trust/service.py
async def evaluate_agent(
    self,
    agent_id: NotBlankStr,
    snapshot: AgentPerformanceSnapshot,
) -> TrustEvaluationResult:
    """Evaluate an agent's trust level.

    Args:
        agent_id: Agent to evaluate.
        snapshot: Agent performance snapshot.

    Returns:
        Evaluation result with recommended level.

    Raises:
        TrustEvaluationError: If agent not initialized or
            evaluation fails.
    """
    key = str(agent_id)
    state = self._trust_states.get(key)
    if state is None:
        msg = f"Agent {agent_id!r} not initialized for trust tracking"
        logger.warning(
            TRUST_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise TrustEvaluationError(msg)

    logger.debug(
        TRUST_EVALUATE_START,
        agent_id=agent_id,
        strategy=self._strategy.name,
    )

    result = await self._strategy.evaluate(
        agent_id=agent_id,
        current_state=state,
        snapshot=snapshot,
    )

    # Defense-in-depth: enforce elevated gate
    result = self._enforce_elevated_gate(result)

    # Update last_evaluated_at
    now = datetime.now(UTC)
    updated_state = state.model_copy(
        update={"last_evaluated_at": now},
    )
    self._trust_states[key] = updated_state

    logger.debug(
        TRUST_EVALUATE_COMPLETE,
        agent_id=agent_id,
        recommended=result.recommended_level.value,
        should_change=result.should_change,
    )
    return result

apply_trust_change async

apply_trust_change(agent_id, result)

Apply a trust level change based on evaluation result.

If human approval is required, creates an approval item and returns None. The change is applied when the approval is granted.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent whose trust to change.

required
result TrustEvaluationResult

Evaluation result to apply.

required

Returns:

Type Description
TrustChangeRecord | None

Change record if applied, None if awaiting approval.

Raises:

Type Description
TrustEvaluationError

If agent not initialized.

Source code in src/synthorg/security/trust/service.py
async def apply_trust_change(
    self,
    agent_id: NotBlankStr,
    result: TrustEvaluationResult,
) -> TrustChangeRecord | None:
    """Apply a trust level change based on evaluation result.

    If human approval is required, creates an approval item and
    returns None. The change is applied when the approval is granted.

    Args:
        agent_id: Agent whose trust to change.
        result: Evaluation result to apply.

    Returns:
        Change record if applied, None if awaiting approval.

    Raises:
        TrustEvaluationError: If agent not initialized.
    """
    if not result.should_change:
        return None

    key = str(agent_id)
    state = self._trust_states.get(key)
    if state is None:
        msg = f"Agent {agent_id!r} not initialized for trust tracking"
        logger.warning(
            TRUST_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise TrustEvaluationError(msg)

    # Defense-in-depth: re-enforce elevated gate on the result
    # to prevent crafted TrustEvaluationResults from bypassing
    # the mandatory human approval gate.
    result = self._enforce_elevated_gate(result)

    if result.requires_human_approval:
        await self._create_approval(agent_id, result)
        return None

    # Apply the change
    now = datetime.now(UTC)
    reason = self._infer_reason(result)

    record = TrustChangeRecord(
        agent_id=agent_id,
        old_level=state.global_level,
        new_level=result.recommended_level,
        reason=reason,
        timestamp=now,
        details=result.details,
    )

    # Update state -- only set last_promoted_at on actual promotions
    from synthorg.security.trust.levels import (  # noqa: PLC0415
        TRUST_LEVEL_RANK,
    )

    is_promotion = TRUST_LEVEL_RANK.get(
        result.recommended_level, 0
    ) > TRUST_LEVEL_RANK.get(state.global_level, 0)
    state_update: dict[str, object] = {
        "global_level": result.recommended_level,
        "trust_score": result.score,
    }
    if is_promotion:
        state_update["last_promoted_at"] = now
    updated = state.model_copy(update=state_update)
    self._trust_states[key] = updated
    self._change_history.setdefault(key, []).append(record)

    logger.info(
        TRUST_LEVEL_CHANGED,
        agent_id=agent_id,
        old_level=record.old_level.value,
        new_level=record.new_level.value,
        reason=reason.value,
    )
    return record
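The promotion check in `apply_trust_change` compares ranks from `TRUST_LEVEL_RANK`. A self-contained sketch of that comparison (level names appear elsewhere on this page; the rank values here are illustrative):

```python
from enum import Enum

class ToolAccessLevel(str, Enum):
    # Level names taken from the surrounding docs; ordering is illustrative.
    SANDBOXED = "sandboxed"
    STANDARD = "standard"
    ELEVATED = "elevated"

TRUST_LEVEL_RANK = {
    ToolAccessLevel.SANDBOXED: 0,
    ToolAccessLevel.STANDARD: 1,
    ToolAccessLevel.ELEVATED: 2,
}

def is_promotion(old: ToolAccessLevel, new: ToolAccessLevel) -> bool:
    # Unknown levels default to rank 0, matching the .get(..., 0)
    # calls in apply_trust_change above.
    return TRUST_LEVEL_RANK.get(new, 0) > TRUST_LEVEL_RANK.get(old, 0)
```

Only a strict rank increase counts as a promotion, which is why `last_promoted_at` is left untouched on demotions and lateral changes.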

get_trust_state

get_trust_state(agent_id)

Get current trust state for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
TrustState | None

Trust state, or None if not initialized.

Source code in src/synthorg/security/trust/service.py
def get_trust_state(
    self,
    agent_id: NotBlankStr,
) -> TrustState | None:
    """Get current trust state for an agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Trust state, or None if not initialized.
    """
    return self._trust_states.get(str(agent_id))

get_change_history

get_change_history(agent_id)

Get trust change history for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
tuple[TrustChangeRecord, ...]

Tuple of change records.

Source code in src/synthorg/security/trust/service.py
def get_change_history(
    self,
    agent_id: NotBlankStr,
) -> tuple[TrustChangeRecord, ...]:
    """Get trust change history for an agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Tuple of change records.
    """
    return tuple(self._change_history.get(str(agent_id), []))

check_decay async

check_decay(agent_id, snapshot)

Check for trust decay conditions.

Delegates to evaluate_agent first, then updates the decay check timestamp. The ordering ensures that the strategy's decay logic sees the previous last_decay_check_at value, not a freshly-updated one.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to check.

required
snapshot AgentPerformanceSnapshot

Agent performance snapshot.

required

Returns:

Type Description
TrustEvaluationResult

Evaluation result (may recommend demotion on decay).

Source code in src/synthorg/security/trust/service.py
async def check_decay(
    self,
    agent_id: NotBlankStr,
    snapshot: AgentPerformanceSnapshot,
) -> TrustEvaluationResult:
    """Check for trust decay conditions.

    Delegates to evaluate_agent first, then updates the decay
    check timestamp.  The ordering ensures that the strategy's
    decay logic sees the *previous* last_decay_check_at value,
    not a freshly-updated one.

    Args:
        agent_id: Agent to check.
        snapshot: Agent performance snapshot.

    Returns:
        Evaluation result (may recommend demotion on decay).
    """
    result = await self.evaluate_agent(agent_id, snapshot)

    # Update decay check timestamp *after* evaluation
    key = str(agent_id)
    state = self._trust_states.get(key)
    if state is not None:
        now = datetime.now(UTC)
        updated = state.model_copy(
            update={"last_decay_check_at": now},
        )
        self._trust_states[key] = updated

    return result

Autonomy

protocol

Autonomy change strategy protocol (see Operations design page, D7).

AutonomyChangeStrategy

Bases: Protocol

Strategy for managing runtime autonomy level changes.

Implementations control how promotion requests, automatic downgrades, and recovery requests are handled.

request_promotion

request_promotion(agent_id, target)

Request a promotion to a higher autonomy level.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent requesting promotion.

required
target AutonomyLevel

The desired autonomy level.

required

Returns:

Type Description
bool

True if the promotion is immediately granted,

bool

False if it requires human approval.

Source code in src/synthorg/security/autonomy/protocol.py
def request_promotion(
    self,
    agent_id: NotBlankStr,
    target: AutonomyLevel,
) -> bool:
    """Request a promotion to a higher autonomy level.

    Args:
        agent_id: The agent requesting promotion.
        target: The desired autonomy level.

    Returns:
        ``True`` if the promotion is immediately granted,
        ``False`` if it requires human approval.
    """
    ...

auto_downgrade

auto_downgrade(agent_id, reason, current_level=None)

Automatically downgrade an agent's autonomy level.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent to downgrade.

required
reason DowngradeReason

Why the downgrade is happening.

required
current_level AutonomyLevel | None

The agent's current effective autonomy level. Used as original_level when no prior override exists.

None

Returns:

Type Description
AutonomyLevel

The new (lower) autonomy level.

Source code in src/synthorg/security/autonomy/protocol.py
def auto_downgrade(
    self,
    agent_id: NotBlankStr,
    reason: DowngradeReason,
    current_level: AutonomyLevel | None = None,
) -> AutonomyLevel:
    """Automatically downgrade an agent's autonomy level.

    Args:
        agent_id: The agent to downgrade.
        reason: Why the downgrade is happening.
        current_level: The agent's current effective autonomy level.
            Used as ``original_level`` when no prior override exists.

    Returns:
        The new (lower) autonomy level.
    """
    ...

request_recovery

request_recovery(agent_id)

Request recovery from a previous downgrade.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent requesting recovery.

required

Returns:

Type Description
bool

True if recovery is immediately granted,

bool

False if it requires human approval.

Source code in src/synthorg/security/autonomy/protocol.py
def request_recovery(
    self,
    agent_id: NotBlankStr,
) -> bool:
    """Request recovery from a previous downgrade.

    Args:
        agent_id: The agent requesting recovery.

    Returns:
        ``True`` if recovery is immediately granted,
        ``False`` if it requires human approval.
    """
    ...
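Since this is a `typing.Protocol`, any class with matching method signatures satisfies it structurally. A hedged sketch of a maximally conservative implementation (the string levels, including the "supervised" floor, are hypothetical placeholders, not values from the source):

```python
from typing import Optional, Protocol

class AutonomyChangeStrategy(Protocol):
    """Structural stand-in for the protocol documented above."""
    def request_promotion(self, agent_id: str, target: str) -> bool: ...
    def auto_downgrade(self, agent_id: str, reason: str,
                       current_level: Optional[str] = None) -> str: ...
    def request_recovery(self, agent_id: str) -> bool: ...

class ConservativeStrategy:
    """Illustrative implementation: nothing is granted automatically."""

    def request_promotion(self, agent_id: str, target: str) -> bool:
        return False  # every promotion routes to human approval

    def auto_downgrade(self, agent_id: str, reason: str,
                       current_level: Optional[str] = None) -> str:
        return "supervised"  # hypothetical floor level

    def request_recovery(self, agent_id: str) -> bool:
        return False  # recovery also requires a human
```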

models

Autonomy data models -- presets, config, effective resolution, overrides.

AutonomyPreset pydantic-model

Bases: BaseModel

A named autonomy preset defining action routing rules.

Actions listed in auto_approve are executed without human review. Actions in human_approval require a human decision. The two sets must be disjoint -- an action cannot appear in both.

Attributes:

Name Type Description
level AutonomyLevel

The autonomy level this preset represents.

description NotBlankStr

Human-readable description.

auto_approve tuple[NotBlankStr, ...]

Action type patterns that are auto-approved. The special value "all" means every action type. Category shortcuts (e.g. "code") are expanded via ActionTypeRegistry (synthorg.security.action_types).

human_approval tuple[NotBlankStr, ...]

Action type patterns requiring human approval. Same expansion rules as auto_approve.

security_agent bool

Whether a security agent reviews escalated actions before they reach a human.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_disjoint

level pydantic-field

level

Autonomy level

description pydantic-field

description

Human-readable description

auto_approve pydantic-field

auto_approve = ()

Action patterns that are auto-approved

human_approval pydantic-field

human_approval = ()

Action patterns requiring human approval

security_agent pydantic-field

security_agent = True

Whether security agent reviews escalations
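The `_validate_disjoint` validator listed above presumably enforces the disjointness rule with a set intersection. A sketch of that check as a plain function (the real validator is a Pydantic model validator, not shown here):

```python
def validate_disjoint(auto_approve: tuple, human_approval: tuple) -> None:
    # An action pattern may appear in at most one routing set.
    overlap = set(auto_approve) & set(human_approval)
    if overlap:
        msg = f"Action patterns in both sets: {sorted(overlap)}"
        raise ValueError(msg)

validate_disjoint(("code:lint",), ("code:deploy",))  # disjoint: passes
```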

AutonomyConfig pydantic-model

Bases: BaseModel

Company-level autonomy configuration.

Attributes:

Name Type Description
level AutonomyLevel

Default autonomy level for the company.

presets dict[str, AutonomyPreset]

Available autonomy presets keyed by level name. Defaults to BUILTIN_PRESETS.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_level_in_presets

level pydantic-field

level = SEMI

Default company autonomy level

presets pydantic-field

presets

Available autonomy presets

EffectiveAutonomy pydantic-model

Bases: BaseModel

Resolved, expanded autonomy for an agent's execution run.

Produced by AutonomyResolver (synthorg.security.autonomy.resolver) by resolving the three-level chain (agent → department → company) and expanding category shortcuts into concrete action types.

Attributes:

Name Type Description
level AutonomyLevel

Resolved autonomy level.

auto_approve_actions frozenset[str]

Concrete action types that are auto-approved.

human_approval_actions frozenset[str]

Concrete action types requiring human approval.

security_agent bool

Whether the security agent reviews escalations.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_disjoint

level pydantic-field

level

Resolved autonomy level

auto_approve_actions pydantic-field

auto_approve_actions

Expanded auto-approve action types

human_approval_actions pydantic-field

human_approval_actions

Expanded human-approval action types

security_agent pydantic-field

security_agent

Whether security agent reviews escalations

AutonomyOverride pydantic-model

Bases: BaseModel

Record of a runtime autonomy downgrade for an agent.

Attributes:

Name Type Description
agent_id NotBlankStr

The agent whose autonomy was changed.

original_level AutonomyLevel

Level before the downgrade.

current_level AutonomyLevel

Level after the downgrade.

reason DowngradeReason

Why the downgrade occurred.

downgraded_at AwareDatetime

When the downgrade happened.

requires_human_recovery bool

Whether a human must restore the level.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_downgrade

agent_id pydantic-field

agent_id

Agent identifier

original_level pydantic-field

original_level

Level before downgrade

current_level pydantic-field

current_level

Level after downgrade

reason pydantic-field

reason

Reason for downgrade

downgraded_at pydantic-field

downgraded_at

Timestamp of downgrade

requires_human_recovery pydantic-field

requires_human_recovery = True

Whether human approval is needed to restore level

resolver

Autonomy resolver -- three-level chain and category expansion.

AutonomyResolver

AutonomyResolver(*, registry, config)

Resolves effective autonomy via a three-level chain.

Resolution order (most specific wins):

  1. Agent-level override
  2. Department-level override
  3. Company-level default

After resolution, category shortcuts (e.g. "code") are expanded into concrete action types via the ActionTypeRegistry, and the "all" shortcut is expanded to every registered action type.

Initialize the resolver.

Parameters:

Name Type Description Default
registry ActionTypeRegistry

Action type registry for category expansion.

required
config AutonomyConfig

Company-level autonomy configuration with presets.

required
Source code in src/synthorg/security/autonomy/resolver.py
def __init__(
    self,
    *,
    registry: ActionTypeRegistry,
    config: AutonomyConfig,
) -> None:
    """Initialize the resolver.

    Args:
        registry: Action type registry for category expansion.
        config: Company-level autonomy configuration with presets.
    """
    self._registry = registry
    self._config = config

resolve

resolve(agent_level=None, department_level=None, seniority=None)

Resolve effective autonomy from the three-level chain.

When seniority is provided, the JUNIOR/FULL constraint (D6) is enforced automatically.

Parameters:

Name Type Description Default
agent_level AutonomyLevel | None

Per-agent override (highest priority).

None
department_level AutonomyLevel | None

Per-department override.

None
seniority SeniorityLevel | None

Agent seniority level for constraint checks.

None

Returns:

Type Description
EffectiveAutonomy

Fully expanded EffectiveAutonomy.

Raises:

Type Description
ValueError

If the resolved level has no matching preset or seniority constraints are violated.

Source code in src/synthorg/security/autonomy/resolver.py
def resolve(
    self,
    agent_level: AutonomyLevel | None = None,
    department_level: AutonomyLevel | None = None,
    seniority: SeniorityLevel | None = None,
) -> EffectiveAutonomy:
    """Resolve effective autonomy from the three-level chain.

    When ``seniority`` is provided, the JUNIOR/FULL constraint
    (D6) is enforced automatically.

    Args:
        agent_level: Per-agent override (highest priority).
        department_level: Per-department override.
        seniority: Agent seniority level for constraint checks.

    Returns:
        Fully expanded :class:`EffectiveAutonomy`.

    Raises:
        ValueError: If the resolved level has no matching preset
            or seniority constraints are violated.
    """
    level = agent_level or department_level or self._config.level

    if seniority is not None:
        self.validate_seniority(seniority, level)

    preset = self._config.presets.get(level)
    if preset is None:
        msg = (
            f"No preset found for autonomy level {level!r} "
            f"(available: {sorted(self._config.presets)})"
        )
        logger.warning(
            AUTONOMY_RESOLVED,
            resolved_level=level.value if hasattr(level, "value") else str(level),
            error=msg,
        )
        raise ValueError(msg)

    auto_approve = self._expand_patterns(preset.auto_approve)
    human_approval = self._expand_patterns(preset.human_approval)

    result = EffectiveAutonomy(
        level=level,
        auto_approve_actions=auto_approve,
        human_approval_actions=human_approval,
        security_agent=preset.security_agent,
    )

    logger.info(
        AUTONOMY_RESOLVED,
        resolved_level=level.value,
        agent_override=agent_level.value if agent_level else None,
        department_override=department_level.value if department_level else None,
        auto_approve_count=len(auto_approve),
        human_approval_count=len(human_approval),
    )
    return result
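The first line of resolve is the entire three-level chain. Isolated as a sketch (string levels stand in for AutonomyLevel; the "semi" default mirrors AutonomyConfig's documented default):

```python
from typing import Optional

def resolve_level(agent_level: Optional[str] = None,
                  department_level: Optional[str] = None,
                  company_default: str = "semi") -> str:
    # Most specific non-None override wins; the company default is last.
    return agent_level or department_level or company_default
```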

validate_seniority

validate_seniority(seniority, autonomy)

Reject JUNIOR agents with FULL autonomy (D6).

Parameters:

Name Type Description Default
seniority SeniorityLevel

The agent's seniority level.

required
autonomy AutonomyLevel

The requested autonomy level.

required

Raises:

Type Description
ValueError

If a JUNIOR agent requests FULL autonomy.

Source code in src/synthorg/security/autonomy/resolver.py
def validate_seniority(
    self,
    seniority: SeniorityLevel,
    autonomy: AutonomyLevel,
) -> None:
    """Reject JUNIOR agents with FULL autonomy (D6).

    Args:
        seniority: The agent's seniority level.
        autonomy: The requested autonomy level.

    Raises:
        ValueError: If a JUNIOR agent requests FULL autonomy.
    """
    if (
        compare_seniority(seniority, SeniorityLevel.JUNIOR) <= 0
        and autonomy == AutonomyLevel.FULL
    ):
        logger.warning(
            AUTONOMY_SENIORITY_VIOLATION,
            seniority=seniority.value,
            autonomy=autonomy.value,
        )
        msg = (
            f"Seniority level {seniority.value!r} cannot have "
            f"FULL autonomy -- maximum is {_JUNIOR_MAX_AUTONOMY.value!r}"
        )
        raise ValueError(msg)

Timeout Policies

protocol

Timeout policy and risk tier classifier protocols.

TimeoutPolicy

Bases: Protocol

Protocol for approval timeout policies (see Operations design page).

Implementations determine what happens when a human does not respond to an approval request within a configured timeframe.

determine_action async

determine_action(item, elapsed_seconds)

Determine the timeout action for a pending approval.

Parameters:

Name Type Description Default
item ApprovalItem

The pending approval item.

required
elapsed_seconds float

Seconds since the item was created.

required

Returns:

Type Description
TimeoutAction

The action to take (wait, approve, deny, or escalate).

Source code in src/synthorg/security/timeout/protocol.py
async def determine_action(
    self,
    item: ApprovalItem,
    elapsed_seconds: float,
) -> TimeoutAction:
    """Determine the timeout action for a pending approval.

    Args:
        item: The pending approval item.
        elapsed_seconds: Seconds since the item was created.

    Returns:
        The action to take (wait, approve, deny, or escalate).
    """
    ...

RiskTierClassifier

Bases: Protocol

Classifies action types into risk tiers for tiered timeouts.

classify

classify(action_type)

Classify an action type's risk level.

Parameters:

Name Type Description Default
action_type str

The category:action string.

required

Returns:

Type Description
ApprovalRiskLevel

The risk tier for timeout policy selection.

Source code in src/synthorg/security/timeout/protocol.py
def classify(self, action_type: str) -> ApprovalRiskLevel:
    """Classify an action type's risk level.

    Args:
        action_type: The ``category:action`` string.

    Returns:
        The risk tier for timeout policy selection.
    """
    ...

config

Timeout policy configuration models -- discriminated union of 4 policies.

ApprovalTimeoutConfig module-attribute

ApprovalTimeoutConfig = Annotated[
    Annotated[WaitForeverConfig, Tag("wait")]
    | Annotated[DenyOnTimeoutConfig, Tag("deny")]
    | Annotated[TieredTimeoutConfig, Tag("tiered")]
    | Annotated[EscalationChainConfig, Tag("escalation")],
    Discriminator(_timeout_discriminator),
]

Discriminated union of the four timeout policy configurations.
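The union dispatches on the policy discriminator tag. A stdlib sketch of the equivalent tag-based parsing, covering two of the four configs with field names from this page (the real configs are frozen Pydantic models validated through the Annotated union above):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class WaitForever:
    policy: str = "wait"

@dataclass(frozen=True)
class DenyOnTimeout:
    policy: str = "deny"
    timeout_minutes: float = 240.0  # default from DenyOnTimeoutConfig

_CONFIGS = {"wait": WaitForever, "deny": DenyOnTimeout}

def parse_timeout_config(data: dict):
    # The "policy" key selects the config class, as the Discriminator does.
    tag = data.get("policy", "wait")
    cls = _CONFIGS.get(tag)
    if cls is None:
        raise ValueError(f"Unknown timeout policy tag: {tag!r}")
    return cls(**data)
```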

WaitForeverConfig pydantic-model

Bases: BaseModel

Wait indefinitely for human approval -- the default.

Attributes:

Name Type Description
policy Literal['wait']

Discriminator tag.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • policy (Literal['wait'])

DenyOnTimeoutConfig pydantic-model

Bases: BaseModel

Deny the action after a fixed timeout.

Attributes:

Name Type Description
policy Literal['deny']

Discriminator tag.

timeout_minutes float

Minutes before auto-deny.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

timeout_minutes pydantic-field

timeout_minutes = 240.0

Minutes before auto-deny

TierConfig pydantic-model

Bases: BaseModel

Per-risk-tier timeout configuration.

Attributes:

Name Type Description
timeout_minutes float

Minutes before the on_timeout action.

on_timeout TimeoutActionType

What to do when the tier times out.

actions tuple[str, ...]

Optional set of specific action types in this tier (if empty, the tier is matched by risk level).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_no_escalate

timeout_minutes pydantic-field

timeout_minutes

Minutes before the timeout action

on_timeout pydantic-field

on_timeout = DENY

Action when this tier times out

actions pydantic-field

actions = ()

Specific action types in this tier

TieredTimeoutConfig pydantic-model

Bases: BaseModel

Per-risk-tier timeout policy.

Each tier defines its own timeout and action. Unknown action types are classified as HIGH risk by the RiskTierClassifier (D19). If a risk level has no tier configuration entry, the policy defaults to WAIT (safe fallback).

Attributes:

Name Type Description
policy Literal['tiered']

Discriminator tag.

tiers dict[str, TierConfig]

Tier configurations keyed by risk level name.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_tier_keys

tiers pydantic-field

tiers

Tier configs keyed by risk level (low/medium/high/critical)
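The fallback behavior described above (a risk level with no tier entry defaults to WAIT; a tier's own default action is DENY) can be sketched with plain dicts standing in for TierConfig:

```python
def tier_on_timeout(tiers: dict, risk_level: str) -> str:
    # Per the docs: a risk level with no tier entry falls back to WAIT.
    tier = tiers.get(risk_level)
    if tier is None:
        return "wait"
    # DENY is TierConfig's documented on_timeout default.
    return tier.get("on_timeout", "deny")
```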

EscalationStep pydantic-model

Bases: BaseModel

A single step in an escalation chain.

Attributes:

Name Type Description
role NotBlankStr

The role to escalate to at this step.

timeout_minutes float

Minutes to wait at this step before moving to the next.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

role pydantic-field

role

Escalation target role

timeout_minutes pydantic-field

timeout_minutes

Minutes to wait at this escalation step

EscalationChainConfig pydantic-model

Bases: BaseModel

Escalation chain timeout policy.

Approval is escalated through a chain of roles, each with its own timeout. If the entire chain is exhausted, the on_chain_exhausted action is taken.

Attributes:

Name Type Description
policy Literal['escalation']

Discriminator tag.

chain tuple[EscalationStep, ...]

Ordered escalation steps.

on_chain_exhausted TimeoutActionType

Action when all steps exhaust.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_chain

chain pydantic-field

chain = ()

Ordered escalation steps

on_chain_exhausted pydantic-field

on_chain_exhausted = DENY

Action when the entire chain is exhausted
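One plausible reading of the chain semantics, sketched with dicts standing in for EscalationStep: step timeouts accumulate, the approval sits with the role of the first step whose cumulative window has not yet elapsed, and a fully elapsed chain yields on_chain_exhausted. The actual policy implementation may differ in detail.

```python
def walk_chain(chain: list, elapsed_minutes: float,
               on_chain_exhausted: str = "deny"):
    # Each step extends the cumulative deadline by its own timeout.
    deadline = 0.0
    for step in chain:
        deadline += step["timeout_minutes"]
        if elapsed_minutes < deadline:
            return ("escalate", step["role"])
    # Entire chain exhausted: take the configured fallback action.
    return (on_chain_exhausted, None)

chain = [{"role": "team_lead", "timeout_minutes": 60.0},
         {"role": "cto", "timeout_minutes": 120.0}]
```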

models

Timeout action model -- the result of evaluating a timeout policy.

TimeoutAction pydantic-model

Bases: BaseModel

Action to take when an approval item times out.

Attributes:

Name Type Description
action TimeoutActionType

The timeout action type (wait, approve, deny, escalate).

reason NotBlankStr

Human-readable explanation for the action.

escalate_to NotBlankStr | None

Target role/agent for escalation (only when action is ESCALATE).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_escalate_to

action pydantic-field

action

Timeout action type

reason pydantic-field

reason

Explanation for the action

escalate_to pydantic-field

escalate_to = None

Escalation target (when action is ESCALATE)

policies

Timeout policy implementations -- wait, deny, tiered, escalation chain.

WaitForeverPolicy

Always returns WAIT -- no automatic timeout action.

This is the safest default: approvals remain pending until a human responds.

determine_action async

determine_action(item, elapsed_seconds)

Always wait.

Parameters:

Name Type Description Default
item ApprovalItem

The pending approval item.

required
elapsed_seconds float

Seconds since creation.

required

Returns:

Type Description
TimeoutAction

WAIT action.

Source code in src/synthorg/security/timeout/policies.py
async def determine_action(
    self,
    item: ApprovalItem,
    elapsed_seconds: float,
) -> TimeoutAction:
    """Always wait.

    Args:
        item: The pending approval item.
        elapsed_seconds: Seconds since creation.

    Returns:
        WAIT action.
    """
    logger.debug(
        TIMEOUT_WAITING,
        approval_id=item.id,
        elapsed_seconds=elapsed_seconds,
    )
    return TimeoutAction(
        action=TimeoutActionType.WAIT,
        reason="Wait-forever policy -- no automatic action",
    )

DenyOnTimeoutPolicy

DenyOnTimeoutPolicy(*, timeout_seconds)

Deny the action after a fixed timeout.

Parameters:

Name Type Description Default
timeout_seconds float

Seconds before auto-deny.

required
Source code in src/synthorg/security/timeout/policies.py
def __init__(self, *, timeout_seconds: float) -> None:
    if timeout_seconds <= 0:
        msg = f"timeout_seconds must be positive, got {timeout_seconds}"
        raise ValueError(msg)
    self._timeout_seconds = timeout_seconds

determine_action async

determine_action(item, elapsed_seconds)

WAIT if under timeout, DENY if over.

Parameters:

Name Type Description Default
item ApprovalItem

The pending approval item.

required
elapsed_seconds float

Seconds since creation.

required

Returns:

Type Description
TimeoutAction

WAIT or DENY action.

Source code in src/synthorg/security/timeout/policies.py
async def determine_action(
    self,
    item: ApprovalItem,
    elapsed_seconds: float,
) -> TimeoutAction:
    """WAIT if under timeout, DENY if over.

    Args:
        item: The pending approval item.
        elapsed_seconds: Seconds since creation.

    Returns:
        WAIT or DENY action.
    """
    if elapsed_seconds < self._timeout_seconds:
        logger.debug(
            TIMEOUT_WAITING,
            approval_id=item.id,
            elapsed_seconds=elapsed_seconds,
            timeout_seconds=self._timeout_seconds,
        )
        return TimeoutAction(
            action=TimeoutActionType.WAIT,
            reason=(
                f"Waiting -- {elapsed_seconds:.0f}s of "
                f"{self._timeout_seconds:.0f}s elapsed"
            ),
        )

    logger.info(
        TIMEOUT_AUTO_DENIED,
        approval_id=item.id,
        elapsed_seconds=elapsed_seconds,
        timeout_seconds=self._timeout_seconds,
    )
    return TimeoutAction(
        action=TimeoutActionType.DENY,
        reason=(
            f"Auto-denied after {elapsed_seconds:.0f}s "
            f"(timeout: {self._timeout_seconds:.0f}s)"
        ),
    )
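The decision above reduces to a single threshold comparison. A standalone sketch (function name and return values are illustrative, not the library's API):

```python
def deny_on_timeout(elapsed_seconds: float, timeout_seconds: float) -> str:
    """Return 'wait' while under the timeout, 'deny' once it is reached."""
    if timeout_seconds <= 0:
        raise ValueError(f"timeout_seconds must be positive, got {timeout_seconds}")
    # Strictly less-than: at exactly the timeout boundary the item is denied.
    return "wait" if elapsed_seconds < timeout_seconds else "deny"


deny_on_timeout(120.0, 300.0)  # still inside the window -> "wait"
deny_on_timeout(300.0, 300.0)  # boundary counts as expired -> "deny"
```

Note the strict comparison: an item that has been pending for exactly `timeout_seconds` is already denied, matching the `elapsed_seconds < self._timeout_seconds` check in the source.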

TieredTimeoutPolicy

TieredTimeoutPolicy(*, tiers, classifier)

Per-risk-tier timeout with configurable actions.

Uses a RiskTierClassifier to determine the risk tier of each approval item, then applies the corresponding tier configuration.

Parameters:

Name Type Description Default
tiers dict[str, TierConfig]

Tier configurations keyed by risk level name.

required
classifier RiskTierClassifier

Risk tier classifier for action types.

required
Source code in src/synthorg/security/timeout/policies.py
def __init__(
    self,
    *,
    tiers: dict[str, TierConfig],
    classifier: RiskTierClassifier,
) -> None:
    self._tiers = tiers
    self._classifier = classifier

determine_action async

determine_action(item, elapsed_seconds)

Apply the tier-specific timeout policy.

Parameters:

Name Type Description Default
item ApprovalItem

The pending approval item.

required
elapsed_seconds float

Seconds since creation.

required

Returns:

Type Description
TimeoutAction

WAIT, DENY, APPROVE, or ESCALATE based on tier config.

Source code in src/synthorg/security/timeout/policies.py
async def determine_action(
    self,
    item: ApprovalItem,
    elapsed_seconds: float,
) -> TimeoutAction:
    """Apply the tier-specific timeout policy.

    Args:
        item: The pending approval item.
        elapsed_seconds: Seconds since creation.

    Returns:
        WAIT, DENY, APPROVE, or ESCALATE based on tier config.
    """
    # Default: classify by risk level, then check explicit tier overrides.
    risk_level = self._classifier.classify(item.action_type)
    tier_config = None
    for tier_key, cfg in self._tiers.items():
        if cfg.actions and item.action_type in cfg.actions:
            tier_config = cfg
            risk_level = ApprovalRiskLevel(tier_key)
            break

    # Fall back to risk-level-based tier lookup.
    if tier_config is None:
        tier_config = self._tiers.get(risk_level.value)

    if tier_config is None:
        # No tier configured for this risk level -- wait (safe default).
        logger.warning(
            TIMEOUT_WAITING,
            approval_id=item.id,
            risk_level=risk_level.value,
            available_tiers=sorted(self._tiers.keys()),
            note="no tier config for this risk level -- defaulting to wait",
        )
        return TimeoutAction(
            action=TimeoutActionType.WAIT,
            reason=(
                f"No tier config for risk level {risk_level.value!r} -- waiting"
            ),
        )

    timeout_seconds = tier_config.timeout_minutes * _SECONDS_PER_MINUTE

    if elapsed_seconds < timeout_seconds:
        logger.debug(
            TIMEOUT_WAITING,
            approval_id=item.id,
            risk_level=risk_level.value,
            elapsed_seconds=elapsed_seconds,
            timeout_seconds=timeout_seconds,
        )
        return TimeoutAction(
            action=TimeoutActionType.WAIT,
            reason=(
                f"Tier {risk_level.value}: {elapsed_seconds:.0f}s of "
                f"{timeout_seconds:.0f}s elapsed"
            ),
        )

    effective_action = tier_config.on_timeout

    # Guard: never auto-approve HIGH or CRITICAL actions.
    _approve_forbidden = {ApprovalRiskLevel.HIGH, ApprovalRiskLevel.CRITICAL}
    if (
        effective_action == TimeoutActionType.APPROVE
        and risk_level in _approve_forbidden
    ):
        logger.warning(
            TIMEOUT_POLICY_EVALUATED,
            approval_id=item.id,
            risk_level=risk_level.value,
            configured_action=effective_action.value,
            note=(
                "auto-approve blocked for high/critical risk -- overriding to DENY"
            ),
        )
        effective_action = TimeoutActionType.DENY

    logger.info(
        TIMEOUT_POLICY_EVALUATED,
        approval_id=item.id,
        risk_level=risk_level.value,
        on_timeout=effective_action.value,
        elapsed_seconds=elapsed_seconds,
    )
    return TimeoutAction(
        action=effective_action,
        reason=(
            f"Tier {risk_level.value} timeout: auto-"
            f"{effective_action.value} after "
            f"{elapsed_seconds:.0f}s"
        ),
    )
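The tier resolution order and the auto-approve guard can be sketched in isolation. This is a simplified standalone model (plain strings instead of the `ApprovalRiskLevel` and `TimeoutActionType` enums, and a minimal `TierConfig`); it shows only the lookup precedence: explicit action-type override first, classifier-based risk level second, safe WAIT default last.

```python
from dataclasses import dataclass


@dataclass
class TierConfig:
    """Illustrative subset of the real tier configuration."""

    timeout_minutes: float
    on_timeout: str                        # "wait" | "approve" | "deny" | "escalate"
    actions: frozenset = frozenset()       # explicit action-type overrides


def resolve_tier(tiers, classify, action_type):
    # 1. A tier that explicitly names this action type wins outright.
    for tier_key, cfg in tiers.items():
        if action_type in cfg.actions:
            return tier_key, cfg
    # 2. Otherwise fall back to the classifier's risk level.
    risk = classify(action_type)
    # 3. A missing tier config yields None; the caller defaults to WAIT.
    return risk, tiers.get(risk)


def effective_on_timeout(risk_level, configured):
    # Guard: never auto-approve HIGH or CRITICAL actions.
    if configured == "approve" and risk_level in {"high", "critical"}:
        return "deny"
    return configured


tiers = {
    "low": TierConfig(timeout_minutes=30, on_timeout="approve"),
    "high": TierConfig(timeout_minutes=240, on_timeout="approve",
                       actions=frozenset({"delete_repo"})),
}
# Even though the classifier would say "low", the explicit override
# in the "high" tier captures "delete_repo".
risk, cfg = resolve_tier(tiers, lambda action: "low", "delete_repo")
```

Here `resolve_tier` returns the `"high"` tier, and `effective_on_timeout` then downgrades its configured `"approve"` to `"deny"`, mirroring the guard in the source.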

EscalationChainPolicy

EscalationChainPolicy(*, chain, on_chain_exhausted)

Escalate through a chain of roles, each with its own timeout.

When the entire chain is exhausted, applies the on_chain_exhausted action.

Parameters:

Name Type Description Default
chain tuple[EscalationStep, ...]

Ordered escalation steps.

required
on_chain_exhausted TimeoutActionType

Action applied when all steps are exhausted.

required
Source code in src/synthorg/security/timeout/policies.py
def __init__(
    self,
    *,
    chain: tuple[EscalationStep, ...],
    on_chain_exhausted: TimeoutActionType,
) -> None:
    self._chain = chain
    self._on_chain_exhausted = on_chain_exhausted

determine_action async

determine_action(item, elapsed_seconds)

Determine the current escalation step.

Calculates cumulative timeouts to find which step the approval is currently at.

Parameters:

Name Type Description Default
item ApprovalItem

The pending approval item.

required
elapsed_seconds float

Seconds since creation.

required

Returns:

Type Description
TimeoutAction

ESCALATE (to the current step's role) or the chain-exhausted action.

Source code in src/synthorg/security/timeout/policies.py
async def determine_action(
    self,
    item: ApprovalItem,
    elapsed_seconds: float,
) -> TimeoutAction:
    """Determine the current escalation step.

    Calculates cumulative timeouts to find which step the
    approval is currently at.

    Args:
        item: The pending approval item.
        elapsed_seconds: Seconds since creation.

    Returns:
        ESCALATE (to the current step's role) or the
        chain-exhausted action.
    """
    if not self._chain:
        logger.warning(
            TIMEOUT_ESCALATED,
            approval_id=item.id,
            on_exhausted=self._on_chain_exhausted.value,
            note="empty escalation chain -- likely a configuration error",
        )
        return TimeoutAction(
            action=self._on_chain_exhausted,
            reason="Empty escalation chain -- applying exhausted action",
        )

    cumulative_seconds = 0.0
    for idx, step in enumerate(self._chain):
        step_timeout = step.timeout_minutes * _SECONDS_PER_MINUTE
        step_end = cumulative_seconds + step_timeout
        if elapsed_seconds < step_end:
            if idx == 0:
                # First step hasn't timed out yet -- WAIT.
                logger.debug(
                    TIMEOUT_WAITING,
                    approval_id=item.id,
                    escalation_role=step.role,
                    elapsed_seconds=elapsed_seconds,
                )
                return TimeoutAction(
                    action=TimeoutActionType.WAIT,
                    reason=(
                        f"Waiting at {step.role!r} -- "
                        f"{elapsed_seconds:.0f}s of "
                        f"{step_end:.0f}s elapsed"
                    ),
                )
            # Previous step timed out -- escalate to this step's role.
            logger.info(
                TIMEOUT_ESCALATED,
                approval_id=item.id,
                escalation_role=step.role,
                elapsed_seconds=elapsed_seconds,
            )
            return TimeoutAction(
                action=TimeoutActionType.ESCALATE,
                reason=(
                    f"Escalated to {step.role!r} -- {elapsed_seconds:.0f}s elapsed"
                ),
                escalate_to=step.role,
            )
        cumulative_seconds += step_timeout

    # All steps exhausted.
    logger.info(
        TIMEOUT_ESCALATED,
        approval_id=item.id,
        elapsed_seconds=elapsed_seconds,
        on_exhausted=self._on_chain_exhausted.value,
        note="escalation chain exhausted",
    )
    return TimeoutAction(
        action=self._on_chain_exhausted,
        reason=(
            f"Escalation chain exhausted after {elapsed_seconds:.0f}s "
            f"-- {self._on_chain_exhausted.value}"
        ),
    )
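The cumulative-window arithmetic driving the chain walk can be sketched standalone. Chain steps are modeled here as illustrative `(role, timeout_minutes)` tuples rather than the real `EscalationStep` objects, and the return values are plain strings:

```python
def current_step(chain, elapsed_seconds):
    """Walk cumulative step windows to locate the pending approval."""
    cumulative = 0.0
    for idx, (role, timeout_minutes) in enumerate(chain):
        step_end = cumulative + timeout_minutes * 60
        if elapsed_seconds < step_end:
            # Inside the first window we are still waiting on the
            # original approver; any later window means an escalation.
            return ("wait", role) if idx == 0 else ("escalate", role)
        cumulative = step_end
    # Past the last window: the chain is exhausted.
    return ("exhausted", None)


chain = [("team_lead", 30), ("manager", 60), ("director", 120)]
# Windows end at 1800s, 5400s, and 12600s of cumulative elapsed time.
current_step(chain, 600)    # -> ("wait", "team_lead")
current_step(chain, 3600)   # -> ("escalate", "manager")
current_step(chain, 13000)  # -> ("exhausted", None)
```

Because each step's window starts where the previous one ends, lengthening an early step shifts every later escalation boundary by the same amount.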