BeginnerPython~15 min

Getting Started with Pub/Sub

What is Pub/Sub?

Valkey Pub/Sub is a fire-and-forget messaging system. Publishers send messages to channels , and any number of subscribers listening on that channel receive them instantly. It's perfect for:

Pub/Sub vs Streams: Pub/Sub is ephemeral - if no one is listening, messages are lost. Valkey Streams (Cookbook 03) are durable - messages persist and can be replayed. Use Pub/Sub for live broadcasting, Streams for guaranteed delivery.

Prerequisites

Step 1: Start Valkey

docker run -d --name valkey -p 6379:6379 valkey/valkey:latest

Step 2: Install Dependencies

pip install valkey

Step 3: Create a Publisher

import valkey
import time
import json

client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)

def publish_message(channel: str, message: dict):
    """Publish a JSON message to a channel."""
    payload = json.dumps(message)
    num_subscribers = client.publish(channel, payload)
    print(f"Published to {channel} → {num_subscribers} subscriber(s)")
    return num_subscribers

# Publish some messages
publish_message("ai:events", {
    "type": "prediction",
    "model": "gpt-4",
    "result": "positive",
    "confidence": 0.95,
    "timestamp": time.time(),
})
# Published to ai:events → 0 subscriber(s)  (no one listening yet)

Step 4: Create a Subscriber

Run this in a separate terminal - subscribers block while waiting for messages:

import valkey
import json

client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)

# Create a Pub/Sub object
pubsub = client.pubsub()

# Subscribe to the channel
pubsub.subscribe("ai:events")
print("Subscribed to ai:events - waiting for messages...")

# Listen for messages (blocks)
for message in pubsub.listen():
    if message["type"] == "message":
        data = json.loads(message["data"])
        print(f"Received: {data}")

# Output when publisher sends a message:
# Received: {'type': 'prediction', 'model': 'gpt-4', 'result': 'positive', ...}

Step 5: Pattern Subscriptions

Subscribe to multiple channels using glob patterns:

# Subscribe to ALL ai:* channels at once
pubsub.psubscribe("ai:*")

# This matches:
#   ai:events
#   ai:predictions
#   ai:agent:tool_calls
#   ai:llm:tokens

for message in pubsub.listen():
    if message["type"] == "pmessage":
        channel = message["channel"]
        data = json.loads(message["data"])
        print(f"[{channel}] {data}")

Step 6: Non-Blocking Subscriber

For applications that need to do other work while listening:

import threading

def message_handler(message):
    """Called for each message received."""
    if message["type"] == "message":
        data = json.loads(message["data"])
        print(f"Handler: {data}")

# Subscribe with a callback (non-blocking)
pubsub.subscribe(**{"ai:events": message_handler})

# Run in background thread
thread = pubsub.run_in_thread(sleep_time=0.01)
print("Subscriber running in background")

# Do other work...
time.sleep(10)

# Stop when done
thread.stop()

How It Works

Operation Valkey Command Latency
Publish message PUBLISH channel message ~0.1ms
Subscribe to channel SUBSCRIBE channel ~0.1ms
Pattern subscribe PSUBSCRIBE pattern ~0.1ms
Unsubscribe UNSUBSCRIBE channel ~0.1ms
Check active channels PUBSUB CHANNELS ~0.1ms
Count subscribers PUBSUB NUMSUB channel ~0.1ms

Next up: In the next cookbook, we'll use Pub/Sub to stream LLM tokens in real-time - broadcasting each token as it arrives from the model to multiple connected clients.

Next → 02 - Streaming LLM Tokens