Tools

Tool system -- base class, registry, invoker, built-in tools, and MCP bridge.

Base Tool

base

Base tool abstraction and execution result model.

Defines the BaseTool ABC that all concrete tools extend, and the ToolExecutionResult value object returned by tool execution.

ToolExecutionResult pydantic-model

Bases: BaseModel

Result of executing a tool's business logic.

This is the internal result type returned by BaseTool.execute. The invoker converts it into a ToolResult for the LLM, carrying only content and is_error -- metadata is not forwarded to the LLM and is available only for programmatic consumers.

Note

The metadata dict is shallowly frozen by Pydantic's frozen=True. Tool implementations construct and return this model, but the invoker converts it into a provider-facing ToolResult -- metadata is not forwarded to LLM providers or other external boundaries, so no additional boundary copy is needed at this layer.

Attributes:

  content (str): Tool output as a string.
  is_error (bool): Whether the execution failed.
  metadata (dict[str, Any]): Optional structured data for programmatic consumers.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  content (pydantic-field): Tool output.
  is_error (pydantic-field) = False: Whether the tool errored.
  metadata (pydantic-field): Optional structured metadata.
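The shallow-freeze note above can be illustrated with a plain frozen dataclass. `FrozenResult` is a hypothetical stand-in for the real Pydantic model, used only so the sketch runs standalone; the shape of the behavior is the same:

```python
from dataclasses import dataclass, field
from typing import Any

# Stand-in for ToolExecutionResult (the real class is a frozen
# Pydantic model); demonstrates what "shallowly frozen" means.
@dataclass(frozen=True)
class FrozenResult:
    content: str
    is_error: bool = False
    metadata: dict[str, Any] = field(default_factory=dict)

result = FrozenResult(content="42 files found", metadata={"count": 42})

try:
    result.content = "other"  # top-level field reassignment is blocked
    reassigned = True
except AttributeError:  # dataclasses raise FrozenInstanceError, an AttributeError
    reassigned = False

# "Shallow" freeze: the metadata dict's contents are still mutable,
# which is why the invoker never forwards it across trust boundaries.
result.metadata["extra"] = True
```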

BaseTool

BaseTool(*, name, description='', parameters_schema=None, category, action_type=None)

Bases: ABC

Abstract base class for all tools in the system.

Subclasses must implement execute to define tool behavior. The to_definition method converts the tool into a ToolDefinition suitable for sending to an LLM provider.

Attributes:

  name (str): Non-blank tool name.
  description (str): Human-readable description of the tool.
  parameters_schema (dict[str, Any] | None): JSON Schema dict describing expected arguments, or None if no parameter schema is defined (the invoker skips validation).
  category (ToolCategory): Tool category for access-level gating.
  action_type (str): Security action type for SecOps classification.

Initialize a tool with name, description, schema, and category.

Parameters:

  name (str): Non-blank tool name. Required.
  description (str): Human-readable description. Default: ''.
  parameters_schema (dict[str, Any] | None): JSON Schema for tool parameters. Default: None.
  category (ToolCategory): Tool category for access-level gating. Required.
  action_type (str | None): Security action type for SecOps classification. When None, derived from the category via DEFAULT_CATEGORY_ACTION_MAP. Default: None.

Raises:

  ValueError: If name is empty or whitespace-only.

Source code in src/synthorg/tools/base.py
def __init__(
    self,
    *,
    name: str,
    description: str = "",
    parameters_schema: dict[str, Any] | None = None,
    category: ToolCategory,
    action_type: str | None = None,
) -> None:
    """Initialize a tool with name, description, schema, and category.

    Args:
        name: Non-blank tool name.
        description: Human-readable description.
        parameters_schema: JSON Schema for tool parameters.
        category: Tool category for access-level gating.
        action_type: Security action type for SecOps classification.
            When ``None``, derived from the category via
            ``DEFAULT_CATEGORY_ACTION_MAP``.

    Raises:
        ValueError: If name is empty or whitespace-only.
    """
    if not name or not name.strip():
        logger.warning(TOOL_BASE_INVALID_NAME, name=repr(name))
        msg = "Tool name must not be empty or whitespace-only"
        raise ValueError(msg)
    self._name = name
    self._description = description
    self._category = category
    if action_type is not None:
        parts = action_type.split(":")
        if len(parts) != 2 or not parts[0] or not parts[1]:  # noqa: PLR2004
            msg = f"action_type {action_type!r} must use 'category:action' format"
            logger.warning(TOOL_BASE_INVALID_NAME, name=repr(action_type))
            raise ValueError(msg)
        self._action_type = action_type
    else:
        if category not in DEFAULT_CATEGORY_ACTION_MAP:
            msg = f"No default action_type mapping for ToolCategory.{category.name}"
            raise ValueError(msg)
        self._action_type = str(DEFAULT_CATEGORY_ACTION_MAP[category])
    self._parameters_schema: MappingProxyType[str, Any] | None = (
        MappingProxyType(copy.deepcopy(parameters_schema))
        if parameters_schema is not None
        else None
    )

name property

name

Tool name.

category property

category

Tool category for access-level gating.

action_type property

action_type

Security action type for SecOps classification.

description property

description

Tool description.

parameters_schema property

parameters_schema

JSON Schema for tool parameters, or None if unspecified.

Returns a deep copy to prevent mutation of internal state.

to_definition

to_definition()

Convert this tool to a ToolDefinition for LLM providers.

Returns:

  ToolDefinition: A ToolDefinition with name, description, and schema.

Source code in src/synthorg/tools/base.py
def to_definition(self) -> ToolDefinition:
    """Convert this tool to a ``ToolDefinition`` for LLM providers.

    Returns:
        A ``ToolDefinition`` with name, description, and schema.
    """
    return ToolDefinition(
        name=self._name,
        description=self._description,
        parameters_schema=self.parameters_schema or {},
    )

execute abstractmethod async

execute(*, arguments)

Execute the tool with the given arguments.

Arguments are pre-validated against the tool's JSON Schema (if one is defined) by the ToolInvoker before reaching this method. Implementations with a schema can assume compliance when invoked through the invoker; tools without a schema receive unvalidated arguments.

Parameters:

  arguments (dict[str, Any]): Parsed arguments matching the parameters schema. Required.

Returns:

  ToolExecutionResult: A ToolExecutionResult with the tool output.

Source code in src/synthorg/tools/base.py
@abstractmethod
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Execute the tool with the given arguments.

    Arguments are pre-validated against the tool's JSON Schema (if
    one is defined) by the ``ToolInvoker`` before reaching this
    method.  Implementations with a schema can assume compliance
    when invoked through the invoker; tools without a schema
    receive unvalidated arguments.

    Args:
        arguments: Parsed arguments matching the parameters schema.

    Returns:
        A ``ToolExecutionResult`` with the tool output.
    """

Registry

registry

Tool registry -- maps tool names to BaseTool instances.

Immutable after construction. Provides lookup, membership testing, and conversion to a tuple of ToolDefinition objects for LLM providers.

ToolRegistry

ToolRegistry(tools)

Immutable registry of named tools.

Examples:

Build from a list of tools::

registry = ToolRegistry([echo_tool, search_tool])
tool = registry.get("echo")

Check membership::

if "echo" in registry:
    ...

Initialize with an iterable of tools.

Parameters:

  tools (Iterable[BaseTool]): Tools to register. Duplicate names raise ValueError. Required.

Raises:

  ValueError: If two tools share the same name.

Source code in src/synthorg/tools/registry.py
def __init__(self, tools: Iterable[BaseTool]) -> None:
    """Initialize with an iterable of tools.

    Args:
        tools: Tools to register. Duplicate names raise ``ValueError``.

    Raises:
        ValueError: If two tools share the same name.
    """
    mapping: dict[str, BaseTool] = {}
    for tool in tools:
        if tool.name in mapping:
            logger.warning(
                TOOL_REGISTRY_DUPLICATE,
                tool_name=tool.name,
            )
            msg = f"Duplicate tool name: {tool.name!r}"
            raise ValueError(msg)
        mapping[tool.name] = tool
    self._tools: MappingProxyType[str, BaseTool] = MappingProxyType(mapping)
    logger.info(
        TOOL_REGISTRY_BUILT,
        tool_count=len(self._tools),
        tools=sorted(self._tools),
    )

get

get(name)

Look up a tool by name.

Parameters:

  name (str): Tool name. Required.

Returns:

  BaseTool: The registered tool instance.

Raises:

  ToolNotFoundError: If no tool is registered with that name.

Source code in src/synthorg/tools/registry.py
def get(self, name: str) -> BaseTool:
    """Look up a tool by name.

    Args:
        name: Tool name.

    Returns:
        The registered tool instance.

    Raises:
        ToolNotFoundError: If no tool is registered with that name.
    """
    tool = self._tools.get(name)
    if tool is None:
        available = sorted(self._tools) or ["(none)"]
        logger.warning(
            TOOL_NOT_FOUND,
            tool_name=name,
            available=available,
        )
        msg = (
            f"Tool {name!r} is not registered. "
            f"Available tools: {', '.join(available)}"
        )
        raise ToolNotFoundError(msg, context={"tool": name})
    return tool

list_tools

list_tools()

Return sorted tuple of registered tool names.

Source code in src/synthorg/tools/registry.py
def list_tools(self) -> tuple[str, ...]:
    """Return sorted tuple of registered tool names."""
    return tuple(sorted(self._tools))

all_tools

all_tools()

Return all registered tool instances, sorted by name.

Source code in src/synthorg/tools/registry.py
def all_tools(self) -> tuple[BaseTool, ...]:
    """Return all registered tool instances, sorted by name."""
    return tuple(self._tools[name] for name in sorted(self._tools))

to_definitions

to_definitions()

Return all tool definitions as a sorted tuple, ordered by name.

Returns:

  tuple[ToolDefinition, ...]: Sorted tuple of tool definitions for LLM providers.

Source code in src/synthorg/tools/registry.py
def to_definitions(self) -> tuple[ToolDefinition, ...]:
    """Return all tool definitions as a sorted tuple, ordered by name.

    Returns:
        Sorted tuple of tool definitions for LLM providers.
    """
    return tuple(self._tools[name].to_definition() for name in sorted(self._tools))

__contains__

__contains__(name)

Check whether a tool name is registered.

Source code in src/synthorg/tools/registry.py
def __contains__(self, name: object) -> bool:
    """Check whether a tool name is registered."""
    if not isinstance(name, str):
        logger.debug(
            TOOL_REGISTRY_CONTAINS_TYPE_ERROR,
            name_type=type(name).__name__,
        )
        return False
    return name in self._tools

__len__

__len__()

Return the number of registered tools.

Source code in src/synthorg/tools/registry.py
def __len__(self) -> int:
    """Return the number of registered tools."""
    return len(self._tools)

Invoker

invoker

Tool invoker -- validates and executes tool calls.

Bridges LLM ToolCall objects with concrete BaseTool.execute methods. Recoverable errors are returned as ToolResult(is_error=True); non-recoverable errors (MemoryError, RecursionError) are logged and re-raised. BaseException subclasses (KeyboardInterrupt, SystemExit, asyncio.CancelledError) propagate uncaught.

ToolInvoker

ToolInvoker(
    registry,
    *,
    permission_checker=None,
    security_interceptor=None,
    agent_id=None,
    task_id=None,
    agent_provider_name=None,
    invocation_tracker=None,
)

Validate parameters, enforce security policies, and execute tools.

Recoverable errors are returned as ToolResult(is_error=True). Non-recoverable errors (MemoryError, RecursionError) are re-raised after logging.

Examples:

Invoke a single tool call::

invoker = ToolInvoker(registry)
result = await invoker.invoke(tool_call)

Invoke multiple tool calls concurrently::

results = await invoker.invoke_all(tool_calls)

Limit concurrency::

results = await invoker.invoke_all(tool_calls, max_concurrency=3)

Initialize with a tool registry and optional checkers.

Parameters:

  registry (ToolRegistry): Registry to look up tools from. Required.
  permission_checker (ToolPermissionChecker | None): Optional checker for access-level gating. When None, all registered tools are permitted. Default: None.
  security_interceptor (SecurityInterceptionStrategy | None): Optional pre/post-tool security layer. Default: None.
  agent_id (str | None): Agent ID for security context. Default: None.
  task_id (str | None): Task ID for security context. Default: None.
  agent_provider_name (str | None): Provider name the agent is using, for cross-family LLM security evaluation. Default: None.
  invocation_tracker (ToolInvocationTracker | None): Optional tracker for recording invocations for the activity timeline. Default: None.
Source code in src/synthorg/tools/invoker.py
def __init__(  # noqa: PLR0913
    self,
    registry: ToolRegistry,
    *,
    permission_checker: ToolPermissionChecker | None = None,
    security_interceptor: SecurityInterceptionStrategy | None = None,
    agent_id: str | None = None,
    task_id: str | None = None,
    agent_provider_name: str | None = None,
    invocation_tracker: ToolInvocationTracker | None = None,
) -> None:
    """Initialize with a tool registry and optional checkers.

    Args:
        registry: Registry to look up tools from.
        permission_checker: Optional checker for access-level gating.
            When ``None``, all registered tools are permitted.
        security_interceptor: Optional pre/post-tool security layer.
        agent_id: Agent ID for security context.
        task_id: Task ID for security context.
        agent_provider_name: Provider name the agent is using,
            for cross-family LLM security evaluation.
        invocation_tracker: Optional tracker for recording
            invocations for the activity timeline.
    """
    self._registry = registry
    self._permission_checker = permission_checker
    self._security_interceptor = security_interceptor
    self._agent_id = agent_id
    self._task_id = task_id
    self._agent_provider_name = agent_provider_name
    self._invocation_tracker = invocation_tracker

    self._pending_escalations: list[EscalationInfo] = []

registry property

registry

Read-only access to the underlying tool registry.

pending_escalations property

pending_escalations

Escalations detected during the most recent invoke/invoke_all.

Populated when a security ESCALATE verdict with a non-None approval_id is returned, or when a tool returns requires_parking metadata. Cleared at the start of every invoke() and invoke_all() call.

get_permitted_definitions

get_permitted_definitions()

Return tool definitions filtered by the permission checker.

When no permission checker is set, returns all definitions.

Returns:

  tuple[ToolDefinition, ...]: Tuple of permitted tool definitions, sorted by name.

Source code in src/synthorg/tools/invoker.py
def get_permitted_definitions(self) -> tuple[ToolDefinition, ...]:
    """Return tool definitions filtered by the permission checker.

    When no permission checker is set, returns all definitions.

    Returns:
        Tuple of permitted tool definitions, sorted by name.
    """
    if self._permission_checker is None:
        return self._registry.to_definitions()
    return self._permission_checker.filter_definitions(self._registry)

invoke async

invoke(tool_call)

Execute a single tool call.

Steps
  1. Look up the tool in the registry.
  2. Check permissions against the permission checker (if any).
  3. Validate arguments against the tool's JSON Schema (if any).
  4. Run security interceptor pre-tool check (if any).
  5. Call tool.execute(arguments=...).
  6. Scan tool output for sensitive data (if interceptor is set).
  7. Return a ToolResult with the output.

Recoverable errors produce ToolResult(is_error=True). Non-recoverable errors are re-raised.

Parameters:

  tool_call (ToolCall): The tool call from the LLM. Required.

Returns:

  ToolResult: A ToolResult with the tool's output or error message.

Source code in src/synthorg/tools/invoker.py
async def invoke(self, tool_call: ToolCall) -> ToolResult:
    """Execute a single tool call.

    Steps:
        1. Look up the tool in the registry.
        2. Check permissions against the permission checker (if any).
        3. Validate arguments against the tool's JSON Schema (if any).
        4. Run security interceptor pre-tool check (if any).
        5. Call ``tool.execute(arguments=...)``.
        6. Scan tool output for sensitive data (if interceptor is set).
        7. Return a ``ToolResult`` with the output.

    Recoverable errors produce ``ToolResult(is_error=True)``.
    Non-recoverable errors are re-raised.

    Args:
        tool_call: The tool call from the LLM.

    Returns:
        A ``ToolResult`` with the tool's output or error message.
    """
    self._pending_escalations.clear()
    return await self._invoke_single(tool_call)

invoke_all async

invoke_all(tool_calls, *, max_concurrency=None)

Execute multiple tool calls concurrently.

Parameters:

  tool_calls (Iterable[ToolCall]): Tool calls to execute. Required.
  max_concurrency (int | None): Max concurrent invocations (>= 1). Default: None.

Returns:

  tuple[ToolResult, ...]: Tuple of results in the same order as the input.

Raises:

  ValueError: If max_concurrency < 1.
  MemoryError: Re-raised if a single fatal error occurred.
  RecursionError: Re-raised if a single fatal error occurred.
  ExceptionGroup: If multiple fatal errors occurred.

Source code in src/synthorg/tools/invoker.py
async def invoke_all(
    self,
    tool_calls: Iterable[ToolCall],
    *,
    max_concurrency: int | None = None,
) -> tuple[ToolResult, ...]:
    """Execute multiple tool calls concurrently.

    Args:
        tool_calls: Tool calls to execute.
        max_concurrency: Max concurrent invocations (``>= 1``).

    Returns:
        Tuple of results in the same order as the input.

    Raises:
        ValueError: If *max_concurrency* < 1.
        MemoryError: Re-raised if a single fatal error occurred.
        RecursionError: Re-raised if a single fatal error occurred.
        ExceptionGroup: If multiple fatal errors occurred.
    """
    self._pending_escalations.clear()

    if max_concurrency is not None and max_concurrency < 1:
        msg = f"max_concurrency must be >= 1, got {max_concurrency}"
        raise ValueError(msg)

    calls = list(tool_calls)
    if not calls:
        return ()

    logger.info(
        TOOL_INVOKE_ALL_START,
        count=len(calls),
        max_concurrency=max_concurrency,
    )

    results: dict[int, ToolResult] = {}
    fatal_errors: list[Exception] = []
    semaphore = (
        asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
    )

    async with asyncio.TaskGroup() as tg:
        for idx, call in enumerate(calls):
            tg.create_task(
                self._run_guarded(
                    idx,
                    call,
                    results,
                    fatal_errors,
                    semaphore,
                ),
            )

    logger.info(
        TOOL_INVOKE_ALL_COMPLETE,
        count=len(calls),
        fatal_count=len(fatal_errors),
    )

    self._raise_fatal_errors(fatal_errors)

    # Sort escalations by tool-call index for deterministic ordering.
    if len(self._pending_escalations) > 1:
        call_id_order = {tc.id: idx for idx, tc in enumerate(calls)}
        self._pending_escalations.sort(
            key=lambda e: call_id_order.get(e.tool_call_id, len(calls)),
        )

    return tuple(results[i] for i in range(len(calls)))

Permissions

permissions

Tool permission checker -- enforces access-level gating.

Resolves tool permissions using a priority-based system:

  1. If tool name is in denied → DENIED
  2. If tool name is in allowed → ALLOWED
  3. If access level is CUSTOM → DENIED
  4. If tool category is in the level's allowed categories → ALLOWED
  5. Otherwise → DENIED
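The resolution order can be sketched as a plain function. The `resolve` helper and the simplified string categories are illustrative stand-ins, not the checker's real internals:

```python
def resolve(name, category, *, denied, allowed, custom, level_categories):
    name = name.strip().casefold()       # name matching is case-insensitive
    if name in denied:
        return False                     # 1. explicit deny wins
    if name in allowed:
        return True                      # 2. explicit allow
    if custom:
        return False                     # 3. CUSTOM grants nothing by category
    return category in level_categories  # 4./5. category gating, else denied

# Deny beats allow even when a name appears in both lists.
both = resolve("Git_Push", "version_control",
               denied={"git_push"}, allowed={"git_push"},
               custom=False, level_categories={"version_control"})
```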

ToolPermissionChecker

ToolPermissionChecker(
    *, access_level=STANDARD, allowed=frozenset(), denied=frozenset()
)

Enforces tool access permissions based on access level and explicit lists.

Each access level grants a set of tool categories. Explicit allowed and denied lists override the level-based rules (denied has highest priority). All name matching is case-insensitive.

Examples:

Create from agent permissions::

checker = ToolPermissionChecker.from_permissions(identity.tools)
if checker.is_permitted("git_push", ToolCategory.VERSION_CONTROL):
    ...

Filter tool definitions for LLM prompt::

defs = checker.filter_definitions(registry)

Initialize with access level and explicit name lists.

Parameters:

  access_level (ToolAccessLevel): Base access level for category gating. Default: STANDARD.
  allowed (frozenset[str]): Explicitly allowed tool names (normalized on store). Default: frozenset().
  denied (frozenset[str]): Explicitly denied tool names (normalized on store). Default: frozenset().
Source code in src/synthorg/tools/permissions.py
def __init__(
    self,
    *,
    access_level: ToolAccessLevel = ToolAccessLevel.STANDARD,
    allowed: frozenset[str] = frozenset(),
    denied: frozenset[str] = frozenset(),
) -> None:
    """Initialize with access level and explicit name lists.

    Args:
        access_level: Base access level for category gating.
        allowed: Explicitly allowed tool names (normalized on store).
        denied: Explicitly denied tool names (normalized on store).
    """
    self._access_level = access_level
    self._allowed = frozenset(n.strip().casefold() for n in allowed)
    self._denied = frozenset(n.strip().casefold() for n in denied)
    logger.debug(
        TOOL_PERMISSION_CHECKER_CREATED,
        access_level=access_level.value,
        allowed_count=len(self._allowed),
        denied_count=len(self._denied),
    )

from_permissions classmethod

from_permissions(permissions)

Create a checker from an agent's ToolPermissions model.

Parameters:

  permissions (ToolPermissions): Agent tool permissions. Required.

Returns:

  Self: Configured permission checker.

Source code in src/synthorg/tools/permissions.py
@classmethod
def from_permissions(cls, permissions: ToolPermissions) -> Self:
    """Create a checker from an agent's ``ToolPermissions`` model.

    Args:
        permissions: Agent tool permissions.

    Returns:
        Configured permission checker.
    """
    return cls(
        access_level=permissions.access_level,
        allowed=frozenset(permissions.allowed),
        denied=frozenset(permissions.denied),
    )

is_permitted

is_permitted(tool_name, category)

Check whether a tool is permitted.

Parameters:

  tool_name (str): Name of the tool. Required.
  category (ToolCategory): Category of the tool. Required.

Returns:

  bool: True if the tool is permitted, False otherwise.

Source code in src/synthorg/tools/permissions.py
def is_permitted(self, tool_name: str, category: ToolCategory) -> bool:
    """Check whether a tool is permitted.

    Args:
        tool_name: Name of the tool.
        category: Category of the tool.

    Returns:
        ``True`` if the tool is permitted, ``False`` otherwise.
    """
    name_lower = tool_name.strip().casefold()
    if name_lower in self._denied:
        return False
    if name_lower in self._allowed:
        return True
    if self._access_level == ToolAccessLevel.CUSTOM:
        return False
    allowed_cats = self._LEVEL_CATEGORIES[self._access_level]
    return category in allowed_cats

check

check(tool_name, category)

Assert that a tool is permitted, raising on denial.

Parameters:

  tool_name (str): Name of the tool. Required.
  category (ToolCategory): Category of the tool. Required.

Raises:

  ToolPermissionDeniedError: If the tool is not permitted.

Source code in src/synthorg/tools/permissions.py
def check(self, tool_name: str, category: ToolCategory) -> None:
    """Assert that a tool is permitted, raising on denial.

    Args:
        tool_name: Name of the tool.
        category: Category of the tool.

    Raises:
        ToolPermissionDeniedError: If the tool is not permitted.
    """
    if not self.is_permitted(tool_name, category):
        reason = self.denial_reason(tool_name, category)
        logger.warning(
            TOOL_PERMISSION_DENIED,
            tool_name=tool_name,
            category=category.value,
            reason=reason,
        )
        raise ToolPermissionDeniedError(
            reason,
            context={"tool": tool_name, "category": category.value},
        )

denial_reason

denial_reason(tool_name, category)

Return a human-readable reason why a tool would be denied.

Intended for use after confirming the tool is denied via is_permitted or via check. If the tool is actually permitted, the returned string does not apply and should not be shown to users.

Parameters:

  tool_name (str): Name of the tool. Required.
  category (ToolCategory): Category of the tool. Required.

Returns:

  str: Explanation string suitable for error messages.

Source code in src/synthorg/tools/permissions.py
def denial_reason(self, tool_name: str, category: ToolCategory) -> str:
    """Return a human-readable reason why a tool would be denied.

    Intended for use after confirming the tool is denied via
    ``is_permitted`` or via ``check``.  If the tool is actually
    permitted, the returned string does not apply and should not
    be shown to users.

    Args:
        tool_name: Name of the tool.
        category: Category of the tool.

    Returns:
        Explanation string suitable for error messages.
    """
    name_lower = tool_name.strip().casefold()
    if name_lower in self._denied:
        return f"Tool {tool_name!r} is explicitly denied"
    if self._access_level == ToolAccessLevel.CUSTOM:
        return (
            f"Tool {tool_name!r} is not in the allowed list (access level: custom)"
        )
    return (
        f"Category {category.value!r} is not permitted "
        f"at access level {self._access_level.value!r}"
    )

filter_definitions

filter_definitions(registry)

Return only permitted tool definitions from a registry.

Parameters:

  registry (ToolRegistry): Tool registry to filter. Required.

Returns:

  tuple[ToolDefinition, ...]: Tuple of permitted tool definitions, sorted by tool name.

Source code in src/synthorg/tools/permissions.py
def filter_definitions(self, registry: ToolRegistry) -> tuple[ToolDefinition, ...]:
    """Return only permitted tool definitions from a registry.

    Args:
        registry: Tool registry to filter.

    Returns:
        Tuple of permitted tool definitions, sorted by tool name.
    """
    tool_names = registry.list_tools()
    result: list[ToolDefinition] = []
    for name in tool_names:
        tool = registry.get(name)
        if self.is_permitted(name, tool.category):
            result.append(tool.to_definition())
    result.sort(key=lambda d: d.name)
    excluded = len(tool_names) - len(result)
    if excluded:
        logger.debug(
            TOOL_PERMISSION_FILTERED,
            access_level=self._access_level.value,
            total=len(tool_names),
            permitted=len(result),
            excluded=excluded,
        )
    return tuple(result)

Errors

errors

Tool error hierarchy.

All tool errors carry an immutable context mapping for structured metadata. Unlike provider errors, tool errors have no is_retryable flag -- retry decisions are made at higher layers.

ToolError

ToolError(message, *, context=None)

Bases: Exception

Base exception for all tool-layer errors.

Attributes:

  message: Human-readable error description.
  context (MappingProxyType[str, Any]): Immutable metadata about the error (tool name, etc.).

Initialize a tool error.

Parameters:

  message (str): Human-readable error description. Required.
  context (dict[str, Any] | None): Arbitrary metadata about the error. Stored as an immutable mapping; defaults to empty if not provided. Default: None.
Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

__str__

__str__()

Format error with optional context metadata.

Source code in src/synthorg/tools/errors.py
def __str__(self) -> str:
    """Format error with optional context metadata."""
    if self.context:
        ctx = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
        return f"{self.message} ({ctx})"
    return self.message

ToolNotFoundError

ToolNotFoundError(message, *, context=None)

Bases: ToolError

Requested tool is not registered in the registry.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

ToolParameterError

ToolParameterError(message, *, context=None)

Bases: ToolError

Tool parameters failed schema validation.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

ToolExecutionError

ToolExecutionError(message, *, context=None)

Bases: ToolError

Tool execution raised an unexpected error.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

ToolPermissionDeniedError

ToolPermissionDeniedError(message, *, context=None)

Bases: ToolError

Tool invocation blocked by the permission checker.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

Code Runner

code_runner

Code runner tool -- executes code snippets in a sandboxed environment.

Supports Python, JavaScript, and Bash via configurable sandbox backends.

CodeRunnerTool

CodeRunnerTool(*, sandbox)

Bases: BaseTool

Executes code snippets in a sandboxed environment.

Supports Python, JavaScript, and Bash. Delegates execution to a SandboxBackend for isolation and resource control.

Initialize the code runner tool.

Parameters:

Name Type Description Default
sandbox SandboxBackend

Sandbox backend for isolated code execution.

required
Source code in src/synthorg/tools/code_runner.py
def __init__(self, *, sandbox: SandboxBackend) -> None:
    """Initialize the code runner tool.

    Args:
        sandbox: Sandbox backend for isolated code execution.
    """
    super().__init__(
        name="code_runner",
        description=(
            "Executes code snippets in Python, JavaScript, "
            "or Bash within a sandboxed environment"
        ),
        category=ToolCategory.CODE_EXECUTION,
        parameters_schema=dict(_PARAMETERS_SCHEMA),
    )
    self._sandbox = sandbox

execute async

execute(*, arguments)

Execute a code snippet in the sandbox.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Must contain code (str), language (str), and optionally timeout (float).

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with execution output.

Source code in src/synthorg/tools/code_runner.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Execute a code snippet in the sandbox.

    Args:
        arguments: Must contain ``code`` (str), ``language`` (str),
            and optionally ``timeout`` (float).

    Returns:
        A ``ToolExecutionResult`` with execution output.
    """
    code: str = arguments["code"]
    language: str = arguments["language"]
    timeout: float | None = arguments.get("timeout")

    if language not in _LANGUAGE_COMMANDS:
        logger.warning(
            CODE_RUNNER_INVALID_LANGUAGE,
            language=language,
        )
        return ToolExecutionResult(
            content=f"Unsupported language: {language!r}. "
            f"Supported: {sorted(_LANGUAGE_COMMANDS)}",
            is_error=True,
        )

    command, flag = _LANGUAGE_COMMANDS[language]

    logger.debug(
        CODE_RUNNER_EXECUTE_START,
        language=language,
        timeout=timeout,
        code_length=len(code),
    )

    try:
        result = await self._sandbox.execute(
            command=command,
            args=(flag, code),
            timeout=timeout,
        )
    except SandboxError as exc:
        logger.warning(
            CODE_RUNNER_EXECUTE_FAILED,
            language=language,
            error=str(exc),
        )
        return ToolExecutionResult(
            content=f"Sandbox error: {exc}",
            is_error=True,
            metadata={"language": language},
        )

    if result.success:
        logger.debug(
            CODE_RUNNER_EXECUTE_SUCCESS,
            language=language,
        )
        return ToolExecutionResult(
            content=result.stdout or "(no output)",
            metadata={
                "returncode": result.returncode,
                "language": language,
            },
        )

    logger.warning(
        CODE_RUNNER_EXECUTE_FAILED,
        language=language,
        returncode=result.returncode,
        timed_out=result.timed_out,
    )
    error_msg = result.stderr or result.stdout or "Execution failed"
    if result.timed_out:
        error_msg = f"Execution timed out. {error_msg}"
    return ToolExecutionResult(
        content=error_msg,
        is_error=True,
        metadata={
            "returncode": result.returncode,
            "timed_out": result.timed_out,
            "language": language,
        },
    )

Git Tools

git_tools

Built-in git tools for version control operations.

Provides workspace-scoped git tools that agents use to interact with git repositories. All tools enforce workspace boundary security -- the LLM never controls absolute paths. See _git_base._BaseGitTool for the subprocess execution model, environment hardening, and path validation shared by all tools.

GitStatusTool

GitStatusTool(*, workspace, sandbox=None)

Bases: _BaseGitTool

Show the working tree status of the git repository.

Returns the output of git status with optional short or porcelain formatting.

Initialize the git_status tool.

Parameters:

Name Type Description Default
workspace Path

Absolute path to the workspace root.

required
sandbox SandboxBackend | None

Optional sandbox backend for subprocess isolation.

None
Source code in src/synthorg/tools/git_tools.py
def __init__(
    self,
    *,
    workspace: Path,
    sandbox: SandboxBackend | None = None,
) -> None:
    """Initialize the git_status tool.

    Args:
        workspace: Absolute path to the workspace root.
        sandbox: Optional sandbox backend for subprocess isolation.
    """
    super().__init__(
        name="git_status",
        description=(
            "Show the working tree status. Returns modified, staged, "
            "and untracked files in the workspace repository."
        ),
        parameters_schema={
            "type": "object",
            "properties": {
                "short": {
                    "type": "boolean",
                    "description": "Use short format output.",
                    "default": False,
                },
                "porcelain": {
                    "type": "boolean",
                    "description": ("Use machine-readable porcelain format."),
                    "default": False,
                },
            },
            "additionalProperties": False,
        },
        workspace=workspace,
        sandbox=sandbox,
        action_type=ActionType.VCS_READ,
    )

execute async

execute(*, arguments)

Run git status.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Optional short and porcelain flags.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the status output.

Source code in src/synthorg/tools/git_tools.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Run ``git status``.

    Args:
        arguments: Optional ``short`` and ``porcelain`` flags.

    Returns:
        A ``ToolExecutionResult`` with the status output.
    """
    args = ["status"]
    if arguments.get("porcelain"):
        args.append("--porcelain")
    elif arguments.get("short"):
        args.append("--short")
    return await self._run_git(args)
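The flag handling above gives `porcelain` precedence over `short` when both are set. A self-contained sketch of that argument assembly:

```python
from typing import Any


def build_status_args(arguments: dict[str, Any]) -> list[str]:
    """Mirror GitStatusTool.execute's flag logic: porcelain wins over short."""
    args = ["status"]
    if arguments.get("porcelain"):
        args.append("--porcelain")
    elif arguments.get("short"):
        args.append("--short")
    return args


assert build_status_args({}) == ["status"]
assert build_status_args({"short": True}) == ["status", "--short"]
# When both flags are set, the machine-readable format takes precedence:
assert build_status_args({"short": True, "porcelain": True}) == ["status", "--porcelain"]
```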

GitLogTool

GitLogTool(*, workspace, sandbox=None)

Bases: _BaseGitTool

Show commit log history.

Returns recent commits with optional filtering by count, author, date range, ref, and paths.

Initialize the git_log tool.

Parameters:

Name Type Description Default
workspace Path

Absolute path to the workspace root.

required
sandbox SandboxBackend | None

Optional sandbox backend for subprocess isolation.

None
Source code in src/synthorg/tools/git_tools.py
def __init__(
    self,
    *,
    workspace: Path,
    sandbox: SandboxBackend | None = None,
) -> None:
    """Initialize the git_log tool.

    Args:
        workspace: Absolute path to the workspace root.
        sandbox: Optional sandbox backend for subprocess isolation.
    """
    super().__init__(
        name="git_log",
        description=(
            "Show commit log. Returns recent commits with optional "
            "filtering by count, author, date range, ref, and paths."
        ),
        parameters_schema=_GIT_LOG_SCHEMA,
        workspace=workspace,
        sandbox=sandbox,
        action_type=ActionType.VCS_READ,
    )

execute async

execute(*, arguments)

Run git log.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Log options (max_count, oneline, ref, author, since, until, paths).

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the log output.

Source code in src/synthorg/tools/git_tools.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Run ``git log``.

    Args:
        arguments: Log options (max_count, oneline, ref, author,
            since, until, paths).

    Returns:
        A ``ToolExecutionResult`` with the log output.
    """
    max_count = min(
        arguments.get("max_count", 10),
        self._MAX_COUNT_LIMIT,
    )
    args = ["log", f"--max-count={max_count}"]

    if arguments.get("oneline"):
        args.append("--oneline")

    filter_args = self._build_filter_args(arguments)
    if isinstance(filter_args, ToolExecutionResult):
        return filter_args
    args.extend(filter_args)

    if ref := arguments.get("ref"):
        if err := self._check_git_arg(ref, param="ref"):
            return err
        args.append(ref)

    paths: list[str] = arguments.get("paths", [])
    if paths:
        if err := self._check_paths(paths):
            return err
        args.append("--")
        args.extend(paths)

    result = await self._run_git(args)
    if not result.is_error and not result.content:
        return ToolExecutionResult(content="No commits found")
    return result
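The `max_count` clamp above caps an LLM-supplied value at `self._MAX_COUNT_LIMIT`, so an agent cannot request an unbounded log. A one-line sketch (the limit of 100 is an illustrative assumption, not the value from `git_tools.py`):

```python
def clamp_max_count(requested: int, limit: int = 100) -> int:
    """Cap the LLM-supplied max_count; the real _MAX_COUNT_LIMIT is
    defined in git_tools.py and may differ from this assumed 100."""
    return min(requested, limit)


assert clamp_max_count(10) == 10
assert clamp_max_count(10_000) == 100
```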

GitDiffTool

GitDiffTool(*, workspace, sandbox=None)

Bases: _BaseGitTool

Show changes between commits, the index, and the working tree.

Returns the output of git diff with optional ref comparison, staged changes view, stat summary, and path filtering.

Initialize the git_diff tool.

Parameters:

Name Type Description Default
workspace Path

Absolute path to the workspace root.

required
sandbox SandboxBackend | None

Optional sandbox backend for subprocess isolation.

None
Source code in src/synthorg/tools/git_tools.py
def __init__(
    self,
    *,
    workspace: Path,
    sandbox: SandboxBackend | None = None,
) -> None:
    """Initialize the git_diff tool.

    Args:
        workspace: Absolute path to the workspace root.
        sandbox: Optional sandbox backend for subprocess isolation.
    """
    super().__init__(
        name="git_diff",
        description=(
            "Show changes between commits, index, and working tree. "
            "Supports staged changes, ref comparison, and path "
            "filtering."
        ),
        action_type=ActionType.VCS_READ,
        parameters_schema={
            "type": "object",
            "properties": {
                "staged": {
                    "type": "boolean",
                    "description": "Show staged (cached) changes.",
                    "default": False,
                },
                "ref1": {
                    "type": "string",
                    "description": "First ref for comparison.",
                },
                "ref2": {
                    "type": "string",
                    "description": "Second ref for comparison.",
                },
                "stat": {
                    "type": "boolean",
                    "description": ("Show diffstat summary instead of full diff."),
                    "default": False,
                },
                "paths": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Limit diff to these paths.",
                },
            },
            "additionalProperties": False,
        },
        workspace=workspace,
        sandbox=sandbox,
    )

execute async

execute(*, arguments)

Run git diff.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Diff options (staged, ref1, ref2, stat, paths).

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the diff output. An empty diff returns "No changes" (not an error).

Source code in src/synthorg/tools/git_tools.py
async def execute(  # noqa: C901
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Run ``git diff``.

    Args:
        arguments: Diff options (staged, ref1, ref2, stat, paths).

    Returns:
        A ``ToolExecutionResult`` with the diff output. Empty diff
        returns "No changes" (not an error).
    """
    args = ["diff"]

    if arguments.get("staged"):
        args.append("--cached")

    if arguments.get("stat"):
        args.append("--stat")

    if ref1 := arguments.get("ref1"):
        if err := self._check_git_arg(ref1, param="ref1"):
            return err
        args.append(ref1)
    if ref2 := arguments.get("ref2"):
        if not ref1:
            return ToolExecutionResult(
                content="ref2 requires ref1 to be specified",
                is_error=True,
            )
        if err := self._check_git_arg(ref2, param="ref2"):
            return err
        args.append(ref2)

    paths: list[str] = arguments.get("paths", [])
    if paths:
        if err := self._check_paths(paths):
            return err
        args.append("--")
        args.extend(paths)

    result = await self._run_git(args)
    if not result.is_error and not result.content:
        return ToolExecutionResult(content="No changes")
    return result

GitBranchTool

GitBranchTool(*, workspace, sandbox=None)

Bases: _BaseGitTool

Manage branches -- list, create, switch, or delete.

Supports listing all branches, creating new branches (optionally from a start point), switching between branches, and deleting branches.

Initialize the git_branch tool.

Parameters:

Name Type Description Default
workspace Path

Absolute path to the workspace root.

required
sandbox SandboxBackend | None

Optional sandbox backend for subprocess isolation.

None
Source code in src/synthorg/tools/git_tools.py
def __init__(
    self,
    *,
    workspace: Path,
    sandbox: SandboxBackend | None = None,
) -> None:
    """Initialize the git_branch tool.

    Args:
        workspace: Absolute path to the workspace root.
        sandbox: Optional sandbox backend for subprocess isolation.
    """
    super().__init__(
        name="git_branch",
        description=(
            "Manage branches: list, create, switch, or delete. "
            "Provide an action and branch name as needed."
        ),
        action_type=ActionType.VCS_BRANCH,
        parameters_schema={
            "type": "object",
            "properties": {
                "action": {
                    "type": "string",
                    "enum": [
                        "list",
                        "create",
                        "switch",
                        "delete",
                    ],
                    "description": "Branch action to perform.",
                    "default": "list",
                },
                "name": {
                    "type": "string",
                    "description": (
                        "Branch name (required for create/switch/delete)."
                    ),
                },
                "start_point": {
                    "type": "string",
                    "description": ("Starting ref for branch creation."),
                },
                "force": {
                    "type": "boolean",
                    "description": (
                        "Force delete (-D) instead of safe delete (-d)."
                    ),
                    "default": False,
                },
            },
            "required": ["action"],
            "additionalProperties": False,
        },
        workspace=workspace,
        sandbox=sandbox,
    )

execute async

execute(*, arguments)

Run a branch operation.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Branch action, name, start_point, force.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the operation output.

Source code in src/synthorg/tools/git_tools.py
async def execute(  # noqa: PLR0911
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Run a branch operation.

    Args:
        arguments: Branch action, name, start_point, force.

    Returns:
        A ``ToolExecutionResult`` with the operation output.
    """
    action: str = arguments.get("action", "list")
    name: str | None = arguments.get("name")

    if action in self._ACTIONS_REQUIRING_NAME and not name:
        return ToolExecutionResult(
            content=(f"Branch name is required for '{action}' action"),
            is_error=True,
        )

    if action == "list":
        return await self._list_branches()

    # Narrowing: guaranteed non-None by guard above.
    branch_name: str = name  # type: ignore[assignment]

    if err := self._check_git_arg(branch_name, param="name"):
        return err

    if action == "create":
        return await self._create_branch(branch_name, arguments)

    if action == "switch":
        return await self._run_git(["switch", branch_name])

    if action == "delete":
        flag = "-D" if arguments.get("force") else "-d"
        return await self._run_git(["branch", flag, branch_name])

    return ToolExecutionResult(
        content=f"Unknown branch action: {action!r}",
        is_error=True,
    )

GitCommitTool

GitCommitTool(*, workspace, sandbox=None)

Bases: _BaseGitTool

Stage and commit changes.

Stages specified paths (or all changes with all=True), then creates a commit with the provided message.

Initialize the git_commit tool.

Parameters:

Name Type Description Default
workspace Path

Absolute path to the workspace root.

required
sandbox SandboxBackend | None

Optional sandbox backend for subprocess isolation.

None
Source code in src/synthorg/tools/git_tools.py
def __init__(
    self,
    *,
    workspace: Path,
    sandbox: SandboxBackend | None = None,
) -> None:
    """Initialize the git_commit tool.

    Args:
        workspace: Absolute path to the workspace root.
        sandbox: Optional sandbox backend for subprocess isolation.
    """
    super().__init__(
        name="git_commit",
        action_type=ActionType.VCS_COMMIT,
        description=(
            "Stage and commit changes. Provide a commit message and "
            "optionally specify paths to stage or use 'all' to stage "
            "everything."
        ),
        parameters_schema={
            "type": "object",
            "properties": {
                "message": {
                    "type": "string",
                    "description": "Commit message.",
                },
                "paths": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": ("Paths to stage before committing."),
                },
                "all": {
                    "type": "boolean",
                    "description": ("Stage all modified and deleted files."),
                    "default": False,
                },
            },
            "required": ["message"],
            "additionalProperties": False,
        },
        workspace=workspace,
        sandbox=sandbox,
    )

execute async

execute(*, arguments)

Stage and commit changes.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Commit message, optional paths, optional all flag.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the commit output.

Source code in src/synthorg/tools/git_tools.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Stage and commit changes.

    Args:
        arguments: Commit message, optional paths, optional all flag.

    Returns:
        A ``ToolExecutionResult`` with the commit output.
    """
    message: str = arguments["message"]
    paths: list[str] = arguments.get("paths", [])
    stage_all: bool = arguments.get("all", False)

    if paths:
        if err := self._check_paths(paths):
            return err
        add_result = await self._run_git(["add", "--", *paths])
        if add_result.is_error:
            return add_result
    elif stage_all:
        add_result = await self._run_git(["add", "-A"])
        if add_result.is_error:
            return add_result
    else:
        logger.debug(
            GIT_COMMAND_START,
            command=["git", "commit"],
            note="no staging requested; committing already staged",
        )

    return await self._run_git(["commit", "-m", message])
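The staging precedence above is: explicit `paths` first, then `all=True`, otherwise commit whatever is already staged. A sketch that returns the planned git invocations instead of running them (the helper name is hypothetical):

```python
from typing import Any


def plan_commit(arguments: dict[str, Any]) -> list[list[str]]:
    """Return the sequence of git commands GitCommitTool would run,
    mirroring its staging precedence: paths > all > already staged."""
    commands: list[list[str]] = []
    paths: list[str] = arguments.get("paths", [])
    if paths:
        commands.append(["add", "--", *paths])
    elif arguments.get("all", False):
        commands.append(["add", "-A"])
    # With neither, the commit applies only to already-staged changes.
    commands.append(["commit", "-m", arguments["message"]])
    return commands


assert plan_commit({"message": "fix", "paths": ["a.py"]}) == [
    ["add", "--", "a.py"],
    ["commit", "-m", "fix"],
]
# Explicit paths take precedence over all=True:
assert plan_commit({"message": "fix", "paths": ["a.py"], "all": True})[0] == [
    "add", "--", "a.py",
]
assert plan_commit({"message": "fix"}) == [["commit", "-m", "fix"]]
```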

GitCloneTool

GitCloneTool(*, workspace, sandbox=None, network_policy=None)

Bases: _BaseGitTool

Clone a git repository into the workspace.

Validates that the target directory stays within the workspace boundary. Supports optional branch selection and shallow clone depth. URLs are validated against allowed schemes (https, ssh, SCP-like) and checked for SSRF via hostname/IP validation with async DNS resolution. Local paths, file://, and plain http:// URLs are rejected.

Initialize the git_clone tool.

Parameters:

Name Type Description Default
workspace Path

Absolute path to the workspace root.

required
sandbox SandboxBackend | None

Optional sandbox backend for subprocess isolation.

None
network_policy GitCloneNetworkPolicy | None

SSRF prevention network policy. Defaults to blocking all private/reserved IPs with an empty hostname allowlist.

None
Source code in src/synthorg/tools/git_tools.py
def __init__(
    self,
    *,
    workspace: Path,
    sandbox: SandboxBackend | None = None,
    network_policy: GitCloneNetworkPolicy | None = None,
) -> None:
    """Initialize the git_clone tool.

    Args:
        workspace: Absolute path to the workspace root.
        sandbox: Optional sandbox backend for subprocess isolation.
        network_policy: SSRF prevention network policy.  Defaults
            to blocking all private/reserved IPs with an empty
            hostname allowlist.
    """
    super().__init__(
        name="git_clone",
        action_type=ActionType.VCS_READ,
        description=(
            "Clone a git repository into a directory within the "
            "workspace. Supports branch selection and shallow clones."
        ),
        parameters_schema={
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "Repository URL to clone.",
                },
                "directory": {
                    "type": "string",
                    "description": ("Target directory name within workspace."),
                },
                "branch": {
                    "type": "string",
                    "description": "Branch to clone.",
                },
                "depth": {
                    "type": "integer",
                    "description": "Shallow clone depth.",
                    "minimum": 1,
                },
            },
            "required": ["url"],
            "additionalProperties": False,
        },
        workspace=workspace,
        sandbox=sandbox,
    )
    self._network_policy = (
        network_policy if network_policy is not None else GitCloneNetworkPolicy()
    )

execute async

execute(*, arguments)

Clone a repository.

Validation order: scheme check -> argument checks (branch, depth, directory) -> SSRF host/IP check -> TOCTOU DNS rebinding mitigation -> git clone. All cheap local checks run before the async DNS lookup.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Clone URL, optional directory, branch, depth.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the clone output.

Source code in src/synthorg/tools/git_tools.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Clone a repository.

    Validation order: scheme check -> argument checks (branch,
    depth, directory) -> SSRF host/IP check -> TOCTOU DNS
    rebinding mitigation -> ``git clone``.  All cheap local
    checks run before the async DNS lookup.

    Args:
        arguments: Clone URL, optional directory, branch, depth.

    Returns:
        A ``ToolExecutionResult`` with the clone output.
    """
    url: str = arguments["url"]

    if not is_allowed_clone_scheme(url):
        logger.warning(
            GIT_CLONE_URL_REJECTED,
            url=_CREDENTIAL_RE.sub(r"\1***@", url),
        )
        schemes = ", ".join(ALLOWED_CLONE_SCHEMES)
        return ToolExecutionResult(
            content=(
                f"Invalid clone URL. Only {schemes} "
                "and SCP-like (user@host:path) URLs are "
                "allowed"
            ),
            is_error=True,
        )

    args = ["clone"]

    if branch := arguments.get("branch"):
        if err := self._check_git_arg(branch, param="branch"):
            return err
        args.extend(["--branch", branch])

    if depth := arguments.get("depth"):
        args.extend(["--depth", str(depth)])

    args.append("--")
    args.append(url)

    if directory := arguments.get("directory"):
        if err := self._check_paths([directory]):
            return err
        args.append(directory)

    # SSRF prevention: validate hostname/IP after all local checks.
    validation = await validate_clone_url_host(url, self._network_policy)
    if isinstance(validation, str):
        return ToolExecutionResult(content=validation, is_error=True)

    # TOCTOU DNS rebinding mitigation
    result = await self._apply_toctou_mitigation(args, validation)
    if isinstance(result, ToolExecutionResult):
        return result
    args = result

    return await self._run_git(args, deadline=_CLONE_TIMEOUT)
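The scheme gate described above (https and ssh allowed, SCP-like `user@host:path` allowed, plain http://, file://, and local paths rejected) can be approximated in a few lines. This is an illustrative re-creation of the stated policy, not the library's `is_allowed_clone_scheme`, which may differ in detail:

```python
import re

# SCP-like syntax: user@host:path, with no URL scheme prefix.
_SCP_LIKE = re.compile(r"^[\w.-]+@[\w.-]+:.+")


def is_allowed_clone_scheme(url: str) -> bool:
    """Approximate the documented policy: https, ssh, and SCP-like
    URLs pass; everything else (http, file, local paths) is rejected."""
    if url.startswith(("https://", "ssh://")):
        return True
    return bool(_SCP_LIKE.match(url))


assert is_allowed_clone_scheme("https://github.com/org/repo.git")
assert is_allowed_clone_scheme("git@github.com:org/repo.git")
assert not is_allowed_clone_scheme("http://example.com/repo.git")
assert not is_allowed_clone_scheme("file:///etc/repo")
assert not is_allowed_clone_scheme("/local/path")
```

The scheme check is only the first gate; the SSRF host validation and DNS-rebinding mitigation in `execute` still run afterwards for URLs that pass it.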

File System Tools

read_file

Read file tool -- reads file content from the workspace.

Supports optional line-range selection and enforces a maximum file-size guard to prevent loading excessively large files into memory.

ReadFileTool

ReadFileTool(*, workspace_root)

Bases: BaseFileSystemTool

Reads the content of a file within the workspace.

Supports optional start_line / end_line for partial reads. Files exceeding 1 MB are read in a bounded fashion: when no line range is specified, only the first 1 MB is returned (with a truncation notice); a line-range read of an oversized file returns an error instead. Binary (non-UTF-8) files produce an error.

Examples:

Read an entire file::

tool = ReadFileTool(workspace_root=Path("/ws"))
result = await tool.execute(arguments={"path": "src/main.py"})

Initialize the read-file tool.

Parameters:

Name Type Description Default
workspace_root Path

Root directory bounding file access.

required
Source code in src/synthorg/tools/file_system/read_file.py
def __init__(self, *, workspace_root: Path) -> None:
    """Initialize the read-file tool.

    Args:
        workspace_root: Root directory bounding file access.
    """
    super().__init__(
        workspace_root=workspace_root,
        name="read_file",
        description=(
            "Read the contents of a file. Supports optional "
            "line-range selection via start_line and end_line."
        ),
        action_type=ActionType.CODE_READ,
        parameters_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "File path relative to workspace",
                },
                "start_line": {
                    "type": "integer",
                    "minimum": 1,
                    "description": "First line to read (1-based inclusive)",
                },
                "end_line": {
                    "type": "integer",
                    "minimum": 1,
                    "description": "Last line to read (1-based inclusive)",
                },
            },
            "required": ["path"],
            "additionalProperties": False,
        },
    )

execute async

execute(*, arguments)

Read a file and return its content.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Must contain path; optionally start_line and end_line.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the file content or an error.

Source code in src/synthorg/tools/file_system/read_file.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Read a file and return its content.

    Args:
        arguments: Must contain ``path``; optionally ``start_line``
            and ``end_line``.

    Returns:
        A ``ToolExecutionResult`` with the file content or an error.
    """
    user_path: str = arguments["path"]
    start_line: int | None = arguments.get("start_line")
    end_line: int | None = arguments.get("end_line")

    if err := self._validate_read_args(start_line, end_line):
        return err

    try:
        resolved = self.path_validator.validate(user_path)
    except ValueError as exc:
        return ToolExecutionResult(content=str(exc), is_error=True)

    size_bytes, preflight_err = await self._preflight_check_file(
        user_path,
        resolved,
    )
    if preflight_err is not None:
        return preflight_err

    oversized = size_bytes > MAX_FILE_SIZE_BYTES
    if oversized:
        logger.warning(
            TOOL_FS_SIZE_EXCEEDED,
            path=user_path,
            size_bytes=size_bytes,
            max_bytes=MAX_FILE_SIZE_BYTES,
        )
        if start_line is not None or end_line is not None:
            return ToolExecutionResult(
                content=(
                    f"File too large for line-range read: "
                    f"{user_path} ({size_bytes:,} bytes, "
                    f"max {MAX_FILE_SIZE_BYTES:,})"
                ),
                is_error=True,
            )

    return await self._perform_read(
        user_path,
        resolved,
        start_line,
        end_line,
        size_bytes=size_bytes,
        oversized=oversized,
    )

write_file

Write file tool -- creates or overwrites files in the workspace.

WriteFileTool

WriteFileTool(*, workspace_root)

Bases: BaseFileSystemTool

Creates or overwrites a file within the workspace.

Optionally creates parent directories when create_directories is True.

Examples:

Write a new file::

tool = WriteFileTool(workspace_root=Path("/ws"))
result = await tool.execute(
    arguments={"path": "out.txt", "content": "hello"}
)

Initialize the write-file tool.

Parameters:

Name Type Description Default
workspace_root Path

Root directory bounding file access.

required
Source code in src/synthorg/tools/file_system/write_file.py
def __init__(self, *, workspace_root: Path) -> None:
    """Initialize the write-file tool.

    Args:
        workspace_root: Root directory bounding file access.
    """
    super().__init__(
        workspace_root=workspace_root,
        name="write_file",
        action_type=ActionType.CODE_WRITE,
        description=(
            "Write content to a file, creating or overwriting it. "
            "Set create_directories to true to create parent dirs."
        ),
        parameters_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "File path relative to workspace",
                },
                "content": {
                    "type": "string",
                    "description": "Content to write",
                },
                "create_directories": {
                    "type": "boolean",
                    "description": (
                        "Create parent directories if missing (default false)"
                    ),
                    "default": False,
                },
            },
            "required": ["path", "content"],
            "additionalProperties": False,
        },
    )

execute async

execute(*, arguments)

Write content to a file.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Must contain path and content; optionally create_directories.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult confirming the write or an error.

Source code in src/synthorg/tools/file_system/write_file.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Write content to a file.

    Args:
        arguments: Must contain ``path`` and ``content``; optionally
            ``create_directories``.

    Returns:
        A ``ToolExecutionResult`` confirming the write or an error.
    """
    user_path: str = arguments["path"]
    content: str = arguments["content"]
    create_dirs: bool = arguments.get("create_directories", False)

    if err := self._validate_write_args(user_path, content):
        return err

    resolved_or_err = self._resolve_write_path(
        user_path,
        create_dirs=create_dirs,
    )
    if isinstance(resolved_or_err, ToolExecutionResult):
        return resolved_or_err

    return await self._perform_write(
        user_path,
        resolved_or_err,
        content,
        create_dirs=create_dirs,
    )
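
Both write and edit paths route user-supplied paths through a workspace-bounding validation before touching the filesystem. A self-contained sketch of that check (a hypothetical helper; the actual `path_validator` implementation may differ):

```python
from pathlib import Path


def resolve_in_workspace(workspace_root: Path, user_path: str) -> Path:
    """Resolve a user-supplied relative path, rejecting workspace escapes.

    Raises ValueError when the resolved path falls outside workspace_root,
    mirroring the ValueError the tools catch and map to an error result.
    """
    root = workspace_root.resolve()
    candidate = (root / user_path).resolve()
    if not candidate.is_relative_to(root):
        msg = f"Path escapes workspace: {user_path}"
        raise ValueError(msg)
    return candidate
```

Resolving before the containment check is what defeats `..` traversal: the candidate is normalized first, then compared against the root.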

edit_file

Edit file tool -- search-and-replace within workspace files.

EditFileTool

EditFileTool(*, workspace_root)

Bases: BaseFileSystemTool

Replaces the first occurrence of old_text with new_text.

If old_text is not found, an error result is returned. When multiple occurrences exist, only the first is replaced and a warning is included in the output.

Returns immediately with no change if old_text and new_text are identical.

Examples:

Replace text::

tool = EditFileTool(workspace_root=Path("/ws"))
result = await tool.execute(
    arguments={
        "path": "main.py",
        "old_text": "foo",
        "new_text": "bar",
    }
)

Initialize the edit-file tool.

Parameters:

Name Type Description Default
workspace_root Path

Root directory bounding file access.

required
Source code in src/synthorg/tools/file_system/edit_file.py
def __init__(self, *, workspace_root: Path) -> None:
    """Initialize the edit-file tool.

    Args:
        workspace_root: Root directory bounding file access.
    """
    super().__init__(
        workspace_root=workspace_root,
        name="edit_file",
        action_type=ActionType.CODE_WRITE,
        description=(
            "Replace the first occurrence of old_text with new_text "
            "in a file. Use empty new_text to delete text."
        ),
        parameters_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "File path relative to workspace",
                },
                "old_text": {
                    "type": "string",
                    "minLength": 1,
                    "description": "Exact text to find",
                },
                "new_text": {
                    "type": "string",
                    "description": "Replacement text (empty string to delete)",
                },
            },
            "required": ["path", "old_text", "new_text"],
            "additionalProperties": False,
        },
    )

execute async

execute(*, arguments)

Edit a file by replacing text.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Must contain path, old_text, new_text.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult confirming the edit or an error.

Source code in src/synthorg/tools/file_system/edit_file.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Edit a file by replacing text.

    Args:
        arguments: Must contain ``path``, ``old_text``, ``new_text``.

    Returns:
        A ``ToolExecutionResult`` confirming the edit or an error.
    """
    user_path: str = arguments["path"]
    old_text: str = arguments["old_text"]
    new_text: str = arguments["new_text"]

    if err := self._validate_edit_args(user_path, old_text, new_text):
        return err

    try:
        resolved = self.path_validator.validate(user_path)
    except ValueError as exc:
        return ToolExecutionResult(content=str(exc), is_error=True)

    if err := await self._preflight_check_file(user_path, resolved):
        return err

    return await self._perform_edit(
        user_path,
        resolved,
        old_text,
        new_text,
    )

delete_file

Delete file tool -- removes a single file from the workspace.

DeleteFileTool

DeleteFileTool(*, workspace_root)

Bases: BaseFileSystemTool

Deletes a single file within the workspace.

Directories cannot be deleted with this tool -- only regular files. The require_elevated property is defined for future use by the engine's permission system (not yet enforced).

Examples:

Delete a file::

tool = DeleteFileTool(workspace_root=Path("/ws"))
result = await tool.execute(arguments={"path": "tmp.txt"})

Initialize the delete-file tool.

Parameters:

Name Type Description Default
workspace_root Path

Root directory bounding file access.

required
Source code in src/synthorg/tools/file_system/delete_file.py
def __init__(self, *, workspace_root: Path) -> None:
    """Initialize the delete-file tool.

    Args:
        workspace_root: Root directory bounding file access.
    """
    super().__init__(
        workspace_root=workspace_root,
        name="delete_file",
        action_type=ActionType.CODE_DELETE,
        description="Delete a single file from the workspace.",
        parameters_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "File path relative to workspace",
                },
            },
            "required": ["path"],
            "additionalProperties": False,
        },
    )

require_elevated property

require_elevated

Whether this tool requires elevated permissions.

Indicates this tool requires explicit approval before execution due to its destructive nature. Not yet consumed by the engine; defined for forward-compatibility.

execute async

execute(*, arguments)

Delete a file from the workspace.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Must contain path.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult confirming deletion or an error.

Source code in src/synthorg/tools/file_system/delete_file.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Delete a file from the workspace.

    Args:
        arguments: Must contain ``path``.

    Returns:
        A ``ToolExecutionResult`` confirming deletion or an error.
    """
    user_path: str = arguments["path"]

    try:
        resolved = self.path_validator.validate(user_path)
    except ValueError as exc:
        return ToolExecutionResult(content=str(exc), is_error=True)

    try:
        size_bytes = await asyncio.to_thread(_delete_sync, resolved)
    except OSError as exc:
        return self._handle_delete_error(exc, user_path)

    logger.info(
        TOOL_FS_DELETE,
        path=user_path,
        size_bytes=size_bytes,
    )
    return ToolExecutionResult(
        content=f"Deleted {user_path} ({size_bytes} bytes)",
        metadata={
            "path": user_path,
            "size_bytes": size_bytes,
        },
    )
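
The blocking deletion step offloaded via `asyncio.to_thread` could look like this sketch (a hypothetical stand-in for `_delete_sync`, not the actual source):

```python
import asyncio
from pathlib import Path


def delete_file_sync(path: Path) -> int:
    """Stat the file for its size, then unlink it; return the byte count.

    Raises OSError (e.g. FileNotFoundError, IsADirectoryError) for the
    caller to map into an error result -- directories are rejected by
    unlink itself.
    """
    size_bytes = path.stat().st_size
    path.unlink()
    return size_bytes


async def delete_file(path: Path) -> int:
    # Offload the blocking I/O, as the tool does with asyncio.to_thread.
    return await asyncio.to_thread(delete_file_sync, path)
```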

list_directory

List directory tool -- lists entries in a workspace directory.

ListDirectoryTool

ListDirectoryTool(*, workspace_root)

Bases: BaseFileSystemTool

Lists files and directories within the workspace.

Supports optional glob filtering and recursive listing. Output is sorted alphabetically with type prefixes ([DIR] / [FILE] / [SYMLINK] / [ERROR]). Results are capped at MAX_ENTRIES (1000) to prevent excessive output.

Examples:

List current directory::

tool = ListDirectoryTool(workspace_root=Path("/ws"))
result = await tool.execute(arguments={})

Initialize the list-directory tool.

Parameters:

Name Type Description Default
workspace_root Path

Root directory bounding file access.

required
Source code in src/synthorg/tools/file_system/list_directory.py
def __init__(self, *, workspace_root: Path) -> None:
    """Initialize the list-directory tool.

    Args:
        workspace_root: Root directory bounding file access.
    """
    super().__init__(
        workspace_root=workspace_root,
        name="list_directory",
        action_type=ActionType.CODE_READ,
        description=(
            "List files and directories. Supports glob filtering "
            "and recursive listing."
        ),
        parameters_schema={
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": (
                        'Directory path relative to workspace (default ".")'
                    ),
                    "default": ".",
                },
                "pattern": {
                    "type": "string",
                    "description": 'Glob filter (e.g. "*.py")',
                },
                "recursive": {
                    "type": "boolean",
                    "description": "Recursive listing (default false)",
                    "default": False,
                },
            },
            "additionalProperties": False,
        },
    )

execute async

execute(*, arguments)

List directory contents.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Optionally contains path, pattern, and recursive.

required

Returns:

Type Description
ToolExecutionResult

A ToolExecutionResult with the listing or an error.

Source code in src/synthorg/tools/file_system/list_directory.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """List directory contents.

    Args:
        arguments: Optionally contains ``path``, ``pattern``,
            and ``recursive``.

    Returns:
        A ``ToolExecutionResult`` with the listing or an error.
    """
    user_path: str = arguments.get("path", ".")
    pattern: str | None = arguments.get("pattern")
    recursive: bool = arguments.get("recursive", False)

    if err := self._validate_list_args(pattern, recursive=recursive):
        return err

    resolved_or_err = self._resolve_and_check_dir(user_path)
    if isinstance(resolved_or_err, ToolExecutionResult):
        return resolved_or_err

    try:
        lines, raw_capped = await asyncio.to_thread(
            _list_sync,
            resolved_or_err,
            self.workspace_root,
            pattern,
            recursive=recursive,
        )
    except PermissionError:
        logger.warning(
            TOOL_FS_ERROR,
            path=user_path,
            error="permission_denied",
        )
        return ToolExecutionResult(
            content=f"Permission denied: {user_path}",
            is_error=True,
        )
    except OSError as exc:
        logger.warning(TOOL_FS_ERROR, path=user_path, error=str(exc))
        return ToolExecutionResult(
            content=f"OS error listing directory: {user_path}",
            is_error=True,
        )

    output, metadata = self._format_listing_result(
        user_path,
        lines,
        raw_capped=raw_capped,
    )
    logger.info(
        TOOL_FS_LIST,
        path=user_path,
        total_entries=metadata["total_entries"],
        directories=metadata["directories"],
        files=metadata["files"],
    )
    return ToolExecutionResult(content=output, metadata=metadata)

MCP Bridge

config

MCP bridge configuration models.

Defines MCPServerConfig for individual MCP server connections and MCPConfig as the top-level container. Both are frozen Pydantic models following the project's immutability conventions.

MCPServerConfig pydantic-model

Bases: BaseModel

Configuration for a single MCP server connection.

Attributes:

Name Type Description
name NotBlankStr

Unique server identifier.

transport Literal['stdio', 'streamable_http']

Transport type ("stdio" or "streamable_http").

command NotBlankStr | None

Command to launch a stdio server.

args tuple[str, ...]

Command-line arguments for stdio server.

env dict[str, str]

Environment variables for stdio server.

url NotBlankStr | None

URL for streamable HTTP server.

headers dict[str, str]

HTTP headers for streamable HTTP server.

enabled_tools tuple[NotBlankStr, ...] | None

Allowlist of tool names (None = all).

disabled_tools tuple[NotBlankStr, ...]

Denylist of tool names.

timeout_seconds float

Timeout for tool invocations.

connect_timeout_seconds float

Timeout for initial connection.

result_cache_ttl_seconds float

TTL for result cache entries.

result_cache_max_size int

Maximum result cache entries.

enabled bool

Whether the server is active.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_transport_fields
  • _validate_tool_filters

name pydantic-field

name

Unique server identifier

transport pydantic-field

transport

Transport type: stdio or streamable_http

command pydantic-field

command = None

Command to launch a stdio server

args pydantic-field

args = ()

Command-line arguments for stdio server

env pydantic-field

env

Environment variables for stdio server

url pydantic-field

url = None

URL for streamable HTTP server

headers pydantic-field

headers

HTTP headers for streamable HTTP server

enabled_tools pydantic-field

enabled_tools = None

Allowlist of tool names (None = all)

disabled_tools pydantic-field

disabled_tools = ()

Denylist of tool names

timeout_seconds pydantic-field

timeout_seconds = 30.0

Timeout for tool invocations in seconds

connect_timeout_seconds pydantic-field

connect_timeout_seconds = 10.0

Timeout for initial connection in seconds

result_cache_ttl_seconds pydantic-field

result_cache_ttl_seconds = 60.0

TTL for result cache entries in seconds

result_cache_max_size pydantic-field

result_cache_max_size = 256

Maximum result cache entries

enabled pydantic-field

enabled = True

Whether the server is active

MCPConfig pydantic-model

Bases: BaseModel

Top-level MCP bridge configuration.

Attributes:

Name Type Description
servers tuple[MCPServerConfig, ...]

Tuple of MCP server configurations.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_unique_server_names

servers pydantic-field

servers = ()

MCP server configurations

models

MCP bridge internal value objects.

Defines MCPToolInfo for discovered tool metadata and MCPRawResult for raw MCP call results before mapping.

MCPToolInfo pydantic-model

Bases: BaseModel

Discovered tool metadata from an MCP server.

Attributes:

Name Type Description
name NotBlankStr

Tool name as reported by the server.

description str

Human-readable tool description.

input_schema dict[str, Any]

JSON Schema for tool parameters.

server_name NotBlankStr

Name of the server that hosts this tool.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

name pydantic-field

name

Tool name

description pydantic-field

description = ''

Human-readable tool description

input_schema pydantic-field

input_schema

JSON Schema for tool parameters

server_name pydantic-field

server_name

Name of the hosting MCP server

MCPRawResult pydantic-model

Bases: BaseModel

Raw result from an MCP tool call before mapping.

Attributes:

Name Type Description
content tuple[Any, ...]

MCP content blocks from the call result.

is_error bool

Whether the MCP call reported an error.

structured_content dict[str, Any] | None

Optional structured content from the result.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

content pydantic-field

content = ()

MCP content blocks

is_error pydantic-field

is_error = False

Whether the MCP call reported an error

structured_content pydantic-field

structured_content = None

Optional structured content from the result

client

MCP client -- thin async wrapper over the MCP SDK.

Manages a single connection to an MCP server and provides tool discovery and invocation through the MCP protocol.

MCPClient

MCPClient(config)

Async client for a single MCP server.

Wraps the MCP SDK's ClientSession to provide connection management, tool discovery, and tool invocation. A lock serializes all session access to prevent interleaving.

Parameters:

Name Type Description Default
config MCPServerConfig

Server connection configuration.

required
Source code in src/synthorg/tools/mcp/client.py
def __init__(self, config: MCPServerConfig) -> None:
    self._config = config
    self._session: ClientSession | None = None
    self._exit_stack: AsyncExitStack | None = None
    self._lock = asyncio.Lock()

config property

config

Server connection configuration (read-only).

is_connected property

is_connected

Whether the client has an active session.

server_name property

server_name

Name of the configured server.

connect async

connect()

Establish a connection to the MCP server.

Raises:

Type Description
MCPConnectionError

If the connection fails.

RuntimeError

If already connected.

Source code in src/synthorg/tools/mcp/client.py
async def connect(self) -> None:
    """Establish a connection to the MCP server.

    Raises:
        MCPConnectionError: If the connection fails.
        RuntimeError: If already connected.
    """
    async with self._lock:
        if self._session is not None:
            msg = f"Already connected to {self._config.name!r}"
            logger.warning(
                MCP_CLIENT_CONNECTION_FAILED,
                server=self._config.name,
                error=msg,
            )
            raise RuntimeError(msg)
        logger.info(
            MCP_CLIENT_CONNECTING,
            server=self._config.name,
            transport=self._config.transport,
        )
        stack = AsyncExitStack()
        await stack.__aenter__()
        try:
            coro = self._connect_with_stack(stack)
            session = await asyncio.wait_for(
                coro,
                timeout=self._config.connect_timeout_seconds,
            )
            self._session = session
            self._exit_stack = stack
            logger.info(
                MCP_CLIENT_CONNECTED,
                server=self._config.name,
            )
        except TimeoutError as exc:
            await stack.aclose()
            msg = (
                f"Connection to {self._config.name!r} timed out "
                f"after {self._config.connect_timeout_seconds}s"
            )
            logger.warning(
                MCP_CLIENT_CONNECTION_FAILED,
                server=self._config.name,
                error=msg,
            )
            raise MCPConnectionError(
                msg,
                context={
                    "server": self._config.name,
                    "transport": self._config.transport,
                },
            ) from exc
        except MCPConnectionError:
            await stack.aclose()
            raise
        except Exception as exc:
            await stack.aclose()
            logger.exception(
                MCP_CLIENT_CONNECTION_FAILED,
                server=self._config.name,
                error=str(exc),
            )
            msg = f"Failed to connect to {self._config.name!r}: {exc}"
            raise MCPConnectionError(
                msg,
                context={
                    "server": self._config.name,
                    "transport": self._config.transport,
                },
            ) from exc
        except BaseException:
            # CancelledError, KeyboardInterrupt -- still close the stack
            await stack.aclose()
            raise

disconnect async

disconnect()

Close the connection and release resources.

Source code in src/synthorg/tools/mcp/client.py
async def disconnect(self) -> None:
    """Close the connection and release resources."""
    async with self._lock:
        if self._exit_stack is not None:
            try:
                await self._exit_stack.aclose()
            except Exception as exc:
                logger.warning(
                    MCP_CLIENT_DISCONNECT_FAILED,
                    server=self._config.name,
                    error=str(exc),
                )
            else:
                logger.info(
                    MCP_CLIENT_DISCONNECTED,
                    server=self._config.name,
                )
            finally:
                self._session = None
                self._exit_stack = None

list_tools async

list_tools()

Discover tools from the connected server.

Applies enabled_tools / disabled_tools filters from the server configuration.

Returns:

Type Description
tuple[MCPToolInfo, ...]

Filtered tuple of discovered tool metadata.

Raises:

Type Description
MCPDiscoveryError

If discovery fails.

Source code in src/synthorg/tools/mcp/client.py
async def list_tools(self) -> tuple[MCPToolInfo, ...]:
    """Discover tools from the connected server.

    Applies ``enabled_tools`` / ``disabled_tools`` filters
    from the server configuration.

    Returns:
        Filtered tuple of discovered tool metadata.

    Raises:
        MCPDiscoveryError: If discovery fails.
    """
    async with self._lock:
        session = self._require_session()
        logger.info(
            MCP_DISCOVERY_START,
            server=self._config.name,
        )
        try:
            result = await session.list_tools()
        except Exception as exc:
            logger.exception(
                MCP_DISCOVERY_FAILED,
                server=self._config.name,
                error=str(exc),
            )
            msg = f"Tool discovery failed for {self._config.name!r}: {exc}"
            raise MCPDiscoveryError(
                msg,
                context={"server": self._config.name},
            ) from exc

    tools = tuple(
        MCPToolInfo(
            name=t.name,
            description=t.description or "",
            input_schema=(copy.deepcopy(t.inputSchema) if t.inputSchema else {}),
            server_name=self._config.name,
        )
        for t in result.tools
    )

    filtered = self._apply_filters(tools)
    logger.info(
        MCP_DISCOVERY_COMPLETE,
        server=self._config.name,
        total=len(tools),
        after_filter=len(filtered),
    )
    return filtered

call_tool async

call_tool(tool_name, arguments)

Invoke a tool on the connected server.

Acquires the session lock to respect MCP's sequential protocol constraint. Applies the configured timeout.

Parameters:

Name Type Description Default
tool_name str

Name of the tool to invoke.

required
arguments dict[str, Any]

Arguments to pass to the tool.

required

Returns:

Type Description
MCPRawResult

Raw result from the MCP server.

Raises:

Type Description
MCPTimeoutError

If the invocation times out.

MCPInvocationError

If the invocation fails.

Source code in src/synthorg/tools/mcp/client.py
async def call_tool(
    self,
    tool_name: str,
    arguments: dict[str, Any],
) -> MCPRawResult:
    """Invoke a tool on the connected server.

    Acquires the session lock to respect MCP's sequential
    protocol constraint. Applies the configured timeout.

    Args:
        tool_name: Name of the tool to invoke.
        arguments: Arguments to pass to the tool.

    Returns:
        Raw result from the MCP server.

    Raises:
        MCPTimeoutError: If the invocation times out.
        MCPInvocationError: If the invocation fails.
    """
    logger.debug(
        MCP_INVOKE_START,
        server=self._config.name,
        tool=tool_name,
    )
    async with self._lock:
        session = self._require_session()
        try:
            result = await asyncio.wait_for(
                session.call_tool(tool_name, arguments),
                timeout=self._config.timeout_seconds,
            )
        except TimeoutError as exc:
            logger.warning(
                MCP_INVOKE_TIMEOUT,
                server=self._config.name,
                tool=tool_name,
                timeout=self._config.timeout_seconds,
            )
            msg = f"Tool {tool_name!r} timed out on {self._config.name!r}"
            raise MCPTimeoutError(
                msg,
                context={
                    "server": self._config.name,
                    "tool": tool_name,
                    "timeout": self._config.timeout_seconds,
                },
            ) from exc
        except Exception as exc:
            logger.exception(
                MCP_INVOKE_FAILED,
                server=self._config.name,
                tool=tool_name,
                error=str(exc),
            )
            msg = f"Tool {tool_name!r} failed on {self._config.name!r}: {exc}"
            raise MCPInvocationError(
                msg,
                context={
                    "server": self._config.name,
                    "tool": tool_name,
                },
            ) from exc

    logger.info(
        MCP_INVOKE_SUCCESS,
        server=self._config.name,
        tool=tool_name,
    )
    return MCPRawResult(
        content=tuple(result.content),
        is_error=result.isError or False,
        structured_content=(
            copy.deepcopy(result.structuredContent)
            if result.structuredContent is not None
            else None
        ),
    )

reconnect async

reconnect()

Disconnect and reconnect to the server.

Raises:

Type Description
MCPConnectionError

If the reconnection fails.

Source code in src/synthorg/tools/mcp/client.py
async def reconnect(self) -> None:
    """Disconnect and reconnect to the server.

    Raises:
        MCPConnectionError: If the reconnection fails.
    """
    logger.info(
        MCP_CLIENT_RECONNECTING,
        server=self._config.name,
    )
    await self.disconnect()
    await self.connect()

__aenter__ async

__aenter__()

Enter async context: connect to server.

Source code in src/synthorg/tools/mcp/client.py
async def __aenter__(self) -> Self:
    """Enter async context: connect to server."""
    await self.connect()
    return self

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

Exit async context: disconnect from server.

Source code in src/synthorg/tools/mcp/client.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Exit async context: disconnect from server."""
    await self.disconnect()

bridge_tool

MCP bridge tool -- wraps an MCP server tool as a BaseTool.

Each MCPBridgeTool instance represents a single tool discovered from an MCP server, bridging MCP protocol calls into the internal tool system.

MCPBridgeTool

MCPBridgeTool(*, tool_info, client, cache=None)

Bases: BaseTool

Bridge between an MCP server tool and the internal tool system.

Constructs a BaseTool whose execute delegates to an MCP server via MCPClient. An optional MCPResultCache avoids redundant remote calls for identical invocations.

Parameters:

Name Type Description Default
tool_info MCPToolInfo

Discovered MCP tool metadata.

required
client MCPClient

Connected MCP client for the server.

required
cache MCPResultCache | None

Optional result cache.

None
Source code in src/synthorg/tools/mcp/bridge_tool.py
def __init__(
    self,
    *,
    tool_info: MCPToolInfo,
    client: MCPClient,
    cache: MCPResultCache | None = None,
) -> None:
    super().__init__(
        name=f"mcp_{tool_info.server_name}_{tool_info.name}",
        description=tool_info.description,
        parameters_schema=tool_info.input_schema or None,
        category=ToolCategory.MCP,
    )
    self._client = client
    self._tool_info = tool_info
    self._cache = cache

tool_info property

tool_info

The underlying MCP tool metadata.

execute async

execute(*, arguments)

Execute the MCP tool via the client.

Checks the cache first (if available). On cache miss, invokes the remote tool and stores the result.

Parameters:

Name Type Description Default
arguments dict[str, Any]

Tool invocation arguments.

required

Returns:

Type Description
ToolExecutionResult

Mapped ToolExecutionResult.

Source code in src/synthorg/tools/mcp/bridge_tool.py
async def execute(
    self,
    *,
    arguments: dict[str, Any],
) -> ToolExecutionResult:
    """Execute the MCP tool via the client.

    Checks the cache first (if available). On cache miss,
    invokes the remote tool and stores the result.

    Args:
        arguments: Tool invocation arguments.

    Returns:
        Mapped ``ToolExecutionResult``.
    """
    cached = self._check_cache(arguments)
    if cached is not None:
        return cached

    result = await self._invoke(arguments)
    self._store_in_cache(arguments, result)
    return result
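
The cache-then-invoke flow can be sketched with a plain dict cache keyed by a stable serialization of the arguments (hypothetical; the real `MCPResultCache` also applies TTL and size limits):

```python
import json
from collections.abc import Awaitable, Callable
from typing import Any


async def cached_call(
    cache: dict[str, str],
    arguments: dict[str, Any],
    invoke: Callable[[dict[str, Any]], Awaitable[str]],
) -> str:
    """Check the cache first; on a miss, invoke remotely and store."""
    key = json.dumps(arguments, sort_keys=True)  # stable key per argument set
    if key in cache:
        return cache[key]
    result = await invoke(arguments)
    cache[key] = result
    return result
```

Sorting the keys in the serialization makes `{"a": 1, "b": 2}` and `{"b": 2, "a": 1}` hit the same cache entry.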

Sandbox

protocol

Sandbox backend protocol definition.

SandboxBackend

Bases: Protocol

Protocol for pluggable sandbox backends.

Implementations execute commands in an isolated environment with environment filtering, workspace enforcement, and timeout support. Subprocess and Docker are built-in backends.

execute async

execute(*, command, args, cwd=None, env_overrides=None, timeout=None)

Execute a command in the sandbox.

Parameters:

Name Type Description Default
command str

Executable name or path.

required
args tuple[str, ...]

Command arguments.

required
cwd Path | None

Working directory (defaults to sandbox workspace root).

None
env_overrides Mapping[str, str] | None

Extra environment variables for the sandbox.

None
timeout float | None

Seconds before the process is killed. Falls back to the backend's default timeout if None.

None

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Raises:

Type Description
SandboxStartError

If the subprocess could not be started.

SandboxError

If cwd is outside the workspace boundary.

Source code in src/synthorg/tools/sandbox/protocol.py
async def execute(
    self,
    *,
    command: str,
    args: tuple[str, ...],
    cwd: Path | None = None,
    env_overrides: Mapping[str, str] | None = None,
    timeout: float | None = None,  # noqa: ASYNC109
) -> SandboxResult:
    """Execute a command in the sandbox.

    Args:
        command: Executable name or path.
        args: Command arguments.
        cwd: Working directory (defaults to sandbox workspace root).
        env_overrides: Extra environment variables for the sandbox.
        timeout: Seconds before the process is killed. Falls back
            to the backend's default timeout if ``None``.

    Returns:
        A ``SandboxResult`` with captured output and exit status.

    Raises:
        SandboxStartError: If the subprocess could not be started.
        SandboxError: If cwd is outside the workspace boundary.
    """
    ...

cleanup async

cleanup()

Release any resources held by the backend.

Returns:

Type Description
None

Nothing.

Source code in src/synthorg/tools/sandbox/protocol.py
async def cleanup(self) -> None:
    """Release any resources held by the backend.

    Returns:
        Nothing.
    """
    ...

health_check async

health_check()

Return True if the backend is operational.

Returns:

Type Description
bool

True if healthy, False otherwise.

Source code in src/synthorg/tools/sandbox/protocol.py
async def health_check(self) -> bool:
    """Return ``True`` if the backend is operational.

    Returns:
        ``True`` if healthy, ``False`` otherwise.
    """
    ...

get_backend_type

get_backend_type()

Return a short identifier for this backend type.

Returns:

Type Description
NotBlankStr

A string like 'subprocess' or 'docker'.

Source code in src/synthorg/tools/sandbox/protocol.py
def get_backend_type(self) -> NotBlankStr:
    """Return a short identifier for this backend type.

    Returns:
        A string like ``'subprocess'`` or ``'docker'``.
    """
    ...
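
A minimal backend satisfying the shape of this protocol might wrap `asyncio.create_subprocess_exec` (a simplified sketch that omits env filtering and workspace enforcement):

```python
import asyncio


async def execute(
    command: str,
    args: tuple[str, ...],
    timeout: float = 30.0,
) -> tuple[str, int, bool]:
    """Run a command, capture stdout, and kill it on timeout.

    Returns (stdout, returncode, timed_out) -- a simplified stand-in
    for SandboxResult.
    """
    proc = await asyncio.create_subprocess_exec(
        command,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except TimeoutError:
        proc.kill()
        await proc.wait()
        return "", proc.returncode or -1, True
    return stdout.decode(), proc.returncode or 0, False
```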

config

Subprocess sandbox configuration model.

SubprocessSandboxConfig pydantic-model

Bases: BaseModel

Configuration for the subprocess sandbox backend.

Attributes:

Name Type Description
timeout_seconds float

Default command timeout in seconds.

workspace_only bool

When enabled, rejects commands whose working directory falls outside the workspace boundary.

restricted_path bool

When enabled, filters PATH entries to retain only known safe system directories.

env_allowlist tuple[str, ...]

Environment variable names allowed to pass through. Supports LC_* as a glob for all locale variables.

env_denylist_patterns tuple[str, ...]

fnmatch patterns to strip even if in the allowlist (e.g. *KEY* catches API_KEY). Includes secret-name heuristics and library injection vars.

extra_safe_path_prefixes tuple[str, ...]

Additional non-empty absolute PATH prefixes appended to platform defaults for the PATH filter.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • timeout_seconds (float)
  • workspace_only (bool)
  • restricted_path (bool)
  • env_allowlist (tuple[str, ...])
  • env_denylist_patterns (tuple[str, ...])
  • extra_safe_path_prefixes (tuple[str, ...])

Validators:

extra_safe_path_prefixes pydantic-field

extra_safe_path_prefixes = ()

Additional safe PATH prefixes appended to platform defaults.

Use this to allow tool-specific directories (e.g. a custom Git install location) through the PATH filter without modifying the built-in platform defaults.
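The allowlist/denylist interaction described above can be sketched with plain `fnmatch` semantics. This is an illustrative sketch, assuming allow-then-deny ordering; `filter_env` is an invented name, not the backend's actual helper.

```python
from fnmatch import fnmatch


def filter_env(env, allowlist, denylist_patterns):
    """Keep variables matching an allowlist pattern, then strip denylisted names.

    The denylist wins even for allowlisted names, mirroring the documented
    "*KEY* catches API_KEY" behavior.
    """
    kept = {}
    for name, value in env.items():
        if not any(fnmatch(name, pat) for pat in allowlist):
            continue  # not allowlisted at all
        if any(fnmatch(name, pat) for pat in denylist_patterns):
            continue  # allowlisted but matches a denylist pattern
        kept[name] = value
    return kept


env = {
    "PATH": "/usr/bin",
    "LC_ALL": "C.UTF-8",      # passes via the LC_* glob
    "API_KEY": "secret",      # stripped by *KEY* despite being allowlisted
    "LD_PRELOAD": "/tmp/x",   # never allowlisted
}
filtered = filter_env(
    env,
    allowlist=("PATH", "LC_*", "API_KEY"),
    denylist_patterns=("*KEY*", "LD_*"),
)
```

Note that `API_KEY` is dropped even though it appears in the allowlist: denylist patterns are applied second and take precedence.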

result

Sandbox execution result model.

SandboxResult pydantic-model

Bases: BaseModel

Immutable result of a sandboxed command execution.

Attributes:

Name Type Description
stdout str

Captured standard output.

stderr str

Captured standard error.

returncode int

Process exit code.

timed_out bool

Whether the process was killed due to timeout.

success bool

Computed -- True when returncode is 0 and not timed out.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

  • stdout (str)
  • stderr (str)
  • returncode (int)
  • timed_out (bool)

success property

success

Whether the execution succeeded.
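The computed `success` property can be sketched with a frozen dataclass standing in for the pydantic model. `SandboxResultSketch` is an invented name; only the field names and the success rule come from the documentation above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SandboxResultSketch:
    # Illustrative stand-in for the documented frozen pydantic model.
    stdout: str
    stderr: str
    returncode: int
    timed_out: bool = False

    @property
    def success(self) -> bool:
        # Computed exactly as documented: zero exit code and no timeout.
        return self.returncode == 0 and not self.timed_out


ok = SandboxResultSketch(stdout="done", stderr="", returncode=0)
killed = SandboxResultSketch(stdout="", stderr="timed out", returncode=-1, timed_out=True)
```

A nonzero exit code or a timeout each independently makes `success` false, so callers can branch on a single flag.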

subprocess_sandbox

Subprocess-based sandbox backend.

Executes commands via asyncio.create_subprocess_exec with environment filtering, workspace boundary enforcement, timeout management, and PATH restriction.

SubprocessSandbox

SubprocessSandbox(*, config=None, workspace)

Subprocess sandbox backend.

Runs commands in child processes with filtered environment variables, workspace boundary checks, and configurable timeouts.

Attributes:

Name Type Description
config SubprocessSandboxConfig

Sandbox configuration.

workspace Path

Absolute path to the workspace root directory.

Initialize the subprocess sandbox.

Parameters:

Name Type Description Default
config SubprocessSandboxConfig | None

Sandbox configuration (defaults to standard config).

None
workspace Path

Absolute path to the workspace root. Must exist.

required

Raises:

Type Description
ValueError

If workspace is not absolute or does not exist.

Source code in src/synthorg/tools/sandbox/subprocess_sandbox.py
def __init__(
    self,
    *,
    config: SubprocessSandboxConfig | None = None,
    workspace: Path,
) -> None:
    """Initialize the subprocess sandbox.

    Args:
        config: Sandbox configuration (defaults to standard config).
        workspace: Absolute path to the workspace root. Must exist.

    Raises:
        ValueError: If *workspace* is not absolute or does not exist.
    """
    if not workspace.is_absolute():
        logger.warning(
            SANDBOX_WORKSPACE_VIOLATION,
            workspace=str(workspace),
            error="workspace must be an absolute path",
        )
        msg = f"workspace must be an absolute path, got: {workspace}"
        raise ValueError(msg)
    resolved = workspace.resolve()
    if not resolved.is_dir():
        logger.warning(
            SANDBOX_WORKSPACE_VIOLATION,
            workspace=str(resolved),
            error="workspace directory does not exist",
        )
        msg = f"workspace directory does not exist: {resolved}"
        raise ValueError(msg)
    self._config = config or _DEFAULT_CONFIG
    self._workspace = resolved
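The constructor's validation order can be sketched as a standalone function: the absolute-path check runs before resolution, and the existence check runs on the resolved path. `validate_workspace` is an invented helper name for illustration.

```python
import tempfile
from pathlib import Path


def validate_workspace(workspace: Path) -> Path:
    # Mirrors the documented checks: absolute path first, then existence
    # of the resolved directory.
    if not workspace.is_absolute():
        raise ValueError(f"workspace must be an absolute path, got: {workspace}")
    resolved = workspace.resolve()
    if not resolved.is_dir():
        raise ValueError(f"workspace directory does not exist: {resolved}")
    return resolved


with tempfile.TemporaryDirectory() as tmp:
    root = validate_workspace(Path(tmp))
```

Resolving before the existence check means symlinked workspaces are pinned to their real location, which is what later boundary checks compare against.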

config property

config

Sandbox configuration.

workspace property

workspace

Workspace root directory.

execute async

execute(*, command, args, cwd=None, env_overrides=None, timeout=None)

Execute a command in the sandbox.

Parameters:

Name Type Description Default
command str

Executable name or path.

required
args tuple[str, ...]

Command arguments.

required
cwd Path | None

Working directory (defaults to workspace root).

None
env_overrides Mapping[str, str] | None

Extra env vars applied on top of filtered env.

None
timeout float | None

Seconds before the process is killed.

None

Returns:

Type Description
SandboxResult

A SandboxResult with captured output and exit status.

Raises:

Type Description
SandboxStartError

If the subprocess could not be started.

SandboxError

If cwd is outside the workspace boundary or if no safe PATH directories can be determined.

Source code in src/synthorg/tools/sandbox/subprocess_sandbox.py
async def execute(
    self,
    *,
    command: str,
    args: tuple[str, ...],
    cwd: Path | None = None,
    env_overrides: Mapping[str, str] | None = None,
    timeout: float | None = None,  # noqa: ASYNC109
) -> SandboxResult:
    """Execute a command in the sandbox.

    Args:
        command: Executable name or path.
        args: Command arguments.
        cwd: Working directory (defaults to workspace root).
        env_overrides: Extra env vars applied on top of filtered env.
        timeout: Seconds before the process is killed.

    Returns:
        A ``SandboxResult`` with captured output and exit status.

    Raises:
        SandboxStartError: If the subprocess could not be started.
        SandboxError: If cwd is outside the workspace boundary or
            if no safe PATH directories can be determined.
    """
    work_dir = cwd if cwd is not None else self._workspace
    self._validate_cwd(work_dir)

    effective_timeout = (
        timeout if timeout is not None else self._config.timeout_seconds
    )
    env = self._build_filtered_env(env_overrides)

    logger.debug(
        SANDBOX_EXECUTE_START,
        command=command,
        args=_redact_args(args),
        cwd=str(work_dir),
        timeout=effective_timeout,
    )

    proc = await self._spawn_process(command, args, work_dir, env)
    try:
        (
            stdout_bytes,
            stderr_bytes,
            timed_out,
        ) = await self._communicate_with_timeout(
            proc,
            command,
            args,
            effective_timeout,
        )
    finally:
        self._close_process(proc)

    stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
    stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
    returncode = proc.returncode if proc.returncode is not None else -1

    if timed_out:
        return SandboxResult(
            stdout=stdout,
            stderr=(stderr or f"Process timed out after {effective_timeout}s"),
            returncode=returncode,
            timed_out=True,
        )

    if returncode != 0:
        logger.warning(
            SANDBOX_EXECUTE_FAILED,
            command=command,
            args=_redact_args(args),
            returncode=returncode,
            stderr=stderr,
        )
    else:
        logger.debug(
            SANDBOX_EXECUTE_SUCCESS,
            command=command,
            args=_redact_args(args),
        )

    return SandboxResult(
        stdout=stdout,
        stderr=stderr,
        returncode=returncode,
    )
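The spawn/communicate/timeout flow above condenses to a few lines of stdlib asyncio. This sketch keeps only the core mechanics (no environment filtering, boundary checks, or logging); `run_sandboxed` is an invented name, not the backend's API.

```python
import asyncio
import sys


async def run_sandboxed(command, args, timeout=10.0):
    # Condensed version of the documented flow: spawn, wait with a
    # timeout, kill on expiry, decode output, and report the timeout
    # as a flag rather than an exception.
    proc = await asyncio.create_subprocess_exec(
        command, *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        out, err = await asyncio.wait_for(proc.communicate(), timeout)
        timed_out = False
    except asyncio.TimeoutError:
        proc.kill()
        out, err = await proc.communicate()
        timed_out = True
    return (
        out.decode("utf-8", errors="replace").strip(),
        err.decode("utf-8", errors="replace").strip(),
        proc.returncode if proc.returncode is not None else -1,
        timed_out,
    )


stdout, stderr, code, timed_out = asyncio.run(
    run_sandboxed(sys.executable, ("-c", "print('hello')"))
)
```

As in the real backend, a timeout still drains whatever output the killed process produced before returning.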

cleanup async

cleanup()

Subprocesses are ephemeral -- no resources to release.

Source code in src/synthorg/tools/sandbox/subprocess_sandbox.py
async def cleanup(self) -> None:
    """Subprocesses are ephemeral -- no resources to release."""
    logger.debug(SANDBOX_CLEANUP, backend="subprocess")

health_check async

health_check()

Return True if the workspace directory exists.

Source code in src/synthorg/tools/sandbox/subprocess_sandbox.py
async def health_check(self) -> bool:
    """Return ``True`` if the workspace directory exists."""
    healthy = self._workspace.is_dir()
    logger.debug(
        SANDBOX_HEALTH_CHECK,
        backend="subprocess",
        healthy=healthy,
        workspace=str(self._workspace),
    )
    return healthy

get_backend_type

get_backend_type()

Return 'subprocess'.

Source code in src/synthorg/tools/sandbox/subprocess_sandbox.py
def get_backend_type(self) -> NotBlankStr:
    """Return ``'subprocess'``."""
    return NotBlankStr("subprocess")

errors

Sandbox error hierarchy.

All sandbox errors inherit from ToolError so that sandbox failures surface through the standard tool error path.

SandboxError

SandboxError(message, *, context=None)

Bases: ToolError

Base exception for sandbox-layer errors.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

SandboxTimeoutError

SandboxTimeoutError(message, *, context=None)

Bases: SandboxError

Execution was killed because it exceeded the timeout.

Reserved for sandbox backends that need to signal a timeout as an exception rather than a result flag. Currently unused -- both the subprocess and Docker backends report timeouts via SandboxResult.timed_out instead.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)

SandboxStartError

SandboxStartError(message, *, context=None)

Bases: SandboxError

Failed to start the sandbox execution environment.

Source code in src/synthorg/tools/errors.py
def __init__(
    self,
    message: str,
    *,
    context: dict[str, Any] | None = None,
) -> None:
    """Initialize a tool error.

    Args:
        message: Human-readable error description.
        context: Arbitrary metadata about the error. Stored as an
            immutable mapping; defaults to empty if not provided.
    """
    self.message = message
    self.context: MappingProxyType[str, Any] = MappingProxyType(
        dict(context) if context else {},
    )
    super().__init__(message)