
HR

Agent lifecycle management: hiring, firing, onboarding, offboarding, performance tracking, and promotion/demotion.

Models

models

HR domain models.

Frozen Pydantic models for hiring, firing, onboarding, offboarding, and agent lifecycle events.

CandidateCard pydantic-model

Bases: BaseModel

Generated candidate for a hiring request.

Attributes:

Name Type Description
id NotBlankStr

Unique candidate identifier.

name NotBlankStr

Proposed agent name.

role NotBlankStr

Proposed role.

department NotBlankStr

Target department.

level SeniorityLevel

Proposed seniority level.

skills tuple[Skill, ...]

Agent skills.

rationale NotBlankStr

Why this candidate was generated.

estimated_monthly_cost float

Estimated monthly cost in the configured currency.

template_source NotBlankStr | None

Template used for generation, if any.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

id pydantic-field

id

Unique candidate identifier

name pydantic-field

name

Proposed agent name

role pydantic-field

role

Proposed role

department pydantic-field

department

Target department

level pydantic-field

level

Proposed seniority level

skills pydantic-field

skills = ()

Agent skills

rationale pydantic-field

rationale

Generation rationale

estimated_monthly_cost pydantic-field

estimated_monthly_cost

Estimated monthly cost in the configured currency

template_source pydantic-field

template_source = None

Template used for generation

HiringRequest pydantic-model

Bases: BaseModel

Request to hire a new agent.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

requested_by NotBlankStr

Agent or human who initiated the request.

department NotBlankStr

Target department.

role NotBlankStr

Desired role.

level SeniorityLevel

Desired seniority level.

required_skills tuple[NotBlankStr, ...]

Skills the candidate must have.

reason NotBlankStr

Business justification.

budget_limit_monthly float | None

Maximum monthly cost in the configured currency, if constrained.

template_name NotBlankStr | None

Template to use for candidate generation.

status HiringRequestStatus

Current request status.

created_at AwareDatetime

When the request was created.

candidates tuple[CandidateCard, ...]

Generated candidate cards.

selected_candidate_id NotBlankStr | None

ID of the chosen candidate.

approval_id NotBlankStr | None

ID of the associated approval item.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_status_candidate_consistency

id pydantic-field

id

Unique request identifier

requested_by pydantic-field

requested_by

Request initiator

department pydantic-field

department

Target department

role pydantic-field

role

Desired role

level pydantic-field

level

Desired seniority level

required_skills pydantic-field

required_skills = ()

Required skills

reason pydantic-field

reason

Business justification

budget_limit_monthly pydantic-field

budget_limit_monthly = None

Maximum monthly cost in the configured currency, if constrained

template_name pydantic-field

template_name = None

Template for candidate generation

status pydantic-field

status = PENDING

Current request status

created_at pydantic-field

created_at

When the request was created

candidates pydantic-field

candidates = ()

Generated candidate cards

selected_candidate_id pydantic-field

selected_candidate_id = None

Chosen candidate ID

approval_id pydantic-field

approval_id = None

Associated approval item ID

FiringRequest pydantic-model

Bases: BaseModel

Request to terminate an agent.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

agent_id NotBlankStr

Agent to be terminated.

agent_name NotBlankStr

Agent's display name.

reason FiringReason

Reason for termination.

requested_by NotBlankStr

Initiator of the firing.

details str

Additional context.

created_at AwareDatetime

When the request was created.

completed_at AwareDatetime | None

When the firing was completed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_temporal_order

id pydantic-field

id

Unique request identifier

agent_id pydantic-field

agent_id

Agent to terminate

agent_name pydantic-field

agent_name

Agent display name

reason pydantic-field

reason

Reason for termination

requested_by pydantic-field

requested_by

Firing initiator

details pydantic-field

details = ''

Additional context

created_at pydantic-field

created_at

When the request was created

completed_at pydantic-field

completed_at = None

When the firing was completed

OnboardingStepRecord pydantic-model

Bases: BaseModel

Record of a single onboarding step.

Attributes:

Name Type Description
step OnboardingStep

The onboarding step.

completed bool

Whether this step is complete.

completed_at AwareDatetime | None

When this step was completed.

notes str

Optional notes from the step.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_completed_consistency

step pydantic-field

step

The onboarding step

completed pydantic-field

completed = False

Whether step is complete

completed_at pydantic-field

completed_at = None

When completed

notes pydantic-field

notes = ''

Step notes

OnboardingChecklist pydantic-model

Bases: BaseModel

Agent onboarding checklist tracking all steps.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent being onboarded.

steps tuple[OnboardingStepRecord, ...]

Individual step records.

started_at AwareDatetime

When onboarding began.

completed_at AwareDatetime | None

When all steps were completed.

is_complete bool

Whether all steps are done (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_completion_consistency

agent_id pydantic-field

agent_id

Agent being onboarded

steps pydantic-field

steps

Individual step records

started_at pydantic-field

started_at

When onboarding began

completed_at pydantic-field

completed_at = None

When all steps were completed

is_complete property

is_complete

Whether all onboarding steps are completed.

OffboardingRecord pydantic-model

Bases: BaseModel

Record of a completed offboarding process.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent who was offboarded.

agent_name NotBlankStr

Agent's display name.

firing_request_id NotBlankStr

Associated firing request.

tasks_reassigned tuple[NotBlankStr, ...]

IDs of reassigned tasks.

memory_archive_id NotBlankStr | None

ID of the memory archive, if created.

org_memories_promoted int

Number of memories promoted to org.

team_notification_sent bool

Whether team was notified.

started_at AwareDatetime

When offboarding started.

completed_at AwareDatetime

When offboarding finished.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_temporal_order

agent_id pydantic-field

agent_id

Agent who was offboarded

agent_name pydantic-field

agent_name

Agent display name

firing_request_id pydantic-field

firing_request_id

Associated firing request

tasks_reassigned pydantic-field

tasks_reassigned = ()

IDs of reassigned tasks

memory_archive_id pydantic-field

memory_archive_id = None

Memory archive ID

org_memories_promoted pydantic-field

org_memories_promoted = 0

Memories promoted to org

team_notification_sent pydantic-field

team_notification_sent = False

Whether team was notified

started_at pydantic-field

started_at

When offboarding started

completed_at pydantic-field

completed_at

When offboarding finished

AgentLifecycleEvent pydantic-model

Bases: BaseModel

Record of an agent lifecycle event.

Attributes:

Name Type Description
id NotBlankStr

Unique event identifier.

agent_id NotBlankStr

Agent the event relates to.

agent_name NotBlankStr

Agent's display name.

event_type LifecycleEventType

Type of lifecycle event.

timestamp AwareDatetime

When the event occurred.

initiated_by NotBlankStr

Who triggered the event.

details str

Human-readable event details.

metadata dict[str, str]

Additional structured metadata.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

id pydantic-field

id

Unique event identifier

agent_id pydantic-field

agent_id

Agent the event relates to

agent_name pydantic-field

agent_name

Agent display name

event_type pydantic-field

event_type

Type of lifecycle event

timestamp pydantic-field

timestamp

When the event occurred

initiated_by pydantic-field

initiated_by

Who triggered the event

details pydantic-field

details = ''

Event details

metadata pydantic-field

metadata

Additional structured metadata

Registry

registry

Agent registry service.

Hot-pluggable agent registry for tracking active agents, their identities, and lifecycle status transitions (D8.3).

AgentRegistryService

AgentRegistryService(versioning=None)

Hot-pluggable agent registry.

Coroutine-safe via asyncio.Lock within a single event loop. Stores agent identities keyed by agent ID (string form of UUID).

Source code in src/synthorg/hr/registry.py
def __init__(
    self,
    versioning: VersioningService[AgentIdentity] | None = None,
) -> None:
    self._agents: dict[str, AgentIdentity] = {}
    self._lock = asyncio.Lock()
    self._versioning = versioning

has_versioning property

has_versioning

Return True when a versioning service is attached.

Public predicate used by the app factory's startup wiring so it doesn't need to read the private _versioning slot.

clear async

clear()

Reset all registered agents.

Acquires the same self._lock as register, unregister, and the update_* methods, so a concurrent caller can never observe a partially cleared registry: it is either fully empty or in whatever state the contending writer left it.

New async test fixtures should call await registry.clear() directly. The legacy synchronous entry point, reset_for_test_sync, used to live on this class; it has moved to the synthorg.hr.registry_testing module so that production code cannot reach the lock-bypassing helper through autocomplete. Synchronous pytest fixtures (tests/unit/api/conftest.py) call reset_registry_for_test_sync(registry) from that module.

Source code in src/synthorg/hr/registry.py
async def clear(self) -> None:
    """Reset all registered agents.

    Holds the same ``self._lock`` as ``register`` / ``unregister``
    / ``update_*`` so a concurrent caller cannot observe a partial
    clear -- the registry is either fully empty or in the state
    the contending writer claimed.

    New async test fixtures should call ``await registry.clear()``
    directly. The legacy sync entry point used to live on this
    class as ``reset_for_test_sync``; it has been moved to
    :mod:`synthorg.hr.registry_testing` so the lock-bypass cannot
    be invoked from production code by autocomplete. Sync pytest
    fixtures (``tests/unit/api/conftest.py``) call
    ``reset_registry_for_test_sync(registry)`` from that module.
    """
    async with self._lock:
        cleared_count = len(self._agents)
        self._agents.clear()
    logger.info(HR_REGISTRY_CLEARED, cleared_count=cleared_count)

bind_versioning

bind_versioning(versioning)

Attach a versioning service after construction.

Enables the app factory to construct the registry synchronously in create_app() and wire versioning later in on_startup(), after the persistence backend is connected (its identity_versions property requires connect() to have run).

Source code in src/synthorg/hr/registry.py
def bind_versioning(
    self,
    versioning: VersioningService[AgentIdentity],
) -> None:
    """Attach a versioning service after construction.

    Enables the app factory to construct the registry synchronously in
    ``create_app()`` and wire versioning later in ``on_startup()``, after
    the persistence backend is connected (its ``identity_versions``
    property requires ``connect()`` to have run).
    """
    self._versioning = versioning

register async

register(identity, *, saved_by='system')

Register a new agent.

Parameters:

Name Type Description Default
identity AgentIdentity

The agent identity to register.

required
saved_by str

Actor triggering the registration (recorded in version history). Defaults to "system".

'system'

Raises:

Type Description
AgentAlreadyRegisteredError

If the agent is already registered.

Source code in src/synthorg/hr/registry.py
async def register(
    self,
    identity: AgentIdentity,
    *,
    saved_by: str = "system",
) -> None:
    """Register a new agent.

    Args:
        identity: The agent identity to register.
        saved_by: Actor triggering the registration (recorded in
            version history).  Defaults to ``"system"``.

    Raises:
        AgentAlreadyRegisteredError: If the agent is already registered.
    """
    agent_key = str(identity.id)
    async with self._lock:
        if agent_key in self._agents:
            msg = f"Agent {identity.name!r} ({agent_key}) is already registered"
            logger.warning(
                HR_REGISTRY_AGENT_REGISTERED,
                agent_id=agent_key,
                error=msg,
            )
            raise AgentAlreadyRegisteredError(msg)
        self._agents[agent_key] = identity

    logger.info(
        HR_REGISTRY_AGENT_REGISTERED,
        agent_id=agent_key,
        agent_name=str(identity.name),
        status=identity.status.value,
    )
    await self._snapshot(identity, saved_by=saved_by)

unregister async

unregister(agent_id)

Remove an agent from the registry.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier to remove.

required

Returns:

Type Description
AgentIdentity

The removed agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

Source code in src/synthorg/hr/registry.py
async def unregister(self, agent_id: NotBlankStr) -> AgentIdentity:
    """Remove an agent from the registry.

    Args:
        agent_id: The agent identifier to remove.

    Returns:
        The removed agent identity.

    Raises:
        AgentNotFoundError: If the agent is not found.
    """
    async with self._lock:
        identity = self._agents.pop(str(agent_id), None)
    if identity is None:
        msg = f"Agent {agent_id!r} not found in registry"
        logger.warning(
            HR_REGISTRY_AGENT_REMOVED,
            agent_id=str(agent_id),
            error=msg,
        )
        raise AgentNotFoundError(msg)

    logger.info(
        HR_REGISTRY_AGENT_REMOVED,
        agent_id=str(agent_id),
        agent_name=str(identity.name),
    )
    return identity

get async

get(agent_id)

Retrieve an agent identity by ID.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
AgentIdentity | None

The agent identity, or None if not found.

Source code in src/synthorg/hr/registry.py
async def get(self, agent_id: NotBlankStr) -> AgentIdentity | None:
    """Retrieve an agent identity by ID.

    Args:
        agent_id: The agent identifier.

    Returns:
        The agent identity, or None if not found.
    """
    async with self._lock:
        return self._agents.get(str(agent_id))

get_by_name async

get_by_name(name)

Retrieve an agent identity by name.

Parameters:

Name Type Description Default
name NotBlankStr

The agent name to search for.

required

Returns:

Type Description
AgentIdentity | None

The first matching agent, or None.

Source code in src/synthorg/hr/registry.py
async def get_by_name(self, name: NotBlankStr) -> AgentIdentity | None:
    """Retrieve an agent identity by name.

    Args:
        name: The agent name to search for.

    Returns:
        The first matching agent, or None.
    """
    async with self._lock:
        return find_by_name_ci(self._agents.values(), str(name))

get_by_names async

get_by_names(names)

Batch lookup preserving input order with None for misses.

Acquires the registry lock exactly once regardless of batch size. Fanning out N separate get_by_name calls (the old pattern) required N lock acquisitions and serialised each lookup under a shared lock; this batch method reduces that to a single acquisition.

Parameters:

Name Type Description Default
names tuple[NotBlankStr, ...]

Ordered tuple of agent names to resolve (case-insensitive).

required

Returns:

Type Description
tuple[AgentIdentity | None, ...]

Tuple of resolved identities in the same order as names. Each entry is the first matching agent, or None if no agent has that name. When multiple registered agents share the same name (case-insensitive), the first-registered identity wins, matching get_by_name semantics.

Raises:

Type Description
ValueError

If len(names) exceeds MAX_BATCH_NAMES_LOOKUP. The cap keeps the registry lock from being held for an unbounded scan when callers forward user-supplied name lists.

Source code in src/synthorg/hr/registry.py
async def get_by_names(
    self,
    names: tuple[NotBlankStr, ...],
) -> tuple[AgentIdentity | None, ...]:
    """Batch lookup preserving input order with ``None`` for misses.

    Acquires the registry lock exactly once regardless of batch
    size.  Fanning out N separate ``get_by_name`` calls (the old
    pattern) required N lock acquisitions and serialised each
    lookup under a shared lock; this batch method reduces that to
    a single acquisition.

    Args:
        names: Ordered tuple of agent names to resolve
            (case-insensitive).

    Returns:
        Tuple of resolved identities in the same order as
        ``names``.  Each entry is the first matching agent or
        ``None`` if no agent has that name.  When multiple
        registered agents share the same name (case-insensitive),
        the first-registered identity wins, matching
        ``get_by_name`` semantics.

    Raises:
        ValueError: If ``len(names)`` exceeds
            ``MAX_BATCH_NAMES_LOOKUP``; the registry lock must not
            be held for an unbounded scan when callers forward
            user-supplied name lists.
    """
    if not names:
        return ()
    if len(names) > MAX_BATCH_NAMES_LOOKUP:
        msg = (
            f"get_by_names batch of {len(names)} exceeds "
            f"MAX_BATCH_NAMES_LOOKUP={MAX_BATCH_NAMES_LOOKUP}"
        )
        raise ValueError(msg)
    async with self._lock:
        by_normalised_name: dict[str, AgentIdentity] = {}
        for identity in self._agents.values():
            key = normalize_identifier(str(identity.name))
            # First registration wins on name collision, matching
            # ``get_by_name`` semantics (which routes through
            # ``find_by_name_ci`` -- casefold + whitespace strip).
            by_normalised_name.setdefault(key, identity)
        return tuple(
            by_normalised_name.get(normalize_identifier(str(name)))
            for name in names
        )

list_active async

list_active()

List all agents with ACTIVE status.

Returns:

Type Description
tuple[AgentIdentity, ...]

Tuple of active agent identities.

Source code in src/synthorg/hr/registry.py
async def list_active(self) -> tuple[AgentIdentity, ...]:
    """List all agents with ACTIVE status.

    Returns:
        Tuple of active agent identities.
    """
    async with self._lock:
        return tuple(
            a for a in self._agents.values() if a.status == AgentStatus.ACTIVE
        )

list_by_department async

list_by_department(department)

List agents in a specific department.

Parameters:

Name Type Description Default
department NotBlankStr

Department name to filter by.

required

Returns:

Type Description
tuple[AgentIdentity, ...]

Tuple of matching agent identities.

Source code in src/synthorg/hr/registry.py
async def list_by_department(
    self,
    department: NotBlankStr,
) -> tuple[AgentIdentity, ...]:
    """List agents in a specific department.

    Args:
        department: Department name to filter by.

    Returns:
        Tuple of matching agent identities.
    """
    async with self._lock:
        return tuple(
            a
            for a in self._agents.values()
            if compare_ci(str(a.department), str(department))
        )

update_status async

update_status(agent_id, status)

Update an agent's lifecycle status.

Emits HR_AGENT_STATUS_TRANSITIONED only after the registry write succeeds, carrying from_status, to_status, and agent_id so observers can audit every persisted transition in the agent lifecycle. No-op transitions (status unchanged) skip the transition event but still log HR_REGISTRY_STATUS_UPDATED for the write itself.
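The event ordering (write first, transition event only on an actual change) can be shown with the standard logging module. The event names are the documented ones, used here as plain strings; the function, the dict-based store, the status values, and the log format are a hypothetical simplification. The real method also holds the registry lock and uses a structured logger.

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("registry_sketch")


def update_status(store: dict[str, str], agent_id: str, status: str) -> list[str]:
    """Hypothetical sketch: returns the event names it emitted, in order."""
    if agent_id not in store:
        raise KeyError(f"Agent {agent_id!r} not found in registry")
    from_status = store[agent_id]
    store[agent_id] = status  # the write happens first
    events = ["HR_REGISTRY_STATUS_UPDATED"]
    logger.info("%s agent_id=%s status=%s", events[0], agent_id, status)
    if from_status != status:  # no-op transitions emit no transition event
        events.append("HR_AGENT_STATUS_TRANSITIONED")
        logger.info(
            "%s agent_id=%s from=%s to=%s",
            events[1], agent_id, from_status, status,
        )
    return events


store = {"agent-1": "active"}
print(update_status(store, "agent-1", "on_leave"))  # both events
print(update_status(store, "agent-1", "on_leave"))  # write event only
```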

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
status AgentStatus

New status.

required

Returns:

Type Description
AgentIdentity

Updated agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

Source code in src/synthorg/hr/registry.py
async def update_status(
    self,
    agent_id: NotBlankStr,
    status: AgentStatus,
) -> AgentIdentity:
    """Update an agent's lifecycle status.

    Emits ``HR_AGENT_STATUS_TRANSITIONED`` AFTER the registry
    write succeeds, carrying ``from_status`` / ``to_status`` /
    ``agent_id`` so observers can audit every persisted hop on
    the agent lifecycle.  No-op transitions (status unchanged)
    skip the transition event but still log
    ``HR_REGISTRY_STATUS_UPDATED`` for the write itself.

    Args:
        agent_id: The agent identifier.
        status: New status.

    Returns:
        Updated agent identity.

    Raises:
        AgentNotFoundError: If the agent is not found.
    """
    key = str(agent_id)
    async with self._lock:
        identity = self._agents.get(key)
        if identity is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                HR_REGISTRY_STATUS_UPDATED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        from_status = identity.status
        updated = identity.model_copy(update={"status": status})
        self._agents[key] = updated

    logger.info(
        HR_REGISTRY_STATUS_UPDATED,
        agent_id=key,
        status=status.value,
    )
    if from_status != status:
        logger.info(
            HR_AGENT_STATUS_TRANSITIONED,
            agent_id=key,
            from_status=from_status.value,
            to_status=status.value,
        )
    return updated

update_identity async

update_identity(agent_id, **updates)

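Update agent identity fields via model_copy(update=...).

Only fields in _UPDATABLE_FIELDS are accepted; use update_status for status changes. model_copy(update=...) does not run Pydantic validation, and this method does not re-validate the merged identity (unlike apply_identity_update).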
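The allowlist check is a set difference performed before the lock is taken, so a bad request fails fast without touching the registry. The field names in UPDATABLE below are hypothetical; the real _UPDATABLE_FIELDS set is not listed on this page.

```python
UPDATABLE = frozenset({"model", "personality"})  # hypothetical allowlist


def check_updates(updates: dict[str, object]) -> None:
    # Any key outside the allowlist rejects the whole request.
    disallowed = set(updates) - UPDATABLE
    if disallowed:
        raise ValueError(
            f"Fields not allowed for update_identity: {sorted(disallowed)}; "
            f"allowed: {sorted(UPDATABLE)}"
        )


check_updates({"model": "m-2"})  # accepted
try:
    check_updates({"status": "terminated", "model": "m-2"})
except ValueError as exc:
    print(exc)
```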

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
**updates Any

Fields to update on the AgentIdentity.

{}

Returns:

Type Description
AgentIdentity

Updated agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

ValueError

If any field is not in the allowlist.

Source code in src/synthorg/hr/registry.py
async def update_identity(
    self,
    agent_id: NotBlankStr,
    **updates: Any,
) -> AgentIdentity:
    """Update agent identity fields via model_copy(update=...).

    Only fields in ``_UPDATABLE_FIELDS`` are accepted.  Use
    ``update_status`` for status changes.

    Args:
        agent_id: The agent identifier.
        **updates: Fields to update on the AgentIdentity.

    Returns:
        Updated agent identity.

    Raises:
        AgentNotFoundError: If the agent is not found.
        ValueError: If any field is not in the allowlist.
    """
    disallowed = set(updates.keys()) - self._UPDATABLE_FIELDS
    if disallowed:
        msg = (
            f"Fields not allowed for update_identity: "
            f"{sorted(disallowed)}; allowed: {sorted(self._UPDATABLE_FIELDS)}"
        )
        logger.warning(
            HR_REGISTRY_IDENTITY_UPDATED,
            agent_id=str(agent_id),
            error=msg,
        )
        raise ValueError(msg)

    key = str(agent_id)
    async with self._lock:
        identity = self._agents.get(key)
        if identity is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                HR_REGISTRY_IDENTITY_UPDATED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        updated = identity.model_copy(update=updates)
        self._agents[key] = updated

    logger.info(
        HR_REGISTRY_IDENTITY_UPDATED,
        agent_id=key,
        updated_fields=sorted(updates.keys()),
    )
    await self._snapshot(updated, saved_by=f"update_identity:{key}")
    return updated

evolve_identity async

evolve_identity(agent_id, evolved_identity, *, evolution_rationale)

Apply an evolved identity after evolution guards have passed.

Replaces the agent's identity wholesale. Unlike update_identity (which restricts to an allowlist), this method accepts any field changes because the evolution pipeline has already validated them through guards.

Immutable identifiers (id, name, department) must match the existing identity.
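Guarding the immutable identifiers amounts to comparing each one between the current and the evolved identity before swapping. A minimal sketch over a hypothetical frozen dataclass (Identity and apply_evolution are illustrative names, and role stands in for any evolvable field):

```python
from dataclasses import dataclass, replace

IMMUTABLE = ("id", "name", "department")


@dataclass(frozen=True)
class Identity:
    id: str
    name: str
    department: str
    role: str  # an evolvable field, for illustration


def apply_evolution(current: Identity, evolved: Identity) -> Identity:
    for field_name in IMMUTABLE:
        old, new = getattr(current, field_name), getattr(evolved, field_name)
        if str(old) != str(new):
            raise ValueError(f"{field_name} cannot be changed during evolution")
    # Everything else is accepted wholesale: the guards ran upstream.
    return evolved


current = Identity("a1", "Ada", "engineering", "developer")
evolved = apply_evolution(current, replace(current, role="senior developer"))
print(evolved.role)
```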

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent to evolve.

required
evolved_identity AgentIdentity

The complete new identity.

required
evolution_rationale str

Human-readable reason (for audit).

required

Returns:

Type Description
AgentIdentity

The updated agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

ValueError

If immutable fields differ.

Source code in src/synthorg/hr/registry.py
async def evolve_identity(
    self,
    agent_id: NotBlankStr,
    evolved_identity: AgentIdentity,
    *,
    evolution_rationale: str,
) -> AgentIdentity:
    """Apply an evolved identity after evolution guards have passed.

    Replaces the agent's identity wholesale. Unlike
    ``update_identity`` (which restricts to an allowlist), this
    method accepts any field changes because the evolution pipeline
    has already validated them through guards.

    Immutable identifiers (``id``, ``name``, ``department``) must
    match the existing identity.

    Args:
        agent_id: The agent to evolve.
        evolved_identity: The complete new identity.
        evolution_rationale: Human-readable reason (for audit).

    Returns:
        The updated agent identity.

    Raises:
        AgentNotFoundError: If agent not found.
        ValueError: If immutable fields differ.
    """
    key = str(agent_id)
    async with self._lock:
        current = self._agents.get(key)
        if current is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                HR_REGISTRY_IDENTITY_EVOLVED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        if str(evolved_identity.id) != str(current.id):
            msg = (
                f"evolved_identity.id {evolved_identity.id} "
                f"does not match current id {current.id}"
            )
            logger.warning(
                HR_REGISTRY_IDENTITY_EVOLVED,
                agent_id=key,
                error=msg,
            )
            raise ValueError(msg)
        if str(evolved_identity.name) != str(current.name):
            msg = "name cannot be changed during evolution"
            logger.warning(
                HR_REGISTRY_IDENTITY_EVOLVED,
                agent_id=key,
                error=msg,
            )
            raise ValueError(msg)
        if str(evolved_identity.department) != str(current.department):
            msg = "department cannot be changed during evolution"
            logger.warning(
                HR_REGISTRY_IDENTITY_EVOLVED,
                agent_id=key,
                error=msg,
            )
            raise ValueError(msg)
        self._agents[key] = evolved_identity

    logger.info(
        HR_REGISTRY_IDENTITY_EVOLVED,
        agent_id=key,
        agent_name=str(evolved_identity.name),
        evolution_rationale=evolution_rationale,
    )
    await self._snapshot(
        evolved_identity,
        saved_by=f"evolution:{evolution_rationale}",
    )
    return evolved_identity

apply_identity_update async

apply_identity_update(agent_id, updates, *, saved_by)

Mutate any allowed field on the registered identity.

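Designed for the MCP write surface, which is privileged and must be able to update everything the REST API can. Only the truly immutable identifiers (id, name, department) and the lifecycle status slot (which has its own update_status path) are rejected. Because model_copy(update=...) bypasses Pydantic validation, the merged identity is re-validated with AgentIdentity.model_validate before it is stored.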
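The merge-then-revalidate idea can be sketched with a dataclass whose __post_init__ plays the role of Pydantic validation. BLOCKED mirrors the documented blocked set (the three immutable identifiers plus status); Profile, apply_update, and the single validation rule are hypothetical. Unknown keys surface as TypeError from the constructor, which the sketch converts to ValueError to match the documented error type.

```python
from dataclasses import asdict, dataclass

BLOCKED = frozenset({"id", "name", "department", "status"})


@dataclass(frozen=True)
class Profile:
    id: str
    name: str
    department: str
    status: str
    role: str

    def __post_init__(self) -> None:
        # Stand-in for NotBlankStr validation on one field.
        if not isinstance(self.role, str) or not self.role.strip():
            raise ValueError("role must be a non-blank string")


def apply_update(current: Profile, updates: dict[str, object]) -> Profile:
    blocked = set(updates) & BLOCKED
    if blocked:
        raise ValueError(f"Fields are immutable: {sorted(blocked)}")
    if not updates:
        return current  # no-op: skip the rebuild
    merged = {**asdict(current), **updates}
    try:
        # Rebuilding through the constructor re-runs validation on the merge.
        return Profile(**merged)
    except TypeError as exc:  # unknown field names
        raise ValueError(str(exc)) from exc


p = Profile("a1", "Ada", "engineering", "active", "developer")
print(apply_update(p, {"role": "architect"}).role)
```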

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
updates dict[str, Any]

Mapping of field name to new value.

required
saved_by str

Actor recorded in the version snapshot.

required

Returns:

Type Description
AgentIdentity

Updated agent identity (a new frozen instance).

Raises:

Type Description
AgentNotFoundError

If the agent is not registered.

ValueError

If updates contains a blocked field, or if the merged identity fails validation.

Source code in src/synthorg/hr/registry.py
async def apply_identity_update(
    self,
    agent_id: NotBlankStr,
    updates: dict[str, Any],
    *,
    saved_by: str,
) -> AgentIdentity:
    """Mutate any allowed field on the registered identity.

    Designed for the MCP write surface, which is privileged and
    must be able to update everything the REST API can. Only the
    truly-immutable identifiers (``id``, ``name``, ``department``)
    and the lifecycle ``status`` slot (which has its own
    ``update_status`` path) are rejected.

    Args:
        agent_id: The agent identifier.
        updates: Mapping of field name to new value.
        saved_by: Actor recorded in the version snapshot.

    Returns:
        Updated agent identity (a new frozen instance).

    Raises:
        AgentNotFoundError: If the agent is not registered.
        ValueError: If ``updates`` contains a blocked field.
    """
    blocked = set(updates.keys()) & self._BLOCKED_UPDATE_FIELDS
    if blocked:
        msg = (
            f"Fields are immutable via apply_identity_update: "
            f"{sorted(blocked)}. Use update_status / evolve_identity "
            f"or accept the immutability for {sorted(blocked)}."
        )
        logger.warning(
            HR_REGISTRY_IDENTITY_UPDATED,
            agent_id=str(agent_id),
            error=msg,
            updated_fields=sorted(updates.keys()),
        )
        raise ValueError(msg)

    key = str(agent_id)
    async with self._lock:
        identity = self._agents.get(key)
        if identity is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                HR_REGISTRY_IDENTITY_UPDATED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        if not updates:
            # No-op: avoid an unnecessary model_copy + snapshot.
            return identity
        # ``model_copy(update=...)`` bypasses Pydantic validation,
        # so callers (notably the MCP ``synthorg_agents_update``
        # tool) could otherwise smuggle a wrong runtime type for
        # any allowed field (e.g. an int for a ``NotBlankStr``).
        # Re-run validation on the merged dump to enforce the same
        # type / constraint guarantees the construction path
        # already provides.
        from pydantic import ValidationError  # noqa: PLC0415

        from synthorg.core.agent import AgentIdentity  # noqa: PLC0415

        merged = identity.model_copy(update=dict(updates)).model_dump()
        try:
            updated = AgentIdentity.model_validate(merged)
        except ValidationError as exc:
            logger.warning(
                HR_REGISTRY_IDENTITY_UPDATED,
                agent_id=key,
                error="invalid update payload",
                updated_fields=sorted(updates.keys()),
            )
            msg = (
                f"Update payload for agent {agent_id!r} failed validation: "
                f"{safe_error_description(exc)}"
            )
            raise ValueError(msg) from exc
        self._agents[key] = updated

    logger.info(
        HR_REGISTRY_IDENTITY_UPDATED,
        agent_id=key,
        updated_fields=sorted(updates.keys()),
    )
    await self._snapshot(updated, saved_by=saved_by)
    return updated
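The comment in apply_identity_update explains why the merged data is re-validated: Pydantic's model_copy(update=...) does not validate the update values. Below is a minimal stdlib sketch of the same guard. It uses a frozen dataclass as a hypothetical stand-in for AgentIdentity and a hypothetical one-field blocked set. dataclasses.replace() goes through __init__ and __post_init__, so here it plays the role of the model_validate call.

```python
from dataclasses import dataclass, replace
from typing import Any

# Hypothetical set of fields that may never change through apply_update().
BLOCKED_UPDATE_FIELDS = frozenset({"id"})


@dataclass(frozen=True)
class Identity:
    """Hypothetical stand-in for AgentIdentity with NotBlankStr-like fields."""

    id: str
    name: str
    role: str

    def __post_init__(self) -> None:
        # Runs on construction AND on dataclasses.replace(), so a copy
        # can never carry a value that the constructor would reject.
        for field_name in ("id", "name", "role"):
            value = getattr(self, field_name)
            if not isinstance(value, str) or not value.strip():
                msg = f"{field_name} must be a non-blank string"
                raise ValueError(msg)


def apply_update(identity: Identity, updates: dict[str, Any]) -> Identity:
    """Return a validated copy of ``identity`` with ``updates`` applied."""
    blocked = set(updates) & BLOCKED_UPDATE_FIELDS
    if blocked:
        msg = f"Fields are immutable via apply_update: {sorted(blocked)}"
        raise ValueError(msg)
    if not updates:
        # No-op: hand back the same frozen instance instead of copying it.
        return identity
    return replace(identity, **updates)
```

With this sketch, apply_update(alice, {"name": 42}) raises ValueError instead of returning an identity whose name is an int. That is the situation the real method guards against for MCP callers.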

update_autonomy async

update_autonomy(agent_id, update, *, approval_store=None)

Request an autonomy level change for an agent.

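Mirrors the REST endpoint. SECURITY_AUTONOMY_PROMOTION_REQUESTED is always logged for the audit trail.

By default (HUMAN_ONLY, no update.granted_by_strategy), the change is only requested:

  - A PENDING approval item is enqueued when an approval_store is wired, so the queue can drive the human review.
  - SECURITY_AUTONOMY_PROMOTION_DENIED is logged because no immediate runtime change was made.
  - The result reports promotion_pending=True.

When update.granted_by_strategy is set:

  - The level change is applied immediately and snapshotted.
  - An APPROVED approval item is written as a best-effort audit record.
  - SECURITY_AUTONOMY_PROMOTION_GRANTED is logged.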

Parameters:

  agent_id (NotBlankStr, required): The agent whose autonomy is being changed.
  update (AutonomyUpdate, required): The autonomy change request.
  approval_store (ApprovalStoreProtocol | None, default None): Optional approval store; when provided, the request is enqueued and the returned approval_id identifies it.

Returns:

  AutonomyUpdateResult: The outcome of the request.

Raises:

  AgentNotFoundError: If the agent is not registered.
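The source below derives approval_id from uuid.uuid4().hex[:16]. The thirteenth hex character of a UUID4 is always the version digit 4, so that prefix carries 60 random bits rather than 64. A quick birthday-bound estimate shows the collision risk stays small at realistic queue volumes. This is the standard approximation, not something the codebase computes.

```python
import math
import uuid

# The version digit of a UUID4 sits at hex index 12 and is always "4".
assert uuid.uuid4().hex[12] == "4"


def collision_probability(n_ids: int, random_bits: int = 60) -> float:
    """Approximate chance that at least two of n random IDs collide.

    Birthday bound: p = 1 - exp(-n(n-1) / 2^(random_bits + 1)).
    """
    # math.expm1 keeps precision when the exponent is tiny.
    return -math.expm1(-n_ids * (n_ids - 1) / 2.0 ** (random_bits + 1))


# uuid4().hex[:16] is 16 hex chars (64 bits), but only 15 of them
# (60 bits) are random because of the fixed version digit.
for n in (10_000, 1_000_000, 100_000_000):
    print(f"{n:>11,} approval ids -> p(collision) ~ {collision_probability(n):.1e}")
```

At one million approvals, the estimate is roughly 4 in 10 million.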

Source code in src/synthorg/hr/registry.py
async def update_autonomy(
    self,
    agent_id: NotBlankStr,
    update: AutonomyUpdate,
    *,
    approval_store: ApprovalStoreProtocol | None = None,
) -> AutonomyUpdateResult:
    """Request an autonomy level change for an agent.

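    Mirrors the REST endpoint. ``SECURITY_AUTONOMY_PROMOTION_REQUESTED``
    is always logged for the audit trail. By default (``HUMAN_ONLY``,
    no ``granted_by_strategy``) the change is only *requested*: a
    PENDING approval item is enqueued when an ``approval_store`` is
    wired so the queue can drive the human review, and
    ``SECURITY_AUTONOMY_PROMOTION_DENIED`` is logged because no
    immediate runtime change was made. When
    ``update.granted_by_strategy`` is set, the level change is applied
    immediately, an APPROVED approval item is written as a best-effort
    audit record, and ``SECURITY_AUTONOMY_PROMOTION_GRANTED`` is logged.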

    Args:
        agent_id: The agent whose autonomy is being changed.
        update: The autonomy change request.
        approval_store: Optional approval store; when provided, the
            request is enqueued and the returned ``approval_id``
            identifies it.

    Returns:
        ``AutonomyUpdateResult`` describing the outcome.

    Raises:
        AgentNotFoundError: If the agent is not registered.
    """
    key = str(agent_id)
    async with self._lock:
        identity = self._agents.get(key)
        if identity is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                SECURITY_AUTONOMY_PROMOTION_REQUESTED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        current_level: AutonomyLevel = (
            identity.autonomy_level
            if identity.autonomy_level is not None
            else AutonomyLevel.SUPERVISED
        )

    logger.info(
        SECURITY_AUTONOMY_PROMOTION_REQUESTED,
        agent_id=key,
        requested_level=update.requested_level.value,
        current_level=current_level.value,
        reason=update.reason,
        requested_by=update.requested_by,
    )

    # The strategy verdict is enforced, not audit-only: a granting
    # strategy auto-decides the approval and the level change is
    # applied here; the HUMAN_ONLY default leaves it pending.
    granted = update.granted_by_strategy is not None
    now = datetime.now(UTC)
    # 16 hex chars (64 bits, 60 of them random because hex char 12 of a
    # UUID4 is the fixed version digit) keep collision probability
    # negligible for approval-queue volumes while still fitting
    # compactly into log lines and audit trails.
    approval_id = f"approval-{uuid.uuid4().hex[:16]}"
    # Local import breaks the import cycle:
    # ``synthorg.core.approval`` -> ``synthorg.ontology.decorator`` ->
    # ... -> ``synthorg.communication.meeting.participant`` ->
    # ``synthorg.hr.registry``. Deferring to call time keeps module
    # bootstrap acyclic without weakening the call-site contract.
    from synthorg.core.approval import (  # noqa: PLC0415
        ApprovalItem as _ApprovalItem,
    )

    requested_by = update.requested_by or "system"
    base_metadata = {
        "agent_id": key,
        "current_level": current_level.value,
        "requested_level": update.requested_level.value,
    }
    title = (
        f"Autonomy change for {key}: "
        f"{current_level.value} -> {update.requested_level.value}"
    )

    if not granted:
        # HUMAN_ONLY (default): the request pends; nothing mutates
        # the agent's identity until a human decides. A PENDING row
        # is non-terminal, so persisting it before any mutation is
        # the designed behaviour, not a false audit.
        approval_enqueued = False
        if approval_store is not None:
            await approval_store.add(
                _ApprovalItem(
                    id=approval_id,
                    action_type="autonomy:promote",
                    title=title,
                    description=update.reason,
                    requested_by=requested_by,
                    risk_level=ApprovalRiskLevel.HIGH,
                    status=ApprovalStatus.PENDING,
                    created_at=now,
                    metadata=base_metadata,
                ),
            )
            approval_enqueued = True
        logger.info(
            SECURITY_AUTONOMY_PROMOTION_DENIED,
            agent_id=key,
            requested_level=update.requested_level.value,
            reason="Autonomy level changes require human approval",
        )
        return AutonomyUpdateResult(
            agent_id=key,
            current_level=current_level,
            requested_level=update.requested_level,
            promotion_pending=True,
            approval_enqueued=approval_enqueued,
            approval_id=approval_id if approval_enqueued else None,
        )

    # Strategy granted: apply the level change FIRST so a terminal
    # (APPROVED) approval row is only persisted once the mutation
    # has actually succeeded -- otherwise a failure in the await
    # gap (agent unregistered / registry cleared) would leave an
    # APPROVED audit row claiming a promotion that never happened.
    async with self._lock:
        live = self._agents.get(key)
        if live is None:
            msg = f"Agent {agent_id!r} not found in registry"
            raise AgentNotFoundError(msg)
        applied = live.model_copy(
            update={"autonomy_level": update.requested_level},
        )
        self._agents[key] = applied
    await self._snapshot(
        applied,
        saved_by=f"autonomy_strategy_grant:{key}",
    )

    approval_enqueued = False
    if approval_store is not None:
        # Dual-write: the autonomy mutation above is the source of
        # truth and is already persisted via _snapshot. The
        # APPROVED row is a best-effort audit artifact -- if its
        # write fails we log loudly and report the (correct)
        # promotion, rather than roll back a valid state change or
        # 5xx the caller. Any add/mutate ordering only moves the
        # failure window; soft-failing the audit write is what
        # makes the operation safe regardless of order.
        try:
            await approval_store.add(
                _ApprovalItem(
                    id=approval_id,
                    action_type="autonomy:promote",
                    title=title,
                    description=update.reason,
                    requested_by=requested_by,
                    risk_level=ApprovalRiskLevel.HIGH,
                    # Auto-decided: the queue stays the apply driver
                    # and the audit trail is intact. ``decided_at``
                    # / ``decided_by`` satisfy the APPROVED
                    # invariant.
                    status=ApprovalStatus.APPROVED,
                    created_at=now,
                    decided_at=now,
                    decided_by=f"strategy:{update.granted_by_strategy}",
                    metadata={
                        **base_metadata,
                        "granted_by_strategy": str(
                            update.granted_by_strategy,
                        ),
                    },
                ),
            )
            approval_enqueued = True
        except (MemoryError, RecursionError):
            raise
        except Exception as exc:
            logger.error(
                SECURITY_AUTONOMY_PROMOTION_AUDIT_FAILED,
                agent_id=key,
                approval_id=approval_id,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
                note=(
                    "autonomy promotion applied; audit row write "
                    "failed -- promotion is the source of truth"
                ),
            )
    result_id = approval_id if approval_enqueued else None
    # State transition logged AFTER the persistence write.
    logger.info(
        SECURITY_AUTONOMY_PROMOTION_GRANTED,
        agent_id=key,
        previous_level=current_level.value,
        requested_level=update.requested_level.value,
        granted_by_strategy=str(update.granted_by_strategy),
        approval_id=result_id,
    )
    return AutonomyUpdateResult(
        agent_id=key,
        current_level=update.requested_level,
        requested_level=update.requested_level,
        promotion_pending=False,
        approval_enqueued=approval_enqueued,
        approval_id=result_id,
    )

agent_count async

agent_count()

Number of agents currently in the registry.

Source code in src/synthorg/hr/registry.py
async def agent_count(self) -> int:
    """Number of agents currently in the registry."""
    async with self._lock:
        return len(self._agents)

Hiring Service

hiring_service

Hiring service.

Orchestrates the hiring pipeline: request creation, candidate generation, approval submission, and agent instantiation.

HiringService

HiringService(
    *, registry, approval_store=None, onboarding_service=None, default_model_config=None
)

Orchestrates the hiring pipeline.

Manages the lifecycle of hiring requests from creation through candidate generation, approval, and agent instantiation.

Parameters:

  registry (AgentRegistryService, required): Agent registry for registering new agents.
  approval_store (ApprovalStoreProtocol | None, default None): Optional approval store for human approval. When omitted, submit_for_approval auto-approves requests.
  onboarding_service (OnboardingService | None, default None): Optional onboarding service to start onboarding after instantiation.
  default_model_config (ModelConfig | None, default None): Optional default model configuration for newly created agents. Falls back to generic defaults if not provided.
Source code in src/synthorg/hr/hiring_service.py
def __init__(
    self,
    *,
    registry: AgentRegistryService,
    approval_store: ApprovalStoreProtocol | None = None,
    onboarding_service: OnboardingService | None = None,
    default_model_config: ModelConfig | None = None,
) -> None:
    self._registry = registry
    self._approval_store = approval_store
    self._onboarding_service = onboarding_service
    self._default_model_config = default_model_config
    self._requests: dict[str, HiringRequest] = {}

create_request async

create_request(
    *,
    requested_by,
    department,
    role,
    level,
    required_skills=(),
    reason,
    budget_limit_monthly=None,
    template_name=None,
)

Create a new hiring request.

Parameters:

  requested_by (NotBlankStr, required): Request initiator.
  department (NotBlankStr, required): Target department.
  role (NotBlankStr, required): Desired role.
  level (str, required): Desired seniority level, given as a SeniorityLevel value.
  required_skills (tuple[NotBlankStr, ...], default ()): Required skills.
  reason (NotBlankStr, required): Business justification.
  budget_limit_monthly (float | None, default None): Optional monthly budget limit.
  template_name (str | None, default None): Template for candidate generation.

Returns:

  HiringRequest: The created hiring request.

Raises:

  HiringError: If level is not a valid SeniorityLevel value.

Source code in src/synthorg/hr/hiring_service.py
async def create_request(  # noqa: PLR0913
    self,
    *,
    requested_by: NotBlankStr,
    department: NotBlankStr,
    role: NotBlankStr,
    level: str,
    required_skills: tuple[NotBlankStr, ...] = (),
    reason: NotBlankStr,
    budget_limit_monthly: float | None = None,
    template_name: str | None = None,
) -> HiringRequest:
    """Create a new hiring request.

    Args:
        requested_by: Request initiator.
        department: Target department.
        role: Desired role.
        level: Desired seniority level.
        required_skills: Required skills.
        reason: Business justification.
        budget_limit_monthly: Optional monthly budget limit.
        template_name: Template for candidate generation.

    Returns:
        The created hiring request.

    Raises:
        HiringError: If ``level`` is not a valid ``SeniorityLevel``
            value.
    """
    try:
        parsed_level = SeniorityLevel(level)
    except ValueError as exc:
        msg = f"Invalid seniority level {level!r} for hiring request"
        logger.warning(
            HR_HIRING_REQUEST_CREATED,
            error=msg,
            level=level,
        )
        raise HiringError(msg) from exc

    request = HiringRequest(
        requested_by=requested_by,
        department=department,
        role=role,
        level=parsed_level,
        required_skills=required_skills,
        reason=reason,
        budget_limit_monthly=budget_limit_monthly,
        template_name=template_name,
        created_at=datetime.now(UTC),
    )
    self._requests[str(request.id)] = request

    logger.info(
        HR_HIRING_REQUEST_CREATED,
        request_id=str(request.id),
        department=str(department),
        role=str(role),
    )
    return request

generate_candidate async

generate_candidate(request)

Generate a candidate card for a hiring request.

Builds a CandidateCard from role/level defaults. In the future, this can be extended with template presets and LLM customization.

Parameters:

  request (HiringRequest, required): The hiring request to generate a candidate for.

Returns:

  HiringRequest: Updated request with the new candidate appended.

Source code in src/synthorg/hr/hiring_service.py
async def generate_candidate(
    self,
    request: HiringRequest,
) -> HiringRequest:
    """Generate a candidate card for a hiring request.

    Builds a ``CandidateCard`` from role/level defaults. In the
    future, this can be extended with template presets and LLM
    customization.

    Args:
        request: The hiring request to generate a candidate for.

    Returns:
        Updated request with the new candidate appended.
    """
    request = self._get_request(str(request.id))

    candidate = CandidateCard(
        name=NotBlankStr(f"{request.role}-{request.department}-agent"),
        role=request.role,
        department=request.department,
        level=request.level,
        skills=tuple(Skill(id=s, name=s) for s in request.required_skills),
        rationale=NotBlankStr(
            f"Generated for: {request.reason}",
        ),
        estimated_monthly_cost=(
            request.budget_limit_monthly
            if request.budget_limit_monthly is not None
            else 50.0
        ),
        template_source=request.template_name,
    )

    updated = request.model_copy(
        update={"candidates": (*request.candidates, candidate)},
    )
    self._requests[str(updated.id)] = updated

    logger.info(
        HR_HIRING_CANDIDATE_GENERATED,
        request_id=str(request.id),
        candidate_id=str(candidate.id),
    )
    return updated

submit_for_approval async

submit_for_approval(request, candidate_id)

Submit a candidate for approval.

If no approval store is configured, auto-approves the request.

Parameters:

  request (HiringRequest, required): The hiring request.
  candidate_id (str, required): ID of the candidate to approve.

Returns:

  HiringRequest: The updated request.

    - Without an approval store, the request is APPROVED with the candidate selected.
    - With an approval store, an approval item is submitted and this call does not change the status.

Raises:

  InvalidCandidateError: If the candidate ID is not found on the request.

Source code in src/synthorg/hr/hiring_service.py
async def submit_for_approval(
    self,
    request: HiringRequest,
    candidate_id: str,
) -> HiringRequest:
    """Submit a candidate for approval.

    If no approval store is configured, auto-approves the request.

    Args:
        request: The hiring request.
        candidate_id: ID of the candidate to approve.

    Returns:
        Updated request with approval status.

    Raises:
        InvalidCandidateError: If the candidate ID is not found.
    """
    request = self._get_request(str(request.id))

    candidate = next(
        (c for c in request.candidates if str(c.id) == candidate_id),
        None,
    )
    if candidate is None:
        msg = f"Candidate {candidate_id!r} not found on request {request.id!r}"
        logger.warning(
            HR_HIRING_APPROVAL_SUBMITTED,
            request_id=str(request.id),
            error=msg,
        )
        raise InvalidCandidateError(msg)

    previous_status = request.status
    if self._approval_store is None:
        # Auto-approve when no approval store.
        updated = request.model_copy(
            update={
                "status": HiringRequestStatus.APPROVED,
                "selected_candidate_id": candidate_id,
            },
        )
    else:
        # Create an approval item.
        updated = await self._submit_approval_item(request, candidate, candidate_id)

    self._requests[str(updated.id)] = updated

    # Emit the status-transition log only when the status actually
    # flipped: the auto-approve branch goes PENDING -> APPROVED,
    # but the manual-approval branch keeps the request at
    # ``previous_status`` (the approval-store flow only stamps a
    # selected candidate / approval id).  Logging in the
    # no-transition case would lie about state.
    if updated.status != previous_status:
        logger.info(
            HIRING_REQUEST_STATUS_TRANSITIONED,
            request_id=str(updated.id),
            from_status=previous_status.value,
            to_status=updated.status.value,
        )

    logger.info(
        HR_HIRING_APPROVAL_SUBMITTED,
        request_id=str(request.id),
        candidate_id=candidate_id,
        auto_approved=self._approval_store is None,
    )
    return updated

instantiate_agent async

instantiate_agent(request)

Instantiate an agent from an approved hiring request.

Parameters:

  request (HiringRequest, required): The approved hiring request.

Returns:

  AgentIdentity: The newly created agent identity.

Raises:

  HiringApprovalRequiredError: If the request is not approved.
  HiringRejectedError: If the request was rejected.
  InvalidCandidateError: If no candidate is selected.
  HiringError: If instantiation fails.

Source code in src/synthorg/hr/hiring_service.py
async def instantiate_agent(
    self,
    request: HiringRequest,
) -> AgentIdentity:
    """Instantiate an agent from an approved hiring request.

    Args:
        request: The approved hiring request.

    Returns:
        The newly created agent identity.

    Raises:
        HiringApprovalRequiredError: If request is not approved.
        HiringRejectedError: If request was rejected.
        InvalidCandidateError: If no candidate is selected.
        HiringError: If instantiation fails.
    """
    request = self._get_request(str(request.id))
    self._validate_instantiation_status(request)
    candidate = self._find_selected_candidate(request)

    identity = self._build_agent_identity(candidate)
    await self._register_agent(identity, request)

    # Update request status.
    previous_status = request.status
    updated = request.model_copy(
        update={"status": HiringRequestStatus.INSTANTIATED},
    )
    self._requests[str(updated.id)] = updated

    # Status flip is logged AFTER the dict write succeeds and
    # before downstream callbacks (``_try_onboard``); a failure in
    # the onboarding hook should not mask the persisted
    # transition.  ``_validate_instantiation_status`` enforces the
    # ``previous_status == APPROVED`` invariant, so we always emit
    # an APPROVED -> INSTANTIATED transition here.
    logger.info(
        HIRING_REQUEST_STATUS_TRANSITIONED,
        request_id=str(updated.id),
        from_status=previous_status.value,
        to_status=updated.status.value,
    )

    # Start onboarding if service is available.
    await self._try_onboard(identity)

    logger.info(
        HR_HIRING_INSTANTIATED,
        request_id=str(request.id),
        agent_id=str(identity.id),
        agent_name=str(identity.name),
    )
    return identity

Onboarding Service

onboarding_service

Onboarding service.

Manages agent onboarding checklists, step tracking, and automatic activation upon checklist completion.

OnboardingService

OnboardingService(*, registry)

Manages onboarding checklists and step tracking.

Creates checklists with all OnboardingStep values when onboarding starts. When all steps are completed, automatically transitions the agent to ACTIVE status via the registry.

Parameters:

  registry (AgentRegistryService, required): Agent registry for status updates.
Source code in src/synthorg/hr/onboarding_service.py
def __init__(
    self,
    *,
    registry: AgentRegistryService,
) -> None:
    self._registry = registry
    self._checklists: dict[str, OnboardingChecklist] = {}

start_onboarding async

start_onboarding(agent_id)

Start onboarding for a newly hired agent.

Creates a checklist with all onboarding steps in PENDING state.

Parameters:

  agent_id (str, required): Agent to onboard.

Returns:

  OnboardingChecklist: The created onboarding checklist.

Raises:

  OnboardingError: If the agent is not in the registry or a checklist already exists for it.

Source code in src/synthorg/hr/onboarding_service.py
async def start_onboarding(self, agent_id: str) -> OnboardingChecklist:
    """Start onboarding for a newly hired agent.

    Creates a checklist with all onboarding steps in PENDING state.

    Args:
        agent_id: Agent to onboard.

    Returns:
        The created onboarding checklist.

    Raises:
        OnboardingError: If the agent is not in the registry or a
            checklist already exists for it.
    """
    agent = await self._registry.get(NotBlankStr(agent_id))
    if agent is None:
        msg = f"Agent {agent_id!r} not found in registry"
        logger.warning(HR_ONBOARDING_STARTED, agent_id=agent_id, error=msg)
        raise OnboardingError(msg)

    if agent_id in self._checklists:
        msg = f"Onboarding checklist already exists for agent {agent_id!r}"
        logger.warning(HR_ONBOARDING_STARTED, agent_id=agent_id, error=msg)
        raise OnboardingError(msg)

    steps = tuple(OnboardingStepRecord(step=step) for step in OnboardingStep)
    checklist = OnboardingChecklist(
        agent_id=agent_id,
        steps=steps,
        started_at=datetime.now(UTC),
    )
    self._checklists[agent_id] = checklist

    logger.info(
        HR_ONBOARDING_STARTED,
        agent_id=agent_id,
        step_count=len(steps),
    )
    return checklist

complete_step async

complete_step(agent_id, step, *, notes='')

Mark an onboarding step as complete.

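When all steps are completed, automatically transitions the agent to ACTIVE status. Completing a step that is unknown or already complete is a no-op: a warning is logged and the checklist is returned unchanged.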

Parameters:

  agent_id (str, required): Agent being onboarded.
  step (OnboardingStep, required): The step to complete.
  notes (str, default ''): Optional notes for the step.

Returns:

  OnboardingChecklist: Updated onboarding checklist.

Raises:

  OnboardingError: If no checklist exists for the agent.

Source code in src/synthorg/hr/onboarding_service.py
async def complete_step(
    self,
    agent_id: str,
    step: OnboardingStep,
    *,
    notes: str = "",
) -> OnboardingChecklist:
    """Mark an onboarding step as complete.

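    When all steps are completed, automatically transitions the
    agent to ACTIVE status. Completing a step that is unknown or
    already complete is a no-op: a warning is logged and the
    checklist is returned unchanged.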

    Args:
        agent_id: Agent being onboarded.
        step: The step to complete.
        notes: Optional notes for the step.

    Returns:
        Updated onboarding checklist.

    Raises:
        OnboardingError: If no checklist exists for the agent.
    """
    checklist = self._checklists.get(agent_id)
    if checklist is None:
        msg = f"No onboarding checklist for agent {agent_id!r}"
        logger.warning(
            HR_ONBOARDING_STEP_COMPLETE,
            agent_id=agent_id,
            error=msg,
        )
        raise OnboardingError(msg)

    now = datetime.now(UTC)
    step_found = any(s.step == step and not s.completed for s in checklist.steps)
    if not step_found:
        logger.warning(
            HR_ONBOARDING_STEP_COMPLETE,
            agent_id=agent_id,
            step=step.value,
            skipped="step_not_found_or_already_completed",
        )
        return checklist

    updated_steps = tuple(
        s.model_copy(
            update={
                "completed": True,
                "completed_at": now,
                "notes": notes,
            },
        )
        if s.step == step and not s.completed
        else s
        for s in checklist.steps
    )

    updated = checklist.model_copy(update={"steps": updated_steps})

    # Check if all steps are now complete.
    if updated.is_complete and not checklist.is_complete:
        updated = updated.model_copy(update={"completed_at": now})
        await self._registry.update_status(agent_id, AgentStatus.ACTIVE)
        logger.info(HR_ONBOARDING_COMPLETE, agent_id=agent_id)

    self._checklists[agent_id] = updated

    logger.info(
        HR_ONBOARDING_STEP_COMPLETE,
        agent_id=agent_id,
        step=step.value,
    )
    return updated
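The completion logic in complete_step is idempotent and triggers activation exactly once. Below is a pure-function sketch of the same rules. It uses a hypothetical StepRecord and returns an activation flag instead of calling registry.update_status.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class StepRecord:
    """Hypothetical stand-in for OnboardingStepRecord."""

    step: str
    completed: bool = False
    notes: str = ""


def complete_step(
    steps: tuple[StepRecord, ...], step: str, notes: str = ""
) -> tuple[tuple[StepRecord, ...], bool]:
    """Return (updated_steps, should_activate)."""
    if not any(s.step == step and not s.completed for s in steps):
        # Unknown or already-completed step: return the input unchanged.
        return steps, False
    updated = tuple(
        replace(s, completed=True, notes=notes)
        if s.step == step and not s.completed
        else s
        for s in steps
    )
    # At least one step was open before this call, so "all complete now"
    # means this call is the one that finished the checklist.
    return updated, all(s.completed for s in updated)
```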

get_checklist async

get_checklist(agent_id)

Retrieve the onboarding checklist for an agent.

Parameters:

Name Type Description Default
agent_id str

Agent to look up.

required

Returns:

Type Description
OnboardingChecklist | None

The checklist, or None if not found.

Source code in src/synthorg/hr/onboarding_service.py
async def get_checklist(
    self,
    agent_id: str,
) -> OnboardingChecklist | None:
    """Retrieve the onboarding checklist for an agent.

    Args:
        agent_id: Agent to look up.

    Returns:
        The checklist, or None if not found.
    """
    return self._checklists.get(agent_id)

Offboarding Service

offboarding_service

Offboarding service.

Orchestrates the firing/offboarding pipeline: task reassignment, memory archival, team notification, and agent termination.

OffboardingService

OffboardingService(
    *,
    registry,
    reassignment_strategy=None,
    archival_strategy=None,
    memory_backend=None,
    archival_store=None,
    org_memory_backend=None,
    message_bus=None,
    task_repository=None,
)

Orchestrates the firing/offboarding pipeline.

Pipeline steps
  1. Get active tasks and reassign via strategy.
  2. Archive memory via archival strategy.
  3. Notify team via message bus.
  4. Update agent status to TERMINATED and return record.

Parameters:

  registry (AgentRegistryService, required): Agent registry for status updates.
  reassignment_strategy (TaskReassignmentStrategy | None, default None): Strategy for task reassignment. When None, defaults to QueueReturnStrategy so production wiring does not have to import the concrete class. Override to swap in an alternative reassignment policy (e.g. specific-agent transfer, reject-on-fail).
  archival_strategy (MemoryArchivalStrategy | None, default None): Strategy for memory archival. When None, defaults to FullSnapshotStrategy. Override to swap in selective archival or summary-only policies.
  memory_backend (MemoryBackend | None, default None): Optional hot memory store.
  archival_store (ArchivalStore | None, default None): Optional cold archival storage.
  org_memory_backend (OrgMemoryBackend | None, default None): Optional org memory backend that archived memories can be promoted into.
  message_bus (MessageBus | None, default None): Optional message bus for team notifications.
  task_repository (TaskRepository | None, default None): Optional task repository for queries.
Source code in src/synthorg/hr/offboarding_service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    registry: AgentRegistryService,
    reassignment_strategy: TaskReassignmentStrategy | None = None,
    archival_strategy: MemoryArchivalStrategy | None = None,
    memory_backend: MemoryBackend | None = None,
    archival_store: ArchivalStore | None = None,
    org_memory_backend: OrgMemoryBackend | None = None,
    message_bus: MessageBus | None = None,
    task_repository: TaskRepository | None = None,
) -> None:
    self._registry = registry
    self._reassignment_strategy: TaskReassignmentStrategy = (
        reassignment_strategy
        if reassignment_strategy is not None
        else QueueReturnStrategy()
    )
    self._archival_strategy: MemoryArchivalStrategy = (
        archival_strategy
        if archival_strategy is not None
        else FullSnapshotStrategy()
    )
    self._memory_backend = memory_backend
    self._archival_store = archival_store
    self._org_memory_backend = org_memory_backend
    self._message_bus = message_bus
    self._task_repository = task_repository

offboard async

offboard(request)

Execute the full offboarding pipeline.

Parameters:

  request (FiringRequest, required): The firing request to process.

Returns:

  OffboardingRecord: Record of the completed offboarding.

Raises:

  OffboardingError: If task reassignment fails (fatal).
  AgentNotFoundError: If the agent is not in the registry (fatal).

Note

Memory archival and team notification failures are logged but non-fatal -- offboarding continues.

Source code in src/synthorg/hr/offboarding_service.py
async def offboard(
    self,
    request: FiringRequest,
) -> OffboardingRecord:
    """Execute the full offboarding pipeline.

    Args:
        request: The firing request to process.

    Returns:
        Record of the completed offboarding.

    Raises:
        OffboardingError: If task reassignment fails (fatal).
        AgentNotFoundError: If the agent is not in the registry
            (fatal).

    Note:
        Memory archival and team notification failures are logged
        but non-fatal -- offboarding continues.
    """
    started_at = datetime.now(UTC)
    agent_id = str(request.agent_id)

    logger.info(
        HR_FIRING_INITIATED,
        agent_id=agent_id,
        reason=request.reason.value,
    )

    # Verify agent exists in registry.
    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found in registry"
        logger.warning(HR_FIRING_INITIATED, agent_id=agent_id, error=msg)
        raise AgentNotFoundError(msg)

    # Step 1: Get active tasks and reassign.
    tasks_reassigned = await self._reassign_tasks(agent_id)

    # Step 2: Archive memory.
    archival_result = await self._archive_memory(agent_id, identity)

    # Step 3: Notify team.
    team_notified = await self._notify_team(
        agent_id, identity, request.reason.value
    )

    # Step 4: Terminate agent.
    await self._terminate_agent(agent_id)

    completed_at = datetime.now(UTC)
    record = OffboardingRecord(
        agent_id=NotBlankStr(agent_id),
        agent_name=identity.name,
        firing_request_id=request.id,
        tasks_reassigned=tasks_reassigned,
        memory_archive_id=None,
        org_memories_promoted=archival_result.promoted_to_org,
        team_notification_sent=team_notified,
        started_at=started_at,
        completed_at=completed_at,
    )

    logger.info(
        HR_FIRING_COMPLETE,
        agent_id=agent_id,
        tasks_reassigned=len(tasks_reassigned),
        memories_archived=archival_result.total_archived,
    )
    return record
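The pipeline above treats some steps as fatal (registry lookup, task reassignment) and others as best-effort (memory archival, team notification). The sketch below shows that control flow. It uses plain async callables in place of the service's private _reassign_tasks, _archive_memory, _notify_team and _terminate_agent helpers. run_offboarding, OffboardingSummary and the stand-in exception classes are hypothetical names. The real service may also catch non-fatal failures inside each helper rather than in the orchestrator.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

logger = logging.getLogger("offboarding_sketch")


class AgentNotFoundError(Exception):
    """Stand-in for synthorg's AgentNotFoundError (fatal)."""


class OffboardingError(Exception):
    """Stand-in for synthorg's OffboardingError (fatal)."""


@dataclass(frozen=True)
class OffboardingSummary:
    agent_id: str
    tasks_reassigned: tuple[str, ...]
    memories_archived: int
    team_notified: bool


async def run_offboarding(
    agent_id: str,
    *,
    registry: dict[str, str],
    reassign: Callable[[str], Awaitable[tuple[str, ...]]],
    archive: Callable[[str], Awaitable[int]],
    notify: Callable[[str], Awaitable[bool]],
    terminate: Callable[[str], Awaitable[None]],
) -> OffboardingSummary:
    # Fatal precondition: the agent must be registered.
    if agent_id not in registry:
        raise AgentNotFoundError(f"Agent {agent_id!r} not found in registry")

    # Step 1 (fatal): a reassignment failure aborts the whole pipeline.
    try:
        reassigned = await reassign(agent_id)
    except Exception as exc:
        raise OffboardingError(f"Task reassignment failed for {agent_id!r}") from exc

    # Step 2 (non-fatal): log and continue with zero archived memories.
    try:
        archived = await archive(agent_id)
    except Exception:
        logger.warning("memory archival failed for %s", agent_id, exc_info=True)
        archived = 0

    # Step 3 (non-fatal): log and record that no notification went out.
    try:
        notified = await notify(agent_id)
    except Exception:
        logger.warning("team notification failed for %s", agent_id, exc_info=True)
        notified = False

    # Step 4: terminate only after the earlier steps have run.
    await terminate(agent_id)
    return OffboardingSummary(agent_id, reassigned, archived, notified)
```

With this split, a flaky memory store costs nothing beyond a logged warning. A task that cannot be reassigned stops the agent from being terminated while it still owns work.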

Enums

enums

HR domain enumerations.

HiringRequestStatus

Bases: StrEnum

Status of a hiring request through the approval pipeline.

FiringReason

Bases: StrEnum

Reason for agent termination.

OnboardingStep

Bases: StrEnum

Steps in the agent onboarding checklist.

LifecycleEventType

Bases: StrEnum

Type of agent lifecycle event.

ActivityEventType

Bases: StrEnum

Event types produced by the activity feed timeline.

Superset of LifecycleEventType plus operational event types generated from task metrics, cost records, tool invocations, and delegation records.
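Python enums that already define members cannot be subclassed. A "superset" enum like this one therefore has to redeclare every lifecycle value and then add its own. The sketch below shows that pattern and includes a consistency check that keeps the two enums in sync. All member names and values are hypothetical, because the real members are not listed on this page. The sketch uses a str-mixin Enum so it also runs before Python 3.11, and it behaves like StrEnum for the comparisons shown.

```python
from __future__ import annotations

from enum import Enum


class LifecycleEventType(str, Enum):
    # Hypothetical members for illustration only.
    HIRED = "hired"
    FIRED = "fired"
    PROMOTED = "promoted"


class ActivityEventType(str, Enum):
    # Redeclare every lifecycle value (enums with members cannot be extended)...
    HIRED = "hired"
    FIRED = "fired"
    PROMOTED = "promoted"
    # ...then add operational event types (also hypothetical).
    TASK_COMPLETED = "task_completed"
    COST_INCURRED = "cost_incurred"


def to_activity(event: LifecycleEventType) -> ActivityEventType:
    """Map a lifecycle event onto the activity-feed enum by value."""
    return ActivityEventType(event.value)


def missing_lifecycle_values() -> set[str]:
    """Lifecycle values absent from the activity enum (should be empty)."""
    return {m.value for m in LifecycleEventType} - {m.value for m in ActivityEventType}
```

A unit test asserting missing_lifecycle_values() == set() catches a lifecycle value that was added without its activity-feed counterpart.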

PromotionDirection

Bases: StrEnum

Direction of a seniority level change.

TrendDirection

Bases: StrEnum

Direction of a performance metric trend.

Errors

errors

HR domain error hierarchy.

All HR errors default to is_retryable = False. HR operations are either deterministic lookups (agent registry, personality catalogue, training session store) or write-ops against authoritative state (hiring, promotion, pruning) where silent retries would double-apply. Subclasses that genuinely represent a transient network/I/O failure should override is_retryable = True explicitly.

HRError

HRError(message=None)

Bases: DomainError

Base error for all HR operations.

is_retryable defaults to False so the provider-retry layer surfaces HR errors immediately; subclasses override to True only for genuine transient I/O / network failures.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)
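The constructor above falls back to the class-level default_message when no message is passed. Because it uses `or`, an empty string also triggers the fallback. The sketch below shows how such a hierarchy can combine that fallback with the is_retryable policy described above. It assumes DomainError exposes both as plain class attributes. The message strings and the should_retry helper are hypothetical and are not synthorg's actual values.

```python
from __future__ import annotations


class DomainError(Exception):
    """Minimal stand-in for synthorg's DomainError (assumed shape)."""

    default_message = "Domain error"
    is_retryable = False

    def __init__(self, message: str | None = None) -> None:
        super().__init__(message or self.default_message)


class HRError(DomainError):
    default_message = "HR operation failed"
    is_retryable = False  # HR errors surface immediately unless a subclass opts in.


class AgentRegistryError(HRError):
    default_message = "Agent registry error"


class AgentNotFoundError(AgentRegistryError):
    default_message = "Agent not found in the registry"


def should_retry(exc: BaseException) -> bool:
    """Hypothetical retry-layer check: only retry errors that opt in."""
    return bool(getattr(exc, "is_retryable", False))
```

A genuinely transient subclass would set is_retryable = True. Every other HR error passes through such a retry layer untouched.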

HiringError

HiringError(message=None)

Bases: HRError

Error during the hiring process.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

HiringApprovalRequiredError

HiringApprovalRequiredError(message=None)

Bases: HiringError

Hiring request requires approval before instantiation.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

HiringRejectedError

HiringRejectedError(message=None)

Bases: HiringError

Hiring request was rejected.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

InvalidCandidateError

InvalidCandidateError(message=None)

Bases: HiringError

Candidate card is invalid or does not exist on the request.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

FiringError

FiringError(message=None)

Bases: HRError

Error during the firing process.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

OffboardingError

OffboardingError(message=None)

Bases: HRError

Error during the offboarding pipeline.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

TaskReassignmentError

TaskReassignmentError(message=None)

Bases: OffboardingError

Failed to reassign tasks from a departing agent.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

MemoryArchivalError

MemoryArchivalError(message=None)

Bases: OffboardingError

Failed to archive agent memories during offboarding.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

OnboardingError

OnboardingError(message=None)

Bases: HRError

Error during the onboarding process.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

AgentRegistryError

AgentRegistryError(message=None)

Bases: HRError

Error in the agent registry.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

AgentNotFoundError

AgentNotFoundError(message=None)

Bases: AgentRegistryError

Agent not found in the registry.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

AgentAlreadyRegisteredError

AgentAlreadyRegisteredError(message=None)

Bases: AgentRegistryError

Agent is already registered.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PerformanceError

PerformanceError(message=None)

Bases: HRError

Error in the performance tracking system.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

InsufficientDataError

InsufficientDataError(message=None)

Bases: PerformanceError

Not enough data points for a meaningful computation.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PromotionError

PromotionError(message=None)

Bases: HRError

Error during the promotion/demotion process.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PromotionCooldownError

PromotionCooldownError(message=None)

Bases: PromotionError

Promotion is blocked by the cooldown period.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PromotionApprovalRequiredError

PromotionApprovalRequiredError(message=None)

Bases: PromotionError

Promotion requires human approval before proceeding.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PruningError

PruningError(message=None)

Bases: HRError

Error during the pruning process.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PruningUnrestartableError

PruningUnrestartableError(message=None)

Bases: PruningError

Raised when PruningService.start() is called after a timed-out stop.

Mirrors BackupUnrestartableError. A stuck drain leaves an orphan loop that may still hold references a new instance would race against. The canonical lifecycle pattern therefore marks the service unrestartable and forces operators to construct a fresh one.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)
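The "unrestartable after a timed-out stop" lifecycle can be sketched with a single background loop task. On a clean stop, the service can be started again. If the cancelled loop does not finish within the stop timeout, the service refuses further start() calls. LoopService, stop_timeout and _run are hypothetical names. The stand-in exception reuses the documented error name, but the real PruningService internals may differ.

```python
from __future__ import annotations

import asyncio


class PruningUnrestartableError(RuntimeError):
    """Stand-in for synthorg's PruningUnrestartableError."""


class LoopService:
    """Hypothetical service with a start/stop lifecycle around one loop task."""

    def __init__(self, stop_timeout: float = 1.0) -> None:
        self._stop_timeout = stop_timeout
        self._task: asyncio.Task[None] | None = None
        self._unrestartable = False

    async def _run(self) -> None:
        # Placeholder periodic work (e.g. a pruning sweep).
        while True:
            await asyncio.sleep(0.01)

    def start(self) -> None:
        if self._unrestartable:
            raise PruningUnrestartableError(
                "previous stop timed out; construct a fresh instance"
            )
        if self._task is None:
            self._task = asyncio.get_running_loop().create_task(self._run())

    async def stop(self) -> bool:
        """Cancel the loop; return False (and block restarts) on a timed-out drain."""
        task, self._task = self._task, None
        if task is None:
            return True
        task.cancel()
        # asyncio.wait does not cancel or raise on timeout; it just reports.
        done, _pending = await asyncio.wait({task}, timeout=self._stop_timeout)
        if task in done:
            return True
        # The orphaned loop may still hold references a new loop would race.
        self._unrestartable = True
        return False
```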

PersonalityError

PersonalityError(message=None)

Bases: HRError

Error in the personality preset catalogue.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

PersonalityNotFoundError

PersonalityNotFoundError(message=None)

Bases: PersonalityError

Personality preset not found in the catalogue.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

TrainingError

TrainingError(message=None)

Bases: HRError

Error in the training pipeline.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

TrainingSessionNotFoundError

TrainingSessionNotFoundError(message=None)

Bases: TrainingError

Training session not found in the session store.

Source code in src/synthorg/core/domain_errors.py
def __init__(self, message: str | None = None) -> None:
    super().__init__(message or self.default_message)

Performance

config

Performance tracking configuration.

PerformanceConfig pydantic-model

Bases: BaseModel

Configuration for the performance tracking system.

Attributes:

Name Type Description
min_data_points int

Minimum data points for meaningful aggregation.

windows tuple[NotBlankStr, ...]

Time window labels for rolling metrics.

improving_threshold float

Slope threshold for improving trend.

declining_threshold float

Slope threshold for declining trend.

collaboration_weights dict[str, float] | None

Optional custom weights for collaboration scoring components.

llm_sampling_rate float

Fraction of collaboration events sampled by LLM (0.01 = 1%).

llm_sampling_model NotBlankStr | None

Model ID for LLM calibration sampling (None = disabled).

calibration_retention_days int

Days to retain LLM calibration records.

quality_judge_model NotBlankStr | None

Model ID for LLM quality judge (None = disabled).

quality_judge_provider NotBlankStr | None

Provider name for LLM quality judge (None = auto from model ref). Requires quality_judge_model.

quality_ci_weight float

Weight for CI signal in composite quality score (default 0.4).

quality_llm_weight float

Weight for LLM judge in composite quality score (default 0.6).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_quality_judge_provider_requires_model
  • _validate_threshold_ordering
  • _validate_quality_weights_sum

min_data_points pydantic-field

min_data_points = 5

Minimum data points for meaningful aggregation

windows pydantic-field

windows = (NotBlankStr('7d'), NotBlankStr('30d'), NotBlankStr('90d'))

Time window labels for rolling metrics

improving_threshold pydantic-field

improving_threshold = 0.05

Slope threshold for improving trend

declining_threshold pydantic-field

declining_threshold = -0.05

Slope threshold for declining trend

collaboration_weights pydantic-field

collaboration_weights = None

Custom weights for collaboration scoring components

llm_sampling_rate pydantic-field

llm_sampling_rate = 0.01

Fraction of collaboration events sampled by LLM (0.01 = 1%)

llm_sampling_model pydantic-field

llm_sampling_model = None

Model ID for LLM calibration sampling (None = disabled)

calibration_retention_days pydantic-field

calibration_retention_days = 90

Days to retain LLM calibration records

quality_judge_model pydantic-field

quality_judge_model = None

Model ID for LLM quality judge (None = disabled)

quality_judge_provider pydantic-field

quality_judge_provider = None

Provider name for LLM quality judge (None = auto from model ref)

quality_ci_weight pydantic-field

quality_ci_weight = 0.4

Weight for CI signal in composite quality score. Together with quality_llm_weight, must sum to 1.0.

quality_llm_weight pydantic-field

quality_llm_weight = 0.6

Weight for LLM judge in composite quality score. Together with quality_ci_weight, must sum to 1.0.
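The three validators listed above can be approximated with a stdlib frozen dataclass. The sketch stands in for the pydantic model, so this page's examples run without extra dependencies. Two parts are assumptions: the exact ordering rule (declining_threshold strictly below improving_threshold), and the reading of "composite quality score" as a plain weighted sum of a CI score and an LLM-judge score on the same 0.0-10.0 scale. QualityConfigSketch and composite_quality are hypothetical names.

```python
from __future__ import annotations

import math
from dataclasses import dataclass


@dataclass(frozen=True)
class QualityConfigSketch:
    improving_threshold: float = 0.05
    declining_threshold: float = -0.05
    quality_judge_model: str | None = None
    quality_judge_provider: str | None = None
    quality_ci_weight: float = 0.4
    quality_llm_weight: float = 0.6

    def __post_init__(self) -> None:
        # A provider only makes sense when a judge model is configured.
        if self.quality_judge_provider is not None and self.quality_judge_model is None:
            raise ValueError("quality_judge_provider requires quality_judge_model")
        # Assumed ordering rule: declining must sit strictly below improving.
        if not self.declining_threshold < self.improving_threshold:
            raise ValueError("declining_threshold must be < improving_threshold")
        # Weights must sum to 1.0; isclose tolerates float rounding.
        if not math.isclose(self.quality_ci_weight + self.quality_llm_weight, 1.0):
            raise ValueError("quality_ci_weight + quality_llm_weight must equal 1.0")


def composite_quality(cfg: QualityConfigSketch, ci_score: float, llm_score: float) -> float:
    """Assumed weighted blend of the CI and LLM-judge signals (0.0-10.0 each)."""
    return cfg.quality_ci_weight * ci_score + cfg.quality_llm_weight * llm_score
```

Under this assumption, with the defaults, a CI score of 8.0 and a judge score of 6.0 blend to 0.4 × 8.0 + 0.6 × 6.0 = 6.8.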

models

Performance tracking domain models.

Frozen Pydantic models for task metrics, collaboration metrics, quality/collaboration scoring results, trend detection, and rolling-window aggregates.

TaskMetricRecord pydantic-model

Bases: BaseModel

Record of a single task completion for performance tracking.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent who completed the task.

task_id NotBlankStr

Task identifier.

task_type TaskType

Classification of the task.

started_at AwareDatetime | None

When the task started (None if not tracked).

completed_at AwareDatetime

When the task was completed.

is_success bool

Whether the task completed successfully.

duration_seconds float

Wall-clock execution time.

cost float

Numeric cost of the task, denominated in currency.

currency CurrencyCode

ISO 4217 currency code for cost.

turns_used int

Number of LLM turns used.

tokens_used int

Total tokens consumed.

quality_score float | None

Quality score (0.0-10.0), None if not scored.

complexity Complexity

Estimated task complexity.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_temporal_ordering

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent who completed the task

task_id pydantic-field

task_id

Task identifier

task_type pydantic-field

task_type

Classification of the task

started_at pydantic-field

started_at = None

When the task started (None if not tracked)

completed_at pydantic-field

completed_at

When the task was completed

is_success pydantic-field

is_success

Whether the task completed successfully

duration_seconds pydantic-field

duration_seconds

Wall-clock execution time

cost pydantic-field

cost

Numeric cost of the task, denominated in currency

currency pydantic-field

currency

ISO 4217 currency code for cost

turns_used pydantic-field

turns_used

Number of LLM turns used

tokens_used pydantic-field

tokens_used

Total tokens consumed

quality_score pydantic-field

quality_score = None

Quality score (0.0-10.0)

complexity pydantic-field

complexity

Estimated task complexity
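The _validate_temporal_ordering validator is not shown here. A plausible reading, given that started_at is optional and both fields are aware datetimes, is that a recorded start may not come after the completion. The stdlib sketch below enforces that assumed rule plus the timezone-awareness that AwareDatetime implies. TaskTiming is a hypothetical name, and the real validator may check more, such as consistency with duration_seconds.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class TaskTiming:
    completed_at: datetime
    started_at: datetime | None = None  # None when the start was not tracked

    def __post_init__(self) -> None:
        # Mirror AwareDatetime: reject naive timestamps before comparing them.
        for name in ("started_at", "completed_at"):
            value = getattr(self, name)
            if value is not None and value.tzinfo is None:
                raise ValueError(f"{name} must be timezone-aware")
        # Assumed ordering rule: a recorded start may not follow completion.
        if self.started_at is not None and self.started_at > self.completed_at:
            raise ValueError("started_at must not be after completed_at")
```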

CollaborationMetricRecord pydantic-model

Bases: BaseModel

Record of a collaboration behavior data point.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent being measured.

recorded_at AwareDatetime

When the observation was recorded.

delegation_success bool | None

Whether a delegation was successful.

delegation_response_seconds float | None

Response time for a delegation.

conflict_constructiveness float | None

How constructively conflict was handled.

meeting_contribution float | None

Quality of meeting contribution.

loop_triggered bool

Whether the agent triggered a delegation loop.

handoff_completeness float | None

Completeness of task handoff (0.0-1.0).

interaction_summary NotBlankStr | None

Text summary of the interaction for LLM calibration (None if not available).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent being measured

recorded_at pydantic-field

recorded_at

When the observation was recorded

delegation_success pydantic-field

delegation_success = None

Whether a delegation was successful

delegation_response_seconds pydantic-field

delegation_response_seconds = None

Response time for a delegation

conflict_constructiveness pydantic-field

conflict_constructiveness = None

How constructively conflict was handled

meeting_contribution pydantic-field

meeting_contribution = None

Quality of meeting contribution

loop_triggered pydantic-field

loop_triggered = False

Whether the agent triggered a delegation loop

handoff_completeness pydantic-field

handoff_completeness = None

Completeness of task handoff

interaction_summary pydantic-field

interaction_summary = None

Text summary of the interaction for LLM calibration

QualityScoreResult pydantic-model

Bases: BaseModel

Result of a quality scoring evaluation.

Attributes:

Name Type Description
score float

Overall quality score (0.0-10.0).

strategy_name NotBlankStr

Name of the scoring strategy used.

breakdown tuple[tuple[NotBlankStr, float], ...]

Score components as (name, value) pairs.

confidence float

Confidence in the score (0.0-1.0).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

score pydantic-field

score

Overall quality score

strategy_name pydantic-field

strategy_name

Scoring strategy used

breakdown pydantic-field

breakdown = ()

Score components as (name, value) pairs

confidence pydantic-field

confidence

Confidence in the score

CollaborationScoreResult pydantic-model

Bases: BaseModel

Result of a collaboration scoring evaluation.

Attributes:

Name Type Description
score float

Overall collaboration score (0.0-10.0).

strategy_name NotBlankStr

Name of the scoring strategy used.

component_scores tuple[tuple[NotBlankStr, float], ...]

Per-component scores as (name, value) pairs.

confidence float

Confidence in the score (0.0-1.0).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

score pydantic-field

score

Overall collaboration score

strategy_name pydantic-field

strategy_name

Scoring strategy used

component_scores pydantic-field

component_scores = ()

Per-component scores as (name, value) pairs

confidence pydantic-field

confidence

Confidence in the score

override_active pydantic-field

override_active = False

Whether a human override is active

LlmCalibrationRecord pydantic-model

Bases: BaseModel

Record of an LLM calibration sample for collaboration scoring.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent being evaluated.

sampled_at AwareDatetime

When the LLM evaluation occurred.

interaction_record_id NotBlankStr

ID of the sampled CollaborationMetricRecord.

llm_score float

LLM-assigned collaboration score (0.0-10.0).

behavioral_score float

Behavioral strategy score at time of sampling.

drift float

Absolute difference between LLM and behavioral scores (computed).

rationale NotBlankStr

LLM's explanation for the score.

model_used NotBlankStr

Which LLM model was used for evaluation.

cost float

Numeric cost of the LLM call, denominated in currency.

currency CurrencyCode

ISO 4217 currency code for cost.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent being evaluated

sampled_at pydantic-field

sampled_at

When the LLM evaluation occurred

interaction_record_id pydantic-field

interaction_record_id

ID of the sampled CollaborationMetricRecord

llm_score pydantic-field

llm_score

LLM-assigned collaboration score

behavioral_score pydantic-field

behavioral_score

Behavioral strategy score at time of sampling

drift property

drift

Absolute difference between LLM and behavioral scores.

rationale pydantic-field

rationale

LLM's explanation for the score

model_used pydantic-field

model_used

Which LLM model was used for evaluation

cost pydantic-field

cost

Numeric cost of the LLM call, denominated in currency

currency pydantic-field

currency

ISO 4217 currency code for cost
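Because drift is a computed property rather than a stored field, it cannot disagree with the two scores it is derived from. The sketch below shows the same definition on a hypothetical CalibrationSample dataclass. It also shows one way to summarise drift across several samples. The mean_drift helper is hypothetical and is not part of the documented API.

```python
from __future__ import annotations

from dataclasses import dataclass
from statistics import fmean


@dataclass(frozen=True)
class CalibrationSample:
    llm_score: float         # LLM-assigned collaboration score (0.0-10.0)
    behavioral_score: float  # behavioral strategy score at sampling time

    @property
    def drift(self) -> float:
        """Absolute gap between the LLM and behavioral scores."""
        return abs(self.llm_score - self.behavioral_score)


def mean_drift(samples: list[CalibrationSample]) -> float:
    """Average drift across samples (hypothetical summary helper)."""
    return fmean(s.drift for s in samples)
```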

CollaborationOverride pydantic-model

Bases: _BaseOverride

Human-applied override for an agent's collaboration score.

Fields:

Validators:

  • _validate_expiration_ordering

QualityOverride pydantic-model

Bases: _BaseOverride

Human-applied override for an agent's quality score.

Fields:

Validators:

  • _validate_expiration_ordering
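Both override models share _validate_expiration_ordering from _BaseOverride, but the base model's fields are not listed on this page. The sketch below is a guess at the general shape. It assumes an override has an application timestamp and an optional expiry. The expiry must come after the application time, and the override counts as active between the two. OverrideSketch, applied_at, expires_at and is_active are all hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class OverrideSketch:
    score: float
    applied_at: datetime
    expires_at: datetime | None = None  # assumed: None means no expiry

    def __post_init__(self) -> None:
        # Assumed expiration-ordering rule: expiry must follow application.
        if self.expires_at is not None and self.expires_at <= self.applied_at:
            raise ValueError("expires_at must be after applied_at")

    def is_active(self, now: datetime) -> bool:
        """True from applied_at (inclusive) until expires_at (exclusive)."""
        if now < self.applied_at:
            return False
        return self.expires_at is None or now < self.expires_at
```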

TrendResult pydantic-model

Bases: BaseModel

Result of a trend detection analysis.

Attributes:

Name Type Description
metric_name NotBlankStr

Name of the metric being trended.

window_size NotBlankStr

Time window label (e.g. '7d', '30d').

direction TrendDirection

Detected trend direction.

slope float

Computed slope of the trend line.

data_point_count int

Number of data points used.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

metric_name pydantic-field

metric_name

Metric being trended

window_size pydantic-field

window_size

Time window label

direction pydantic-field

direction

Detected trend direction

slope pydantic-field

slope

Slope of the trend line

data_point_count pydantic-field

data_point_count

Number of data points used
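A TrendResult pairs a slope with a direction derived from the improving_threshold and declining_threshold values in PerformanceConfig. The sketch below fits an ordinary least-squares line against sample index and classifies its slope. Several parts are assumptions rather than the default TrendDetectionStrategy: the OLS fit itself, the strict > / < boundaries, and the placeholder TrendDirection members. detect_trend and ols_slope are hypothetical names. The defaults match the documented config values (5 data points, ±0.05).

```python
from __future__ import annotations

from enum import Enum


class InsufficientDataError(Exception):
    """Stand-in for synthorg's InsufficientDataError."""


class TrendDirection(str, Enum):
    # Placeholder members; the real enum's members are not shown on this page.
    IMPROVING = "improving"
    STABLE = "stable"
    DECLINING = "declining"


def ols_slope(values: list[float]) -> float:
    """Least-squares slope of values against their index 0..n-1 (n >= 2)."""
    n = len(values)
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    num = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(values))
    den = sum((i - mean_x) ** 2 for i in range(n))
    return num / den


def detect_trend(
    values: list[float],
    *,
    improving_threshold: float = 0.05,
    declining_threshold: float = -0.05,
    min_data_points: int = 5,
) -> tuple[TrendDirection, float]:
    # At least two points are needed for a slope, whatever the config says.
    if len(values) < max(min_data_points, 2):
        raise InsufficientDataError(f"need {min_data_points} points, got {len(values)}")
    slope = ols_slope(values)
    if slope > improving_threshold:
        return TrendDirection.IMPROVING, slope
    if slope < declining_threshold:
        return TrendDirection.DECLINING, slope
    return TrendDirection.STABLE, slope
```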

WindowMetrics pydantic-model

Bases: BaseModel

Aggregate metrics for a rolling time window.

Attributes:

Name Type Description
window_size NotBlankStr

Time window label (e.g. '7d', '30d').

data_point_count int

Number of records in the window.

tasks_completed int

Number of successful tasks.

tasks_failed int

Number of failed tasks.

avg_quality_score float | None

Average quality score, None if insufficient data.

avg_cost_per_task float | None

Average cost per task, None if insufficient data.

currency CurrencyCode | None

ISO 4217 currency code for avg_cost_per_task. Required whenever avg_cost_per_task is set; the reverse is not enforced -- a snapshot may carry a configured currency ahead of any cost signal (e.g. a freshly provisioned agent whose window has produced tasks but no LLM spend). See _validate_currency_presence for the validator contract.

avg_completion_time_seconds float | None

Average time, None if insufficient data.

avg_tokens_per_task float | None

Average tokens, None if insufficient data.

success_rate float | None

Task success rate (0.0-1.0), None if no tasks.

collaboration_score float | None

Collaboration score, None if not computed.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_task_counts
  • _validate_currency_presence

window_size pydantic-field

window_size

Time window label

data_point_count pydantic-field

data_point_count

Records in the window

tasks_completed pydantic-field

tasks_completed

Number of successful tasks

tasks_failed pydantic-field

tasks_failed

Number of failed tasks

avg_quality_score pydantic-field

avg_quality_score = None

Average quality score

avg_cost_per_task pydantic-field

avg_cost_per_task = None

Average cost per task, denominated in currency

currency pydantic-field

currency = None

ISO 4217 currency code for avg_cost_per_task; required when avg_cost_per_task is set, and may also be set when avg_cost_per_task is None

avg_completion_time_seconds pydantic-field

avg_completion_time_seconds = None

Average completion time

avg_tokens_per_task pydantic-field

avg_tokens_per_task = None

Average tokens per task

success_rate pydantic-field

success_rate = None

Task success rate

collaboration_score pydantic-field

collaboration_score = None

Collaboration score
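The _validate_currency_presence contract described above is deliberately one-directional. A cost figure needs a currency, but a currency may be present before any cost signal exists. The stdlib sketch below captures just that rule on a hypothetical two-field stand-in, WindowCostSketch. It does not reproduce _validate_task_counts, whose exact rule is not documented here.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class WindowCostSketch:
    avg_cost_per_task: float | None = None
    currency: str | None = None  # ISO 4217 code, e.g. "EUR"

    def __post_init__(self) -> None:
        # One-directional rule: a cost figure must say what it is denominated in...
        if self.avg_cost_per_task is not None and self.currency is None:
            raise ValueError("currency is required when avg_cost_per_task is set")
        # ...but a currency with no cost yet is allowed (e.g. no LLM spend so far).
```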

CollaborationCalibration pydantic-model

Bases: BaseModel

Stable, ops-facing readout of the collaboration scoring strategy.

Returned by PerformanceTracker.get_collaboration_calibration for the MCP synthorg_collaboration_get_calibration tool. The shape is intentionally curated -- callers see strategy_name and the bounded component_weights tuple, but they do not see strategy-private internals. Swapping the underlying synthorg.hr.performance.protocols.CollaborationScoringStrategy therefore does not change the envelope shape MCP consumers depend on.

Attributes:

Name Type Description
agent_id NotBlankStr

The agent the calibration was computed for.

strategy_name NotBlankStr

Name of the active scoring strategy.

window_sizes tuple[NotBlankStr, ...]

Rolling-window labels the strategy aggregates over.

component_weights tuple[tuple[NotBlankStr, float], ...]

Ordered tuple of (component_name, weight) pairs, where component_name is a stable identifier like "handoff_acceptance". A tuple-of-tuples (rather than a dict) is used so the wire shape is fully ordered and stays JSON-serialisable; callers that need lookup semantics should convert via dict(component_weights) at the call site.

active_override CollaborationOverride | None

Currently-active human override, if any.

sample_size int

Number of collaboration samples backing the active window. 0 is valid (cold-start agents).

last_calibrated_at AwareDatetime | None

Timestamp of the most recent calibration sample, or None when no samples exist.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_id pydantic-field

agent_id

Agent identifier

strategy_name pydantic-field

strategy_name

Active strategy name

window_sizes pydantic-field

window_sizes = ()

Rolling-window labels aggregated over

component_weights pydantic-field

component_weights = ()

Per-component weights as (name, weight) pairs

active_override pydantic-field

active_override = None

Active human override, when present

sample_size pydantic-field

sample_size

Number of collaboration samples available

last_calibrated_at pydantic-field

last_calibrated_at = None

Timestamp of the most recent calibration sample
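The component_weights description recommends dict(component_weights) for lookups and explains the tuple-of-tuples choice as keeping the wire shape ordered and JSON-serialisable. The sketch below shows both points. "handoff_acceptance" comes from the description above. The other component names and all weights are hypothetical. weights_to_json and weights_from_json are illustrative helpers, not part of the API.

```python
from __future__ import annotations

import json

# "handoff_acceptance" is from the docs; the other names and all weights are made up.
component_weights: tuple[tuple[str, float], ...] = (
    ("handoff_acceptance", 0.4),
    ("delegation_speed", 0.35),
    ("loop_avoidance", 0.25),
)


def weights_to_json(weights: tuple[tuple[str, float], ...]) -> str:
    # JSON has no tuple type: each pair becomes an ordered two-element array.
    return json.dumps(weights)


def weights_from_json(payload: str) -> tuple[tuple[str, float], ...]:
    # Rebuild the immutable, ordered tuple-of-tuples shape from decoded lists.
    return tuple((str(name), float(weight)) for name, weight in json.loads(payload))


# Convert at the call site when keyed access is needed.
lookup = dict(component_weights)
```

Keeping the canonical value as ordered pairs means two serialisations of the same calibration are byte-identical. The dict view is a throwaway convenience for the caller.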

AgentPerformanceSnapshot pydantic-model

Bases: BaseModel

Complete performance snapshot for an agent at a point in time.

Attributes:

Name Type Description
agent_id NotBlankStr

The agent being evaluated.

computed_at AwareDatetime

When this snapshot was computed.

windows tuple[WindowMetrics, ...]

Rolling window metrics.

trends tuple[TrendResult, ...]

Detected trends per metric.

overall_quality_score float | None

Aggregate quality score.

overall_collaboration_score float | None

Aggregate collaboration score.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

agent_id pydantic-field

agent_id

Agent being evaluated

computed_at pydantic-field

computed_at

When this snapshot was computed

windows pydantic-field

windows = ()

Rolling window metrics

trends pydantic-field

trends = ()

Detected trends per metric

overall_quality_score pydantic-field

overall_quality_score = None

Aggregate quality score

overall_collaboration_score pydantic-field

overall_collaboration_score = None

Aggregate collaboration score

tracker

Performance tracker service.

Central service for recording and querying agent performance metrics. Delegates scoring, windowing, and trend detection to pluggable strategies.

PerformanceTracker

PerformanceTracker(
    *,
    quality_strategy=None,
    collaboration_strategy=None,
    window_strategy=None,
    trend_strategy=None,
    config=None,
    sampler=None,
    override_store=None,
    quality_override_store=None,
    inflection_sink=None,
)

Central service for recording and querying agent performance metrics.

In-memory storage keyed by agent_id. Delegates scoring, windowing, and trend detection to injected strategy implementations.

When strategies are not provided, sensible defaults are constructed (window and trend strategies use values from PerformanceConfig).

Parameters:

Name Type Description Default
quality_strategy QualityScoringStrategy | None

Strategy for scoring task quality.

None
collaboration_strategy CollaborationScoringStrategy | None

Strategy for scoring collaboration.

None
window_strategy MetricsWindowStrategy | None

Strategy for computing rolling windows.

None
trend_strategy TrendDetectionStrategy | None

Strategy for detecting trends.

None
config PerformanceConfig | None

Performance tracking configuration.

None
sampler LlmCalibrationSampler | None

LLM calibration sampler (None = disabled).

None
override_store CollaborationOverrideStore | None

Collaboration override store (None = disabled).

None
quality_override_store QualityOverrideStore | None

Quality override store (None = disabled).

None
inflection_sink InflectionSink | None

Sink for inflection events (None = disabled).

None
Source code in src/synthorg/hr/performance/tracker.py
def __init__(  # noqa: PLR0913
    self,
    *,
    quality_strategy: QualityScoringStrategy | None = None,
    collaboration_strategy: CollaborationScoringStrategy | None = None,
    window_strategy: MetricsWindowStrategy | None = None,
    trend_strategy: TrendDetectionStrategy | None = None,
    config: PerformanceConfig | None = None,
    sampler: LlmCalibrationSampler | None = None,
    override_store: CollaborationOverrideStore | None = None,
    quality_override_store: QualityOverrideStore | None = None,
    inflection_sink: InflectionSink | None = None,
) -> None:
    cfg = config or PerformanceConfig()
    self._config = cfg
    self._quality_strategy = quality_strategy or self._default_quality()
    self._collaboration_strategy = (
        collaboration_strategy or self._default_collaboration(cfg)
    )
    self._window_strategy = window_strategy or self._default_window(cfg)
    self._trend_strategy = trend_strategy or self._default_trend(cfg)
    self._sampler = sampler
    self._override_store = override_store
    self._quality_override_store = quality_override_store
    self._inflection_sink = inflection_sink
    self._trend_direction_cache: dict[tuple[str, str, str], TrendDirection] = {}
    self._task_metrics: dict[str, list[TaskMetricRecord]] = {}
    self._collab_metrics: dict[str, list[CollaborationMetricRecord]] = {}
    self._contributions: dict[str, list[AgentContribution]] = {}
    self._background_tasks: set[asyncio.Task[None]] = set()
    self._metrics_lock = asyncio.Lock()
    # Set to True while ``aclose()`` is draining so new background
    # tasks cannot be enqueued between the task-set snapshot and
    # the clear. Guarded by ``_metrics_lock`` on both read and
    # write sides.
    self._closing: bool = False

override_store property

override_store

Return the collaboration override store, if configured.

quality_override_store property

quality_override_store

Return the quality override store, if configured.

sampler property

sampler

Return the LLM calibration sampler, if configured.

inflection_sink property writable

inflection_sink

Return the inflection sink, if configured.

clear

clear()

Reset all recorded metrics for test isolation.

Cancels pending background tasks via Task.cancel() but does not await them. This is a synchronous method intended for use in sync test fixtures (where no running event loop is available, or where the previous test's event loop is already closed and awaiting would fail). Any in-flight work in those tasks is discarded.

For production shutdown where tasks must drain cleanly, use aclose() instead -- it cancels and awaits.

Source code in src/synthorg/hr/performance/tracker.py
def clear(self) -> None:
    """Reset all recorded metrics for test isolation.

    Cancels pending background tasks via ``Task.cancel()`` but does
    **not** await them.  This is a synchronous method intended for
    use in sync test fixtures (where no running event loop is
    available, or where the previous test's event loop is already
    closed and awaiting would fail).  Any in-flight work in those
    tasks is discarded.

    For production shutdown where tasks must drain cleanly, use
    :meth:`aclose` instead -- it cancels and awaits.
    """
    tasks_cancelled = len(self._background_tasks)
    task_metrics_cleared = len(self._task_metrics)
    collab_metrics_cleared = len(self._collab_metrics)
    contributions_cleared = len(self._contributions)
    trend_cache_cleared = len(self._trend_direction_cache)
    # Iterate over a snapshot: task done-callbacks remove from the
    # set, so iterating the live set would raise
    # ``RuntimeError: set changed size during iteration``.
    for t in list(self._background_tasks):
        t.cancel()
    self._background_tasks.clear()
    self._task_metrics.clear()
    self._collab_metrics.clear()
    self._contributions.clear()
    self._trend_direction_cache.clear()
    logger.info(
        PERF_TRACKER_CLEARED,
        tasks_cancelled=tasks_cancelled,
        task_metrics_cleared=task_metrics_cleared,
        collab_metrics_cleared=collab_metrics_cleared,
        contributions_cleared=contributions_cleared,
        trend_cache_cleared=trend_cache_cleared,
    )
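clear() relies on a common background-task bookkeeping pattern. Each spawned task is kept in a set so it is not garbage-collected mid-flight, and a done-callback discards it from the set when it finishes. The sketch below shows that pattern together with a synchronous, non-awaiting cancel that iterates over a snapshot of the set. BackgroundTasks, spawn and cancel_all_nowait are hypothetical names, not the tracker's API.

```python
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any


class BackgroundTasks:
    def __init__(self) -> None:
        self._tasks: set[asyncio.Task[None]] = set()

    def spawn(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        """Start coro and keep a strong reference until it finishes."""
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        # Drop the reference automatically once the task is done.
        task.add_done_callback(self._tasks.discard)
        return task

    def cancel_all_nowait(self) -> int:
        """Request cancellation without awaiting; returns how many were pending."""
        # Snapshot first so the loop never iterates a set that
        # done-callbacks may mutate.
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        self._tasks.clear()
        return len(pending)
```

cancel() only requests cancellation. The tasks actually finish on a later event-loop iteration, which is why production shutdown should use the awaiting aclose() path instead.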

aclose async

aclose()

Cancel and await all pending background tasks.

Should be called during application shutdown to prevent RuntimeError: Task was destroyed but it is pending! warnings.

Sets _closing under _metrics_lock before snapshotting, so concurrent record_collaboration_event / get_snapshot calls (which schedule under the same lock) refuse to enqueue new background tasks once shutdown has started. Without that gate, a task scheduled right after the snapshot would survive aclose(): the caller would see aclose() return while a live sampling or inflection task keeps running and can still repopulate cache state.

Source code in src/synthorg/hr/performance/tracker.py
async def aclose(self) -> None:
    """Cancel and await all pending background tasks.

    Should be called during application shutdown to prevent
    ``RuntimeError: Task was destroyed but it is pending!``
    warnings.

    Sets ``_closing`` under ``_metrics_lock`` before snapshotting
    so concurrent ``record_collaboration_event`` / ``get_snapshot``
    calls (which schedule under the same lock) refuse to enqueue
    new background tasks once shutdown has started. Without that
    gate a task scheduled right after the snapshot would survive
    aclose() with the result that the caller sees
    ``aclose() returned`` while a live sampling / inflection task
    keeps running and can still repopulate cache state.
    """
    async with self._metrics_lock:
        self._closing = True
        tasks = list(self._background_tasks)
        self._background_tasks.clear()
    for t in tasks:
        t.cancel()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Preserve system-error signals: ``_maybe_sample`` and
    # ``_do_emit_inflections`` explicitly re-raise MemoryError /
    # RecursionError (and other BaseException subclasses other
    # than CancelledError). Discarding them here would silently
    # mask OS-level failures; log unexpected non-cancellation
    # exceptions and re-raise the first BaseException seen so the
    # lifecycle layer can surface it.
    system_error: BaseException | None = None
    for result in results:
        if not isinstance(result, BaseException):
            continue
        if isinstance(result, asyncio.CancelledError):
            continue
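        # MemoryError and RecursionError subclass Exception, so they
        # must be exempted here or they would be logged and dropped
        # instead of re-raised below.
        if isinstance(result, Exception) and not isinstance(
            result, (MemoryError, RecursionError)
        ):
            logger.warning(
                PERF_BACKGROUND_TASK_FAILED,
                error_type=type(result).__name__,
            )
            continue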
        if system_error is None:
            system_error = result
    if system_error is not None:
        raise system_error

aclear async

aclear()

Async-safe reset of all recorded metrics.

Acquires _metrics_lock so no recorder can observe a partial clear and no reader can race the mutation of _task_metrics / _collab_metrics / _contributions / _trend_direction_cache. Cancels pending background tasks without awaiting them (matching clear() semantics), so the call stays cheap in hot tests.

Production callers that must drain outstanding tasks cleanly should call aclose() instead.

Source code in src/synthorg/hr/performance/tracker.py
async def aclear(self) -> None:
    """Async-safe reset of all recorded metrics.

    Acquires ``_metrics_lock`` so no recorder can observe a partial
    clear and no reader can race the mutation of ``_task_metrics``
    / ``_collab_metrics`` / ``_contributions`` /
    ``_trend_direction_cache``. Cancels pending background tasks
    *without* awaiting them (matches :meth:`clear` semantics) so
    the call is cheap in hot tests.

    Production callers that must drain outstanding tasks cleanly
    should call :meth:`aclose` instead.
    """
    async with self._metrics_lock:
        tasks_cancelled = len(self._background_tasks)
        task_metrics_cleared = len(self._task_metrics)
        collab_metrics_cleared = len(self._collab_metrics)
        contributions_cleared = len(self._contributions)
        trend_cache_cleared = len(self._trend_direction_cache)
        for t in list(self._background_tasks):
            t.cancel()
        self._background_tasks.clear()
        self._task_metrics.clear()
        self._collab_metrics.clear()
        self._contributions.clear()
        self._trend_direction_cache.clear()
    logger.info(
        PERF_TRACKER_CLEARED,
        tasks_cancelled=tasks_cancelled,
        task_metrics_cleared=task_metrics_cleared,
        collab_metrics_cleared=collab_metrics_cleared,
        contributions_cleared=contributions_cleared,
        trend_cache_cleared=trend_cache_cleared,
    )
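Why the reset paths are cheap and aclose() is not comes down to whether cancelled tasks are awaited. This plain asyncio sketch (not tracker code) shows that Task.cancel() only requests cancellation: the task is not done until the event loop resumes it, which is what awaiting it through asyncio.gather guarantees.

```python
import asyncio


async def cancel_without_and_with_await() -> tuple[bool, bool]:
    """Return (done right after cancel(), cancelled after awaiting)."""
    started = asyncio.Event()

    async def sleeper() -> None:
        started.set()
        await asyncio.sleep(3600)

    task = asyncio.create_task(sleeper())
    await started.wait()

    task.cancel()  # clear()/aclear() style: request cancellation only
    done_immediately = task.done()

    # aclose() style: wait until the cancellation has actually been delivered.
    await asyncio.gather(task, return_exceptions=True)
    return done_immediately, task.cancelled()


print(asyncio.run(cancel_without_and_with_await()))  # (False, True)
```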

record_task_metric async

record_task_metric(record)

Record a task completion metric.

Parameters:

Name Type Description Default
record TaskMetricRecord

The task metric record to store.

required

Returns:

Type Description
TaskMetricRecord

The stored record.

Source code in src/synthorg/hr/performance/tracker.py
async def record_task_metric(
    self,
    record: TaskMetricRecord,
) -> TaskMetricRecord:
    """Record a task completion metric.

    Args:
        record: The task metric record to store.

    Returns:
        The stored record.
    """
    async with self._metrics_lock:
        agent_key = str(record.agent_id)
        if agent_key not in self._task_metrics:
            self._task_metrics[agent_key] = []
        self._task_metrics[agent_key].append(record)

    logger.info(
        PERF_METRIC_RECORDED,
        agent_id=record.agent_id,
        task_id=record.task_id,
        is_success=record.is_success,
    )
    return record

record_coordination_contributions async

record_coordination_contributions(contributions)

Store per-agent contributions from coordination.

Parameters:

Name Type Description Default
contributions tuple[AgentContribution, ...]

Attribution records from a coordinated run.

required
Source code in src/synthorg/hr/performance/tracker.py
async def record_coordination_contributions(
    self,
    contributions: tuple[AgentContribution, ...],
) -> None:
    """Store per-agent contributions from coordination.

    Args:
        contributions: Attribution records from a coordinated run.
    """
    async with self._metrics_lock:
        for contrib in contributions:
            agent_key = str(contrib.agent_id)
            self._contributions.setdefault(agent_key, []).append(contrib)

    if contributions:
        logger.info(
            PERF_METRIC_RECORDED,
            contribution_count=len(contributions),
            avg_score=round(
                sum(c.contribution_score for c in contributions)
                / len(contributions),
                3,
            ),
        )

score_task_quality async

score_task_quality(*, agent_id, task_id, task_result, acceptance_criteria=())

Score task quality and return an updated copy of the record. The tracker's stored records are not modified.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent who completed the task.

required
task_id NotBlankStr

Task identifier.

required
task_result TaskMetricRecord

Recorded task metrics.

required
acceptance_criteria tuple[AcceptanceCriterion, ...]

Criteria to evaluate against.

()

Returns:

Type Description
TaskMetricRecord

Copy of task_result with quality_score set to the strategy's score.

Source code in src/synthorg/hr/performance/tracker.py
async def score_task_quality(
    self,
    *,
    agent_id: NotBlankStr,
    task_id: NotBlankStr,
    task_result: TaskMetricRecord,
    acceptance_criteria: tuple[AcceptanceCriterion, ...] = (),
) -> TaskMetricRecord:
    """Score task quality and update the record.

    Args:
        agent_id: Agent who completed the task.
        task_id: Task identifier.
        task_result: Recorded task metrics.
        acceptance_criteria: Criteria to evaluate against.

    Returns:
        Updated record with quality score.
    """
    result = await self._quality_strategy.score(
        agent_id=agent_id,
        task_id=task_id,
        task_result=task_result,
        acceptance_criteria=acceptance_criteria,
    )
    return task_result.model_copy(update={"quality_score": result.score})

record_collaboration_event async

record_collaboration_event(record)

Record a collaboration behavior data point.

If an LLM sampler is configured and the record has an interaction_summary, the sampler is invoked probabilistically.

Parameters:

Name Type Description Default
record CollaborationMetricRecord

Collaboration metric record to store.

required
Source code in src/synthorg/hr/performance/tracker.py
async def record_collaboration_event(
    self,
    record: CollaborationMetricRecord,
) -> None:
    """Record a collaboration behavior data point.

    If an LLM sampler is configured and the record has an
    ``interaction_summary``, the sampler is invoked probabilistically.

    Args:
        record: Collaboration metric record to store.
    """
    agent_key = str(record.agent_id)
    async with self._metrics_lock:
        if agent_key not in self._collab_metrics:
            self._collab_metrics[agent_key] = []
        self._collab_metrics[agent_key].append(record)
        # Schedule inside the lock so a concurrent aclear() cannot
        # snapshot the tasks, cancel, and return before this task
        # is added to ``_background_tasks`` -- otherwise the new
        # task would survive the clear and could repopulate cache
        # state after aclear() returned.
        self._schedule_sampling(record)

    logger.debug(
        PERF_METRIC_RECORDED,
        agent_id=record.agent_id,
        metric_type="collaboration",
    )

get_collaboration_score async

get_collaboration_score(agent_id, *, now=None)

Compute collaboration score for an agent.

Returns the active human override if one exists; otherwise delegates to the collaboration scoring strategy.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
now AwareDatetime | None

Reference time for override expiration check (defaults to current UTC time).

None

Returns:

Type Description
CollaborationScoreResult

Collaboration score result.

Source code in src/synthorg/hr/performance/tracker.py
async def get_collaboration_score(
    self,
    agent_id: NotBlankStr,
    *,
    now: AwareDatetime | None = None,
) -> CollaborationScoreResult:
    """Compute collaboration score for an agent.

    Returns the active human override if one exists; otherwise
    delegates to the collaboration scoring strategy.

    Args:
        agent_id: Agent to evaluate.
        now: Reference time for override expiration check
            (defaults to current UTC time).

    Returns:
        Collaboration score result.
    """
    if self._override_store is not None:
        override = self._override_store.get_active_override(
            agent_id,
            now=now,
        )
        if override is not None:
            logger.info(
                PERF_OVERRIDE_APPLIED,
                agent_id=agent_id,
                score=override.score,
                applied_by=override.applied_by,
            )
            return CollaborationScoreResult(
                score=override.score,
                strategy_name=NotBlankStr("human_override"),
                component_scores=(),
                confidence=1.0,
                override_active=True,
            )

    # Snapshot under the lock so a future refactor that introduces
    # an ``await`` between the dict read and the tuple copy cannot
    # tear the records list. Strategy scoring runs *outside* the
    # lock -- it may do unbounded work and must not serialize
    # concurrent record writes.
    async with self._metrics_lock:
        records = tuple(self._collab_metrics.get(str(agent_id), []))
    return await self._collaboration_strategy.score(
        agent_id=agent_id,
        records=records,
    )

get_collaboration_calibration async

get_collaboration_calibration(agent_id)

Return a stable calibration readout for an agent.

The shape is deliberately curated -- strategy_name and the bounded component_weights map describe the active scoring strategy without leaking strategy-private internals. Swapping the underlying strategy never changes the envelope shape.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to read calibration for.

required

Returns:

Type Description
CollaborationCalibration

CollaborationCalibration covering the active strategy, window labels, sample size, override (if any), and last calibration timestamp.

Source code in src/synthorg/hr/performance/tracker.py
async def get_collaboration_calibration(
    self,
    agent_id: NotBlankStr,
) -> CollaborationCalibration:
    """Return a stable calibration readout for an agent.

    The shape is deliberately curated -- ``strategy_name`` and the
    bounded ``component_weights`` map describe the active scoring
    strategy without leaking strategy-private internals. Swapping
    the underlying strategy never changes the envelope shape.

    Args:
        agent_id: Agent to read calibration for.

    Returns:
        ``CollaborationCalibration`` covering the active strategy,
        window labels, sample size, override (if any), and last
        calibration timestamp.
    """
    strategy = self._collaboration_strategy
    strategy_name = NotBlankStr(strategy.name)

    # ``describe_weights`` is optional on the protocol; empty tuple
    # is a valid response for strategies that do not advertise
    # weights (or for hand-rolled stubs in tests).
    describe = getattr(strategy, "describe_weights", None)
    weights: tuple[tuple[NotBlankStr, float], ...] = ()
    if callable(describe):
        try:
            raw = describe()
            weights = _coerce_finite_weights(raw)
        except MemoryError, RecursionError:
            raise
        except Exception as exc:
            # Coercion failures (malformed pair, blank component name,
            # non-numeric weight) are folded into the same fail-soft
            # path as ``describe_weights`` raising directly so one bad
            # strategy descriptor cannot take down the whole calibration
            # readout. The caller surfaces ``component_weights=()`` and
            # the warning lets ops trace the source.
            logger.warning(
                PERF_SNAPSHOT_FAILED,
                agent_id=str(agent_id),
                error=safe_error_description(exc),
                error_type=type(exc).__name__,
                where="describe_weights",
            )
            weights = ()

    active_override: CollaborationOverride | None = None
    if self._override_store is not None:
        active_override = self._override_store.get_active_override(agent_id)

    async with self._metrics_lock:
        records = tuple(self._collab_metrics.get(str(agent_id), []))
    last_calibrated_at: AwareDatetime | None = None
    if records:
        last_calibrated_at = max(r.recorded_at for r in records)

    return CollaborationCalibration(
        agent_id=agent_id,
        strategy_name=strategy_name,
        window_sizes=tuple(self._config.windows),
        component_weights=weights,
        active_override=active_override,
        sample_size=len(records),
        last_calibrated_at=last_calibrated_at,
    )
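`_coerce_finite_weights` is not part of this listing, so the following is only a guess at the kind of validation the fail-soft comment describes (malformed pair, blank component name, non-numeric or non-finite weight), combined with the same error folding: system errors propagate and everything else degrades to an empty tuple. All names here are hypothetical.

```python
import math
from collections.abc import Callable, Iterable


def coerce_finite_weights(raw: Iterable[object]) -> tuple[tuple[str, float], ...]:
    """Validate (component, weight) pairs: non-blank names, finite numeric weights."""
    out: list[tuple[str, float]] = []
    for pair in raw:
        if not isinstance(pair, tuple) or len(pair) != 2:
            raise ValueError(f"malformed weight pair: {pair!r}")
        name, weight = pair
        if not isinstance(name, str) or not name.strip():
            raise ValueError("blank component name")
        # bool is an int subclass; reject it explicitly as a weight.
        if isinstance(weight, bool) or not isinstance(weight, (int, float)):
            raise TypeError(f"non-numeric weight for {name!r}")
        value = float(weight)
        if not math.isfinite(value):
            raise ValueError(f"non-finite weight for {name!r}")
        out.append((name, value))
    return tuple(out)


def describe_weights_fail_soft(
    describe: Callable[[], Iterable[object]],
) -> tuple[tuple[str, float], ...]:
    """Fold both describe() errors and coercion errors into an empty result."""
    try:
        return coerce_finite_weights(describe())
    except (MemoryError, RecursionError):
        raise  # never mask system-level failures
    except Exception:
        return ()
```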

get_snapshots async

get_snapshots(agent_ids, *, now=None)

Compute performance snapshots for a batch of agents.

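Order-preserving: the returned tuple has one entry per input id, in the same order. An entry is None when computing that agent's snapshot raises an Exception (e.g. insufficient data, strategy error); MemoryError and RecursionError propagate instead of being mapped to None. Each agent still emits its own per-agent snapshot log event (computed or failed), so existing observability pipelines keep working.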

Parameters:

Name Type Description Default
agent_ids tuple[NotBlankStr, ...]

Ordered tuple of agent identifiers.

required
now AwareDatetime | None

Reference time (defaults to current UTC time).

None

Returns:

Type Description
tuple[AgentPerformanceSnapshot | None, ...]

Tuple of snapshots (or None on failure) in input order.

Raises:

Type Description
ValueError

If len(agent_ids) exceeds MAX_BATCH_SNAPSHOTS_LOOKUP. Snapshot computation is O(N) in the batch size; an unbounded batch from a user-controllable caller would let a single request monopolise scoring / window / trend work.

Source code in src/synthorg/hr/performance/tracker.py
async def get_snapshots(
    self,
    agent_ids: tuple[NotBlankStr, ...],
    *,
    now: AwareDatetime | None = None,
) -> tuple[AgentPerformanceSnapshot | None, ...]:
    """Compute performance snapshots for a batch of agents.

    Order-preserving: the returned tuple has one entry per input
    id in the same order.  Entries are ``None`` when snapshot
    computation raises (e.g. insufficient data, strategy error).
    Single-agent log emissions are preserved so existing
    observability pipelines keep working.

    Args:
        agent_ids: Ordered tuple of agent identifiers.
        now: Reference time (defaults to current UTC time).

    Returns:
        Tuple of snapshots (or ``None`` on failure) in input order.

    Raises:
        ValueError: If ``len(agent_ids)`` exceeds
            ``MAX_BATCH_SNAPSHOTS_LOOKUP``.  Snapshot computation is
            O(N) in the batch size; an unbounded batch from a
            user-controllable caller would let a single request
            monopolise scoring / window / trend work.
    """
    if not agent_ids:
        return ()
    if len(agent_ids) > MAX_BATCH_SNAPSHOTS_LOOKUP:
        msg = (
            f"get_snapshots batch of {len(agent_ids)} exceeds "
            f"MAX_BATCH_SNAPSHOTS_LOOKUP={MAX_BATCH_SNAPSHOTS_LOOKUP}"
        )
        raise ValueError(msg)
    results: list[AgentPerformanceSnapshot | None] = []
    for agent_id in agent_ids:
        try:
            snapshot = await self.get_snapshot(agent_id, now=now)
        except MemoryError, RecursionError:
            raise
        except Exception as exc:
            logger.warning(
                PERF_SNAPSHOT_FAILED,
                agent_id=str(agent_id),
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
            )
            results.append(None)
        else:
            results.append(snapshot)
    return tuple(results)

get_snapshot async

get_snapshot(agent_id, *, now=None)

Compute a full performance snapshot for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
now AwareDatetime | None

Reference time (defaults to current UTC time).

None

Returns:

Type Description
AgentPerformanceSnapshot

Complete performance snapshot with windows and trends.

Source code in src/synthorg/hr/performance/tracker.py
async def get_snapshot(
    self,
    agent_id: NotBlankStr,
    *,
    now: AwareDatetime | None = None,
) -> AgentPerformanceSnapshot:
    """Compute a full performance snapshot for an agent.

    Args:
        agent_id: Agent to evaluate.
        now: Reference time (defaults to current UTC time).

    Returns:
        Complete performance snapshot with windows and trends.
    """
    if now is None:
        now = datetime.now(UTC)

    agent_key = str(agent_id)
    task_records = tuple(self._task_metrics.get(agent_key, []))

    # Compute windows.
    windows = self._window_strategy.compute_windows(
        task_records,
        now=now,
    )

    # Compute trends for quality and cost metrics.
    trends = self._compute_trends(task_records, windows, now=now)

    # Emit inflection events for trend direction changes.
    if self._inflection_sink is not None and trends:
        # Schedule inside the lock so a concurrent aclear() cannot
        # snapshot the tasks, cancel, and return before this task
        # is added to ``_background_tasks`` -- otherwise the new
        # task would survive the clear and could repopulate
        # ``_trend_direction_cache`` after aclear() returned.
        async with self._metrics_lock:
            self._schedule_inflection_emission(agent_id, trends)

    # Overall quality: average of all scored records.
    scored = [r.quality_score for r in task_records if r.quality_score is not None]
    overall_quality = round(sum(scored) / len(scored), 4) if scored else None

    # Overall collaboration score (respects active overrides).
    collab_result = await self.get_collaboration_score(
        agent_id,
        now=now,
    )
    overall_collab = collab_result.score if collab_result.confidence > 0.0 else None

    snapshot = AgentPerformanceSnapshot(
        agent_id=agent_id,
        computed_at=now,
        windows=windows,
        trends=tuple(trends),
        overall_quality_score=overall_quality,
        overall_collaboration_score=overall_collab,
    )

    logger.info(
        PERF_SNAPSHOT_COMPUTED,
        agent_id=agent_id,
        window_count=len(windows),
        trend_count=len(trends),
    )
    return snapshot

get_task_metrics

get_task_metrics(*, agent_id=None, since=None, until=None)

Query raw task metric records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent.

None
since AwareDatetime | None

Include records completed at or after this time (inclusive).

None
until AwareDatetime | None

Include records completed strictly before this time (exclusive).

None

Returns:

Type Description
tuple[TaskMetricRecord, ...]

Matching task metric records.

Source code in src/synthorg/hr/performance/tracker.py
def get_task_metrics(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    since: AwareDatetime | None = None,
    until: AwareDatetime | None = None,
) -> tuple[TaskMetricRecord, ...]:
    """Query raw task metric records with optional filters.

    Args:
        agent_id: Filter by agent.
        since: Include records after this time.
        until: Include records before this time.

    Returns:
        Matching task metric records.
    """
    if agent_id is not None:
        records = list(self._task_metrics.get(str(agent_id), []))
    else:
        records = [r for recs in self._task_metrics.values() for r in recs]

    if since is not None:
        records = [r for r in records if r.completed_at >= since]
    if until is not None:
        records = [r for r in records if r.completed_at < until]
    return tuple(records)

get_collaboration_metrics

get_collaboration_metrics(*, agent_id=None, since=None, until=None)

Query collaboration metric records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent.

None
since AwareDatetime | None

Include records recorded at or after this time (inclusive).

None
until AwareDatetime | None

Include records recorded strictly before this time (exclusive).

None

Returns:

Type Description
tuple[CollaborationMetricRecord, ...]

Matching collaboration metric records.

Source code in src/synthorg/hr/performance/tracker.py
def get_collaboration_metrics(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    since: AwareDatetime | None = None,
    until: AwareDatetime | None = None,
) -> tuple[CollaborationMetricRecord, ...]:
    """Query collaboration metric records with optional filters.

    Args:
        agent_id: Filter by agent.
        since: Include records after this time.
        until: Include records before this time.

    Returns:
        Matching collaboration metric records.
    """
    if agent_id is not None:
        records = list(self._collab_metrics.get(str(agent_id), []))
    else:
        records = [r for recs in self._collab_metrics.values() for r in recs]

    if since is not None:
        records = [r for r in records if r.recorded_at >= since]
    if until is not None:
        records = [r for r in records if r.recorded_at < until]
    return tuple(records)

set_inflection_sink async

set_inflection_sink(value)

Atomically set the inflection sink under _metrics_lock.

The async counterpart to the sync inflection_sink property setter. Concurrent callers are serialized by _metrics_lock: if two callers race to bind a sink while none is configured, exactly one succeeds and the other raises ValueError. Passing None clears the sink and never raises. Use this from any async context where concurrent binding is possible (task engine observers, rolling evolution triggers, etc.).

Parameters:

Name Type Description Default
value InflectionSink | None

The inflection sink to assign.

required

Raises:

Type Description
ValueError

If an inflection sink is already configured and value is not None.

Source code in src/synthorg/hr/performance/tracker.py
async def set_inflection_sink(self, value: InflectionSink | None) -> None:
    """Atomically set the inflection sink under ``_metrics_lock``.

    The async counterpart to the sync :attr:`inflection_sink`
    setter. Two concurrent callers will be serialized; exactly one
    succeeds, the loser raises ``ValueError``. Use this from any
    async context where concurrent binding is possible (task
    engine observers, rolling evolution triggers, etc.).

    Args:
        value: The inflection sink to assign.

    Raises:
        ValueError: If an inflection sink is already configured.
    """
    async with self._metrics_lock:
        if self._inflection_sink is not None and value is not None:
            logger.warning(
                PERF_INFLECTION_SINK_BIND_REJECTED,
                reason="already_configured",
                path="async_setter",
            )
            msg = "Inflection sink is already configured"
            raise ValueError(msg)
        self._inflection_sink = value
        if value is None:
            logger.info(PERF_INFLECTION_SINK_CLEARED, path="async_setter")
        else:
            logger.info(PERF_INFLECTION_SINK_BOUND, path="async_setter")

quality_protocol

Quality scoring strategy protocol.

Defines the interface for pluggable quality scoring strategies that evaluate task completion quality (see Agents design page, D2).

QualityScoringStrategy

Bases: Protocol

Strategy for scoring task completion quality.

Implementations evaluate task results against acceptance criteria and other quality signals to produce a normalized score.

name property

name

Human-readable strategy name.

score async

score(*, agent_id, task_id, task_result, acceptance_criteria)

Score task completion quality.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent who completed the task.

required
task_id NotBlankStr

Task identifier.

required
task_result TaskMetricRecord

Recorded task metrics.

required
acceptance_criteria tuple[AcceptanceCriterion, ...]

Criteria to evaluate against.

required

Returns:

Type Description
QualityScoreResult

Quality score result with breakdown and confidence.

Source code in src/synthorg/hr/performance/quality_protocol.py
async def score(
    self,
    *,
    agent_id: NotBlankStr,
    task_id: NotBlankStr,
    task_result: TaskMetricRecord,
    acceptance_criteria: tuple[AcceptanceCriterion, ...],
) -> QualityScoreResult:
    """Score task completion quality.

    Args:
        agent_id: Agent who completed the task.
        task_id: Task identifier.
        task_result: Recorded task metrics.
        acceptance_criteria: Criteria to evaluate against.

    Returns:
        Quality score result with breakdown and confidence.
    """
    ...
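Because QualityScoringStrategy is a typing.Protocol, any class with a matching name property and score coroutine satisfies it without inheriting from it. The sketch below uses simplified stand-in types and a hypothetical `CriteriaFractionStrategy` that scores the fraction of acceptance criteria met; the `met` flag on `Criterion` and the 0.0 to 1.0 scale are assumptions, not the real AcceptanceCriterion or QualityScoreResult.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class Criterion:
    """Hypothetical acceptance criterion with a pre-computed pass/fail flag."""

    description: str
    met: bool


@dataclass(frozen=True)
class QualityResult:
    """Hypothetical stand-in for QualityScoreResult (0.0-1.0 scale assumed)."""

    score: float
    confidence: float


@runtime_checkable
class QualityStrategy(Protocol):
    """Reduced copy of the protocol's shape using the stand-in types."""

    @property
    def name(self) -> str: ...

    async def score(
        self,
        *,
        agent_id: str,
        task_id: str,
        task_result: object,
        acceptance_criteria: tuple[Criterion, ...],
    ) -> QualityResult: ...


class CriteriaFractionStrategy:
    """Hypothetical strategy: score is the fraction of criteria met."""

    @property
    def name(self) -> str:
        return "criteria_fraction"

    async def score(
        self,
        *,
        agent_id: str,
        task_id: str,
        task_result: object,
        acceptance_criteria: tuple[Criterion, ...],
    ) -> QualityResult:
        if not acceptance_criteria:
            return QualityResult(score=0.0, confidence=0.0)  # nothing to judge
        met = sum(c.met for c in acceptance_criteria)
        return QualityResult(score=met / len(acceptance_criteria), confidence=1.0)


result = asyncio.run(
    CriteriaFractionStrategy().score(
        agent_id="agent-1",
        task_id="task-1",
        task_result=None,
        acceptance_criteria=(Criterion("tests pass", True), Criterion("docs", False)),
    )
)
print(result.score)  # 0.5
```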

collaboration_protocol

Collaboration scoring strategy protocol.

Defines the interface for pluggable collaboration scoring strategies that evaluate agent collaboration behavior (see Agents design page, D3).

CollaborationScoringStrategy

Bases: Protocol

Strategy for scoring agent collaboration behavior.

Implementations evaluate behavioral telemetry records to produce a normalized collaboration score.

name property

name

Human-readable strategy name.

score async

score(*, agent_id, records, role_weights=None)

Score agent collaboration behavior.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent being evaluated.

required
records tuple[CollaborationMetricRecord, ...]

Collaboration metric records to evaluate.

required
role_weights dict[str, float] | None

Optional per-component weight overrides.

None

Returns:

Type Description
CollaborationScoreResult

Collaboration score result with component scores.

Source code in src/synthorg/hr/performance/collaboration_protocol.py
async def score(
    self,
    *,
    agent_id: NotBlankStr,
    records: tuple[CollaborationMetricRecord, ...],
    role_weights: dict[str, float] | None = None,
) -> CollaborationScoreResult:
    """Score agent collaboration behavior.

    Args:
        agent_id: Agent being evaluated.
        records: Collaboration metric records to evaluate.
        role_weights: Optional per-component weight overrides.

    Returns:
        Collaboration score result with component scores.
    """
    ...

trend_protocol

Trend detection strategy protocol.

Defines the interface for pluggable trend detection strategies that analyze metric time series (see Agents design page, D12).

TrendDetectionStrategy

Bases: Protocol

Strategy for detecting trends in metric time series.

Implementations analyze sequences of (timestamp, value) pairs to determine whether a metric is improving, stable, or declining.

name property

name

Human-readable strategy name.

detect

detect(*, metric_name, values, window_size)

Detect the trend direction in a metric time series.

Parameters:

Name Type Description Default
metric_name NotBlankStr

Name of the metric being analyzed.

required
values tuple[tuple[AwareDatetime, float], ...]

Time series data as (timestamp, value) pairs.

required
window_size NotBlankStr

Time window label for context.

required

Returns:

Type Description
TrendResult

Trend detection result with direction and slope.

Source code in src/synthorg/hr/performance/trend_protocol.py
def detect(
    self,
    *,
    metric_name: NotBlankStr,
    values: tuple[tuple[AwareDatetime, float], ...],
    window_size: NotBlankStr,
) -> TrendResult:
    """Detect the trend direction in a metric time series.

    Args:
        metric_name: Name of the metric being analyzed.
        values: Time series data as (timestamp, value) pairs.
        window_size: Time window label for context.

    Returns:
        Trend detection result with direction and slope.
    """
    ...

Promotion

config

Promotion configuration models.

Defines PromotionConfig and sub-configs for controlling promotion/demotion behavior.

PromotionCriteriaConfig pydantic-model

Bases: BaseModel

Configuration for promotion criteria evaluation.

Attributes:

Name Type Description
min_criteria_met int

Minimum number of criteria that must be met.

required_criteria tuple[NotBlankStr, ...]

Criteria names that must always be met.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

min_criteria_met pydantic-field

min_criteria_met = 2

Minimum number of criteria that must be met (max 3)

required_criteria pydantic-field

required_criteria = ()

Criteria names that must always be met
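How min_criteria_met and required_criteria might combine: the sketch below assumes an agent is eligible only when every required criterion is met and the total number met reaches the minimum. The AND rule, the `Result` type, and the criterion names are assumptions; the defaults mirror the config defaults shown above.

```python
from collections.abc import Sequence
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:
    """Hypothetical reduced CriterionResult: just a name and a pass flag."""

    name: str
    met: bool


def evaluate(
    results: Sequence[Result],
    *,
    min_criteria_met: int = 2,
    required_criteria: tuple[str, ...] = (),
) -> tuple[bool, bool]:
    """Return (required_criteria_met, eligible) under an assumed AND rule."""
    met = {r.name for r in results if r.met}
    required_ok = all(name in met for name in required_criteria)
    eligible = required_ok and len(met) >= min_criteria_met
    return required_ok, eligible


results = (Result("quality", True), Result("tenure", True), Result("cost", False))
print(evaluate(results))  # (True, True)
print(evaluate(results, required_criteria=("cost",)))  # (False, False)
```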

PromotionApprovalConfig pydantic-model

Bases: BaseModel

Configuration for promotion approval decisions.

Attributes:

Name Type Description
human_approval_from_level SeniorityLevel

Seniority level from which human approval is required for promotion.

auto_demote_cost_saving bool

Auto-apply cost-saving demotions.

human_demote_authority bool

Require human approval for authority-reducing demotions.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

human_approval_from_level pydantic-field

human_approval_from_level = SENIOR

Level from which human approval is required

auto_demote_cost_saving pydantic-field

auto_demote_cost_saving = True

Auto-apply cost-saving demotions

human_demote_authority pydantic-field

human_demote_authority = True

Human approval for authority-reducing demotions

ModelMappingConfig pydantic-model

Bases: BaseModel

Configuration for model mapping on seniority changes.

Attributes:

Name Type Description
model_follows_seniority bool

Whether model changes with seniority.

seniority_model_map Mapping[str, str]

Explicit level-to-model overrides.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_model_map_keys

model_follows_seniority pydantic-field

model_follows_seniority = True

Whether model follows seniority level

seniority_model_map pydantic-field

seniority_model_map

Explicit seniority level to model ID overrides (wrapped as MappingProxyType after validation)
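The read-only wrapping of seniority_model_map can be illustrated with types.MappingProxyType alone. A proxy is a live view of the dict it wraps, so copying first is what makes the result truly immutable; whether the project's validator copies is not shown here. The level names and model IDs below are hypothetical.

```python
from types import MappingProxyType

raw = {"senior": "model-large", "junior": "model-small"}  # hypothetical keys and IDs
view = MappingProxyType(dict(raw))  # copy first: a proxy is a live view of its dict

try:
    view["senior"] = "other-model"  # type: ignore[index]
except TypeError:
    print("read-only")  # mappingproxy rejects item assignment

raw["junior"] = "changed"  # does not leak through, thanks to the copy
print(view["junior"])  # model-small
```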

PromotionConfig pydantic-model

Bases: BaseModel

Top-level promotion/demotion configuration.

Attributes:

Name Type Description
enabled bool

Whether the promotion subsystem is enabled.

cooldown_hours int

Hours between consecutive promotions/demotions.

criteria PromotionCriteriaConfig

Promotion criteria configuration.

approval PromotionApprovalConfig

Promotion approval configuration.

model_mapping ModelMappingConfig

Model mapping configuration.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

enabled pydantic-field

enabled = True

Whether the promotion subsystem is enabled

cooldown_hours pydantic-field

cooldown_hours = 24

Hours between consecutive promotions/demotions

criteria pydantic-field

criteria

Promotion criteria configuration

approval pydantic-field

approval

Promotion approval configuration

model_mapping pydantic-field

model_mapping

Model mapping configuration

models

Promotion domain models.

Frozen Pydantic models for promotion criteria results, evaluations, approval decisions, records, and requests.

CriterionResult pydantic-model

Bases: BaseModel

Result of a single promotion/demotion criterion evaluation.

Attributes:

Name Type Description
name NotBlankStr

Criterion name.

met bool

Whether the criterion was met.

current_value float

Agent's current value for this criterion.

threshold float

Required threshold value.

weight float | None

Weight of this criterion (None if not weighted).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

name pydantic-field

name

Criterion name

met pydantic-field

met

Whether the criterion was met

current_value pydantic-field

current_value

Agent's current value

threshold pydantic-field

threshold

Required threshold value

weight pydantic-field

weight = None

Weight of this criterion

PromotionEvaluation pydantic-model

Bases: BaseModel

Result of evaluating an agent for promotion or demotion.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent being evaluated.

current_level SeniorityLevel

Current seniority level.

target_level SeniorityLevel

Target seniority level.

direction PromotionDirection

Whether this is a promotion or demotion.

criteria_results tuple[CriterionResult, ...]

Individual criterion results.

required_criteria_met bool

Whether all required criteria are met.

eligible bool

Whether the agent is eligible for the change.

evaluated_at AwareDatetime

When the evaluation was performed.

strategy_name NotBlankStr

Strategy that performed the evaluation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_direction_consistency

agent_id pydantic-field

agent_id

Agent being evaluated

current_level pydantic-field

current_level

Current seniority level

target_level pydantic-field

target_level

Target seniority level

direction pydantic-field

direction

Promotion or demotion

criteria_results pydantic-field

criteria_results = ()

Individual criterion results

required_criteria_met pydantic-field

required_criteria_met

Whether all required criteria are met

eligible pydantic-field

eligible

Whether the agent is eligible for the change

evaluated_at pydantic-field

evaluated_at

When the evaluation was performed

strategy_name pydantic-field

strategy_name

Strategy that performed the evaluation

criteria_met_count property

criteria_met_count

Number of criteria that were met.
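The `criteria_met_count` property presumably counts the `met` flags across `criteria_results`. The minimal sketch below uses a plain dataclass in place of the Pydantic model. The class name and the criterion names are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CriterionResultSketch:
    # Hypothetical stand-in mirroring the documented CriterionResult fields.
    name: str
    met: bool
    current_value: float
    threshold: float
    weight: float | None = None


def criteria_met_count(results: tuple[CriterionResultSketch, ...]) -> int:
    """Count the criteria whose `met` flag is True."""
    return sum(1 for r in results if r.met)


results = (
    CriterionResultSketch("success_rate", True, 0.92, 0.85),
    CriterionResultSketch("tasks_completed", False, 14, 20),
)
print(criteria_met_count(results))  # 1
```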

PromotionApprovalDecision pydantic-model

Bases: BaseModel

Decision on whether a promotion needs human approval.

Attributes:

Name Type Description
auto_approve bool

Whether the promotion can be auto-approved.

reason NotBlankStr

Explanation for the decision.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

auto_approve pydantic-field

auto_approve

Whether auto-approved

reason pydantic-field

reason

Explanation for the decision

requires_human property

requires_human

Whether human approval is required (inverse of auto_approve).

PromotionRecord pydantic-model

Bases: BaseModel

Record of a completed promotion or demotion.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent who was promoted/demoted.

agent_name NotBlankStr

Agent display name.

old_level SeniorityLevel

Previous seniority level.

new_level SeniorityLevel

New seniority level.

direction PromotionDirection

Whether this was a promotion or demotion.

evaluation PromotionEvaluation

The evaluation that led to this change.

approved_by NotBlankStr | None

Who approved the change ("auto" if auto-approved, "human" if human-approved via approval_id).

approval_id NotBlankStr | None

Approval item ID if human-approved.

effective_at AwareDatetime

When the change took effect.

initiated_by NotBlankStr

Who initiated the promotion process.

model_changed bool

Whether the model was changed.

old_model_id NotBlankStr | None

Previous model ID (None if not changed).

new_model_id NotBlankStr | None

New model ID (None if not changed).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_model_fields

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent who was promoted/demoted

agent_name pydantic-field

agent_name

Agent display name

old_level pydantic-field

old_level

Previous seniority level

new_level pydantic-field

new_level

New seniority level

direction pydantic-field

direction

Promotion or demotion

evaluation pydantic-field

evaluation

Evaluation that led to this change

approved_by pydantic-field

approved_by = None

Who approved the change

approval_id pydantic-field

approval_id = None

Approval item ID if human-approved

effective_at pydantic-field

effective_at

When the change took effect

initiated_by pydantic-field

initiated_by

Who initiated the promotion process

model_changed pydantic-field

model_changed = False

Whether the model was changed

old_model_id pydantic-field

old_model_id = None

Previous model ID

new_model_id pydantic-field

new_model_id = None

New model ID
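The `_validate_model_fields` validator is not shown here. `apply_promotion` below sets both model IDs when `model_changed` is true and leaves both as None otherwise, so a consistency check along these lines is a plausible reading. This is a hedged sketch, and `check_model_fields` is a hypothetical name.

```python
from __future__ import annotations


def check_model_fields(
    model_changed: bool,
    old_model_id: str | None,
    new_model_id: str | None,
) -> None:
    """Raise ValueError if the model-change fields disagree with each other."""
    if model_changed:
        if old_model_id is None or new_model_id is None:
            raise ValueError("model_changed=True requires both model IDs")
    elif old_model_id is not None or new_model_id is not None:
        raise ValueError("model IDs must be None when model_changed=False")
```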

PromotionRequest pydantic-model

Bases: BaseModel

A pending promotion or demotion request.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

agent_id NotBlankStr

Agent being promoted/demoted.

agent_name NotBlankStr

Agent display name.

current_level SeniorityLevel

Current seniority level.

target_level SeniorityLevel

Target seniority level.

direction PromotionDirection

Whether this is a promotion or demotion.

evaluation PromotionEvaluation

The evaluation supporting this request.

status ApprovalStatus

Current approval status.

created_at AwareDatetime

When the request was created.

approval_id NotBlankStr | None

Linked approval item ID (for human approval).

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

id pydantic-field

id

Unique request identifier

agent_id pydantic-field

agent_id

Agent being promoted/demoted

agent_name pydantic-field

agent_name

Agent display name

current_level pydantic-field

current_level

Current seniority level

target_level pydantic-field

target_level

Target seniority level

direction pydantic-field

direction

Promotion or demotion

evaluation pydantic-field

evaluation

Evaluation supporting this request

status pydantic-field

status = PENDING

Current approval status

created_at pydantic-field

created_at

When the request was created

approval_id pydantic-field

approval_id = None

Linked approval item ID

service

Promotion service orchestrator.

Central service for managing agent promotions and demotions, including criteria evaluation, approval decisions, model mapping, and trust integration.

PromotionService

PromotionService(
    *,
    criteria_strategy,
    approval_strategy,
    model_mapping_strategy,
    registry,
    tracker,
    config,
    approval_store=None,
    trust_service=None,
    on_notification=None,
)

Orchestrates agent promotions and demotions.

Coordinates criteria evaluation, approval decisions, model mapping, registry updates, and optional trust re-evaluation.

Parameters:

Name Type Description Default
criteria_strategy PromotionCriteriaStrategy

Strategy for evaluating promotion criteria.

required
approval_strategy PromotionApprovalStrategy

Strategy for approval decisions.

required
model_mapping_strategy ModelMappingStrategy

Strategy for model resolution.

required
registry AgentRegistryService

Agent registry service.

required
tracker PerformanceTracker

Performance tracker.

required
config PromotionConfig

Promotion configuration.

required
approval_store ApprovalStoreProtocol | None

Optional approval store for human approval.

None
trust_service TrustService | None

Optional trust service for re-evaluation.

None
on_notification PromotionNotificationCallback | None

Optional callback to notify agents/teams of promotion or demotion events. Wired by the communication layer when available.

None
Source code in src/synthorg/hr/promotion/service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    criteria_strategy: PromotionCriteriaStrategy,
    approval_strategy: PromotionApprovalStrategy,
    model_mapping_strategy: ModelMappingStrategy,
    registry: AgentRegistryService,
    tracker: PerformanceTracker,
    config: PromotionConfig,
    approval_store: ApprovalStoreProtocol | None = None,
    trust_service: TrustService | None = None,
    on_notification: PromotionNotificationCallback | None = None,
) -> None:
    self._criteria = criteria_strategy
    self._approval = approval_strategy
    self._model_mapping = model_mapping_strategy
    self._registry = registry
    self._tracker = tracker
    self._config = config
    self._approval_store = approval_store
    self._trust_service = trust_service
    self._on_notification = on_notification
    self._promotion_history: dict[str, list[PromotionRecord]] = {}
    self._cooldown_until: dict[str, AwareDatetime] = {}

evaluate_promotion async

evaluate_promotion(agent_id)

Evaluate whether an agent qualifies for promotion.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required

Returns:

Type Description
PromotionEvaluation

Promotion evaluation result.

Raises:

Type Description
PromotionError

If the agent is not found or is already at maximum seniority.

Source code in src/synthorg/hr/promotion/service.py
async def evaluate_promotion(
    self,
    agent_id: NotBlankStr,
) -> PromotionEvaluation:
    """Evaluate whether an agent qualifies for promotion.

    Args:
        agent_id: Agent to evaluate.

    Returns:
        Promotion evaluation result.

    Raises:
        PromotionError: If the agent cannot be promoted.
    """
    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    target = _next_level(identity.level)
    if target is None:
        msg = f"Agent {agent_id!r} is already at maximum seniority"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            current_level=identity.level.value,
            error=msg,
        )
        raise PromotionError(msg)

    logger.debug(
        PROMOTION_EVALUATE_START,
        agent_id=agent_id,
        current_level=identity.level.value,
        target_level=target.value,
    )

    snapshot = await self._tracker.get_snapshot(agent_id)

    evaluation = await self._criteria.evaluate(
        agent_id=agent_id,
        current_level=identity.level,
        target_level=target,
        snapshot=snapshot,
    )

    logger.debug(
        PROMOTION_EVALUATE_COMPLETE,
        agent_id=agent_id,
        eligible=evaluation.eligible,
    )
    return evaluation

evaluate_demotion async

evaluate_demotion(agent_id)

Evaluate whether an agent should be demoted.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required

Returns:

Type Description
PromotionEvaluation

Demotion evaluation result.

Raises:

Type Description
PromotionError

If the agent is not found or is already at minimum seniority.

Source code in src/synthorg/hr/promotion/service.py
async def evaluate_demotion(
    self,
    agent_id: NotBlankStr,
) -> PromotionEvaluation:
    """Evaluate whether an agent should be demoted.

    Args:
        agent_id: Agent to evaluate.

    Returns:
        Demotion evaluation result.

    Raises:
        PromotionError: If the agent cannot be demoted.
    """
    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    target = _prev_level(identity.level)
    if target is None:
        msg = f"Agent {agent_id!r} is already at minimum seniority"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            current_level=identity.level.value,
            error=msg,
        )
        raise PromotionError(msg)

    logger.debug(
        PROMOTION_EVALUATE_START,
        agent_id=agent_id,
        current_level=identity.level.value,
        target_level=target.value,
        direction="demotion",
    )

    snapshot = await self._tracker.get_snapshot(agent_id)

    return await self._criteria.evaluate(
        agent_id=agent_id,
        current_level=identity.level,
        target_level=target,
        snapshot=snapshot,
    )
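Both evaluation methods rely on the private helpers `_next_level` and `_prev_level`, whose bodies are not shown here. From how their results are used, they appear to step one position along the seniority ladder and return None at either end. The sketch below works under that assumption and uses a hypothetical four-level enum.

```python
from __future__ import annotations

from enum import IntEnum


class Level(IntEnum):
    # Hypothetical ordered seniority ladder; the real SeniorityLevel
    # members other than SENIOR are not shown on this page.
    JUNIOR = 0
    MID = 1
    SENIOR = 2
    LEAD = 3


def next_level(level: Level) -> Level | None:
    """One step up the ladder, or None at the top."""
    try:
        return Level(level + 1)
    except ValueError:
        return None


def prev_level(level: Level) -> Level | None:
    """One step down the ladder, or None at the bottom."""
    try:
        return Level(level - 1)
    except ValueError:
        return None
```

Returning None instead of raising lets the service turn the boundary case into its own `PromotionError` with a specific message.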

request_promotion async

request_promotion(agent_id, evaluation, *, initiated_by=_SYSTEM_INITIATOR)

Create a promotion/demotion request.

Verifies that the evaluation is eligible, checks the cooldown, obtains an approval decision, and creates an approval item if human approval is needed.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to promote/demote.

required
evaluation PromotionEvaluation

The evaluation result.

required
initiated_by NotBlankStr

Who initiated the request.

_SYSTEM_INITIATOR

Returns:

Type Description
PromotionRequest

Promotion request.

Raises:

Type Description
PromotionCooldownError

If in cooldown period.

PromotionError

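If the evaluation is not eligible, the agent is not found, or human approval is required but no approval store is configured.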

Source code in src/synthorg/hr/promotion/service.py
async def request_promotion(
    self,
    agent_id: NotBlankStr,
    evaluation: PromotionEvaluation,
    *,
    initiated_by: NotBlankStr = _SYSTEM_INITIATOR,
) -> PromotionRequest:
    """Create a promotion/demotion request.

    Checks cooldown, evaluates approval decision, and creates
    an approval item if human approval is needed.

    Args:
        agent_id: Agent to promote/demote.
        evaluation: The evaluation result.
        initiated_by: Who initiated the request.

    Returns:
        Promotion request.

    Raises:
        PromotionCooldownError: If in cooldown period.
        PromotionError: If agent not found.
    """
    if not evaluation.eligible:
        msg = f"Agent {agent_id!r} is not eligible for {evaluation.direction.value}"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    if self.is_in_cooldown(agent_id):
        until = self._cooldown_until.get(str(agent_id))
        msg = f"Agent {agent_id!r} is in cooldown until {until}"
        logger.info(
            PROMOTION_COOLDOWN_ACTIVE,
            agent_id=agent_id,
            until=str(until),
        )
        raise PromotionCooldownError(msg)

    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found"
        logger.warning(
            PROMOTION_REQUESTED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    decision = await self._approval.decide(
        evaluation=evaluation,
        agent_identity=identity,
    )

    now = datetime.now(UTC)
    approval_id: NotBlankStr | None = None
    status = ApprovalStatus.PENDING

    if decision.auto_approve:
        status = ApprovalStatus.APPROVED
    elif decision.requires_human:
        if self._approval_store is None:
            msg = (
                f"Promotion for agent {agent_id!r} requires human "
                f"approval but no approval store is configured"
            )
            logger.warning(
                PROMOTION_REQUESTED,
                agent_id=agent_id,
                error=msg,
            )
            raise PromotionError(msg)
        approval_id = await self._create_approval(
            agent_id=agent_id,
            evaluation=evaluation,
            initiated_by=initiated_by,
        )

    request = PromotionRequest(
        agent_id=agent_id,
        agent_name=identity.name,
        current_level=evaluation.current_level,
        target_level=evaluation.target_level,
        direction=evaluation.direction,
        evaluation=evaluation,
        status=status,
        created_at=now,
        approval_id=approval_id,
    )

    logger.info(
        PROMOTION_REQUESTED,
        agent_id=agent_id,
        direction=evaluation.direction.value,
        status=status.value,
    )
    return request
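The guard order in `request_promotion` determines which error a caller sees and the status the new request starts in. The pure-function sketch below mirrors that order. It omits the registry lookup between the cooldown check and the approval decision. `Status`, `RequestError`, and `initial_status` are hypothetical stand-ins, not part of the package.

```python
from enum import Enum


class Status(Enum):
    # Hypothetical subset of ApprovalStatus used by this sketch.
    PENDING = "pending"
    APPROVED = "approved"


class RequestError(Exception):
    """Hypothetical stand-in for PromotionError / PromotionCooldownError."""


def initial_status(
    *,
    eligible: bool,
    in_cooldown: bool,
    auto_approve: bool,
    has_approval_store: bool,
) -> Status:
    """Mirror the guard order used by request_promotion."""
    if not eligible:
        raise RequestError("not eligible")
    if in_cooldown:
        # The real service raises PromotionCooldownError at this point.
        raise RequestError("in cooldown")
    if auto_approve:
        return Status.APPROVED
    if not has_approval_store:
        raise RequestError("human approval required but no approval store")
    # An approval item would be created here; the request stays pending.
    return Status.PENDING
```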

apply_promotion async

apply_promotion(request, *, initiated_by=_SYSTEM_INITIATOR)

Apply a promotion/demotion from an approved request.

Updates the agent's seniority level, resolves model mapping, records the change in the promotion history, starts the cooldown window, and then runs best-effort trust re-evaluation and notification.

Parameters:

Name Type Description Default
request PromotionRequest

Approved promotion request.

required
initiated_by NotBlankStr

Who initiated the application.

_SYSTEM_INITIATOR

Returns:

Type Description
PromotionRecord

Promotion record.

Raises:

Type Description
PromotionApprovalRequiredError

If request is not approved.

PromotionError

If the agent is not found.

Source code in src/synthorg/hr/promotion/service.py
async def apply_promotion(
    self,
    request: PromotionRequest,
    *,
    initiated_by: NotBlankStr = _SYSTEM_INITIATOR,
) -> PromotionRecord:
    """Apply a promotion/demotion from an approved request.

    Updates the agent's seniority level, resolves model mapping,
    triggers trust re-evaluation, and records the lifecycle event.

    Args:
        request: Approved promotion request.
        initiated_by: Who initiated the application.

    Returns:
        Promotion record.

    Raises:
        PromotionApprovalRequiredError: If request is not approved.
        PromotionError: If agent not found.
    """
    if request.status != ApprovalStatus.APPROVED:
        event = (
            PROMOTION_REJECTED
            if request.status == ApprovalStatus.REJECTED
            else PROMOTION_REQUESTED
        )
        logger.warning(
            event,
            agent_id=request.agent_id,
            status=request.status.value,
        )
        msg = f"Cannot apply promotion: request status is {request.status.value}"
        raise PromotionApprovalRequiredError(msg)

    await self._verify_approval(request)

    identity = await self._registry.get(request.agent_id)
    if identity is None:
        msg = f"Agent {request.agent_id!r} not found"
        logger.warning(
            PROMOTION_APPLIED,
            agent_id=request.agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    # Resolve model mapping
    new_model_id = self._model_mapping.resolve_model(
        agent_identity=identity,
        new_level=request.target_level,
    )

    updates: dict[str, object] = {"level": request.target_level}
    if new_model_id is not None:
        updates["model"] = identity.model.model_copy(
            update={"model_id": NotBlankStr(new_model_id)},
        )
        logger.info(
            PROMOTION_MODEL_CHANGED,
            agent_id=request.agent_id,
            old_model=str(identity.model.model_id),
            new_model=new_model_id,
        )

    await self._registry.update_identity(
        request.agent_id,
        **updates,
    )

    now = datetime.now(UTC)
    record = PromotionRecord(
        agent_id=request.agent_id,
        agent_name=request.agent_name,
        old_level=request.current_level,
        new_level=request.target_level,
        direction=request.direction,
        evaluation=request.evaluation,
        approved_by=(
            NotBlankStr("auto")
            if request.approval_id is None
            else NotBlankStr("human")
        ),
        approval_id=request.approval_id,
        effective_at=now,
        initiated_by=initiated_by,
        model_changed=new_model_id is not None,
        old_model_id=(
            identity.model.model_id if new_model_id is not None else None
        ),
        new_model_id=(
            NotBlankStr(new_model_id) if new_model_id is not None else None
        ),
    )

    self._promotion_history.setdefault(
        str(request.agent_id),
        [],
    ).append(record)

    if self._config.cooldown_hours > 0:
        self._cooldown_until[str(request.agent_id)] = now + timedelta(
            hours=self._config.cooldown_hours
        )

    # Best-effort trust re-evaluation -- promotion is already applied,
    # so failures here must not prevent the record from being returned.
    if self._trust_service is not None:
        try:
            snapshot = await self._tracker.get_snapshot(request.agent_id)
            await self._trust_service.evaluate_agent(
                request.agent_id,
                snapshot,
            )
        except Exception:
            logger.warning(
                PROMOTION_APPLIED,
                agent_id=request.agent_id,
                error="Trust re-evaluation failed after promotion; "
                "promotion still applied",
            )

    event = (
        PROMOTION_APPLIED
        if request.direction == PromotionDirection.PROMOTION
        else DEMOTION_APPLIED
    )
    logger.info(
        event,
        agent_id=request.agent_id,
        old_level=record.old_level.value,
        new_level=record.new_level.value,
        model_changed=record.model_changed,
    )

    # Notify agent and team -- best-effort, must not block the record.
    if self._on_notification is not None:
        try:
            await self._on_notification(record)
            logger.debug(
                PROMOTION_NOTIFICATION_SENT,
                agent_id=request.agent_id,
                direction=request.direction.value,
            )
        except Exception:
            logger.warning(
                PROMOTION_NOTIFICATION_SENT,
                agent_id=request.agent_id,
                error="Notification callback failed; promotion still applied",
            )

    return record
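Trust re-evaluation and notification are both wrapped so that a failure is logged, not raised, because the registry update has already happened by then. The sketch below shows that pattern in isolation. `notify_best_effort` and the dict-shaped record are hypothetical simplifications of the `on_notification` hook.

```python
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger("promotion_sketch")


async def notify_best_effort(
    record: dict[str, str],
    callback: Callable[[dict[str, str]], Awaitable[None]] | None,
) -> dict[str, str]:
    """Run an optional async callback without letting it fail the caller."""
    if callback is not None:
        try:
            await callback(record)
        except Exception:
            # Log and continue: the level change is already persisted.
            logger.warning("notification failed; record still returned")
    return record


async def failing_callback(record: dict[str, str]) -> None:
    raise RuntimeError("channel unavailable")


result = asyncio.run(notify_best_effort({"agent_id": "a-1"}, failing_callback))
```

Catching `Exception` instead of `BaseException` still lets `asyncio.CancelledError` propagate on Python 3.8 and later, so task cancellation is not swallowed.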

get_promotion_history

get_promotion_history(agent_id)

Get promotion/demotion history for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
tuple[PromotionRecord, ...]

Tuple of promotion records.

Source code in src/synthorg/hr/promotion/service.py
def get_promotion_history(
    self,
    agent_id: NotBlankStr,
) -> tuple[PromotionRecord, ...]:
    """Get promotion/demotion history for an agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Tuple of promotion records.
    """
    return tuple(self._promotion_history.get(str(agent_id), []))

is_in_cooldown

is_in_cooldown(agent_id)

Check whether an agent is in the promotion cooldown period.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
bool

True if in cooldown.

Source code in src/synthorg/hr/promotion/service.py
def is_in_cooldown(self, agent_id: NotBlankStr) -> bool:
    """Check whether an agent is in the promotion cooldown period.

    Args:
        agent_id: Agent identifier.

    Returns:
        True if in cooldown.
    """
    until = self._cooldown_until.get(str(agent_id))
    if until is None:
        return False
    return datetime.now(UTC) < until
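The cooldown bookkeeping split across `apply_promotion` and `is_in_cooldown` can be isolated as below. The sketch has one deliberate change: it takes `now` as a parameter instead of calling `datetime.now(UTC)`, so the boundary can be tested. `CooldownTracker` is a hypothetical name. As the constructor shows, the service keeps this state (and the promotion history) in in-memory dicts, so it does not survive a process restart unless persisted elsewhere.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


class CooldownTracker:
    """Hypothetical standalone version of the service's cooldown bookkeeping."""

    def __init__(self, cooldown_hours: int) -> None:
        self._hours = cooldown_hours
        self._until: dict[str, datetime] = {}

    def record_change(self, agent_id: str, now: datetime) -> None:
        # A cooldown of 0 hours disables tracking entirely.
        if self._hours > 0:
            self._until[agent_id] = now + timedelta(hours=self._hours)

    def is_in_cooldown(self, agent_id: str, now: datetime) -> bool:
        # The window is half-open: exactly at `until` the agent is free again.
        until = self._until.get(agent_id)
        return until is not None and now < until


t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
tracker = CooldownTracker(cooldown_hours=24)
tracker.record_change("agent-1", t0)
```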

model_mapping_protocol

Model mapping strategy protocol.

Defines the pluggable interface for mapping seniority levels to LLM model identifiers.

ModelMappingStrategy

Bases: Protocol

Protocol for mapping seniority to LLM models.

Implementations determine which model an agent should use after a seniority level change.

name property

name

Strategy name identifier.

resolve_model

resolve_model(*, agent_identity, new_level)

Resolve the model for an agent at a new seniority level.

Parameters:

Name Type Description Default
agent_identity AgentIdentity

The agent's current identity.

required
new_level SeniorityLevel

The new seniority level.

required

Returns:

Type Description
str | None

New model_id, or None if no change needed.

Source code in src/synthorg/hr/promotion/model_mapping_protocol.py
def resolve_model(
    self,
    *,
    agent_identity: AgentIdentity,
    new_level: SeniorityLevel,
) -> str | None:
    """Resolve the model for an agent at a new seniority level.

    Args:
        agent_identity: The agent's current identity.
        new_level: The new seniority level.

    Returns:
        New model_id, or None if no change needed.
    """
    ...
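A class satisfies this protocol structurally if it exposes a `name` property and a keyword-only `resolve_model`. The hedged sketch below is driven by `ModelMappingConfig`-style settings. The class names are hypothetical, and levels are plain strings instead of `SeniorityLevel`. It reads `agent_identity.model.model_id`, matching how `apply_promotion` accesses the identity.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelSketch:
    model_id: str


@dataclass(frozen=True)
class IdentitySketch:
    # Hypothetical minimal stand-in for AgentIdentity.
    model: ModelSketch


class ConfigDrivenModelMapping:
    """Hypothetical strategy using model_follows_seniority + seniority_model_map."""

    def __init__(self, follows_seniority: bool, level_map: Mapping[str, str]) -> None:
        self._follows = follows_seniority
        self._map = level_map

    @property
    def name(self) -> str:
        return "config_driven_sketch"

    def resolve_model(
        self,
        *,
        agent_identity: IdentitySketch,
        new_level: str,
    ) -> str | None:
        if not self._follows:
            return None
        target = self._map.get(new_level)
        # None tells the service to keep the agent's current model.
        if target is None or target == agent_identity.model.model_id:
            return None
        return target
```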

criteria_protocol

Promotion criteria strategy protocol.

Defines the pluggable interface for evaluating promotion/demotion criteria.

PromotionCriteriaStrategy

Bases: Protocol

Protocol for promotion criteria evaluation.

Implementations define what criteria must be met for an agent to be promoted or demoted between seniority levels.

name property

name

Strategy name identifier.

evaluate async

evaluate(*, agent_id, current_level, target_level, snapshot)

Evaluate whether an agent meets criteria for level change.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
current_level SeniorityLevel

Current seniority level.

required
target_level SeniorityLevel

Target seniority level.

required
snapshot AgentPerformanceSnapshot

Agent performance snapshot.

required

Returns:

Type Description
PromotionEvaluation

Evaluation result with criteria details.

Source code in src/synthorg/hr/promotion/criteria_protocol.py
async def evaluate(
    self,
    *,
    agent_id: NotBlankStr,
    current_level: SeniorityLevel,
    target_level: SeniorityLevel,
    snapshot: AgentPerformanceSnapshot,
) -> PromotionEvaluation:
    """Evaluate whether an agent meets criteria for level change.

    Args:
        agent_id: Agent to evaluate.
        current_level: Current seniority level.
        target_level: Target seniority level.
        snapshot: Agent performance snapshot.

    Returns:
        Evaluation result with criteria details.
    """
    ...
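One possible implementation compares each configured metric against a threshold. In that design, `required_criteria_met` covers only the required names, and `eligible` also demands a minimum number of met criteria. This is a sketch under several assumptions. The snapshot is modeled as a plain metric mapping (the real `AgentPerformanceSnapshot` fields are not shown). `min_met` and all class names are hypothetical. Only the promotion direction (higher is better) is handled.

```python
from __future__ import annotations

import asyncio
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:
    # Mirrors the documented CriterionResult fields (weight left out).
    name: str
    met: bool
    current_value: float
    threshold: float


@dataclass(frozen=True)
class EvaluationSketch:
    # Reduced stand-in for PromotionEvaluation (no direction or timestamp).
    agent_id: str
    current_level: str
    target_level: str
    criteria_results: tuple[Result, ...]
    required_criteria_met: bool
    eligible: bool
    strategy_name: str


class ThresholdCriteria:
    """Hypothetical strategy: a metric at or above its threshold counts as met."""

    def __init__(
        self,
        thresholds: Mapping[str, float],
        required: frozenset[str],
        min_met: int,
    ) -> None:
        self._thresholds = dict(thresholds)
        self._required = required
        self._min_met = min_met

    @property
    def name(self) -> str:
        return "threshold_sketch"

    async def evaluate(
        self,
        *,
        agent_id: str,
        current_level: str,
        target_level: str,
        snapshot: Mapping[str, float],
    ) -> EvaluationSketch:
        # Assumption: a metric missing from the snapshot counts as 0.0.
        results = tuple(
            Result(
                name=key,
                met=snapshot.get(key, 0.0) >= limit,
                current_value=snapshot.get(key, 0.0),
                threshold=limit,
            )
            for key, limit in self._thresholds.items()
        )
        required_met = all(r.met for r in results if r.name in self._required)
        met_count = sum(1 for r in results if r.met)
        return EvaluationSketch(
            agent_id=agent_id,
            current_level=current_level,
            target_level=target_level,
            criteria_results=results,
            required_criteria_met=required_met,
            eligible=required_met and met_count >= self._min_met,
            strategy_name=self.name,
        )
```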

approval_protocol

Promotion approval strategy protocol.

Defines the pluggable interface for deciding whether promotions require human approval.

PromotionApprovalStrategy

Bases: Protocol

Protocol for promotion approval decisions.

Implementations determine whether a promotion/demotion can be auto-approved or requires human intervention.

name property

name

Strategy name identifier.

decide async

decide(*, evaluation, agent_identity)

Decide whether a promotion needs human approval.

Parameters:

Name Type Description Default
evaluation PromotionEvaluation

The promotion evaluation result.

required
agent_identity AgentIdentity

The agent's current identity.

required

Returns:

Type Description
PromotionApprovalDecision

Approval decision.

Source code in src/synthorg/hr/promotion/approval_protocol.py
async def decide(
    self,
    *,
    evaluation: PromotionEvaluation,
    agent_identity: AgentIdentity,
) -> PromotionApprovalDecision:
    """Decide whether a promotion needs human approval.

    Args:
        evaluation: The promotion evaluation result.
        agent_identity: The agent's current identity.

    Returns:
        Approval decision.
    """
    ...
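A simple approval strategy could key off `PromotionApprovalConfig.human_approval_from_level` (default `SENIOR`). The sketch below assumes "from level" is inclusive and ignores the demotion settings. The decision's `requires_human` is the documented inverse of `auto_approve`. The enum members other than `SENIOR`, `EvalSketch`, and `LevelThresholdApproval` are hypothetical.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from enum import IntEnum


class Level(IntEnum):
    # Hypothetical ordered ladder; only SENIOR is confirmed on this page.
    JUNIOR = 0
    MID = 1
    SENIOR = 2
    LEAD = 3


@dataclass(frozen=True)
class EvalSketch:
    # Only the field this strategy reads from PromotionEvaluation.
    target_level: Level


@dataclass(frozen=True)
class Decision:
    auto_approve: bool
    reason: str

    @property
    def requires_human(self) -> bool:
        # Documented as the inverse of auto_approve.
        return not self.auto_approve


class LevelThresholdApproval:
    """Hypothetical strategy: promotions into human_from or above need a human."""

    def __init__(self, human_from: Level = Level.SENIOR) -> None:
        self._human_from = human_from

    @property
    def name(self) -> str:
        return "level_threshold_sketch"

    async def decide(self, *, evaluation: EvalSketch, agent_identity: object) -> Decision:
        # agent_identity is accepted to match the protocol but unused here.
        target = evaluation.target_level
        if target >= self._human_from:
            return Decision(False, f"{target.name} requires human approval")
        return Decision(True, f"{target.name} is below the human-approval level")
```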

Pruning

models

Pruning domain models.

Frozen Pydantic models for pruning evaluations, requests, records, and service configuration.

PruningEvaluation pydantic-model

Bases: BaseModel

Result of a pruning policy evaluation.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent being evaluated.

eligible bool

Whether agent should be pruned.

reasons tuple[NotBlankStr, ...]

Human-readable justifications.

scores dict[str, float]

Debug scores from evaluation criteria.

policy_name NotBlankStr

Which policy produced this evaluation.

snapshot AgentPerformanceSnapshot

Performance snapshot used for evaluation.

evaluated_at AwareDatetime

When evaluation occurred.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_eligible_reasons

agent_id pydantic-field

agent_id

Agent being evaluated

eligible pydantic-field

eligible

Whether agent should be pruned

reasons pydantic-field

reasons = ()

Human-readable justifications

scores pydantic-field

scores

Debug scores from evaluation criteria

policy_name pydantic-field

policy_name

Which policy produced this evaluation

snapshot pydantic-field

snapshot

Performance snapshot used for evaluation

evaluated_at pydantic-field

evaluated_at

When evaluation occurred
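The `_validate_eligible_reasons` validator likely ensures that an agent flagged for pruning comes with at least one human-readable justification. The rule below is an assumption drawn from the validator's name, and the real check may also constrain ineligible results. `check_eligible_reasons` is hypothetical.

```python
def check_eligible_reasons(eligible: bool, reasons: tuple[str, ...]) -> None:
    """Raise ValueError if an eligible evaluation gives no justification."""
    if eligible and not reasons:
        raise ValueError("eligible pruning evaluations need at least one reason")
```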

PruningRequest pydantic-model

Bases: BaseModel

Request to prune an agent pending human approval.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

agent_id NotBlankStr

Agent to be pruned.

agent_name NotBlankStr

Agent's display name.

evaluation PruningEvaluation

The evaluation result.

approval_id NotBlankStr

Associated approval item ID.

status ApprovalStatus

Current approval status.

created_at AwareDatetime

When request was created.

decided_at AwareDatetime | None

When approval decision was made.

decided_by NotBlankStr | None

Who made the approval decision.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_decision_fields

id pydantic-field

id

Unique request identifier

agent_id pydantic-field

agent_id

Agent to be pruned

agent_name pydantic-field

agent_name

Agent display name

evaluation pydantic-field

evaluation

Evaluation result

approval_id pydantic-field

approval_id

Associated approval item ID

status pydantic-field

status = PENDING

Current approval status

created_at pydantic-field

created_at

When request was created

decided_at pydantic-field

decided_at = None

When approval decision was made

decided_by pydantic-field

decided_by = None

Who made the approval decision
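Judging by its name, `_validate_decision_fields` ties `decided_at` and `decided_by` to `status`. The sketch below assumes a pending request carries neither field and any decided request carries both. Statuses such as an expiry may be handled differently by the real validator. Status is a plain string here, and `check_decision_fields` is hypothetical.

```python
from __future__ import annotations

from datetime import datetime


def check_decision_fields(
    status: str,
    decided_at: datetime | None,
    decided_by: str | None,
) -> None:
    """Pending requests carry no decision; decided ones carry both fields."""
    if status == "pending":
        if decided_at is not None or decided_by is not None:
            raise ValueError("pending requests must not have decision fields")
    elif decided_at is None or decided_by is None:
        raise ValueError("decided requests need decided_at and decided_by")
```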

PruningRecord pydantic-model

Bases: BaseModel

Record of a completed pruning process.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent who was pruned.

agent_name NotBlankStr

Agent's display name.

pruning_request_id NotBlankStr

Associated pruning request.

firing_request_id NotBlankStr

Firing request ID from offboarding.

reason NotBlankStr

Why agent was pruned.

approval_id NotBlankStr

Which approval authorized it.

initiated_by NotBlankStr

System or human who initiated.

created_at AwareDatetime

When process started.

completed_at AwareDatetime

When process finished.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_temporal_order

agent_id pydantic-field

agent_id

Agent who was pruned

agent_name pydantic-field

agent_name

Agent display name

pruning_request_id pydantic-field

pruning_request_id

Associated pruning request

firing_request_id pydantic-field

firing_request_id

Firing request ID from offboarding

reason pydantic-field

reason

Why agent was pruned

approval_id pydantic-field

approval_id

Approval that authorized it

initiated_by pydantic-field

initiated_by

Who initiated the pruning

created_at pydantic-field

created_at

When process started

completed_at pydantic-field

completed_at

When process finished
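`_validate_temporal_order` presumably rejects a record that finishes before it starts. This is a minimal sketch under that assumption. Whether equal timestamps are allowed is a guess, and `check_temporal_order` is hypothetical.

```python
from datetime import datetime


def check_temporal_order(created_at: datetime, completed_at: datetime) -> None:
    """A pruning process cannot finish before it starts."""
    if completed_at < created_at:
        raise ValueError("completed_at must not be earlier than created_at")
```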

PruningJobRun pydantic-model

Bases: BaseModel

Metadata about a single pruning scheduler cycle.

Attributes:

Name Type Description
job_id NotBlankStr

Unique cycle identifier.

run_at AwareDatetime

When the cycle started.

agents_evaluated int

Count of agents checked.

agents_eligible int

Count found eligible for pruning.

approval_requests_created int

Count of new approvals.

elapsed_seconds float

How long the cycle took.

errors tuple[NotBlankStr, ...]

Non-fatal errors encountered.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

Validators:

  • _validate_count_relationships

job_id pydantic-field

job_id

Unique cycle identifier

run_at pydantic-field

run_at

When the cycle started

agents_evaluated pydantic-field

agents_evaluated

Agents checked

agents_eligible pydantic-field

agents_eligible

Agents eligible for pruning

approval_requests_created pydantic-field

approval_requests_created

New approvals created

elapsed_seconds pydantic-field

elapsed_seconds

Cycle duration

errors pydantic-field

errors = ()

Non-fatal errors encountered
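The validator name `_validate_count_relationships` suggests cross-field checks on the counters. This page does not show the real validator's exact rules. The minimal sketch below assumes the natural funnel: agents evaluated >= agents eligible >= approval requests created. `count_violations` is a hypothetical helper.

```python
def count_violations(
    agents_evaluated: int,
    agents_eligible: int,
    approval_requests_created: int,
) -> list[str]:
    """List the assumed PruningJobRun count invariants that are broken."""
    problems: list[str] = []
    if min(agents_evaluated, agents_eligible, approval_requests_created) < 0:
        problems.append("counts must be non-negative")
    # Each stage of the cycle can only narrow the previous one.
    if agents_eligible > agents_evaluated:
        problems.append("agents_eligible exceeds agents_evaluated")
    if approval_requests_created > agents_eligible:
        problems.append("approval_requests_created exceeds agents_eligible")
    return problems


print(count_violations(10, 3, 2))  # []
print(count_violations(3, 5, 1))  # ['agents_eligible exceeds agents_evaluated']
```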

PruningServiceConfig pydantic-model

Bases: BaseModel

Configuration for the pruning service.

Attributes:

Name Type Description
evaluation_interval_seconds float

How often to run pruning cycles.

max_approvals_per_cycle int

Limit on approvals created per cycle.

approval_expiry_days int

Days until pending approval expires.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

evaluation_interval_seconds pydantic-field

evaluation_interval_seconds = 3600.0

How often to run pruning cycles

max_approvals_per_cycle pydantic-field

max_approvals_per_cycle = 5

Limit on approvals created per cycle

approval_expiry_days pydantic-field

approval_expiry_days = 7

Days until pending approval expires
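The sketch below applies the documented defaults to show how the three settings relate. Several details are assumptions:

- how the real service chooses which eligible agents fill the `max_approvals_per_cycle` slots;
- what happens to the overflow;
- how expiry is stamped on an approval.

`ServiceConfigSketch` and `plan_cycle` are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class ServiceConfigSketch:
    """Mirrors the documented PruningServiceConfig defaults."""

    evaluation_interval_seconds: float = 3600.0
    max_approvals_per_cycle: int = 5
    approval_expiry_days: int = 7


def plan_cycle(
    config: ServiceConfigSketch,
    eligible_ids: list[str],
    now: datetime,
) -> tuple[list[str], datetime, datetime]:
    """Hypothetical: cap approvals, stamp an expiry, schedule the next cycle."""
    selected = eligible_ids[: config.max_approvals_per_cycle]
    expires_at = now + timedelta(days=config.approval_expiry_days)
    next_cycle_at = now + timedelta(seconds=config.evaluation_interval_seconds)
    return selected, expires_at, next_cycle_at


now = datetime(2025, 3, 1, 9, 0, tzinfo=timezone.utc)
selected, expires_at, next_cycle_at = plan_cycle(
    ServiceConfigSketch(), [f"agent-{i}" for i in range(8)], now
)
```

With eight eligible agents and the defaults, the sketch plans only five approvals. Each expires seven days after `now`, and the next cycle is due one hour after `now`.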

policy

Pruning policy protocol and implementations.

Defines pluggable strategies for evaluating whether an agent should be pruned based on performance data.

PruningPolicy

Bases: Protocol

Strategy for evaluating whether an agent should be pruned.

evaluate async

evaluate(agent_id, snapshot)

Evaluate if agent should be pruned based on performance data.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent being evaluated.

required
snapshot AgentPerformanceSnapshot

Current performance snapshot.

required

Returns:

Type Description
PruningEvaluation

Evaluation result with eligibility and reasons.

Source code in src/synthorg/hr/pruning/policy.py
async def evaluate(
    self,
    agent_id: NotBlankStr,
    snapshot: AgentPerformanceSnapshot,
) -> PruningEvaluation:
    """Evaluate if agent should be pruned based on performance data.

    Args:
        agent_id: The agent being evaluated.
        snapshot: Current performance snapshot.

    Returns:
        Evaluation result with eligibility and reasons.
    """
    ...

ThresholdPruningPolicyConfig pydantic-model

Bases: BaseModel

Configuration for threshold-based pruning.

Attributes:

Name Type Description
quality_threshold float

Quality score floor (0-10).

collaboration_threshold float

Collaboration score floor (0-10).

minimum_consecutive_windows int

Number of consecutive windows that must fall below both thresholds.

minimum_window_data_points int

Minimum records to evaluate a window.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

quality_threshold pydantic-field

quality_threshold = 3.5

Quality score floor

collaboration_threshold pydantic-field

collaboration_threshold = 3.5

Collaboration score floor

minimum_consecutive_windows pydantic-field

minimum_consecutive_windows = 2

Consecutive windows below threshold required

minimum_window_data_points pydantic-field

minimum_window_data_points = 5

Minimum data points to evaluate a window

ThresholdPruningPolicy

ThresholdPruningPolicy(config)

Prune agents with quality and collaboration below thresholds.

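An agent is eligible if at least N consecutive windows (N = minimum_consecutive_windows), taken in order of window size, have both avg_quality_score and collaboration_score strictly below the configured thresholds. Each of those windows must also have at least minimum_window_data_points data points.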

Windows with None scores or insufficient data points break the consecutive streak.

Source code in src/synthorg/hr/pruning/policy.py
def __init__(self, config: ThresholdPruningPolicyConfig) -> None:
    self._config = config

evaluate async

evaluate(agent_id, snapshot)

Check quality/collaboration against thresholds.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent being evaluated.

required
snapshot AgentPerformanceSnapshot

Current performance snapshot.

required

Returns:

Type Description
PruningEvaluation

Evaluation result with eligibility and reasons.

Source code in src/synthorg/hr/pruning/policy.py
async def evaluate(
    self,
    agent_id: NotBlankStr,
    snapshot: AgentPerformanceSnapshot,
) -> PruningEvaluation:
    """Check quality/collaboration against thresholds.

    Args:
        agent_id: The agent being evaluated.
        snapshot: Current performance snapshot.

    Returns:
        Evaluation result with eligibility and reasons.
    """
    now = datetime.now(UTC)
    windows_by_size = {str(w.window_size): w for w in snapshot.windows}

    consecutive = 0
    max_consecutive = 0
    current_failing: list[str] = []
    best_failing: list[str] = []

    for size in _EXPECTED_WINDOWS:
        window = windows_by_size.get(size)
        if window and self._window_qualifies(window):
            consecutive += 1
            current_failing.append(size)
            if consecutive > max_consecutive:
                max_consecutive = consecutive
                best_failing = current_failing.copy()
        else:
            consecutive = 0
            current_failing.clear()

    eligible = max_consecutive >= self._config.minimum_consecutive_windows

    reasons: tuple[NotBlankStr, ...] = ()
    if eligible:
        windows_str = ", ".join(best_failing)
        reasons = (
            NotBlankStr(
                f"Quality and collaboration below thresholds "
                f"in {windows_str} windows"
            ),
        )

    scores = self._build_scores(snapshot)

    logger.info(
        HR_PRUNING_EVALUATION_COMPLETE,
        agent_id=str(agent_id),
        policy="threshold",
        eligible=eligible,
        consecutive_windows=max_consecutive,
    )

    return PruningEvaluation(
        agent_id=agent_id,
        eligible=eligible,
        reasons=reasons,
        scores=scores,
        policy_name=NotBlankStr("threshold"),
        snapshot=snapshot,
        evaluated_at=now,
    )

TrendPruningPolicyConfig pydantic-model

Bases: BaseModel

Configuration for trend-based pruning.

Attributes:

Name Type Description
minimum_data_points_per_window int

Minimum data points per trend window.

metric_name NotBlankStr

Which metric to track for trend evaluation.

Config:

  • frozen: True
  • allow_inf_nan: False
  • extra: forbid

Fields:

minimum_data_points_per_window pydantic-field

minimum_data_points_per_window = 5

Minimum data points per trend window

metric_name pydantic-field

metric_name = NotBlankStr('quality_score')

Metric to track for trend evaluation

TrendPruningPolicy

TrendPruningPolicy(config)

Prune agents with sustained negative trends across all windows.

An agent is eligible if all three windows (7d, 30d, 90d) show a DECLINING direction for the configured metric. Each window's trend must also have at least minimum_data_points_per_window data points.

Trends with an INSUFFICIENT_DATA direction, or with fewer than the minimum data points, are treated as non-declining, so the agent is not eligible.
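The sketch below illustrates the all-windows rule, mirroring the filter-then-check structure of the source that follows. `Direction` and `TrendSketch` are hypothetical stand-ins. Only `DECLINING` and `INSUFFICIENT_DATA` are documented members of `TrendDirection`; `OTHER` is a placeholder. The window labels are assumed to be `"7d"`, `"30d"` and `"90d"`.

```python
from dataclasses import dataclass
from enum import Enum

EXPECTED_WINDOWS = ("7d", "30d", "90d")  # assumed labels, per the docstring


class Direction(Enum):
    """Hypothetical stand-in for TrendDirection."""

    DECLINING = "declining"
    INSUFFICIENT_DATA = "insufficient_data"
    OTHER = "other"  # placeholder for any non-declining direction


@dataclass(frozen=True)
class TrendSketch:
    metric_name: str
    window_size: str
    direction: Direction
    slope: float
    data_point_count: int


def declining_everywhere(
    trends: list[TrendSketch],
    metric: str = "quality_score",
    min_points: int = 5,
) -> tuple[bool, dict[str, float]]:
    # Keep only declining trends for the tracked metric with enough data.
    declining = {
        t.window_size: t.slope
        for t in trends
        if t.metric_name == metric
        and t.data_point_count >= min_points
        and t.direction is Direction.DECLINING
    }
    eligible = all(w in declining for w in EXPECTED_WINDOWS)
    return eligible, {f"slope_{w}": s for w, s in declining.items()}


trends = [
    TrendSketch("quality_score", "7d", Direction.DECLINING, -0.30, 9),
    TrendSketch("quality_score", "30d", Direction.DECLINING, -0.12, 31),
    TrendSketch("quality_score", "90d", Direction.DECLINING, -0.05, 4),
]
eligible, scores = declining_everywhere(trends)
```

The 90d trend has only four data points, so the agent is not eligible. The 7d and 30d slopes still appear in the scores. The source builds `scores` the same way: from every declining window, independent of eligibility.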

Source code in src/synthorg/hr/pruning/policy.py
def __init__(self, config: TrendPruningPolicyConfig) -> None:
    self._config = config

evaluate async

evaluate(agent_id, snapshot)

Check all windows for consistent declining trends.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent being evaluated.

required
snapshot AgentPerformanceSnapshot

Current performance snapshot.

required

Returns:

Type Description
PruningEvaluation

Evaluation result with eligibility and reasons.

Source code in src/synthorg/hr/pruning/policy.py
async def evaluate(
    self,
    agent_id: NotBlankStr,
    snapshot: AgentPerformanceSnapshot,
) -> PruningEvaluation:
    """Check all windows for consistent declining trends.

    Args:
        agent_id: The agent being evaluated.
        snapshot: Current performance snapshot.

    Returns:
        Evaluation result with eligibility and reasons.
    """
    now = datetime.now(UTC)

    qualifying_trends = [
        t
        for t in snapshot.trends
        if (
            str(t.metric_name) == str(self._config.metric_name)
            and t.data_point_count >= self._config.minimum_data_points_per_window
        )
    ]

    declining_windows: dict[str, float] = {}
    for trend in qualifying_trends:
        if trend.direction == TrendDirection.DECLINING:
            declining_windows[str(trend.window_size)] = trend.slope

    eligible = all(w in declining_windows for w in _EXPECTED_WINDOWS)

    reasons: tuple[NotBlankStr, ...] = ()
    if eligible:
        reasons = (
            NotBlankStr(
                f"Declining {self._config.metric_name} trend "
                f"across all windows (7d, 30d, 90d)"
            ),
        )

    scores: dict[str, float] = {
        f"slope_{w}": s for w, s in declining_windows.items()
    }

    logger.info(
        HR_PRUNING_EVALUATION_COMPLETE,
        agent_id=str(agent_id),
        policy="trend",
        eligible=eligible,
        declining_windows=len(declining_windows),
    )

    return PruningEvaluation(
        agent_id=agent_id,
        eligible=eligible,
        reasons=reasons,
        scores=scores,
        policy_name=NotBlankStr("trend"),
        snapshot=snapshot,
        evaluated_at=now,
    )

service

Pruning service -- performance-driven agent removal with human approval.

Periodically evaluates active agents against pruning policies, creates approval items for eligible candidates, and delegates to OffboardingService once human approval is granted.

Note

_pending_requests and _processed_approval_ids are in-memory only. If the service restarts, already-decided approvals may be reprocessed. The pruning_request_id stored in approval metadata mitigates data loss for the audit trail, but full durability requires a persistent backend (planned).

PruningService

PruningService(
    *,
    policies,
    registry,
    tracker,
    approval_store,
    offboarding_service,
    config=None,
    on_notification=None,
)

Orchestrates performance-driven agent pruning with human approval.

Parameters:

Name Type Description Default
policies tuple[PruningPolicy, ...]

Pruning policy strategies to evaluate.

required
registry AgentRegistryService

Agent registry for listing active agents.

required
tracker PerformanceTracker

Performance tracker for snapshots.

required
approval_store ApprovalStoreProtocol

Approval store for human decisions.

required
offboarding_service OffboardingService

Service to delegate offboarding to.

required
config PruningServiceConfig | None

Pruning service configuration.

None
on_notification Callable[[PruningRecord], Awaitable[None]] | None

Optional callback for completion notifications.

None
Source code in src/synthorg/hr/pruning/service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    policies: tuple[PruningPolicy, ...],
    registry: AgentRegistryService,
    tracker: PerformanceTracker,
    approval_store: ApprovalStoreProtocol,
    offboarding_service: OffboardingService,
    config: PruningServiceConfig | None = None,
    on_notification: (Callable[[PruningRecord], Awaitable[None]] | None) = None,
) -> None:
    self._policies = policies
    self._registry = registry
    self._tracker = tracker
    self._approval_store = approval_store
    self._offboarding_service = offboarding_service
    self._config = config or PruningServiceConfig()
    self._on_notification = on_notification
    self._task: asyncio.Task[None] | None = None
    # Eager init: wake / stop events are signalled by callers that
    # may run before the background loop, so half-published event
    # attributes would race with the first ``stop()`` or
    # ``request_immediate_run``.
    self._wake_event = asyncio.Event()  # lint-allow: loop-bound-init -- see.
    self._stop_event = asyncio.Event()  # lint-allow: loop-bound-init -- see.
    # Per ``docs/reference/lifecycle-sync.md``: dedicated lifecycle
    # primitives, kept distinct from the hot-path
    # ``_processing_lock`` so a concurrent pruning cycle cannot
    # block lifecycle transitions. Eager init: ``stop()`` must be
    # safe before any ``start()`` call.
    self._lifecycle_lock = asyncio.Lock()  # lint-allow: loop-bound-init -- see.
    self._stop_failed: bool = False
    self._stop_drain_timeout_seconds: float = 30.0
    self._pending_requests: dict[str, PruningRequest] = {}
    self._completed: list[PruningRecord] = []
    self._processed_approval_ids: set[str] = set()
    # Approval ids whose ``PRUNING_REQUEST_STATUS_TRANSITIONED``
    # log has already been emitted. Distinct from
    # ``_processed_approval_ids``: that set only adds ids on
    # successful offboarding, so a failed/retried approval would
    # log the transition on every sweep. This set advances the
    # moment the log fires (which itself happens after the
    # approval-store persistence) and is the canonical "this hop
    # has been observed" idempotency anchor for the transition
    # log.
    self._logged_transition_approval_ids: set[str] = set()
    # Approval ids currently being handled by a cycle.  Two
    # concurrent ``_process_decided_approvals`` cycles MUST NOT
    # both enter ``_handle_approved`` / ``_handle_rejected`` for
    # the same id; the in-flight set closes that window without
    # serialising the slow offboarding I/O path.  Transient
    # failures (e.g. offboarding returns ``None``) leave the id
    # out of ``_processed_approval_ids`` so the next cycle
    # retries; absence from the processed set is intentional on
    # those paths, not an oversight.
    self._in_flight_approvals: set[str] = set()
    # Eager init: hot-path lock used by ``_process_decided_approvals``
    # which may run before ``start()`` if a manual sweep is invoked.
    self._processing_lock = asyncio.Lock()  # lint-allow: loop-bound-init -- see.

is_running property

is_running

Whether the scheduler loop is currently active.

start async

start()

Start the background pruning scheduler.

Idempotent and concurrent-safe per docs/reference/lifecycle-sync.md: serialises on self._lifecycle_lock so concurrent callers cannot double-spawn the run loop. Raises PruningUnrestartableError if an earlier stop() timed out.

Source code in src/synthorg/hr/pruning/service.py
async def start(self) -> None:
    """Start the background pruning scheduler.

    Idempotent + concurrent-safe per ``docs/reference/lifecycle-sync.md``:
    serialises on ``self._lifecycle_lock`` so concurrent callers
    cannot double-spawn the run loop.
    """
    async with self._lifecycle_lock:
        if self._stop_failed:
            msg = (
                "PruningService is unrestartable after a "
                "timed-out stop; construct a fresh service instead"
            )
            logger.warning(
                HR_PRUNING_POLICY_ERROR,
                error=msg,
                note="unrestartable",
            )
            raise PruningUnrestartableError(msg)
        if self.is_running:
            return
        self._wake_event.clear()
        self._stop_event.clear()
        self._task = asyncio.create_task(
            self._run_loop(),
            name="pruning-scheduler",
        )
        logger.info(HR_PRUNING_SCHEDULER_STARTED)

stop async

stop()

Stop the background scheduler gracefully.

First signals the run loop to exit cleanly via _stop_event, also setting _wake_event so a sleeping loop notices promptly. It then waits up to _stop_drain_timeout_seconds for the in-flight pruning cycle to finish. It escalates to task.cancel() only if the cooperative drain times out. In that case it also marks the service unrestartable and re-raises the TimeoutError. A later start() therefore raises rather than stacking a second loop on top of the orphan task, which may still own pruning state.

Source code in src/synthorg/hr/pruning/service.py
async def stop(self) -> None:
    """Stop the background scheduler gracefully.

    First signals the run loop to exit cleanly via ``_stop_event``
    and waits up to ``_stop_drain_timeout_seconds`` for the
    in-flight pruning cycle to finish. Only escalates to
    ``task.cancel()`` if the cooperative drain times out -- on
    timeout the service is also marked unrestartable so a fresh
    ``start()`` does not stack a second loop on top of the orphan
    task that may still own pruning state.
    """
    async with self._lifecycle_lock:
        self._stop_event.set()
        self._wake_event.set()
        task = self._task
        if task is None:
            return
        try:
            await asyncio.wait_for(
                asyncio.shield(task),
                timeout=self._stop_drain_timeout_seconds,
            )
        except TimeoutError:
            # Cooperative drain missed the deadline. Cancel hard,
            # mark unrestartable, and re-raise so the caller sees
            # the timeout. The running cycle owns repository
            # state we cannot safely interrupt twice, so we do
            # NOT chase the cancellation with a second wait.
            task.cancel()
            self._stop_failed = True
            logger.error(
                HR_PRUNING_POLICY_ERROR,
                error=("stop exceeded hard deadline; service marked unrestartable"),
                timeout_seconds=self._stop_drain_timeout_seconds,
            )
            raise
        except asyncio.CancelledError:
            # Distinguish external cancellation of ``stop()`` from
            # the running cycle being cancelled before observing
            # ``_stop_event``. If the loop task is still alive, the
            # cancel was aimed at us, not at it -- re-raise so the
            # external cancellation propagates and we don't pretend
            # the service drained when it actually didn't.
            if not task.done():
                raise
            # Loop already finished; drained successfully.
        except (MemoryError, RecursionError):
            raise
        except Exception as exc:
            logger.warning(
                HR_PRUNING_POLICY_ERROR,
                error_type=type(exc).__name__,
                error=safe_error_description(exc),
                note="shutdown",
            )
        self._task = None
        # Recreate the loop-bound events WHILE holding the
        # lifecycle lock. Outside the lock, a racing ``start()``
        # could spawn the run loop bound to the OLD events
        # before these assignments land, leaving a later stop()
        # signalling different events than the running task is
        # waiting on. ``self._lifecycle_lock`` itself MUST stay
        # the same instance for the service's lifetime; only the
        # events are swapped.
        self._stop_event = asyncio.Event()
        self._wake_event = asyncio.Event()
        logger.info(HR_PRUNING_SCHEDULER_STOPPED)
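The start/stop contract can be sketched in isolation: idempotent start, cooperative drain, and a hard cancel plus an unrestartable flag on timeout. `Scheduler` and `UnrestartableError` are hypothetical. The loop body stands in for a pruning cycle. The real `stop()` swaps in fresh events; this sketch simplifies by clearing its stop event on restart instead.

```python
import asyncio


class UnrestartableError(RuntimeError):
    """Hypothetical stand-in for PruningUnrestartableError."""


class Scheduler:
    def __init__(self, cycle_seconds: float, drain_timeout: float) -> None:
        self._lock = asyncio.Lock()
        self._stop = asyncio.Event()
        self._task: asyncio.Task[None] | None = None
        self._failed = False
        self._cycle_seconds = cycle_seconds
        self._drain_timeout = drain_timeout

    async def _run(self) -> None:
        while not self._stop.is_set():
            await asyncio.sleep(self._cycle_seconds)  # stands in for one cycle

    async def start(self) -> None:
        async with self._lock:
            if self._failed:
                raise UnrestartableError("construct a fresh scheduler instead")
            if self._task is not None and not self._task.done():
                return  # idempotent: already running
            self._stop.clear()
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        async with self._lock:
            self._stop.set()
            task = self._task
            if task is None:
                return
            try:
                # shield: the timeout cancels only this wait, not the task.
                await asyncio.wait_for(asyncio.shield(task), self._drain_timeout)
            except asyncio.TimeoutError:
                task.cancel()  # escalate only after the drain deadline
                self._failed = True
                raise
            self._task = None


async def demo() -> tuple[bool, bool]:
    fast = Scheduler(cycle_seconds=0.01, drain_timeout=1.0)
    await fast.start()
    await fast.stop()  # drains within the deadline
    slow = Scheduler(cycle_seconds=10.0, drain_timeout=0.05)
    await slow.start()
    await asyncio.sleep(0)  # let the loop enter its long "cycle"
    try:
        await slow.stop()
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    try:
        await slow.start()
        restart_blocked = False
    except UnrestartableError:
        restart_blocked = True
    return timed_out, restart_blocked


outcome = asyncio.run(demo())
```

`asyncio.shield` keeps the `wait_for` timeout from cancelling the running task itself. The sketch, like the source, therefore decides explicitly when to call `task.cancel()`.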

wake

wake()

Trigger an early pruning cycle.

Source code in src/synthorg/hr/pruning/service.py
def wake(self) -> None:
    """Trigger an early pruning cycle."""
    self._wake_event.set()
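`_run_loop` is not shown on this page. One plausible shape, offered only as an assumption, is a loop that runs a cycle and then waits on the wake event, using `evaluation_interval_seconds` as the timeout. Under that shape, `wake()` cuts the wait short. `stop()` sets both events, as its source shows, so a sleeping loop notices the stop promptly. `run_loop` and `demo` are hypothetical.

```python
import asyncio


async def run_loop(
    interval: float,
    wake: asyncio.Event,
    stop: asyncio.Event,
    cycles: list[str],
) -> None:
    """Hypothetical loop: run a cycle, then wait for the interval or a wake."""
    while not stop.is_set():
        cycles.append("cycle")  # stands in for run_pruning_cycle()
        try:
            await asyncio.wait_for(wake.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # normal interval expiry
        wake.clear()


async def demo() -> int:
    wake, stop = asyncio.Event(), asyncio.Event()
    cycles: list[str] = []
    task = asyncio.create_task(run_loop(3600.0, wake, stop, cycles))
    await asyncio.sleep(0)  # first cycle runs; loop now waits about an hour
    wake.set()  # what wake() does: cut the wait short
    await asyncio.sleep(0.01)  # second cycle runs
    stop.set()
    wake.set()  # stop() also sets the wake event so the loop exits promptly
    await task
    return len(cycles)


cycle_count = asyncio.run(demo())
```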

run_pruning_cycle async

run_pruning_cycle(*, now=None)

Execute a single pruning evaluation cycle.

Parameters:

Name Type Description Default
now datetime | None

Override for current time (testing).

None

Returns:

Type Description
PruningJobRun

Job run metadata with cycle statistics.

Source code in src/synthorg/hr/pruning/service.py
async def run_pruning_cycle(
    self,
    *,
    now: datetime | None = None,
) -> PruningJobRun:
    """Execute a single pruning evaluation cycle.

    Args:
        now: Override for current time (testing).

    Returns:
        Job run metadata with cycle statistics.
    """
    if now is None:
        now = datetime.now(UTC)

    cycle_start = datetime.now(UTC)
    job_id = NotBlankStr(str(uuid4()))
    logger.info(HR_PRUNING_CYCLE_STARTED, job_id=str(job_id))

    errors: list[NotBlankStr] = []
    await self._process_decided_approvals()

    active_agents = await self._registry.list_active()
    eligible = await self._evaluate_all(active_agents, now, errors)
    approvals = await self._submit_approvals(eligible, now, errors)

    elapsed = (datetime.now(UTC) - cycle_start).total_seconds()
    job_run = PruningJobRun(
        job_id=job_id,
        run_at=now,
        agents_evaluated=len(active_agents),
        agents_eligible=len(eligible),
        approval_requests_created=approvals,
        elapsed_seconds=elapsed,
        errors=tuple(errors),
    )

    logger.info(
        HR_PRUNING_CYCLE_COMPLETE,
        job_id=str(job_id),
        agents_evaluated=len(active_agents),
        agents_eligible=len(eligible),
        approvals_created=approvals,
        elapsed_seconds=elapsed,
    )
    return job_run