Advanced · Python · ~20 min

Hierarchical Rate Limiting

The Problem

Acme Corp (org):     1,000,000 tokens/hour
  ├─ Engineering:      500,000 tokens/hour
  │   ├─ alice:        100,000 tokens/hour
  │   │   ├─ agent-1:   25,000 tokens/hour
  │   │   └─ agent-2:   25,000 tokens/hour
  │   └─ bob:          100,000 tokens/hour
  └─ Marketing:       200,000 tokens/hour

Alice's agent-1 calls GPT-4. We check:
  1. Is the org under 1M tokens? ✅
  2. Is engineering under 500K? ✅
  3. Is alice under 100K? ✅
  4. Is agent-1 under 25K? ❌ → DENIED
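
The cascade above can be sketched in plain Python before any datastore is involved. The usage numbers below are hypothetical, chosen only so that the narrowest tier is the one that trips; `LIMITS`, `USAGE`, and `first_blocking_tier` are illustrative names, not part of the implementation that follows.

```python
# Limits from the tree above (tokens/hour).
LIMITS = {"acme-corp": 1_000_000, "engineering": 500_000,
          "alice": 100_000, "agent-1": 25_000}

# Hypothetical tokens already consumed this hour (illustrative numbers only).
USAGE = {"acme-corp": 400_000, "engineering": 150_000,
         "alice": 60_000, "agent-1": 24_500}

def first_blocking_tier(path, tokens):
    """Walk from the broadest tier to the narrowest and return the first
    tier that this request would push over its limit, or None."""
    for tier in path:
        if USAGE[tier] + tokens > LIMITS[tier]:
            return tier
    return None

path = ["acme-corp", "engineering", "alice", "agent-1"]
print(first_blocking_tier(path, 1_000))  # agent-1
print(first_blocking_tier(path, 500))    # None
```

A request is allowed only if every tier on the path has room, so a single exhausted tier is enough to deny it.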

Step 1: Define Tiers

from dataclasses import dataclass

@dataclass
class Tier:
    name: str           # "org", "team", "user", "agent", "model"
    identifier: str     # "acme-corp", "engineering", "alice"
    max_requests: int
    max_tokens: int
    window_seconds: int

tiers = [
    Tier("org", "acme-corp", 10000, 1_000_000, 3600),
    Tier("team", "engineering", 5000, 500_000, 3600),
    Tier("user", "alice", 1000, 100_000, 3600),
    Tier("agent", "agent-research-1", 200, 25_000, 3600),
]
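
The list above is written out by hand. One possible way to build it for any caller is to store each tier with a pointer to its parent and walk upward. This is a sketch under assumptions: `REGISTRY` and `resolve_chain` are hypothetical names, and a real system would likely load this mapping from configuration.

```python
from dataclasses import dataclass

@dataclass
class Tier:
    name: str
    identifier: str
    max_requests: int
    max_tokens: int
    window_seconds: int

# Hypothetical registry: identifier -> (tier, parent identifier or None).
REGISTRY = {
    "acme-corp": (Tier("org", "acme-corp", 10000, 1_000_000, 3600), None),
    "engineering": (Tier("team", "engineering", 5000, 500_000, 3600), "acme-corp"),
    "alice": (Tier("user", "alice", 1000, 100_000, 3600), "engineering"),
    "agent-research-1": (Tier("agent", "agent-research-1", 200, 25_000, 3600), "alice"),
}

def resolve_chain(identifier):
    """Return the tiers from the org down to the given identifier."""
    chain = []
    current = identifier
    while current is not None:
        tier, parent = REGISTRY[current]
        chain.append(tier)
        current = parent
    chain.reverse()  # broadest tier first, narrowest last
    return chain

print([t.name for t in resolve_chain("agent-research-1")])
# ['org', 'team', 'user', 'agent']
```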

Step 2: Cascading Check with Pipeline

The key insight: use a Valkey pipeline to check all tiers in a single round-trip:

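import time

import valkey  # assumes the valkey-py package; a redis-py client exposes the same calls

client = valkey.Valkey(host="localhost", port=6379)  # assumed connection settings

def hierarchical_check(tiers: list, tokens: int) -> dict:
    now = time.time()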

    # Phase 1: Read all counters in one pipeline
    pipe = client.pipeline(transaction=False)
    for tier in tiers:
        window_num = int(now // tier.window_seconds)
        pipe.get(f"hier:{tier.name}:{tier.identifier}:req:{window_num}")
        pipe.get(f"hier:{tier.name}:{tier.identifier}:tok:{window_num}")
    results = pipe.execute()

    # Phase 2: Check each tier
    blocked_by = None
    for i, tier in enumerate(tiers):
        current_req = int(results[i * 2] or 0)
        current_tok = int(results[i * 2 + 1] or 0)
        if current_req + 1 > tier.max_requests or current_tok + tokens > tier.max_tokens:
            blocked_by = tier.name
            break

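    # Phase 3: If allowed, increment all tiers in one MULTI/EXEC transaction.
    # Note: the read in Phase 1 and this write are separate steps, so concurrent
    # callers can both pass the check and briefly overshoot a limit.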
    if not blocked_by:
        pipe = client.pipeline(transaction=True)
        for tier in tiers:
            window_num = int(now // tier.window_seconds)
            req_key = f"hier:{tier.name}:{tier.identifier}:req:{window_num}"
            tok_key = f"hier:{tier.name}:{tier.identifier}:tok:{window_num}"
            pipe.incr(req_key)
            pipe.expire(req_key, tier.window_seconds)
            pipe.incrby(tok_key, tokens)
            pipe.expire(tok_key, tier.window_seconds)
        pipe.execute()

    return {"allowed": blocked_by is None, "blocked_by": blocked_by}
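
The function above reads in one step and writes in another, so two concurrent callers can both pass the check before either increments. If strict limits matter, one option is to move the check and the increments into a server-side Lua script, which runs as a single unit. The following is a minimal sketch, not the tutorial's method. It assumes a redis-py-compatible client (such as valkey-py) that provides `register_script`, and a single-node deployment, since a cluster would require all keys to hash to the same slot. `CHECK_AND_INCR`, `build_script_args`, and `atomic_hierarchical_check` are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class Tier:
    name: str
    identifier: str
    max_requests: int
    max_tokens: int
    window_seconds: int

# KEYS: req key, tok key for each tier. ARGV: tokens, then
# max_requests, max_tokens, window_seconds for each tier.
CHECK_AND_INCR = """
local tokens = tonumber(ARGV[1])
local n = #KEYS / 2
for i = 1, n do
  local req = tonumber(redis.call('GET', KEYS[2*i-1]) or '0')
  local tok = tonumber(redis.call('GET', KEYS[2*i]) or '0')
  if req + 1 > tonumber(ARGV[3*i-1]) or tok + tokens > tonumber(ARGV[3*i]) then
    return i  -- 1-based index of the first tier that would be exceeded
  end
end
for i = 1, n do
  redis.call('INCR', KEYS[2*i-1])
  redis.call('EXPIRE', KEYS[2*i-1], ARGV[3*i+1])
  redis.call('INCRBY', KEYS[2*i], tokens)
  redis.call('EXPIRE', KEYS[2*i], ARGV[3*i+1])
end
return 0
"""

def build_script_args(tiers, tokens, now):
    """Build KEYS and ARGV using the same key layout as hierarchical_check."""
    keys, args = [], [tokens]
    for tier in tiers:
        window_num = int(now // tier.window_seconds)
        prefix = f"hier:{tier.name}:{tier.identifier}"
        keys += [f"{prefix}:req:{window_num}", f"{prefix}:tok:{window_num}"]
        args += [tier.max_requests, tier.max_tokens, tier.window_seconds]
    return keys, args

def atomic_hierarchical_check(client, tiers, tokens, now):
    """Check and increment every tier in one server-side call."""
    script = client.register_script(CHECK_AND_INCR)  # could be cached per client
    keys, args = build_script_args(tiers, tokens, now)
    blocked_index = int(script(keys=keys, args=args))
    blocked_by = tiers[blocked_index - 1].name if blocked_index else None
    return {"allowed": blocked_by is None, "blocked_by": blocked_by}
```

This trades the two pipelined round trips for one script call, at the cost of keeping the limit logic in Lua.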

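Performance: Pipelines keep an allowed request to 2 round trips regardless of tier count (a denied request stops after the first, read-only round trip). With 5 tiers that is 10 GET + 20 INCR/EXPIRE commands, about 0.5 ms in total.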

Performance

Tiers   Pipeline Commands          Round Trips   Latency
3       6 GET + 12 INCR/EXPIRE     2             ~0.3 ms
5       10 GET + 20 INCR/EXPIRE    2             ~0.5 ms
7       14 GET + 28 INCR/EXPIRE    2             ~0.7 ms
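
The command counts in the table follow directly from the code: each tier contributes 2 GETs in the read phase and 4 write commands (INCR, EXPIRE, INCRBY, EXPIRE) in the write phase. A small helper makes the arithmetic explicit. `pipeline_cost` is a hypothetical name, and it deliberately does not estimate latency, which depends on the network and server.

```python
def pipeline_cost(num_tiers, allowed=True):
    """Count the commands and round trips hierarchical_check issues."""
    reads = 2 * num_tiers                       # req + tok counter per tier
    writes = 4 * num_tiers if allowed else 0    # skipped when a tier blocks
    round_trips = 2 if allowed else 1           # the write pipeline is optional
    return {"reads": reads, "writes": writes, "round_trips": round_trips}

for n in (3, 5, 7):
    print(n, pipeline_cost(n))
# 3 {'reads': 6, 'writes': 12, 'round_trips': 2}
# 5 {'reads': 10, 'writes': 20, 'round_trips': 2}
# 7 {'reads': 14, 'writes': 28, 'round_trips': 2}
```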