Skip to content

HR

Agent lifecycle management -- hiring, firing, onboarding, offboarding, performance tracking, and promotion/demotion.

Models

models

HR domain models.

Frozen Pydantic models for hiring, firing, onboarding, offboarding, and agent lifecycle events.

CandidateCard pydantic-model

Bases: BaseModel

Generated candidate for a hiring request.

Attributes:

Name Type Description
id NotBlankStr

Unique candidate identifier.

name NotBlankStr

Proposed agent name.

role NotBlankStr

Proposed role.

department NotBlankStr

Target department.

level SeniorityLevel

Proposed seniority level.

skills tuple[NotBlankStr, ...]

Agent skills.

rationale NotBlankStr

Why this candidate was generated.

estimated_monthly_cost float

Estimated monthly cost in USD (base currency).

template_source NotBlankStr | None

Template used for generation, if any.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique candidate identifier

name pydantic-field

name

Proposed agent name

role pydantic-field

role

Proposed role

department pydantic-field

department

Target department

level pydantic-field

level

Proposed seniority level

skills pydantic-field

skills = ()

Agent skills

rationale pydantic-field

rationale

Generation rationale

estimated_monthly_cost pydantic-field

estimated_monthly_cost

Estimated monthly cost in USD (base currency)

template_source pydantic-field

template_source = None

Template used for generation

HiringRequest pydantic-model

Bases: BaseModel

Request to hire a new agent.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

requested_by NotBlankStr

Agent or human who initiated the request.

department NotBlankStr

Target department.

role NotBlankStr

Desired role.

level SeniorityLevel

Desired seniority level.

required_skills tuple[NotBlankStr, ...]

Skills the candidate must have.

reason NotBlankStr

Business justification.

budget_limit_monthly float | None

Maximum monthly cost, if constrained.

template_name NotBlankStr | None

Template to use for candidate generation.

status HiringRequestStatus

Current request status.

created_at AwareDatetime

When the request was created.

candidates tuple[CandidateCard, ...]

Generated candidate cards.

selected_candidate_id NotBlankStr | None

ID of the chosen candidate.

approval_id NotBlankStr | None

ID of the associated approval item.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_status_candidate_consistency

id pydantic-field

id

Unique request identifier

requested_by pydantic-field

requested_by

Request initiator

department pydantic-field

department

Target department

role pydantic-field

role

Desired role

level pydantic-field

level

Desired seniority level

required_skills pydantic-field

required_skills = ()

Required skills

reason pydantic-field

reason

Business justification

budget_limit_monthly pydantic-field

budget_limit_monthly = None

Maximum monthly cost

template_name pydantic-field

template_name = None

Template for candidate generation

status pydantic-field

status = PENDING

Current request status

created_at pydantic-field

created_at

When the request was created

candidates pydantic-field

candidates = ()

Generated candidate cards

selected_candidate_id pydantic-field

selected_candidate_id = None

Chosen candidate ID

approval_id pydantic-field

approval_id = None

Associated approval item ID

FiringRequest pydantic-model

Bases: BaseModel

Request to terminate an agent.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

agent_id NotBlankStr

Agent to be terminated.

agent_name NotBlankStr

Agent's display name.

reason FiringReason

Reason for termination.

requested_by NotBlankStr

Initiator of the firing.

details str

Additional context.

created_at AwareDatetime

When the request was created.

completed_at AwareDatetime | None

When the firing was completed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_temporal_order

id pydantic-field

id

Unique request identifier

agent_id pydantic-field

agent_id

Agent to terminate

agent_name pydantic-field

agent_name

Agent display name

reason pydantic-field

reason

Reason for termination

requested_by pydantic-field

requested_by

Firing initiator

details pydantic-field

details = ''

Additional context

created_at pydantic-field

created_at

When the request was created

completed_at pydantic-field

completed_at = None

When the firing was completed

OnboardingStepRecord pydantic-model

Bases: BaseModel

Record of a single onboarding step.

Attributes:

Name Type Description
step OnboardingStep

The onboarding step.

completed bool

Whether this step is complete.

completed_at AwareDatetime | None

When this step was completed.

notes str

Optional notes from the step.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_completed_consistency

step pydantic-field

step

The onboarding step

completed pydantic-field

completed = False

Whether step is complete

completed_at pydantic-field

completed_at = None

When completed

notes pydantic-field

notes = ''

Step notes

OnboardingChecklist pydantic-model

Bases: BaseModel

Agent onboarding checklist tracking all steps.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent being onboarded.

steps tuple[OnboardingStepRecord, ...]

Individual step records.

started_at AwareDatetime

When onboarding began.

completed_at AwareDatetime | None

When all steps were completed.

is_complete bool

Whether all steps are done (computed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_completion_consistency

agent_id pydantic-field

agent_id

Agent being onboarded

steps pydantic-field

steps

Individual step records

started_at pydantic-field

started_at

When onboarding began

completed_at pydantic-field

completed_at = None

When all steps were completed

is_complete property

is_complete

Whether all onboarding steps are completed.

OffboardingRecord pydantic-model

Bases: BaseModel

Record of a completed offboarding process.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent who was offboarded.

agent_name NotBlankStr

Agent's display name.

firing_request_id NotBlankStr

Associated firing request.

tasks_reassigned tuple[NotBlankStr, ...]

IDs of reassigned tasks.

memory_archive_id NotBlankStr | None

ID of the memory archive, if created.

org_memories_promoted int

Number of memories promoted to org.

team_notification_sent bool

Whether team was notified.

started_at AwareDatetime

When offboarding started.

completed_at AwareDatetime

When offboarding finished.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_temporal_order

agent_id pydantic-field

agent_id

Agent who was offboarded

agent_name pydantic-field

agent_name

Agent display name

firing_request_id pydantic-field

firing_request_id

Associated firing request

tasks_reassigned pydantic-field

tasks_reassigned = ()

IDs of reassigned tasks

memory_archive_id pydantic-field

memory_archive_id = None

Memory archive ID

org_memories_promoted pydantic-field

org_memories_promoted = 0

Memories promoted to org

team_notification_sent pydantic-field

team_notification_sent = False

Whether team was notified

started_at pydantic-field

started_at

When offboarding started

completed_at pydantic-field

completed_at

When offboarding finished

AgentLifecycleEvent pydantic-model

Bases: BaseModel

Record of an agent lifecycle event.

Attributes:

Name Type Description
id NotBlankStr

Unique event identifier.

agent_id NotBlankStr

Agent the event relates to.

agent_name NotBlankStr

Agent's display name.

event_type LifecycleEventType

Type of lifecycle event.

timestamp AwareDatetime

When the event occurred.

initiated_by NotBlankStr

Who triggered the event.

details str

Human-readable event details.

metadata dict[str, str]

Additional structured metadata.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique event identifier

agent_id pydantic-field

agent_id

Agent the event relates to

agent_name pydantic-field

agent_name

Agent display name

event_type pydantic-field

event_type

Type of lifecycle event

timestamp pydantic-field

timestamp

When the event occurred

initiated_by pydantic-field

initiated_by

Who triggered the event

details pydantic-field

details = ''

Event details

metadata pydantic-field

metadata

Additional structured metadata

Registry

registry

Agent registry service.

Hot-pluggable agent registry for tracking active agents, their identities, and lifecycle status transitions (D8.3).

AgentRegistryService

AgentRegistryService()

Hot-pluggable agent registry.

Coroutine-safe via asyncio.Lock within a single event loop. Stores agent identities keyed by agent ID (string form of UUID).

Source code in src/synthorg/hr/registry.py
def __init__(self) -> None:
    self._agents: dict[str, AgentIdentity] = {}
    self._lock = asyncio.Lock()

register async

register(identity)

Register a new agent.

Parameters:

Name Type Description Default
identity AgentIdentity

The agent identity to register.

required

Raises:

Type Description
AgentAlreadyRegisteredError

If the agent is already registered.

Source code in src/synthorg/hr/registry.py
async def register(self, identity: AgentIdentity) -> None:
    """Register a new agent.

    Args:
        identity: The agent identity to register.

    Raises:
        AgentAlreadyRegisteredError: If the agent is already registered.
    """
    agent_key = str(identity.id)
    async with self._lock:
        if agent_key in self._agents:
            msg = f"Agent {identity.name!r} ({agent_key}) is already registered"
            logger.warning(
                HR_REGISTRY_AGENT_REGISTERED,
                agent_id=agent_key,
                error=msg,
            )
            raise AgentAlreadyRegisteredError(msg)
        self._agents[agent_key] = identity

    logger.info(
        HR_REGISTRY_AGENT_REGISTERED,
        agent_id=agent_key,
        agent_name=str(identity.name),
        status=identity.status.value,
    )

unregister async

unregister(agent_id)

Remove an agent from the registry.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier to remove.

required

Returns:

Type Description
AgentIdentity

The removed agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

Source code in src/synthorg/hr/registry.py
async def unregister(self, agent_id: NotBlankStr) -> AgentIdentity:
    """Remove an agent from the registry.

    Args:
        agent_id: The agent identifier to remove.

    Returns:
        The removed agent identity.

    Raises:
        AgentNotFoundError: If the agent is not found.
    """
    async with self._lock:
        identity = self._agents.pop(str(agent_id), None)
    if identity is None:
        msg = f"Agent {agent_id!r} not found in registry"
        logger.warning(
            HR_REGISTRY_AGENT_REMOVED,
            agent_id=str(agent_id),
            error=msg,
        )
        raise AgentNotFoundError(msg)

    logger.info(
        HR_REGISTRY_AGENT_REMOVED,
        agent_id=str(agent_id),
        agent_name=str(identity.name),
    )
    return identity

get async

get(agent_id)

Retrieve an agent identity by ID.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required

Returns:

Type Description
AgentIdentity | None

The agent identity, or None if not found.

Source code in src/synthorg/hr/registry.py
async def get(self, agent_id: NotBlankStr) -> AgentIdentity | None:
    """Retrieve an agent identity by ID.

    Args:
        agent_id: The agent identifier.

    Returns:
        The agent identity, or None if not found.
    """
    async with self._lock:
        return self._agents.get(str(agent_id))

get_by_name async

get_by_name(name)

Retrieve an agent identity by name.

Parameters:

Name Type Description Default
name NotBlankStr

The agent name to search for.

required

Returns:

Type Description
AgentIdentity | None

The first matching agent, or None.

Source code in src/synthorg/hr/registry.py
async def get_by_name(self, name: NotBlankStr) -> AgentIdentity | None:
    """Retrieve an agent identity by name.

    Args:
        name: The agent name to search for.

    Returns:
        The first matching agent, or None.
    """
    async with self._lock:
        name_lower = str(name).lower()
        for identity in self._agents.values():
            if str(identity.name).lower() == name_lower:
                return identity
        return None

list_active async

list_active()

List all agents with ACTIVE status.

Returns:

Type Description
tuple[AgentIdentity, ...]

Tuple of active agent identities.

Source code in src/synthorg/hr/registry.py
async def list_active(self) -> tuple[AgentIdentity, ...]:
    """List all agents with ACTIVE status.

    Returns:
        Tuple of active agent identities.
    """
    async with self._lock:
        return tuple(
            a for a in self._agents.values() if a.status == AgentStatus.ACTIVE
        )

list_by_department async

list_by_department(department)

List agents in a specific department.

Parameters:

Name Type Description Default
department NotBlankStr

Department name to filter by.

required

Returns:

Type Description
tuple[AgentIdentity, ...]

Tuple of matching agent identities.

Source code in src/synthorg/hr/registry.py
async def list_by_department(
    self,
    department: NotBlankStr,
) -> tuple[AgentIdentity, ...]:
    """List agents in a specific department.

    Args:
        department: Department name to filter by.

    Returns:
        Tuple of matching agent identities.
    """
    async with self._lock:
        dept_lower = str(department).lower()
        return tuple(
            a
            for a in self._agents.values()
            if str(a.department).lower() == dept_lower
        )

update_status async

update_status(agent_id, status)

Update an agent's lifecycle status.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
status AgentStatus

New status.

required

Returns:

Type Description
AgentIdentity

Updated agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

Source code in src/synthorg/hr/registry.py
async def update_status(
    self,
    agent_id: NotBlankStr,
    status: AgentStatus,
) -> AgentIdentity:
    """Update an agent's lifecycle status.

    Args:
        agent_id: The agent identifier.
        status: New status.

    Returns:
        Updated agent identity.

    Raises:
        AgentNotFoundError: If the agent is not found.
    """
    key = str(agent_id)
    async with self._lock:
        identity = self._agents.get(key)
        if identity is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                HR_REGISTRY_STATUS_UPDATED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        updated = identity.model_copy(update={"status": status})
        self._agents[key] = updated

    logger.info(
        HR_REGISTRY_STATUS_UPDATED,
        agent_id=key,
        status=status.value,
    )
    return updated

update_identity async

update_identity(agent_id, **updates)

Update agent identity fields via model_copy(update=...).

Only fields in _UPDATABLE_FIELDS are accepted. Use update_status for status changes.

Parameters:

Name Type Description Default
agent_id NotBlankStr

The agent identifier.

required
**updates Any

Fields to update on the AgentIdentity.

{}

Returns:

Type Description
AgentIdentity

Updated agent identity.

Raises:

Type Description
AgentNotFoundError

If the agent is not found.

ValueError

If any field is not in the allowlist.

Source code in src/synthorg/hr/registry.py
async def update_identity(
    self,
    agent_id: NotBlankStr,
    **updates: Any,
) -> AgentIdentity:
    """Update agent identity fields via model_copy(update=...).

    Only fields in ``_UPDATABLE_FIELDS`` are accepted.  Use
    ``update_status`` for status changes.

    Args:
        agent_id: The agent identifier.
        **updates: Fields to update on the AgentIdentity.

    Returns:
        Updated agent identity.

    Raises:
        AgentNotFoundError: If the agent is not found.
        ValueError: If any field is not in the allowlist.
    """
    disallowed = set(updates.keys()) - self._UPDATABLE_FIELDS
    if disallowed:
        msg = (
            f"Fields not allowed for update_identity: "
            f"{sorted(disallowed)}; allowed: {sorted(self._UPDATABLE_FIELDS)}"
        )
        logger.warning(
            HR_REGISTRY_IDENTITY_UPDATED,
            agent_id=str(agent_id),
            error=msg,
        )
        raise ValueError(msg)

    key = str(agent_id)
    async with self._lock:
        identity = self._agents.get(key)
        if identity is None:
            msg = f"Agent {agent_id!r} not found in registry"
            logger.warning(
                HR_REGISTRY_IDENTITY_UPDATED,
                agent_id=key,
                error=msg,
            )
            raise AgentNotFoundError(msg)
        updated = identity.model_copy(update=updates)
        self._agents[key] = updated

    logger.info(
        HR_REGISTRY_IDENTITY_UPDATED,
        agent_id=key,
        updated_fields=sorted(updates.keys()),
    )
    return updated

agent_count async

agent_count()

Number of agents currently in the registry.

Source code in src/synthorg/hr/registry.py
async def agent_count(self) -> int:
    """Number of agents currently in the registry."""
    async with self._lock:
        return len(self._agents)

Hiring Service

hiring_service

Hiring service.

Orchestrates the hiring pipeline: request creation, candidate generation, approval submission, and agent instantiation.

HiringService

HiringService(
    *, registry, approval_store=None, onboarding_service=None, default_model_config=None
)

Orchestrates the hiring pipeline.

Manages the lifecycle of hiring requests from creation through candidate generation, approval, and agent instantiation.

Parameters:

Name Type Description Default
registry AgentRegistryService

Agent registry for registering new agents.

required
approval_store ApprovalStore | None

Optional approval store for human approval.

None
onboarding_service OnboardingService | None

Optional onboarding service to start onboarding after instantiation.

None
default_model_config ModelConfig | None

Optional default model configuration for newly created agents. Falls back to generic defaults if not provided.

None
Source code in src/synthorg/hr/hiring_service.py
def __init__(
    self,
    *,
    registry: AgentRegistryService,
    approval_store: ApprovalStore | None = None,
    onboarding_service: OnboardingService | None = None,
    default_model_config: ModelConfig | None = None,
) -> None:
    self._registry = registry
    self._approval_store = approval_store
    self._onboarding_service = onboarding_service
    self._default_model_config = default_model_config
    self._requests: dict[str, HiringRequest] = {}

create_request async

create_request(
    *,
    requested_by,
    department,
    role,
    level,
    required_skills=(),
    reason,
    budget_limit_monthly=None,
    template_name=None,
)

Create a new hiring request.

Parameters:

Name Type Description Default
requested_by NotBlankStr

Request initiator.

required
department NotBlankStr

Target department.

required
role NotBlankStr

Desired role.

required
level str

Desired seniority level.

required
required_skills tuple[NotBlankStr, ...]

Required skills.

()
reason NotBlankStr

Business justification.

required
budget_limit_monthly float | None

Optional monthly budget limit.

None
template_name str | None

Template for candidate generation.

None

Returns:

Type Description
HiringRequest

The created hiring request.

Source code in src/synthorg/hr/hiring_service.py
async def create_request(  # noqa: PLR0913
    self,
    *,
    requested_by: NotBlankStr,
    department: NotBlankStr,
    role: NotBlankStr,
    level: str,
    required_skills: tuple[NotBlankStr, ...] = (),
    reason: NotBlankStr,
    budget_limit_monthly: float | None = None,
    template_name: str | None = None,
) -> HiringRequest:
    """Create a new hiring request.

    Args:
        requested_by: Request initiator.
        department: Target department.
        role: Desired role.
        level: Desired seniority level.
        required_skills: Required skills.
        reason: Business justification.
        budget_limit_monthly: Optional monthly budget limit.
        template_name: Template for candidate generation.

    Returns:
        The created hiring request.
    """
    try:
        parsed_level = SeniorityLevel(level)
    except ValueError as exc:
        msg = f"Invalid seniority level {level!r} for hiring request"
        logger.warning(
            HR_HIRING_REQUEST_CREATED,
            error=msg,
            level=level,
        )
        raise HiringError(msg) from exc

    request = HiringRequest(
        requested_by=requested_by,
        department=department,
        role=role,
        level=parsed_level,
        required_skills=required_skills,
        reason=reason,
        budget_limit_monthly=budget_limit_monthly,
        template_name=template_name,
        created_at=datetime.now(UTC),
    )
    self._requests[str(request.id)] = request

    logger.info(
        HR_HIRING_REQUEST_CREATED,
        request_id=str(request.id),
        department=str(department),
        role=str(role),
    )
    return request

generate_candidate async

generate_candidate(request)

Generate a candidate card for a hiring request.

Builds a CandidateCard from role/level defaults. In the future, this can be extended with template presets and LLM customization.

Parameters:

Name Type Description Default
request HiringRequest

The hiring request to generate a candidate for.

required

Returns:

Type Description
HiringRequest

Updated request with the new candidate appended.

Source code in src/synthorg/hr/hiring_service.py
async def generate_candidate(
    self,
    request: HiringRequest,
) -> HiringRequest:
    """Generate a candidate card for a hiring request.

    Builds a ``CandidateCard`` from role/level defaults. In the
    future, this can be extended with template presets and LLM
    customization.

    Args:
        request: The hiring request to generate a candidate for.

    Returns:
        Updated request with the new candidate appended.
    """
    request = self._get_request(str(request.id))

    candidate = CandidateCard(
        name=NotBlankStr(f"{request.role}-{request.department}-agent"),
        role=request.role,
        department=request.department,
        level=request.level,
        skills=request.required_skills,
        rationale=NotBlankStr(
            f"Generated for: {request.reason}",
        ),
        estimated_monthly_cost=(
            request.budget_limit_monthly
            if request.budget_limit_monthly is not None
            else 50.0
        ),
        template_source=request.template_name,
    )

    updated = request.model_copy(
        update={"candidates": (*request.candidates, candidate)},
    )
    self._requests[str(updated.id)] = updated

    logger.info(
        HR_HIRING_CANDIDATE_GENERATED,
        request_id=str(request.id),
        candidate_id=str(candidate.id),
    )
    return updated

submit_for_approval async

submit_for_approval(request, candidate_id)

Submit a candidate for approval.

If no approval store is configured, auto-approves the request.

Parameters:

Name Type Description Default
request HiringRequest

The hiring request.

required
candidate_id str

ID of the candidate to approve.

required

Returns:

Type Description
HiringRequest

Updated request with approval status.

Raises:

Type Description
InvalidCandidateError

If the candidate ID is not found.

Source code in src/synthorg/hr/hiring_service.py
async def submit_for_approval(
    self,
    request: HiringRequest,
    candidate_id: str,
) -> HiringRequest:
    """Submit a candidate for approval.

    If no approval store is configured, auto-approves the request.

    Args:
        request: The hiring request.
        candidate_id: ID of the candidate to approve.

    Returns:
        Updated request with approval status.

    Raises:
        InvalidCandidateError: If the candidate ID is not found.
    """
    request = self._get_request(str(request.id))

    candidate = next(
        (c for c in request.candidates if str(c.id) == candidate_id),
        None,
    )
    if candidate is None:
        msg = f"Candidate {candidate_id!r} not found on request {request.id!r}"
        logger.warning(
            HR_HIRING_APPROVAL_SUBMITTED,
            request_id=str(request.id),
            error=msg,
        )
        raise InvalidCandidateError(msg)

    if self._approval_store is None:
        # Auto-approve when no approval store.
        updated = request.model_copy(
            update={
                "status": HiringRequestStatus.APPROVED,
                "selected_candidate_id": candidate_id,
            },
        )
    else:
        # Create an approval item.
        updated = await self._submit_approval_item(request, candidate, candidate_id)

    self._requests[str(updated.id)] = updated

    logger.info(
        HR_HIRING_APPROVAL_SUBMITTED,
        request_id=str(request.id),
        candidate_id=candidate_id,
        auto_approved=self._approval_store is None,
    )
    return updated

instantiate_agent async

instantiate_agent(request)

Instantiate an agent from an approved hiring request.

Parameters:

Name Type Description Default
request HiringRequest

The approved hiring request.

required

Returns:

Type Description
AgentIdentity

The newly created agent identity.

Raises:

Type Description
HiringApprovalRequiredError

If request is not approved.

HiringRejectedError

If request was rejected.

InvalidCandidateError

If no candidate is selected.

HiringError

If instantiation fails.

Source code in src/synthorg/hr/hiring_service.py
async def instantiate_agent(
    self,
    request: HiringRequest,
) -> AgentIdentity:
    """Instantiate an agent from an approved hiring request.

    Args:
        request: The approved hiring request.

    Returns:
        The newly created agent identity.

    Raises:
        HiringApprovalRequiredError: If request is not approved.
        HiringRejectedError: If request was rejected.
        InvalidCandidateError: If no candidate is selected.
        HiringError: If instantiation fails.
    """
    request = self._get_request(str(request.id))
    self._validate_instantiation_status(request)
    candidate = self._find_selected_candidate(request)

    identity = self._build_agent_identity(candidate)
    await self._register_agent(identity, request)

    # Update request status.
    updated = request.model_copy(
        update={"status": HiringRequestStatus.INSTANTIATED},
    )
    self._requests[str(updated.id)] = updated

    # Start onboarding if service is available.
    await self._try_onboard(identity)

    logger.info(
        HR_HIRING_INSTANTIATED,
        request_id=str(request.id),
        agent_id=str(identity.id),
        agent_name=str(identity.name),
    )
    return identity

Onboarding Service

onboarding_service

Onboarding service.

Manages agent onboarding checklists, step tracking, and automatic activation upon checklist completion.

OnboardingService

OnboardingService(*, registry)

Manages onboarding checklists and step tracking.

Creates checklists with all OnboardingStep values when onboarding starts. When all steps are completed, automatically transitions the agent to ACTIVE status via the registry.

Parameters:

Name Type Description Default
registry AgentRegistryService

Agent registry for status updates.

required
Source code in src/synthorg/hr/onboarding_service.py
def __init__(
    self,
    *,
    registry: AgentRegistryService,
) -> None:
    self._registry = registry
    self._checklists: dict[str, OnboardingChecklist] = {}

start_onboarding async

start_onboarding(agent_id)

Start onboarding for a newly hired agent.

Creates a checklist with all onboarding steps in PENDING state.

Parameters:

Name Type Description Default
agent_id str

Agent to onboard.

required

Returns:

Type Description
OnboardingChecklist

The created onboarding checklist.

Raises:

Type Description
OnboardingError

If a checklist already exists.

Source code in src/synthorg/hr/onboarding_service.py
async def start_onboarding(self, agent_id: str) -> OnboardingChecklist:
    """Start onboarding for a newly hired agent.

    Creates a checklist with all onboarding steps in PENDING state.

    Args:
        agent_id: Agent to onboard.

    Returns:
        The created onboarding checklist.

    Raises:
        OnboardingError: If a checklist already exists.
    """
    agent = await self._registry.get(NotBlankStr(agent_id))
    if agent is None:
        msg = f"Agent {agent_id!r} not found in registry"
        logger.warning(HR_ONBOARDING_STARTED, agent_id=agent_id, error=msg)
        raise OnboardingError(msg)

    if agent_id in self._checklists:
        msg = f"Onboarding checklist already exists for agent {agent_id!r}"
        logger.warning(HR_ONBOARDING_STARTED, agent_id=agent_id, error=msg)
        raise OnboardingError(msg)

    steps = tuple(OnboardingStepRecord(step=step) for step in OnboardingStep)
    checklist = OnboardingChecklist(
        agent_id=agent_id,
        steps=steps,
        started_at=datetime.now(UTC),
    )
    self._checklists[agent_id] = checklist

    logger.info(
        HR_ONBOARDING_STARTED,
        agent_id=agent_id,
        step_count=len(steps),
    )
    return checklist

complete_step async

complete_step(agent_id, step, *, notes='')

Mark an onboarding step as complete.

When all steps are completed, automatically transitions the agent to ACTIVE status.

Parameters:

Name Type Description Default
agent_id str

Agent being onboarded.

required
step OnboardingStep

The step to complete.

required
notes str

Optional notes for the step.

''

Returns:

Type Description
OnboardingChecklist

Updated onboarding checklist.

Raises:

Type Description
OnboardingError

If no checklist exists for the agent.

Source code in src/synthorg/hr/onboarding_service.py
async def complete_step(
    self,
    agent_id: str,
    step: OnboardingStep,
    *,
    notes: str = "",
) -> OnboardingChecklist:
    """Mark an onboarding step as complete.

    When all steps are completed, automatically transitions the
    agent to ACTIVE status.

    Args:
        agent_id: Agent being onboarded.
        step: The step to complete.
        notes: Optional notes for the step.

    Returns:
        Updated onboarding checklist.

    Raises:
        OnboardingError: If no checklist exists for the agent.
    """
    checklist = self._checklists.get(agent_id)
    if checklist is None:
        msg = f"No onboarding checklist for agent {agent_id!r}"
        logger.warning(
            HR_ONBOARDING_STEP_COMPLETE,
            agent_id=agent_id,
            error=msg,
        )
        raise OnboardingError(msg)

    now = datetime.now(UTC)
    step_found = any(s.step == step and not s.completed for s in checklist.steps)
    if not step_found:
        logger.warning(
            HR_ONBOARDING_STEP_COMPLETE,
            agent_id=agent_id,
            step=step.value,
            skipped="step_not_found_or_already_completed",
        )
        return checklist

    updated_steps = tuple(
        s.model_copy(
            update={
                "completed": True,
                "completed_at": now,
                "notes": notes,
            },
        )
        if s.step == step and not s.completed
        else s
        for s in checklist.steps
    )

    updated = checklist.model_copy(update={"steps": updated_steps})

    # Check if all steps are now complete.
    if updated.is_complete and not checklist.is_complete:
        updated = updated.model_copy(update={"completed_at": now})
        await self._registry.update_status(agent_id, AgentStatus.ACTIVE)
        logger.info(HR_ONBOARDING_COMPLETE, agent_id=agent_id)

    self._checklists[agent_id] = updated

    logger.info(
        HR_ONBOARDING_STEP_COMPLETE,
        agent_id=agent_id,
        step=step.value,
    )
    return updated

get_checklist async

get_checklist(agent_id)

Retrieve the onboarding checklist for an agent.

Parameters:

Name Type Description Default
agent_id str

Agent to look up.

required

Returns:

Type Description
OnboardingChecklist | None

The checklist, or None if not found.

Source code in src/synthorg/hr/onboarding_service.py
async def get_checklist(
    self,
    agent_id: str,
) -> OnboardingChecklist | None:
    """Retrieve the onboarding checklist for an agent.

    Args:
        agent_id: Agent to look up.

    Returns:
        The checklist, or None if not found.
    """
    return self._checklists.get(agent_id)

Offboarding Service

offboarding_service

Offboarding service.

Orchestrates the firing/offboarding pipeline: task reassignment, memory archival, team notification, and agent termination.

OffboardingService

OffboardingService(
    *,
    registry,
    reassignment_strategy,
    archival_strategy,
    memory_backend=None,
    archival_store=None,
    org_memory_backend=None,
    message_bus=None,
    task_repository=None,
)

Orchestrates the firing/offboarding pipeline.

Pipeline steps
  1. Get active tasks and reassign via strategy.
  2. Archive memory via archival strategy.
  3. Notify team via message bus.
  4. Update agent status to TERMINATED and return record.

Parameters:

Name Type Description Default
registry AgentRegistryService

Agent registry for status updates.

required
reassignment_strategy TaskReassignmentStrategy

Strategy for task reassignment.

required
archival_strategy MemoryArchivalStrategy

Strategy for memory archival.

required
memory_backend MemoryBackend | None

Optional hot memory store.

None
archival_store ArchivalStore | None

Optional cold archival storage.

None
org_memory_backend OrgMemoryBackend | None

Optional org memory for promotion.

None
message_bus MessageBus | None

Optional message bus for notifications.

None
task_repository TaskRepository | None

Optional task repository for queries.

None
Source code in src/synthorg/hr/offboarding_service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    registry: AgentRegistryService,
    reassignment_strategy: TaskReassignmentStrategy,
    archival_strategy: MemoryArchivalStrategy,
    memory_backend: MemoryBackend | None = None,
    archival_store: ArchivalStore | None = None,
    org_memory_backend: OrgMemoryBackend | None = None,
    message_bus: MessageBus | None = None,
    task_repository: TaskRepository | None = None,
) -> None:
    self._registry = registry
    self._reassignment_strategy = reassignment_strategy
    self._archival_strategy = archival_strategy
    self._memory_backend = memory_backend
    self._archival_store = archival_store
    self._org_memory_backend = org_memory_backend
    self._message_bus = message_bus
    self._task_repository = task_repository

offboard async

offboard(request)

Execute the full offboarding pipeline.

Parameters:

Name Type Description Default
request FiringRequest

The firing request to process.

required

Returns:

Type Description
OffboardingRecord

Record of the completed offboarding.

Raises:

Type Description
OffboardingError

If task reassignment fails (fatal).

AgentNotFoundError

If the agent is not in the registry (fatal).

Note

Memory archival and team notification failures are logged but non-fatal -- offboarding continues.

Source code in src/synthorg/hr/offboarding_service.py
async def offboard(
    self,
    request: FiringRequest,
) -> OffboardingRecord:
    """Execute the full offboarding pipeline.

    Args:
        request: The firing request to process.

    Returns:
        Record of the completed offboarding.

    Raises:
        OffboardingError: If task reassignment fails (fatal).
        AgentNotFoundError: If the agent is not in the registry
            (fatal).

    Note:
        Memory archival and team notification failures are logged
        but non-fatal -- offboarding continues.
    """
    started_at = datetime.now(UTC)
    agent_id = str(request.agent_id)

    logger.info(
        HR_FIRING_INITIATED,
        agent_id=agent_id,
        reason=request.reason.value,
    )

    # Verify agent exists in registry.
    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found in registry"
        logger.warning(HR_FIRING_INITIATED, agent_id=agent_id, error=msg)
        raise AgentNotFoundError(msg)

    # Step 1: Get active tasks and reassign.
    tasks_reassigned = await self._reassign_tasks(agent_id)

    # Step 2: Archive memory.
    archival_result = await self._archive_memory(agent_id, identity)

    # Step 3: Notify team.
    team_notified = await self._notify_team(
        agent_id, identity, request.reason.value
    )

    # Step 4: Terminate agent.
    await self._terminate_agent(agent_id)

    completed_at = datetime.now(UTC)
    record = OffboardingRecord(
        agent_id=NotBlankStr(agent_id),
        agent_name=identity.name,
        firing_request_id=request.id,
        tasks_reassigned=tasks_reassigned,
        memory_archive_id=None,
        org_memories_promoted=archival_result.promoted_to_org,
        team_notification_sent=team_notified,
        started_at=started_at,
        completed_at=completed_at,
    )

    logger.info(
        HR_FIRING_COMPLETE,
        agent_id=agent_id,
        tasks_reassigned=len(tasks_reassigned),
        memories_archived=archival_result.total_archived,
    )
    return record

Enums

enums

HR domain enumerations.

HiringRequestStatus

Bases: StrEnum

Status of a hiring request through the approval pipeline.

FiringReason

Bases: StrEnum

Reason for agent termination.

OnboardingStep

Bases: StrEnum

Steps in the agent onboarding checklist.

LifecycleEventType

Bases: StrEnum

Type of agent lifecycle event.

ActivityEventType

Bases: StrEnum

Event types produced by the activity feed timeline.

Superset of LifecycleEventType plus operational event types generated from task metrics, cost records, tool invocations, and delegation records.

PromotionDirection

Bases: StrEnum

Direction of a seniority level change.

TrendDirection

Bases: StrEnum

Direction of a performance metric trend.

Errors

errors

HR domain error hierarchy.

HRError

Bases: Exception

Base error for all HR operations.

HiringError

Bases: HRError

Error during the hiring process.

HiringApprovalRequiredError

Bases: HiringError

Hiring request requires approval before instantiation.

HiringRejectedError

Bases: HiringError

Hiring request was rejected.

InvalidCandidateError

Bases: HiringError

Candidate card is invalid or does not exist on the request.

FiringError

Bases: HRError

Error during the firing process.

OffboardingError

Bases: HRError

Error during the offboarding pipeline.

TaskReassignmentError

Bases: OffboardingError

Failed to reassign tasks from a departing agent.

MemoryArchivalError

Bases: OffboardingError

Failed to archive agent memories during offboarding.

OnboardingError

Bases: HRError

Error during the onboarding process.

AgentRegistryError

Bases: HRError

Error in the agent registry.

AgentNotFoundError

Bases: AgentRegistryError

Agent not found in the registry.

AgentAlreadyRegisteredError

Bases: AgentRegistryError

Agent is already registered.

PerformanceError

Bases: HRError

Error in the performance tracking system.

InsufficientDataError

Bases: PerformanceError

Not enough data points for a meaningful computation.

PromotionError

Bases: HRError

Error during the promotion/demotion process.

PromotionCooldownError

Bases: PromotionError

Promotion is blocked by the cooldown period.

PromotionApprovalRequiredError

Bases: PromotionError

Promotion requires human approval before proceeding.

Performance

config

Performance tracking configuration.

PerformanceConfig pydantic-model

Bases: BaseModel

Configuration for the performance tracking system.

Attributes:

Name Type Description
min_data_points int

Minimum data points for meaningful aggregation.

windows tuple[NotBlankStr, ...]

Time window labels for rolling metrics.

improving_threshold float

Slope threshold for improving trend.

declining_threshold float

Slope threshold for declining trend.

collaboration_weights dict[str, float] | None

Optional custom weights for collaboration scoring components.

llm_sampling_rate float

Fraction of collaboration events sampled by LLM (0.01 = 1%).

llm_sampling_model NotBlankStr | None

Model ID for LLM calibration sampling (None = disabled).

calibration_retention_days int

Days to retain LLM calibration records.

quality_judge_model NotBlankStr | None

Model ID for LLM quality judge (None = disabled).

quality_judge_provider NotBlankStr | None

Provider name for LLM quality judge (None = auto from model ref). Requires quality_judge_model.

quality_ci_weight float

Weight for CI signal in composite quality score (default 0.4).

quality_llm_weight float

Weight for LLM judge in composite quality score (default 0.6).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_quality_judge_provider_requires_model
  • _validate_threshold_ordering
  • _validate_quality_weights_sum

min_data_points pydantic-field

min_data_points = 5

Minimum data points for meaningful aggregation

windows pydantic-field

windows = (NotBlankStr('7d'), NotBlankStr('30d'), NotBlankStr('90d'))

Time window labels for rolling metrics

improving_threshold pydantic-field

improving_threshold = 0.05

Slope threshold for improving trend

declining_threshold pydantic-field

declining_threshold = -0.05

Slope threshold for declining trend

collaboration_weights pydantic-field

collaboration_weights = None

Custom weights for collaboration scoring components

llm_sampling_rate pydantic-field

llm_sampling_rate = 0.01

Fraction of collaboration events sampled by LLM (0.01 = 1%)

llm_sampling_model pydantic-field

llm_sampling_model = None

Model ID for LLM calibration sampling (None = disabled)

calibration_retention_days pydantic-field

calibration_retention_days = 90

Days to retain LLM calibration records

quality_judge_model pydantic-field

quality_judge_model = None

Model ID for LLM quality judge (None = disabled)

quality_judge_provider pydantic-field

quality_judge_provider = None

Provider name for LLM quality judge (None = auto from model ref)

quality_ci_weight pydantic-field

quality_ci_weight = 0.4

Weight for CI signal in composite quality score. Together with quality_llm_weight, must sum to 1.0.

quality_llm_weight pydantic-field

quality_llm_weight = 0.6

Weight for LLM judge in composite quality score. Together with quality_ci_weight, must sum to 1.0.

models

Performance tracking domain models.

Frozen Pydantic models for task metrics, collaboration metrics, quality/collaboration scoring results, trend detection, and rolling-window aggregates.

TaskMetricRecord pydantic-model

Bases: BaseModel

Record of a single task completion for performance tracking.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent who completed the task.

task_id NotBlankStr

Task identifier.

task_type TaskType

Classification of the task.

started_at AwareDatetime | None

When the task started (None if not tracked).

completed_at AwareDatetime

When the task was completed.

is_success bool

Whether the task completed successfully.

duration_seconds float

Wall-clock execution time.

cost_usd float

Cost of the task in USD (base currency).

turns_used int

Number of LLM turns used.

tokens_used int

Total tokens consumed.

quality_score float | None

Quality score (0.0-10.0), None if not scored.

complexity Complexity

Estimated task complexity.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_temporal_ordering

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent who completed the task

task_id pydantic-field

task_id

Task identifier

task_type pydantic-field

task_type

Classification of the task

started_at pydantic-field

started_at = None

When the task started (None if not tracked)

completed_at pydantic-field

completed_at

When the task was completed

is_success pydantic-field

is_success

Whether the task completed successfully

duration_seconds pydantic-field

duration_seconds

Wall-clock execution time

cost_usd pydantic-field

cost_usd

Cost of the task in USD (base currency)

turns_used pydantic-field

turns_used

Number of LLM turns used

tokens_used pydantic-field

tokens_used

Total tokens consumed

quality_score pydantic-field

quality_score = None

Quality score (0.0-10.0)

complexity pydantic-field

complexity

Estimated task complexity

CollaborationMetricRecord pydantic-model

Bases: BaseModel

Record of a collaboration behavior data point.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent being measured.

recorded_at AwareDatetime

When the observation was recorded.

delegation_success bool | None

Whether a delegation was successful.

delegation_response_seconds float | None

Response time for a delegation.

conflict_constructiveness float | None

How constructively conflict was handled.

meeting_contribution float | None

Quality of meeting contribution.

loop_triggered bool

Whether the agent triggered a delegation loop.

handoff_completeness float | None

Completeness of task handoff (0.0-1.0).

interaction_summary NotBlankStr | None

Text summary of the interaction for LLM calibration (None if not available).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent being measured

recorded_at pydantic-field

recorded_at

When the observation was recorded

delegation_success pydantic-field

delegation_success = None

Whether a delegation was successful

delegation_response_seconds pydantic-field

delegation_response_seconds = None

Response time for a delegation

conflict_constructiveness pydantic-field

conflict_constructiveness = None

How constructively conflict was handled

meeting_contribution pydantic-field

meeting_contribution = None

Quality of meeting contribution

loop_triggered pydantic-field

loop_triggered = False

Whether the agent triggered a delegation loop

handoff_completeness pydantic-field

handoff_completeness = None

Completeness of task handoff

interaction_summary pydantic-field

interaction_summary = None

Text summary of the interaction for LLM calibration

QualityScoreResult pydantic-model

Bases: BaseModel

Result of a quality scoring evaluation.

Attributes:

Name Type Description
score float

Overall quality score (0.0-10.0).

strategy_name NotBlankStr

Name of the scoring strategy used.

breakdown tuple[tuple[NotBlankStr, float], ...]

Score components as (name, value) pairs.

confidence float

Confidence in the score (0.0-1.0).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

score pydantic-field

score

Overall quality score

strategy_name pydantic-field

strategy_name

Scoring strategy used

breakdown pydantic-field

breakdown = ()

Score components as (name, value) pairs

confidence pydantic-field

confidence

Confidence in the score

CollaborationScoreResult pydantic-model

Bases: BaseModel

Result of a collaboration scoring evaluation.

Attributes:

Name Type Description
score float

Overall collaboration score (0.0-10.0).

strategy_name NotBlankStr

Name of the scoring strategy used.

component_scores tuple[tuple[NotBlankStr, float], ...]

Per-component scores as (name, value) pairs.

confidence float

Confidence in the score (0.0-1.0).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

score pydantic-field

score

Overall collaboration score

strategy_name pydantic-field

strategy_name

Scoring strategy used

component_scores pydantic-field

component_scores = ()

Per-component scores as (name, value) pairs

confidence pydantic-field

confidence

Confidence in the score

override_active pydantic-field

override_active = False

Whether a human override is active

LlmCalibrationRecord pydantic-model

Bases: BaseModel

Record of an LLM calibration sample for collaboration scoring.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent being evaluated.

sampled_at AwareDatetime

When the LLM evaluation occurred.

interaction_record_id NotBlankStr

ID of the sampled CollaborationMetricRecord.

llm_score float

LLM-assigned collaboration score (0.0-10.0).

behavioral_score float

Behavioral strategy score at time of sampling.

drift float

Absolute difference between LLM and behavioral scores (computed).

rationale NotBlankStr

LLM's explanation for the score.

model_used NotBlankStr

Which LLM model was used for evaluation.

cost_usd float

Cost of the LLM call in USD (base currency).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent being evaluated

sampled_at pydantic-field

sampled_at

When the LLM evaluation occurred

interaction_record_id pydantic-field

interaction_record_id

ID of the sampled CollaborationMetricRecord

llm_score pydantic-field

llm_score

LLM-assigned collaboration score

behavioral_score pydantic-field

behavioral_score

Behavioral strategy score at time of sampling

drift property

drift

Absolute difference between LLM and behavioral scores.

rationale pydantic-field

rationale

LLM's explanation for the score

model_used pydantic-field

model_used

Which LLM model was used for evaluation

cost_usd pydantic-field

cost_usd

Cost of the LLM call in USD (base currency)

CollaborationOverride pydantic-model

Bases: _BaseOverride

Human-applied override for an agent's collaboration score.

Fields:

Validators:

  • _validate_expiration_ordering

QualityOverride pydantic-model

Bases: _BaseOverride

Human-applied override for an agent's quality score.

Fields:

Validators:

  • _validate_expiration_ordering

TrendResult pydantic-model

Bases: BaseModel

Result of a trend detection analysis.

Attributes:

Name Type Description
metric_name NotBlankStr

Name of the metric being trended.

window_size NotBlankStr

Time window label (e.g. '7d', '30d').

direction TrendDirection

Detected trend direction.

slope float

Computed slope of the trend line.

data_point_count int

Number of data points used.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

metric_name pydantic-field

metric_name

Metric being trended

window_size pydantic-field

window_size

Time window label

direction pydantic-field

direction

Detected trend direction

slope pydantic-field

slope

Slope of the trend line

data_point_count pydantic-field

data_point_count

Number of data points used

WindowMetrics pydantic-model

Bases: BaseModel

Aggregate metrics for a rolling time window.

Attributes:

Name Type Description
window_size NotBlankStr

Time window label (e.g. '7d', '30d').

data_point_count int

Number of records in the window.

tasks_completed int

Number of successful tasks.

tasks_failed int

Number of failed tasks.

avg_quality_score float | None

Average quality score, None if insufficient data.

avg_cost_per_task float | None

Average cost per task, None if insufficient data.

avg_completion_time_seconds float | None

Average time, None if insufficient data.

avg_tokens_per_task float | None

Average tokens, None if insufficient data.

success_rate float | None

Task success rate (0.0-1.0), None if no tasks.

collaboration_score float | None

Collaboration score, None if not computed.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_task_counts

window_size pydantic-field

window_size

Time window label

data_point_count pydantic-field

data_point_count

Records in the window

tasks_completed pydantic-field

tasks_completed

Number of successful tasks

tasks_failed pydantic-field

tasks_failed

Number of failed tasks

avg_quality_score pydantic-field

avg_quality_score = None

Average quality score

avg_cost_per_task pydantic-field

avg_cost_per_task = None

Average cost per task in USD (base currency)

avg_completion_time_seconds pydantic-field

avg_completion_time_seconds = None

Average completion time

avg_tokens_per_task pydantic-field

avg_tokens_per_task = None

Average tokens per task

success_rate pydantic-field

success_rate = None

Task success rate

collaboration_score pydantic-field

collaboration_score = None

Collaboration score

AgentPerformanceSnapshot pydantic-model

Bases: BaseModel

Complete performance snapshot for an agent at a point in time.

Attributes:

Name Type Description
agent_id NotBlankStr

The agent being evaluated.

computed_at AwareDatetime

When this snapshot was computed.

windows tuple[WindowMetrics, ...]

Rolling window metrics.

trends tuple[TrendResult, ...]

Detected trends per metric.

overall_quality_score float | None

Aggregate quality score.

overall_collaboration_score float | None

Aggregate collaboration score.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

agent_id pydantic-field

agent_id

Agent being evaluated

computed_at pydantic-field

computed_at

When this snapshot was computed

windows pydantic-field

windows = ()

Rolling window metrics

trends pydantic-field

trends = ()

Detected trends per metric

overall_quality_score pydantic-field

overall_quality_score = None

Aggregate quality score

overall_collaboration_score pydantic-field

overall_collaboration_score = None

Aggregate collaboration score

tracker

Performance tracker service.

Central service for recording and querying agent performance metrics. Delegates scoring, windowing, and trend detection to pluggable strategies.

PerformanceTracker

PerformanceTracker(
    *,
    quality_strategy=None,
    collaboration_strategy=None,
    window_strategy=None,
    trend_strategy=None,
    config=None,
    sampler=None,
    override_store=None,
    quality_override_store=None,
)

Central service for recording and querying agent performance metrics.

In-memory storage keyed by agent_id. Delegates scoring, windowing, and trend detection to injected strategy implementations.

When strategies are not provided, sensible defaults are constructed (window and trend strategies use values from PerformanceConfig).

Parameters:

Name Type Description Default
quality_strategy QualityScoringStrategy | None

Strategy for scoring task quality.

None
collaboration_strategy CollaborationScoringStrategy | None

Strategy for scoring collaboration.

None
window_strategy MetricsWindowStrategy | None

Strategy for computing rolling windows.

None
trend_strategy TrendDetectionStrategy | None

Strategy for detecting trends.

None
config PerformanceConfig | None

Performance tracking configuration.

None
sampler LlmCalibrationSampler | None

LLM calibration sampler (None = disabled).

None
override_store CollaborationOverrideStore | None

Collaboration override store (None = disabled).

None
quality_override_store QualityOverrideStore | None

Quality override store (None = disabled).

None
Source code in src/synthorg/hr/performance/tracker.py
def __init__(  # noqa: PLR0913
    self,
    *,
    quality_strategy: QualityScoringStrategy | None = None,
    collaboration_strategy: CollaborationScoringStrategy | None = None,
    window_strategy: MetricsWindowStrategy | None = None,
    trend_strategy: TrendDetectionStrategy | None = None,
    config: PerformanceConfig | None = None,
    sampler: LlmCalibrationSampler | None = None,
    override_store: CollaborationOverrideStore | None = None,
    quality_override_store: QualityOverrideStore | None = None,
) -> None:
    cfg = config or PerformanceConfig()
    self._config = cfg
    self._quality_strategy = quality_strategy or self._default_quality()
    self._collaboration_strategy = (
        collaboration_strategy or self._default_collaboration(cfg)
    )
    self._window_strategy = window_strategy or self._default_window(cfg)
    self._trend_strategy = trend_strategy or self._default_trend(cfg)
    self._sampler = sampler
    self._override_store = override_store
    self._quality_override_store = quality_override_store
    self._task_metrics: dict[str, list[TaskMetricRecord]] = {}
    self._collab_metrics: dict[str, list[CollaborationMetricRecord]] = {}
    self._background_tasks: set[asyncio.Task[None]] = set()

override_store property

override_store

Return the collaboration override store, if configured.

quality_override_store property

quality_override_store

Return the quality override store, if configured.

sampler property

sampler

Return the LLM calibration sampler, if configured.

aclose async

aclose()

Cancel and await all pending background tasks.

Should be called during application shutdown to prevent RuntimeError: Task was destroyed but it is pending! warnings.

Source code in src/synthorg/hr/performance/tracker.py
async def aclose(self) -> None:
    """Cancel and await all pending background tasks.

    Should be called during application shutdown to prevent
    ``RuntimeError: Task was destroyed but it is pending!``
    warnings.
    """
    tasks = list(self._background_tasks)
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    self._background_tasks.clear()

record_task_metric async

record_task_metric(record)

Record a task completion metric.

Parameters:

Name Type Description Default
record TaskMetricRecord

The task metric record to store.

required

Returns:

Type Description
TaskMetricRecord

The stored record.

Source code in src/synthorg/hr/performance/tracker.py
async def record_task_metric(
    self,
    record: TaskMetricRecord,
) -> TaskMetricRecord:
    """Record a task completion metric.

    Args:
        record: The task metric record to store.

    Returns:
        The stored record.
    """
    agent_key = str(record.agent_id)
    if agent_key not in self._task_metrics:
        self._task_metrics[agent_key] = []
    self._task_metrics[agent_key].append(record)

    logger.info(
        PERF_METRIC_RECORDED,
        agent_id=record.agent_id,
        task_id=record.task_id,
        is_success=record.is_success,
    )
    return record

score_task_quality async

score_task_quality(*, agent_id, task_id, task_result, acceptance_criteria=())

Score task quality and update the record.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent who completed the task.

required
task_id NotBlankStr

Task identifier.

required
task_result TaskMetricRecord

Recorded task metrics.

required
acceptance_criteria tuple[AcceptanceCriterion, ...]

Criteria to evaluate against.

()

Returns:

Type Description
TaskMetricRecord

Updated record with quality score.

Source code in src/synthorg/hr/performance/tracker.py
async def score_task_quality(
    self,
    *,
    agent_id: NotBlankStr,
    task_id: NotBlankStr,
    task_result: TaskMetricRecord,
    acceptance_criteria: tuple[AcceptanceCriterion, ...] = (),
) -> TaskMetricRecord:
    """Score task quality and update the record.

    Args:
        agent_id: Agent who completed the task.
        task_id: Task identifier.
        task_result: Recorded task metrics.
        acceptance_criteria: Criteria to evaluate against.

    Returns:
        Updated record with quality score.
    """
    result = await self._quality_strategy.score(
        agent_id=agent_id,
        task_id=task_id,
        task_result=task_result,
        acceptance_criteria=acceptance_criteria,
    )
    return task_result.model_copy(update={"quality_score": result.score})

record_collaboration_event async

record_collaboration_event(record)

Record a collaboration behavior data point.

If an LLM sampler is configured and the record has an interaction_summary, the sampler is invoked probabilistically.

Parameters:

Name Type Description Default
record CollaborationMetricRecord

Collaboration metric record to store.

required
Source code in src/synthorg/hr/performance/tracker.py
async def record_collaboration_event(
    self,
    record: CollaborationMetricRecord,
) -> None:
    """Record a collaboration behavior data point.

    If an LLM sampler is configured and the record has an
    ``interaction_summary``, the sampler is invoked probabilistically.

    Args:
        record: Collaboration metric record to store.
    """
    agent_key = str(record.agent_id)
    if agent_key not in self._collab_metrics:
        self._collab_metrics[agent_key] = []
    self._collab_metrics[agent_key].append(record)

    logger.debug(
        PERF_METRIC_RECORDED,
        agent_id=record.agent_id,
        metric_type="collaboration",
    )

    self._schedule_sampling(record)

get_collaboration_score async

get_collaboration_score(agent_id, *, now=None)

Compute collaboration score for an agent.

Returns the active human override if one exists; otherwise delegates to the collaboration scoring strategy.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
now AwareDatetime | None

Reference time for override expiration check (defaults to current UTC time).

None

Returns:

Type Description
CollaborationScoreResult

Collaboration score result.

Source code in src/synthorg/hr/performance/tracker.py
async def get_collaboration_score(
    self,
    agent_id: NotBlankStr,
    *,
    now: AwareDatetime | None = None,
) -> CollaborationScoreResult:
    """Compute collaboration score for an agent.

    Returns the active human override if one exists; otherwise
    delegates to the collaboration scoring strategy.

    Args:
        agent_id: Agent to evaluate.
        now: Reference time for override expiration check
            (defaults to current UTC time).

    Returns:
        Collaboration score result.
    """
    if self._override_store is not None:
        override = self._override_store.get_active_override(
            agent_id,
            now=now,
        )
        if override is not None:
            logger.info(
                PERF_OVERRIDE_APPLIED,
                agent_id=agent_id,
                score=override.score,
                applied_by=override.applied_by,
            )
            return CollaborationScoreResult(
                score=override.score,
                strategy_name=NotBlankStr("human_override"),
                component_scores=(),
                confidence=1.0,
                override_active=True,
            )

    records = tuple(self._collab_metrics.get(str(agent_id), []))
    return await self._collaboration_strategy.score(
        agent_id=agent_id,
        records=records,
    )

get_snapshot async

get_snapshot(agent_id, *, now=None)

Compute a full performance snapshot for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
now AwareDatetime | None

Reference time (defaults to current UTC time).

None

Returns:

Type Description
AgentPerformanceSnapshot

Complete performance snapshot with windows and trends.

Source code in src/synthorg/hr/performance/tracker.py
async def get_snapshot(
    self,
    agent_id: NotBlankStr,
    *,
    now: AwareDatetime | None = None,
) -> AgentPerformanceSnapshot:
    """Compute a full performance snapshot for an agent.

    Args:
        agent_id: Agent to evaluate.
        now: Reference time (defaults to current UTC time).

    Returns:
        Complete performance snapshot with windows and trends.
    """
    if now is None:
        now = datetime.now(UTC)

    agent_key = str(agent_id)
    task_records = tuple(self._task_metrics.get(agent_key, []))

    # Compute windows.
    windows = self._window_strategy.compute_windows(
        task_records,
        now=now,
    )

    # Compute trends for quality and cost metrics.
    trends = self._compute_trends(task_records, windows, now=now)

    # Overall quality: average of all scored records.
    scored = [r.quality_score for r in task_records if r.quality_score is not None]
    overall_quality = round(sum(scored) / len(scored), 4) if scored else None

    # Overall collaboration score (respects active overrides).
    collab_result = await self.get_collaboration_score(
        agent_id,
        now=now,
    )
    overall_collab = collab_result.score if collab_result.confidence > 0.0 else None

    snapshot = AgentPerformanceSnapshot(
        agent_id=agent_id,
        computed_at=now,
        windows=windows,
        trends=tuple(trends),
        overall_quality_score=overall_quality,
        overall_collaboration_score=overall_collab,
    )

    logger.info(
        PERF_SNAPSHOT_COMPUTED,
        agent_id=agent_id,
        window_count=len(windows),
        trend_count=len(trends),
    )
    return snapshot

get_task_metrics

get_task_metrics(*, agent_id=None, since=None, until=None)

Query raw task metric records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent.

None
since AwareDatetime | None

Include records after this time.

None
until AwareDatetime | None

Include records before this time.

None

Returns:

Type Description
tuple[TaskMetricRecord, ...]

Matching task metric records.

Source code in src/synthorg/hr/performance/tracker.py
def get_task_metrics(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    since: AwareDatetime | None = None,
    until: AwareDatetime | None = None,
) -> tuple[TaskMetricRecord, ...]:
    """Query raw task metric records with optional filters.

    Args:
        agent_id: Filter by agent.
        since: Include records after this time.
        until: Include records before this time.

    Returns:
        Matching task metric records.
    """
    if agent_id is not None:
        records = list(self._task_metrics.get(str(agent_id), []))
    else:
        records = [r for recs in self._task_metrics.values() for r in recs]

    if since is not None:
        records = [r for r in records if r.completed_at >= since]
    if until is not None:
        records = [r for r in records if r.completed_at < until]
    return tuple(records)

get_collaboration_metrics

get_collaboration_metrics(*, agent_id=None, since=None, until=None)

Query collaboration metric records with optional filters.

Parameters:

Name Type Description Default
agent_id NotBlankStr | None

Filter by agent.

None
since AwareDatetime | None

Include records after this time.

None
until AwareDatetime | None

Include records before this time.

None

Returns:

Type Description
tuple[CollaborationMetricRecord, ...]

Matching collaboration metric records.

Source code in src/synthorg/hr/performance/tracker.py
def get_collaboration_metrics(
    self,
    *,
    agent_id: NotBlankStr | None = None,
    since: AwareDatetime | None = None,
    until: AwareDatetime | None = None,
) -> tuple[CollaborationMetricRecord, ...]:
    """Query collaboration metric records with optional filters.

    Args:
        agent_id: Filter by agent.
        since: Include records after this time.
        until: Include records before this time.

    Returns:
        Matching collaboration metric records.
    """
    if agent_id is not None:
        records = list(self._collab_metrics.get(str(agent_id), []))
    else:
        records = [r for recs in self._collab_metrics.values() for r in recs]

    if since is not None:
        records = [r for r in records if r.recorded_at >= since]
    if until is not None:
        records = [r for r in records if r.recorded_at < until]
    return tuple(records)

quality_protocol

Quality scoring strategy protocol.

Defines the interface for pluggable quality scoring strategies that evaluate task completion quality (see Agents design page, D2).

QualityScoringStrategy

Bases: Protocol

Strategy for scoring task completion quality.

Implementations evaluate task results against acceptance criteria and other quality signals to produce a normalized score.

name property

name

Human-readable strategy name.

score async

score(*, agent_id, task_id, task_result, acceptance_criteria)

Score task completion quality.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent who completed the task.

required
task_id NotBlankStr

Task identifier.

required
task_result TaskMetricRecord

Recorded task metrics.

required
acceptance_criteria tuple[AcceptanceCriterion, ...]

Criteria to evaluate against.

required

Returns:

Type Description
QualityScoreResult

Quality score result with breakdown and confidence.

Source code in src/synthorg/hr/performance/quality_protocol.py
async def score(
    self,
    *,
    agent_id: NotBlankStr,
    task_id: NotBlankStr,
    task_result: TaskMetricRecord,
    acceptance_criteria: tuple[AcceptanceCriterion, ...],
) -> QualityScoreResult:
    """Score task completion quality.

    Args:
        agent_id: Agent who completed the task.
        task_id: Task identifier.
        task_result: Recorded task metrics.
        acceptance_criteria: Criteria to evaluate against.

    Returns:
        Quality score result with breakdown and confidence.
    """
    ...

collaboration_protocol

Collaboration scoring strategy protocol.

Defines the interface for pluggable collaboration scoring strategies that evaluate agent collaboration behavior (see Agents design page, D3).

CollaborationScoringStrategy

Bases: Protocol

Strategy for scoring agent collaboration behavior.

Implementations evaluate behavioral telemetry records to produce a normalized collaboration score.

name property

name

Human-readable strategy name.

score async

score(*, agent_id, records, role_weights=None)

Score agent collaboration behavior.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent being evaluated.

required
records tuple[CollaborationMetricRecord, ...]

Collaboration metric records to evaluate.

required
role_weights dict[str, float] | None

Optional per-component weight overrides.

None

Returns:

Type Description
CollaborationScoreResult

Collaboration score result with component scores.

Source code in src/synthorg/hr/performance/collaboration_protocol.py
async def score(
    self,
    *,
    agent_id: NotBlankStr,
    records: tuple[CollaborationMetricRecord, ...],
    role_weights: dict[str, float] | None = None,
) -> CollaborationScoreResult:
    """Score agent collaboration behavior.

    Args:
        agent_id: Agent being evaluated.
        records: Collaboration metric records to evaluate.
        role_weights: Optional per-component weight overrides.

    Returns:
        Collaboration score result with component scores.
    """
    ...

trend_protocol

Trend detection strategy protocol.

Defines the interface for pluggable trend detection strategies that analyze metric time series (see Agents design page, D12).

TrendDetectionStrategy

Bases: Protocol

Strategy for detecting trends in metric time series.

Implementations analyze sequences of (timestamp, value) pairs to determine whether a metric is improving, stable, or declining.

name property

name

Human-readable strategy name.

detect

detect(*, metric_name, values, window_size)

Detect the trend direction in a metric time series.

Parameters:

Name Type Description Default
metric_name NotBlankStr

Name of the metric being analyzed.

required
values tuple[tuple[AwareDatetime, float], ...]

Time series data as (timestamp, value) pairs.

required
window_size NotBlankStr

Time window label for context.

required

Returns:

Type Description
TrendResult

Trend detection result with direction and slope.

Source code in src/synthorg/hr/performance/trend_protocol.py
def detect(
    self,
    *,
    metric_name: NotBlankStr,
    values: tuple[tuple[AwareDatetime, float], ...],
    window_size: NotBlankStr,
) -> TrendResult:
    """Detect the trend direction in a metric time series.

    Args:
        metric_name: Name of the metric being analyzed.
        values: Time series data as (timestamp, value) pairs.
        window_size: Time window label for context.

    Returns:
        Trend detection result with direction and slope.
    """
    ...

Promotion

config

Promotion configuration models.

Defines PromotionConfig and sub-configs for controlling promotion/demotion behavior.

PromotionCriteriaConfig pydantic-model

Bases: BaseModel

Configuration for promotion criteria evaluation.

Attributes:

Name Type Description
min_criteria_met int

Minimum number of criteria that must be met.

required_criteria tuple[NotBlankStr, ...]

Criteria names that must always be met.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

min_criteria_met pydantic-field

min_criteria_met = 2

Minimum number of criteria that must be met (max 3)

required_criteria pydantic-field

required_criteria = ()

Criteria names that must always be met

PromotionApprovalConfig pydantic-model

Bases: BaseModel

Configuration for promotion approval decisions.

Attributes:

Name Type Description
human_approval_from_level SeniorityLevel

Seniority level from which human approval is required for promotion.

auto_demote_cost_saving bool

Auto-apply cost-saving demotions.

human_demote_authority bool

Require human approval for authority-reducing demotions.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

human_approval_from_level pydantic-field

human_approval_from_level = SENIOR

Level from which human approval is required

auto_demote_cost_saving pydantic-field

auto_demote_cost_saving = True

Auto-apply cost-saving demotions

human_demote_authority pydantic-field

human_demote_authority = True

Human approval for authority-reducing demotions

ModelMappingConfig pydantic-model

Bases: BaseModel

Configuration for model mapping on seniority changes.

Attributes:

Name Type Description
model_follows_seniority bool

Whether model changes with seniority.

seniority_model_map Any

Explicit level-to-model overrides.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_model_map_keys

model_follows_seniority pydantic-field

model_follows_seniority = True

Whether model follows seniority level

seniority_model_map pydantic-field

seniority_model_map

Explicit seniority level to model ID overrides (wrapped as MappingProxyType after validation)

PromotionConfig pydantic-model

Bases: BaseModel

Top-level promotion/demotion configuration.

Attributes:

Name Type Description
enabled bool

Whether the promotion subsystem is enabled.

cooldown_hours int

Hours between consecutive promotions/demotions.

criteria PromotionCriteriaConfig

Promotion criteria configuration.

approval PromotionApprovalConfig

Promotion approval configuration.

model_mapping ModelMappingConfig

Model mapping configuration.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

enabled pydantic-field

enabled = True

Whether the promotion subsystem is enabled

cooldown_hours pydantic-field

cooldown_hours = 24

Hours between consecutive promotions/demotions

criteria pydantic-field

criteria

Promotion criteria configuration

approval pydantic-field

approval

Promotion approval configuration

model_mapping pydantic-field

model_mapping

Model mapping configuration

models

Promotion domain models.

Frozen Pydantic models for promotion criteria results, evaluations, approval decisions, records, and requests.

CriterionResult pydantic-model

Bases: BaseModel

Result of a single promotion/demotion criterion evaluation.

Attributes:

Name Type Description
name NotBlankStr

Criterion name.

met bool

Whether the criterion was met.

current_value float

Agent's current value for this criterion.

threshold float

Required threshold value.

weight float | None

Weight of this criterion (None if not weighted).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

name pydantic-field

name

Criterion name

met pydantic-field

met

Whether the criterion was met

current_value pydantic-field

current_value

Agent's current value

threshold pydantic-field

threshold

Required threshold value

weight pydantic-field

weight = None

Weight of this criterion

PromotionEvaluation pydantic-model

Bases: BaseModel

Result of evaluating an agent for promotion or demotion.

Attributes:

Name Type Description
agent_id NotBlankStr

Agent being evaluated.

current_level SeniorityLevel

Current seniority level.

target_level SeniorityLevel

Target seniority level.

direction PromotionDirection

Whether this is a promotion or demotion.

criteria_results tuple[CriterionResult, ...]

Individual criterion results.

required_criteria_met bool

Whether all required criteria are met.

eligible bool

Whether the agent is eligible for the change.

evaluated_at AwareDatetime

When the evaluation was performed.

strategy_name NotBlankStr

Strategy that performed the evaluation.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_direction_consistency

agent_id pydantic-field

agent_id

Agent being evaluated

current_level pydantic-field

current_level

Current seniority level

target_level pydantic-field

target_level

Target seniority level

direction pydantic-field

direction

Promotion or demotion

criteria_results pydantic-field

criteria_results = ()

Individual criterion results

required_criteria_met pydantic-field

required_criteria_met

Whether all required criteria are met

eligible pydantic-field

eligible

Whether the agent is eligible for the change

evaluated_at pydantic-field

evaluated_at

When the evaluation was performed

strategy_name pydantic-field

strategy_name

Strategy that performed the evaluation

criteria_met_count property

criteria_met_count

Number of criteria that were met.

PromotionApprovalDecision pydantic-model

Bases: BaseModel

Decision on whether a promotion needs human approval.

Attributes:

Name Type Description
auto_approve bool

Whether the promotion can be auto-approved.

reason NotBlankStr

Explanation for the decision.

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

auto_approve pydantic-field

auto_approve

Whether auto-approved

reason pydantic-field

reason

Explanation for the decision

requires_human property

requires_human

Whether human approval is required (inverse of auto_approve).

PromotionRecord pydantic-model

Bases: BaseModel

Record of a completed promotion or demotion.

Attributes:

Name Type Description
id NotBlankStr

Unique record identifier.

agent_id NotBlankStr

Agent who was promoted/demoted.

agent_name NotBlankStr

Agent display name.

old_level SeniorityLevel

Previous seniority level.

new_level SeniorityLevel

New seniority level.

direction PromotionDirection

Whether this was a promotion or demotion.

evaluation PromotionEvaluation

The evaluation that led to this change.

approved_by NotBlankStr | None

Who approved the change ("auto" if auto-approved, "human" if human-approved via approval_id).

approval_id NotBlankStr | None

Approval item ID if human-approved.

effective_at AwareDatetime

When the change took effect.

initiated_by NotBlankStr

Who initiated the promotion process.

model_changed bool

Whether the model was changed.

old_model_id NotBlankStr | None

Previous model ID (None if not changed).

new_model_id NotBlankStr | None

New model ID (None if not changed).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

Validators:

  • _validate_model_fields

id pydantic-field

id

Unique record identifier

agent_id pydantic-field

agent_id

Agent who was promoted/demoted

agent_name pydantic-field

agent_name

Agent display name

old_level pydantic-field

old_level

Previous seniority level

new_level pydantic-field

new_level

New seniority level

direction pydantic-field

direction

Promotion or demotion

evaluation pydantic-field

evaluation

Evaluation that led to this change

approved_by pydantic-field

approved_by = None

Who approved the change

approval_id pydantic-field

approval_id = None

Approval item ID if human-approved

effective_at pydantic-field

effective_at

When the change took effect

initiated_by pydantic-field

initiated_by

Who initiated the promotion process

model_changed pydantic-field

model_changed = False

Whether the model was changed

old_model_id pydantic-field

old_model_id = None

Previous model ID

new_model_id pydantic-field

new_model_id = None

New model ID

PromotionRequest pydantic-model

Bases: BaseModel

A pending promotion or demotion request.

Attributes:

Name Type Description
id NotBlankStr

Unique request identifier.

agent_id NotBlankStr

Agent being promoted/demoted.

agent_name NotBlankStr

Agent display name.

current_level SeniorityLevel

Current seniority level.

target_level SeniorityLevel

Target seniority level.

direction PromotionDirection

Whether this is a promotion or demotion.

evaluation PromotionEvaluation

The evaluation supporting this request.

status ApprovalStatus

Current approval status.

created_at AwareDatetime

When the request was created.

approval_id NotBlankStr | None

Linked approval item ID (for human approval).

Config:

  • frozen: True
  • allow_inf_nan: False

Fields:

id pydantic-field

id

Unique request identifier

agent_id pydantic-field

agent_id

Agent being promoted/demoted

agent_name pydantic-field

agent_name

Agent display name

current_level pydantic-field

current_level

Current seniority level

target_level pydantic-field

target_level

Target seniority level

direction pydantic-field

direction

Promotion or demotion

evaluation pydantic-field

evaluation

Evaluation supporting this request

status pydantic-field

status = PENDING

Current approval status

created_at pydantic-field

created_at

When the request was created

approval_id pydantic-field

approval_id = None

Linked approval item ID

service

Promotion service orchestrator.

Central service for managing agent promotions and demotions, including criteria evaluation, approval decisions, model mapping, and trust integration.

PromotionService

PromotionService(
    *,
    criteria_strategy,
    approval_strategy,
    model_mapping_strategy,
    registry,
    tracker,
    config,
    approval_store=None,
    trust_service=None,
    on_notification=None,
)

Orchestrates agent promotions and demotions.

Coordinates criteria evaluation, approval decisions, model mapping, registry updates, and optional trust re-evaluation.

Parameters:

Name Type Description Default
criteria_strategy PromotionCriteriaStrategy

Strategy for evaluating promotion criteria.

required
approval_strategy PromotionApprovalStrategy

Strategy for approval decisions.

required
model_mapping_strategy ModelMappingStrategy

Strategy for model resolution.

required
registry AgentRegistryService

Agent registry service.

required
tracker PerformanceTracker

Performance tracker.

required
config PromotionConfig

Promotion configuration.

required
approval_store ApprovalStore | None

Optional approval store for human approval.

None
trust_service TrustService | None

Optional trust service for re-evaluation.

None
on_notification PromotionNotificationCallback | None

Optional callback to notify agents/teams of promotion or demotion events. Wired by the communication layer when available.

None
Source code in src/synthorg/hr/promotion/service.py
def __init__(  # noqa: PLR0913
    self,
    *,
    criteria_strategy: PromotionCriteriaStrategy,
    approval_strategy: PromotionApprovalStrategy,
    model_mapping_strategy: ModelMappingStrategy,
    registry: AgentRegistryService,
    tracker: PerformanceTracker,
    config: PromotionConfig,
    approval_store: ApprovalStore | None = None,
    trust_service: TrustService | None = None,
    on_notification: PromotionNotificationCallback | None = None,
) -> None:
    self._criteria = criteria_strategy
    self._approval = approval_strategy
    self._model_mapping = model_mapping_strategy
    self._registry = registry
    self._tracker = tracker
    self._config = config
    self._approval_store = approval_store
    self._trust_service = trust_service
    self._on_notification = on_notification
    self._promotion_history: dict[str, list[PromotionRecord]] = {}
    self._cooldown_until: dict[str, AwareDatetime] = {}

evaluate_promotion async

evaluate_promotion(agent_id)

Evaluate whether an agent qualifies for promotion.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required

Returns:

Type Description
PromotionEvaluation

Promotion evaluation result.

Raises:

Type Description
PromotionError

If the agent cannot be promoted.

Source code in src/synthorg/hr/promotion/service.py
async def evaluate_promotion(
    self,
    agent_id: NotBlankStr,
) -> PromotionEvaluation:
    """Evaluate whether an agent qualifies for promotion.

    Args:
        agent_id: Agent to evaluate.

    Returns:
        Promotion evaluation result.

    Raises:
        PromotionError: If the agent cannot be promoted.
    """
    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    target = _next_level(identity.level)
    if target is None:
        msg = f"Agent {agent_id!r} is already at maximum seniority"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            current_level=identity.level.value,
            error=msg,
        )
        raise PromotionError(msg)

    logger.debug(
        PROMOTION_EVALUATE_START,
        agent_id=agent_id,
        current_level=identity.level.value,
        target_level=target.value,
    )

    snapshot = await self._tracker.get_snapshot(agent_id)

    evaluation = await self._criteria.evaluate(
        agent_id=agent_id,
        current_level=identity.level,
        target_level=target,
        snapshot=snapshot,
    )

    logger.debug(
        PROMOTION_EVALUATE_COMPLETE,
        agent_id=agent_id,
        eligible=evaluation.eligible,
    )
    return evaluation

evaluate_demotion async

evaluate_demotion(agent_id)

Evaluate whether an agent should be demoted.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required

Returns:

Type Description
PromotionEvaluation

Demotion evaluation result.

Raises:

Type Description
PromotionError

If the agent cannot be demoted.

Source code in src/synthorg/hr/promotion/service.py
async def evaluate_demotion(
    self,
    agent_id: NotBlankStr,
) -> PromotionEvaluation:
    """Evaluate whether an agent should be demoted.

    Args:
        agent_id: Agent to evaluate.

    Returns:
        Demotion evaluation result.

    Raises:
        PromotionError: If the agent cannot be demoted.
    """
    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    target = _prev_level(identity.level)
    if target is None:
        msg = f"Agent {agent_id!r} is already at minimum seniority"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            current_level=identity.level.value,
            error=msg,
        )
        raise PromotionError(msg)

    logger.debug(
        PROMOTION_EVALUATE_START,
        agent_id=agent_id,
        current_level=identity.level.value,
        target_level=target.value,
        direction="demotion",
    )

    snapshot = await self._tracker.get_snapshot(agent_id)

    return await self._criteria.evaluate(
        agent_id=agent_id,
        current_level=identity.level,
        target_level=target,
        snapshot=snapshot,
    )

request_promotion async

request_promotion(agent_id, evaluation, *, initiated_by=_SYSTEM_INITIATOR)

Create a promotion/demotion request.

Checks cooldown, evaluates approval decision, and creates an approval item if human approval is needed.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to promote/demote.

required
evaluation PromotionEvaluation

The evaluation result.

required
initiated_by NotBlankStr

Who initiated the request.

_SYSTEM_INITIATOR

Returns:

Type Description
PromotionRequest

Promotion request.

Raises:

Type Description
PromotionCooldownError

If in cooldown period.

PromotionError

If agent not found.

Source code in src/synthorg/hr/promotion/service.py
async def request_promotion(
    self,
    agent_id: NotBlankStr,
    evaluation: PromotionEvaluation,
    *,
    initiated_by: NotBlankStr = _SYSTEM_INITIATOR,
) -> PromotionRequest:
    """Create a promotion/demotion request.

    Checks cooldown, evaluates approval decision, and creates
    an approval item if human approval is needed.

    Args:
        agent_id: Agent to promote/demote.
        evaluation: The evaluation result.
        initiated_by: Who initiated the request.

    Returns:
        Promotion request.

    Raises:
        PromotionCooldownError: If in cooldown period.
        PromotionError: If agent not found.
    """
    if not evaluation.eligible:
        msg = f"Agent {agent_id!r} is not eligible for {evaluation.direction.value}"
        logger.warning(
            PROMOTION_EVALUATE_FAILED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    if self.is_in_cooldown(agent_id):
        until = self._cooldown_until.get(str(agent_id))
        msg = f"Agent {agent_id!r} is in cooldown until {until}"
        logger.info(
            PROMOTION_COOLDOWN_ACTIVE,
            agent_id=agent_id,
            until=str(until),
        )
        raise PromotionCooldownError(msg)

    identity = await self._registry.get(agent_id)
    if identity is None:
        msg = f"Agent {agent_id!r} not found"
        logger.warning(
            PROMOTION_REQUESTED,
            agent_id=agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    decision = await self._approval.decide(
        evaluation=evaluation,
        agent_identity=identity,
    )

    now = datetime.now(UTC)
    approval_id: NotBlankStr | None = None
    status = ApprovalStatus.PENDING

    if decision.auto_approve:
        status = ApprovalStatus.APPROVED
    elif decision.requires_human:
        if self._approval_store is None:
            msg = (
                f"Promotion for agent {agent_id!r} requires human "
                f"approval but no approval store is configured"
            )
            logger.warning(
                PROMOTION_REQUESTED,
                agent_id=agent_id,
                error=msg,
            )
            raise PromotionError(msg)
        approval_id = await self._create_approval(
            agent_id=agent_id,
            evaluation=evaluation,
            initiated_by=initiated_by,
        )

    request = PromotionRequest(
        agent_id=agent_id,
        agent_name=identity.name,
        current_level=evaluation.current_level,
        target_level=evaluation.target_level,
        direction=evaluation.direction,
        evaluation=evaluation,
        status=status,
        created_at=now,
        approval_id=approval_id,
    )

    logger.info(
        PROMOTION_REQUESTED,
        agent_id=agent_id,
        direction=evaluation.direction.value,
        status=status.value,
    )
    return request

apply_promotion async

apply_promotion(request, *, initiated_by=_SYSTEM_INITIATOR)

Apply a promotion/demotion from an approved request.

Updates the agent's seniority level, resolves model mapping, triggers trust re-evaluation, and records the lifecycle event.

Parameters:

Name Type Description Default
request PromotionRequest

Approved promotion request.

required
initiated_by NotBlankStr

Who initiated the application.

_SYSTEM_INITIATOR

Returns:

Type Description
PromotionRecord

Promotion record.

Raises:

Type Description
PromotionApprovalRequiredError

If request is not approved.

PromotionError

If agent not found.

Source code in src/synthorg/hr/promotion/service.py
async def apply_promotion(
    self,
    request: PromotionRequest,
    *,
    initiated_by: NotBlankStr = _SYSTEM_INITIATOR,
) -> PromotionRecord:
    """Apply a promotion/demotion from an approved request.

    Updates the agent's seniority level, resolves model mapping,
    triggers trust re-evaluation, and records the lifecycle event.

    Args:
        request: Approved promotion request.
        initiated_by: Who initiated the application.

    Returns:
        Promotion record.

    Raises:
        PromotionApprovalRequiredError: If request is not approved.
        PromotionError: If agent not found.
    """
    if request.status != ApprovalStatus.APPROVED:
        event = (
            PROMOTION_REJECTED
            if request.status == ApprovalStatus.REJECTED
            else PROMOTION_REQUESTED
        )
        logger.warning(
            event,
            agent_id=request.agent_id,
            status=request.status.value,
        )
        msg = f"Cannot apply promotion: request status is {request.status.value}"
        raise PromotionApprovalRequiredError(msg)

    await self._verify_approval(request)

    identity = await self._registry.get(request.agent_id)
    if identity is None:
        msg = f"Agent {request.agent_id!r} not found"
        logger.warning(
            PROMOTION_APPLIED,
            agent_id=request.agent_id,
            error=msg,
        )
        raise PromotionError(msg)

    # Resolve model mapping
    new_model_id = self._model_mapping.resolve_model(
        agent_identity=identity,
        new_level=request.target_level,
    )

    updates: dict[str, object] = {"level": request.target_level}
    if new_model_id is not None:
        updates["model"] = identity.model.model_copy(
            update={"model_id": NotBlankStr(new_model_id)},
        )
        logger.info(
            PROMOTION_MODEL_CHANGED,
            agent_id=request.agent_id,
            old_model=str(identity.model.model_id),
            new_model=new_model_id,
        )

    await self._registry.update_identity(
        request.agent_id,
        **updates,
    )

    now = datetime.now(UTC)
    record = PromotionRecord(
        agent_id=request.agent_id,
        agent_name=request.agent_name,
        old_level=request.current_level,
        new_level=request.target_level,
        direction=request.direction,
        evaluation=request.evaluation,
        approved_by=(
            NotBlankStr("auto")
            if request.approval_id is None
            else NotBlankStr("human")
        ),
        approval_id=request.approval_id,
        effective_at=now,
        initiated_by=initiated_by,
        model_changed=new_model_id is not None,
        old_model_id=(
            identity.model.model_id if new_model_id is not None else None
        ),
        new_model_id=(
            NotBlankStr(new_model_id) if new_model_id is not None else None
        ),
    )

    self._promotion_history.setdefault(
        str(request.agent_id),
        [],
    ).append(record)

    if self._config.cooldown_hours > 0:
        self._cooldown_until[str(request.agent_id)] = now + timedelta(
            hours=self._config.cooldown_hours
        )

    # Best-effort trust re-evaluation -- promotion is already applied,
    # so failures here must not prevent the record from being returned.
    if self._trust_service is not None:
        try:
            snapshot = await self._tracker.get_snapshot(request.agent_id)
            await self._trust_service.evaluate_agent(
                request.agent_id,
                snapshot,
            )
        except Exception:
            logger.warning(
                PROMOTION_APPLIED,
                agent_id=request.agent_id,
                error="Trust re-evaluation failed after promotion; "
                "promotion still applied",
            )

    event = (
        PROMOTION_APPLIED
        if request.direction == PromotionDirection.PROMOTION
        else DEMOTION_APPLIED
    )
    logger.info(
        event,
        agent_id=request.agent_id,
        old_level=record.old_level.value,
        new_level=record.new_level.value,
        model_changed=record.model_changed,
    )

    # Notify agent and team -- best-effort, must not block the record.
    if self._on_notification is not None:
        try:
            await self._on_notification(record)
            logger.debug(
                PROMOTION_NOTIFICATION_SENT,
                agent_id=request.agent_id,
                direction=request.direction.value,
            )
        except Exception:
            logger.warning(
                PROMOTION_NOTIFICATION_SENT,
                agent_id=request.agent_id,
                error="Notification callback failed; promotion still applied",
            )

    return record

get_promotion_history

get_promotion_history(agent_id)

Get promotion/demotion history for an agent.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
tuple[PromotionRecord, ...]

Tuple of promotion records.

Source code in src/synthorg/hr/promotion/service.py
def get_promotion_history(
    self,
    agent_id: NotBlankStr,
) -> tuple[PromotionRecord, ...]:
    """Get promotion/demotion history for an agent.

    Args:
        agent_id: Agent identifier.

    Returns:
        Tuple of promotion records.
    """
    return tuple(self._promotion_history.get(str(agent_id), []))

is_in_cooldown

is_in_cooldown(agent_id)

Check whether an agent is in the promotion cooldown period.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent identifier.

required

Returns:

Type Description
bool

True if in cooldown.

Source code in src/synthorg/hr/promotion/service.py
def is_in_cooldown(self, agent_id: NotBlankStr) -> bool:
    """Check whether an agent is in the promotion cooldown period.

    Args:
        agent_id: Agent identifier.

    Returns:
        True if in cooldown.
    """
    until = self._cooldown_until.get(str(agent_id))
    if until is None:
        return False
    return datetime.now(UTC) < until

model_mapping_protocol

Model mapping strategy protocol.

Defines the pluggable interface for mapping seniority levels to LLM model identifiers.

ModelMappingStrategy

Bases: Protocol

Protocol for mapping seniority to LLM models.

Implementations determine which model an agent should use after a seniority level change.

name property

name

Strategy name identifier.

resolve_model

resolve_model(*, agent_identity, new_level)

Resolve the model for an agent at a new seniority level.

Parameters:

Name Type Description Default
agent_identity AgentIdentity

The agent's current identity.

required
new_level SeniorityLevel

The new seniority level.

required

Returns:

Type Description
str | None

New model_id, or None if no change needed.

Source code in src/synthorg/hr/promotion/model_mapping_protocol.py
def resolve_model(
    self,
    *,
    agent_identity: AgentIdentity,
    new_level: SeniorityLevel,
) -> str | None:
    """Resolve the model for an agent at a new seniority level.

    Args:
        agent_identity: The agent's current identity.
        new_level: The new seniority level.

    Returns:
        New model_id, or None if no change needed.
    """
    ...

criteria_protocol

Promotion criteria strategy protocol.

Defines the pluggable interface for evaluating promotion/demotion criteria.

PromotionCriteriaStrategy

Bases: Protocol

Protocol for promotion criteria evaluation.

Implementations define what criteria must be met for an agent to be promoted or demoted between seniority levels.

name property

name

Strategy name identifier.

evaluate async

evaluate(*, agent_id, current_level, target_level, snapshot)

Evaluate whether an agent meets criteria for level change.

Parameters:

Name Type Description Default
agent_id NotBlankStr

Agent to evaluate.

required
current_level SeniorityLevel

Current seniority level.

required
target_level SeniorityLevel

Target seniority level.

required
snapshot AgentPerformanceSnapshot

Agent performance snapshot.

required

Returns:

Type Description
PromotionEvaluation

Evaluation result with criteria details.

Source code in src/synthorg/hr/promotion/criteria_protocol.py
async def evaluate(
    self,
    *,
    agent_id: NotBlankStr,
    current_level: SeniorityLevel,
    target_level: SeniorityLevel,
    snapshot: AgentPerformanceSnapshot,
) -> PromotionEvaluation:
    """Evaluate whether an agent meets criteria for level change.

    Args:
        agent_id: Agent to evaluate.
        current_level: Current seniority level.
        target_level: Target seniority level.
        snapshot: Agent performance snapshot.

    Returns:
        Evaluation result with criteria details.
    """
    ...

approval_protocol

Promotion approval strategy protocol.

Defines the pluggable interface for deciding whether promotions require human approval.

PromotionApprovalStrategy

Bases: Protocol

Protocol for promotion approval decisions.

Implementations determine whether a promotion/demotion can be auto-approved or requires human intervention.

name property

name

Strategy name identifier.

decide async

decide(*, evaluation, agent_identity)

Decide whether a promotion needs human approval.

Parameters:

Name Type Description Default
evaluation PromotionEvaluation

The promotion evaluation result.

required
agent_identity AgentIdentity

The agent's current identity.

required

Returns:

Type Description
PromotionApprovalDecision

Approval decision.

Source code in src/synthorg/hr/promotion/approval_protocol.py
async def decide(
    self,
    *,
    evaluation: PromotionEvaluation,
    agent_identity: AgentIdentity,
) -> PromotionApprovalDecision:
    """Decide whether a promotion needs human approval.

    Args:
        evaluation: The promotion evaluation result.
        agent_identity: The agent's current identity.

    Returns:
        Approval decision.
    """
    ...