ML Integration
The Prediction Pipeline
At inference time, your model needs a feature vector. The pipeline looks like:
# Request → Fetch Features → Build Vector → Model.predict() → Response
#                 ↑                                ↑
#          Valkey (~0.1ms)                scikit-learn (~1ms)
The goal: keep feature fetching so fast that it's invisible compared to model inference.
Example 1: Fraud Detection with scikit-learn
A fraud model that scores users on three of the four features in the view. At inference, we fetch those three from Valkey and predict:
import numpy as np
from src import ValkeyFeatureStore, Entity, FeatureView, Feature, FeatureType
# Setup store (same as Cookbook 01)
# Feature store client pointed at a local Valkey instance.
store = ValkeyFeatureStore(host="localhost", port=6379)
# Entity whose rows are keyed by user_id.
user = Entity(name="user", join_keys=["user_id"])
# Feature view: four typed features attached to the user entity.
risk_features = FeatureView(
    name="user_risk_profile",
    entity=user,
    features=[
        Feature("txn_count_1h", FeatureType.INT),
        Feature("avg_txn_amount", FeatureType.FLOAT),
        Feature("unique_merchants_24h", FeatureType.INT),
        # Registered and seeded below, but not read by predict_fraud().
        Feature("fraud_score", FeatureType.FLOAT),
    ],
    ttl=86400,  # presumably seconds, i.e. 1 day — confirm against the store's TTL unit
)
store.register(risk_features)
# Seed some features
store.write("user_risk_profile", "user_001", {
    "txn_count_1h": 12,
    "avg_txn_amount": 85.50,
    "unique_merchants_24h": 4,
    "fraud_score": 0.15,
})
# ── Inference Time ─────────────────────────────────────────────────
def predict_fraud(user_id: str) -> dict:
    """Score a user's fraud risk from features stored in Valkey.

    Returns a dict with ``user_id``, ``fraud_probability``, ``is_fraud`` and
    ``features_used``. When the store returns nothing for the user, returns
    ``{"error": "No features found"}`` instead.
    """
    feature_names = ["txn_count_1h", "avg_txn_amount", "unique_merchants_24h"]

    # Step 1: pull the model inputs from the feature store (~0.1ms).
    features = store.read("user_risk_profile", user_id, feature_names)
    if not features:
        return {"error": "No features found"}

    # Step 2: one-row NumPy matrix, in the order a trained model would expect.
    vector = np.array([[features[name] for name in feature_names]])

    # Step 3: run the model (swap in your trained model here):
    # prediction = model.predict_proba(vector)[0][1]
    # The demo uses a threshold heuristic: (feature, threshold, score bump).
    heuristic_rules = (
        ("txn_count_1h", 10, 0.3),
        ("avg_txn_amount", 200, 0.2),
        ("unique_merchants_24h", 8, 0.2),
    )
    score = 0.1
    for name, threshold, bump in heuristic_rules:
        if features[name] > threshold:
            score += bump

    return {
        "user_id": user_id,
        "fraud_probability": round(score, 3),
        "is_fraud": score > 0.5,
        "features_used": features,
    }
result = predict_fraud("user_001")
print(result)
# {'user_id': 'user_001', 'fraud_probability': 0.4, 'is_fraud': False,
# 'features_used': {'txn_count_1h': 12, 'avg_txn_amount': 85.5, ...}}
Example 2: Multi-View Feature Vector
A recommendation model needs features from both user and item views:
# Register item features
# Entity whose rows are keyed by item_id.
item = Entity(name="item", join_keys=["item_id"])
# Feature view: three typed features attached to the item entity.
item_features = FeatureView(
    name="item_catalog",
    entity=item,
    features=[
        Feature("price", FeatureType.FLOAT),
        Feature("category", FeatureType.STRING),
        Feature("popularity", FeatureType.FLOAT),
    ],
    ttl=86400,  # presumably seconds, i.e. 1 day — confirm against the store's TTL unit
)
store.register(item_features)
# Write item features
store.write("item_catalog", "item_042", {
    "price": 29.99,
    "category": "electronics",
    "popularity": 0.85,
})
# ── Recommendation inference ──────────────────────────────────────
def score_item_for_user(user_id: str, item_id: str) -> float:
    """Score how likely a user is to click on an item.

    The score blends price affinity (weight 0.4) with item popularity
    (weight 0.6) and is rounded to three decimals. Price affinity is the
    user's average transaction amount relative to ten times the item price,
    capped at 1.0.

    Returns 0.0 when either the user or the item lookup comes back empty.
    """
    # Fetch user features
    user_feats = store.read("user_risk_profile", user_id, ["avg_txn_amount"])
    # Fetch item features
    item_feats = store.read("item_catalog", item_id, ["price", "popularity"])
    if not user_feats or not item_feats:
        return 0.0

    # Simple scoring: users who spend more prefer pricier items
    price = item_feats["price"]
    if price > 0:
        price_affinity = min(1.0, user_feats["avg_txn_amount"] / (price * 10))
    else:
        # A free (or non-positively priced) item would divide by zero or
        # produce a negative ratio; treat it as affordable to everyone.
        price_affinity = 1.0

    score = price_affinity * 0.4 + item_feats["popularity"] * 0.6
    return round(score, 3)
# Score candidates
# Candidate item ids to rank for one user.
# NOTE(review): only item_042 is written in this snippet; item_043/item_044
# presumably score 0.0 if store.read returns an empty result — verify.
items = ["item_042", "item_043", "item_044"]
# Pair each candidate with its score for user_001.
scores = [(item_id, score_item_for_user("user_001", item_id)) for item_id in items]
# Highest score first.
ranked = sorted(scores, key=lambda x: x[1], reverse=True)
print(ranked)
Example 3: FastAPI Prediction Endpoint
Serve predictions via a REST API with feature fetching built in:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import time
app = FastAPI()
# Initialize store once at startup
store = ValkeyFeatureStore(host="localhost", port=6379)
# ... register feature views ...
class PredictionResponse(BaseModel):
    """Response body returned by the fraud prediction endpoint."""

    # Id of the user that was scored (echoed from the request path).
    user_id: str
    # Heuristic fraud score, rounded to three decimals by the endpoint.
    fraud_probability: float
    # True when the score is above 0.5.
    is_fraud: bool
    # Time spent handling the request, in milliseconds.
    latency_ms: float
@app.get("/predict/fraud/{user_id}", response_model=PredictionResponse)
async def predict_fraud_endpoint(user_id: str):
    """Return a fraud prediction for ``user_id`` with the handler latency.

    Reads three features from the ``user_risk_profile`` view, applies the demo
    threshold heuristic, and reports how long the handler took.

    Raises:
        HTTPException: 404 when the store has no features for the user.
    """
    # perf_counter() is monotonic and high-resolution, so sub-millisecond
    # timings are not distorted by wall-clock adjustments.
    start = time.perf_counter()

    # Fetch features from Valkey
    # NOTE(review): store.read is called without await, so it looks
    # synchronous and would block the event loop while it runs — confirm.
    features = store.read(
        "user_risk_profile", user_id,
        ["txn_count_1h", "avg_txn_amount", "unique_merchants_24h"],
    )
    if not features:
        raise HTTPException(status_code=404, detail="No features found")

    # Run model prediction (demo heuristic; a missing feature counts as 0)
    score = 0.1
    if features.get("txn_count_1h", 0) > 10:
        score += 0.3
    if features.get("avg_txn_amount", 0) > 200:
        score += 0.2
    if features.get("unique_merchants_24h", 0) > 8:
        score += 0.2

    elapsed = (time.perf_counter() - start) * 1000  # seconds -> milliseconds
    return PredictionResponse(
        user_id=user_id,
        fraud_probability=round(score, 3),
        is_fraud=score > 0.5,
        latency_ms=round(elapsed, 2),
    )
# Run: uvicorn app:app --reload
# Test: curl localhost:8000/predict/fraud/user_001
Example 4: LLM Context Enrichment
Fetch user features to personalize LLM prompts:
def build_personalized_prompt(user_id: str, question: str) -> str:
    """Build an LLM prompt that opens with the user's stored profile.

    Reads the user's ``user_risk_profile`` features and renders them as a
    context section ahead of the question. Features missing from the result
    are shown as ``unknown``. When nothing is stored for the user, the
    context section says so instead.
    """
    # Look up the user's profile in Valkey.
    profile = store.read("user_risk_profile", user_id)

    if not profile:
        context = "No user context available."
    else:
        context = f"""User context:
- Average transaction: ${profile.get('avg_txn_amount', 'unknown')}
- Transaction frequency: {profile.get('txn_count_1h', 'unknown')}/hour
- Merchant diversity: {profile.get('unique_merchants_24h', 'unknown')} unique merchants"""

    return f"""{context}
User question: {question}
Please provide a personalized response based on the user's profile."""
# Usage
prompt = build_personalized_prompt("user_001", "Should I upgrade my account?")
print(prompt)
# Pass to your LLM, e.g.: response = client.chat.completions.create(model=..., messages=[{"role": "user", "content": prompt}])
Embedding Vectors
The feature store supports FeatureType.VECTOR for storing and retrieving embeddings:
# Define a view with vector features
# Feature view pairing an embedding vector with the model version that produced it.
embedding_view = FeatureView(
    name="user_embeddings",
    entity=user,
    features=[
        Feature("embedding", FeatureType.VECTOR),
        Feature("model_version", FeatureType.STRING),
    ],
    ttl=604800, # 7 days
)
store.register(embedding_view)
# Write an embedding vector
store.write("user_embeddings", "user_001", {
    "embedding": [0.1, 0.23, -0.05, 0.87, 0.42], # list → comma-separated
    "model_version": "v2.1",
})
# Read embedding back (auto-deserializes to list of floats)
# No feature list is passed here; presumably that returns every feature in the view — verify.
result = store.read("user_embeddings", "user_001")
print(result["embedding"])
# [0.1, 0.23, -0.05, 0.87, 0.42]
# Convert to NumPy for model input
embedding = np.array(result["embedding"])
print(embedding.shape)
# (5,)
Vector storage: Vectors are serialized as comma-separated strings in the Hash field (e.g., "0.1,0.23,-0.05,0.87,0.42"). When read back, FeatureType.VECTOR automatically deserializes to list[float]. For large vectors (768+ dimensions), consider using Valkey's native vector search module instead.
Performance Summary
| Integration | Feature Fetch | Model Inference | Total |
|---|---|---|---|
| Fraud detection (3 features) | ~0.1ms | ~1ms | ~1.1ms |
| Recommendation (2 views) | ~0.15ms | ~0.5ms | ~0.65ms |
| Batch scoring (100 entities) | ~0.3ms | ~10ms | ~10.3ms |
| LLM context (user profile) | ~0.1ms | ~500ms (LLM) | ~500ms |
Next up: Learn production patterns - feature freshness monitoring, versioning, health checks, and observability.