Memory Storage
The MemoryRecord Model
Each memory is a structured record with content, scope, categories, importance, and an embedding vector:
from datetime import datetime, timezone
from uuid import uuid4

from pydantic import BaseModel, Field
class MemoryRecord(BaseModel):
    """A single stored memory: content plus retrieval metadata and its embedding."""

    # Random UUID so records never collide across writers.
    id: str = Field(default_factory=lambda: str(uuid4()))
    content: str = Field(default="")
    # Hierarchical namespace (e.g. "/project/agent") used for filtered search.
    scope: str = Field(default="/")
    categories: list[str] = Field(default_factory=list)
    metadata: dict = Field(default_factory=dict)
    # Relevance weight, constrained to [0.0, 1.0] by pydantic.
    importance: float = Field(default=0.5, ge=0.0, le=1.0)
    # Timezone-aware UTC timestamps. datetime.utcnow() is deprecated and returns
    # a *naive* datetime, which would make .timestamp() (used during
    # serialization) interpret the value in local time.
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    last_accessed: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # Dense vector from the embedding model; None until computed.
    embedding: list[float] | None = Field(default=None)
    source: str | None = Field(default=None)
    # Private memories are stored but excluded from shared recall by callers.
    private: bool = Field(default=False)
Step 1: Serialization
Convert MemoryRecord to/from Valkey JSON documents:
import json
def serialize_record(record: MemoryRecord) -> dict:
    """Flatten a MemoryRecord into a JSON-storable Valkey document.

    Lists/dicts that the index cannot address directly are collapsed into
    strings; datetimes become epoch seconds so they can be NUMERIC-indexed.
    """
    doc = {
        "id": record.id,
        "content": record.content,
        "scope": record.scope,
    }
    # Categories become a comma-joined string; metadata is embedded as JSON text.
    doc["categories_str"] = ",".join(record.categories)
    doc["metadata_str"] = json.dumps(record.metadata)
    doc["importance"] = record.importance
    for stamp in ("created_at", "last_accessed"):
        doc[stamp] = getattr(record, stamp).timestamp()
    doc["embedding"] = record.embedding
    doc["source"] = record.source or ""
    doc["private"] = "true" if record.private else "false"
    return doc
Step 2: Create the HNSW Index
from glide import (
ft, VectorField, VectorAlgorithm, VectorFieldAttributesHnsw,
VectorType, DistanceMetricType, NumericField, TextField,
FtCreateOptions, DataType,
)
# Name of the vector-search index created by ensure_index().
INDEX_NAME = "memory_idx"
# All memory documents are stored under this key prefix so the index can find them.
KEY_PREFIX = "memory:"
VECTOR_DIM = 1536  # Amazon Titan embeddings
async def ensure_index(client):
    """Create the HNSW vector index if it does not already exist (idempotent)."""
    # TagField (not TextField) so scope/categories_str match the TAG schema in
    # the documented FT.CREATE command and the `@scope:{...}` TAG filter syntax
    # used by search(); TEXT fields would not accept brace filters.
    # Imported locally because the module-level glide import omits it.
    from glide import TagField

    # FT._LIST returns existing index names as bytes.
    existing = await ft.list(client)
    if INDEX_NAME.encode() in existing:
        return
    hnsw = VectorFieldAttributesHnsw(
        dimensions=VECTOR_DIM,
        distance_metric=DistanceMetricType.COSINE,
        type=VectorType.FLOAT32,
    )
    await ft.create(
        client, INDEX_NAME,
        schema=[
            TagField("$.scope", alias="scope"),
            TagField("$.categories_str", alias="categories_str"),
            NumericField("$.importance", alias="importance"),
            NumericField("$.created_at", alias="created_at"),
            VectorField("$.embedding", VectorAlgorithm.HNSW,
                        alias="embedding", attributes=hnsw),
        ],
        options=FtCreateOptions(data_type=DataType.JSON, prefixes=[KEY_PREFIX]),
    )
Valkey Command:
FT.CREATE memory_idx ON JSON PREFIX 1 "memory:"
SCHEMA $.scope AS scope TAG
$.categories_str AS categories_str TAG
$.importance AS importance NUMERIC
$.created_at AS created_at NUMERIC
$.embedding AS embedding VECTOR HNSW 6
TYPE FLOAT32 DIM 1536 DISTANCE_METRIC COSINE
Step 3: Store a Memory
from glide import glide_json
async def store(client, record: MemoryRecord, ttl: int | None = 3600):
    """Persist a memory as a JSON document under KEY_PREFIX.

    Args:
        client: connected glide client.
        record: the memory to store.
        ttl: expiry in seconds; pass None to keep the memory indefinitely
             (backward-compatible generalization — default is unchanged).
    """
    key = f"{KEY_PREFIX}{record.id}"
    doc = serialize_record(record)
    await glide_json.set(client, key, "$", json.dumps(doc))
    # NOTE(review): JSON.SET + EXPIRE are two round trips, not atomic; a crash
    # between them leaves a key with no TTL.
    if ttl is not None:
        await client.expire(key, ttl)
Valkey Commands:
JSON.SET memory:abc-123-def $ '{"id":"abc-123-def","content":"Always check for null...","embedding":[0.12,-0.45,...],...}'
EXPIRE memory:abc-123-def 3600
Step 4: Recall by Semantic Similarity
import struct
async def recall(client, query_embedding: list[float], limit: int = 5):
    """Return up to `limit` (document, similarity) pairs, best match first.

    Similarity is 1 - cosine distance, so higher is better.
    """
    # FtSearchOptions is NOT in the module-level glide import; without this
    # local import the call below raises NameError at runtime.
    from glide import FtSearchOptions

    # FT.SEARCH expects the query vector as packed little-endian float32 bytes.
    vec_bytes = struct.pack(f"<{len(query_embedding)}f", *query_embedding)
    result = await ft.search(
        client, INDEX_NAME,
        f"(*)=>[KNN {limit} @embedding $vec AS score]",
        options=FtSearchOptions(params={"vec": vec_bytes}),
    )
    memories = []
    # result[0] is the hit count; result[1] maps key -> {field: value} (bytes keys).
    if result and len(result) >= 2 and result[1]:
        for key, fields in result[1].items():
            doc = json.loads(fields[b"$"])
            # "score" is cosine *distance*; convert to similarity.
            score = 1.0 - float(fields[b"score"])
            memories.append((doc, score))
    return sorted(memories, key=lambda x: x[1], reverse=True)
Valkey Command:
FT.SEARCH memory_idx
"(*)=>[KNN 5 @embedding $vec AS score]"
PARAMS 2 vec <binary_vector_6144_bytes>
LIMIT 0 5
Step 5: Filtered Search
async def search(client, query_embedding, scope: str = None, categories: list = None, limit: int = 5):
    """KNN search restricted by optional scope prefix and category tags.

    Returns (document, similarity) pairs sorted best-first, same shape as
    recall(). (Original version ended at a parsing stub and returned None.)
    """
    # Same missing-import fix as recall(): FtSearchOptions is not in the
    # module-level glide import.
    from glide import FtSearchOptions

    filters = []
    if scope:
        # Escape '/' so the TAG parser treats the path as a literal prefix.
        escaped = scope.replace("/", "\\/")
        filters.append(f"@scope:{{{escaped}*}}")
    if categories:
        # TAG OR-syntax: {a|b} matches records carrying any listed category.
        joined = "|".join(categories)
        filters.append(f"@categories_str:{{{joined}}}")
    filter_str = " ".join(filters) if filters else "*"
    query = f"({filter_str})=>[KNN {limit} @embedding $vec AS score]"
    vec_bytes = struct.pack(f"<{len(query_embedding)}f", *query_embedding)
    result = await ft.search(client, INDEX_NAME, query,
                             options=FtSearchOptions(params={"vec": vec_bytes}))
    # Parse results exactly like recall(): distance -> similarity, best first.
    memories = []
    if result and len(result) >= 2 and result[1]:
        for key, fields in result[1].items():
            doc = json.loads(fields[b"$"])
            memories.append((doc, 1.0 - float(fields[b"score"])))
    return sorted(memories, key=lambda x: x[1], reverse=True)
Next Steps
The storage backend is complete. Next we wire it into CrewAI's Memory system and run real agents.