Consumer Groups
Why Consumer Groups?
With plain XREAD, every consumer gets every message. With consumer groups, messages are distributed - enabling parallel processing of AI tasks like embeddings, completions, and tool calls.
Step 1: Create a Group
import valkey
client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)
try:
client.xgroup_create("ai:tasks", "workers", id="0", mkstream=True)
except valkey.ResponseError:
pass # Group already exists
Step 2: Consume from Group
def worker(worker_name: str):
while True:
entries = client.xreadgroup(
"workers", worker_name,
{"ai:tasks": ">"},
count=5, block=2000,
)
if not entries: continue
for stream, messages in entries:
for msg_id, data in messages:
print(f"[{worker_name}] Processing: {data}")
# Process the task...
client.xack("ai:tasks", "workers", msg_id)
Step 3: Handle Pending Messages
# Check unacknowledged messages
pending = client.xpending("ai:tasks", "workers")
print(f"Pending: {pending}")
# Claim stuck messages (idle > 30s)
claimed = client.xclaim(
"ai:tasks", "workers", "worker-2",
min_idle_time=30000,
message_ids=["1710000000000-0"],
)
At-least-once delivery: Messages stay pending until XACK. If a worker crashes, use XPENDING + XCLAIM to reassign to a healthy worker.
| Command | Purpose |
|---|---|
XGROUP CREATE |
Create consumer group |
XREADGROUP |
Read as group member |
XACK |
Acknowledge processing |
XPENDING |
List unacknowledged |
XCLAIM |
Reassign stuck messages |