API Reference

Core

memv.Memory

Main entry point for memv.

Example
```python
memory = Memory(db_path="memory.db", embedding_client=embedder, llm_client=llm)
await memory.open()

await memory.add_message(Message(...))
await memory.process(user_id="user123")  # Extract knowledge

results = await memory.retrieve("query", user_id="user123")
print(results.to_prompt())  # Formatted for LLM context

await memory.close()
```
Auto-processing:
```python
memory = Memory(
    db_path="memory.db",
    embedding_client=embedder,
    llm_client=llm,
    auto_process=True,      # Enable automatic processing
    batch_threshold=10,     # Messages buffered before processing triggers
)
async with memory:
    # Messages buffer automatically; processing triggers at the threshold.
    await memory.add_exchange(user_id, user_msg, assistant_msg)

    # Force processing of buffered messages.
    await memory.flush(user_id)

    # Wait for background processing to complete.
    await memory.wait_for_processing(user_id, timeout=30)
```

open async

```python
open() -> None
```

Open all database connections and initialize components.

close async

```python
close(cancel_pending: bool = True) -> None
```

Close all database connections.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cancel_pending` | `bool` | If `True`, cancel any running processing tasks. If `False`, wait for them to complete first. | `True` |
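
If a graceful shutdown matters more than speed, let in-flight extraction finish before closing. A minimal sketch using the documented parameter:

```python
# Wait for any running processing tasks instead of cancelling them.
await memory.close(cancel_pending=False)
```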

add_message async

```python
add_message(message: Message) -> None
```

Add a message to memory.

Messages are stored immediately. Call process() to extract knowledge.
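
A minimal sketch of the store-then-process flow. The `Message` field names here (`user_id`, `role`, `content`) and the `MessageRole.USER` member are assumptions for illustration; check `memv.models.Message` for the actual schema:

```python
# Field and enum names are illustrative assumptions, not the confirmed schema.
msg = Message(user_id="user123", role=MessageRole.USER, content="I love hiking.")
await memory.add_message(msg)             # Stored immediately, no extraction yet
await memory.process(user_id="user123")   # Knowledge is extracted here
```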

add_exchange async

```python
add_exchange(
    user_id: str,
    user_message: str,
    assistant_message: str,
    timestamp: datetime | None = None,
) -> tuple[Message, Message]
```

Convenience method to add a user/assistant exchange.

If auto_process is enabled, buffers messages and triggers background processing when batch_threshold is reached.

Returns the created Message objects.
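
For example, based on the documented signature:

```python
user_msg, assistant_msg = await memory.add_exchange(
    user_id="user123",
    user_message="I moved to Berlin last month.",
    assistant_message="Noted! How is Berlin treating you?",
)
```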

retrieve async

```python
retrieve(
    query: str,
    user_id: str,
    top_k: int = 10,
    vector_weight: float = 0.5,
    include_episodes: bool = True,
    at_time: datetime | None = None,
    include_expired: bool = False,
) -> RetrievalResult
```

Retrieve relevant knowledge and episodes for a query.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | Search query | *required* |
| `user_id` | `str` | Filter results to this user only (required for privacy) | *required* |
| `top_k` | `int` | Number of results to return per category | `10` |
| `vector_weight` | `float` | Balance between vector (`1.0`) and text (`0.0`) search | `0.5` |
| `include_episodes` | `bool` | Whether to search and return episodes | `True` |
| `at_time` | `datetime \| None` | If provided, return only knowledge that was valid at this event time. | `None` |
| `include_expired` | `bool` | If `True`, include superseded (expired) records. Useful for viewing the full history of a fact. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `RetrievalResult` | `RetrievalResult` containing knowledge and episodes. |

Use `result.to_prompt()` to get formatted context for an LLM.
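
A sketch of both retrieval modes, using only documented parameters:

```python
from datetime import datetime

# Hybrid search, weighted toward vector similarity.
results = await memory.retrieve(
    "where does the user live?",
    user_id="user123",
    top_k=5,
    vector_weight=0.7,
)
print(results.to_prompt())

# Point-in-time view: only knowledge that was valid on this date.
snapshot = await memory.retrieve(
    "where does the user live?",
    user_id="user123",
    at_time=datetime(2024, 1, 1),
)
```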

process async

```python
process(user_id: str) -> int
```

Process unprocessed messages for a user into episodes and extract knowledge.

Flow:

1. Get messages not yet assigned to episodes
2. Segment into episodes (boundary detection)
3. Generate episode title/narrative
4. Index episode for retrieval
5. Retrieve existing knowledge for context
6. Run predict-calibrate extraction
7. Store extracted knowledge with embeddings

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `str` | Process messages for this user | *required* |

Returns:

| Type | Description |
| --- | --- |
| `int` | Number of knowledge entries extracted |

process_async

```python
process_async(user_id: str) -> ProcessTask
```

Non-blocking version of process(). Returns a handle to monitor or await completion.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `str` | Process messages for this user | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ProcessTask` | `ProcessTask` handle to monitor progress or await completion |

Example

```python
task = memory.process_async(user_id)

# ... do other work ...

count = await task.wait()
```

process_messages async

```python
process_messages(
    messages: list[Message], user_id: str
) -> int
```

Process explicit messages into an episode and extract knowledge.

Lower-level method for when you want direct control over what gets processed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `list[Message]` | Messages to process (will be grouped into one episode) | *required* |
| `user_id` | `str` | User ID for the episode | *required* |

Returns:

| Type | Description |
| --- | --- |
| `int` | Number of knowledge entries extracted |
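
A sketch of direct processing; as with add_message above, the `Message` field and enum names are assumptions:

```python
# Group a hand-picked set of messages into one episode and extract knowledge.
messages = [
    Message(user_id="user123", role=MessageRole.USER, content="I adopted a cat."),
    Message(user_id="user123", role=MessageRole.ASSISTANT, content="Congrats!"),
]
count = await memory.process_messages(messages, user_id="user123")
```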

wait_for_processing async

```python
wait_for_processing(
    user_id: str, timeout: float | None = None
) -> int
```

Wait for background processing to complete.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `str` | User whose processing to wait for | *required* |
| `timeout` | `float \| None` | Max seconds to wait (`None` = wait forever) | `None` |

Returns:

| Type | Description |
| --- | --- |
| `int` | Number of knowledge entries extracted, or `0` if no task is running |

Raises:

| Type | Description |
| --- | --- |
| `TimeoutError` | If the timeout is exceeded |
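
For example, bounding the wait and handling the documented exception:

```python
try:
    count = await memory.wait_for_processing("user123", timeout=30)
    print(f"Extracted {count} knowledge entries")
except TimeoutError:
    # Processing did not finish within 30 seconds.
    pass
```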

flush async

```python
flush(user_id: str) -> int
```

Force processing of buffered messages regardless of threshold.

Schedules processing if there are unprocessed messages and waits for completion.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `str` | User whose messages to process | *required* |

Returns:

| Type | Description |
| --- | --- |
| `int` | Number of knowledge entries extracted |

clear_user async

```python
clear_user(user_id: str) -> dict[str, int]
```

Delete all data for a user: messages, episodes, knowledge, and indices.

This is a destructive operation. Use with caution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `str` | User whose data to delete | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, int]` | Dict with counts of deleted items per category |
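
For example; only "counts per category" is documented, so the dict keys shown are illustrative:

```python
deleted = await memory.clear_user("user123")
# Keys are illustrative, e.g. {"messages": 42, "episodes": 3, "knowledge": 17}
print(deleted)
```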

memv.MemoryConfig dataclass

Configuration for Memory system.

Provides centralized configuration with sensible defaults. Pass a config to Memory(), or override individual parameters directly.

Example
```python
config = MemoryConfig(
    max_statements_for_prediction=5,
    enable_episode_merging=False,
)
memory = Memory(config=config, embedding_client=embedder, llm_client=llm)
```

Models

memv.models.Message

Bases: BaseModel

memv.models.MessageRole

Bases: StrEnum

memv.models.Episode

Bases: BaseModel

memv.models.SemanticKnowledge

Bases: BaseModel

invalidate

```python
invalidate() -> None
```

Mark this knowledge as superseded.

is_valid_at

```python
is_valid_at(event_time: datetime) -> bool
```

Check if fact was true at given event time.

is_current

```python
is_current() -> bool
```

Check if this is the current (non-expired) record.
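
A sketch of inspecting temporal validity on retrieved facts. Only the two methods above are documented; the `knowledge` attribute on `RetrievalResult` is an assumption here:

```python
from datetime import datetime

results = await memory.retrieve("query", user_id="user123", include_expired=True)
for fact in results.knowledge:  # Attribute name assumed, not documented here
    if fact.is_current():
        print("current:", fact)
    elif fact.is_valid_at(datetime(2023, 6, 1)):
        print("was true in mid-2023:", fact)
```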

memv.models.RetrievalResult

Bases: BaseModel

Results from memory retrieval.

as_text

```python
as_text() -> str
```

Simple text representation of knowledge statements.

to_prompt

```python
to_prompt() -> str
```

Format retrieval results for LLM context injection.

Groups knowledge by source episode and includes episode context to avoid redundancy.
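
A typical injection pattern; the system-prompt framing is illustrative and entirely up to you:

```python
results = await memory.retrieve("user preferences", user_id="user123")
context = results.to_prompt()

# Wrap the formatted context however your chat stack expects.
system_prompt = (
    "You are a helpful assistant.\n\n"
    f"What you know about the user:\n{context}"
)
```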

memv.models.ProcessTask

Bases: BaseModel

Handle for monitoring/awaiting async processing.

done property

```python
done: bool
```

Check if processing has completed (success or failure).

wait async

```python
wait() -> int
```

Wait for processing to complete and return knowledge count.

memv.models.ProcessStatus

Bases: StrEnum

memv.models.ExtractedKnowledge

Bases: BaseModel

Output of predict-calibrate extraction.


Protocols

memv.protocols.EmbeddingClient

Bases: Protocol

memv.protocols.LLMClient

Bases: Protocol

Protocol for LLM completions with structured output.
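
Any class with matching async methods can satisfy the protocol; no inheritance is needed. The method names in this sketch (generate, generate_structured) are taken from PydanticAIAdapter below, on the assumption that it implements this protocol:

```python
from typing import TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

class MyLLMClient:
    """Sketch of a custom client; method names assumed from PydanticAIAdapter."""

    async def generate(self, prompt: str) -> str:
        # Call your provider here and return plain text.
        raise NotImplementedError

    async def generate_structured(self, prompt: str, response_model: type[T]) -> T:
        # Call your provider with structured output and parse into the model.
        raise NotImplementedError
```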


Built-in Adapters

memv.embeddings.OpenAIEmbedAdapter

memv.llm.PydanticAIAdapter

LLM client using PydanticAI.

Supports multiple providers out of the box:

- `"openai:gpt-4.1-mini"`
- `"anthropic:claude-3-5-sonnet-latest"`
- `"google-gla:gemini-2.5-flash"`
- `"groq:llama-3.3-70b-versatile"`

See https://ai.pydantic.dev/models/ for the full list.
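
A usage sketch based on the provider strings listed above; the constructor argument is an assumption, since this reference does not show the adapter's signature:

```python
# Constructor argument is assumed, not documented here.
llm = PydanticAIAdapter("openai:gpt-4.1-mini")
memory = Memory(db_path="memory.db", embedding_client=embedder, llm_client=llm)
```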

generate async

```python
generate(prompt: str) -> str
```

Generate unstructured text response.

generate_structured async

```python
generate_structured(
    prompt: str, response_model: type[T]
) -> T
```

Generate structured response matching the Pydantic model.
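
For example, using the documented signature with a small Pydantic model:

```python
from pydantic import BaseModel

class CityAnswer(BaseModel):
    city: str
    confidence: float

answer = await llm.generate_structured(
    "Which city does the user live in? Context: 'I moved to Berlin.'",
    response_model=CityAnswer,
)
print(answer.city, answer.confidence)
```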