Intermediate · Python · ~20 min

RAG Pipeline with Haystack + Valkey

Build a full retrieval-augmented generation (RAG) pipeline with Valkey as the vector store. Embed your documents once, then answer questions grounded in your data rather than in the LLM's training set.

How RAG Works

User question
  → embed question
  → find similar docs in Valkey (KNN)
  → inject docs into prompt
  → LLM generates grounded answer

Valkey handles the retrieval step: an in-memory KNN search over your document embeddings, fast enough to add only about a millisecond per query.
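To make the retrieval step concrete, here is a minimal sketch of KNN by cosine similarity in plain Python. It is a brute-force illustration only, not how Valkey implements vector search; the helper names and the 3-dimensional toy vectors are made up for this example.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def knn(query_vec, indexed_docs, k):
    """Brute-force KNN: score every document, keep the k most similar."""
    ranked = sorted(
        indexed_docs,
        key=lambda doc: cosine_similarity(query_vec, doc["embedding"]),
        reverse=True,
    )
    return ranked[:k]

# Toy 3-dimensional "embeddings", invented for illustration only.
toy_index = [
    {"content": "Valkey stores vectors.", "embedding": [0.9, 0.1, 0.0]},
    {"content": "Haystack builds pipelines.", "embedding": [0.1, 0.9, 0.0]},
    {"content": "Bananas are yellow.", "embedding": [0.0, 0.0, 1.0]},
]
question_vec = [0.8, 0.3, 0.0]  # stand-in for the embedded question

top_docs = knn(question_vec, toy_index, k=2)
for doc in top_docs:
    print(doc["content"])
```

A real vector store avoids scoring every document on every query, but the contract is the same: a query vector goes in, the k closest documents come out.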

Step 1: Indexing Pipeline

Run this once to embed and store your documents:

from haystack import Pipeline, Document
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore

document_store = ValkeyDocumentStore(
    nodes_list=[("localhost", 6379)],
    index_name="rag_docs",
    embedding_dim=768,
    distance_metric="cosine",
)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component(
    "embedder",
    SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-mpnet-base-v2")
)
indexing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))
indexing_pipeline.connect("embedder.documents", "writer.documents")

# Your documents - swap this for file loaders, web crawlers, etc.
docs = [
    Document(content="Valkey supports vector search natively via its module system."),
    Document(content="The ValkeyDocumentStore integrates directly with Haystack pipelines."),
    Document(content="Cosine similarity measures the angle between two embedding vectors."),
    Document(content="RAG grounds LLM responses in retrieved facts, reducing hallucinations."),
    Document(content="Haystack pipelines are composable - swap any component without rewriting the rest."),
]

indexing_pipeline.run({"embedder": {"documents": docs}})
print(f"Indexed {document_store.count_documents()} documents")
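The comment in the code above suggests swapping the hard-coded list for a file loader. One possible shape for that, sketched with the standard library only: `load_text_files` and the `source` metadata key are hypothetical names chosen for this example, not part of Haystack or the Valkey integration.

```python
from pathlib import Path

def load_text_files(folder):
    """Read every .txt file in `folder` into a dict with content and metadata."""
    records = []
    for path in sorted(Path(folder).glob("*.txt")):
        text = path.read_text(encoding="utf-8").strip()
        if not text:
            continue  # skip empty files, there is nothing to embed
        records.append({"content": text, "meta": {"source": path.name}})
    return records

# Assumed usage: convert each record before running the indexing pipeline, e.g.
# docs = [Document(content=r["content"], meta=r["meta"]) for r in load_text_files("docs")]
```

Keeping the file name in the metadata makes it possible to show the reader where a retrieved passage came from.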

Step 2: Query Pipeline

Wire together embedding, retrieval, prompt building, and LLM generation:

from haystack import Pipeline
from haystack.utils import Secret
from haystack.dataclasses import ChatMessage
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack_integrations.components.retrievers.valkey import ValkeyEmbeddingRetriever

prompt_template = [
    ChatMessage.from_system(
        "Answer the question using only the provided context. "
        "If the context doesn't contain the answer, say 'I don't know'."
    ),
    ChatMessage.from_user(
        "Context:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\n"
        "Question: {{query}}"
    ),
]

query_pipeline = Pipeline()
query_pipeline.add_component(
    "text_embedder",
    SentenceTransformersTextEmbedder(model="sentence-transformers/all-mpnet-base-v2")
)
query_pipeline.add_component(
    "retriever",
    ValkeyEmbeddingRetriever(document_store=document_store, top_k=3)
)
query_pipeline.add_component(
    "prompt_builder",
    ChatPromptBuilder(template=prompt_template, required_variables=["query", "documents"])
)
query_pipeline.add_component(
    "generator",
    OpenAIChatGenerator(
        api_key=Secret.from_env_var("OPENAI_API_KEY"),
        model="gpt-4o"
    )
)

query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
query_pipeline.connect("retriever.documents", "prompt_builder.documents")
query_pipeline.connect("prompt_builder.messages", "generator.messages")
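It can help to see what the user message looks like once the retrieved documents are injected. The sketch below reproduces the template by hand in plain Python, so it runs without Haystack; `render_user_message` is a hypothetical helper, and it takes plain strings where the real template reads `doc.content` from Document objects.

```python
def render_user_message(documents, query):
    """Hand-written equivalent of the Jinja user template used in this step."""
    # Mirrors "{% for doc in documents %}{{ doc.content }}\n{% endfor %}"
    context = "".join(f"{content}\n" for content in documents)
    return f"Context:\n{context}\nQuestion: {query}"

retrieved = [
    "The ValkeyDocumentStore integrates directly with Haystack pipelines.",
    "Valkey supports vector search natively via its module system.",
]
print(render_user_message(retrieved, "How does Valkey integrate with Haystack?"))
```

Each retrieved document ends up on its own line under "Context:", followed by a blank line and the question.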

Step 3: Ask a Question

query = "How does Valkey integrate with Haystack?"

result = query_pipeline.run({
    "text_embedder": {"text": query},
    "prompt_builder": {"query": query},
})

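# Each reply is a ChatMessage; recent Haystack 2.x releases expose its text via `.text`.
print(result["generator"]["replies"][0].text)

Example output (the exact wording varies between runs and models):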

The ValkeyDocumentStore integrates directly with Haystack pipelines,
allowing you to store document embeddings in Valkey and retrieve them
using the ValkeyEmbeddingRetriever component.

Pipeline Architecture

The indexing and query pipelines are intentionally separate: you index once (or on a schedule), then query thousands of times. Because Valkey runs KNN in memory, retrieval adds roughly 1 ms to total latency, which is negligible next to the LLM call.

| Stage | Component | What it does |
|-------|-----------|--------------|
| Index | SentenceTransformersDocumentEmbedder | Generates 768-dim embeddings for each doc |
| Index | DocumentWriter | Writes docs + embeddings to Valkey |
| Query | SentenceTransformersTextEmbedder | Embeds the user's question |
| Query | ValkeyEmbeddingRetriever | KNN search; returns the top-k similar docs |
| Query | ChatPromptBuilder | Injects retrieved docs into the prompt |
| Query | OpenAIChatGenerator | Generates the final grounded answer |
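To check how latency splits between retrieval and generation on your own setup, you can time each stage separately. This is a minimal sketch using the standard library; `timed` is a hypothetical helper, and the two `fake_*` functions are stand-ins that only sleep, so the numbers they print say nothing about Valkey or any LLM.

```python
import time

def timed(label, func, *args, **kwargs):
    """Run func, print elapsed wall-clock milliseconds, return (result, ms)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"{label}: {elapsed_ms:.2f} ms")
    return result, elapsed_ms

# Stand-ins for the real stages; replace them with calls into your own components.
def fake_retrieval():
    time.sleep(0.001)
    return ["doc"]

def fake_generation():
    time.sleep(0.05)
    return "answer"

docs, retrieval_ms = timed("retrieval", fake_retrieval)
answer, generation_ms = timed("generation", fake_generation)
```

Measuring several queries and comparing medians gives a steadier picture than a single run, since the first call often pays for connection setup and model loading.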

Swap the Embedding Model

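Any Haystack-compatible embedder works. The document store's embedding_dim must match the output dimension of the embedding model, and the document embedder and text embedder must use the same model:

# OpenAI embeddings (1536-dim)
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore

document_store = ValkeyDocumentStore(
    nodes_list=[("localhost", 6379)],
    index_name="rag_docs_openai",
    embedding_dim=1536,  # output size of text-embedding-3-small
    distance_metric="cosine",
)

# Use the same model for documents and queries so their vectors are comparable.
# Both embedders read OPENAI_API_KEY from the environment by default.
document_embedder = OpenAIDocumentEmbedder(model="text-embedding-3-small")
text_embedder = OpenAITextEmbedder(model="text-embedding-3-small")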
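A dimension mismatch is easy to introduce when switching models, so a small guard before writing or querying can fail early with a readable message. The sketch below is plain Python; `check_embedding_dim` is a hypothetical helper, not part of Haystack or the Valkey integration, and the zero vector stands in for a real embedding.

```python
def check_embedding_dim(embedding, expected_dim):
    """Raise ValueError if the vector length differs from the index dimension."""
    actual = len(embedding)
    if actual != expected_dim:
        raise ValueError(
            f"Embedding has {actual} dimensions, but the index expects "
            f"{expected_dim}. Re-create the index or switch embedding models."
        )
    return embedding

# Stand-in vector; in practice this would be one embedding produced by your embedder.
probe = [0.0] * 1536
check_embedding_dim(probe, expected_dim=1536)  # passes silently
```

Running the check once on a single probe embedding at startup is usually enough, since every vector from a given model has the same length.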