
Toolset API

create_subagent_toolset

subagents_pydantic_ai.create_subagent_toolset(subagents=None, default_model='openai:gpt-4.1', toolsets_factory=None, include_general_purpose=True, max_nesting_depth=0, id=None, registry=None, descriptions=None, ask_user=None, usage_limits=None)

Create a toolset for delegating tasks to subagents.

This is the main entry point for the subagent system. It creates a toolset with the following tools:

- task: Delegate a task to a subagent (sync or async)
- check_task: Check the status of an async task
- answer_subagent: Answer a question from a subagent
- send_message_to_subagent: Steer a running async subagent with a message
- list_active_tasks: List all running background tasks
- wait_tasks: Wait for one or more background tasks to finish
- soft_cancel_task: Request cooperative cancellation
- hard_cancel_task: Immediately cancel a task

Parameters:

Name Type Description Default
subagents list[SubAgentConfig] | None

List of subagent configurations. If None, only the general-purpose subagent is available (provided include_general_purpose is left as True).

None
default_model str | Model

Default model for subagents that don't specify one.

'openai:gpt-4.1'
toolsets_factory ToolsetFactory | None

Factory function that creates toolsets for subagents. Called with deps when running a task.

None
include_general_purpose bool

Whether to include the default general-purpose subagent. Set to False if you want only specialized subagents.

True
max_nesting_depth int

Maximum depth for nested subagents. 0 means subagents cannot spawn their own subagents.

0
id str | None

Optional toolset ID. Defaults to "subagents".

None
registry Any | None

Optional dynamic agent registry. When subagent_type does not match a statically configured subagent, the task tool looks the name up through the registry's get_compiled method, if the registry has one.

None
descriptions dict[str, str] | None

Optional mapping of tool name to custom description. Keys are tool names (task, check_task, answer_subagent, send_message_to_subagent, list_active_tasks, wait_tasks, soft_cancel_task, hard_cancel_task). When provided, the custom description replaces the built-in default.

None
ask_user AskUserCallback | None

Optional callback invoked when a subagent calls ask_parent in sync mode. Receives the question and must return the answer. Required for sync-mode subagents with can_ask_questions=True; without it the subagent gets a configuration error. In async mode the parent answers via answer_subagent instead.

None
usage_limits UsageLimits | UsageLimitsFactory | None

Optional pydantic-ai usage limits for delegated subagent runs. Pass a UsageLimits instance to reuse the same limits for every task, or a factory called once per task with the parent run context and selected subagent config. A factory may return None to run that task without explicit limits. Limits are honoured on every retry attempt as well.

None
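
The two accepted forms of usage_limits (a shared instance or a per-task factory) can be told apart with a plain callable check, which is what the source listing for this function does. The following is a minimal sketch of that resolution step only. `FakeLimits`, `resolve_limits`, `per_agent_limits`, the dict-shaped config, and the string used as a context are hypothetical stand-ins, not pydantic-ai or library types.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Union


@dataclass(frozen=True)
class FakeLimits:
    """Stand-in for pydantic-ai's UsageLimits (hypothetical, illustration only)."""

    request_limit: int


LimitsFactory = Callable[[Any, dict], Optional[FakeLimits]]


def resolve_limits(
    usage_limits: Union[FakeLimits, LimitsFactory, None],
    ctx: Any,
    config: dict,
) -> Optional[FakeLimits]:
    # A factory is called once per task; an instance (or None) passes through.
    return usage_limits(ctx, config) if callable(usage_limits) else usage_limits


def per_agent_limits(ctx: Any, config: dict) -> Optional[FakeLimits]:
    # Hypothetical policy: cap the researcher, leave other subagents unlimited.
    if config["name"] == "researcher":
        return FakeLimits(request_limit=5)
    return None


shared = FakeLimits(request_limit=20)
print(resolve_limits(shared, "ctx", {"name": "researcher"}))
print(resolve_limits(per_agent_limits, "ctx", {"name": "researcher"}))
print(resolve_limits(per_agent_limits, "ctx", {"name": "writer"}))
```

A factory is useful when different subagents deserve different budgets; a single instance is simpler when one budget fits every delegated run.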

Returns:

Type Description
FunctionToolset[Any]

FunctionToolset configured with subagent management tools.
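
According to the source listing for this function, the returned toolset also carries two extra attributes: `task_manager` and `get_total_usage`. The aggregation performed by `get_total_usage` can be sketched as below. `FakeUsage`, `FakeHandle`, and `total_usage` are hypothetical stand-ins for the library's usage objects, task handles, and attached helper.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class FakeUsage:
    """Stand-in for a per-task usage record (hypothetical)."""

    input_tokens: int = 0
    output_tokens: int = 0
    requests: int = 0


@dataclass
class FakeHandle:
    """Stand-in for a task handle; usage is None until it is recorded."""

    usage: Optional[FakeUsage] = None


def total_usage(handles: Iterable[FakeHandle]) -> Dict[str, int]:
    totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0, "requests": 0}
    for handle in handles:
        if handle.usage is None:
            continue  # tasks without recorded usage contribute nothing
        # getattr with a default tolerates usage objects that lack a field.
        totals["input_tokens"] += getattr(handle.usage, "input_tokens", 0)
        totals["output_tokens"] += getattr(handle.usage, "output_tokens", 0)
        totals["requests"] += getattr(handle.usage, "requests", 0)
    totals["total_tokens"] = totals["input_tokens"] + totals["output_tokens"]
    return totals


handles = [
    FakeHandle(FakeUsage(input_tokens=100, output_tokens=20, requests=2)),
    FakeHandle(None),
    FakeHandle(FakeUsage(input_tokens=50, output_tokens=5, requests=1)),
]
print(total_usage(handles))
```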

Example
Python
from pydantic_ai import Agent
from subagents_pydantic_ai import create_subagent_toolset, SubAgentConfig

subagents = [
    SubAgentConfig(
        name="researcher",
        description="Researches topics",
        instructions="You are a research assistant.",
    ),
]

toolset = create_subagent_toolset(
    subagents=subagents,
    default_model="openai:gpt-4.1",
)

agent = Agent("openai:gpt-4.1", toolsets=[toolset])
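
How descriptions interacts with the built-in defaults can be sketched without the library. The source listing for this function looks up each tool name in the mapping and falls back to a default; for task, the default has the list of available subagents appended. The default strings and the `build_descriptions` helper below are placeholders for illustration, not the library's actual text or API.

```python
from typing import Dict, Optional

DEFAULT_TASK = "Delegate a task to a subagent.\n"  # placeholder text
DEFAULT_CHECK = "Check the status of a background task."  # placeholder text


def build_descriptions(
    subagents: Dict[str, str],
    descriptions: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    overrides = descriptions or {}
    # One "- name: description" line per configured subagent.
    subagent_list = "\n".join(f"- {name}: {desc}" for name, desc in subagents.items())
    default_task = DEFAULT_TASK.rstrip() + f"\n\nAvailable subagent types:\n{subagent_list}"
    return {
        # An override replaces the whole default, including the subagent list.
        "task": overrides.get("task", default_task),
        "check_task": overrides.get("check_task", DEFAULT_CHECK),
    }


agents = {"researcher": "Researches topics"}
print(build_descriptions(agents)["task"])
print(build_descriptions(agents, {"task": "Hand work to a helper."})["task"])
```

One consequence worth noting: a custom task description is used verbatim, so the generated list of subagent types is not appended to it. If the model should still see the available names, include them in the custom text.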
Source code in src/subagents_pydantic_ai/toolset.py
Python
def create_subagent_toolset(  # noqa: C901
    subagents: list[SubAgentConfig] | None = None,
    default_model: str | Model = "openai:gpt-4.1",
    toolsets_factory: ToolsetFactory | None = None,
    include_general_purpose: bool = True,
    max_nesting_depth: int = 0,
    id: str | None = None,
    registry: Any | None = None,
    descriptions: dict[str, str] | None = None,
    ask_user: AskUserCallback | None = None,
    usage_limits: UsageLimits | UsageLimitsFactory | None = None,
) -> FunctionToolset[Any]:
    """Create a toolset for delegating tasks to subagents.

    This is the main entry point for using the subagent system. It creates
    a toolset with tools for:
    - `task`: Delegate a task to a subagent (sync or async)
    - `check_task`: Check status of an async task
    - `answer_subagent`: Answer a question from a subagent
    - `list_active_tasks`: List all running background tasks
    - `wait_tasks`: Wait for one or more background tasks to finish
    - `soft_cancel_task`: Request cooperative cancellation
    - `hard_cancel_task`: Immediately cancel a task

    Args:
        subagents: List of subagent configurations. If None, only
            general-purpose subagent will be available.
        default_model: Default model for subagents that don't specify one.
        toolsets_factory: Factory function that creates toolsets for subagents.
            Called with deps when running a task.
        include_general_purpose: Whether to include the default general-purpose
            subagent. Set to False if you want only specialized subagents.
        max_nesting_depth: Maximum depth for nested subagents. 0 means
            subagents cannot spawn their own subagents.
        id: Optional toolset ID. Defaults to "subagents".
        descriptions: Optional mapping of tool name to custom description.
            Keys are tool names (task, check_task, answer_subagent,
            list_active_tasks, wait_tasks, soft_cancel_task, hard_cancel_task).
            When provided, the custom description replaces the built-in default.
        ask_user: Optional callback invoked when a subagent calls `ask_parent`
            in sync mode. Receives the question and must return the answer.
            Required for sync-mode subagents with `can_ask_questions=True`;
            without it the subagent gets a configuration error. In async mode
            the parent answers via `answer_subagent` instead.
        usage_limits: Optional pydantic-ai usage limits for delegated subagent
            runs. Pass a `UsageLimits` instance to reuse the same limits for
            every task, or a factory called once per task with the parent run
            context and selected subagent config. A factory may return `None`
            to run that task without explicit limits. Limits are honoured on
            every retry attempt as well.

    Returns:
        FunctionToolset configured with subagent management tools.

    Example:
        ```python
        from pydantic_ai import Agent
        from subagents_pydantic_ai import create_subagent_toolset, SubAgentConfig

        subagents = [
            SubAgentConfig(
                name="researcher",
                description="Researches topics",
                instructions="You are a research assistant.",
            ),
        ]

        toolset = create_subagent_toolset(
            subagents=subagents,
            default_model="openai:gpt-4.1",
        )

        agent = Agent("openai:gpt-4.1", toolsets=[toolset])
        ```
    """
    _descs = descriptions or {}

    # Build subagent configs
    configs: list[SubAgentConfig] = list(subagents) if subagents else []
    if include_general_purpose:
        configs.append(_create_general_purpose_config())

    # Compile subagents
    compiled: dict[str, CompiledSubAgent] = {}
    for config in configs:
        compiled[config["name"]] = _compile_subagent(config, default_model)

    # Create shared state
    message_bus = InMemoryMessageBus()
    task_manager = TaskManager(message_bus=message_bus)

    # Build dynamic task description with available subagents
    subagent_list = "\n".join(f"- {name}: {c.description}" for name, c in compiled.items())
    task_description = _descs.get(
        "task",
        TASK_TOOL_DESCRIPTION.rstrip() + f"\n\nAvailable subagent types:\n{subagent_list}",
    )

    toolset: FunctionToolset[Any] = FunctionToolset(id=id or "subagents")

    @toolset.tool(description=task_description)
    async def task(
        ctx: RunContext[SubAgentDepsProtocol],
        description: str,
        subagent_type: str,
        mode: ExecutionMode = "sync",
        priority: TaskPriority = TaskPriority.NORMAL,
        complexity: Literal["simple", "moderate", "complex"] | None = None,
        requires_user_context: bool = False,
        may_need_clarification: bool = False,
    ) -> str:
        """Delegate a task to a specialized subagent.

        Args:
            ctx: The run context with dependencies.
            description: Detailed description of the task to perform.
            subagent_type: Name of the subagent to use.
            mode: Execution mode - "sync" (blocking), "async" (background), or "auto".
            priority: Task priority level (for async tasks).
            complexity: Override complexity estimate ("simple", "moderate", "complex").
            requires_user_context: Whether task needs ongoing user interaction.
            may_need_clarification: Whether task might need clarifying questions.
        """
        # Validate subagent_type — check static compiled dict first, then dynamic registry
        if subagent_type in compiled:
            subagent = compiled[subagent_type]
        elif (
            registry is not None
            and hasattr(registry, "get_compiled")
            and registry.get_compiled(subagent_type)
        ):
            subagent = registry.get_compiled(subagent_type)
        else:
            # Build available list from both sources
            available_names = list(compiled.keys())
            if registry is not None and hasattr(registry, "list_agents"):
                available_names.extend(registry.list_agents())
            available = ", ".join(available_names)
            return f"Error: Unknown subagent '{subagent_type}'. Available: {available}"

        config = subagent.config
        agent = subagent.agent

        if agent is None:
            return f"Error: Subagent '{subagent_type}' is not properly initialized"

        resolved_usage_limits = (
            usage_limits(ctx, config) if callable(usage_limits) else usage_limits
        )

        # Create deps for subagent
        parent_deps = ctx.deps
        subagent_deps = parent_deps.clone_for_subagent(max_nesting_depth - 1)

        # Build runtime toolsets from factory if provided
        runtime_toolsets = toolsets_factory(subagent_deps) if toolsets_factory else None

        # Generate task ID
        task_id = str(uuid.uuid4())[:8]

        # Resolve mode if "auto"
        if mode == "auto":
            characteristics = TaskCharacteristics(
                estimated_complexity=complexity or config.get("typical_complexity", "moderate"),
                requires_user_context=requires_user_context
                or config.get("typically_needs_context", False),
                may_need_clarification=may_need_clarification,
            )
            resolved_mode = decide_execution_mode(characteristics, config)
        else:
            resolved_mode = mode

        if resolved_mode == "sync":
            return await _run_sync(
                agent=agent,
                config=config,
                description=description,
                deps=subagent_deps,
                task_id=task_id,
                extra_toolsets=runtime_toolsets,
                ask_user=ask_user,
                usage_limits=resolved_usage_limits,
            )
        else:
            return await _run_async(
                agent=agent,
                config=config,
                description=description,
                deps=subagent_deps,
                task_id=task_id,
                task_manager=task_manager,
                message_bus=message_bus,
                extra_toolsets=runtime_toolsets,
                priority=priority,
                usage_limits=resolved_usage_limits,
            )

    @toolset.tool(description=_descs.get("check_task", CHECK_TASK_DESCRIPTION))
    async def check_task(
        ctx: RunContext[SubAgentDepsProtocol],
        task_id: str,
    ) -> str:
        """Check the status of a background task.

        Args:
            ctx: The run context.
            task_id: The task ID returned when the task was started.
        """
        handle = task_manager.get_handle(task_id)
        if handle is None:
            return f"Error: Task '{task_id}' not found"

        status_info = [
            f"Task: {task_id}",
            f"Subagent: {handle.subagent_name}",
            f"Status: {handle.status}",
            f"Description: {handle.description}",
        ]

        if handle.status == TaskStatus.COMPLETED:
            status_info.append(f"Result: {handle.result}")
            if handle.usage is not None:
                u = handle.usage
                inp = getattr(u, "input_tokens", 0)
                out = getattr(u, "output_tokens", 0)
                status_info.append(f"Usage: {inp + out} tokens ({inp} in / {out} out)")
        elif handle.status == TaskStatus.FAILED:
            status_info.append(f"Error: {handle.error}")
        elif handle.status == TaskStatus.WAITING_FOR_ANSWER:
            status_info.append(f"Question: {handle.pending_question}")
        elif handle.started_at:
            elapsed = (datetime.now() - handle.started_at).total_seconds()
            status_info.append(f"Running for: {elapsed:.1f}s")

        return "\n".join(status_info)

    @toolset.tool(description=_descs.get("answer_subagent", ANSWER_SUBAGENT_DESCRIPTION))
    async def answer_subagent(
        ctx: RunContext[SubAgentDepsProtocol],
        task_id: str,
        answer: str,
    ) -> str:
        """Answer a question from a subagent.

        Args:
            ctx: The run context.
            task_id: The task ID of the waiting subagent.
            answer: Your answer to the subagent's question.
        """
        handle = task_manager.get_handle(task_id)
        if handle is None:
            return f"Error: Task '{task_id}' not found"

        if handle.status != TaskStatus.WAITING_FOR_ANSWER:
            return f"Error: Task '{task_id}' is not waiting for an answer (status: {handle.status})"

        # Resolve the answer future that ask_parent is waiting on
        future = task_manager.get_answer_future(task_id)
        if future is not None and not future.done():
            future.set_result(answer)
            return f"Answer sent to task '{task_id}'"

        return "Error: Could not send answer - subagent is no longer waiting"

    @toolset.tool(
        description=_descs.get("send_message_to_subagent", SEND_MESSAGE_TO_SUBAGENT_DESCRIPTION)
    )
    async def send_message_to_subagent(
        ctx: RunContext[SubAgentDepsProtocol],
        task_id: str,
        message: str,
    ) -> str:
        """Steer a running async subagent with an unprompted message.

        The message is queued for the subagent and folded into its next model
        request as an extra user instruction, so it adapts without losing
        partial progress. Works only while the task is still running.

        Args:
            ctx: The run context.
            task_id: The task ID of the running async subagent.
            message: The steering instruction to deliver.
        """
        agent_id = f"subagent-{task_id}"
        if not message_bus.is_registered(agent_id):
            handle = task_manager.get_handle(task_id)
            if handle is None:
                return f"Error: Task '{task_id}' not found"
            return (
                f"Error: Task '{task_id}' is not accepting messages "
                f"(status: {handle.status}). Steering only works for running "
                "async tasks."
            )

        await message_bus.send(
            AgentMessage(
                type=MessageType.TASK_UPDATE,
                sender="parent",
                receiver=agent_id,
                payload={"message": message},
                task_id=task_id,
            )
        )
        return (
            f"Message delivered to task '{task_id}'; "
            "it will be applied on the subagent's next step."
        )

    @toolset.tool(description=_descs.get("list_active_tasks", LIST_ACTIVE_TASKS_DESCRIPTION))
    async def list_active_tasks(
        ctx: RunContext[SubAgentDepsProtocol],
    ) -> str:
        """List all active background tasks."""
        active_ids = task_manager.list_active_tasks()

        if not active_ids:
            return "No active background tasks."

        lines = ["Active background tasks:"]
        for tid in active_ids:
            handle = task_manager.get_handle(tid)
            if handle:  # pragma: no branch
                desc = handle.description[:50]
                lines.append(f"- {tid}: {handle.subagent_name} ({handle.status}) - {desc}...")

        return "\n".join(lines)

    @toolset.tool(description=_descs.get("wait_tasks", WAIT_TASKS_DESCRIPTION))
    async def wait_tasks(
        ctx: RunContext[SubAgentDepsProtocol],
        task_ids: list[str],
        timeout: float = 300.0,
        mode: Literal["all", "any"] = "all",
    ) -> str:
        """Wait for multiple background tasks to complete.

        Args:
            ctx: The run context.
            task_ids: List of task IDs to wait for.
            timeout: Maximum seconds to wait (default 300s / 5 minutes).
            mode: `"all"` (default) waits for every task to finish.
                `"any"` returns as soon as one task reaches a terminal
                state (completed, failed, or cancelled), so the orchestrator
                can react to the first finisher without stalling on the
                slowest one.
        """
        # Collect asyncio.Task objects for the requested task_ids
        tasks_to_await: list[tuple[str, asyncio.Task[Any]]] = []
        for tid in task_ids:
            t = task_manager.tasks.get(tid)
            if t is not None and not t.done():
                tasks_to_await.append((tid, t))

        if tasks_to_await:
            aws = [t for _, t in tasks_to_await]
            # Both modes route through `asyncio.wait`. Unlike
            # `asyncio.wait_for(asyncio.gather(...))`, `asyncio.wait` does
            # *not* cascade cancellation to its constituent tasks — neither on
            # timeout nor when its caller is cancelled (e.g. pydantic-ai's
            # `_call_tools` sibling-cancel hitting this tool call). Workers
            # keep owning their lifecycle, which is what an orchestrator
            # expects.
            return_when = asyncio.FIRST_COMPLETED if mode == "any" else asyncio.ALL_COMPLETED
            await asyncio.wait(aws, timeout=timeout, return_when=return_when)

        # Collect results
        lines: list[str] = []
        finished_count = 0
        for tid in task_ids:
            handle = task_manager.get_handle(tid)
            if handle is None:
                lines.append(f"- {tid}: not found")
                continue
            status = handle.status
            if status == "completed":
                finished_count += 1
                result_preview = (handle.result or "")[:2000]
                lines.append(f"- {tid} ({handle.subagent_name}): COMPLETED\n{result_preview}")
            elif status == "failed":
                finished_count += 1
                lines.append(f"- {tid} ({handle.subagent_name}): FAILED - {handle.error}")
            elif status == "cancelled":
                finished_count += 1
                lines.append(f"- {tid} ({handle.subagent_name}): CANCELLED - {handle.error}")
            else:
                lines.append(f"- {tid} ({handle.subagent_name}): {status}")

        total = len(task_ids)
        still_running = total - finished_count
        header_parts = [f"mode={mode}", f"{finished_count}/{total} finished"]
        if still_running > 0:
            header_parts.append(f"{still_running} still running")
        header = f"Task results ({', '.join(header_parts)}):"

        return header + "\n" + "\n\n".join(lines)

    @toolset.tool(description=_descs.get("soft_cancel_task", SOFT_CANCEL_TASK_DESCRIPTION))
    async def soft_cancel_task(
        ctx: RunContext[SubAgentDepsProtocol],
        task_id: str,
    ) -> str:
        """Request cooperative cancellation of a background task.

        Args:
            ctx: The run context.
            task_id: The task to cancel.
        """
        success = await task_manager.soft_cancel(task_id)
        if success:
            return f"Cancellation requested for task '{task_id}'"
        return f"Error: Task '{task_id}' not found"

    @toolset.tool(description=_descs.get("hard_cancel_task", HARD_CANCEL_TASK_DESCRIPTION))
    async def hard_cancel_task(
        ctx: RunContext[SubAgentDepsProtocol],
        task_id: str,
    ) -> str:
        """Immediately cancel a background task.

        Args:
            ctx: The run context.
            task_id: The task to cancel.
        """
        success = await task_manager.hard_cancel(task_id)
        if success:
            return f"Task '{task_id}' has been cancelled"
        return f"Error: Task '{task_id}' not found"

    # Expose task_manager for external monitoring (e.g., push notifications)
    toolset.task_manager = task_manager  # type: ignore[attr-defined]

    def get_total_usage() -> dict[str, int]:
        """Get aggregate token usage across all completed subagent tasks.

        Returns dict with `input_tokens`, `output_tokens`, `total_tokens`, `requests`.
        """
        totals: dict[str, int] = {
            "input_tokens": 0,
            "output_tokens": 0,
            "total_tokens": 0,
            "requests": 0,
        }
        for handle in task_manager.list_handles():
            if handle.usage is not None:
                totals["input_tokens"] += getattr(handle.usage, "input_tokens", 0)
                totals["output_tokens"] += getattr(handle.usage, "output_tokens", 0)
                totals["requests"] += getattr(handle.usage, "requests", 0)
        totals["total_tokens"] = totals["input_tokens"] + totals["output_tokens"]
        return totals

    toolset.get_total_usage = get_total_usage  # type: ignore[attr-defined]

    return toolset
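
The comment in wait_tasks about `asyncio.wait` not cascading cancellation can be checked with the standard library alone. The sketch below covers only the timeout case and uses a hypothetical `worker` coroutine in place of a real subagent run: a timed-out `asyncio.wait` leaves the slow task running, while a timed-out `asyncio.wait_for(asyncio.gather(...))` cancels it.

```python
import asyncio


async def worker(delay: float, value: str) -> str:
    # Hypothetical stand-in for a background subagent run.
    await asyncio.sleep(delay)
    return value


async def wait_does_not_cancel() -> tuple:
    fast = asyncio.create_task(worker(0.01, "fast"))
    slow = asyncio.create_task(worker(0.3, "slow"))
    # Times out after 0.1s while `slow` is still unfinished.
    done, pending = await asyncio.wait(
        {fast, slow}, timeout=0.1, return_when=asyncio.ALL_COMPLETED
    )
    survived = slow in pending and not slow.cancelled()
    return survived, await slow  # the worker still runs to completion


async def gather_under_wait_for_cancels() -> bool:
    slow = asyncio.create_task(worker(0.3, "slow"))
    try:
        await asyncio.wait_for(asyncio.gather(slow), timeout=0.1)
    except asyncio.TimeoutError:
        pass
    return slow.cancelled()  # the timeout propagated into the worker


async def main() -> tuple:
    return await wait_does_not_cancel(), await gather_under_wait_for_cancels()


results = asyncio.run(main())
print(results)
```

This is why a wait_tasks timeout reports tasks as still running rather than ending them; stopping a task remains an explicit decision made through soft_cancel_task or hard_cancel_task.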

create_agent_factory_toolset

subagents_pydantic_ai.create_agent_factory_toolset(registry, allowed_models=None, default_model='openai:gpt-4.1', max_agents=10, toolsets_factory=None, capabilities_map=None, id=None, default_agent_factory=None)

Create a toolset for dynamic agent creation.

This toolset provides tools for creating, listing, and removing agents at runtime. Created agents are stored in the provided registry and can be used with the main subagent toolset.

Parameters:

Name Type Description Default
registry DynamicAgentRegistry

Registry to store created agents.

required
allowed_models list[str] | None

List of allowed model names. If None, any model is allowed.

None
default_model str | Model

Default model to use when not specified.

'openai:gpt-4.1'
max_agents int

Maximum number of dynamic agents allowed.

10
toolsets_factory ToolsetFactory | None

Factory to create toolsets for new agents. Takes priority over capabilities if both are provided.

None
capabilities_map dict[str, CapabilityFactory] | None

Mapping of capability names to factory functions. E.g., {"filesystem": create_fs_toolset, "todo": create_todo_toolset}. Used when capabilities are specified in create_agent.

None
id str | None

Optional toolset ID. Defaults to "agent_factory".

None
default_agent_factory Any | None

Optional callable that builds the agent from its SubAgentConfig instead of the default Agent construction. When it is set, create_agent rejects requests that specify capabilities.

None

Returns:

Type Description
FunctionToolset[Any]

FunctionToolset with agent management tools.
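
The precedence between toolsets_factory and capabilities_map can be sketched as follows, based on the source listing for this function. `collect_toolsets` is a hypothetical helper, the toolsets are represented by plain strings, and an unknown capability raises here for brevity, whereas the listing returns an error string to the model.

```python
from typing import Any, Callable, Dict, List, Optional

Factory = Callable[[Any], List[Any]]


def collect_toolsets(
    deps: Any,
    capabilities: Optional[List[str]],
    capabilities_map: Optional[Dict[str, Factory]],
    toolsets_factory: Optional[Factory],
) -> List[Any]:
    # Capability names are validated first, even if a factory is configured.
    if capabilities and capabilities_map:
        unknown = [c for c in capabilities if c not in capabilities_map]
        if unknown:
            raise ValueError(f"Unknown capabilities: {', '.join(unknown)}")

    # The factory takes priority: capabilities are not consulted when it is set.
    if toolsets_factory:
        return list(toolsets_factory(deps))

    collected: List[Any] = []
    if capabilities and capabilities_map:
        for name in capabilities:
            collected.extend(capabilities_map[name](deps))
    return collected


caps = {
    "filesystem": lambda deps: ["fs-toolset"],
    "todo": lambda deps: ["todo-toolset"],
}
print(collect_toolsets(None, ["filesystem", "todo"], caps, None))
print(collect_toolsets(None, ["filesystem"], caps, lambda deps: ["custom-toolset"]))
```

In this sketch, as in the listing, requested capabilities are neither validated nor applied when no capabilities_map is configured, so it seems safest to pass a map whenever models are expected to request capabilities.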

Example
Python
from pydantic_ai import Agent
from subagents_pydantic_ai import (
    create_agent_factory_toolset,
    DynamicAgentRegistry,
)

registry = DynamicAgentRegistry()

# With capabilities map
factory_toolset = create_agent_factory_toolset(
    registry=registry,
    allowed_models=["openai:gpt-4.1", "openai:gpt-4o-mini"],
    max_agents=5,
    capabilities_map={
        "filesystem": lambda deps: [create_fs_toolset(deps.backend)],
        "todo": lambda deps: [create_todo_toolset()],
    },
)

agent = Agent("openai:gpt-4.1", toolsets=[factory_toolset])
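
Agent names passed to the create_agent tool are restricted to letters, numbers, and hyphens. The check in the source listing can be reproduced in isolation as below; `is_valid_agent_name` is a hypothetical helper name used only for this sketch.

```python
def is_valid_agent_name(name: str) -> bool:
    # Non-empty, and every character is alphanumeric or a hyphen.
    return bool(name) and all(c.isalnum() or c == "-" for c in name)


for candidate in ["data-cleaner", "agent_1", "two words", "", "café"]:
    print(repr(candidate), is_valid_agent_name(candidate))
```

Because `str.isalnum` is Unicode-aware, non-ASCII letters such as those in "café" pass this check, while underscores and spaces do not.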
Source code in src/subagents_pydantic_ai/factory.py
Python
def create_agent_factory_toolset(
    registry: DynamicAgentRegistry,
    allowed_models: list[str] | None = None,
    default_model: str | Model = "openai:gpt-4.1",
    max_agents: int = 10,
    toolsets_factory: ToolsetFactory | None = None,
    capabilities_map: dict[str, CapabilityFactory] | None = None,
    id: str | None = None,
    default_agent_factory: Any | None = None,
) -> FunctionToolset[Any]:
    """Create a toolset for dynamic agent creation.

    This toolset provides tools for creating, listing, and removing
    agents at runtime. Created agents are stored in the provided
    registry and can be used with the main subagent toolset.

    Args:
        registry: Registry to store created agents.
        allowed_models: List of allowed model names. If None, any model
            is allowed.
        default_model: Default model to use when not specified.
        max_agents: Maximum number of dynamic agents allowed.
        toolsets_factory: Factory to create toolsets for new agents.
            Takes priority over capabilities if both are provided.
        capabilities_map: Mapping of capability names to factory functions.
            E.g., {"filesystem": create_fs_toolset, "todo": create_todo_toolset}.
            Used when capabilities are specified in create_agent.
        id: Optional toolset ID. Defaults to "agent_factory".

    Returns:
        FunctionToolset with agent management tools.

    Example:
        ```python
        from pydantic_ai import Agent
        from subagents_pydantic_ai import (
            create_agent_factory_toolset,
            DynamicAgentRegistry,
        )

        registry = DynamicAgentRegistry()

        # With capabilities map
        factory_toolset = create_agent_factory_toolset(
            registry=registry,
            allowed_models=["openai:gpt-4.1", "openai:gpt-4o-mini"],
            max_agents=5,
            capabilities_map={
                "filesystem": lambda deps: [create_fs_toolset(deps.backend)],
                "todo": lambda deps: [create_todo_toolset()],
            },
        )

        agent = Agent("openai:gpt-4.1", toolsets=[factory_toolset])
        ```
    """
    # Update registry max_agents
    registry.max_agents = max_agents

    # Format allowed models for docstring
    models_desc = (
        f"Allowed models: {', '.join(allowed_models)}" if allowed_models else "Any model is allowed"
    )

    # Format available capabilities for docstring
    caps_desc = (
        f"Available capabilities: {', '.join(capabilities_map.keys())}"
        if capabilities_map
        else "No predefined capabilities available"
    )

    toolset: FunctionToolset[Any] = FunctionToolset(id=id or "agent_factory")

    # Tool description passed to the model. This MUST be supplied via the
    # decorator: an `f"""..."""` as the first statement of the function body is
    # NOT a docstring (`__doc__` stays `None`) — it would be evaluated and
    # discarded on every call, throwing away the computed models/capabilities.
    create_agent_description = (
        "Create a new specialized agent at runtime.\n\n"
        "Creates a new agent with the specified configuration. The agent "
        "will be available for delegation via the task tool.\n\n"
        f"{models_desc}\n{caps_desc}\n\n"
        f"Default model when none is given: {default_model}."
    )

    @toolset.tool(description=create_agent_description)
    async def create_agent(
        ctx: RunContext[SubAgentDepsProtocol],
        name: str,
        description: str,
        instructions: str,
        model: str | None = None,
        capabilities: list[str] | None = None,
        can_ask_questions: bool = True,
    ) -> str:
        """Create a new specialized agent at runtime.

        The model-facing description (with the allowed models / capabilities /
        default model interpolated) is supplied via the `@toolset.tool`
        decorator above, not this docstring.

        Args:
            ctx: The run context.
            name: Unique name for the agent (letters, numbers, hyphens only).
            description: Brief description of what the agent does.
            instructions: System prompt / instructions for the agent.
            model: Model to use (optional, defaults to the factory default).
            capabilities: List of capability names to enable (e.g., ["filesystem", "todo"]).
            can_ask_questions: Whether agent can ask parent questions.

        Returns:
            Confirmation message or error.
        """
        # Validate name
        if not name or not all(c.isalnum() or c == "-" for c in name):
            return "Error: Name must contain only letters, numbers, and hyphens"

        if registry.exists(name):
            return f"Error: Agent '{name}' already exists"

        # Validate model
        actual_model = model or default_model
        if allowed_models and actual_model not in allowed_models:
            allowed = ", ".join(allowed_models)
            return f"Error: Model '{actual_model}' is not allowed. Use one of: {allowed}"

        # Validate capabilities
        if capabilities and capabilities_map:
            invalid_caps = [c for c in capabilities if c not in capabilities_map]
            if invalid_caps:
                available = ", ".join(capabilities_map.keys())
                invalid = ", ".join(invalid_caps)
                return f"Error: Unknown capabilities: {invalid}. Available: {available}"

        # Create config
        config = SubAgentConfig(
            name=name,
            description=description,
            instructions=instructions,
            model=actual_model,
            can_ask_questions=can_ask_questions,
        )

        # A custom default_agent_factory owns the whole agent build and only
        # receives `config`, so any toolsets/capabilities collected here
        # cannot be injected. Rather than silently dropping them while the
        # success message still reports them as enabled, reject the request so
        # the caller knows capabilities are unsupported with a custom factory.
        if default_agent_factory is not None and capabilities:
            return (
                "Error: capabilities are not supported when a custom "
                "default_agent_factory is configured. The factory builds the "
                "agent itself; create the agent without capabilities or "
                "configure the factory to attach the required toolsets."
            )

        # Collect toolsets
        agent_toolsets: list[Any] = []
        if toolsets_factory:
            agent_toolsets.extend(toolsets_factory(ctx.deps))
        elif capabilities and capabilities_map:
            for cap_name in capabilities:
                cap_factory = capabilities_map[cap_name]
                agent_toolsets.extend(cap_factory(ctx.deps))

        # Create agent
        try:
            if default_agent_factory is not None:
                agent: Any = default_agent_factory(config)
            else:
                agent = Agent(
                    actual_model,
                    system_prompt=instructions,
                    toolsets=agent_toolsets or None,
                )

            registry.register(config, agent)

            caps_info = f"\nCapabilities: {', '.join(capabilities)}" if capabilities else ""
            return (
                f"Agent '{name}' created successfully.\n"
                f"Model: {actual_model}\n"
                f"Description: {description}{caps_info}\n"
                f"Use task(description, '{name}') to delegate tasks."
            )

        except ValueError as e:
            return f"Error: {e}"
        except Exception as e:
            return f"Error creating agent: {e}"

    @toolset.tool
    async def list_agents(
        ctx: RunContext[SubAgentDepsProtocol],
    ) -> str:
        """List all dynamically created agents.

        Returns:
            List of agent names and descriptions.
        """
        return registry.get_summary()

    @toolset.tool
    async def remove_agent(
        ctx: RunContext[SubAgentDepsProtocol],
        name: str,
    ) -> str:
        """Remove a dynamically created agent.

        The agent will no longer be available for task delegation.

        Args:
            ctx: The run context.
            name: Name of the agent to remove.

        Returns:
            Confirmation or error message.
        """
        if registry.remove(name):
            return f"Agent '{name}' has been removed."
        return f"Error: Agent '{name}' not found."

    @toolset.tool
    async def get_agent_info(
        ctx: RunContext[SubAgentDepsProtocol],
        name: str,
    ) -> str:
        """Get detailed information about a dynamic agent.

        Args:
            ctx: The run context.
            name: Name of the agent.

        Returns:
            Agent details or error message.
        """
        config = registry.get_config(name)
        if config is None:
            return f"Error: Agent '{name}' not found."

        info = [
            f"Agent: {name}",
            f"Description: {config['description']}",
            f"Model: {config.get('model', default_model)}",
            f"Can ask questions: {config.get('can_ask_questions', True)}",
            "",
            "Instructions:",
            config["instructions"][:500] + ("..." if len(config["instructions"]) > 500 else ""),
        ]

        return "\n".join(info)

    return toolset
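
The validation steps at the top of create_agent can be read as a small pure function. The sketch below restates them outside the toolset so they can be exercised without a model or a registry. The function name validate_agent_request and the parameters existing_names and known_capabilities are hypothetical names introduced for illustration; they are not part of the library.

```python
from __future__ import annotations


def validate_agent_request(
    name: str,
    model: str | None,
    capabilities: list[str] | None,
    *,
    existing_names: set[str],
    default_model: str = "openai:gpt-4.1",
    allowed_models: list[str] | None = None,
    known_capabilities: list[str] | None = None,
) -> str | None:
    """Return an error string, or None when every check passes.

    Hypothetical helper mirroring the order of checks in create_agent.
    """
    # 1. Name must be non-empty and use only alphanumerics and hyphens.
    if not name or not all(c.isalnum() or c == "-" for c in name):
        return "Error: Name must contain only letters, numbers, and hyphens"

    # 2. Name must not already be registered.
    if name in existing_names:
        return f"Error: Agent '{name}' already exists"

    # 3. The effective model (explicit or fallback) must be on the allow-list.
    actual_model = model or default_model
    if allowed_models and actual_model not in allowed_models:
        allowed = ", ".join(allowed_models)
        return f"Error: Model '{actual_model}' is not allowed. Use one of: {allowed}"

    # 4. Every requested capability must be known.
    if capabilities and known_capabilities:
        invalid = [c for c in capabilities if c not in known_capabilities]
        if invalid:
            available = ", ".join(known_capabilities)
            return f"Error: Unknown capabilities: {', '.join(invalid)}. Available: {available}"

    return None
```

Note that the fallback model goes through the allow-list check as well, so an allow-list that omits the default model rejects any request that does not name a model explicitly.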

SubAgentToolset

subagents_pydantic_ai.SubAgentToolset = create_subagent_toolset (module attribute)

Prompt builders

get_subagent_system_prompt and get_task_instructions_prompt are documented on the Prompts & Retry page.


Usage Example

Python
from pydantic_ai import Agent
from subagents_pydantic_ai import SubAgentConfig, create_subagent_toolset

# Define subagents
subagents = [
    SubAgentConfig(
        name="researcher",
        description="Researches topics",
        instructions="You research topics thoroughly.",
    ),
    SubAgentConfig(
        name="writer",
        description="Writes content",
        instructions="You write clear content.",
    ),
]

# Create toolset; max_nesting_depth=1 allows one level of nested subagents
toolset = create_subagent_toolset(
    subagents=subagents,
    default_model="openai:gpt-4o",
    max_nesting_depth=1,
)

# Add to agent. Deps is your application's dependency type, defined elsewhere.
agent = Agent(
    "openai:gpt-4o",
    deps_type=Deps,
    toolsets=[toolset],
)

With Custom Tool Descriptions

Pass descriptions to replace the built-in description of any tool. Tools you omit keep their default description:

Python
toolset = create_subagent_toolset(
    subagents=subagents,
    descriptions={
        "task": "Assign a task to a specialized subagent",
        "check_task": "Check the status of a delegated task",
        "list_active_tasks": "Show all currently running background tasks",
    },
)

Available tool names: task, check_task, answer_subagent, list_active_tasks, wait_tasks, soft_cancel_task, hard_cancel_task.
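
This page does not say whether the library rejects unknown keys in descriptions, so a typo such as "chek_task" may go unnoticed. A minimal sketch of a pre-flight check, assuming you want to fail fast before building the toolset; check_description_keys is a hypothetical helper, not part of the library, and TOOL_NAMES simply copies the list above.

```python
from __future__ import annotations

# Tool names copied from the "Available tool names" list on this page.
TOOL_NAMES = frozenset(
    {
        "task",
        "check_task",
        "answer_subagent",
        "list_active_tasks",
        "wait_tasks",
        "soft_cancel_task",
        "hard_cancel_task",
    }
)


def check_description_keys(descriptions: dict[str, str]) -> dict[str, str]:
    """Raise ValueError if any key is not a known tool name (hypothetical helper)."""
    unknown = sorted(set(descriptions) - TOOL_NAMES)
    if unknown:
        raise ValueError(f"Unknown tool names in descriptions: {', '.join(unknown)}")
    # Return the mapping unchanged so the call can be inlined.
    return descriptions
```

The checked mapping can then be passed straight through, for example descriptions=check_description_keys({...}).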

With Toolsets Factory

Python
from pydantic_ai_backends import create_console_toolset

def my_toolsets_factory(deps):
    # Called with deps when a task runs; returns the toolsets for the subagent.
    return [create_console_toolset()]

toolset = create_subagent_toolset(
    subagents=subagents,
    toolsets_factory=my_toolsets_factory,
)
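
A toolsets factory is any callable that takes deps and returns a list of toolsets, so several small factories can be composed into one. The sketch below shows that composition with plain strings standing in for real toolsets; combine_factories and FactoryFn are hypothetical names, not library APIs. It follows the same extend-per-factory pattern as the capabilities loop in the create_agent source earlier on this page.

```python
from typing import Any, Callable, List

# Stand-in type: a callable that receives deps and returns a list of toolsets.
FactoryFn = Callable[[Any], List[Any]]


def combine_factories(*factories: FactoryFn) -> FactoryFn:
    """Build one factory that concatenates the output of several (hypothetical helper)."""

    def combined(deps: Any) -> List[Any]:
        toolsets: List[Any] = []
        for factory in factories:
            # Each factory sees the same deps and contributes zero or more toolsets.
            toolsets.extend(factory(deps))
        return toolsets

    return combined


# Usage with placeholder values; real factories would return toolset objects.
always_console = lambda deps: ["console"]
todo_if_enabled = lambda deps: ["todo"] if deps.get("todo") else []
factory = combine_factories(always_console, todo_if_enabled)
```

In the create_agent source, a configured toolsets_factory takes precedence: the capabilities loop sits in the elif branch, so it runs only when no factory is set.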

With Dynamic Agent Creation

Python
from pydantic_ai import Agent
from subagents_pydantic_ai import (
    create_subagent_toolset,
    create_agent_factory_toolset,
    DynamicAgentRegistry,
)

registry = DynamicAgentRegistry()

agent = Agent(
    "openai:gpt-4o",
    deps_type=Deps,
    toolsets=[
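        # Sharing the registry is assumed to be what lets task() reach
        # agents created at runtime; registry is a documented parameter.
        create_subagent_toolset(subagents=subagents, registry=registry),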
        create_agent_factory_toolset(
            registry=registry,
            allowed_models=["openai:gpt-4o", "openai:gpt-4o-mini"],
            max_agents=5,
        ),
    ],
)
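
The factory tools in the source above use only a handful of registry methods: exists, register, remove, get_config, and get_summary. To make that contract concrete, here is a minimal in-memory stand-in. InMemoryRegistry is hypothetical and is not the library's DynamicAgentRegistry; the summary format and the placement of the max_agents check inside register are assumptions made for illustration.

```python
from __future__ import annotations

from typing import Any


class InMemoryRegistry:
    """Hypothetical stand-in exposing the registry methods the tools call."""

    def __init__(self, max_agents: int | None = None) -> None:
        self.max_agents = max_agents
        # name -> (config, agent)
        self._entries: dict[str, tuple[dict[str, Any], Any]] = {}

    def exists(self, name: str) -> bool:
        return name in self._entries

    def register(self, config: dict[str, Any], agent: Any) -> None:
        name = config["name"]
        if name in self._entries:
            raise ValueError(f"Agent '{name}' already exists")
        # Assumption: the agent limit surfaces as a ValueError, which
        # create_agent would report as "Error: ...".
        if self.max_agents is not None and len(self._entries) >= self.max_agents:
            raise ValueError(f"Agent limit reached ({self.max_agents})")
        self._entries[name] = (config, agent)

    def remove(self, name: str) -> bool:
        # True if something was removed, False if the name was unknown.
        return self._entries.pop(name, None) is not None

    def get_config(self, name: str) -> dict[str, Any] | None:
        entry = self._entries.get(name)
        return entry[0] if entry else None

    def get_summary(self) -> str:
        if not self._entries:
            return "No agents registered."
        return "\n".join(
            f"- {name}: {config['description']}"
            for name, (config, _agent) in self._entries.items()
        )
```

A stand-in like this can be handy for unit-testing code that depends on registry behavior without constructing real agents.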