Key-value stores aren't a universal solution—they're a specialized tool that excels at specific access patterns. Understanding where they shine (and where they don't) is crucial for making sound architectural decisions.
The key-value model excels when your access pattern is fundamentally lookup by identifier—you know the key, you want the value. When your queries become more complex (search by attributes, aggregate across entities, join related data), you're fighting against the model rather than leveraging it.
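To make that distinction concrete, here is a minimal sketch using a plain Python dict as a stand-in for a key-value store (the `users` data is invented for illustration):

```python
# A plain dict stands in for a key-value store: values are opaque to the store.
users = {
    "user:1": {"name": "Ada", "plan": "pro"},
    "user:2": {"name": "Grace", "plan": "free"},
}

# Lookup by identifier: the model's sweet spot, a single O(1) operation.
user = users["user:1"]

# Query by attribute: the store can't help; you fetch and filter every
# value yourself, an O(n) scan the model was never designed for.
pro_users = [u for u in users.values() if u["plan"] == "pro"]
```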
By the end of this page, you will understand the primary use cases for key-value stores, see production-grade implementations for each, and learn to recognize when key-value stores are the right (and wrong) choice.
The canonical use case. Caching is the most common application of key-value stores. The pattern is simple: store expensive-to-compute results using a predictable key, serve from cache on subsequent requests.
Why key-value stores excel at caching:

- O(1) lookups by key match the cache access pattern exactly
- Built-in TTL support handles expiration automatically
- In-memory storage delivers sub-millisecond reads
- Atomic operations keep concurrent access safe
```python
import hashlib
import json


class CacheManager:
    """Production caching patterns with key-value stores."""

    def __init__(self, redis_client, database, search_engine, default_ttl=3600):
        self.redis = redis_client
        self.db = database
        self.search_engine = search_engine
        self.default_ttl = default_ttl

    # Pattern 1: Simple cache-aside
    def get_user_cached(self, user_id: str) -> dict:
        """Lazy-load caching: check the cache, fall back to the DB."""
        cache_key = f"cache:user:{user_id}"

        # Try the cache first
        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Cache miss: load from the database
        user = self.db.get_user(user_id)
        if user:
            self.redis.setex(cache_key, self.default_ttl, json.dumps(user))
        return user

    # Pattern 2: Query result caching
    def get_search_results_cached(self, query: str, page: int) -> list:
        """Cache expensive search queries."""
        # Include query params in the cache key. Python's hash() is seeded
        # per process, so use a stable digest for a shareable key.
        query_digest = hashlib.sha1(query.encode()).hexdigest()
        cache_key = f"cache:search:{query_digest}:page:{page}"

        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Execute the expensive search
        results = self.search_engine.search(query, page)

        # Cache with a shorter TTL for dynamic content
        self.redis.setex(cache_key, 300, json.dumps(results))  # 5 min
        return results

    # Pattern 3: Computed value caching
    def get_dashboard_stats(self, org_id: str) -> dict:
        """Cache expensive aggregations."""
        cache_key = f"cache:stats:org:{org_id}"

        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # Expensive computation
        stats = {
            "total_users": self.db.count_users(org_id),
            "active_today": self.db.count_active_today(org_id),
            "revenue_mtd": self.db.sum_revenue_mtd(org_id),
        }

        self.redis.setex(cache_key, 600, json.dumps(stats))  # 10 min
        return stats
```

Use TTL-based expiration for most caches. For critical consistency, invalidate explicitly on writes. Consider cache stampede protection (a lock held while regenerating) for expensive computations under high traffic.
Stateless servers, stateful sessions. Web applications need to maintain user state across requests. Key-value stores provide the perfect session backend: fast reads on every request, automatic expiration, and centralized storage for server clusters.
```python
import json
import uuid
from datetime import datetime


class SessionStore:
    """Centralized session management for web applications."""

    def __init__(self, redis_client, session_ttl=86400):  # 24 hours
        self.redis = redis_client
        self.session_ttl = session_ttl

    def create_session(self, user_id: str, metadata: dict = None) -> str:
        """Create a new session, return the session token."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.now().isoformat(),
            "last_access": datetime.now().isoformat(),
            **(metadata or {}),
        }

        self.redis.setex(
            session_key,
            self.session_ttl,
            json.dumps(session_data),
        )

        # Track the user's active sessions
        self.redis.sadd(f"user:{user_id}:sessions", session_id)
        return session_id

    def get_session(self, session_id: str) -> dict | None:
        """Retrieve and refresh a session."""
        session_key = f"session:{session_id}"
        data = self.redis.get(session_key)
        if not data:
            return None

        session = json.loads(data)

        # Refresh TTL on access (sliding expiration)
        session["last_access"] = datetime.now().isoformat()
        self.redis.setex(session_key, self.session_ttl, json.dumps(session))
        return session

    def destroy_session(self, session_id: str) -> None:
        """Explicitly end a session (logout)."""
        session_key = f"session:{session_id}"
        data = self.redis.get(session_key)
        if data:
            session = json.loads(data)
            user_id = session.get("user_id")
            if user_id:
                self.redis.srem(f"user:{user_id}:sessions", session_id)
        self.redis.delete(session_key)

    def destroy_all_user_sessions(self, user_id: str) -> int:
        """Log the user out of all devices."""
        sessions_key = f"user:{user_id}:sessions"
        session_ids = self.redis.smembers(sessions_key)

        for session_id in session_ids:
            self.redis.delete(f"session:{session_id}")

        self.redis.delete(sessions_key)
        return len(session_ids)
```

| Storage | Latency | Scalability | Persistence | Best For |
|---|---|---|---|---|
| Cookie-based | None (client) | Unlimited | Client-side | Stateless tokens (JWT) |
| Server memory | ~1μs | Single server | None | Development only |
| Redis | ~100μs | Horizontal | Optional | Production clusters |
| Database | ~1-5ms | Horizontal | Full | Long-term audit needs |
Protecting APIs from abuse. Rate limiting requires tracking request counts per client with sub-second precision. Key-value stores with atomic increment operations are ideal for this.
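Before looking at the Redis implementation, the boundary-burst weakness of fixed windows can be demonstrated with a self-contained, single-process sketch (all names here are illustrative, not part of any library):

```python
from collections import defaultdict


class InMemoryFixedWindow:
    """Toy fixed-window limiter, single process only, for illustration."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.counts = defaultdict(int)  # (client, window) -> request count

    def allow(self, client_id: str, now: float) -> bool:
        # Bucket requests by integer window number
        window = int(now) // self.window_seconds
        self.counts[(client_id, window)] += 1
        return self.counts[(client_id, window)] <= self.limit


limiter = InMemoryFixedWindow(limit=5, window_seconds=60)

# 5 requests at t=59 (end of window 0) and 5 more at t=61 (start of
# window 1) are all allowed: 10 requests in 2 seconds under a 5/minute limit.
burst = [limiter.allow("client-a", 59.0) for _ in range(5)]
burst += [limiter.allow("client-a", 61.0) for _ in range(5)]
```

This is exactly the burst the sliding-window variant below prevents.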
```python
import time
import uuid


class RateLimiter:
    """Production rate limiting with Redis."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def fixed_window(self, client_id: str, limit: int,
                     window_seconds: int) -> tuple[bool, dict]:
        """
        Fixed window rate limiting.
        Simple, but can allow a 2x burst at window boundaries.
        """
        window = int(time.time()) // window_seconds
        key = f"ratelimit:{client_id}:{window}"

        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window_seconds + 1)

        allowed = current <= limit
        return allowed, {
            "limit": limit,
            "remaining": max(0, limit - current),
            "reset": (window + 1) * window_seconds,
        }

    def sliding_window_log(self, client_id: str, limit: int,
                           window_seconds: int) -> tuple[bool, dict]:
        """
        Sliding window using a sorted set.
        More accurate; prevents boundary bursts.
        """
        key = f"ratelimit:sw:{client_id}"
        now = time.time()
        window_start = now - window_seconds

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # drop expired entries
        pipe.zcard(key)                              # count remaining requests
        pipe.zadd(key, {f"{now}:{uuid.uuid4().hex[:8]}": now})
        pipe.expire(key, window_seconds + 1)
        results = pipe.execute()

        count = results[1]
        allowed = count < limit

        if not allowed:
            # Remove the request we just added
            self.redis.zremrangebyscore(key, now, now + 1)

        return allowed, {
            "limit": limit,
            "remaining": max(0, limit - count - 1) if allowed else 0,
            "retry_after": window_seconds if not allowed else 0,
        }
```

Live leaderboards, counters, and presence. Any feature requiring instant updates across users benefits from key-value stores' speed and atomic operations.
```python
import time


class PresenceTracker:
    """Track online users in real time."""

    def __init__(self, redis_client, timeout=60):
        self.redis = redis_client
        self.timeout = timeout

    def heartbeat(self, user_id: str) -> None:
        """User sends a heartbeat to indicate online status."""
        key = f"presence:{user_id}"
        self.redis.setex(key, self.timeout, "1")

        # Add to the online-users sorted set with a timestamp score
        self.redis.zadd("online_users", {user_id: time.time()})

    def is_online(self, user_id: str) -> bool:
        """Check whether a user is currently online."""
        return self.redis.exists(f"presence:{user_id}") == 1

    def get_online_count(self) -> int:
        """Count of currently online users."""
        cutoff = time.time() - self.timeout
        self.redis.zremrangebyscore("online_users", 0, cutoff)
        return self.redis.zcard("online_users")


class LiveCounter:
    """Real-time counters with persistence."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def increment(self, counter_name: str, amount: int = 1) -> int:
        """Atomically increment and return the new value."""
        return self.redis.incrby(f"counter:{counter_name}", amount)

    def get(self, counter_name: str) -> int:
        """Get the current counter value."""
        val = self.redis.get(f"counter:{counter_name}")
        return int(val) if val else 0
```

Lightweight message queues. Redis lists with blocking pop operations provide simple, effective work queues. For more complex needs, Redis Streams offer consumer groups and acknowledgment.
```python
import json


class SimpleJobQueue:
    """Basic job queue using Redis lists."""

    def __init__(self, redis_client, queue_name: str):
        self.redis = redis_client
        self.queue = f"queue:{queue_name}"
        self.processing = f"queue:{queue_name}:processing"

    def enqueue(self, job: dict) -> None:
        """Add a job to the queue."""
        self.redis.rpush(self.queue, json.dumps(job))

    def dequeue(self, timeout: int = 30) -> dict | None:
        """
        Get the next job (blocking with timeout).
        Moves it to a processing list for reliability.
        """
        # BRPOPLPUSH: pop from queue, push to processing (atomic).
        # Deprecated in Redis 6.2+ in favor of BLMOVE, but equivalent here.
        result = self.redis.brpoplpush(
            self.queue,
            self.processing,
            timeout=timeout,
        )
        return json.loads(result) if result else None

    def complete(self, job: dict) -> None:
        """Mark a job complete (remove it from the processing list)."""
        self.redis.lrem(self.processing, 1, json.dumps(job))

    def get_pending_count(self) -> int:
        """Jobs waiting to be processed."""
        return self.redis.llen(self.queue)


# Usage
queue = SimpleJobQueue(redis_client, "emails")
queue.enqueue({"type": "welcome", "user_id": "123"})

# Worker process
while True:
    job = queue.dequeue(timeout=30)
    if job:
        process_job(job)
        queue.complete(job)
```

Redis queues work well for simple use cases. For complex routing, dead letter queues, message replay, or guaranteed exactly-once delivery, consider RabbitMQ, Apache Kafka, or AWS SQS.
Dynamic configuration without deployments. Store application configuration in key-value stores to enable runtime changes without server restarts.
```python
import hashlib


class FeatureFlags:
    """Runtime feature flag management."""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.prefix = "feature:"

    def is_enabled(self, flag_name: str, user_id: str = None) -> bool:
        """Check whether a feature is enabled."""
        key = f"{self.prefix}{flag_name}"

        # Global kill switch
        if self.redis.get(f"{key}:disabled"):
            return False

        # Globally enabled?
        if self.redis.get(f"{key}:enabled"):
            return True

        # Percentage rollout
        percentage = self.redis.get(f"{key}:percentage")
        if percentage and user_id:
            # Consistent hashing keeps each user's experience stable
            user_hash = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
            return (user_hash % 100) < int(percentage)

        # Per-user allowlist
        if user_id and self.redis.sismember(f"{key}:users", user_id):
            return True

        return False

    def enable(self, flag_name: str) -> None:
        """Enable a feature globally."""
        self.redis.set(f"{self.prefix}{flag_name}:enabled", "1")

    def set_percentage(self, flag_name: str, percentage: int) -> None:
        """Enable for a percentage of users."""
        self.redis.set(f"{self.prefix}{flag_name}:percentage", str(percentage))

    def add_user(self, flag_name: str, user_id: str) -> None:
        """Enable for a specific user."""
        self.redis.sadd(f"{self.prefix}{flag_name}:users", user_id)
```

Key-value stores are not universal solutions. Recognizing anti-patterns is as important as knowing the right use cases.
| Anti-Pattern | Problem | Better Alternative |
|---|---|---|
| Complex queries | No support for filtering by value | Relational DB or Document DB |
| Relational data | No JOINs, manual denormalization | PostgreSQL, MySQL |
| Large objects | Memory pressure, slow operations | Object storage (S3) |
| Full-text search | No text indexing | Elasticsearch, Algolia |
| Analytics/OLAP | No aggregations | Data warehouse, ClickHouse |
| Primary database | Limited query flexibility | Use alongside relational DB |
Never use the KEYS command in production—it blocks Redis while it scans the entire keyspace. If you regularly need to find keys by pattern, maintain explicit indexes instead. Reserve SCAN for one-off maintenance tasks.
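The explicit-index approach can be sketched with a plain dict and set standing in for Redis strings and sets (all names here are illustrative): each write also records the key in a per-customer index, so reads never scan the keyspace.

```python
store = {}    # stands in for Redis string keys (GET/SET)
indexes = {}  # stands in for Redis sets (SADD/SMEMBERS)


def save_order(customer_id: str, order_id: str, order: dict) -> None:
    key = f"order:{order_id}"
    store[key] = order
    # Maintain the index at write time, so no key scan is ever needed
    indexes.setdefault(f"customer:{customer_id}:orders", set()).add(key)


def get_customer_orders(customer_id: str) -> list[dict]:
    keys = indexes.get(f"customer:{customer_id}:orders", set())
    return [store[k] for k in keys]


save_order("c1", "1001", {"total": 40})
save_order("c1", "1002", {"total": 15})
save_order("c2", "2001", {"total": 99})

orders = get_customer_orders("c1")  # both orders, without KEYS or SCAN
```

The same pattern maps directly onto Redis: SET the value and SADD its key to the index in one pipeline, and SREM from the index when deleting.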
You now understand where key-value stores excel and when to choose alternatives. Next, we'll explore the limitations in depth to complete your understanding of this powerful but specialized tool.