Production Patterns
Pattern 1: Backpressure with MAXLEN
import valkey, time
client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)
# Approximate trim (faster, recommended)
client.xadd("ai:tasks", {"data": "..."}, maxlen=10000)
# Time-based trim (remove entries older than 1 hour)
cutoff_ms = int((time.time() - 3600) * 1000)
client.xtrim("ai:tasks", minid=cutoff_ms)
Pattern 2: Monitoring with XINFO
def stream_health(stream_key):
info = client.xinfo_stream(stream_key)
groups = client.xinfo_groups(stream_key)
return {
"length": info["length"],
"groups": len(groups),
"consumers": sum(g["consumers"] for g in groups),
"pending": sum(g["pending"] for g in groups),
}
print(stream_health("ai:tasks"))
Pattern 3: Resilient Pub/Sub Subscriber
def resilient_subscriber(channel):
while True:
try:
c = valkey.Valkey(host="localhost", decode_responses=True)
ps = c.pubsub()
ps.subscribe(channel)
print(f"Connected to {channel}")
for msg in ps.listen():
if msg["type"] == "message":
print(msg["data"])
except redis.ConnectionError:
print("Disconnected, reconnecting in 2s...")
time.sleep(2)
Pattern 4: Dead Letter Queue
def process_with_dlq(stream, group, consumer, max_retries=3):
entries = client.xreadgroup(group, consumer, {stream: ">"}, count=10, block=2000)
if not entries: return
for s, messages in entries:
for msg_id, data in messages:
try:
process(data)
client.xack(stream, group, msg_id)
except Exception:
# After max retries, move to DLQ
pending = client.xpending_range(stream, group, msg_id, msg_id, 1)
if pending and pending[0]["times_delivered"] >= max_retries:
client.xadd(f"{stream}:dlq", data)
client.xack(stream, group, msg_id)
Production Checklist
| Area | Recommendation |
|---|---|
| Backpressure | Always use MAXLEN on XADD |
| Monitoring | Track stream length, pending count, consumer lag |
| Reconnection | Wrap Pub/Sub listeners in retry loops |
| Acknowledgments | Always XACK after processing |
| Dead letters | Move failed messages to DLQ after N retries |
| Scaling | Add consumers to groups for horizontal scale |
| Memory | Use XTRIM or MAXLEN to bound stream size |
Congrats! You completed all 6 Pub/Sub & Streaming cookbooks. Check out the interactive demo to experiment.