Skip to content

Docker API

DockerSandbox

pydantic_ai_backends.backends.docker.sandbox.DockerSandbox

Bases: BaseSandbox

Docker-based sandbox for isolated command execution.

Creates a Docker container for running commands in an isolated environment. Requires the docker Python package to be installed.

Supports RuntimeConfig for pre-configured environments with packages pre-installed.

Example
Python
from pydantic_ai_backends import DockerSandbox, RuntimeConfig

# Use a simple image
sandbox = DockerSandbox(image="python:3.12-slim")

# Or use a custom runtime with packages
custom_runtime = RuntimeConfig(
    name="ml-env",
    base_image="python:3.12-slim",
    packages=["torch", "transformers"],
)
sandbox = DockerSandbox(runtime=custom_runtime)
Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
class DockerSandbox(BaseSandbox):  # pragma: no cover
    """Docker-based sandbox for isolated command execution.

    Creates a Docker container for running commands in an isolated environment.
    Requires the docker Python package to be installed.

    Supports RuntimeConfig for pre-configured environments with packages pre-installed.

    The container is started lazily on first operation (see ``_ensure_container``)
    and kept alive with ``sleep infinity`` so repeated ``execute`` calls reuse it.

    Example:
        ```python
        from pydantic_ai_backends import DockerSandbox, RuntimeConfig

        # Use a simple image
        sandbox = DockerSandbox(image="python:3.12-slim")

        # Or use a custom runtime with packages
        custom_runtime = RuntimeConfig(
            name="ml-env",
            base_image="python:3.12-slim",
            packages=["torch", "transformers"],
        )
        sandbox = DockerSandbox(runtime=custom_runtime)
        ```
    """

    def __init__(
        self,
        image: str = "python:3.12-slim",
        sandbox_id: str | None = None,
        work_dir: str = "/workspace",
        auto_remove: bool = True,
        runtime: RuntimeConfig | str | None = None,
        session_id: str | None = None,
        idle_timeout: int = 3600,
        volumes: dict[str, str] | None = None,
    ):
        """Initialize Docker sandbox.

        Args:
            image: Docker image to use (ignored if runtime is provided).
            sandbox_id: Unique identifier for this sandbox.
            work_dir: Working directory inside container (ignored if runtime is provided).
            auto_remove: Remove container when stopped.
            runtime: RuntimeConfig or name of built-in runtime.
            session_id: Alias for sandbox_id (for session management).
            idle_timeout: Timeout in seconds for idle cleanup (default: 1 hour).
            volumes: Host-to-container volume mappings for persistent storage.
                     Format: {"/host/path": "/container/path"}
        """
        # session_id is an alias for sandbox_id
        # NOTE(review): session_id takes precedence when both are given — confirm intended.
        effective_id = session_id or sandbox_id
        super().__init__(effective_id)

        self._auto_remove = auto_remove
        # Lazily created by _ensure_container(); None means "not started yet".
        self._container = None
        self._idle_timeout = idle_timeout
        # Timestamp of the last execute() call; refreshed there for idle tracking.
        self._last_activity = time.time()
        self._volumes = volumes or {}

        # Handle runtime configuration
        if runtime is not None:
            if isinstance(runtime, str):
                # Deferred import so the runtimes registry is only loaded when needed.
                from pydantic_ai_backends.backends.docker.runtimes import get_runtime

                runtime = get_runtime(runtime)
            self._runtime: RuntimeConfig | None = runtime
            self._work_dir = runtime.work_dir
            self._image = image  # Will be overridden by _ensure_runtime_image()
        else:
            self._runtime = None
            self._work_dir = work_dir
            self._image = image

    @property
    def runtime(self) -> RuntimeConfig | None:
        """The runtime configuration for this sandbox."""
        return self._runtime

    @property
    def session_id(self) -> str:
        """Alias for sandbox id, used for session management."""
        return self._id

    def _ensure_container(self) -> None:
        """Ensure Docker container is running.

        No-op if a container has already been started. Otherwise connects to the
        local Docker daemon, resolves/builds the image, and starts a long-lived
        container (``sleep infinity``) that subsequent exec calls attach to.

        Raises:
            ImportError: If the ``docker`` Python package is not installed.
        """
        if self._container is not None:
            return

        try:
            import docker
        except ImportError as e:
            raise ImportError(
                "Docker package not installed. "
                "Install with: pip install pydantic-ai-backend[docker]"
            ) from e

        client = docker.from_env()

        # Get the appropriate image (build if needed for runtime)
        image = self._ensure_runtime_image(client)

        # Prepare environment variables from runtime
        env_vars = {}
        if self._runtime and self._runtime.env_vars:
            env_vars = self._runtime.env_vars

        # Convert simple volume format to Docker SDK format
        # {"/host": "/container"} -> {"/host": {"bind": "/container", "mode": "rw"}}
        docker_volumes: dict[str, dict[str, str]] = {}
        for host_path, container_path in self._volumes.items():
            docker_volumes[host_path] = {"bind": container_path, "mode": "rw"}

        self._container = client.containers.run(
            image,
            command="sleep infinity",
            detach=True,
            working_dir=self._work_dir,
            auto_remove=self._auto_remove,
            environment=env_vars,
            volumes=docker_volumes if docker_volumes else None,
        )

    def _ensure_runtime_image(self, client: object) -> str:
        """Ensure runtime image exists and return its name.

        Resolution order: explicit runtime image > built image from
        base_image + packages > the default ``image`` passed to __init__.

        Args:
            client: Docker client instance.

        Returns:
            Docker image name/tag to use.
        """
        if self._runtime is None:
            return self._image

        # If ready-to-use image is specified
        if self._runtime.image:
            return self._runtime.image

        # If base_image + packages - need to build
        if self._runtime.base_image:
            return self._build_runtime_image(client)

        # Fallback to default image
        return self._image

    def _build_runtime_image(self, client: object) -> str:
        """Build a custom image with packages installed.

        The image tag embeds a hash of the runtime config so that any config
        change produces a new tag, while identical configs hit the cache.

        Args:
            client: Docker client instance.

        Returns:
            Docker image tag for the built image.
        """
        import docker.errors

        runtime = self._runtime
        assert runtime is not None
        assert runtime.base_image is not None

        # Generate unique tag based on config
        # md5 is used here as a cheap content-addressing key, not for security.
        config_hash = hashlib.md5(runtime.model_dump_json().encode()).hexdigest()[:12]
        image_tag = f"pydantic-ai-backend-runtime:{runtime.name}-{config_hash}"

        # Check if image exists (cache)
        if runtime.cache_image:
            try:
                client.images.get(image_tag)  # type: ignore[attr-defined]
                return image_tag
            except docker.errors.ImageNotFound:
                pass

        # Build Dockerfile
        dockerfile = self._generate_dockerfile(runtime)

        # Build image
        # fileobj= sends the Dockerfile itself as the build context (no local files).
        client.images.build(  # type: ignore[attr-defined]
            fileobj=io.BytesIO(dockerfile.encode()),
            tag=image_tag,
            rm=True,
        )

        return image_tag

    def _generate_dockerfile(self, runtime: RuntimeConfig) -> str:
        """Generate Dockerfile content for runtime.

        Emits FROM, then setup commands, then a single package-install RUN
        (pip/npm/apt/cargo), then ENV lines, then WORKDIR.

        Args:
            runtime: Runtime configuration.

        Returns:
            Dockerfile content as string.
        """
        assert runtime.base_image is not None
        lines = [f"FROM {runtime.base_image}"]

        # Setup commands
        for cmd in runtime.setup_commands:
            lines.append(f"RUN {cmd}")

        # Install packages
        # NOTE(review): an unrecognized package_manager silently installs nothing — confirm intended.
        if runtime.packages:
            packages_str = " ".join(runtime.packages)
            if runtime.package_manager == "pip":
                lines.append(f"RUN pip install --no-cache-dir {packages_str}")
            elif runtime.package_manager == "npm":
                lines.append(f"RUN npm install -g {packages_str}")
            elif runtime.package_manager == "apt":
                lines.append(f"RUN apt-get update && apt-get install -y {packages_str}")
            elif runtime.package_manager == "cargo":
                lines.append(f"RUN cargo install {packages_str}")

        # Environment variables
        # NOTE(review): values are not quoted/escaped; values with spaces would break the Dockerfile.
        for key, value in runtime.env_vars.items():
            lines.append(f"ENV {key}={value}")

        # Work directory
        lines.append(f"WORKDIR {runtime.work_dir}")

        return "\n".join(lines)

    def execute(self, command: str, timeout: int | None = None) -> ExecuteResponse:
        """Execute command in Docker container.

        Runs ``command`` through ``sh -c`` inside the container's work dir.
        On any exception, returns an ExecuteResponse with exit_code=1 and the
        error message as output rather than raising.
        """
        self._ensure_container()
        self._last_activity = time.time()  # Update activity timestamp
        assert self._container is not None  # Ensured by _ensure_container()

        try:
            # Note: Docker SDK exec_run doesn't support timeout parameter directly.
            # For timeouts, we wrap the command with 'timeout' utility.
            # NOTE(review): `if timeout:` treats timeout=0 as "no timeout" — confirm intended.
            if timeout:
                exec_cmd = ["timeout", str(timeout), "sh", "-c", command]
            else:
                exec_cmd = ["sh", "-c", command]

            exit_code, output = self._container.exec_run(
                exec_cmd,
                workdir=self._work_dir,
            )

            output_str = output.decode("utf-8", errors="replace")

            # Truncate if too long
            max_output = 100000
            truncated = len(output_str) > max_output
            if truncated:
                output_str = output_str[:max_output]

            return ExecuteResponse(
                output=output_str,
                exit_code=exit_code,
                truncated=truncated,
            )
        except Exception as e:
            return ExecuteResponse(
                output=f"Error: {e}",
                exit_code=1,
                truncated=False,
            )

    def _read_bytes(self, path: str) -> bytes:
        """Read raw bytes from file in container.

        Fetches the path via Docker's get_archive (which returns a tar stream)
        and extracts the first regular-file member.

        NOTE(review): the not-a-file / not-extractable branches return the error
        *message encoded as bytes* instead of raising; edit() relies on this by
        sniffing for a leading b"[Error:". Keep that contract in mind if changing.

        Args:
            path: Path to the file in the container.

        Returns:
            File content as bytes (or an encoded error message, see note above).

        Raises:
            RuntimeError: If the Docker archive fetch itself fails.
        """
        self._ensure_container()
        assert self._container is not None

        try:
            # Use Docker get_archive to read file
            stream, stat = self._container.get_archive(path)
            raw_tar_bytes = b"".join(stream)
        except Exception as e:
            raise RuntimeError(f"Failed to read file: {e}") from e

        # Extract file from tar archive
        with (
            io.BytesIO(raw_tar_bytes) as tar_buffer,
            tarfile.open(fileobj=tar_buffer, mode="r") as tar,
        ):
            member = next((m for m in tar.getmembers() if m.isfile()), None)

            if not member:
                return f"[Error: Path '{path}' exists but is empty or not a file.]".encode()

            f = tar.extractfile(member)
            if f is None:
                return b"[Error: Could not extract file stream from archive]"

            file_bytes = f.read()
            return file_bytes

    def read(self, path: str, offset: int = 0, limit: int = 2000) -> str:
        """
        Read file from container using Docker get_archive API.

        Decodes the file based on its extension (text/code/PDF), then returns
        the requested window of lines. Any failure is reported as an
        "[Error reading file: ...]" string rather than raised.

        Args:
            path: Path to the file in the container.
            offset: Start line index (for pagination).
            limit: Maximum number of lines to return.
        """
        try:
            # Read raw bytes from file
            file_bytes = self._read_bytes(path)

            # Convert bytes to string
            file_ext = Path(path).suffix.lower().lstrip(".")
            full_text = self._convert_bytes_to_text(file_ext, file_bytes)

            # Split into lines
            lines = full_text.splitlines()
            total_lines = len(lines)

            if offset >= total_lines:
                return "[End of file]"

            end_index = offset + limit
            chunk_lines = lines[offset:end_index]
            chunk = "\n".join(chunk_lines)

            # Append a pagination footer when more lines remain past the window.
            if end_index < total_lines:
                remaining = total_lines - end_index
                footer = f"\n\n[... {remaining} more lines. Use offset={end_index} to read more.]"
                return chunk + footer

            return chunk

        except Exception as e:
            return f"[Error reading file: {e}]"

    def _convert_bytes_to_text(self, file_ext: str, file_bytes: bytes) -> str:
        """Convert raw file bytes to text based on the file extension.

        Plain-text and code extensions go through encoding detection
        (_decode_text); "pdf" goes through text extraction (_extract_pdf_text).

        Args:
            file_ext: Lowercased file extension without the leading dot.
            file_bytes: Raw file content.

        Returns:
            Decoded/extracted text.

        Raises:
            ValueError: If the extension is not in the supported sets.
        """
        # Plain text files with encoding detection
        if file_ext in ("txt", "log", "md", "json", "xml", "csv", "yaml", "yml"):
            return self._decode_text(file_bytes)

        # PDF files
        elif file_ext == "pdf":
            return self._extract_pdf_text(file_bytes)

        # Code files
        elif file_ext in (
            "py",
            "js",
            "java",
            "cpp",
            "c",
            "h",
            "cs",
            "rb",
            "go",
            "rs",
            "php",
            "html",
            "css",
            "sh",
            "sql",
            "ts",
            "jsx",
            "tsx",
        ):
            return self._decode_text(file_bytes)

        else:
            raise ValueError(f"Unsupported file type: .{file_ext}")

    def _decode_text(self, file_bytes: bytes) -> str:
        """Decode bytes to str using chardet detection with fallbacks.

        Tries the chardet-detected encoding when confidence > 0.7, then a list
        of common encodings, and finally utf-8 with errors='replace' so the
        method never raises on undecodable input.

        Args:
            file_bytes: Raw bytes to decode.

        Returns:
            Decoded text (possibly with U+FFFD replacement characters).
        """
        chardet = _get_chardet()

        # Use chardet to detect encoding with confidence
        detection = chardet.detect(file_bytes)
        detected_encoding = detection.get("encoding")
        confidence = detection.get("confidence", 0)

        # If high confidence detection, use it
        if detected_encoding and confidence > 0.7:
            try:
                return file_bytes.decode(detected_encoding)
            except (UnicodeDecodeError, AttributeError, LookupError):
                pass  # Fall through to manual attempts

        # Fallback to common encodings if detection failed or low confidence
        encodings = ["utf-8", "utf-8-sig", "latin-1", "cp1252", "iso-8859-1"]

        # Add detected encoding to the front if not already there
        if detected_encoding and detected_encoding not in encodings:
            encodings.insert(0, detected_encoding)

        for encoding in encodings:
            try:
                return file_bytes.decode(encoding)
            except (UnicodeDecodeError, AttributeError, LookupError):
                continue

        # Last resort: decode with errors='replace' to avoid complete failure
        return file_bytes.decode("utf-8", errors="replace")

    def _extract_pdf_text(self, file_bytes: bytes) -> str:
        """Extract readable text from PDF bytes using pypdf.

        Prepends Title/Author/Subject metadata when present, then emits each
        non-empty page prefixed with a "--- Page N ---" separator, cleaned via
        _clean_pdf_text.

        Args:
            file_bytes: Raw PDF content.

        Returns:
            Extracted text with page separators.

        Raises:
            ValueError: If the PDF has no pages, yields no text, or fails to parse.
        """
        pypdf = _get_pypdf()

        try:
            pdf_file = BytesIO(file_bytes)
            pdf_reader = pypdf.PdfReader(pdf_file)

            if len(pdf_reader.pages) == 0:
                raise ValueError("PDF contains no pages")

            # Extract metadata for context
            metadata = pdf_reader.metadata
            text_parts = []

            if metadata:
                if metadata.get("/Title"):
                    text_parts.append(f"Title: {metadata['/Title']}\n")
                if metadata.get("/Author"):
                    text_parts.append(f"Author: {metadata['/Author']}\n")
                if metadata.get("/Subject"):
                    text_parts.append(f"Subject: {metadata['/Subject']}\n")
                text_parts.append("\n")

            # Extract text from each page with clear separators
            for page_num, page in enumerate(pdf_reader.pages, 1):
                page_text = page.extract_text()

                if page_text and page_text.strip():
                    # Clean up common PDF artifacts
                    page_text = self._clean_pdf_text(page_text)
                    text_parts.append(f"--- Page {page_num} ---\n")
                    text_parts.append(page_text)
                    text_parts.append("\n\n")

            full_text = "".join(text_parts).strip()

            if not full_text:
                raise ValueError("No extractable text found in PDF")

            return full_text

        except Exception as e:
            # NOTE(review): this also re-wraps the ValueErrors raised above,
            # so callers see them as "Failed to parse PDF: ..." — confirm intended.
            raise ValueError(f"Failed to parse PDF: {str(e)}") from e

    def _clean_pdf_text(self, text: str) -> str:
        """
        Clean common PDF text extraction artifacts for better LLM processing.

        Args:
            text: Raw extracted text

        Returns:
            Cleaned text
        """

        # Remove excessive whitespace while preserving paragraph breaks
        text = re.sub(r" +", " ", text)  # Multiple spaces to single space
        text = re.sub(r"\n ", "\n", text)  # Remove leading spaces on lines
        text = re.sub(r" \n", "\n", text)  # Remove trailing spaces on lines
        text = re.sub(r"\n{3,}", "\n\n", text)  # Max 2 consecutive newlines

        # Fix common hyphenation issues at line breaks
        text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)

        # Remove form feed characters
        text = text.replace("\f", "\n")

        return text.strip()

    def edit(
        self, path: str, old_string: str, new_string: str, replace_all: bool = False
    ) -> EditResult:
        """Edit file using Python string operations instead of sed.

        This method reads the entire file, performs string replacement in Python,
        and writes it back. This approach handles multiline strings naturally
        without shell escaping issues.

        Args:
            path: Path to the file in the container.
            old_string: String to find and replace.
            new_string: Replacement string.
            replace_all: If True, replace all occurrences. If False, only replace first.

        Returns:
            EditResult with path and occurrence count on success, or error message.
        """
        try:
            # Read the file content
            file_bytes = self._read_bytes(path)

            # Check for error messages from _read_bytes
            # (depends on _read_bytes returning b"[Error:..."-prefixed bytes on failure)
            if file_bytes.startswith(b"[Error:"):
                error_msg = file_bytes.decode("utf-8", errors="replace")
                return EditResult(error=error_msg)

            # Decode to string using the same logic as read()
            file_ext = Path(path).suffix.lower().lstrip(".")
            content = self._convert_bytes_to_text(file_ext, file_bytes)

            # Count occurrences
            occurrences = content.count(old_string)

            if occurrences == 0:
                return EditResult(error="String not found in file")

            # Refuse ambiguous single-replace to avoid editing the wrong occurrence.
            if occurrences > 1 and not replace_all:
                return EditResult(
                    error=f"String found {occurrences} times. "
                    "Use replace_all=True to replace all, or provide more context."
                )

            # NOTE(review): str.replace replaces all occurrences here; when
            # replace_all=False this path is only reached with occurrences == 1,
            # so behavior matches the docstring.
            new_content = content.replace(old_string, new_string)

            # Write back the modified content
            write_result = self.write(path, new_content)

            if write_result.error:
                return EditResult(error=write_result.error)

            return EditResult(path=path, occurrences=occurrences)

        except Exception as e:
            return EditResult(error=f"Failed to edit file: {e}")

    def write(self, path: str, content: str | bytes) -> WriteResult:
        """Write file to container using Docker put_archive API.

        This method uses Docker's put_archive() instead of heredoc to handle
        large files and special characters reliably.

        Args:
            path: Absolute path where the file should be written.
            content: File content as string or bytes.

        Returns:
            WriteResult with path on success, or error message on failure.
        """
        self._ensure_container()
        assert self._container is not None

        try:
            # Parse path into directory and filename
            posix_path = PurePosixPath(path)
            parent_dir = str(posix_path.parent)
            filename = posix_path.name

            # Ensure parent directory exists
            # shlex.quote guards against shell-special characters in the path.
            safe_parent_dir = shlex.quote(parent_dir)
            mkdir_result = self.execute(f"mkdir -p {safe_parent_dir}")
            if mkdir_result.exit_code != 0:
                return WriteResult(error=f"Failed to create directory: {mkdir_result.output}")

            # Create tar archive in memory
            # str content is encoded with the default (utf-8) codec.
            content = content if isinstance(content, bytes) else content.encode()
            tar_buffer = io.BytesIO()

            with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
                # Create TarInfo for the file
                tarinfo = tarfile.TarInfo(name=filename)
                tarinfo.size = len(content)
                tarinfo.mtime = int(time.time())
                tarinfo.mode = 0o644

                # Add file to archive
                tar.addfile(tarinfo, io.BytesIO(content))

            # Reset buffer position
            tar_buffer.seek(0)

            # Upload to container
            self._container.put_archive(parent_dir, tar_buffer)

            return WriteResult(path=path)

        except Exception as e:
            return WriteResult(error=f"Failed to write file: {e}")

    def start(self) -> None:
        """Explicitly start the container.

        This is useful for pre-warming containers before use.
        The container is normally started lazily on first operation.
        """
        self._ensure_container()

    def is_alive(self) -> bool:
        """Check if container is running.

        Returns:
            True if container is running, False otherwise.
        """
        if self._container is None:
            return False
        try:
            # reload() refreshes container state from the daemon before checking status.
            self._container.reload()
            return self._container.status == "running"
        except Exception:
            return False

    def stop(self) -> None:
        """Stop and remove the container.

        With auto_remove=True (the default), Docker removes the container once
        it is stopped. Stop errors are suppressed so cleanup is best-effort.
        """
        import contextlib

        if self._container:
            with contextlib.suppress(Exception):
                self._container.stop()
            self._container = None

    def __del__(self) -> None:
        """Cleanup container on deletion.

        NOTE(review): stop() imports contextlib; during interpreter shutdown
        imports can fail, so this cleanup is best-effort only.
        """
        self.stop()

runtime property

The runtime configuration for this sandbox.

session_id property

Alias for sandbox id, used for session management.

__init__(image='python:3.12-slim', sandbox_id=None, work_dir='/workspace', auto_remove=True, runtime=None, session_id=None, idle_timeout=3600, volumes=None)

Initialize Docker sandbox.

Parameters:

Name Type Description Default
image str

Docker image to use (ignored if runtime is provided).

'python:3.12-slim'
sandbox_id str | None

Unique identifier for this sandbox.

None
work_dir str

Working directory inside container (ignored if runtime is provided).

'/workspace'
auto_remove bool

Remove container when stopped.

True
runtime RuntimeConfig | str | None

RuntimeConfig or name of built-in runtime.

None
session_id str | None

Alias for sandbox_id (for session management).

None
idle_timeout int

Timeout in seconds for idle cleanup (default: 1 hour).

3600
volumes dict[str, str] | None

Host-to-container volume mappings for persistent storage. Format: {"/host/path": "/container/path"}

None
Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def __init__(
    self,
    image: str = "python:3.12-slim",
    sandbox_id: str | None = None,
    work_dir: str = "/workspace",
    auto_remove: bool = True,
    runtime: RuntimeConfig | str | None = None,
    session_id: str | None = None,
    idle_timeout: int = 3600,
    volumes: dict[str, str] | None = None,
):
    """Initialize Docker sandbox.

    Args:
        image: Docker image to use (ignored if runtime is provided).
        sandbox_id: Unique identifier for this sandbox.
        work_dir: Working directory inside container (ignored if runtime is provided).
        auto_remove: Remove container when stopped.
        runtime: RuntimeConfig or name of built-in runtime.
        session_id: Alias for sandbox_id (for session management).
        idle_timeout: Timeout in seconds for idle cleanup (default: 1 hour).
        volumes: Host-to-container volume mappings for persistent storage.
                 Format: {"/host/path": "/container/path"}
    """
    # session_id is an alias for sandbox_id
    effective_id = session_id or sandbox_id
    super().__init__(effective_id)

    self._auto_remove = auto_remove
    self._container = None
    self._idle_timeout = idle_timeout
    self._last_activity = time.time()
    self._volumes = volumes or {}

    # Handle runtime configuration
    if runtime is not None:
        if isinstance(runtime, str):
            from pydantic_ai_backends.backends.docker.runtimes import get_runtime

            runtime = get_runtime(runtime)
        self._runtime: RuntimeConfig | None = runtime
        self._work_dir = runtime.work_dir
        self._image = image  # Will be overridden by _ensure_runtime_image()
    else:
        self._runtime = None
        self._work_dir = work_dir
        self._image = image

execute(command, timeout=None)

Execute command in Docker container.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def execute(self, command: str, timeout: int | None = None) -> ExecuteResponse:
    """Execute command in Docker container."""
    self._ensure_container()
    self._last_activity = time.time()  # Update activity timestamp
    assert self._container is not None  # Ensured by _ensure_container()

    try:
        # Note: Docker SDK exec_run doesn't support timeout parameter directly.
        # For timeouts, we wrap the command with 'timeout' utility.
        if timeout:
            exec_cmd = ["timeout", str(timeout), "sh", "-c", command]
        else:
            exec_cmd = ["sh", "-c", command]

        exit_code, output = self._container.exec_run(
            exec_cmd,
            workdir=self._work_dir,
        )

        output_str = output.decode("utf-8", errors="replace")

        # Truncate if too long
        max_output = 100000
        truncated = len(output_str) > max_output
        if truncated:
            output_str = output_str[:max_output]

        return ExecuteResponse(
            output=output_str,
            exit_code=exit_code,
            truncated=truncated,
        )
    except Exception as e:
        return ExecuteResponse(
            output=f"Error: {e}",
            exit_code=1,
            truncated=False,
        )

read(path, offset=0, limit=2000)

Read file from container using Docker get_archive API.

Parameters:

Name Type Description Default
path str

Path to the file in the container.

required
offset int

Start line index (for pagination).

0
limit int

Maximum number of lines to return.

2000
Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def read(self, path: str, offset: int = 0, limit: int = 2000) -> str:
    """
    Read file from container using Docker get_archive API.

    Args:
        path: Path to the file in the container.
        offset: Start line index (for pagination).
        limit: Maximum number of lines to return.
    """
    try:
        # Read raw bytes from file
        file_bytes = self._read_bytes(path)

        # Convert bytes to string
        file_ext = Path(path).suffix.lower().lstrip(".")
        full_text = self._convert_bytes_to_text(file_ext, file_bytes)

        # Split into lines
        lines = full_text.splitlines()
        total_lines = len(lines)

        if offset >= total_lines:
            return "[End of file]"

        end_index = offset + limit
        chunk_lines = lines[offset:end_index]
        chunk = "\n".join(chunk_lines)

        if end_index < total_lines:
            remaining = total_lines - end_index
            footer = f"\n\n[... {remaining} more lines. Use offset={end_index} to read more.]"
            return chunk + footer

        return chunk

    except Exception as e:
        return f"[Error reading file: {e}]"

write(path, content)

Write file to container using Docker put_archive API.

This method uses Docker's put_archive() instead of heredoc to handle large files and special characters reliably.

Parameters:

Name Type Description Default
path str

Absolute path where the file should be written.

required
content str | bytes

File content as string or bytes.

required

Returns:

Type Description
WriteResult

WriteResult with path on success, or error message on failure.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def write(self, path: str, content: str | bytes) -> WriteResult:
    """Write file to container using Docker put_archive API.

    This method uses Docker's put_archive() instead of heredoc to handle
    large files and special characters reliably.

    Args:
        path: Absolute path where the file should be written.
        content: File content as string or bytes.

    Returns:
        WriteResult with path on success, or error message on failure.
    """
    self._ensure_container()
    assert self._container is not None

    try:
        # Parse path into directory and filename
        posix_path = PurePosixPath(path)
        parent_dir = str(posix_path.parent)
        filename = posix_path.name

        # Ensure parent directory exists
        safe_parent_dir = shlex.quote(parent_dir)
        mkdir_result = self.execute(f"mkdir -p {safe_parent_dir}")
        if mkdir_result.exit_code != 0:
            return WriteResult(error=f"Failed to create directory: {mkdir_result.output}")

        # Create tar archive in memory
        content = content if isinstance(content, bytes) else content.encode()
        tar_buffer = io.BytesIO()

        with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
            # Create TarInfo for the file
            tarinfo = tarfile.TarInfo(name=filename)
            tarinfo.size = len(content)
            tarinfo.mtime = int(time.time())
            tarinfo.mode = 0o644

            # Add file to archive
            tar.addfile(tarinfo, io.BytesIO(content))

        # Reset buffer position
        tar_buffer.seek(0)

        # Upload to container
        self._container.put_archive(parent_dir, tar_buffer)

        return WriteResult(path=path)

    except Exception as e:
        return WriteResult(error=f"Failed to write file: {e}")

start()

Explicitly start the container.

This is useful for pre-warming containers before use. The container is normally started lazily on first operation.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def start(self) -> None:
    """Explicitly start the container.

    This is useful for pre-warming containers before use.
    The container is normally started lazily on first operation.

    NOTE(review): relies on ``_ensure_container`` being idempotent, so
    calling this on an already-started sandbox is presumably a no-op —
    confirm against that helper's implementation.
    """
    self._ensure_container()

stop()

Stop and remove the container.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def stop(self) -> None:
    """Stop and remove the container."""
    container = self._container
    if container:
        # Best-effort stop: the container may already be gone, and a
        # failure here should not prevent clearing our reference.
        try:
            container.stop()
        except Exception:
            pass
        self._container = None

is_alive()

Check if container is running.

Returns:

Type Description
bool

True if container is running, False otherwise.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def is_alive(self) -> bool:
    """Check if container is running.

    Returns:
        True if container is running, False otherwise.
    """
    container = self._container
    if container is None:
        return False
    try:
        # reload() refreshes the cached state from the Docker daemon; any
        # failure (daemon unreachable, container removed) means "not alive".
        container.reload()
        running = container.status == "running"
    except Exception:
        return False
    return running

BaseSandbox

pydantic_ai_backends.backends.docker.sandbox.BaseSandbox

Bases: ABC

Abstract base class for sandbox backends.

Sandboxes provide isolated environments for executing commands and managing files. Subclasses must implement the execute() and edit() methods; the remaining file operations are built on top of execute() using standard shell tools.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
class BaseSandbox(ABC):
    """Abstract base class for sandbox backends.

    Sandboxes provide isolated environments for executing commands and
    managing files. Subclasses must implement the execute() and edit()
    methods; the remaining operations are built on top of execute() using
    standard POSIX shell tools (ls, cat, sed, find, grep).
    """

    def __init__(self, sandbox_id: str | None = None):
        """Initialize the sandbox.

        Args:
            sandbox_id: Unique identifier for this sandbox. Generated if not provided.
        """
        # Fall back to a random UUID4 so every sandbox gets a unique id.
        self._id = sandbox_id or str(uuid.uuid4())  # pragma: no cover

    @property
    def id(self) -> str:
        """Unique identifier for this sandbox."""
        return self._id  # pragma: no cover

    @abstractmethod
    def execute(
        self, command: str, timeout: int | None = None
    ) -> ExecuteResponse:  # pragma: no cover
        """Execute a command in the sandbox.

        Args:
            command: Command to execute.
            timeout: Maximum execution time in seconds.

        Returns:
            ExecuteResponse with output, exit code, and truncation status.
        """
        ...

    @abstractmethod
    def edit(  # pragma: no cover
        self, path: str, old_string: str, new_string: str, replace_all: bool = False
    ) -> EditResult:
        """Edit a file by replacing strings.

        Args:
            path: File path to edit.
            old_string: String to find and replace.
            new_string: Replacement string.
            replace_all: If True, replace all occurrences. Otherwise, replace only first.

        Returns:
            EditResult with path, error, or occurrence count.
        """
        ...

    def ls_info(self, path: str) -> list[FileInfo]:  # pragma: no cover
        """List directory entries by parsing ``ls -la`` output.

        Args:
            path: Directory to list.

        Returns:
            FileInfo entries sorted directories-first then by name,
            or an empty list when the command fails.
        """
        quoted = shlex.quote(path)
        result = self.execute(f"ls -la {quoted}")
        if result.exit_code != 0:
            return []

        entries: list[FileInfo] = []
        for line in result.output.strip().split("\n")[1:]:  # Skip "total N" line
            if not line.strip():
                continue

            parts = line.split()
            if len(parts) < 9:  # Not a full `ls -l` row (perms .. name)
                continue

            perms = parts[0]
            size = int(parts[4]) if parts[4].isdigit() else None
            name = " ".join(parts[8:])  # Re-join file names containing spaces

            if name in (".", ".."):
                continue

            # BUG FIX: build the returned path from the original (unquoted)
            # input; previously the shell-quoted form leaked into FileInfo.path.
            full_path = f"{path.rstrip('/')}/{name}"
            entries.append(
                FileInfo(
                    name=name,
                    path=full_path,
                    is_dir=perms.startswith("d"),
                    size=size,
                )
            )

        return sorted(entries, key=lambda x: (not x["is_dir"], x["name"]))

    def _read_bytes(self, path: str) -> bytes:  # pragma: no cover
        """Read raw bytes from file using cat command.

        NOTE(review): the output round-trips through str, so this is only
        reliable for UTF-8 text; undecodable bytes are replaced.
        """
        quoted = shlex.quote(path)
        result = self.execute(f"cat {quoted}")

        if result.exit_code != 0:
            return f"[Error: {result.output}]".encode()

        return result.output.encode("utf-8", errors="replace")

    def read(self, path: str, offset: int = 0, limit: int = 2000) -> str:  # pragma: no cover
        """Read file using cat command with line numbers.

        Args:
            path: File to read.
            offset: Zero-based line offset to start from.
            limit: Maximum number of lines to return.
        """
        # sed addresses are 1-indexed, so the window is [offset+1, offset+limit].
        start = offset + 1
        end = offset + limit

        quoted = shlex.quote(path)
        result = self.execute(f"sed -n '{start},{end}p' {quoted} | cat -n")

        if result.exit_code != 0:
            return f"Error: {result.output}"

        if result.truncated:
            return result.output + "\n\n... (output truncated)"

        return result.output

    def write(self, path: str, content: str) -> WriteResult:  # pragma: no cover
        """Write file using cat with a quoted heredoc.

        Args:
            path: Destination path; parent directories are created first.
            content: Text content to write (a trailing newline is appended).

        Returns:
            WriteResult with path on success or the command's error output.
        """
        # Random delimiter so file content is unlikely to terminate the heredoc.
        delimiter = f"EOF_{uuid.uuid4().hex[:8]}"

        quoted_path = shlex.quote(path)
        # BUG FIX: the delimiter is single-quoted, so the shell takes the
        # heredoc body literally -- no escaping of $, ` or \ is needed (the
        # previous escaping corrupted content containing those characters).
        # The $(dirname ...) expansion is double-quoted so paths with spaces
        # survive word-splitting.
        command = (
            f'mkdir -p "$(dirname {quoted_path})" && cat > {quoted_path} << \'{delimiter}\'\n'
            f"{content}\n"
            f"{delimiter}"
        )
        result = self.execute(command)

        if result.exit_code != 0:
            return WriteResult(error=result.output)

        return WriteResult(path=path)

    def glob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:  # pragma: no cover
        """Find files using find command."""
        quoted_path = shlex.quote(path)
        # Quote the pattern as well: a pattern containing a single quote
        # previously broke out of the hand-written '...' quoting.
        quoted_pattern = shlex.quote(pattern)
        result = self.execute(f"find {quoted_path} -name {quoted_pattern} -type f 2>/dev/null")

        if result.exit_code != 0:
            return []

        entries: list[FileInfo] = []
        for file_path in result.output.strip().split("\n"):
            if not file_path:
                continue

            name = file_path.split("/")[-1]
            entries.append(
                FileInfo(
                    name=name,
                    path=file_path,
                    is_dir=False,  # `find -type f` only returns regular files
                    size=None,
                )
            )

        return sorted(entries, key=lambda x: x["path"])

    def grep_raw(  # pragma: no cover
        self,
        pattern: str,
        path: str | None = None,
        glob: str | None = None,
        ignore_hidden: bool = True,
    ) -> list[GrepMatch] | str:
        """Search using grep command.

        Args:
            pattern: Regular expression passed to grep.
            path: Directory to search (defaults to "/").
            glob: Optional ``--include`` filename filter.
            ignore_hidden: Skip dotfiles and dot-directories.

        Returns:
            GrepMatch entries ([] when nothing matches) or an error string.
        """
        search_path = shlex.quote(path or "/")

        options = ["-rn"]
        if ignore_hidden:
            # The surrounding shell strips these inner quotes before grep runs.
            options.extend(["--exclude='.*'", "--exclude-dir='.*'"])
        if glob:
            # shlex.quote so a crafted filter cannot break out of the command.
            options.append(f"--include={shlex.quote(glob)}")

        options_str = " ".join(options)
        # Quote the pattern too: patterns containing single quotes previously
        # terminated the hand-written '...' quoting and leaked into the shell.
        cmd = f"grep {options_str} {shlex.quote(pattern)} {search_path}"

        result = self.execute(cmd)

        if result.exit_code == 1:  # grep's "no matches" exit status
            return []
        if result.exit_code != 0:
            return f"Error: {result.output}"

        matches: list[GrepMatch] = []
        for line in result.output.strip().split("\n"):
            if not line:
                continue

            # Parse grep output: file:line:content (content may contain colons,
            # hence maxsplit=2)
            parts = line.split(":", 2)
            if len(parts) >= 3:
                try:
                    matches.append(
                        GrepMatch(
                            path=parts[0],
                            line_number=int(parts[1]),
                            line=parts[2],
                        )
                    )
                except ValueError:
                    continue

        return matches

id property

Unique identifier for this sandbox.

__init__(sandbox_id=None)

Initialize the sandbox.

Parameters:

Name Type Description Default
sandbox_id str | None

Unique identifier for this sandbox. Generated if not provided.

None
Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def __init__(self, sandbox_id: str | None = None):
    """Initialize the sandbox.

    Args:
        sandbox_id: Unique identifier for this sandbox. Generated if not provided.
    """
    # Fall back to a random UUID4 so every sandbox gets a unique identifier.
    self._id = sandbox_id or str(uuid.uuid4())  # pragma: no cover

execute(command, timeout=None) abstractmethod

Execute a command in the sandbox.

Parameters:

Name Type Description Default
command str

Command to execute.

required
timeout int | None

Maximum execution time in seconds.

None

Returns:

Type Description
ExecuteResponse

ExecuteResponse with output, exit code, and truncation status.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
@abstractmethod
def execute(
    self, command: str, timeout: int | None = None
) -> ExecuteResponse:  # pragma: no cover
    """Execute a command in the sandbox.

    Args:
        command: Command to execute.
        timeout: Maximum execution time in seconds.

    Returns:
        ExecuteResponse with output, exit code, and truncation status.
    """
    # Abstract: each concrete sandbox supplies the execution mechanism.
    ...

ls_info(path)

List files using ls command.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def ls_info(self, path: str) -> list[FileInfo]:  # pragma: no cover
    """List directory entries by parsing ``ls -la`` output.

    Args:
        path: Directory to list.

    Returns:
        FileInfo entries sorted directories-first then by name,
        or an empty list when the command fails.
    """
    quoted = shlex.quote(path)
    result = self.execute(f"ls -la {quoted}")
    if result.exit_code != 0:
        return []

    entries: list[FileInfo] = []
    for line in result.output.strip().split("\n")[1:]:  # Skip "total N" line
        if not line.strip():
            continue

        parts = line.split()
        if len(parts) < 9:  # Not a full `ls -l` row (perms .. name)
            continue

        perms = parts[0]
        size = int(parts[4]) if parts[4].isdigit() else None
        name = " ".join(parts[8:])  # Re-join file names containing spaces

        if name in (".", ".."):
            continue

        # BUG FIX: build the returned path from the original (unquoted)
        # input; previously the shell-quoted form leaked into FileInfo.path.
        full_path = f"{path.rstrip('/')}/{name}"
        entries.append(
            FileInfo(
                name=name,
                path=full_path,
                is_dir=perms.startswith("d"),
                size=size,
            )
        )

    return sorted(entries, key=lambda x: (not x["is_dir"], x["name"]))

read(path, offset=0, limit=2000)

Read file using cat command with line numbers.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def read(self, path: str, offset: int = 0, limit: int = 2000) -> str:  # pragma: no cover
    """Read a slice of a file, numbering the lines via ``cat -n``."""
    # sed addressing is 1-based, so the window becomes [offset+1, offset+limit].
    first_line = offset + 1
    last_line = offset + limit

    quoted = shlex.quote(path)
    result = self.execute(f"sed -n '{first_line},{last_line}p' {quoted} | cat -n")

    if result.exit_code != 0:
        return f"Error: {result.output}"

    if result.truncated:
        return result.output + "\n\n... (output truncated)"

    return result.output

write(path, content)

Write file using cat with heredoc.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def write(self, path: str, content: str) -> WriteResult:  # pragma: no cover
    """Write file using cat with a quoted heredoc.

    Args:
        path: Destination path; parent directories are created first.
        content: Text content to write (a trailing newline is appended).

    Returns:
        WriteResult with path on success or the command's error output.
    """
    # Random delimiter so file content is unlikely to terminate the heredoc.
    delimiter = f"EOF_{uuid.uuid4().hex[:8]}"

    quoted_path = shlex.quote(path)
    # BUG FIX: the delimiter is single-quoted, so the shell takes the heredoc
    # body literally -- no escaping of $, ` or \ is needed (the previous
    # escaping corrupted content containing those characters). The
    # $(dirname ...) expansion is double-quoted so paths with spaces survive
    # word-splitting.
    command = (
        f'mkdir -p "$(dirname {quoted_path})" && cat > {quoted_path} << \'{delimiter}\'\n'
        f"{content}\n"
        f"{delimiter}"
    )
    result = self.execute(command)

    if result.exit_code != 0:
        return WriteResult(error=result.output)

    return WriteResult(path=path)

edit(path, old_string, new_string, replace_all=False) abstractmethod

Edit a file by replacing strings.

Parameters:

Name Type Description Default
path str

File path to edit.

required
old_string str

String to find and replace.

required
new_string str

Replacement string.

required
replace_all bool

If True, replace all occurrences. Otherwise, replace only first.

False

Returns:

Type Description
EditResult

EditResult with path, error, or occurrence count.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
@abstractmethod
def edit(  # pragma: no cover
    self, path: str, old_string: str, new_string: str, replace_all: bool = False
) -> EditResult:
    """Edit a file by replacing strings.

    Args:
        path: File path to edit.
        old_string: String to find and replace.
        new_string: Replacement string.
        replace_all: If True, replace all occurrences. Otherwise, replace only first.

    Returns:
        EditResult with path, error, or occurrence count.
    """
    # Abstract: each concrete sandbox supplies the editing mechanism.
    ...

glob_info(pattern, path='/')

Find files using find command.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def glob_info(self, pattern: str, path: str = "/") -> list[FileInfo]:  # pragma: no cover
    """Find files using find command."""
    quoted_path = shlex.quote(path)
    # Quote the pattern as well: a pattern containing a single quote
    # previously broke out of the hand-written '...' quoting.
    quoted_pattern = shlex.quote(pattern)
    result = self.execute(f"find {quoted_path} -name {quoted_pattern} -type f 2>/dev/null")

    if result.exit_code != 0:
        return []

    entries: list[FileInfo] = []
    for file_path in result.output.strip().split("\n"):
        if not file_path:
            continue

        name = file_path.split("/")[-1]
        entries.append(
            FileInfo(
                name=name,
                path=file_path,
                is_dir=False,  # `find -type f` only returns regular files
                size=None,
            )
        )

    return sorted(entries, key=lambda x: x["path"])

grep_raw(pattern, path=None, glob=None, ignore_hidden=True)

Search using grep command.

Source code in src/pydantic_ai_backends/backends/docker/sandbox.py
Python
def grep_raw(  # pragma: no cover
    self,
    pattern: str,
    path: str | None = None,
    glob: str | None = None,
    ignore_hidden: bool = True,
) -> list[GrepMatch] | str:
    """Search using grep command.

    Args:
        pattern: Regular expression passed to grep.
        path: Directory to search (defaults to "/").
        glob: Optional ``--include`` filename filter.
        ignore_hidden: Skip dotfiles and dot-directories.

    Returns:
        GrepMatch entries ([] when nothing matches) or an error string.
    """
    search_path = shlex.quote(path or "/")

    options = ["-rn"]
    if ignore_hidden:
        # The surrounding shell strips these inner quotes before grep runs.
        options.extend(["--exclude='.*'", "--exclude-dir='.*'"])
    if glob:
        # shlex.quote so a crafted filter cannot break out of the command.
        options.append(f"--include={shlex.quote(glob)}")

    options_str = " ".join(options)
    # Quote the pattern too: patterns containing single quotes previously
    # terminated the hand-written '...' quoting and leaked into the shell.
    cmd = f"grep {options_str} {shlex.quote(pattern)} {search_path}"

    result = self.execute(cmd)

    if result.exit_code == 1:  # grep's "no matches" exit status
        return []
    if result.exit_code != 0:
        return f"Error: {result.output}"

    matches: list[GrepMatch] = []
    for line in result.output.strip().split("\n"):
        if not line:
            continue

        # Parse grep output: file:line:content (content may contain colons,
        # hence maxsplit=2)
        parts = line.split(":", 2)
        if len(parts) >= 3:
            try:
                matches.append(
                    GrepMatch(
                        path=parts[0],
                        line_number=int(parts[1]),
                        line=parts[2],
                    )
                )
            except ValueError:
                continue

    return matches

SessionManager

pydantic_ai_backends.backends.docker.session.SessionManager

Manages user sessions and their Docker containers.

This class provides a way to manage multiple Docker sandbox instances for different user sessions. It handles creating new sandboxes for new sessions, reusing existing sandboxes for returning sessions, and cleaning up idle sessions automatically.

Example
Python
from pydantic_ai_backends import SessionManager

manager = SessionManager(default_runtime="python-datascience")

# Get sandbox for user
sandbox = await manager.get_or_create("user-123")

# Later: cleanup idle sessions
cleaned = await manager.cleanup_idle(max_idle=1800)  # 30 min
print(f"Cleaned up {cleaned} idle sessions")
Source code in src/pydantic_ai_backends/backends/docker/session.py
Python
class SessionManager:
    """Manages user sessions and their Docker containers.

    This class provides a way to manage multiple Docker sandbox instances
    for different user sessions. It handles:
    - Creating new sandboxes for new sessions
    - Reusing existing sandboxes for returning sessions
    - Cleaning up idle sessions automatically

    Example:
        ```python
        from pydantic_ai_backends import SessionManager

        manager = SessionManager(default_runtime="python-datascience")

        # Get sandbox for user
        sandbox = await manager.get_or_create("user-123")

        # Later: cleanup idle sessions
        cleaned = await manager.cleanup_idle(max_idle=1800)  # 30 min
        print(f"Cleaned up {cleaned} idle sessions")
        ```
    """

    def __init__(
        self,
        default_runtime: RuntimeConfig | str | None = None,
        default_idle_timeout: int = 3600,
        workspace_root: str | Path | None = None,
    ):
        """Initialize the session manager.

        Args:
            default_runtime: Default RuntimeConfig or name for new sandboxes.
            default_idle_timeout: Default idle timeout in seconds (default: 1 hour).
            workspace_root: Root directory for persistent session storage.
                           If set, creates {workspace_root}/{session_id}/workspace
                           and mounts it as a volume. Files persist across container restarts.
        """
        self._sessions: dict[str, DockerSandbox] = {}
        self._default_runtime = default_runtime
        self._default_idle_timeout = default_idle_timeout
        # Background cleanup task; created on demand by start_cleanup_loop().
        self._cleanup_task: asyncio.Task[None] | None = None
        self._workspace_root = Path(workspace_root) if workspace_root else None

    @property
    def sessions(self) -> dict[str, DockerSandbox]:
        """Active sessions dictionary (read-only access)."""
        # Shallow copy so callers cannot mutate internal state.
        return dict(self._sessions)

    @property
    def session_count(self) -> int:
        """Number of active sessions."""
        return len(self._sessions)

    async def get_or_create(
        self,
        session_id: str,
        runtime: RuntimeConfig | str | None = None,
    ) -> DockerSandbox:
        """Get an existing sandbox or create a new one.

        If a sandbox exists for the session_id and is still alive,
        it will be returned. Otherwise, a new sandbox will be created.

        Args:
            session_id: Unique identifier for the session.
            runtime: RuntimeConfig or name to use (defaults to manager's default).

        Returns:
            DockerSandbox instance for the session.
        """
        # Local import to avoid a circular import with the sandbox module.
        from pydantic_ai_backends.backends.docker.sandbox import DockerSandbox

        # Check for existing session
        if session_id in self._sessions:
            sandbox = self._sessions[session_id]
            if sandbox.is_alive():
                sandbox._last_activity = time.time()
                return sandbox
            # Container died: release its resources (stop() is best-effort
            # and swallows errors) instead of just forgetting the reference.
            sandbox.stop()
            del self._sessions[session_id]

        # Prepare volumes for persistent storage
        volumes: dict[str, str] | None = None
        if self._workspace_root:
            session_workspace = self._workspace_root / session_id / "workspace"
            session_workspace.mkdir(parents=True, exist_ok=True)
            # Host path -> fixed container mount point.
            volumes = {str(session_workspace.resolve()): "/workspace"}

        # Create new sandbox.
        # NOTE(review): no ValueError is raised when both runtime and the
        # manager default are None; DockerSandbox presumably falls back to
        # its own default image -- confirm before relying on that.
        effective_runtime = runtime or self._default_runtime
        sandbox = DockerSandbox(
            runtime=effective_runtime,
            session_id=session_id,
            idle_timeout=self._default_idle_timeout,
            volumes=volumes,
        )
        sandbox.start()
        self._sessions[session_id] = sandbox
        return sandbox

    async def release(self, session_id: str) -> bool:
        """Release a session and stop its container.

        Args:
            session_id: Session identifier to release.

        Returns:
            True if session was found and released, False otherwise.
        """
        if session_id not in self._sessions:
            return False

        sandbox = self._sessions.pop(session_id)
        sandbox.stop()
        return True

    async def cleanup_idle(self, max_idle: int | None = None) -> int:
        """Clean up idle sessions.

        Removes and stops sandboxes that have been idle for longer than
        the specified time.

        Args:
            max_idle: Maximum idle time in seconds. Uses default if not specified.

        Returns:
            Number of sessions cleaned up.
        """
        max_idle = max_idle if max_idle is not None else self._default_idle_timeout
        now = time.time()
        to_remove: list[str] = []

        # Collect first, then release, so we never mutate the dict while
        # iterating it.
        for session_id, sandbox in self._sessions.items():
            if now - sandbox._last_activity > max_idle:
                to_remove.append(session_id)

        for session_id in to_remove:
            await self.release(session_id)

        return len(to_remove)

    def start_cleanup_loop(self, interval: int = 300) -> None:
        """Start background cleanup loop.

        Periodically cleans up idle sessions.

        Args:
            interval: Cleanup interval in seconds (default: 5 minutes).
        """
        if self._cleanup_task is not None:
            return  # Already running

        async def _loop() -> None:  # pragma: no cover
            while True:
                await asyncio.sleep(interval)
                try:
                    await self.cleanup_idle()
                except Exception:
                    # BUG FIX: a single failed sweep must not silently kill
                    # the loop forever; retry on the next interval.
                    # (CancelledError is a BaseException and still
                    # propagates, so the task remains cancellable.)
                    continue

        self._cleanup_task = asyncio.create_task(_loop())

    def stop_cleanup_loop(self) -> None:
        """Stop the background cleanup loop."""
        if self._cleanup_task is not None:
            self._cleanup_task.cancel()
            self._cleanup_task = None

    async def shutdown(self) -> int:
        """Shutdown all sessions and stop cleanup loop.

        Returns:
            Number of sessions that were stopped.
        """
        self.stop_cleanup_loop()

        count = len(self._sessions)
        # Snapshot the keys: release() mutates self._sessions.
        session_ids = list(self._sessions.keys())

        for session_id in session_ids:
            await self.release(session_id)

        return count

    def __contains__(self, session_id: str) -> bool:
        """Check if a session exists."""
        return session_id in self._sessions

    def __len__(self) -> int:
        """Return number of active sessions."""
        return len(self._sessions)

__init__(default_runtime=None, default_idle_timeout=3600, workspace_root=None)

Initialize the session manager.

Parameters:

Name Type Description Default
default_runtime RuntimeConfig | str | None

Default RuntimeConfig or name for new sandboxes.

None
default_idle_timeout int

Default idle timeout in seconds (default: 1 hour).

3600
workspace_root str | Path | None

Root directory for persistent session storage. If set, creates {workspace_root}/{session_id}/workspace and mounts it as a volume. Files persist across container restarts.

None
Source code in src/pydantic_ai_backends/backends/docker/session.py
Python
def __init__(
    self,
    default_runtime: RuntimeConfig | str | None = None,
    default_idle_timeout: int = 3600,
    workspace_root: str | Path | None = None,
):
    """Initialize the session manager.

    Args:
        default_runtime: Default RuntimeConfig or name for new sandboxes.
        default_idle_timeout: Default idle timeout in seconds (default: 1 hour).
        workspace_root: Root directory for persistent session storage.
                       If set, creates {workspace_root}/{session_id}/workspace
                       and mounts it as a volume. Files persist across container restarts.
    """
    self._sessions: dict[str, DockerSandbox] = {}
    self._default_runtime = default_runtime
    self._default_idle_timeout = default_idle_timeout
    # Background cleanup task; created on demand by start_cleanup_loop().
    self._cleanup_task: asyncio.Task[None] | None = None
    self._workspace_root = Path(workspace_root) if workspace_root else None

RuntimeConfig

pydantic_ai_backends.types.RuntimeConfig

Bases: BaseModel

Configuration for a Docker runtime environment.

A runtime defines a pre-configured execution environment with specific packages and settings. Can be used with DockerSandbox to provide ready-to-use environments without manual package installation.

Example
Python
from pydantic_ai_backends import RuntimeConfig, DockerSandbox

# Custom runtime with ML packages
ml_runtime = RuntimeConfig(
    name="ml-env",
    description="Machine learning environment",
    base_image="python:3.12-slim",
    packages=["torch", "transformers", "datasets"],
)

sandbox = DockerSandbox(runtime=ml_runtime)
Source code in src/pydantic_ai_backends/types.py
Python
class RuntimeConfig(BaseModel):
    """Configuration for a Docker runtime environment.

    A runtime defines a pre-configured execution environment with specific
    packages and settings. Can be used with DockerSandbox to provide
    ready-to-use environments without manual package installation.

    Example:
        ```python
        from pydantic_ai_backends import RuntimeConfig, DockerSandbox

        # Custom runtime with ML packages
        ml_runtime = RuntimeConfig(
            name="ml-env",
            description="Machine learning environment",
            base_image="python:3.12-slim",
            packages=["torch", "transformers", "datasets"],
        )

        sandbox = DockerSandbox(runtime=ml_runtime)
        ```
    """

    name: str
    """Unique name for the runtime (e.g., "python-datascience")."""

    description: str = ""
    """Human-readable description of the runtime."""

    # Image source (one of these)
    # NOTE(review): `image` and `base_image` are mutually exclusive by intent,
    # but no validator enforces that here -- confirm it is checked upstream.
    image: str | None = None
    """Ready-to-use Docker image (e.g., "myregistry/python-ds:v1")."""

    base_image: str | None = None
    """Base image to build upon (e.g., "python:3.12-slim")."""

    # Packages to install (only if base_image)
    # Mutable defaults are safe on pydantic models: defaults are copied per
    # instance, unlike plain class attributes.
    packages: list[str] = []
    """Packages to install (e.g., ["pandas", "numpy", "matplotlib"])."""

    package_manager: Literal["pip", "npm", "apt", "cargo"] = "pip"
    """Package manager to use for installation."""

    # Additional configuration
    setup_commands: list[str] = []
    """Additional setup commands to run (e.g., ["apt-get update"])."""

    env_vars: dict[str, str] = {}
    """Environment variables to set in the container."""

    work_dir: str = "/workspace"
    """Working directory inside the container."""

    # Cache settings
    cache_image: bool = True
    """Whether to cache the built image locally."""

name instance-attribute

Unique name for the runtime (e.g., "python-datascience").

description = '' class-attribute instance-attribute

Human-readable description of the runtime.

image = None class-attribute instance-attribute

Ready-to-use Docker image (e.g., "myregistry/python-ds:v1").

base_image = None class-attribute instance-attribute

Base image to build upon (e.g., "python:3.12-slim").

packages = [] class-attribute instance-attribute

Packages to install (e.g., ["pandas", "numpy", "matplotlib"]).

package_manager = 'pip' class-attribute instance-attribute

Package manager to use for installation.

setup_commands = [] class-attribute instance-attribute

Additional setup commands to run (e.g., ["apt-get update"]).

env_vars = {} class-attribute instance-attribute

Environment variables to set in the container.

work_dir = '/workspace' class-attribute instance-attribute

Working directory inside the container.

cache_image = True class-attribute instance-attribute

Whether to cache the built image locally.

Built-in Runtimes

Python
from pydantic_ai_backends import BUILTIN_RUNTIMES

# Available runtimes
print(BUILTIN_RUNTIMES.keys())
# dict_keys(['python-minimal', 'python-datascience', 'python-web', 'node-minimal', 'node-react'])

# Use a runtime
from pydantic_ai_backends import DockerSandbox
sandbox = DockerSandbox(runtime="python-datascience")
Runtime Base Image Packages
python-minimal python:3.12-slim (none)
python-datascience python:3.12-slim pandas, numpy, matplotlib, scikit-learn, seaborn
python-web python:3.12-slim fastapi, uvicorn, sqlalchemy, httpx
node-minimal node:20-slim (none)
node-react node:20-slim typescript, vite, react, react-dom, @types/react