Advanced · Python · ~25 min

Agent State

The Problem

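AI agents run multi-step workflows: plan → search → analyze → respond. If the process crashes at step 3, you lose everything and start over. Worse, if the agent calls the same expensive tool twice, you pay double. You need three things: checkpoints so a crashed run can resume from the last completed step, a cache so identical tool calls are not repeated, and an ordered event log for replay and debugging.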

Three Valkey Patterns, One Agent

┌─────────────────────────────────────────────────────────┐
│                    Valkey Server                        │
│                                                         │
│  HASH  agent:state:{run_id}  → Checkpoint (current step,│
│                                 intermediate results)   │
│                                                         │
│  STRING tool:cache:{hash}    → Cached tool outputs      │
│         (with TTL)              (avoid duplicate calls)  │
│                                                         │
│  STREAM agent:log:{run_id}   → Ordered event log        │
│                                 (replay & debugging)     │
└─────────────────────────────────────────────────────────┘

Pattern 1: Agent Checkpointing with HASH

Save the agent's state after each step. If it crashes, resume from the last checkpoint.

import inspect
import json
import time

from glide import (
    ExpirySet,
    ExpiryType,
    GlideClient,
    GlideClientConfiguration,
    NodeAddress,
    StreamReadOptions,
)

async def save_checkpoint(client, run_id, step, data, ttl=86400):
    """Save agent state after completing a step.

    Writes the step number, an ``in_progress`` status, the JSON-encoded
    result of this step, and a wall-clock timestamp into the hash
    ``agent:state:{run_id}``, then (re)sets the key's time-to-live.

    Args:
        client: Async Valkey client exposing ``hset`` and ``expire``.
        run_id: Identifier of the agent run; part of the hash key.
        step: Number of the step that just finished.
        data: JSON-serializable result of the step.
        ttl: Lifetime of the checkpoint in seconds (default 24 hours).
    """
    key = f"agent:state:{run_id}"
    await client.hset(key, {
        "current_step": str(step),
        "status": "in_progress",
        # One field per step, so earlier results stay in the hash and can
        # be read back when the run is resumed.
        f"result_step_{step}": json.dumps(data),
        "updated_at": str(time.time()),
    })
    # Refresh the TTL on every checkpoint so an active run does not expire.
    await client.expire(key, ttl)


async def load_checkpoint(client, run_id):
    """Fetch the checkpoint hash stored for ``run_id``.

    Returns a dict mapping decoded field names to decoded values, or
    ``None`` when ``hgetall`` yields nothing for ``agent:state:{run_id}``.
    """
    raw_fields = await client.hgetall(f"agent:state:{run_id}")
    if raw_fields:
        # Names and values are decoded with .decode() into plain strings.
        return {
            name.decode(): value.decode()
            for name, value in raw_fields.items()
        }
    return None


async def mark_complete(client, run_id):
    """Set ``status`` to ``complete`` on the hash ``agent:state:{run_id}``."""
    state_key = f"agent:state:{run_id}"
    final_fields = {"status": "complete"}
    await client.hset(state_key, final_fields)

Pattern 2: Tool Result Caching with STRING

Cache expensive tool outputs (API calls, web searches, database queries) so the agent doesn't repeat them.

import hashlib

async def cached_tool_call(client, tool_name, args, tool_fn, ttl=300):
    """Call a tool, caching its JSON-serializable result in Valkey.

    The cache key is ``tool:cache:{tool_name}:{hash}``, where the hash is
    the first 12 hex characters of the MD5 of the arguments serialized as
    JSON with sorted keys, so argument order does not affect the key.

    Args:
        client: Async Valkey client exposing ``get`` and ``set``.
        tool_name: Name of the tool; part of the cache key and log output.
        args: Dict of keyword arguments passed to ``tool_fn``.
        tool_fn: Callable invoked as ``tool_fn(**args)``. It may be a plain
            function or return an awaitable (e.g. an ``async def`` tool).
        ttl: Lifetime of the cached entry in seconds.

    Returns:
        The cached value on a hit, otherwise the fresh tool result.
    """
    # Create a cache key from tool name + arguments
    serialized_args = json.dumps(args, sort_keys=True).encode()
    args_hash = hashlib.md5(serialized_args).hexdigest()[:12]
    cache_key = f"tool:cache:{tool_name}:{args_hash}"

    # Check cache first
    cached = await client.get(cache_key)
    if cached:
        print(f"⚡ Tool cache HIT: {tool_name}")
        return json.loads(cached)

    # Cache miss - call the tool
    print(f"🔄 Tool cache MISS: {tool_name} - calling...")
    result = tool_fn(**args)
    if inspect.isawaitable(result):
        # Async tools hand back a coroutine; resolve it before serializing.
        result = await result

    # Store value and TTL in one SET ... EX command. A separate EXPIRE call
    # could be lost if the process dies in between, leaving a cache entry
    # that never expires.
    await client.set(
        cache_key,
        json.dumps(result),
        expiry=ExpirySet(ExpiryType.SEC, ttl),
    )

    return result


# Usage: run this inside a coroutine (or an asyncio-aware REPL/notebook),
# because `await` is not allowed at module top level in a regular script.
# `client` and `web_search` are not defined in this snippet and must
# already exist in scope.
result = await cached_tool_call(
    client, "web_search",
    {"query": "Valkey vector search benchmarks"},
    tool_fn=web_search,
    ttl=600,  # cache for 10 minutes
)

Worth knowing: SET key value EX ttl is the simplest caching pattern in Valkey. For tool results, the TTL should match how quickly the data goes stale - 5 minutes for web searches, 1 hour for database queries, 24 hours for static lookups.

Pattern 3: Activity Log with STREAM

Log every agent action to a Valkey Stream. This creates an ordered, persistent audit trail for debugging and replay.

async def log_action(client, run_id, action, details):
    """Record one agent action as a new entry on the run's stream.

    The entry is added to ``agent:log:{run_id}`` with three fields: the
    action name, the JSON-encoded details, and the current wall-clock time.
    """
    entry_fields = [
        ("action", action),
        ("details", json.dumps(details)),
        ("ts", str(time.time())),
    ]
    await client.xadd(f"agent:log:{run_id}", entry_fields)


async def get_action_log(client, run_id, page_size=1000):
    """Read the full activity log for a run.

    Reads the stream ``agent:log:{run_id}`` from the beginning in pages of
    ``page_size`` entries, so logs longer than one page are not truncated.

    Args:
        client: Async Valkey client exposing ``xread``.
        run_id: Identifier of the agent run; part of the stream key.
        page_size: Maximum number of entries requested per XREAD call.

    Returns:
        A list of dicts in stream order, one per entry, holding the decoded
        fields plus the entry ID under ``"id"``. Empty if there is no log.
    """
    stream_key = f"agent:log:{run_id}"
    options = StreamReadOptions(count=page_size)

    entries = []
    # XREAD returns entries with IDs strictly greater than the given ID, so
    # passing the last ID seen continues exactly where the previous page ended.
    last_id = "0"
    while True:
        result = await client.xread({stream_key: last_id}, options)
        if not result:
            break

        fetched = 0
        for events in result.values():
            for entry_id, fields in events.items():
                entry = {k.decode(): v.decode() for k, v in fields}
                entry["id"] = last_id = entry_id.decode()
                entries.append(entry)
                fetched += 1

        # A short page means the end of the stream was reached; this also
        # stops the loop if a reply unexpectedly carries no entries.
        if fetched < page_size:
            break
    return entries

Putting It All Together: A Resumable Agent

async def run_agent(client, run_id, query):
    """Run the search → analyze → respond workflow, resuming if possible.

    If a checkpoint exists for ``run_id``, steps that already finished are
    skipped and their saved results are read back from the checkpoint, so
    the remaining steps have the inputs they need.

    Args:
        client: Async Valkey client shared by the checkpoint, cache and
            log helpers.
        run_id: Identifier of the agent run.
        query: The user query passed to the search step.

    Returns:
        The final response produced by step 3 (or the stored response when
        all steps had already finished).
    """
    # Check for existing checkpoint
    checkpoint = await load_checkpoint(client, run_id) or {}
    # A hash without "current_step" means no step has been checkpointed.
    start_step = int(checkpoint.get("current_step", 0)) + 1

    def restore(step, field):
        """Return ``field`` from the JSON result saved for ``step``."""
        return json.loads(checkpoint[f"result_step_{step}"])[field]

    # A run that already finished only needs its stored answer; do not
    # log or mark it complete a second time.
    if start_step > 3 and checkpoint.get("status") == "complete":
        return restore(3, "response")

    if start_step > 1:
        print(f"♻️  Resuming from step {start_step}")

    # Step 1: Search
    if start_step <= 1:
        await log_action(client, run_id, "search", {"query": query})
        results = await cached_tool_call(
            client, "web_search", {"query": query}, web_search)
        await save_checkpoint(client, run_id, 1, {"search_results": results})
    elif start_step == 2:
        # Step 2 still has to run and needs the saved search results.
        results = restore(1, "search_results")

    # Step 2: Analyze
    if start_step <= 2:
        await log_action(client, run_id, "analyze", {"input": "search results"})
        analysis = call_llm(f"Analyze: {results}")
        await save_checkpoint(client, run_id, 2, {"analysis": analysis})
    elif start_step == 3:
        # Step 3 still has to run and needs the saved analysis.
        analysis = restore(2, "analysis")

    # Step 3: Respond
    if start_step <= 3:
        await log_action(client, run_id, "respond", {"status": "generating"})
        response = call_llm(f"Respond based on: {analysis}")
        await save_checkpoint(client, run_id, 3, {"response": response})
    else:
        # All steps finished but the run was never marked complete.
        response = restore(3, "response")

    await mark_complete(client, run_id)
    await log_action(client, run_id, "complete", {"status": "done"})
    return response

Debugging: Inspect the Activity Log

# Fetch the recorded actions for run "run_001" and print one line per entry.
# Run this inside a coroutine (or an asyncio-aware REPL/notebook): `await`
# is not allowed at module top level in a regular script.
log = await get_action_log(client, "run_001")
for entry in log:
    print(f"  [{entry['id']}] {entry['action']}: {entry['details']}")

# Example output (entry IDs and payloads are illustrative):
# [1710000000001-0] search: {"query": "Valkey benchmarks"}
# [1710000000002-0] analyze: {"input": "search results"}
# [1710000000003-0] respond: {"status": "generating"}
# [1710000000004-0] complete: {"status": "done"}

Valkey Commands Reference

Pattern Command Latency
Save checkpoint HSET agent:state:{id} current_step 2 result_step_2 '{...}' ~0.1ms
Load checkpoint HGETALL agent:state:{id} ~0.1ms
Cache tool result SET tool:cache:{hash} '{...}' EX 300 ~0.1ms
Check tool cache GET tool:cache:{hash} ~0.1ms
Log action XADD agent:log:{id} * action "search" ... ~0.1ms
Read full log XREAD STREAMS agent:log:{id} 0 ~0.1ms

Summary: Five Valkey Data Structures, One Conversation System

Across these 5 cookbooks, we've used every major Valkey data structure for a different aspect of conversation memory:

Cookbook Data Structure Purpose
01 Getting Started LIST Ordered chat history
02 Session Management HASH Session metadata
03 Semantic Memory JSON + FT.SEARCH Vector search over conversations
04 Semantic Caching JSON + FT.SEARCH Cache LLM responses by meaning
05 Agent State HASH + STRING + STREAM Checkpoints, tool cache, event log

One Valkey instance. Five data structures. Complete AI memory infrastructure.