PipelineSpec

Build middleware pipelines in Python with export to JSON/YAML.

PipelineSpec provides a fluent API for constructing middleware pipelines that can be compiled into middleware instances or exported as configuration files. This is useful for configuration-driven deployments where pipeline definitions need to be portable.

Quick Example

Python
from pydantic_ai_middleware import PipelineSpec
from pydantic_ai_middleware.builder import MiddlewarePipelineCompiler

# Build a pipeline spec with fluent API
spec = (
    PipelineSpec()
    .add_type("logging", {"level": "DEBUG"})
    .add_type("rate_limit", {"max_requests": 100})
    .add_when(
        predicate="is_admin",
        then=[{"type": "admin_audit"}],
        else_=[{"type": "user_audit"}],
    )
)

# Export to YAML file
spec.save("middleware-pipeline.yaml")

# Or compile directly to middleware instances.
# `registry` is assumed to be defined elsewhere, with the middleware factories registered.
compiler = MiddlewarePipelineCompiler(registry)
middleware_list = spec.compile(compiler)

Supported Node Types

  • type: A single middleware by registered name
  • chain: Sequential execution of multiple nodes
  • parallel: Concurrent execution with result aggregation
  • when: Conditional branching based on predicates

API Reference

pydantic_ai_middleware.pipeline_spec.PipelineSpec dataclass

A mutable pipeline spec builder for defining middleware pipelines in Python.

PipelineSpec provides a fluent API for building middleware pipelines that can be:

  • Compiled into middleware instances using a MiddlewarePipelineCompiler
  • Exported as portable JSON/YAML config files for configuration-driven pipelines

The builder uses the same skeleton nodes as config loading: type, chain, parallel, and when.

Example
Python
from pydantic_ai_middleware import PipelineSpec
from pydantic_ai_middleware.builder import MiddlewarePipelineCompiler

# Build a pipeline spec
spec = (
    PipelineSpec()
    .add_type("logging", {"level": "DEBUG"})
    .add_type("rate_limit", {"max_requests": 100})
    .add_when(
        predicate="is_admin",
        then=[{"type": "admin_audit"}],
        else_=[{"type": "user_audit"}],
    )
)

# Export to YAML
spec.save("pipeline.yaml")

# Or compile to middleware.
# `registry` is assumed to be defined elsewhere, with the middleware factories registered.
compiler = MiddlewarePipelineCompiler(registry)
middleware = spec.compile(compiler)
Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
@dataclass(slots=True)
class PipelineSpec:
    """A mutable pipeline spec builder for defining middleware pipelines in Python.

    PipelineSpec provides a fluent API for building middleware pipelines that can be:
    - Compiled into middleware instances using a :class:`MiddlewarePipelineCompiler`
    - Exported as portable JSON/YAML config files for configuration-driven pipelines

    The builder uses the same skeleton nodes as config loading: `type`, `chain`,
    `parallel`, and `when`.

    Example:
        ```python
        from pydantic_ai_middleware import PipelineSpec
        from pydantic_ai_middleware.builder import MiddlewarePipelineCompiler

        # Build a pipeline spec
        spec = (
            PipelineSpec()
            .add_type("logging", {"level": "DEBUG"})
            .add_type("rate_limit", {"max_requests": 100})
            .add_when(
                predicate="is_admin",
                then=[{"type": "admin_audit"}],
                else_=[{"type": "user_audit"}],
            )
        )

        # Export to YAML
        spec.save("pipeline.yaml")

        # Or compile to middleware
        compiler = MiddlewarePipelineCompiler(registry)
        middleware = spec.compile(compiler)
        ```
    """

    nodes: list[dict[str, Any]] = field(default_factory=list)
    """The list of pipeline nodes."""

    def add(self, node: Mapping[str, Any]) -> Self:
        """Add a raw node to the pipeline.

        Args:
            node: A node dictionary (e.g., {"type": "logging"}).

        Returns:
            Self for method chaining.
        """
        self.nodes.append(dict(node))
        return self

    def add_type(self, type_name: str, config: Mapping[str, Any] | None = None) -> Self:
        """Add a middleware type node to the pipeline.

        Args:
            type_name: The registered middleware type name.
            config: Optional configuration to pass to the middleware factory.

        Returns:
            Self for method chaining.

        Example:
            ```python
            spec.add_type("logging", {"level": "DEBUG"})
            ```
        """
        return self.add(type_node(type_name, config))

    def add_chain(self, nodes: Sequence[Mapping[str, Any]]) -> Self:
        """Add a chain node (sequential middleware execution).

        Args:
            nodes: Sequence of nodes to execute in order.

        Returns:
            Self for method chaining.

        Example:
            ```python
            spec.add_chain([
                {"type": "auth"},
                {"type": "logging"},
            ])
            ```
        """
        return self.add(chain_node(nodes))

    def add_parallel(
        self,
        nodes: Sequence[Mapping[str, Any]],
        *,
        strategy: str | AggregationStrategy | None = None,
        timeout: float | None = None,
        name: str | None = None,
    ) -> Self:
        """Add a parallel node (concurrent middleware execution).

        Args:
            nodes: Sequence of nodes to execute in parallel.
            strategy: Aggregation strategy for combining results.
                Options: "first", "last", "merge", "all".
            timeout: Optional timeout in seconds for parallel execution.
            name: Optional name for the parallel group.

        Returns:
            Self for method chaining.

        Example:
            ```python
            spec.add_parallel(
                [{"type": "cache"}, {"type": "metrics"}],
                strategy="merge",
                timeout=5.0,
            )
            ```
        """
        return self.add(parallel_node(nodes, strategy=strategy, timeout=timeout, name=name))

    def add_when(
        self,
        *,
        predicate: str | Mapping[str, Any] | bool,
        then: Sequence[Mapping[str, Any]],
        else_: Sequence[Mapping[str, Any]] | None = None,
    ) -> Self:
        """Add a conditional node (branching based on predicate).

        Args:
            predicate: Condition for branching. Can be:
                - A string (registered predicate name)
                - A dict (predicate with config)
                - A boolean (static condition)
            then: Nodes to execute when predicate is True.
            else_: Optional nodes to execute when predicate is False.

        Returns:
            Self for method chaining.

        Example:
            ```python
            spec.add_when(
                predicate="is_authenticated",
                then=[{"type": "user_middleware"}],
                else_=[{"type": "guest_middleware"}],
            )
            ```
        """
        return self.add(when_node(predicate=predicate, then=then, else_=else_))

    def to_config(self) -> list[dict[str, Any]]:
        """Convert the spec to a config-compatible list of nodes.

        Returns:
            A list of node dictionaries suitable for JSON/YAML serialization.
        """
        return [dict(n) for n in self.nodes]

    def dump(self, *, format: str = "json") -> str:
        """Serialize the pipeline to a string.

        Args:
            format: Output format, either "json" or "yaml".

        Returns:
            The serialized pipeline as a string.

        Raises:
            MiddlewareConfigError: If format is unknown or YAML requested but
                PyYAML is not installed.
        """
        fmt = _normalize_format(format)
        if fmt == "json":
            return json.dumps(self.to_config(), indent=2, sort_keys=True)
        return _dump_yaml(self.to_config())

    def save(self, path: str | Path, *, format: str | None = None) -> None:
        """Save the pipeline to a file.

        Args:
            path: File path to save to. Format is auto-detected from
                extension (.json, .yaml, .yml) if not specified.
            format: Optional explicit format ("json" or "yaml").

        Raises:
            MiddlewareConfigError: If format cannot be determined or is unknown.
        """
        config_path = Path(path)
        fmt = _detect_format(format=format, path=config_path)
        config_path.write_text(self.dump(format=fmt), encoding="utf-8")

    def compile(self, compiler: MiddlewarePipelineCompiler[Any]) -> list[AgentMiddleware[Any]]:
        """Compile the spec into middleware instances.

        Args:
            compiler: A MiddlewarePipelineCompiler with registered factories.

        Returns:
            A list of middleware instances ready to use with an agent.

        Raises:
            MiddlewareConfigError: If compilation fails (e.g., unknown types).
        """
        return compiler.compile_list(self.to_config())

nodes = field(default_factory=list) class-attribute instance-attribute

The list of pipeline nodes.

add(node)

Add a raw node to the pipeline.

Parameters:

  • node (Mapping[str, Any], required): A node dictionary (e.g., {"type": "logging"}).

Returns:

  • Self: Self for method chaining.

Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def add(self, node: Mapping[str, Any]) -> Self:
    """Add a raw node to the pipeline.

    Args:
        node: A node dictionary (e.g., {"type": "logging"}).

    Returns:
        Self for method chaining.
    """
    self.nodes.append(dict(node))
    return self
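
The two behaviours visible in `add` (it returns `self`, and it stores `dict(node)` rather than the caller's object) can be tried without installing the library. The sketch below uses a hypothetical stand-in class, `MiniSpec`, that copies only the `add` method shown above; it is not the real `PipelineSpec`.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MiniSpec:
    """Hypothetical stand-in that mirrors only PipelineSpec.add."""

    nodes: list[dict[str, Any]] = field(default_factory=list)

    def add(self, node: Mapping[str, Any]) -> "MiniSpec":
        # dict(node) copies the top-level keys, so the stored node is a
        # separate dict from the one the caller passed in.
        self.nodes.append(dict(node))
        return self  # returning self is what makes chaining possible


raw = {"type": "logging"}
spec = MiniSpec().add(raw).add({"type": "rate_limit"})

# Changing the caller's dict afterwards does not change the stored node.
raw["type"] = "changed"
stored_types = [n["type"] for n in spec.nodes]
print(stored_types)
```

Because every `add_*` method in the source delegates to `add`, the same chaining and copying behaviour should apply to them as well.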

add_type(type_name, config=None)

Add a middleware type node to the pipeline.

Parameters:

  • type_name (str, required): The registered middleware type name.
  • config (Mapping[str, Any] | None, default None): Optional configuration to pass to the middleware factory.

Returns:

  • Self: Self for method chaining.

Example
Python
spec.add_type("logging", {"level": "DEBUG"})
Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def add_type(self, type_name: str, config: Mapping[str, Any] | None = None) -> Self:
    """Add a middleware type node to the pipeline.

    Args:
        type_name: The registered middleware type name.
        config: Optional configuration to pass to the middleware factory.

    Returns:
        Self for method chaining.

    Example:
        ```python
        spec.add_type("logging", {"level": "DEBUG"})
        ```
    """
    return self.add(type_node(type_name, config))

add_chain(nodes)

Add a chain node (sequential middleware execution).

Parameters:

  • nodes (Sequence[Mapping[str, Any]], required): Sequence of nodes to execute in order.

Returns:

  • Self: Self for method chaining.

Example
Python
spec.add_chain([
    {"type": "auth"},
    {"type": "logging"},
])
Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def add_chain(self, nodes: Sequence[Mapping[str, Any]]) -> Self:
    """Add a chain node (sequential middleware execution).

    Args:
        nodes: Sequence of nodes to execute in order.

    Returns:
        Self for method chaining.

    Example:
        ```python
        spec.add_chain([
            {"type": "auth"},
            {"type": "logging"},
        ])
        ```
    """
    return self.add(chain_node(nodes))

add_parallel(nodes, *, strategy=None, timeout=None, name=None)

Add a parallel node (concurrent middleware execution).

Parameters:

  • nodes (Sequence[Mapping[str, Any]], required): Sequence of nodes to execute in parallel.
  • strategy (str | AggregationStrategy | None, default None): Aggregation strategy for combining results. Options: "first", "last", "merge", "all".
  • timeout (float | None, default None): Optional timeout in seconds for parallel execution.
  • name (str | None, default None): Optional name for the parallel group.

Returns:

  • Self: Self for method chaining.

Example
Python
spec.add_parallel(
    [{"type": "cache"}, {"type": "metrics"}],
    strategy="merge",
    timeout=5.0,
)
Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def add_parallel(
    self,
    nodes: Sequence[Mapping[str, Any]],
    *,
    strategy: str | AggregationStrategy | None = None,
    timeout: float | None = None,
    name: str | None = None,
) -> Self:
    """Add a parallel node (concurrent middleware execution).

    Args:
        nodes: Sequence of nodes to execute in parallel.
        strategy: Aggregation strategy for combining results.
            Options: "first", "last", "merge", "all".
        timeout: Optional timeout in seconds for parallel execution.
        name: Optional name for the parallel group.

    Returns:
        Self for method chaining.

    Example:
        ```python
        spec.add_parallel(
            [{"type": "cache"}, {"type": "metrics"}],
            strategy="merge",
            timeout=5.0,
        )
        ```
    """
    return self.add(parallel_node(nodes, strategy=strategy, timeout=timeout, name=name))
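
To make "concurrent execution with result aggregation" concrete, here is a minimal, library-independent sketch built on `asyncio`. The function `run_parallel` and the meaning given to each strategy name are assumptions made for illustration (for example, "first" here means first in input order, not first to finish); the library's actual `AggregationStrategy` semantics may differ.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_parallel(
    tasks: list[Callable[[], Awaitable[dict[str, Any]]]],
    strategy: str = "all",
    timeout: Optional[float] = None,
) -> Any:
    """Hypothetical runner: execute tasks concurrently, then aggregate."""
    gathered = asyncio.gather(*(task() for task in tasks))
    # wait_for cancels the pending work and raises TimeoutError on expiry.
    results = await asyncio.wait_for(gathered, timeout=timeout)
    if strategy == "first":
        return results[0]
    if strategy == "last":
        return results[-1]
    if strategy == "merge":
        merged: dict[str, Any] = {}
        for result in results:
            merged.update(result)  # later results win on key clashes
        return merged
    if strategy == "all":
        return results
    raise ValueError(f"Unknown strategy: {strategy!r}")


async def cache() -> dict[str, Any]:
    return {"cache": "hit"}


async def metrics() -> dict[str, Any]:
    await asyncio.sleep(0.01)
    return {"metrics": "recorded"}


merged = asyncio.run(run_parallel([cache, metrics], strategy="merge", timeout=5.0))
first = asyncio.run(run_parallel([cache, metrics], strategy="first"))
print(merged, first)
```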

add_when(*, predicate, then, else_=None)

Add a conditional node (branching based on predicate).

Parameters:

  • predicate (str | Mapping[str, Any] | bool, required): Condition for branching. Can be a string (registered predicate name), a dict (predicate with config), or a boolean (static condition).
  • then (Sequence[Mapping[str, Any]], required): Nodes to execute when predicate is True.
  • else_ (Sequence[Mapping[str, Any]] | None, default None): Optional nodes to execute when predicate is False.

Returns:

  • Self: Self for method chaining.

Example
Python
spec.add_when(
    predicate="is_authenticated",
    then=[{"type": "user_middleware"}],
    else_=[{"type": "guest_middleware"}],
)
Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def add_when(
    self,
    *,
    predicate: str | Mapping[str, Any] | bool,
    then: Sequence[Mapping[str, Any]],
    else_: Sequence[Mapping[str, Any]] | None = None,
) -> Self:
    """Add a conditional node (branching based on predicate).

    Args:
        predicate: Condition for branching. Can be:
            - A string (registered predicate name)
            - A dict (predicate with config)
            - A boolean (static condition)
        then: Nodes to execute when predicate is True.
        else_: Optional nodes to execute when predicate is False.

    Returns:
        Self for method chaining.

    Example:
        ```python
        spec.add_when(
            predicate="is_authenticated",
            then=[{"type": "user_middleware"}],
            else_=[{"type": "guest_middleware"}],
        )
        ```
    """
    return self.add(when_node(predicate=predicate, then=then, else_=else_))
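
The branching rule described above (run `then` when the predicate holds, otherwise `else_`, which may be absent) can be sketched as follows. `select_branch`, the `predicates` lookup table, and the `ctx` argument are hypothetical names introduced for this example; the dict form of `predicate` is left out, and when the real library evaluates predicates is not specified here.

```python
from collections.abc import Callable, Mapping, Sequence
from typing import Any, Optional, Union

Node = Mapping[str, Any]


def select_branch(
    predicate: Union[str, bool],
    then: Sequence[Node],
    else_: Optional[Sequence[Node]],
    predicates: Mapping[str, Callable[[Any], bool]],
    ctx: Any,
) -> list[Node]:
    """Hypothetical evaluator for a when-style branch."""
    if isinstance(predicate, bool):
        result = predicate  # static condition
    else:
        result = predicates[predicate](ctx)  # look up by registered name
    if result:
        return list(then)
    return list(else_ or [])  # a missing else_ branch selects nothing


predicates = {"is_admin": lambda ctx: ctx["role"] == "admin"}
then = [{"type": "admin_audit"}]
else_ = [{"type": "user_audit"}]

admin_branch = select_branch("is_admin", then, else_, predicates, {"role": "admin"})
user_branch = select_branch("is_admin", then, else_, predicates, {"role": "user"})
print(admin_branch, user_branch)
```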

to_config()

Convert the spec to a config-compatible list of nodes.

Returns:

  • list[dict[str, Any]]: A list of node dictionaries suitable for JSON/YAML serialization.

Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def to_config(self) -> list[dict[str, Any]]:
    """Convert the spec to a config-compatible list of nodes.

    Returns:
        A list of node dictionaries suitable for JSON/YAML serialization.
    """
    return [dict(n) for n in self.nodes]
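
Note that `[dict(n) for n in self.nodes]` is a shallow copy: the outer dictionaries are new, but nested values are shared with the spec. The sketch below applies the same expression to a plain list; the nested `"config"` key is an assumption about node layout used only to show the effect.

```python
import copy

# Illustrative node; the nested "config" key is an assumed layout.
nodes = [{"type": "logging", "config": {"level": "DEBUG"}}]

# Same expression as to_config(): new outer dicts, shared nested objects.
exported = [dict(n) for n in nodes]
exported[0]["type"] = "renamed"          # does not affect nodes
exported[0]["config"]["level"] = "INFO"  # does affect nodes (shared dict)

# copy.deepcopy gives a fully independent snapshot when that matters.
snapshot = copy.deepcopy(nodes)
snapshot[0]["config"]["level"] = "ERROR"

print(nodes)
```

If the exported config will be modified before serialization, taking a deep copy first avoids changing the spec by accident.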

dump(*, format='json')

Serialize the pipeline to a string.

Parameters:

  • format (str, default 'json'): Output format, either "json" or "yaml".

Returns:

  • str: The serialized pipeline as a string.

Raises:

  • MiddlewareConfigError: If format is unknown or YAML requested but PyYAML is not installed.

Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def dump(self, *, format: str = "json") -> str:
    """Serialize the pipeline to a string.

    Args:
        format: Output format, either "json" or "yaml".

    Returns:
        The serialized pipeline as a string.

    Raises:
        MiddlewareConfigError: If format is unknown or YAML requested but
            PyYAML is not installed.
    """
    fmt = _normalize_format(format)
    if fmt == "json":
        return json.dumps(self.to_config(), indent=2, sort_keys=True)
    return _dump_yaml(self.to_config())
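
The JSON branch calls `json.dumps(..., indent=2, sort_keys=True)`, so keys inside each node are written alphabetically while the order of the nodes themselves is preserved. The snippet below reproduces that call on an illustrative config list (the `"config"` key is an assumption about node layout, not taken from the library).

```python
import json

# Illustrative config list; key order is deliberately not alphabetical.
config = [
    {"type": "logging", "config": {"level": "DEBUG"}},
    {"type": "rate_limit"},
]

# Same arguments as the JSON branch of dump().
text = json.dumps(config, indent=2, sort_keys=True)
print(text)

# The output parses back to an equal structure.
roundtrip = json.loads(text)
```

Sorted keys make the output stable across runs, which keeps diffs of saved pipeline files small.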

save(path, *, format=None)

Save the pipeline to a file.

Parameters:

  • path (str | Path, required): File path to save to. Format is auto-detected from extension (.json, .yaml, .yml) if not specified.
  • format (str | None, default None): Optional explicit format ("json" or "yaml").

Raises:

  • MiddlewareConfigError: If format cannot be determined or is unknown.

Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def save(self, path: str | Path, *, format: str | None = None) -> None:
    """Save the pipeline to a file.

    Args:
        path: File path to save to. Format is auto-detected from
            extension (.json, .yaml, .yml) if not specified.
        format: Optional explicit format ("json" or "yaml").

    Raises:
        MiddlewareConfigError: If format cannot be determined or is unknown.
    """
    config_path = Path(path)
    fmt = _detect_format(format=format, path=config_path)
    config_path.write_text(self.dump(format=fmt), encoding="utf-8")
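
The private helper `_detect_format` is not shown on this page. As a rough illustration of the documented rule (an explicit `format` wins, otherwise the extension decides), here is a hypothetical `detect_format` function. Its case handling and its use of `ValueError` in place of `MiddlewareConfigError` are assumptions.

```python
from pathlib import Path
from typing import Optional, Union

# Extensions named in the save() documentation above.
_SUFFIX_TO_FORMAT = {".json": "json", ".yaml": "yaml", ".yml": "yaml"}


def detect_format(path: Union[str, Path], format: Optional[str] = None) -> str:
    """Hypothetical helper: explicit format wins, else use the file suffix."""
    if format is not None:
        fmt = format.strip().lower()
        if fmt not in {"json", "yaml"}:
            raise ValueError(f"Unknown format: {format!r}")
        return fmt
    suffix = Path(path).suffix.lower()
    if suffix not in _SUFFIX_TO_FORMAT:
        raise ValueError(f"Cannot determine format from {str(path)!r}")
    return _SUFFIX_TO_FORMAT[suffix]


print(detect_format("middleware-pipeline.yaml"))
print(detect_format("pipeline.txt", format="json"))
```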

compile(compiler)

Compile the spec into middleware instances.

Parameters:

  • compiler (MiddlewarePipelineCompiler[Any], required): A MiddlewarePipelineCompiler with registered factories.

Returns:

  • list[AgentMiddleware[Any]]: A list of middleware instances ready to use with an agent.

Raises:

  • MiddlewareConfigError: If compilation fails (e.g., unknown types).

Source code in src/pydantic_ai_middleware/pipeline_spec.py
Python
def compile(self, compiler: MiddlewarePipelineCompiler[Any]) -> list[AgentMiddleware[Any]]:
    """Compile the spec into middleware instances.

    Args:
        compiler: A MiddlewarePipelineCompiler with registered factories.

    Returns:
        A list of middleware instances ready to use with an agent.

    Raises:
        MiddlewareConfigError: If compilation fails (e.g., unknown types).
    """
    return compiler.compile_list(self.to_config())
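
`compile` simply hands the config list to `compiler.compile_list`. The toy class below shows what "a compiler with registered factories" could look like in principle. `ToyCompiler`, its dict-based registry, and the `"config"` key are all hypothetical; it handles only `type` nodes and raises `LookupError` where the library documents `MiddlewareConfigError`.

```python
from typing import Any, Callable

# Hypothetical registry shape: type name -> factory taking keyword config.
Factory = Callable[..., Any]


class ToyCompiler:
    """Hypothetical stand-in for a compiler with registered factories."""

    def __init__(self, registry: dict[str, Factory]) -> None:
        self.registry = registry

    def compile_list(self, nodes: list[dict[str, Any]]) -> list[Any]:
        compiled = []
        for node in nodes:
            name = node["type"]
            if name not in self.registry:
                # Stand-in for the documented MiddlewareConfigError.
                raise LookupError(f"Unknown middleware type: {name!r}")
            # Assumption: per-node settings live under a "config" key.
            compiled.append(self.registry[name](**node.get("config", {})))
        return compiled


# Factories here return tuples so the result is easy to inspect.
registry = {"logging": lambda level="INFO": ("logging", level)}
middleware = ToyCompiler(registry).compile_list(
    [{"type": "logging", "config": {"level": "DEBUG"}}, {"type": "logging"}]
)
print(middleware)
```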