
Valkey Streams

Pub/Sub vs Streams

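Feature          Pub/Sub                              Streams
Durability       Not stored (fire-and-forget)         Stored until trimmed or deleted
Replay           No                                   Yes (XRANGE, or XREAD from an older ID)
Consumer groups  No                                   Yes
Ordering         Publish order per channel, no IDs    Strict, by entry ID
Size bound       None (slow subscribers dropped)      MAXLEN / MINID trimming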
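Stream entry IDs have the form `<milliseconds>-<sequence>`, and "strict by ID" means numeric ordering on both parts. The sketch below uses a hypothetical `parse_id` helper to show why sorting IDs as plain strings goes wrong once a sequence number reaches 10.

```python
def parse_id(entry_id: str) -> tuple[int, int]:
    """Split a stream ID like "1700000000000-3" into (ms, seq) integers."""
    ms, _, seq = entry_id.partition("-")
    # An ID without a sequence part (e.g. "1700000000000") is treated as seq 0
    return int(ms), int(seq or 0)


ids = ["1700000000000-10", "1700000000000-9", "1699999999999-0"]
print(sorted(ids))                # string order puts "-10" before "-9"
print(sorted(ids, key=parse_id))  # numeric order matches stream order
```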

Step 1: Add Messages

import valkey
client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)

# XADD - append to stream
entry_id = client.xadd("ai:tasks", {
    "type": "embedding",
    "text": "Hello world",
    "model": "text-embedding-3-small",
})
print(f"Added: {entry_id}")

# Cap the stream near 10,000 entries (the client trims approximately by default)
client.xadd("ai:tasks", {"type": "completion"}, maxlen=10000)
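Each entry is a flat map of field names to string values, so nested data such as lists has to be serialized before XADD. A minimal sketch, assuming a hypothetical `payload` field that carries JSON next to a plain `type` field:

```python
import json
from typing import Any


def encode_entry(kind: str, payload: dict[str, Any]) -> dict[str, str]:
    """Build a flat field map for XADD: a plain 'type' plus a JSON 'payload'."""
    return {"type": kind, "payload": json.dumps(payload, separators=(",", ":"))}


def decode_entry(fields: dict[str, str]) -> tuple[str, dict[str, Any]]:
    """Reverse encode_entry for a field map returned by XREAD or XRANGE."""
    return fields["type"], json.loads(fields.get("payload", "{}"))


fields = encode_entry("embedding", {"text": "Hello world", "model": "text-embedding-3-small"})
print(fields)
# With a connected client (decode_responses=True):
#   entry_id = client.xadd("ai:tasks", fields, maxlen=10000)
```

Keeping `type` as its own field means it stays readable in raw XRANGE output without decoding the JSON.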

Step 2: Read Messages

# XREAD - read entries with IDs after "0-0" (i.e. from the start);
# BLOCK waits up to 5000 ms only if no matching entries exist yet
entries = client.xread({"ai:tasks": "0-0"}, count=10, block=5000)
for stream, messages in entries:
    for msg_id, data in messages:
        print(f"[{msg_id}] {data}")

# XRANGE - replay the oldest 5 entries ("-" to "+" spans the whole stream)
messages = client.xrange("ai:tasks", min="-", max="+", count=5)

# XREVRANGE - newest first
latest = client.xrevrange("ai:tasks", max="+", min="-", count=1)
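For streams too long to fetch in one call, XRANGE can be paged: start each page just after the last ID seen by prefixing it with `(`, which makes the range start exclusive (supported by current Valkey versions). This sketch takes a `fetch(start, count)` callable instead of a client so the paging logic stays testable; the names `replay` and `fetch` are hypothetical.

```python
from typing import Callable, Iterator

Entry = tuple[str, dict[str, str]]


def replay(fetch: Callable[[str, int], list[Entry]], batch: int = 100) -> Iterator[Entry]:
    """Yield every entry in ID order, one XRANGE page at a time.

    fetch(start, count) should behave like
    client.xrange(key, min=start, max="+", count=count).
    """
    start = "-"
    while True:
        page = fetch(start, batch)
        yield from page
        if len(page) < batch:
            return  # short page: reached the end of the stream
        # "(" makes the next range exclusive, so the last entry is not repeated
        start = "(" + page[-1][0]


# With a connected client:
#   for msg_id, data in replay(lambda lo, n: client.xrange("ai:tasks", min=lo, max="+", count=n)):
#       print(msg_id, data)
```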

Step 3: Stream Management

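length = client.xlen("ai:tasks")        # number of entries
info = client.xinfo_stream("ai:tasks")  # metadata: length, first/last entry, groups
client.xtrim("ai:tasks", maxlen=1000)   # keep roughly the newest 1000 (approximate by default)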
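MAXLEN caps a stream by count; to keep entries by age instead, XTRIM also accepts a MINID. Auto-generated IDs start with a Unix timestamp in milliseconds, so a cutoff time maps directly to a minimum ID. A sketch, assuming IDs are auto-generated (`*`, the XADD default) and using a hypothetical helper name:

```python
import time
from typing import Optional


def minid_for_max_age(max_age_ms: int, now_ms: Optional[int] = None) -> str:
    """Return the smallest ID to keep when retaining only the last max_age_ms.

    Assumes auto-generated IDs, whose first part is a millisecond timestamp;
    entries older than the cutoff then have IDs below f"{cutoff}-0".
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff = max(now_ms - max_age_ms, 0)
    return f"{cutoff}-0"


# With a connected client, keep roughly the last 24 hours:
#   client.xtrim("ai:tasks", minid=minid_for_max_age(24 * 60 * 60 * 1000))
```

Streams written with explicit, non-timestamp IDs would break this mapping, which is why the assumption matters.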

Step 4: Consumer Loop

def consume(stream_key):
    last_id = "$"  # First call: only messages added after this point
    while True:
        # Wait up to 2 s; an empty reply just means nothing arrived yet
        entries = client.xread({stream_key: last_id}, count=10, block=2000)
        if not entries:
            continue
        for stream, messages in entries:
            for msg_id, data in messages:
                print(f"Processing: {data}")
                last_id = msg_id  # next XREAD resumes after this ID
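The loop above can be split so the per-batch logic is testable without a server and the loop can stop. In this sketch the names `handle_batch`, `handler`, and `max_batches` are hypothetical, and the reply shape is assumed to be the default RESP2 form the loop already iterates over.

```python
from typing import Any, Callable, Optional

Handler = Callable[[str, dict], None]


def handle_batch(entries: list, last_id: str, handler: Handler) -> str:
    """Run handler on every message in an XREAD reply; return the newest ID seen.

    entries is the reply shape [[stream, [(msg_id, fields), ...]], ...].
    An empty reply (BLOCK timed out) leaves last_id unchanged.
    """
    for _stream, messages in entries:
        for msg_id, data in messages:
            handler(msg_id, data)
            last_id = msg_id  # only advanced after the handler succeeds
    return last_id


def consume(client: Any, stream_key: str, handler: Handler,
            max_batches: Optional[int] = None) -> str:
    """Poll with XREAD BLOCK; stop after max_batches calls (timeouts included) if set."""
    last_id = "$"  # first call: only entries added from now on
    calls = 0
    while max_batches is None or calls < max_batches:
        entries = client.xread({stream_key: last_id}, count=10, block=2000)
        last_id = handle_batch(entries, last_id, handler)
        calls += 1
    return last_id


# With a connected client:
#   consume(client, "ai:tasks", lambda mid, data: print(f"Processing {mid}: {data}"), max_batches=5)
```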

Key: Use "$" for new messages only, "0-0" to replay from the start, or the last processed ID to resume where you left off.
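Because XREAD returns only entries with an ID strictly greater than the one passed, a consumer that records the last ID it handled can restart without gaps or duplicates. The sketch below stores that ID in a local file; the file-based checkpoint and all helper names are hypothetical choices made for illustration.

```python
from pathlib import Path
from typing import Optional


def start_id(checkpoint: Optional[str], replay: bool = False) -> str:
    """Pick the ID to pass to XREAD on startup."""
    if checkpoint:
        return checkpoint            # resume right after the last processed entry
    return "0-0" if replay else "$"  # full replay vs. new entries only


def load_checkpoint(path: Path) -> Optional[str]:
    """Read the last processed ID, or None if no checkpoint exists yet."""
    try:
        return path.read_text().strip() or None
    except FileNotFoundError:
        return None


def save_checkpoint(path: Path, entry_id: str) -> None:
    """Persist the last processed ID after its message has been handled."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(entry_id)
    tmp.replace(path)  # rename over the old file so it is never half-written
```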

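Command       Purpose                 Latency (approx.)
XADD          Append an entry         ~0.1 ms
XREAD BLOCK   Wait for new entries    ~0.1 ms + time blocked
XRANGE        Read a range / replay   ~0.1 ms
XLEN          Stream length           ~0.1 ms
XTRIM         Cap stream size         ~0.1 ms

These are rough, order-of-magnitude figures; real numbers depend on network round-trip and payload size. XRANGE and XREAD cost grows with the number of entries returned, and XTRIM with the number of entries evicted.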
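To check the latency column against your own deployment, a small timing helper is enough. It measures client-side round-trip time (network plus client overhead), so results will differ from server-side timings; the helper name and the `ai:bench` key in the usage comment are hypothetical.

```python
import statistics
import time
from typing import Callable


def median_latency_ms(call: Callable[[], object], repeats: int = 200) -> float:
    """Time `call` repeatedly and return the median wall-clock latency in ms."""
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        call()
        samples.append((time.perf_counter() - start) * 1000)
    return statistics.median(samples)


# With a connected client, for example:
#   print(f"XLEN {median_latency_ms(lambda: client.xlen('ai:tasks')):.3f} ms")
#   print(f"XADD {median_latency_ms(lambda: client.xadd('ai:bench', {'n': '1'}, maxlen=1000)):.3f} ms")
```

The median is used rather than the mean so that a few slow outliers (GC pauses, network hiccups) do not dominate the result.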