Streaming

pydantic-deep supports streaming execution for real-time progress monitoring.

Basic Streaming

Use agent.iter() to iterate over the run's graph nodes as they execute:

import asyncio
from pydantic_deep import create_deep_agent, DeepAgentDeps, StateBackend

async def main():
    agent = create_deep_agent()
    deps = DeepAgentDeps(backend=StateBackend())

    async with agent.iter("Create a Python module", deps=deps) as run:
        async for node in run:
            print(f"Node: {type(node).__name__}")

        result = run.result

    print(f"\nFinal output: {result.output}")

asyncio.run(main())

Node Types

During streaming, you'll receive different node types:

# Node classes currently live in pydantic-ai's internal agent graph module
from pydantic_ai._agent_graph import (
    UserPromptNode,
    ModelRequestNode,
    CallToolsNode,
    End,
)

async with agent.iter(prompt, deps=deps) as run:
    async for node in run:
        if isinstance(node, UserPromptNode):
            print("📝 Processing user prompt...")

        elif isinstance(node, ModelRequestNode):
            print("🤖 Calling model...")

        elif isinstance(node, CallToolsNode):
            # Extract tool names from the response
            tools = []
            for part in node.model_response.parts:
                if hasattr(part, 'tool_name'):
                    tools.append(part.tool_name)

            if tools:
                print(f"🔧 Executing: {', '.join(tools)}")

        elif isinstance(node, End):
            print("✅ Completed!")

Progress Display

Show a progress indicator:

import sys

async def run_with_progress(agent, prompt, deps):
    step = 0

    async with agent.iter(prompt, deps=deps) as run:
        async for node in run:
            step += 1
            node_type = type(node).__name__

            # Clear line and show progress
            sys.stdout.write(f"\r[Step {step}] {node_type}...")
            sys.stdout.flush()

        print("\n")
        return run.result
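
Calling the helper works like a normal run; reusing the imports from the basic example above:

async def main():
    agent = create_deep_agent()
    deps = DeepAgentDeps(backend=StateBackend())

    result = await run_with_progress(agent, "Create a Python module", deps)
    print(result.output)

asyncio.run(main())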

Tool Call Details

Get detailed information about tool calls:

async with agent.iter(prompt, deps=deps) as run:
    async for node in run:
        if isinstance(node, CallToolsNode):
            for part in node.model_response.parts:
                if hasattr(part, 'tool_name'):
                    print(f"Tool: {part.tool_name}")
                    if hasattr(part, 'args'):
                        print(f"  Args: {part.args}")

Live Output

For long-running operations, show intermediate results:

async def run_with_live_output(agent, prompt, deps):
    async with agent.iter(prompt, deps=deps) as run:
        async for node in run:
            if isinstance(node, CallToolsNode):
                for part in node.model_response.parts:
                    if hasattr(part, 'tool_name'):
                        tool = part.tool_name
                        # Tool args may arrive as a dict or a JSON string,
                        # so normalise them with args_as_dict()
                        args = part.args_as_dict()

                        # Show tool-specific output
                        if tool == "write_todos":
                            print("\n📋 Updated todo list")
                        elif tool == "write_file":
                            print(f"\n📝 Writing: {args.get('path', '')}")
                        elif tool == "read_file":
                            print(f"\n📖 Reading: {args.get('path', '')}")

        return run.result

Web Streaming

For web applications, you can relay progress to the browser with Server-Sent Events (SSE):

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from pydantic_ai._agent_graph import CallToolsNode
from pydantic_deep import create_deep_agent, DeepAgentDeps, StateBackend

app = FastAPI()
agent = create_deep_agent()

@app.get("/agent/stream")
async def stream_agent(prompt: str):
    async def event_generator():
        # Create fresh per-request state
        deps = DeepAgentDeps(backend=StateBackend())
        async with agent.iter(prompt, deps=deps) as run:
            async for node in run:
                node_type = type(node).__name__

                data = {"type": node_type}

                if isinstance(node, CallToolsNode):
                    tools = []
                    for part in node.model_response.parts:
                        if hasattr(part, 'tool_name'):
                            tools.append(part.tool_name)
                    data["tools"] = tools

                yield f"data: {json.dumps(data)}\n\n"

            # Send final result
            yield f"data: {json.dumps({'type': 'complete', 'output': run.result.output})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
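
On the client side, any SSE-capable HTTP client can consume the stream. A minimal sketch using httpx (the localhost URL is an assumption matching the route above):

import asyncio
import json

import httpx

async def consume_stream(prompt: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET",
            "http://localhost:8000/agent/stream",  # assumed host/port
            params={"prompt": prompt},
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    event = json.loads(line[len("data: "):])
                    print(event["type"], event.get("tools", ""))

asyncio.run(consume_stream("Create a Python module"))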

Cancellation

Bound a run with a timeout so it cannot hang indefinitely:

import asyncio

async def run_with_timeout(agent, prompt, deps, timeout=60):
    try:
        # asyncio.timeout() requires Python 3.11+
        async with asyncio.timeout(timeout):
            async with agent.iter(prompt, deps=deps) as run:
                async for node in run:
                    pass
                return run.result
    except asyncio.TimeoutError:
        print("Agent execution timed out")
        return None
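
For explicit cancellation (for example, a user-initiated stop), a common pattern is to wrap the run in a task and cancel it. A sketch using plain asyncio, on the assumption that task cancellation is an acceptable way to abort the run:

async def run_cancellable(agent, prompt, deps, stop_event: asyncio.Event):
    async def _run():
        async with agent.iter(prompt, deps=deps) as run:
            async for node in run:
                pass
            return run.result

    run_task = asyncio.create_task(_run())
    stop_task = asyncio.create_task(stop_event.wait())
    done, _ = await asyncio.wait(
        {run_task, stop_task}, return_when=asyncio.FIRST_COMPLETED
    )

    if run_task in done:
        stop_task.cancel()
        return run_task.result()

    # Stop requested before the run finished
    run_task.cancel()
    try:
        await run_task
    except asyncio.CancelledError:
        pass
    return None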

Usage Statistics

Track token usage during streaming:

async with agent.iter(prompt, deps=deps) as run:
    async for node in run:
        pass

    result = run.result
    usage = result.usage()

    print(f"Input tokens: {usage.input_tokens}")
    print(f"Output tokens: {usage.output_tokens}")
    print(f"Total requests: {usage.requests}")

Example: Progress Bar

Using rich for beautiful progress display:

from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn

console = Console()

async def run_with_rich_progress(agent, prompt, deps):
    with Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]{task.description}"),
        console=console,
    ) as progress:
        task = progress.add_task("Starting...", total=None)

        async with agent.iter(prompt, deps=deps) as run:
            async for node in run:
                if isinstance(node, ModelRequestNode):
                    progress.update(task, description="🤖 Thinking...")
                elif isinstance(node, CallToolsNode):
                    tools = []
                    for part in node.model_response.parts:
                        if hasattr(part, 'tool_name'):
                            tools.append(part.tool_name)
                    if tools:
                        progress.update(
                            task,
                            description=f"🔧 {', '.join(tools)}"
                        )

            progress.update(task, description="✅ Complete!")
            return run.result

Next Steps