Skip to content

Protocols API

SubAgentDepsProtocol

subagents_pydantic_ai.SubAgentDepsProtocol

Bases: Protocol

Protocol for dependencies that support subagent management.

Any deps class that wants to use the subagent toolset must implement this protocol. The key requirement is a subagents dict for storing compiled agent instances and a clone_for_subagent method for creating isolated deps for nested subagents.

Example
Python
@dataclass
class MyDeps:
    subagents: dict[str, Any] = field(default_factory=dict)

    def clone_for_subagent(self, max_depth: int = 0) -> "MyDeps":
        return MyDeps(
            subagents={} if max_depth <= 0 else self.subagents,
        )
Source code in src/subagents_pydantic_ai/protocols.py
Python
@runtime_checkable
class SubAgentDepsProtocol(Protocol):
    """Protocol for dependencies that support subagent management.

    Any deps class that wants to use the subagent toolset must implement
    this protocol. The key requirement is a `subagents` dict for storing
    compiled agent instances and a `clone_for_subagent` method for creating
    isolated deps for nested subagents.

    Example:
        ```python
        @dataclass
        class MyDeps:
            subagents: dict[str, Any] = field(default_factory=dict)

            def clone_for_subagent(self, max_depth: int = 0) -> "MyDeps":
                return MyDeps(
                    subagents={} if max_depth <= 0 else self.subagents,
                )
        ```
    """

    subagents: dict[str, Any]

    def clone_for_subagent(self, max_depth: int = 0) -> SubAgentDepsProtocol:
        """Create a new deps instance for a subagent.

        Subagents typically get:
        - Shared resources (backend, files, etc.)
        - Empty or limited subagents dict (based on max_depth)
        - Fresh state for task-specific data

        Args:
            max_depth: Maximum nesting depth for the subagent.
                If 0, the subagent cannot spawn further subagents.

        Returns:
            A new deps instance configured for the subagent.
        """
        ...

clone_for_subagent(max_depth=0)

Create a new deps instance for a subagent.

Subagents typically get: - Shared resources (backend, files, etc.) - Empty or limited subagents dict (based on max_depth) - Fresh state for task-specific data

Parameters:

Name Type Description Default
max_depth int

Maximum nesting depth for the subagent. If 0, the subagent cannot spawn further subagents.

0

Returns:

Type Description
SubAgentDepsProtocol

A new deps instance configured for the subagent.

Source code in src/subagents_pydantic_ai/protocols.py
Python
def clone_for_subagent(self, max_depth: int = 0) -> SubAgentDepsProtocol:
    """Create a new deps instance for a subagent.

    Subagents typically get:
    - Shared resources (backend, files, etc.)
    - Empty or limited subagents dict (based on max_depth)
    - Fresh state for task-specific data

    Args:
        max_depth: Maximum nesting depth for the subagent.
            If 0, the subagent cannot spawn further subagents.

    Returns:
        A new deps instance configured for the subagent.
    """
    ...

MessageBusProtocol

subagents_pydantic_ai.MessageBusProtocol

Bases: Protocol

Protocol for message bus implementations.

The message bus enables communication between agents, supporting both fire-and-forget messages and request-response patterns.

Implementations can use different backends: - In-memory (default): Uses asyncio queues - Redis: For distributed multi-process setups - Custom: Any backend implementing this protocol

Example
Python
bus = InMemoryMessageBus()

# Register an agent
queue = bus.register_agent("worker-1")

# Send a message
await bus.send(AgentMessage(
    type=MessageType.TASK_UPDATE,
    sender="parent",
    receiver="worker-1",
    payload={"status": "starting"},
))

# Request-response pattern
response = await bus.ask(
    sender="parent",
    receiver="worker-1",
    question="What is your status?",
    task_id="task-123",
    timeout=30.0,
)
Source code in src/subagents_pydantic_ai/protocols.py
Python
@runtime_checkable
class MessageBusProtocol(Protocol):
    """Protocol for message bus implementations.

    The message bus enables communication between agents, supporting
    both fire-and-forget messages and request-response patterns.

    Implementations can use different backends:
    - In-memory (default): Uses asyncio queues
    - Redis: For distributed multi-process setups
    - Custom: Any backend implementing this protocol

    Example:
        ```python
        bus = InMemoryMessageBus()

        # Register an agent
        queue = bus.register_agent("worker-1")

        # Send a message
        await bus.send(AgentMessage(
            type=MessageType.TASK_UPDATE,
            sender="parent",
            receiver="worker-1",
            payload={"status": "starting"},
        ))

        # Request-response pattern
        response = await bus.ask(
            sender="parent",
            receiver="worker-1",
            question="What is your status?",
            task_id="task-123",
            timeout=30.0,
        )
        ```
    """

    async def send(self, message: AgentMessage) -> None:
        """Send a message to a specific agent.

        Args:
            message: The message to send. Must have a valid receiver.

        Raises:
            KeyError: If the receiver is not registered.
        """
        ...

    async def ask(
        self,
        sender: str,
        receiver: str,
        question: Any,
        task_id: str,
        timeout: float = 30.0,
    ) -> AgentMessage:
        """Send a question and wait for a response.

        This implements a request-response pattern where the sender
        blocks until the receiver answers or the timeout expires.

        Args:
            sender: ID of the asking agent.
            receiver: ID of the agent to ask.
            question: The question payload.
            task_id: Task ID for correlation.
            timeout: Maximum time to wait for response in seconds.

        Returns:
            The response message from the receiver.

        Raises:
            asyncio.TimeoutError: If no response within timeout.
            KeyError: If the receiver is not registered.
        """
        ...

    async def answer(self, original: AgentMessage, answer: Any) -> None:
        """Answer a previously received question.

        Args:
            original: The original question message.
            answer: The answer payload.
        """
        ...

    def register_agent(self, agent_id: str) -> asyncio.Queue[AgentMessage]:
        """Register an agent to receive messages.

        Args:
            agent_id: Unique identifier for the agent.

        Returns:
            A queue where messages for this agent will be delivered.

        Raises:
            ValueError: If agent_id is already registered.
        """
        ...

    def unregister_agent(self, agent_id: str) -> None:
        """Unregister an agent from the message bus.

        After unregistration, messages sent to this agent will raise errors.

        Args:
            agent_id: The agent to unregister.
        """
        ...

    async def get_messages(
        self,
        agent_id: str,
        timeout: float = 0.0,
    ) -> list[AgentMessage]:
        """Get pending messages for an agent.

        Non-blocking retrieval of all pending messages in the agent's queue.
        Optionally waits up to `timeout` seconds for at least one message.

        Args:
            agent_id: The agent to get messages for.
            timeout: Maximum time to wait for a message (0 = no wait).

        Returns:
            List of pending messages (may be empty).

        Raises:
            KeyError: If the agent is not registered.
        """
        ...

send(message) async

Send a message to a specific agent.

Parameters:

Name Type Description Default
message AgentMessage

The message to send. Must have a valid receiver.

required

Raises:

Type Description
KeyError

If the receiver is not registered.

Source code in src/subagents_pydantic_ai/protocols.py
Python
async def send(self, message: AgentMessage) -> None:
    """Send a message to a specific agent.

    Args:
        message: The message to send. Must have a valid receiver.

    Raises:
        KeyError: If the receiver is not registered.
    """
    ...

ask(sender, receiver, question, task_id, timeout=30.0) async

Send a question and wait for a response.

This implements a request-response pattern where the sender blocks until the receiver answers or the timeout expires.

Parameters:

Name Type Description Default
sender str

ID of the asking agent.

required
receiver str

ID of the agent to ask.

required
question Any

The question payload.

required
task_id str

Task ID for correlation.

required
timeout float

Maximum time to wait for response in seconds.

30.0

Returns:

Type Description
AgentMessage

The response message from the receiver.

Raises:

Type Description
TimeoutError

If no response within timeout.

KeyError

If the receiver is not registered.

Source code in src/subagents_pydantic_ai/protocols.py
Python
async def ask(
    self,
    sender: str,
    receiver: str,
    question: Any,
    task_id: str,
    timeout: float = 30.0,
) -> AgentMessage:
    """Send a question and wait for a response.

    This implements a request-response pattern where the sender
    blocks until the receiver answers or the timeout expires.

    Args:
        sender: ID of the asking agent.
        receiver: ID of the agent to ask.
        question: The question payload.
        task_id: Task ID for correlation.
        timeout: Maximum time to wait for response in seconds.

    Returns:
        The response message from the receiver.

    Raises:
        asyncio.TimeoutError: If no response within timeout.
        KeyError: If the receiver is not registered.
    """
    ...

answer(original, answer) async

Answer a previously received question.

Parameters:

Name Type Description Default
original AgentMessage

The original question message.

required
answer Any

The answer payload.

required
Source code in src/subagents_pydantic_ai/protocols.py
Python
async def answer(self, original: AgentMessage, answer: Any) -> None:
    """Answer a previously received question.

    Args:
        original: The original question message.
        answer: The answer payload.
    """
    ...

register_agent(agent_id)

Register an agent to receive messages.

Parameters:

Name Type Description Default
agent_id str

Unique identifier for the agent.

required

Returns:

Type Description
Queue[AgentMessage]

A queue where messages for this agent will be delivered.

Raises:

Type Description
ValueError

If agent_id is already registered.

Source code in src/subagents_pydantic_ai/protocols.py
Python
def register_agent(self, agent_id: str) -> asyncio.Queue[AgentMessage]:
    """Register an agent to receive messages.

    Args:
        agent_id: Unique identifier for the agent.

    Returns:
        A queue where messages for this agent will be delivered.

    Raises:
        ValueError: If agent_id is already registered.
    """
    ...

unregister_agent(agent_id)

Unregister an agent from the message bus.

After unregistration, messages sent to this agent will raise errors.

Parameters:

Name Type Description Default
agent_id str

The agent to unregister.

required
Source code in src/subagents_pydantic_ai/protocols.py
Python
def unregister_agent(self, agent_id: str) -> None:
    """Unregister an agent from the message bus.

    After unregistration, messages sent to this agent will raise errors.

    Args:
        agent_id: The agent to unregister.
    """
    ...

get_messages(agent_id, timeout=0.0) async

Get pending messages for an agent.

Non-blocking retrieval of all pending messages in the agent's queue. Optionally waits up to timeout seconds for at least one message.

Parameters:

Name Type Description Default
agent_id str

The agent to get messages for.

required
timeout float

Maximum time to wait for a message (0 = no wait).

0.0

Returns:

Type Description
list[AgentMessage]

List of pending messages (may be empty).

Raises:

Type Description
KeyError

If the agent is not registered.

Source code in src/subagents_pydantic_ai/protocols.py
Python
async def get_messages(
    self,
    agent_id: str,
    timeout: float = 0.0,
) -> list[AgentMessage]:
    """Get pending messages for an agent.

    Non-blocking retrieval of all pending messages in the agent's queue.
    Optionally waits up to `timeout` seconds for at least one message.

    Args:
        agent_id: The agent to get messages for.
        timeout: Maximum time to wait for a message (0 = no wait).

    Returns:
        List of pending messages (may be empty).

    Raises:
        KeyError: If the agent is not registered.
    """
    ...

InMemoryMessageBus

subagents_pydantic_ai.InMemoryMessageBus dataclass

In-memory message bus using asyncio queues.

This is the default message bus implementation, suitable for single-process applications. For distributed systems, consider implementing a Redis-based bus using the MessageBusProtocol.

Example
Python
bus = InMemoryMessageBus()

# Register agents
parent_queue = bus.register_agent("parent")
worker_queue = bus.register_agent("worker-1")

# Send a message
await bus.send(AgentMessage(
    type=MessageType.TASK_ASSIGNED,
    sender="parent",
    receiver="worker-1",
    payload={"task": "analyze data"},
    task_id="task-123",
))

# Worker receives message
msg = await worker_queue.get()
Source code in src/subagents_pydantic_ai/message_bus.py
Python
@dataclass
class InMemoryMessageBus:
    """In-memory message bus using asyncio queues.

    This is the default message bus implementation, suitable for
    single-process applications. For distributed systems, consider
    implementing a Redis-based bus using the MessageBusProtocol.

    Example:
        ```python
        bus = InMemoryMessageBus()

        # Register agents
        parent_queue = bus.register_agent("parent")
        worker_queue = bus.register_agent("worker-1")

        # Send a message
        await bus.send(AgentMessage(
            type=MessageType.TASK_ASSIGNED,
            sender="parent",
            receiver="worker-1",
            payload={"task": "analyze data"},
            task_id="task-123",
        ))

        # Worker receives message
        msg = await worker_queue.get()
        ```
    """

    _queues: dict[str, asyncio.Queue[AgentMessage]] = field(default_factory=dict)
    _pending_questions: dict[str, asyncio.Future[AgentMessage]] = field(default_factory=dict)
    _handlers: list[Callable[[AgentMessage], Awaitable[None]]] = field(default_factory=list)

    async def send(self, message: AgentMessage) -> None:
        """Send a message to a specific agent.

        Args:
            message: The message to send.

        Raises:
            KeyError: If the receiver is not registered.
        """
        if message.receiver not in self._queues:
            raise KeyError(f"Agent '{message.receiver}' is not registered")

        await self._queues[message.receiver].put(message)

        # Notify handlers
        for handler in self._handlers:
            try:
                await handler(message)
            except Exception:  # pragma: no cover
                pass  # Don't let handler errors break message delivery

    async def ask(
        self,
        sender: str,
        receiver: str,
        question: Any,
        task_id: str,
        timeout: float = 30.0,
    ) -> AgentMessage:
        """Send a question and wait for a response.

        Args:
            sender: ID of the asking agent.
            receiver: ID of the agent to ask.
            question: The question payload.
            task_id: Task ID for correlation.
            timeout: Maximum time to wait in seconds.

        Returns:
            The response message.

        Raises:
            asyncio.TimeoutError: If no response within timeout.
            KeyError: If the receiver is not registered.
        """
        if receiver not in self._queues:
            raise KeyError(f"Agent '{receiver}' is not registered")

        correlation_id = str(uuid.uuid4())

        # Create future for the response
        loop = asyncio.get_event_loop()
        response_future: asyncio.Future[AgentMessage] = loop.create_future()
        self._pending_questions[correlation_id] = response_future

        try:
            # Send the question
            message = AgentMessage(
                type=MessageType.QUESTION,
                sender=sender,
                receiver=receiver,
                payload=question,
                task_id=task_id,
                correlation_id=correlation_id,
            )
            await self.send(message)

            # Wait for response
            return await asyncio.wait_for(response_future, timeout=timeout)
        finally:
            # Clean up
            self._pending_questions.pop(correlation_id, None)

    async def answer(self, original: AgentMessage, answer: Any) -> None:
        """Answer a previously received question.

        Args:
            original: The original question message.
            answer: The answer payload.

        Raises:
            KeyError: If the original sender is not registered or
                     if there's no pending question with the correlation_id.
        """
        if original.sender not in self._queues:
            raise KeyError(f"Agent '{original.sender}' is not registered")

        response = AgentMessage(
            type=MessageType.ANSWER,
            sender=original.receiver,  # We are the original receiver
            receiver=original.sender,  # Send back to original sender
            payload=answer,
            task_id=original.task_id,
            correlation_id=original.correlation_id,
        )

        # If there's a pending future for this correlation_id, resolve it
        if original.correlation_id and original.correlation_id in self._pending_questions:
            future = self._pending_questions[original.correlation_id]
            if not future.done():
                future.set_result(response)
        else:
            # Otherwise, put in queue
            await self.send(response)

    def register_agent(self, agent_id: str) -> asyncio.Queue[AgentMessage]:
        """Register an agent to receive messages.

        Args:
            agent_id: Unique identifier for the agent.

        Returns:
            A queue where messages for this agent will be delivered.

        Raises:
            ValueError: If agent_id is already registered.
        """
        if agent_id in self._queues:
            raise ValueError(f"Agent '{agent_id}' is already registered")

        queue: asyncio.Queue[AgentMessage] = asyncio.Queue()
        self._queues[agent_id] = queue
        return queue

    def unregister_agent(self, agent_id: str) -> None:
        """Unregister an agent from the message bus.

        Args:
            agent_id: The agent to unregister.
        """
        self._queues.pop(agent_id, None)

    def add_handler(self, handler: Callable[[AgentMessage], Awaitable[None]]) -> None:
        """Add a message handler for debugging/logging.

        Handlers are called for every message sent through the bus.

        Args:
            handler: Async function that receives messages.
        """
        self._handlers.append(handler)

    def remove_handler(self, handler: Callable[[AgentMessage], Awaitable[None]]) -> None:
        """Remove a previously added handler.

        Args:
            handler: The handler to remove.
        """
        if handler in self._handlers:
            self._handlers.remove(handler)

    def is_registered(self, agent_id: str) -> bool:
        """Check if an agent is registered.

        Args:
            agent_id: The agent ID to check.

        Returns:
            True if the agent is registered, False otherwise.
        """
        return agent_id in self._queues

    def registered_agents(self) -> list[str]:
        """Get list of registered agent IDs.

        Returns:
            List of registered agent IDs.
        """
        return list(self._queues.keys())

    async def get_messages(
        self,
        agent_id: str,
        timeout: float = 0.0,
    ) -> list[AgentMessage]:
        """Get pending messages for an agent.

        Non-blocking retrieval of all pending messages in the agent's queue.
        Optionally waits up to `timeout` seconds for at least one message.

        Args:
            agent_id: The agent to get messages for.
            timeout: Maximum time to wait for a message (0 = no wait).

        Returns:
            List of pending messages (may be empty).

        Raises:
            KeyError: If the agent is not registered.
        """
        if agent_id not in self._queues:
            raise KeyError(f"Agent '{agent_id}' is not registered")

        queue = self._queues[agent_id]
        messages: list[AgentMessage] = []

        # If timeout > 0 and queue is empty, wait for first message
        if timeout > 0 and queue.empty():
            try:
                msg = await asyncio.wait_for(queue.get(), timeout=timeout)
                messages.append(msg)
            except asyncio.TimeoutError:
                return messages

        # Drain all available messages
        while not queue.empty():
            try:
                msg = queue.get_nowait()
                messages.append(msg)
            except asyncio.QueueEmpty:  # pragma: no cover
                break  # Race condition - queue was emptied between empty() check and get_nowait()

        return messages

send(message) async

Send a message to a specific agent.

Parameters:

Name Type Description Default
message AgentMessage

The message to send.

required

Raises:

Type Description
KeyError

If the receiver is not registered.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
async def send(self, message: AgentMessage) -> None:
    """Send a message to a specific agent.

    Args:
        message: The message to send.

    Raises:
        KeyError: If the receiver is not registered.
    """
    if message.receiver not in self._queues:
        raise KeyError(f"Agent '{message.receiver}' is not registered")

    await self._queues[message.receiver].put(message)

    # Notify handlers
    for handler in self._handlers:
        try:
            await handler(message)
        except Exception:  # pragma: no cover
            pass  # Don't let handler errors break message delivery

ask(sender, receiver, question, task_id, timeout=30.0) async

Send a question and wait for a response.

Parameters:

Name Type Description Default
sender str

ID of the asking agent.

required
receiver str

ID of the agent to ask.

required
question Any

The question payload.

required
task_id str

Task ID for correlation.

required
timeout float

Maximum time to wait in seconds.

30.0

Returns:

Type Description
AgentMessage

The response message.

Raises:

Type Description
TimeoutError

If no response within timeout.

KeyError

If the receiver is not registered.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
async def ask(
    self,
    sender: str,
    receiver: str,
    question: Any,
    task_id: str,
    timeout: float = 30.0,
) -> AgentMessage:
    """Send a question and wait for a response.

    Args:
        sender: ID of the asking agent.
        receiver: ID of the agent to ask.
        question: The question payload.
        task_id: Task ID for correlation.
        timeout: Maximum time to wait in seconds.

    Returns:
        The response message.

    Raises:
        asyncio.TimeoutError: If no response within timeout.
        KeyError: If the receiver is not registered.
    """
    if receiver not in self._queues:
        raise KeyError(f"Agent '{receiver}' is not registered")

    correlation_id = str(uuid.uuid4())

    # Create future for the response
    loop = asyncio.get_event_loop()
    response_future: asyncio.Future[AgentMessage] = loop.create_future()
    self._pending_questions[correlation_id] = response_future

    try:
        # Send the question
        message = AgentMessage(
            type=MessageType.QUESTION,
            sender=sender,
            receiver=receiver,
            payload=question,
            task_id=task_id,
            correlation_id=correlation_id,
        )
        await self.send(message)

        # Wait for response
        return await asyncio.wait_for(response_future, timeout=timeout)
    finally:
        # Clean up
        self._pending_questions.pop(correlation_id, None)

answer(original, answer) async

Answer a previously received question.

Parameters:

Name Type Description Default
original AgentMessage

The original question message.

required
answer Any

The answer payload.

required

Raises:

Type Description
KeyError

If the original sender is not registered or if there's no pending question with the correlation_id.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
async def answer(self, original: AgentMessage, answer: Any) -> None:
    """Answer a previously received question.

    Args:
        original: The original question message.
        answer: The answer payload.

    Raises:
        KeyError: If the original sender is not registered or
                 if there's no pending question with the correlation_id.
    """
    if original.sender not in self._queues:
        raise KeyError(f"Agent '{original.sender}' is not registered")

    response = AgentMessage(
        type=MessageType.ANSWER,
        sender=original.receiver,  # We are the original receiver
        receiver=original.sender,  # Send back to original sender
        payload=answer,
        task_id=original.task_id,
        correlation_id=original.correlation_id,
    )

    # If there's a pending future for this correlation_id, resolve it
    if original.correlation_id and original.correlation_id in self._pending_questions:
        future = self._pending_questions[original.correlation_id]
        if not future.done():
            future.set_result(response)
    else:
        # Otherwise, put in queue
        await self.send(response)

register_agent(agent_id)

Register an agent to receive messages.

Parameters:

Name Type Description Default
agent_id str

Unique identifier for the agent.

required

Returns:

Type Description
Queue[AgentMessage]

A queue where messages for this agent will be delivered.

Raises:

Type Description
ValueError

If agent_id is already registered.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def register_agent(self, agent_id: str) -> asyncio.Queue[AgentMessage]:
    """Register an agent to receive messages.

    Args:
        agent_id: Unique identifier for the agent.

    Returns:
        A queue where messages for this agent will be delivered.

    Raises:
        ValueError: If agent_id is already registered.
    """
    if agent_id in self._queues:
        raise ValueError(f"Agent '{agent_id}' is already registered")

    queue: asyncio.Queue[AgentMessage] = asyncio.Queue()
    self._queues[agent_id] = queue
    return queue

unregister_agent(agent_id)

Unregister an agent from the message bus.

Parameters:

Name Type Description Default
agent_id str

The agent to unregister.

required
Source code in src/subagents_pydantic_ai/message_bus.py
Python
def unregister_agent(self, agent_id: str) -> None:
    """Unregister an agent from the message bus.

    Args:
        agent_id: The agent to unregister.
    """
    self._queues.pop(agent_id, None)

add_handler(handler)

Add a message handler for debugging/logging.

Handlers are called for every message sent through the bus.

Parameters:

Name Type Description Default
handler Callable[[AgentMessage], Awaitable[None]]

Async function that receives messages.

required
Source code in src/subagents_pydantic_ai/message_bus.py
Python
def add_handler(self, handler: Callable[[AgentMessage], Awaitable[None]]) -> None:
    """Add a message handler for debugging/logging.

    Handlers are called for every message sent through the bus.

    Args:
        handler: Async function that receives messages.
    """
    self._handlers.append(handler)

remove_handler(handler)

Remove a previously added handler.

Parameters:

Name Type Description Default
handler Callable[[AgentMessage], Awaitable[None]]

The handler to remove.

required
Source code in src/subagents_pydantic_ai/message_bus.py
Python
def remove_handler(self, handler: Callable[[AgentMessage], Awaitable[None]]) -> None:
    """Remove a previously added handler.

    Args:
        handler: The handler to remove.
    """
    if handler in self._handlers:
        self._handlers.remove(handler)

is_registered(agent_id)

Check if an agent is registered.

Parameters:

Name Type Description Default
agent_id str

The agent ID to check.

required

Returns:

Type Description
bool

True if the agent is registered, False otherwise.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def is_registered(self, agent_id: str) -> bool:
    """Check if an agent is registered.

    Args:
        agent_id: The agent ID to check.

    Returns:
        True if the agent is registered, False otherwise.
    """
    return agent_id in self._queues

registered_agents()

Get list of registered agent IDs.

Returns:

Type Description
list[str]

List of registered agent IDs.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def registered_agents(self) -> list[str]:
    """Get list of registered agent IDs.

    Returns:
        List of registered agent IDs.
    """
    return list(self._queues.keys())

get_messages(agent_id, timeout=0.0) async

Get pending messages for an agent.

Non-blocking retrieval of all pending messages in the agent's queue. Optionally waits up to timeout seconds for at least one message.

Parameters:

Name Type Description Default
agent_id str

The agent to get messages for.

required
timeout float

Maximum time to wait for a message (0 = no wait).

0.0

Returns:

Type Description
list[AgentMessage]

List of pending messages (may be empty).

Raises:

Type Description
KeyError

If the agent is not registered.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
async def get_messages(
    self,
    agent_id: str,
    timeout: float = 0.0,
) -> list[AgentMessage]:
    """Get pending messages for an agent.

    Non-blocking retrieval of all pending messages in the agent's queue.
    Optionally waits up to `timeout` seconds for at least one message.

    Args:
        agent_id: The agent to get messages for.
        timeout: Maximum time to wait for a message (0 = no wait).

    Returns:
        List of pending messages (may be empty).

    Raises:
        KeyError: If the agent is not registered.
    """
    if agent_id not in self._queues:
        raise KeyError(f"Agent '{agent_id}' is not registered")

    queue = self._queues[agent_id]
    messages: list[AgentMessage] = []

    # If timeout > 0 and queue is empty, wait for first message
    if timeout > 0 and queue.empty():
        try:
            msg = await asyncio.wait_for(queue.get(), timeout=timeout)
            messages.append(msg)
        except asyncio.TimeoutError:
            return messages

    # Drain all available messages
    while not queue.empty():
        try:
            msg = queue.get_nowait()
            messages.append(msg)
        except asyncio.QueueEmpty:  # pragma: no cover
            break  # Race condition - queue was emptied between empty() check and get_nowait()

    return messages

TaskManager

subagents_pydantic_ai.TaskManager dataclass

Manages background tasks and their lifecycle.

Tracks running tasks, handles cancellation, and provides status querying capabilities.

Attributes:

Name Type Description
tasks dict[str, Task[Any]]

Dictionary of task_id -> asyncio.Task

handles dict[str, Any]

Dictionary of task_id -> TaskHandle

message_bus InMemoryMessageBus

Message bus for communication

Source code in src/subagents_pydantic_ai/message_bus.py
Python
@dataclass
class TaskManager:
    """Manages background tasks and their lifecycle.

    Tracks running tasks, handles cancellation, and provides
    status querying capabilities.

    Attributes:
        tasks: Dictionary of task_id -> asyncio.Task
        handles: Dictionary of task_id -> TaskHandle
        message_bus: Message bus for communication
    """

    tasks: dict[str, asyncio.Task[Any]] = field(default_factory=dict)
    handles: dict[str, Any] = field(default_factory=dict)  # TaskHandle
    message_bus: InMemoryMessageBus = field(default_factory=InMemoryMessageBus)
    _cancel_events: dict[str, asyncio.Event] = field(default_factory=dict)

    def create_task(
        self,
        task_id: str,
        coro: Any,  # Coroutine
        handle: Any,  # TaskHandle
    ) -> asyncio.Task[Any]:
        """Create and track a new background task.

        Args:
            task_id: Unique identifier for the task.
            coro: The coroutine to run.
            handle: TaskHandle for status tracking.

        Returns:
            The created asyncio.Task.
        """
        task = asyncio.create_task(coro)
        self.tasks[task_id] = task
        self.handles[task_id] = handle
        self._cancel_events[task_id] = asyncio.Event()

        # Update handle when task starts
        handle.status = "running"
        handle.started_at = datetime.now()

        return task

    def get_handle(self, task_id: str) -> Any | None:
        """Get the handle for a task.

        Args:
            task_id: The task ID.

        Returns:
            The TaskHandle if found, None otherwise.
        """
        return self.handles.get(task_id)

    def get_cancel_event(self, task_id: str) -> asyncio.Event | None:
        """Get the cancellation event for a task.

        Args:
            task_id: The task ID.

        Returns:
            The cancellation event if found, None otherwise.
        """
        return self._cancel_events.get(task_id)

    async def soft_cancel(self, task_id: str) -> bool:
        """Request cooperative cancellation of a task.

        Sets a cancellation event that the task can check periodically.
        The task is expected to clean up and exit gracefully.

        Args:
            task_id: The task to cancel.

        Returns:
            True if cancellation was requested, False if task not found.
        """
        if task_id not in self._cancel_events:
            return False

        self._cancel_events[task_id].set()

        # Send cancel request message
        if task_id in self.handles:
            handle = self.handles[task_id]
            try:
                await self.message_bus.send(
                    AgentMessage(
                        type=MessageType.CANCEL_REQUEST,
                        sender="task_manager",
                        receiver=handle.subagent_name,
                        payload={"reason": "soft_cancel"},
                        task_id=task_id,
                    )
                )
            except KeyError:
                pass  # Agent not registered, that's OK

        return True

    async def hard_cancel(self, task_id: str) -> bool:
        """Immediately cancel a task.

        Calls cancel() on the asyncio.Task, causing CancelledError
        to be raised in the task.

        Args:
            task_id: The task to cancel.

        Returns:
            True if task was cancelled, False if task not found.
        """
        if task_id not in self.tasks:
            return False

        task = self.tasks[task_id]
        if not task.done():
            task.cancel()

        # Update handle
        if task_id in self.handles:
            handle = self.handles[task_id]
            handle.status = "cancelled"
            handle.completed_at = datetime.now()

        return True

    def cleanup_task(self, task_id: str) -> None:
        """Clean up resources for a completed task.

        Args:
            task_id: The task to clean up.
        """
        self.tasks.pop(task_id, None)
        self._cancel_events.pop(task_id, None)
        # Keep handle for status queries

    def list_active_tasks(self) -> list[str]:
        """Get list of active (non-completed) task IDs.

        Returns:
            List of task IDs for tasks that haven't completed.
        """
        return [task_id for task_id, task in self.tasks.items() if not task.done()]

create_task(task_id, coro, handle)

Create and track a new background task.

Parameters:

Name Type Description Default
task_id str

Unique identifier for the task.

required
coro Any

The coroutine to run.

required
handle Any

TaskHandle for status tracking.

required

Returns:

Type Description
Task[Any]

The created asyncio.Task.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def create_task(
    self,
    task_id: str,
    coro: Any,  # Coroutine
    handle: Any,  # TaskHandle
) -> asyncio.Task[Any]:
    """Create and track a new background task.

    Args:
        task_id: Unique identifier for the task.
        coro: The coroutine to run.
        handle: TaskHandle for status tracking.

    Returns:
        The created asyncio.Task.
    """
    task = asyncio.create_task(coro)
    self.tasks[task_id] = task
    self.handles[task_id] = handle
    self._cancel_events[task_id] = asyncio.Event()

    # Update handle when task starts
    handle.status = "running"
    handle.started_at = datetime.now()

    return task

get_handle(task_id)

Get the handle for a task.

Parameters:

Name Type Description Default
task_id str

The task ID.

required

Returns:

Type Description
Any | None

The TaskHandle if found, None otherwise.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def get_handle(self, task_id: str) -> Any | None:
    """Get the handle for a task.

    Args:
        task_id: The task ID.

    Returns:
        The TaskHandle if found, None otherwise.
    """
    return self.handles.get(task_id)

get_cancel_event(task_id)

Get the cancellation event for a task.

Parameters:

Name Type Description Default
task_id str

The task ID.

required

Returns:

Type Description
Event | None

The cancellation event if found, None otherwise.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def get_cancel_event(self, task_id: str) -> asyncio.Event | None:
    """Get the cancellation event for a task.

    Args:
        task_id: The task ID.

    Returns:
        The cancellation event if found, None otherwise.
    """
    return self._cancel_events.get(task_id)

soft_cancel(task_id) async

Request cooperative cancellation of a task.

Sets a cancellation event that the task can check periodically. The task is expected to clean up and exit gracefully.

Parameters:

Name Type Description Default
task_id str

The task to cancel.

required

Returns:

Type Description
bool

True if cancellation was requested, False if task not found.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
async def soft_cancel(self, task_id: str) -> bool:
    """Request cooperative cancellation of a task.

    Sets a cancellation event that the task can check periodically.
    The task is expected to clean up and exit gracefully.

    Args:
        task_id: The task to cancel.

    Returns:
        True if cancellation was requested, False if task not found.
    """
    if task_id not in self._cancel_events:
        return False

    self._cancel_events[task_id].set()

    # Send cancel request message
    if task_id in self.handles:
        handle = self.handles[task_id]
        try:
            await self.message_bus.send(
                AgentMessage(
                    type=MessageType.CANCEL_REQUEST,
                    sender="task_manager",
                    receiver=handle.subagent_name,
                    payload={"reason": "soft_cancel"},
                    task_id=task_id,
                )
            )
        except KeyError:
            pass  # Agent not registered, that's OK

    return True

hard_cancel(task_id) async

Immediately cancel a task.

Calls cancel() on the asyncio.Task, causing CancelledError to be raised in the task.

Parameters:

Name Type Description Default
task_id str

The task to cancel.

required

Returns:

Type Description
bool

True if task was cancelled, False if task not found.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
async def hard_cancel(self, task_id: str) -> bool:
    """Immediately cancel a task.

    Calls cancel() on the asyncio.Task, causing CancelledError
    to be raised in the task.

    Args:
        task_id: The task to cancel.

    Returns:
        True if task was cancelled, False if task not found.
    """
    if task_id not in self.tasks:
        return False

    task = self.tasks[task_id]
    if not task.done():
        task.cancel()

    # Update handle
    if task_id in self.handles:
        handle = self.handles[task_id]
        handle.status = "cancelled"
        handle.completed_at = datetime.now()

    return True

cleanup_task(task_id)

Clean up resources for a completed task.

Parameters:

Name Type Description Default
task_id str

The task to clean up.

required
Source code in src/subagents_pydantic_ai/message_bus.py
Python
def cleanup_task(self, task_id: str) -> None:
    """Clean up resources for a completed task.

    Args:
        task_id: The task to clean up.
    """
    self.tasks.pop(task_id, None)
    self._cancel_events.pop(task_id, None)

list_active_tasks()

Get list of active (non-completed) task IDs.

Returns:

Type Description
list[str]

List of task IDs for tasks that haven't completed.

Source code in src/subagents_pydantic_ai/message_bus.py
Python
def list_active_tasks(self) -> list[str]:
    """Get list of active (non-completed) task IDs.

    Returns:
        List of task IDs for tasks that haven't completed.
    """
    return [task_id for task_id, task in self.tasks.items() if not task.done()]

create_message_bus

subagents_pydantic_ai.create_message_bus(backend='memory', **kwargs)

Create a message bus instance.

Factory function for creating message bus implementations. Currently only supports in-memory backend.

Parameters:

Name Type Description Default
backend str

The backend type ("memory" is currently supported).

'memory'
**kwargs Any

Backend-specific configuration.

{}

Returns:

Type Description
InMemoryMessageBus

A message bus instance.

Raises:

Type Description
ValueError

If the backend is not supported.

Example
Python
# Create in-memory bus (default)
bus = create_message_bus()

# Future: Redis bus
# bus = create_message_bus("redis", url="redis://localhost")
Source code in src/subagents_pydantic_ai/message_bus.py
Python
def create_message_bus(backend: str = "memory", **kwargs: Any) -> InMemoryMessageBus:
    """Create a message bus instance.

    Factory function for creating message bus implementations.
    Currently only supports in-memory backend.

    Args:
        backend: The backend type ("memory" is currently supported).
        **kwargs: Backend-specific configuration.

    Returns:
        A message bus instance.

    Raises:
        ValueError: If the backend is not supported.

    Example:
        ```python
        # Create in-memory bus (default)
        bus = create_message_bus()

        # Future: Redis bus
        # bus = create_message_bus("redis", url="redis://localhost")
        ```
    """
    if backend == "memory":
        return InMemoryMessageBus()

    raise ValueError(f"Unknown message bus backend: {backend}")

DynamicAgentRegistry

subagents_pydantic_ai.DynamicAgentRegistry dataclass

Registry for dynamically created agents.

Provides storage and management for agents created at runtime. Used by the agent factory toolset to track created agents.

Attributes:

Name Type Description
agents dict[str, Any]

Dictionary mapping agent names to Agent instances.

configs dict[str, SubAgentConfig]

Dictionary mapping agent names to their configurations.

max_agents int | None

Maximum number of agents allowed (optional limit).

Example
Python
registry = DynamicAgentRegistry(max_agents=10)

# Register a new agent
registry.register(config, agent)

# Get an agent
agent = registry.get("my-agent")

# List all agents
names = registry.list_agents()

# Remove an agent
registry.remove("my-agent")
Source code in src/subagents_pydantic_ai/registry.py
Python
@dataclass
class DynamicAgentRegistry:
    """Registry for dynamically created agents.

    Provides storage and management for agents created at runtime.
    Used by the agent factory toolset to track created agents.

    Attributes:
        agents: Dictionary mapping agent names to Agent instances.
        configs: Dictionary mapping agent names to their configurations.
        max_agents: Maximum number of agents allowed (optional limit).

    Example:
        ```python
        registry = DynamicAgentRegistry(max_agents=10)

        # Register a new agent
        registry.register(config, agent)

        # Get an agent
        agent = registry.get("my-agent")

        # List all agents
        names = registry.list_agents()

        # Remove an agent
        registry.remove("my-agent")
        ```
    """

    agents: dict[str, Any] = field(default_factory=dict)
    configs: dict[str, SubAgentConfig] = field(default_factory=dict)
    _compiled: dict[str, CompiledSubAgent] = field(default_factory=dict)
    max_agents: int | None = None

    def register(
        self,
        config: SubAgentConfig,
        agent: Any,
    ) -> None:
        """Register a new agent.

        Args:
            config: The agent's configuration.
            agent: The Agent instance.

        Raises:
            ValueError: If agent name already exists or max_agents reached.
        """
        name = config["name"]

        if name in self.agents:
            raise ValueError(f"Agent '{name}' already exists")

        if self.max_agents and len(self.agents) >= self.max_agents:
            raise ValueError(
                f"Maximum number of agents ({self.max_agents}) reached. "
                f"Remove an agent before creating a new one."
            )

        self.agents[name] = agent
        self.configs[name] = config
        self._compiled[name] = CompiledSubAgent(
            name=name,
            description=config["description"],
            agent=agent,
            config=config,
        )

    def get(self, name: str) -> Any | None:
        """Get an agent by name.

        Args:
            name: The agent name.

        Returns:
            The Agent instance, or None if not found.
        """
        return self.agents.get(name)

    def get_config(self, name: str) -> SubAgentConfig | None:
        """Get an agent's configuration by name.

        Args:
            name: The agent name.

        Returns:
            The SubAgentConfig, or None if not found.
        """
        return self.configs.get(name)

    def get_compiled(self, name: str) -> CompiledSubAgent | None:
        """Get a compiled agent by name.

        Args:
            name: The agent name.

        Returns:
            The CompiledSubAgent, or None if not found.
        """
        return self._compiled.get(name)

    def remove(self, name: str) -> bool:
        """Remove an agent from the registry.

        Args:
            name: The agent name to remove.

        Returns:
            True if agent was removed, False if not found.
        """
        if name not in self.agents:
            return False

        del self.agents[name]
        del self.configs[name]
        del self._compiled[name]
        return True

    def list_agents(self) -> list[str]:
        """Get list of all registered agent names.

        Returns:
            List of agent names.
        """
        return list(self.agents.keys())

    def list_configs(self) -> list[SubAgentConfig]:
        """Get list of all agent configurations.

        Returns:
            List of SubAgentConfig for all registered agents.
        """
        return list(self.configs.values())

    def list_compiled(self) -> list[CompiledSubAgent]:
        """Get list of all compiled agents.

        Returns:
            List of CompiledSubAgent for all registered agents.
        """
        return list(self._compiled.values())

    def exists(self, name: str) -> bool:
        """Check if an agent exists.

        Args:
            name: The agent name.

        Returns:
            True if agent exists, False otherwise.
        """
        return name in self.agents

    def count(self) -> int:
        """Get the number of registered agents.

        Returns:
            Number of agents in the registry.
        """
        return len(self.agents)

    def clear(self) -> None:
        """Remove all agents from the registry."""
        self.agents.clear()
        self.configs.clear()
        self._compiled.clear()

    def get_summary(self) -> str:
        """Get a formatted summary of all registered agents.

        Returns:
            Multi-line string describing all agents.
        """
        if not self.agents:
            return "No dynamically created agents."

        lines = [f"Dynamic Agents ({len(self.agents)}):"]
        for name, config in self.configs.items():
            model = config.get("model", "default")
            lines.append(f"- {name} [{model}]: {config['description']}")

        return "\n".join(lines)

register(config, agent)

Register a new agent.

Parameters:

Name Type Description Default
config SubAgentConfig

The agent's configuration.

required
agent Any

The Agent instance.

required

Raises:

Type Description
ValueError

If agent name already exists or max_agents reached.

Source code in src/subagents_pydantic_ai/registry.py
Python
def register(
    self,
    config: SubAgentConfig,
    agent: Any,
) -> None:
    """Register a new agent.

    Args:
        config: The agent's configuration.
        agent: The Agent instance.

    Raises:
        ValueError: If agent name already exists or max_agents reached.
    """
    name = config["name"]

    if name in self.agents:
        raise ValueError(f"Agent '{name}' already exists")

    if self.max_agents and len(self.agents) >= self.max_agents:
        raise ValueError(
            f"Maximum number of agents ({self.max_agents}) reached. "
            f"Remove an agent before creating a new one."
        )

    self.agents[name] = agent
    self.configs[name] = config
    self._compiled[name] = CompiledSubAgent(
        name=name,
        description=config["description"],
        agent=agent,
        config=config,
    )

get(name)

Get an agent by name.

Parameters:

Name Type Description Default
name str

The agent name.

required

Returns:

Type Description
Any | None

The Agent instance, or None if not found.

Source code in src/subagents_pydantic_ai/registry.py
Python
def get(self, name: str) -> Any | None:
    """Get an agent by name.

    Args:
        name: The agent name.

    Returns:
        The Agent instance, or None if not found.
    """
    return self.agents.get(name)

get_config(name)

Get an agent's configuration by name.

Parameters:

Name Type Description Default
name str

The agent name.

required

Returns:

Type Description
SubAgentConfig | None

The SubAgentConfig, or None if not found.

Source code in src/subagents_pydantic_ai/registry.py
Python
def get_config(self, name: str) -> SubAgentConfig | None:
    """Get an agent's configuration by name.

    Args:
        name: The agent name.

    Returns:
        The SubAgentConfig, or None if not found.
    """
    return self.configs.get(name)

get_compiled(name)

Get a compiled agent by name.

Parameters:

Name Type Description Default
name str

The agent name.

required

Returns:

Type Description
CompiledSubAgent | None

The CompiledSubAgent, or None if not found.

Source code in src/subagents_pydantic_ai/registry.py
Python
def get_compiled(self, name: str) -> CompiledSubAgent | None:
    """Get a compiled agent by name.

    Args:
        name: The agent name.

    Returns:
        The CompiledSubAgent, or None if not found.
    """
    return self._compiled.get(name)

remove(name)

Remove an agent from the registry.

Parameters:

Name Type Description Default
name str

The agent name to remove.

required

Returns:

Type Description
bool

True if agent was removed, False if not found.

Source code in src/subagents_pydantic_ai/registry.py
Python
def remove(self, name: str) -> bool:
    """Remove an agent from the registry.

    Args:
        name: The agent name to remove.

    Returns:
        True if agent was removed, False if not found.
    """
    if name not in self.agents:
        return False

    del self.agents[name]
    del self.configs[name]
    del self._compiled[name]
    return True

list_agents()

Get list of all registered agent names.

Returns:

Type Description
list[str]

List of agent names.

Source code in src/subagents_pydantic_ai/registry.py
Python
def list_agents(self) -> list[str]:
    """Get list of all registered agent names.

    Returns:
        List of agent names.
    """
    return list(self.agents.keys())

list_configs()

Get list of all agent configurations.

Returns:

Type Description
list[SubAgentConfig]

List of SubAgentConfig for all registered agents.

Source code in src/subagents_pydantic_ai/registry.py
Python
def list_configs(self) -> list[SubAgentConfig]:
    """Get list of all agent configurations.

    Returns:
        List of SubAgentConfig for all registered agents.
    """
    return list(self.configs.values())

list_compiled()

Get list of all compiled agents.

Returns:

Type Description
list[CompiledSubAgent]

List of CompiledSubAgent for all registered agents.

Source code in src/subagents_pydantic_ai/registry.py
Python
def list_compiled(self) -> list[CompiledSubAgent]:
    """Get list of all compiled agents.

    Returns:
        List of CompiledSubAgent for all registered agents.
    """
    return list(self._compiled.values())

exists(name)

Check if an agent exists.

Parameters:

Name Type Description Default
name str

The agent name.

required

Returns:

Type Description
bool

True if agent exists, False otherwise.

Source code in src/subagents_pydantic_ai/registry.py
Python
def exists(self, name: str) -> bool:
    """Check if an agent exists.

    Args:
        name: The agent name.

    Returns:
        True if agent exists, False otherwise.
    """
    return name in self.agents

count()

Get the number of registered agents.

Returns:

Type Description
int

Number of agents in the registry.

Source code in src/subagents_pydantic_ai/registry.py
Python
def count(self) -> int:
    """Get the number of registered agents.

    Returns:
        Number of agents in the registry.
    """
    return len(self.agents)

clear()

Remove all agents from the registry.

Source code in src/subagents_pydantic_ai/registry.py
Python
def clear(self) -> None:
    """Remove all agents from the registry."""
    self.agents.clear()
    self.configs.clear()
    self._compiled.clear()

get_summary()

Get a formatted summary of all registered agents.

Returns:

Type Description
str

Multi-line string describing all agents.

Source code in src/subagents_pydantic_ai/registry.py
Python
def get_summary(self) -> str:
    """Get a formatted summary of all registered agents.

    Returns:
        Multi-line string describing all agents.
    """
    if not self.agents:
        return "No dynamically created agents."

    lines = [f"Dynamic Agents ({len(self.agents)}):"]
    for name, config in self.configs.items():
        model = config.get("model", "default")
        lines.append(f"- {name} [{model}]: {config['description']}")

    return "\n".join(lines)

Usage Examples

Implementing SubAgentDepsProtocol

Python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class MyDeps:
    """Custom dependencies implementing SubAgentDepsProtocol."""

    subagents: dict[str, Any] = field(default_factory=dict)
    database_url: str = ""
    api_key: str = ""

    def clone_for_subagent(self, max_depth: int = 0) -> "MyDeps":
        """Create isolated deps for subagent."""
        return MyDeps(
            subagents={} if max_depth <= 0 else self.subagents.copy(),
            database_url=self.database_url,  # Share read-only config
            api_key=self.api_key,
        )

Implementing Custom Message Bus

Python
from subagents_pydantic_ai import MessageBusProtocol, AgentMessage

class RedisMessageBus:
    """Redis-based message bus for distributed systems."""

    def __init__(self, redis_url: str):
        self.redis = Redis.from_url(redis_url)

    async def send(self, message: AgentMessage) -> None:
        channel = f"agent:{message.receiver}"
        await self.redis.publish(channel, message.json())

    async def receive(
        self,
        agent_id: str,
        timeout: float | None = None,
    ) -> AgentMessage | None:
        # Implementation...
        pass

    async def subscribe(self, agent_id: str) -> None:
        pass

    async def unsubscribe(self, agent_id: str) -> None:
        pass

Using TaskManager

Python
from subagents_pydantic_ai import TaskManager, InMemoryMessageBus

bus = InMemoryMessageBus()
manager = TaskManager(message_bus=bus)

# Create a task
handle = await manager.create_task(
    subagent_name="researcher",
    description="Research Python async",
)

# Check status
status = await manager.get_task_status(handle.task_id)

# Answer a question
await manager.answer_question(handle.task_id, "Use asyncio")

# Cancel a task
await manager.cancel_task(handle.task_id, hard=False)

Using DynamicAgentRegistry

Python
from subagents_pydantic_ai import DynamicAgentRegistry

registry = DynamicAgentRegistry()

# List registered agents
agents = registry.list_agents()

# Get a specific agent
agent = registry.get_agent("custom-analyst")

# Remove an agent
registry.remove_agent("custom-analyst")