Skip to content

API Reference

This section provides detailed API documentation for the database-pydantic-ai package.

Core Toolset

database_pydantic_ai.sql.toolset

PydanticAI toolset for AI agents used to inference with database on given permission level

SQLDatabaseDeps

Bases: BaseModel

Dependencies for the SQL database toolset.

Attributes:

Name Type Description
database Annotated[SQLDatabaseProtocol, SkipValidation]

Database backend instance.

read_only bool

Enforce read-only mode (blocks INSERT, UPDATE, DELETE etc.)

max_rows int

Maximum rows to return from queries.

query_timeout float

Query timeout in seconds.

id str | None

Optional dependency ID.

Source code in src/database_pydantic_ai/sql/toolset.py
Python
class SQLDatabaseDeps(BaseModel):
    """
    Dependencies for the SQL database toolset.

    Attributes:
        database: Database backend instance.
        read_only: Enforce read-only mode (blocks INSERT, UPDATE, DELETE etc.)
        max_rows: Maximum rows to return from queries.
        query_timeout: Query timeout in seconds.
        id: Optional dependency ID.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    database: Annotated[SQLDatabaseProtocol, SkipValidation]
    read_only: bool = True
    max_rows: int = 100
    query_timeout: float = 30.0
    id: str | None = None

create_database_toolset(*, id=None)

Create a database toolset for AI Agents.

Parameters:

Name Type Description Default
id str | None

Optional toolset ID.

None

Returns:

Type Description
FunctionToolset[SQLDatabaseDeps]

FunctionalToolset with database tools

...

Source code in src/database_pydantic_ai/sql/toolset.py
Python
def create_database_toolset(*, id: str | None = None) -> FunctionToolset[SQLDatabaseDeps]:
    """
    Create a database toolset for AI Agents.

    Args:
        id: Optional toolset ID.

    Returns:
        FunctionalToolset with database tools
    ...
    """
    toolset = FunctionToolset[SQLDatabaseDeps](id=id)

    @toolset.tool
    async def list_tables(ctx: RunContext[SQLDatabaseDeps]) -> list[str]:
        """
        Get names of all tables in the database to understand available data.

        Returns:
            List of all table's names.
        """
        return await ctx.deps.database.get_tables()

    @toolset.tool
    async def get_schema(ctx: RunContext[SQLDatabaseDeps], return_md: bool) -> SchemaInfo | str:
        """
        Get an overview of the database schema.

        Returns:
            List of all tables with their column counts and row counts.
        """
        return await ctx.deps.database.get_schema(return_md=return_md)

    @toolset.tool
    async def describe_table(
        ctx: RunContext[SQLDatabaseDeps], table_name: str
    ) -> TableInfo | str | None:
        """
        Get detailed information about a specific table.

        Args:
            table_name: Name of the table to describe.

        Returns:
            Table structure including columns, types, constraints, and relationships.
        """
        return await ctx.deps.database.get_table_info(table_name)

    @toolset.tool
    async def explain_query(ctx: RunContext[SQLDatabaseDeps], sql_query: str) -> str:
        """
        Get the execution plan for a SQL query without executing it.

        Args:
            sql_query: The SQL query to analyze.

        Returns:
            Query execution plan showing how the database would process the query.

        Use this to:
            - Understand query performance
            - Identify missing indexes
            - Optimize slow queries
        """
        return await ctx.deps.database.explain(sql_query)

    @toolset.tool
    async def query(
        ctx: RunContext[SQLDatabaseDeps], sql_query: str, max_rows: int | None = None
    ) -> QueryResult:
        """
        Execute a SQL query and return the results.

        Args:
            sql_query: SQL query to be executed.
            max_rows: Maximum number of rows to be returned (default: 100)

        Returns:
            QueryResults object with queried data.

        Example:
            query("SELECT id, name FROM users WHERE is_banned = true;", max_rows=10)
        """
        try:
            result = await asyncio.wait_for(
                ctx.deps.database.execute(sql_query), timeout=ctx.deps.query_timeout
            )

        except asyncio.TimeoutError:
            return QueryResult(
                columns=[],
                rows=[],
                row_count=0,
                execution_time_ms=0,  # indicate max wait with `0`
            )

        limit = max_rows or ctx.deps.max_rows

        if len(result.rows) > limit:
            result = QueryResult(
                columns=result.columns,
                rows=result.rows[:limit],
                row_count=min(result.row_count, limit),
                execution_time_ms=result.execution_time_ms,
            )

        return result

    return toolset

Default System Prompt

This is the prompt used to instruct the agent:

Markdown
SQLITE_SYSTEM_PROMPT = """
## SQLite Database Toolset

### IMPORTANT
* Database may be running in READ-ONLY mode
* When in read-only mode, only SELECT queries are allowed

You have access to SQLite database tools for database operations and querying:
* `list_tables` - list all tables which are present in database
* `get_schema` - read the database schema
* `describe_table` - describe table's content
* `explain_query` - explains dependencies which given SQL query needs to run
* `query` - execute a SQL query on a database to retrieve data

### Best Practices
* Always try to perform sample data query before performing full query process
* Before running any query, validate it with current schema of the tables and database
* Be careful with querying the database, try to validate beforehand
* Use `LIMIT` clause when querying large tables to avoid retrieving too much data

Backends

SQLite

database_pydantic_ai.sql.backends.sqlite.SQLiteDatabase

Bases: BaseSQLDatabase, SQLDatabaseProtocol

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
class SQLiteDatabase(BaseSQLDatabase, SQLDatabaseProtocol):
    def __init__(self, db_path: str, read_only: bool = True) -> None:
        super().__init__(read_only=read_only)
        self.db_path = db_path
        self._connection: aiosqlite.Connection | None = None

    async def __aenter__(self) -> "SQLiteDatabase":
        """Support for `async with` context manager"""
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: BaseException | None,
    ) -> None:
        """Ensure the connection is closed when exiting the context."""
        await self.close()

    async def connect(self) -> None:
        """Connect to the database"""
        if not self._connection:
            if self.read_only:
                self._connection = await aiosqlite.connect(f"file:{self.db_path}?mode=ro", uri=True)
            else:
                self._connection = await aiosqlite.connect(self.db_path)

            # Return rows as a dict-like object for easier processing
            self._connection.row_factory = sqlite3.Row

    async def close(self) -> None:
        """Close database connection."""
        if self._connection:
            await self._connection.close()
            self._connection = None

    async def execute(self, query: str, params: tuple[Any, ...] | None = None) -> QueryResult:
        """Execute a SQL query with optional parameters."""
        safe_query = self.check_query_safety(query)
        # if self.read_only and self._is_write_query(query):
        #     raise PermissionError("Database is in read-only mode")

        await self.connect()
        # Check if connection was successfully established
        if self._connection is None:
            raise RuntimeError("Failed to establish database connection")

        start_time = time.perf_counter()

        # Check if connection exists to satisfy MyPy and prevent runtime crashes
        if self._connection is None:
            raise RuntimeError("Database connection is not initialized. Call connect() first.")

        # While using `aiosqlite`, executed call has to be awaited
        async with self._connection.execute(safe_query, params or ()) as cursor:
            rows = await cursor.fetchall()

            # Convert `sqlite3.Row` object to tuples for the protocol
            processed_rows = [tuple(row) for row in rows]
            columns = (
                [description[0] for description in cursor.description] if cursor.description else []
            )

            return QueryResult(
                columns=columns,
                rows=processed_rows,
                row_count=len(processed_rows),
                execution_time_ms=(time.perf_counter() - start_time) * 1000,
            )

    async def get_tables(self) -> list[str]:
        """Get list of tables in the public schema."""
        # Fetch all table names from the database
        query = "SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%';"
        res = await self.execute(query)

        tables = []
        for row in res.rows:
            tables.append(row[0])

        return tables

    async def get_foreign_keys(self, table_name: str) -> list[ForeignKeyInfo]:
        """Get information about foreign keys in given table"""
        tables = await self.get_tables()
        if table_name not in tables:
            return []

        foreign_keys = []
        query = f"PRAGMA foreign_key_list ({table_name});"
        res = await self.execute(query)

        for row in res.rows:
            foreign_keys.append(
                ForeignKeyInfo(column=row[3], references_table=row[2], references_column=row[4])
            )

        return foreign_keys

    async def get_table_info(
        self, table_name: str, return_md: bool = True
    ) -> TableInfo | str | None:
        """Get detailed information about a specific table."""
        tables = await self.get_tables()
        if table_name not in tables:
            return None

        query = f"PRAGMA table_info ({table_name});"
        res = await self.execute(query)

        columns = []
        primary_keys = []
        foreign_keys = []

        for row in res.rows:
            col = ColumnInfo(
                name=row[1],
                data_type=row[2],
                nullable=row[3] == 0,
                default=row[4],
                is_primary_key=row[5] == 1,
            )

            if col.is_primary_key:
                primary_keys.append(col.name)
            columns.append(col)

        # Get foreign keys for the table
        foreign_keys = await self.get_foreign_keys(table_name)

        # Get real row count
        count_res = await self.execute(f"SELECT COUNT(*) FROM {table_name};")
        actual_row_count = count_res.rows[0][0] if count_res.rows else 0

        table = TableInfo(
            name=table_name,
            columns=columns,
            row_count=actual_row_count,
            primary_key=primary_keys,
            foreign_keys=foreign_keys,
        )

        if return_md:
            table_md = self.render_table_as_markdown(table)
            return table_md

        return table

    async def get_schema(self, return_md: bool = True) -> SchemaInfo | str:
        """Get database schema information."""
        table_names = await self.get_tables()

        tasks = [self.get_table_info(table_name, return_md=return_md) for table_name in table_names]
        tables = await asyncio.gather(*tasks)

        if return_md:
            str_tables = [str(t) for t in tables if t]
            return "\n".join(str_tables)

        # Filter out empty responses in the output
        return SchemaInfo(tables=[t for t in tables if t])

    async def explain(self, query: str) -> str:
        query = f"EXPLAIN QUERY PLAN {query}"

        try:
            res = await self.execute(query)

            explanation_lines = []
            for row in res.rows:
                explanation_lines.append(" | ".join(map(str, row)))

            return "\n".join(explanation_lines)

        except sqlite3.OperationalError:
            return "Invalid query, please try again"

__aenter__() async

Support for async with context manager

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def __aenter__(self) -> "SQLiteDatabase":
    """Support for `async with` context manager"""
    await self.connect()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Ensure the connection is closed when exiting the context.

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: BaseException | None,
) -> None:
    """Ensure the connection is closed when exiting the context."""
    await self.close()

connect() async

Connect to the database

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def connect(self) -> None:
    """Connect to the database"""
    if not self._connection:
        if self.read_only:
            self._connection = await aiosqlite.connect(f"file:{self.db_path}?mode=ro", uri=True)
        else:
            self._connection = await aiosqlite.connect(self.db_path)

        # Return rows as a dict-like object for easier processing
        self._connection.row_factory = sqlite3.Row

close() async

Close database connection.

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def close(self) -> None:
    """Close database connection."""
    if self._connection:
        await self._connection.close()
        self._connection = None

execute(query, params=None) async

Execute a SQL query with optional parameters.

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def execute(self, query: str, params: tuple[Any, ...] | None = None) -> QueryResult:
    """Execute a SQL query with optional parameters."""
    safe_query = self.check_query_safety(query)
    # if self.read_only and self._is_write_query(query):
    #     raise PermissionError("Database is in read-only mode")

    await self.connect()
    # Check if connection was successfully established
    if self._connection is None:
        raise RuntimeError("Failed to establish database connection")

    start_time = time.perf_counter()

    # Check if connection exists to satisfy MyPy and prevent runtime crashes
    if self._connection is None:
        raise RuntimeError("Database connection is not initialized. Call connect() first.")

    # While using `aiosqlite`, executed call has to be awaited
    async with self._connection.execute(safe_query, params or ()) as cursor:
        rows = await cursor.fetchall()

        # Convert `sqlite3.Row` object to tuples for the protocol
        processed_rows = [tuple(row) for row in rows]
        columns = (
            [description[0] for description in cursor.description] if cursor.description else []
        )

        return QueryResult(
            columns=columns,
            rows=processed_rows,
            row_count=len(processed_rows),
            execution_time_ms=(time.perf_counter() - start_time) * 1000,
        )

get_tables() async

Get list of tables in the public schema.

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def get_tables(self) -> list[str]:
    """Get list of tables in the public schema."""
    # Fetch all table names from the database
    query = "SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%';"
    res = await self.execute(query)

    tables = []
    for row in res.rows:
        tables.append(row[0])

    return tables

get_foreign_keys(table_name) async

Get information about foreign keys in given table

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def get_foreign_keys(self, table_name: str) -> list[ForeignKeyInfo]:
    """Get information about foreign keys in given table"""
    tables = await self.get_tables()
    if table_name not in tables:
        return []

    foreign_keys = []
    query = f"PRAGMA foreign_key_list ({table_name});"
    res = await self.execute(query)

    for row in res.rows:
        foreign_keys.append(
            ForeignKeyInfo(column=row[3], references_table=row[2], references_column=row[4])
        )

    return foreign_keys

get_table_info(table_name, return_md=True) async

Get detailed information about a specific table.

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def get_table_info(
    self, table_name: str, return_md: bool = True
) -> TableInfo | str | None:
    """Get detailed information about a specific table."""
    tables = await self.get_tables()
    if table_name not in tables:
        return None

    query = f"PRAGMA table_info ({table_name});"
    res = await self.execute(query)

    columns = []
    primary_keys = []
    foreign_keys = []

    for row in res.rows:
        col = ColumnInfo(
            name=row[1],
            data_type=row[2],
            nullable=row[3] == 0,
            default=row[4],
            is_primary_key=row[5] == 1,
        )

        if col.is_primary_key:
            primary_keys.append(col.name)
        columns.append(col)

    # Get foreign keys for the table
    foreign_keys = await self.get_foreign_keys(table_name)

    # Get real row count
    count_res = await self.execute(f"SELECT COUNT(*) FROM {table_name};")
    actual_row_count = count_res.rows[0][0] if count_res.rows else 0

    table = TableInfo(
        name=table_name,
        columns=columns,
        row_count=actual_row_count,
        primary_key=primary_keys,
        foreign_keys=foreign_keys,
    )

    if return_md:
        table_md = self.render_table_as_markdown(table)
        return table_md

    return table

get_schema(return_md=True) async

Get database schema information.

Source code in src/database_pydantic_ai/sql/backends/sqlite.py
Python
async def get_schema(self, return_md: bool = True) -> SchemaInfo | str:
    """Get database schema information."""
    table_names = await self.get_tables()

    tasks = [self.get_table_info(table_name, return_md=return_md) for table_name in table_names]
    tables = await asyncio.gather(*tasks)

    if return_md:
        str_tables = [str(t) for t in tables if t]
        return "\n".join(str_tables)

    # Filter out empty responses in the output
    return SchemaInfo(tables=[t for t in tables if t])

PostgreSQL

database_pydantic_ai.sql.backends.postgres.PostgreSQLDatabase

Bases: BaseSQLDatabase, SQLDatabaseProtocol

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
class PostgreSQLDatabase(BaseSQLDatabase, SQLDatabaseProtocol):
    def __init__(
        self, user: str, password: str, db: str, host: str, read_only: bool = True
    ) -> None:
        super().__init__(read_only=read_only)
        self.user = user
        self.password = password
        self.db = db
        self.host = host
        self._pool: asyncpg.Pool | None = None

        self.dsn = f"postgresql://{self.user}:{self.password}@{self.host}/{self.db}"

    async def __aenter__(self) -> "PostgreSQLDatabase":
        """Support for `async with` context manager"""
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Ensure the pool is closed when exiting the context."""
        await self.close()

    async def connect(
        self,
        min_size: int = 1,
        max_size: int = 10,
        command_timeout: float = 60.0,
        timeout: float = 120.0,
    ) -> asyncpg.Pool:
        """
        Connect to the database.

        Args:
            min_size: Minimum size of the connection pool.
            max_size: Maximum size of the connection pool.
            command_timeout: Timeout for individual queries in seconds.
            timeout: Timeout for establishing the connection in seconds.
        """
        if not self._pool:
            self._pool = await asyncpg.create_pool(
                self.dsn,
                min_size=min_size,
                max_size=max_size,
                command_timeout=command_timeout,  # Timeout for individual queries
                timeout=timeout,  # Timeout for establishing connection
            )
        return self._pool

    async def close(self) -> None:
        """Close database connection."""
        if self._pool:
            await self._pool.close()
            self._pool = None

    async def execute(self, query: str, params: tuple[Any, ...] | None = None) -> QueryResult:
        """Execute a SQL query with optional parameters."""
        safe_query = self.check_query_safety(query)

        pool = await self.connect()
        # Check if connection pool was successfully established
        if pool is None:
            raise RuntimeError("Failed to establish database connection")

        start_time = time.perf_counter()

        # Make the query w/ params
        if params:
            args = params if isinstance(params, (list, tuple)) else (params,)

            # Use `fetch` if you want rows back, or `execute` if you just want the status
            records = await pool.fetch(safe_query, *args)
        else:
            records = await pool.fetch(safe_query)

        # Transform data to fit schema
        processed_rows = [tuple(row) for row in records]
        columns = list(records[0].keys()) if records else []

        return QueryResult(
            columns=columns,
            rows=processed_rows,
            row_count=len(processed_rows),
            execution_time_ms=(time.perf_counter() - start_time),
        )

    async def get_tables(self) -> list[str]:
        """Get list of tables in the public schema."""
        pool = await self.connect()
        query = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_type = 'BASE TABLE'
            ORDER BY table_name;
        """
        result = await pool.fetch(query)
        table_list = [r["table_name"] for r in result]

        return table_list

    async def get_foreign_keys(self, table_name: str) -> list[ForeignKeyInfo]:
        """Get information about foreign keys in given table"""
        pool = await self.connect()
        # Use $1 instead of f-string
        query = """
        SELECT DISTINCT
            kcu.column_name AS local_column,
            ccu.table_name AS foreign_table,
            ccu.column_name AS foreign_column
        FROM
            information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
            ON tc.constraint_name = kcu.constraint_name
            AND tc.table_schema = kcu.table_schema
            JOIN information_schema.constraint_column_usage AS ccu
            ON ccu.constraint_name = tc.constraint_name
            AND ccu.table_schema = tc.table_schema
        WHERE tc.constraint_type = 'FOREIGN KEY'
            AND tc.table_name = $1;
        """

        # Pass table_name as an argument to fetch
        records = await pool.fetch(query, table_name)

        return [
            ForeignKeyInfo(
                column=r["local_column"],
                references_table=r["foreign_table"],
                references_column=r["foreign_column"],
            )
            for r in records
        ]

    async def get_table_info(
        self, table_name: str, return_md: bool = True
    ) -> TableInfo | str | None:
        """Get detailed information about a specific table."""
        tables = await self.get_tables()
        if table_name not in tables:
            return None

        query = """
            SELECT DISTINCT ON (c.ordinal_position)
                c.column_name,
                c.data_type,
                c.is_nullable,
                c.column_default,
                CASE WHEN tc.constraint_type = 'PRIMARY KEY' THEN TRUE ELSE FALSE END AS is_primary
            FROM information_schema.columns c
            LEFT JOIN information_schema.key_column_usage kcu
                ON c.table_name = kcu.table_name
                AND c.column_name = kcu.column_name
            LEFT JOIN information_schema.table_constraints tc
                ON kcu.constraint_name = tc.constraint_name
                AND tc.constraint_type = 'PRIMARY KEY'
            WHERE c.table_name = $1
            AND c.table_schema = 'public'
            ORDER BY c.ordinal_position, is_primary DESC;
        """
        pool = await self.connect()
        records = await pool.fetch(query, table_name)

        columns = []
        primary_keys = []
        foreign_keys = []

        for r in records:
            col = ColumnInfo(
                name=r["column_name"],
                data_type=r["data_type"],
                nullable=(r["is_nullable"] == "YES"),
                default=r["column_default"],
                is_primary_key=r["is_primary"],
            )

            if col.is_primary_key:
                primary_keys.append(col.name)
            columns.append(col)

        # Get foreign keys using your existing method
        foreign_keys = await self.get_foreign_keys(table_name)

        # Get real row count
        count_res = await self.execute(f"SELECT COUNT(*) FROM {table_name};")
        actual_row_count = count_res.rows[0][0] if count_res.rows else 0

        table = TableInfo(
            name=table_name,
            columns=columns,
            row_count=actual_row_count,
            foreign_keys=foreign_keys,
            primary_key=primary_keys,
        )

        if return_md:
            table_md = self.render_table_as_markdown(table)
            return table_md

        return table

    async def get_schema(self, return_md: bool = True) -> SchemaInfo | str:
        """Get database schema information."""
        table_names = await self.get_tables()

        tasks = [self.get_table_info(table_name, return_md=return_md) for table_name in table_names]
        tables = await asyncio.gather(*tasks)

        if return_md:
            str_tables = [str(t) for t in tables if t]
            return "\n".join(str_tables)

        # Filter out empty responses in the output
        return SchemaInfo(tables=[t for t in tables if t])

    async def explain(self, query: str) -> str:
        """Get query execution plan."""
        pool = await self.connect()
        query = f"EXPLAIN {query}"

        explanation = ""

        try:
            result = await pool.fetch(query)

            for r in result:
                explanation += f"{r['QUERY PLAN']}"

            return explanation

        except asyncpg.exceptions.PostgresSyntaxError:
            return "Invalid query, please try again"

__aenter__() async

Support for async with context manager

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def __aenter__(self) -> "PostgreSQLDatabase":
    """Support for `async with` context manager"""
    await self.connect()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Ensure the pool is closed when exiting the context.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Ensure the pool is closed when exiting the context."""
    await self.close()

connect(min_size=1, max_size=10, command_timeout=60.0, timeout=120.0) async

Connect to the database.

Parameters:

Name Type Description Default
min_size int

Minimum size of the connection pool.

1
max_size int

Maximum size of the connection pool.

10
command_timeout float

Timeout for individual queries in seconds.

60.0
timeout float

Timeout for establishing the connection in seconds.

120.0
Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def connect(
    self,
    min_size: int = 1,
    max_size: int = 10,
    command_timeout: float = 60.0,
    timeout: float = 120.0,
) -> asyncpg.Pool:
    """
    Connect to the database.

    Args:
        min_size: Minimum size of the connection pool.
        max_size: Maximum size of the connection pool.
        command_timeout: Timeout for individual queries in seconds.
        timeout: Timeout for establishing the connection in seconds.
    """
    if not self._pool:
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=min_size,
            max_size=max_size,
            command_timeout=command_timeout,  # Timeout for individual queries
            timeout=timeout,  # Timeout for establishing connection
        )
    return self._pool

close() async

Close database connection.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def close(self) -> None:
    """Close database connection."""
    if self._pool:
        await self._pool.close()
        self._pool = None

execute(query, params=None) async

Execute a SQL query with optional parameters.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def execute(self, query: str, params: tuple[Any, ...] | None = None) -> QueryResult:
    """Execute a SQL query with optional parameters."""
    safe_query = self.check_query_safety(query)

    pool = await self.connect()
    # Check if connection pool was successfully established
    if pool is None:
        raise RuntimeError("Failed to establish database connection")

    start_time = time.perf_counter()

    # Make the query w/ params
    if params:
        args = params if isinstance(params, (list, tuple)) else (params,)

        # Use `fetch` if you want rows back, or `execute` if you just want the status
        records = await pool.fetch(safe_query, *args)
    else:
        records = await pool.fetch(safe_query)

    # Transform data to fit schema
    processed_rows = [tuple(row) for row in records]
    columns = list(records[0].keys()) if records else []

    return QueryResult(
        columns=columns,
        rows=processed_rows,
        row_count=len(processed_rows),
        execution_time_ms=(time.perf_counter() - start_time),
    )

get_tables() async

Get list of tables in the public schema.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def get_tables(self) -> list[str]:
    """Get list of tables in the public schema."""
    pool = await self.connect()
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public'
        AND table_type = 'BASE TABLE'
        ORDER BY table_name;
    """
    result = await pool.fetch(query)
    table_list = [r["table_name"] for r in result]

    return table_list

get_foreign_keys(table_name) async

Get information about foreign keys in given table

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def get_foreign_keys(self, table_name: str) -> list[ForeignKeyInfo]:
    """Get information about foreign keys in given table"""
    pool = await self.connect()
    # Use $1 instead of f-string
    query = """
    SELECT DISTINCT
        kcu.column_name AS local_column,
        ccu.table_name AS foreign_table,
        ccu.column_name AS foreign_column
    FROM
        information_schema.table_constraints AS tc
        JOIN information_schema.key_column_usage AS kcu
        ON tc.constraint_name = kcu.constraint_name
        AND tc.table_schema = kcu.table_schema
        JOIN information_schema.constraint_column_usage AS ccu
        ON ccu.constraint_name = tc.constraint_name
        AND ccu.table_schema = tc.table_schema
    WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_name = $1;
    """

    # Pass table_name as an argument to fetch
    records = await pool.fetch(query, table_name)

    return [
        ForeignKeyInfo(
            column=r["local_column"],
            references_table=r["foreign_table"],
            references_column=r["foreign_column"],
        )
        for r in records
    ]

get_table_info(table_name, return_md=True) async

Get detailed information about a specific table.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def get_table_info(
    self, table_name: str, return_md: bool = True
) -> TableInfo | str | None:
    """Get detailed information about a specific table."""
    tables = await self.get_tables()
    if table_name not in tables:
        return None

    query = """
        SELECT DISTINCT ON (c.ordinal_position)
            c.column_name,
            c.data_type,
            c.is_nullable,
            c.column_default,
            CASE WHEN tc.constraint_type = 'PRIMARY KEY' THEN TRUE ELSE FALSE END AS is_primary
        FROM information_schema.columns c
        LEFT JOIN information_schema.key_column_usage kcu
            ON c.table_name = kcu.table_name
            AND c.column_name = kcu.column_name
        LEFT JOIN information_schema.table_constraints tc
            ON kcu.constraint_name = tc.constraint_name
            AND tc.constraint_type = 'PRIMARY KEY'
        WHERE c.table_name = $1
        AND c.table_schema = 'public'
        ORDER BY c.ordinal_position, is_primary DESC;
    """
    pool = await self.connect()
    records = await pool.fetch(query, table_name)

    columns = []
    primary_keys = []
    foreign_keys = []

    for r in records:
        col = ColumnInfo(
            name=r["column_name"],
            data_type=r["data_type"],
            nullable=(r["is_nullable"] == "YES"),
            default=r["column_default"],
            is_primary_key=r["is_primary"],
        )

        if col.is_primary_key:
            primary_keys.append(col.name)
        columns.append(col)

    # Get foreign keys using your existing method
    foreign_keys = await self.get_foreign_keys(table_name)

    # Get real row count
    count_res = await self.execute(f"SELECT COUNT(*) FROM {table_name};")
    actual_row_count = count_res.rows[0][0] if count_res.rows else 0

    table = TableInfo(
        name=table_name,
        columns=columns,
        row_count=actual_row_count,
        foreign_keys=foreign_keys,
        primary_key=primary_keys,
    )

    if return_md:
        table_md = self.render_table_as_markdown(table)
        return table_md

    return table

get_schema(return_md=True) async

Get database schema information.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def get_schema(self, return_md: bool = True) -> SchemaInfo | str:
    """Get database schema information."""
    table_names = await self.get_tables()

    tasks = [self.get_table_info(table_name, return_md=return_md) for table_name in table_names]
    tables = await asyncio.gather(*tasks)

    if return_md:
        str_tables = [str(t) for t in tables if t]
        return "\n".join(str_tables)

    # Filter out empty responses in the output
    return SchemaInfo(tables=[t for t in tables if t])

explain(query) async

Get query execution plan.

Source code in src/database_pydantic_ai/sql/backends/postgres.py
Python
async def explain(self, query: str) -> str:
    """Get query execution plan."""
    pool = await self.connect()
    query = f"EXPLAIN {query}"

    explanation = ""

    try:
        result = await pool.fetch(query)

        for r in result:
            explanation += f"{r['QUERY PLAN']}"

        return explanation

    except asyncpg.exceptions.PostgresSyntaxError:
        return "Invalid query, please try again"

Protocol

database_pydantic_ai.sql.protocol.SQLDatabaseProtocol

Bases: Protocol

Protocol for database backends.

Source code in src/database_pydantic_ai/sql/protocol.py
Python
@runtime_checkable
class SQLDatabaseProtocol(Protocol):
    """Protocol for database backends."""

    read_only: bool

    async def connect(self) -> None:
        """Connect to the database"""
        ...

    async def close(self) -> None:
        """Close database connection."""
        ...

    async def execute(
        self,
        query: str,
        params: tuple[Any, ...] | None = None,
    ) -> QueryResult:
        """Execute a SQL query with optional parameters."""
        ...

    async def get_tables(self) -> list[str]:
        """Get list of tables in the database"""
        ...

    async def get_foreign_keys(self, table_name: str) -> list[ForeignKeyInfo]:
        """Get information about foreign keys in given table"""
        ...

    async def get_table_info(self, table_name: str) -> TableInfo | str | None:
        """Get detailed information about a specific table."""
        ...

    async def get_schema(self, return_md: bool = True) -> SchemaInfo | str:
        """Get database schema information."""
        ...

    async def explain(self, query: str) -> str:
        """Get query execution plan."""
        ...

connect() async

Connect to the database

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def connect(self) -> None:
    """Connect to the database"""
    ...

close() async

Close database connection.

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def close(self) -> None:
    """Close database connection."""
    ...

execute(query, params=None) async

Execute a SQL query with optional parameters.

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def execute(
    self,
    query: str,
    params: tuple[Any, ...] | None = None,
) -> QueryResult:
    """Execute a SQL query with optional parameters."""
    ...

get_tables() async

Get list of tables in the database

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def get_tables(self) -> list[str]:
    """Get list of tables in the database"""
    ...

get_foreign_keys(table_name) async

Get information about foreign keys in given table

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def get_foreign_keys(self, table_name: str) -> list[ForeignKeyInfo]:
    """Get information about foreign keys in given table"""
    ...

get_table_info(table_name) async

Get detailed information about a specific table.

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def get_table_info(self, table_name: str) -> TableInfo | str | None:
    """Get detailed information about a specific table."""
    ...

get_schema(return_md=True) async

Get database schema information.

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def get_schema(self, return_md: bool = True) -> SchemaInfo | str:
    """Get database schema information."""
    ...

explain(query) async

Get query execution plan.

Source code in src/database_pydantic_ai/sql/protocol.py
Python
async def explain(self, query: str) -> str:
    """Get query execution plan."""
    ...

Types

database_pydantic_ai.types

CustomTypes

Bases: BaseModel

Base class for custom Pydantic models in the database-pydantic-ai library.

This class provides a common configuration for all custom models in the library, allowing arbitrary types to be used in Pydantic models.

Source code in src/database_pydantic_ai/types.py
Python
class CustomTypes(BaseModel):
    """
    Base class for custom Pydantic models in the database-pydantic-ai library.

    This class provides a common configuration for all custom models in the library,
    allowing arbitrary types to be used in Pydantic models.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

QueryResult

Bases: CustomTypes

Result of a database query.

Source code in src/database_pydantic_ai/types.py
Python
class QueryResult(CustomTypes):
    """Result of a database query."""

    columns: list[str]
    rows: list[tuple[Any, ...]]
    row_count: int
    execution_time_ms: float

    def __len__(self) -> int:
        return len(self.rows)

ColumnInfo

Bases: CustomTypes

Information about a table column.

Source code in src/database_pydantic_ai/types.py
Python
class ColumnInfo(CustomTypes):
    """Information about a table column."""

    name: str
    data_type: str
    nullable: bool = True
    default: str | None = None
    is_primary_key: bool = False

ForeignKeyInfo

Bases: CustomTypes

Foreign key relationship.

Source code in src/database_pydantic_ai/types.py
Python
class ForeignKeyInfo(CustomTypes):
    """Foreign key relationship."""

    column: str
    references_table: str
    references_column: str

TableInfo

Bases: CustomTypes

Information about a database table.

Source code in src/database_pydantic_ai/types.py
Python
class TableInfo(CustomTypes):
    """Information about a database table."""

    name: str
    columns: list[ColumnInfo]
    row_count: int | None = None
    primary_key: list[str] | None = None
    foreign_keys: list[ForeignKeyInfo] | None = None

SchemaInfo

Bases: CustomTypes

Database schema information.

Source code in src/database_pydantic_ai/types.py
Python
class SchemaInfo(CustomTypes):
    """Database schema information."""

    tables: list[TableInfo | str]
    views: list[str] | None = None