Skip to content

LimitWarnerProcessor API

pydantic_ai_summarization.limit_warner

History processor that injects warnings as limits approach.

LimitWarnerProcessor dataclass

History processor that warns as configured run limits approach.

This processor appends a new trailing ModelRequest containing the warning as a UserPromptPart so the next model turn treats it like a distinct user message (rather than extra system text merged into the last turn, which models often deprioritize).

Example
Python
from pydantic_ai import Agent
from pydantic_ai_summarization import create_limit_warner_processor

processor = create_limit_warner_processor(
    max_iterations=40,
    max_context_tokens=100000,
    max_total_tokens=200000,
)

agent = Agent(
    "openai:gpt-4.1",
    history_processors=[processor],
)
Source code in src/pydantic_ai_summarization/limit_warner.py
Python
@dataclass
class LimitWarnerProcessor:
    """History processor that warns as configured run limits approach.

    This processor appends a new trailing ``ModelRequest`` containing the
    warning as a ``UserPromptPart`` so the next model turn treats it like a
    distinct user message (rather than extra system text merged into the last
    turn, which models often deprioritize).

    Example:
        ```python
        from pydantic_ai import Agent
        from pydantic_ai_summarization import create_limit_warner_processor

        processor = create_limit_warner_processor(
            max_iterations=40,
            max_context_tokens=100000,
            max_total_tokens=200000,
        )

        agent = Agent(
            "openai:gpt-4.1",
            history_processors=[processor],
        )
        ```
    """

    max_iterations: int | None = None
    """Maximum allowed requests for the run."""

    max_context_tokens: int | None = None
    """Maximum context window size to warn against."""

    max_total_tokens: int | None = None
    """Maximum cumulative run token budget to warn against."""

    warn_on: list[WarningOn] | None = None
    """Which limits should emit warnings. Defaults to all configured limits."""

    warning_threshold: float = 0.7
    """Fraction of a limit at which warnings begin."""

    critical_remaining_iterations: int = 3
    """Remaining request count at which iteration warnings become critical."""

    token_counter: TokenCounter = field(default=count_tokens_approximately)
    """Function to count context-window tokens in the current message history."""

    # Normalized, de-duplicated warning kinds resolved from `warn_on` in
    # `__post_init__`; internal cache, excluded from the generated __init__.
    _warn_on: tuple[WarningOn, ...] = field(default_factory=tuple, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate configuration and normalize enabled warning types."""
        self._validate_positive_limit(self.max_iterations, "max_iterations")
        self._validate_positive_limit(self.max_context_tokens, "max_context_tokens")
        self._validate_positive_limit(self.max_total_tokens, "max_total_tokens")

        if not 0 < self.warning_threshold <= 1:
            raise ValueError(
                f"warning_threshold must be between 0 and 1, got {self.warning_threshold}."
            )
        if self.critical_remaining_iterations < 0:
            raise ValueError(
                "critical_remaining_iterations must be non-negative, "
                f"got {self.critical_remaining_iterations}."
            )

        # Map each warning kind to its configured limit (None = not configured).
        configured = {
            "iterations": self.max_iterations,
            "context_window": self.max_context_tokens,
            "total_tokens": self.max_total_tokens,
        }
        if all(limit is None for limit in configured.values()):
            raise ValueError("At least one max_* limit must be configured.")

        # Default: enable a warning for every limit that was configured,
        # in canonical _WARNING_ORDER.
        if self.warn_on is None:
            self._warn_on = tuple(
                warning_name
                for warning_name in _WARNING_ORDER
                if configured[warning_name] is not None
            )
            return

        if not self.warn_on:
            raise ValueError("warn_on must not be empty.")

        # dict.fromkeys de-duplicates while preserving the caller's order.
        normalized = tuple(dict.fromkeys(self.warn_on))
        invalid = sorted(set(normalized) - set(_WARNING_ORDER))
        if invalid:
            names = ", ".join(invalid)
            raise ValueError(f"Unsupported warn_on value(s): {names}.")

        # Each explicitly requested warning must have its limit configured.
        for warning_name in normalized:
            if configured[warning_name] is None:
                raise ValueError(
                    f"{warning_name!r} requires its corresponding max_* limit to be configured."
                )

        self._warn_on = normalized

    @staticmethod
    def _validate_positive_limit(limit: int | None, parameter_name: str) -> None:
        """Validate optional positive limit values."""
        if limit is not None and limit <= 0:
            raise ValueError(f"{parameter_name} must be positive, got {limit}.")

    @staticmethod
    def _is_limit_warning_part(part: ModelRequestPart) -> bool:
        """Check whether a request part was generated by this processor."""
        # Warnings are tagged with a marker string so they can be recognized
        # (and stripped) on later passes regardless of which part type holds them.
        if isinstance(part, SystemPromptPart):
            return _LIMIT_WARNING_MARKER in part.content
        if isinstance(part, UserPromptPart) and isinstance(part.content, str):
            return _LIMIT_WARNING_MARKER in part.content
        return False  # pragma: no cover

    def _strip_existing_warnings(self, messages: list[ModelMessage]) -> list[ModelMessage]:
        """Remove prior generated warnings from the message history."""
        cleaned_messages: list[ModelMessage] = []

        for message in messages:
            # Only ModelRequest messages can carry injected warning parts.
            if not isinstance(message, ModelRequest):
                cleaned_messages.append(message)
                continue

            parts = [part for part in message.parts if not self._is_limit_warning_part(part)]
            # Drop the message entirely when every part was an injected warning.
            if not parts:
                continue
            if len(parts) == len(message.parts):
                cleaned_messages.append(message)
            else:
                cleaned_messages.append(replace(message, parts=parts))  # pragma: no cover

        return cleaned_messages

    def _build_iteration_warning(self, ctx: RunContext[Any]) -> _TriggeredWarning | None:
        """Build an iteration warning when the request budget is approaching."""
        if self.max_iterations is None or "iterations" not in self._warn_on:
            return None

        # Fraction of the request budget consumed so far.
        usage = ctx.usage.requests / self.max_iterations
        if usage < self.warning_threshold:
            return None

        remaining = max(0, self.max_iterations - ctx.usage.requests)
        severity: Literal["URGENT", "CRITICAL"] = _SEVERITY_URGENT
        # Escalate by absolute remaining count rather than ratio, so short runs
        # still get a CRITICAL warning with a few requests left.
        if remaining <= self.critical_remaining_iterations:
            severity = _SEVERITY_CRITICAL

        details = (
            f"Iterations: {ctx.usage.requests}/{self.max_iterations} requests used "
            f"({usage:.0%}); {remaining} remaining."
        )
        return _TriggeredWarning(kind="iterations", severity=severity, details=details)

    def _build_total_tokens_warning(self, ctx: RunContext[Any]) -> _TriggeredWarning | None:
        """Build a total-token warning when cumulative usage is approaching the cap."""
        if self.max_total_tokens is None or "total_tokens" not in self._warn_on:
            return None

        total_tokens = ctx.usage.total_tokens
        usage = total_tokens / self.max_total_tokens
        if usage < self.warning_threshold:
            return None

        remaining = max(0, self.max_total_tokens - total_tokens)
        severity: Literal["URGENT", "CRITICAL"] = _SEVERITY_URGENT
        # At or past the cap the situation is no longer merely urgent.
        if usage >= 1:
            severity = _SEVERITY_CRITICAL

        details = (
            f"Total tokens: {total_tokens}/{self.max_total_tokens} used "
            f"({usage:.0%}); {remaining} remaining."
        )
        return _TriggeredWarning(kind="total_tokens", severity=severity, details=details)

    def _build_context_warning(self, context_tokens: int) -> _TriggeredWarning | None:
        """Build a context-window warning when current history is too large."""
        # Defensive guard; __call__ already checks this before counting tokens.
        if (  # pragma: no cover
            self.max_context_tokens is None or "context_window" not in self._warn_on
        ):
            return None

        usage = context_tokens / self.max_context_tokens
        if usage < self.warning_threshold:
            return None

        remaining = max(0, self.max_context_tokens - context_tokens)
        severity: Literal["URGENT", "CRITICAL"] = _SEVERITY_URGENT
        if usage >= 1:
            severity = _SEVERITY_CRITICAL

        details = (
            f"Context window: {context_tokens}/{self.max_context_tokens} tokens used "
            f"({usage:.0%}); {remaining} remaining."
        )
        return _TriggeredWarning(kind="context_window", severity=severity, details=details)

    @staticmethod
    def _format_warning_message(warnings: list[_TriggeredWarning]) -> str:
        """Format active warnings into a single user message body."""
        # The combined message takes the worst severity of its warnings:
        # CRITICAL unless every warning is merely URGENT.
        severity: Literal["URGENT", "CRITICAL"] = _SEVERITY_CRITICAL
        if all(warning.severity == _SEVERITY_URGENT for warning in warnings):
            severity = _SEVERITY_URGENT

        guidance = (
            "Complete the current task efficiently and avoid unnecessary tool calls."
            if severity == _SEVERITY_URGENT
            else "Complete the current task immediately and avoid unnecessary tool calls."
        )

        # The marker line lets _is_limit_warning_part recognize (and later
        # strip) this injected message on subsequent passes.
        lines = [
            _LIMIT_WARNING_MARKER,
            f"{severity}: Configured run limits are approaching.",
        ]
        lines.extend(f"- {warning.details}" for warning in warnings)
        lines.append(guidance)
        return "\n".join(lines)

    @staticmethod
    def _append_warning_as_user_message(
        messages: list[ModelMessage], warning_message: str
    ) -> list[ModelMessage]:
        """Append the warning as a new user turn so APIs and models honor it reliably."""
        # An empty history has no turn to warn about; leave it untouched.
        if not messages:
            return messages

        warning_turn = ModelRequest(parts=[UserPromptPart(content=warning_message)])
        return [*messages, warning_turn]

    async def __call__(
        self,
        ctx: RunContext[Any],
        messages: list[ModelMessage],
    ) -> list[ModelMessage]:
        """Process message history and inject a warning when needed."""
        # Remove warnings injected on earlier turns so at most one is present.
        cleaned_messages = self._strip_existing_warnings(messages)

        active_warnings: list[_TriggeredWarning] = []

        iteration_warning = self._build_iteration_warning(ctx)
        if iteration_warning is not None:
            active_warnings.append(iteration_warning)

        # Token counting can be expensive; only do it when the context-window
        # warning is both configured and enabled.
        if self.max_context_tokens is not None and "context_window" in self._warn_on:
            context_tokens = await async_count_tokens(self.token_counter, cleaned_messages)
            context_warning = self._build_context_warning(context_tokens)
            if context_warning is not None:
                active_warnings.append(context_warning)

        total_tokens_warning = self._build_total_tokens_warning(ctx)
        if total_tokens_warning is not None:
            active_warnings.append(total_tokens_warning)

        if not active_warnings:
            return cleaned_messages

        # Present warnings in canonical order regardless of build order above.
        warning_order = {warning_name: index for index, warning_name in enumerate(_WARNING_ORDER)}
        ordered_warnings = sorted(active_warnings, key=lambda warning: warning_order[warning.kind])
        warning_message = self._format_warning_message(ordered_warnings)
        return self._append_warning_as_user_message(cleaned_messages, warning_message)

max_iterations = None class-attribute instance-attribute

Maximum allowed requests for the run.

max_context_tokens = None class-attribute instance-attribute

Maximum context window size to warn against.

max_total_tokens = None class-attribute instance-attribute

Maximum cumulative run token budget to warn against.

warn_on = None class-attribute instance-attribute

Which limits should emit warnings. Defaults to all configured limits.

warning_threshold = 0.7 class-attribute instance-attribute

Fraction of a limit at which warnings begin.

critical_remaining_iterations = 3 class-attribute instance-attribute

Remaining request count at which iteration warnings become critical.

token_counter = field(default=count_tokens_approximately) class-attribute instance-attribute

Function to count context-window tokens in the current message history.

__post_init__()

Validate configuration and normalize enabled warning types.

Source code in src/pydantic_ai_summarization/limit_warner.py
Python
def __post_init__(self) -> None:
    """Validate configuration and normalize enabled warning types."""
    # Every max_* limit, when given, must be a positive integer.
    for limit_name in ("max_iterations", "max_context_tokens", "max_total_tokens"):
        self._validate_positive_limit(getattr(self, limit_name), limit_name)

    if not 0 < self.warning_threshold <= 1:
        raise ValueError(
            f"warning_threshold must be between 0 and 1, got {self.warning_threshold}."
        )
    if self.critical_remaining_iterations < 0:
        raise ValueError(
            "critical_remaining_iterations must be non-negative, "
            f"got {self.critical_remaining_iterations}."
        )

    # Map each warning kind to its configured limit (None means unset).
    limits_by_kind = {
        "iterations": self.max_iterations,
        "context_window": self.max_context_tokens,
        "total_tokens": self.max_total_tokens,
    }
    if all(value is None for value in limits_by_kind.values()):
        raise ValueError("At least one max_* limit must be configured.")

    # Default behavior: warn on every configured limit, in canonical order.
    if self.warn_on is None:
        self._warn_on = tuple(
            kind for kind in _WARNING_ORDER if limits_by_kind[kind] is not None
        )
        return

    if not self.warn_on:
        raise ValueError("warn_on must not be empty.")

    # De-duplicate while preserving the caller's ordering.
    seen: list[WarningOn] = []
    for kind in self.warn_on:
        if kind not in seen:
            seen.append(kind)
    deduped = tuple(seen)

    unknown = sorted(set(deduped) - set(_WARNING_ORDER))
    if unknown:
        names = ", ".join(unknown)
        raise ValueError(f"Unsupported warn_on value(s): {names}.")

    # An explicitly requested warning only makes sense with its limit set.
    for kind in deduped:
        if limits_by_kind[kind] is None:
            raise ValueError(
                f"{kind!r} requires its corresponding max_* limit to be configured."
            )

    self._warn_on = deduped

__call__(ctx, messages) async

Process message history and inject a warning when needed.

Source code in src/pydantic_ai_summarization/limit_warner.py
Python
async def __call__(
    self,
    ctx: RunContext[Any],
    messages: list[ModelMessage],
) -> list[ModelMessage]:
    """Process message history and inject a warning when needed."""
    # Drop warnings injected on previous turns so at most one is present.
    history = self._strip_existing_warnings(messages)

    triggered: list[_TriggeredWarning] = []

    if (iteration_warning := self._build_iteration_warning(ctx)) is not None:
        triggered.append(iteration_warning)

    # Counting tokens can be costly, so only measure the context window when
    # that warning is both configured and enabled.
    if self.max_context_tokens is not None and "context_window" in self._warn_on:
        window_tokens = await async_count_tokens(self.token_counter, history)
        if (context_warning := self._build_context_warning(window_tokens)) is not None:
            triggered.append(context_warning)

    if (total_warning := self._build_total_tokens_warning(ctx)) is not None:
        triggered.append(total_warning)

    if not triggered:
        return history

    # Normalize presentation order to the canonical warning ordering.
    rank = {kind: position for position, kind in enumerate(_WARNING_ORDER)}
    triggered.sort(key=lambda warning: rank[warning.kind])
    body = self._format_warning_message(triggered)
    return self._append_warning_as_user_message(history, body)

create_limit_warner_processor(max_iterations=None, max_context_tokens=None, max_total_tokens=None, warn_on=None, warning_threshold=0.7, critical_remaining_iterations=3, token_counter=None)

Create a limit warning history processor.

Parameters:

Name Type Description Default
max_iterations int | None

Maximum allowed requests for the run.

None
max_context_tokens int | None

Maximum context-window size to warn against.

None
max_total_tokens int | None

Maximum cumulative run token budget to warn against.

None
warn_on list[WarningOn] | None

Which warning types to enable. Defaults to all configured limits.

None
warning_threshold float

Fraction of a limit at which warnings begin.

0.7
critical_remaining_iterations int

Remaining request count at which iteration warnings become critical.

3
token_counter TokenCounter | None

Custom token counter for context-window measurement.

None

Returns:

Type Description
LimitWarnerProcessor

Configured LimitWarnerProcessor instance.

Source code in src/pydantic_ai_summarization/limit_warner.py
Python
def create_limit_warner_processor(
    max_iterations: int | None = None,
    max_context_tokens: int | None = None,
    max_total_tokens: int | None = None,
    warn_on: list[WarningOn] | None = None,
    warning_threshold: float = 0.7,
    critical_remaining_iterations: int = 3,
    token_counter: TokenCounter | None = None,
) -> LimitWarnerProcessor:
    """Create a limit warning history processor.

    Args:
        max_iterations: Maximum allowed requests for the run.
        max_context_tokens: Maximum context-window size to warn against.
        max_total_tokens: Maximum cumulative run token budget to warn against.
        warn_on: Which warning types to enable. Defaults to all configured limits.
        warning_threshold: Fraction of a limit at which warnings begin.
        critical_remaining_iterations: Remaining request count at which
            iteration warnings become critical.
        token_counter: Custom token counter for context-window measurement.

    Returns:
        Configured ``LimitWarnerProcessor`` instance.
    """
    options: dict[str, Any] = dict(
        max_iterations=max_iterations,
        max_context_tokens=max_context_tokens,
        max_total_tokens=max_total_tokens,
        warn_on=warn_on,
        warning_threshold=warning_threshold,
        critical_remaining_iterations=critical_remaining_iterations,
    )
    # Omit token_counter when unset so the dataclass default applies.
    if token_counter is not None:
        options["token_counter"] = token_counter
    return LimitWarnerProcessor(**options)