Advanced Python — ~25 min

Production Patterns

Pattern 1: Feature Freshness Monitoring

Every feature Hash includes an _updated_at timestamp. Use it to detect stale features before they poison your model:

import valkey
import time

client = valkey.Valkey(host="localhost", port=6379, decode_responses=True)

def check_freshness(feature_view: str, entity_id: str, threshold_seconds: float = 300):
    """Report whether one entity's features are recent enough for inference.

    Returns a dict with 'status' ('fresh', 'stale', or 'missing') and the
    observed age in seconds; fresh/stale results also include the threshold.
    """
    ts = client.hget(f"fs:v1:{feature_view}:{entity_id}", "_updated_at")

    if ts is None:
        # Key or field absent: features were never written, or the key expired.
        return {"status": "missing", "age_seconds": None}

    elapsed = time.time() - float(ts)
    status = "fresh" if elapsed < threshold_seconds else "stale"

    return {
        "status": status,
        "age_seconds": round(elapsed, 1),
        "threshold": threshold_seconds,
    }

# Check a single entity
result = check_freshness("user_risk_profile", "user_001", threshold_seconds=60)
print(result)
# {'status': 'fresh', 'age_seconds': 12.3, 'threshold': 60}

Batch Freshness Check

def batch_freshness_check(feature_view: str, entity_ids: list, threshold: float = 300):
    """Bucket many entities' features into fresh / stale / missing counts.

    All _updated_at fields are fetched in one non-transactional pipeline
    (a single round trip). Stale entity ids are collected so callers can
    trigger a targeted refresh.
    """
    pipeline = client.pipeline(transaction=False)
    for entity in entity_ids:
        pipeline.hget(f"fs:v1:{feature_view}:{entity}", "_updated_at")

    timestamps = pipeline.execute()
    checked_at = time.time()
    summary = {"fresh": 0, "stale": 0, "missing": 0, "stale_entities": []}

    for entity, ts in zip(entity_ids, timestamps):
        if ts is None:
            summary["missing"] += 1
            continue
        if checked_at - float(ts) > threshold:
            summary["stale"] += 1
            summary["stale_entities"].append(entity)
        else:
            summary["fresh"] += 1

    return summary

# Check 100 entities
ids = [f"user_{i:03d}" for i in range(100)]
report = batch_freshness_check("user_risk_profile", ids, threshold=60)
print(report)
# {'fresh': 85, 'stale': 10, 'missing': 5, 'stale_entities': [...]}

With the Library

from src import ValkeyFeatureStore, Entity, FeatureView, Feature, FeatureType

store = ValkeyFeatureStore(host="localhost", port=6379)
# ... register feature views ...

# The monitor module provides freshness checking
fv = store.get_feature_view("user_risk_profile")
report = store.monitor.check_freshness(
    fv,
    entity_ids=["user_001", "user_002"],
    threshold_seconds=300,
)
print(report)

Pattern 2: Health Checks

Add health checks to your deployment to detect issues early:

def health_check() -> dict:
    """Full health check for the feature store.

    Runs three probes against Valkey — connectivity (PING with latency),
    a write/read round trip on a short-TTL key, and a memory snapshot —
    and returns:

        {"status": "healthy" | "unhealthy", "checks": {...}}

    A failed ping or write/read probe flips the top-level status to
    "unhealthy"; the memory probe is informational only.
    """
    health = {"status": "healthy", "checks": {}}

    # 1. Valkey connectivity: PING round-trip latency in milliseconds.
    try:
        start = time.time()
        client.ping()
        latency = (time.time() - start) * 1000
        health["checks"]["valkey_ping"] = {
            "status": "ok",
            "latency_ms": round(latency, 2),
        }
    except Exception as e:
        health["status"] = "unhealthy"
        health["checks"]["valkey_ping"] = {"status": "failed", "error": str(e)}

    # 2. Write/read test. The key expires after 10s so repeated checks
    # never accumulate garbage.
    try:
        test_key = "fs:health_check"
        client.set(test_key, "ok", ex=10)
        val = client.get(test_key)
        if val == "ok":
            health["checks"]["write_read"] = {"status": "ok"}
        else:
            # BUG FIX: a read-back mismatch previously marked this check
            # "failed" but left the overall status "healthy".
            health["status"] = "unhealthy"
            health["checks"]["write_read"] = {"status": "failed"}
    except Exception as e:
        health["status"] = "unhealthy"
        health["checks"]["write_read"] = {"status": "failed", "error": str(e)}

    # 3. Memory usage (informational — does not flip overall status).
    try:
        info = client.info("memory")
        used_mb = info["used_memory"] / (1024 * 1024)
        max_mb = info.get("maxmemory", 0) / (1024 * 1024)
        health["checks"]["memory"] = {
            "used_mb": round(used_mb, 1),
            "max_mb": round(max_mb, 1) if max_mb > 0 else "unlimited",
            "status": "ok",
        }
    except Exception as e:
        # BUG FIX: the error was caught but discarded, hiding the reason
        # the probe failed from operators.
        health["checks"]["memory"] = {"status": "failed", "error": str(e)}

    return health

print(health_check())
# {'status': 'healthy', 'checks': {'valkey_ping': {'status': 'ok', 'latency_ms': 0.12}, ...}}

With the Library

# Built-in health check
health = store.health()
print(health)

# Full info including registered views
info = store.info()
print(info)
# {'registered_views': ['user_profile', 'user_risk_profile'], 'health': {...}}

Pattern 3: Feature Versioning

When your feature schema changes, use versioned key prefixes to migrate safely:

# Current: fs:v1:user_profile:user_001
# New:     fs:v2:user_profile:user_001

# Step 1: Create v2 feature view with new schema
# v2 schema: identical to v1 plus churn_risk. `user`, FeatureView, Feature
# and FeatureType come from the library import shown earlier.
user_features_v2 = FeatureView(
    name="user_profile",  # same logical view — the version below disambiguates keys
    entity=user,
    features=[
        Feature("age", FeatureType.INT),
        Feature("lifetime_value", FeatureType.FLOAT),
        Feature("segment", FeatureType.STRING),
        Feature("churn_risk", FeatureType.FLOAT),  # ← New feature
    ],
    version="v2",  # ← Version bump: keys become fs:v2:user_profile:<id>
    ttl=3600,  # 1 hour — abandoned v1 keys expire on their own after cutover
)

# Step 2: Dual-write during migration
def dual_write(entity_id, features, features_v2):
    """Write to both v1 and v2 during migration.

    All four commands (two HSETs, two EXPIREs) go out in one pipelined
    round trip; no transaction is needed since each key is independent.
    """
    batch = client.pipeline(transaction=False)

    for prefix, payload in (("fs:v1", features), ("fs:v2", features_v2)):
        key = f"{prefix}:user_profile:{entity_id}"
        batch.hset(key, mapping={field: str(value) for field, value in payload.items()})
        batch.expire(key, 3600)

    batch.execute()

# Step 3: Migrate readers to v2
# Step 4: Stop writing to v1
# Step 5: Let v1 keys expire naturally (TTL)

Zero-downtime migration: Because Valkey keys include the version prefix, v1 and v2 coexist safely. Dual-write during migration, switch readers, then let v1 expire. No data loss, no downtime.

Pattern 4: Connection Pooling

In production, always use connection pooling to avoid creating a new TCP connection per request:

import valkey

# Create a connection pool (once at app startup)
pool = valkey.ConnectionPool(
    host="localhost",
    port=6379,
    max_connections=50,    # Max concurrent connections
    decode_responses=True,
    socket_connect_timeout=5,  # seconds allowed to establish the TCP connection
    socket_timeout=2,          # seconds allowed per command before timing out
    retry_on_timeout=True,     # transparently retry a command that timed out
)

# Create client from pool (cheap - reuses connections)
client = valkey.Valkey(connection_pool=pool)

# Use with the feature store library
store = ValkeyFeatureStore(client=client)

# Check pool stats
# NOTE(review): _available_connections is a private attribute of the pool;
# it may change between valkey-py versions — confirm before relying on it
# in production monitoring.
print(f"Pool connections: {len(pool._available_connections)}")

Pattern 5: Error Handling & Fallbacks

Handle Valkey failures gracefully — don't let a connection failure or timeout crash your prediction:

def safe_read_features(store, view_name: str, entity_id: str, defaults: dict = None):
    """Read features with error handling and fallback defaults.

    Tries the online store first; on any failure (connection error, timeout,
    or anything unexpected) falls back to ``defaults`` so the model can still
    make a prediction instead of crashing.

    Args:
        store: Feature store exposing read(view_name, entity_id) -> dict.
        view_name: Name of the feature view to read from.
        entity_id: Entity whose features are requested.
        defaults: Fallback feature values used when the read fails or
            returns nothing. Treated as {} when None.

    Returns:
        The stored feature dict if present and non-empty, else ``defaults`` or {}.
    """
    try:
        features = store.read(view_name, entity_id)
        if features:
            return features
    # BUG FIX: these handlers previously referenced `redis.ConnectionError` /
    # `redis.TimeoutError`, but this file imports `valkey` and never imports
    # `redis` — any real connection error would have raised a NameError
    # instead of being handled.
    except valkey.ConnectionError:
        # Valkey is down - use defaults
        print("⚠️ Valkey connection error, using defaults")
    except valkey.TimeoutError:
        # Valkey is slow - use defaults
        print("⚠️ Valkey timeout, using defaults")
    except Exception as e:
        print(f"⚠️ Unexpected error: {e}")

    # Return defaults so the model can still make a prediction
    return defaults or {}

# Usage
features = safe_read_features(store, "user_risk_profile", "user_001",
    defaults={"txn_count_1h": 0, "avg_txn_amount": 50.0, "fraud_score": 0.5})

Pattern 6: Observability Metrics

Track key metrics for your feature store in production:

def get_store_metrics() -> dict:
    """Collect feature store metrics for monitoring dashboards."""
    server_info = client.info()
    mb = 1048576  # bytes per MiB
    metrics = {}

    # Throughput
    metrics["ops_per_second"] = server_info.get("instantaneous_ops_per_sec", 0)

    # Memory
    metrics["memory_used_mb"] = round(server_info["used_memory"] / mb, 1)
    metrics["memory_peak_mb"] = round(server_info["used_memory_peak"] / mb, 1)

    # Connections
    metrics["connected_clients"] = server_info.get("connected_clients", 0)

    # Keys
    metrics["total_keys"] = server_info.get("db0", {}).get("keys", 0)

    # Hit rate (if using Valkey as cache too)
    metrics["keyspace_hits"] = server_info.get("keyspace_hits", 0)
    metrics["keyspace_misses"] = server_info.get("keyspace_misses", 0)

    return metrics

metrics = get_store_metrics()
print(metrics)
# {'ops_per_second': 1250, 'memory_used_mb': 45.2, ...}

Production Checklist

Area Pattern Recommendation
Connections Connection pooling Use ConnectionPool with max_connections=50
Timeouts Socket timeouts Set socket_timeout=2, socket_connect_timeout=5
Retries Retry on timeout Enable retry_on_timeout=True
Freshness Staleness detection Check _updated_at before inference, alert on stale
TTL Feature expiry Set TTL on all feature views (e.g., 1h–24h)
Versioning Key prefix versions Use fs:v1:, fs:v2: for schema changes
Fallbacks Default values Always have sensible defaults for model features
Monitoring Health checks Ping + write/read test on /health endpoint
Memory Maxmemory policy Set maxmemory-policy allkeys-lru in production
Persistence RDB/AOF Enable RDB snapshots for feature store durability

Congratulations! You've completed all 6 cookbooks. You now know how to build, serve, stream, integrate, and operate a production feature store with Valkey. Check out the interactive demo to experiment with these patterns in your browser.