Streaming LLM Tokens
The Problem
When streaming from an LLM (OpenAI, Anthropic, etc.), tokens arrive one at a time. If you have multiple clients watching the same response, you need to fan out each token to all of them. Valkey Pub/Sub is perfect for this:
# Architecture:
# LLM API ──▶ Your Server ──▶ Valkey PUBLISH ──▶ Client A
# (token) channel: llm:{id} ──▶ Client B
# ──▶ Client C
Step 1: Token Publisher (Server Side)
import valkey
import json
import time
import uuid
client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)
def stream_llm_response(prompt: str, request_id: str = None):
"""Simulate an LLM streaming response, publishing each token."""
request_id = request_id or str(uuid.uuid4())[:8]
channel = f"llm:stream:{request_id}"
# Notify clients that streaming has started
client.publish(channel, json.dumps({
"type": "start",
"request_id": request_id,
"model": "gpt-4",
"timestamp": time.time(),
}))
# Simulate token-by-token streaming
tokens = ["Valkey", " is", " an", " open", "-source",
" in", "-memory", " data", " store", "."]
for i, token in enumerate(tokens):
# In production, this would come from the LLM API:
# for chunk in openai.chat.completions.create(stream=True):
# token = chunk.choices[0].delta.content
client.publish(channel, json.dumps({
"type": "token",
"content": token,
"index": i,
"timestamp": time.time(),
}))
time.sleep(0.05) # Simulate token delay
# Notify clients that streaming is complete
client.publish(channel, json.dumps({
"type": "end",
"request_id": request_id,
"total_tokens": len(tokens),
"timestamp": time.time(),
}))
return request_id
# Usage
rid = stream_llm_response("What is Valkey?")
print(f"Streamed response: {rid}")
Step 2: Token Subscriber (Client Side)
import valkey
import json
client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)
def watch_stream(request_id: str):
"""Subscribe to an LLM response stream and reassemble tokens."""
pubsub = client.pubsub()
channel = f"llm:stream:{request_id}"
pubsub.subscribe(channel)
full_response = []
for message in pubsub.listen():
if message["type"] != "message":
continue
data = json.loads(message["data"])
if data["type"] == "start":
print(f"Stream started (model: {data['model']})")
elif data["type"] == "token":
token = data["content"]
full_response.append(token)
print(token, end="", flush=True) # Print token-by-token
elif data["type"] == "end":
print(f"\n--- Complete ({data['total_tokens']} tokens) ---")
break
pubsub.unsubscribe(channel)
return "".join(full_response)
# Run in another terminal:
# result = watch_stream("abc123")
# Output: Valkey is an open-source in-memory data store.
Step 3: OpenAI Integration
Here's the real-world version with the OpenAI API:
from openai import OpenAI
openai_client = OpenAI()
def stream_openai_to_valkey(prompt: str, request_id: str):
"""Stream from OpenAI API → Valkey Pub/Sub → all subscribers."""
channel = f"llm:stream:{request_id}"
client.publish(channel, json.dumps({"type": "start"}))
stream = openai_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
token_count = 0
for chunk in stream:
content = chunk.choices[0].delta.content
if content:
client.publish(channel, json.dumps({
"type": "token",
"content": content,
"index": token_count,
}))
token_count += 1
client.publish(channel, json.dumps({
"type": "end",
"total_tokens": token_count,
}))
Multi-Client Fan-Out
What makes this work: Every PUBLISH is delivered to ALL subscribers on that channel simultaneously. If 100 clients are watching the same LLM response, all 100 receive each token at the same time - with zero additional cost. This is pure O(N) fan-out handled entirely by Valkey.
| Metric | Value |
|---|---|
| Publish latency (per token) | ~0.1ms |
| Delivery to 1 subscriber | ~0.1ms |
| Delivery to 100 subscribers | ~0.5ms |
| Max subscribers per channel | Unlimited |
| Message size limit | 512MB (practical: keep small) |