Intermediate · Python · ~25 min

Context Assembly Pipeline

Build a complete context assembly system that gathers conversation history, retrieved knowledge, user preferences, and tool outputs from Valkey before every LLM call.

The Context Assembly Function

Before every LLM call, an agent needs to assemble context from multiple sources. This function is the core of context engineering - it gathers everything the LLM needs into a structured prompt.

import valkey
import json
import struct
import time

# Text-mode connection used by every snippet below: with decode_responses=True
# the client hands back replies as `str`, so values can be formatted and passed
# to json.loads directly.
client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)
# Second connection to the same server that returns raw `bytes`.
# NOTE(review): not used in the code shown here; presumably intended for binary
# payloads (e.g. vectors packed with `struct`) - confirm against the rest of the cookbook.
client_bin = valkey.Valkey(host="localhost", port=6379, decode_responses=False)

def _tool_step_order(key: str) -> tuple:
    """Sort key that orders ``tool:<session>:step_<n>`` keys by numeric step.

    Plain string sorting would put ``step_10`` before ``step_2``. Keys whose
    suffix is not a number sort after all numbered steps, alphabetically.
    """
    suffix = key.rsplit("step_", 1)[-1]
    if suffix.isdecimal():
        return (0, int(suffix), key)
    return (1, 0, key)


def assemble_context(user_id: str, session_id: str, agent_id: str, current_message: str) -> list:
    """Assemble the full message list for an LLM call from the context stored in Valkey.

    Sources are appended from broadest to most specific: agent config, user
    memory, retrieved knowledge, recent chat history, tool outputs, and
    finally the current user message.

    Args:
        user_id: Selects the ``memory:{user_id}`` hash of cross-session facts.
        session_id: Selects ``kb:results:{session_id}``, ``chat:{session_id}``
            and the ``tool:{session_id}:step_*`` hashes.
        agent_id: Selects the ``agent:config:{agent_id}`` hash.
        current_message: The user's new message; always the last entry.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts ready to send to a
        chat-completion style API.

    Raises:
        json.JSONDecodeError: If a stored knowledge or chat entry is not valid JSON.
        KeyError: If a stored chat entry lacks ``role`` or ``content``.
    """
    messages = []

    # 1. System instructions (broadest context)
    config = client.hgetall(f"agent:config:{agent_id}")
    if config:
        system_prompt = f"{config.get('role', '')}\n\nConstraints: {config.get('constraints', '')}\nFormat: {config.get('output_format', '')}"
        messages.append({"role": "system", "content": system_prompt})

    # 2. User memory (cross-session preferences)
    memories = client.hgetall(f"memory:{user_id}")
    if memories:
        mem_str = "\n".join(f"- {k}: {v}" for k, v in memories.items())
        messages.append({"role": "system", "content": f"User context:\n{mem_str}"})

    # 3. Retrieved knowledge (RAG - would use FT.SEARCH in production)
    # Simplified here; see the Vector Search cookbook for full implementation
    kb_results = client.lrange(f"kb:results:{session_id}", 0, -1)
    if kb_results:
        docs = [json.loads(d) for d in kb_results]
        kb_str = "\n\n".join(f"[{d.get('title', 'Doc')}]: {d.get('content', '')}" for d in docs)
        messages.append({"role": "system", "content": f"Relevant knowledge:\n{kb_str}"})

    # 4. Conversation history (last 10 turns, oldest first)
    history = client.lrange(f"chat:{session_id}", -10, -1)
    for raw in history:
        msg = json.loads(raw)
        messages.append({"role": msg["role"], "content": msg["content"]})

    # 5. Tool outputs from current session
    # SCAN walks the keyspace incrementally instead of blocking the server the
    # way KEYS does. It may report a key more than once, hence the set.
    tool_keys = set(client.scan_iter(match=f"tool:{session_id}:step_*"))
    for key in sorted(tool_keys, key=_tool_step_order):
        data = client.hgetall(key)
        if not data:
            # The key expired or was deleted between SCAN and HGETALL.
            continue
        messages.append({
            "role": "system",
            "content": f"Tool '{data.get('tool', 'unknown')}' returned: {data.get('result', '{}')}",
        })

    # 6. Current user message (most specific)
    messages.append({"role": "user", "content": current_message})

    return messages

Token Budgeting

Context windows have limits. You need to budget tokens across sources:

def budget_context(messages: list, max_tokens: int = 8000) -> list:
    """Trim context to fit within the token budget.

    Simple heuristic: ~4 chars per token for English text.
    In production, use tiktoken for accurate counting.

    Every ``system`` message and the last non-system message (the current
    turn) are always kept. Older non-system messages are then added back from
    newest to oldest until the next one would exceed the budget. The kept
    messages are returned in their original relative order.

    Args:
        messages: ``{"role": ..., "content": ...}`` dicts, oldest first.
        max_tokens: Approximate token budget for the whole context.

    Returns:
        ``messages`` itself when it already fits, otherwise a new, shorter
        list. The result can still exceed the budget when the always-kept
        messages alone are larger than it.
    """
    CHARS_PER_TOKEN = 4
    max_chars = max_tokens * CHARS_PER_TOKEN

    total = sum(len(m["content"]) for m in messages)
    if total <= max_chars:
        return messages  # Fits already

    # Work with indices so the surviving messages keep their original order
    # (e.g. tool outputs stay after the turns that triggered them).
    non_system = [i for i, m in enumerate(messages) if m["role"] != "system"]
    history = non_system[:-1]

    # Always keep system messages and the last non-system message
    keep = {i for i, m in enumerate(messages) if m["role"] == "system"}
    keep.update(non_system[-1:])
    budget_remaining = max_chars - sum(len(messages[i]["content"]) for i in keep)

    # Re-admit history newest-first; stop at the first turn that does not fit
    # so the kept history is a contiguous run of the most recent turns.
    for i in reversed(history):
        size = len(messages[i]["content"])
        if size > budget_remaining:
            break
        keep.add(i)
        budget_remaining -= size

    return [m for i, m in enumerate(messages) if i in keep]

Putting It Together

# Demo walkthrough: seed Valkey with sample data for one agent, one user and
# one session, then assemble and budget the context for a new question.

# Agent configuration
client.hset(
    "agent:config:demo_agent",
    mapping=dict(
        role="You are a helpful AI assistant.",
        constraints="Be concise and accurate.",
        output_format="Plain text.",
    ),
)

# Long-lived facts about the user
client.hset(
    "memory:user_42",
    mapping=dict(name="Bob", preferred_language="English", tier="free"),
)

# Conversation so far; the whole list expires after 30 minutes
chat_key = "chat:sess_42"
sample_turns = (
    ("user", "Hi, I need help with my account"),
    ("assistant", "Sure! What do you need help with?"),
    ("user", "I want to upgrade to premium"),
)
for role, content in sample_turns:
    client.rpush(chat_key, json.dumps({"role": role, "content": content, "ts": time.time()}))
client.expire(chat_key, 1800)

# One tool result for this session; expires after an hour
tool_key = "tool:sess_42:step_1"
tool_record = {
    "tool": "check_account",
    "result": json.dumps({"plan": "free", "eligible_upgrade": True}),
    "timestamp": str(time.time()),
}
client.hset(tool_key, mapping=tool_record)
client.expire(tool_key, 3600)

# Build the prompt, then trim it to the token budget
context = assemble_context("user_42", "sess_42", "demo_agent", "How much does premium cost?")
budgeted = budget_context(context, max_tokens=4000)

print(f"Assembled {len(context)} messages, budgeted to {len(budgeted)}")
for entry in budgeted:
    preview = entry["content"][:80]
    print(f"  [{entry['role']}] {preview}...")

Context Assembly Flow

| Step | Source | Valkey Command | Purpose |
|------|--------|----------------|---------|
| 1 | System config | HGETALL agent:config:{id} | Agent role + constraints |
| 2 | User memory | HGETALL memory:{user_id} | Preferences, history |
| 3 | Knowledge base | FT.SEARCH or LRANGE | Retrieved docs (RAG) |
| 4 | Chat history | LRANGE chat:{session} -10 -1 | Recent conversation |
| 5 | Tool outputs | KEYS/SCAN tool:{session}:step_*, then HGETALL per key | Function call results |
| 6 | Current message | (from input) | What the user just said |

Key insight from Philipp Schmid (Google DeepMind): "Context engineering is a system, not a string. Context isn't just a static prompt template - it's the output of a system that runs before the main LLM call."