Every invalidation strategy we've explored so far operates at the individual entry level: expire this key, evict that entry, invalidate these tags. But what if you could invalidate everything at once, instantly, without touching each entry individually?
Cache versioning flips the invalidation model. Instead of tracking which entries are stale, you declare a new "version" or "generation" of the cache. All entries from previous versions are instantly considered stale—not by modifying them, but by changing what counts as valid.
This approach is particularly powerful for deployments, schema migrations, tenant-wide data refreshes, and recovery from cache corruption: any scenario where many entries must become stale at once.
By the end of this page, you will understand cache versioning strategies: key prefix versioning, generation counters, schema version embedding, namespace isolation, and atomic cache transitions. You'll learn how to perform zero-downtime cache invalidations during deployments and data migrations.
The Core Idea:
Every cache key includes a version identifier. When reading from cache, only entries matching the current version are considered valid. Changing the current version instantly invalidates all old entries without touching them.
Traditional Invalidation:
```
// Must find and delete each stale entry
cache.delete("product:123")
cache.delete("product:456")
cache.delete("product:789")
// ... for every affected key
```
Version-Based Invalidation:
```
// Change the version once; all old entries are now stale
CURRENT_VERSION = "v2"  // Was "v1"

// Old keys: "v1:product:123" - no longer matched
// New keys: "v2:product:123" - will be populated fresh
```
The old entries still exist in memory until evicted by TTL or capacity pressure, but they're never served because lookups use the new version prefix.
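The mechanism is easy to demonstrate without any cache server. A minimal in-memory sketch, with a plain Python dict standing in for Redis (all names here are illustrative):

```python
# Plain dict standing in for the cache store.
store = {}
current_version = "v1"

def vset(key, value):
    """Write under the current version prefix."""
    store[f"{current_version}:{key}"] = value

def vget(key):
    """Only keys under the current version prefix are ever consulted."""
    return store.get(f"{current_version}:{key}")

vset("product:123", {"name": "Widget"})
assert vget("product:123") == {"name": "Widget"}

current_version = "v2"  # "bump" the version

assert vget("product:123") is None    # old entry is invisible...
assert "v1:product:123" in store      # ...but still physically present
```

The old entry is never deleted; it simply stops being addressable, which is exactly the behavior described above.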
```
Traditional Key Structure:
─────────────────────────────────────────────
  entity:id
  └──┬──┘└┬┘
     │    │
   Type  Identifier

Example: "product:12345"


Versioned Key Structure:
─────────────────────────────────────────────
  version:entity:id
  └──┬───┘└──┬──┘└┬┘
     │       │    │
  Version  Type  Identifier

Example: "v3:product:12345"


Multiple Version Dimensions:
─────────────────────────────────────────────
  app_version:schema_version:entity:id
  └────┬─────┘└─────┬──────┘└──┬──┘└┬┘
       │            │          │    │
  App Deploy    DB Schema    Type  Identifier

Example: "a2.1.0:s15:product:12345"


Cache State During Version Transition:
─────────────────────────────────────────────

Before (CURRENT_VERSION = "v1"):
┌─────────────────────────────────────────────┐
│ Cache Contents                              │
│                                             │
│ v1:product:123 → {"name": "Widget", ...}    │ ← Valid
│ v1:product:456 → {"name": "Gadget", ...}    │ ← Valid
│ v1:user:789    → {"email": "a@b.com"}       │ ← Valid
└─────────────────────────────────────────────┘

After (CURRENT_VERSION = "v2"):
┌─────────────────────────────────────────────┐
│ Cache Contents                              │
│                                             │
│ v1:product:123 → {"name": "Widget", ...}    │ ← Stale (ignored)
│ v1:product:456 → {"name": "Gadget", ...}    │ ← Stale (ignored)
│ v1:user:789    → {"email": "a@b.com"}       │ ← Stale (ignored)
│ v2:product:123 → {"name": "New Widget"...}  │ ← Valid (new)
└─────────────────────────────────────────────┘
```

Why This Works:
Lookups always construct keys with the current version prefix, so stale entries are never addressed again; they linger harmlessly until TTL or eviction reclaims them.
Trade-offs:
Old-version entries keep consuming memory until they expire or are evicted, every cache operation needs the current version value (usually cached locally for a few seconds, which delays propagation), and reclaiming old entries early requires an expensive key scan.
Why not just FLUSHALL? Cache versioning provides gradual migration. Old entries serve as fallback during version transition failures. FLUSHALL is atomic and irreversible—if something goes wrong, you have zero cache. Versioning allows rollback by reverting to the old version value.
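The rollback property can be sketched the same way: because old entries survive the bump, reverting the version pointer makes them servable again. An illustrative in-memory sketch, not a production implementation:

```python
# Dict standing in for the cache; the version pointer is the only mutable state.
store = {"v1:greeting": "hello"}
version_history = ["v1"]
current = "v1"

# Bump to v2 (e.g., for a deployment)
version_history.append("v2")
current = "v2"
assert store.get(f"{current}:greeting") is None  # cold cache under v2

# Something went wrong: roll back by reverting the version pointer
current = version_history[-2]
assert store.get(f"{current}:greeting") == "hello"  # v1 entries serve again
```

With FLUSHALL, the second assertion would be impossible; the fallback data would already be gone.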
The simplest versioning approach prepends a version string to every cache key. The version is stored in a well-known location (environment variable, configuration service, or a dedicated cache key).
Implementation Components:
```python
import redis
import json
import time
from typing import Any, Optional


class VersionedCache:
    """
    Cache with key prefix versioning for instant global invalidation.

    All keys are prefixed with the current version. Changing the version
    instantly invalidates all old entries.
    """

    VERSION_KEY = "__cache_version__"
    DEFAULT_VERSION = "v1"

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self._cached_version: Optional[str] = None
        self._version_cache_time: float = 0
        self._version_cache_ttl: float = 5.0  # Cache version locally for 5 seconds

    def _get_current_version(self) -> str:
        """
        Get current cache version with local caching.

        We cache the version locally to avoid a Redis roundtrip on every
        cache operation. This means version changes take up to 5 seconds
        to propagate (acceptable for most use cases).
        """
        now = time.time()
        if (self._cached_version is not None
                and now - self._version_cache_time < self._version_cache_ttl):
            return self._cached_version

        # Fetch from Redis
        version = self.redis.get(self.VERSION_KEY)
        if version is None:
            # Initialize version if not set
            self.redis.set(self.VERSION_KEY, self.DEFAULT_VERSION)
            version = self.DEFAULT_VERSION
        else:
            version = version.decode('utf-8')

        self._cached_version = version
        self._version_cache_time = now
        return version

    def _build_key(self, key: str) -> str:
        """Build versioned cache key."""
        version = self._get_current_version()
        return f"{version}:{key}"

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache using versioned key."""
        versioned_key = self._build_key(key)
        data = self.redis.get(versioned_key)
        if data is None:
            return None
        return json.loads(data)

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache using versioned key."""
        versioned_key = self._build_key(key)
        ttl = ttl or self.default_ttl
        self.redis.setex(versioned_key, ttl, json.dumps(value))

    def delete(self, key: str) -> bool:
        """Delete specific key (current version only)."""
        versioned_key = self._build_key(key)
        return self.redis.delete(versioned_key) > 0

    def bump_version(self, new_version: Optional[str] = None) -> str:
        """
        Change the cache version, instantly invalidating all old entries.

        Returns the new version string.
        """
        if new_version is None:
            # Auto-generate version from timestamp
            new_version = f"v{int(time.time())}"

        self.redis.set(self.VERSION_KEY, new_version)

        # Update local cache so this instance sees the change immediately
        self._cached_version = new_version
        self._version_cache_time = time.time()

        return new_version

    def get_version(self) -> str:
        """Get current version (for monitoring/debugging)."""
        return self._get_current_version()

    def cleanup_old_versions(self) -> int:
        """
        Remove entries from versions other than the current one to
        reclaim memory.

        Scans keys and deletes those not matching the current version.
        Use sparingly - SCAN can be slow on large datasets.

        Returns count of deleted keys.
        """
        current = self._get_current_version()
        deleted = 0

        # This is expensive; use during maintenance windows only
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match="v*:*", count=1000)
            for key in keys:
                key_str = key.decode('utf-8')
                key_version = key_str.split(':')[0]
                if key_version != current:
                    self.redis.delete(key)
                    deleted += 1
            if cursor == 0:
                break

        return deleted


# Usage:
cache = VersionedCache(redis.Redis())

# Normal operations
cache.set("product:123", {"name": "Widget", "price": 19.99})
product = cache.get("product:123")

# After deployment or data change
old_version = cache.get_version()
new_version = cache.bump_version()
print(f"Bumped version from {old_version} to {new_version}")

# All old entries are now stale
# Next get("product:123") will be a cache miss
```

Local version caching means there's a delay (5 seconds in the example) before all application instances see the new version. For most invalidation scenarios, this is acceptable.
For stricter requirements, reduce the local cache TTL or use pub/sub to broadcast version changes.
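The broadcast idea can be sketched in-process: on a bump, the publisher notifies every instance, and each instance immediately drops its locally cached version. This is a simplified stand-in to show the shape of the pattern; a real deployment would use Redis pub/sub (publish on bump, subscribe in each instance), and the class and method names below are invented for illustration:

```python
class VersionBroadcaster:
    """In-process stand-in for a pub/sub channel carrying version changes."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, new_version: str):
        # Deliver the new version to every subscriber immediately
        for cb in self._subscribers:
            cb(new_version)


class Instance:
    """One application instance holding a locally cached version."""

    def __init__(self, broadcaster: VersionBroadcaster):
        self.local_version = "v1"
        broadcaster.subscribe(self._on_version_change)

    def _on_version_change(self, new_version: str):
        # Replace the locally cached version instead of waiting out its TTL
        self.local_version = new_version


bus = VersionBroadcaster()
instances = [Instance(bus) for _ in range(3)]

bus.publish("v2")  # the "bump"
assert all(i.local_version == "v2" for i in instances)
```

With pub/sub the propagation window shrinks from the local cache TTL to roughly the message delivery latency.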
Generation counters apply versioning at a finer granularity. Instead of one global version, each entity type (or even each entity) has its own generation number. This allows selective invalidation without affecting unrelated cache entries.
Use Cases:
The Pattern:
Generation for 'products': 42
Generation for 'users': 17
Generation for 'categories': 8
Cache key format: {entity}:g{generation}:{id}
Examples:
products:g42:abc123 (product abc123, generation 42)
users:g17:user456 (user user456, generation 17)
When products change, bump products generation to 43. Product keys now use products:g43:*, instantly invalidating all product entries while users and categories remain valid.
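That selective bump can be sketched with a plain dict (illustrative names; no Redis involved), using the generation numbers from the example above:

```python
# Dict standing in for the cache; one generation counter per entity type.
store = {}
generations = {"products": 42, "users": 17}

def gkey(entity, ident):
    """Key format: {entity}:g{generation}:{id}."""
    return f"{entity}:g{generations[entity]}:{ident}"

store[gkey("products", "abc123")] = {"name": "Widget"}
store[gkey("users", "user456")] = {"name": "Alice"}

generations["products"] += 1  # bump only products: 42 → 43

assert store.get(gkey("products", "abc123")) is None            # products invalidated
assert store.get(gkey("users", "user456")) == {"name": "Alice"}  # users untouched
```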
```python
import redis
import json
import time
from typing import Any, Optional, Dict
from dataclasses import dataclass


@dataclass
class GenerationInfo:
    entity: str
    generation: int
    updated_at: float


class GenerationalCache:
    """
    Cache with per-entity generation counters for selective invalidation.

    Each entity type has its own generation. Bumping an entity's
    generation invalidates all entries for that entity without
    affecting others.
    """

    GENERATION_PREFIX = "__gen__"

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl
        # Local generation cache (entity → GenerationInfo)
        self._gen_cache: Dict[str, GenerationInfo] = {}
        self._gen_cache_ttl = 5.0  # Seconds

    def _get_generation(self, entity: str) -> int:
        """Get current generation for entity type."""
        now = time.time()

        # Check local cache
        if entity in self._gen_cache:
            info = self._gen_cache[entity]
            if now - info.updated_at < self._gen_cache_ttl:
                return info.generation

        # Fetch from Redis
        gen_key = f"{self.GENERATION_PREFIX}{entity}"
        gen_value = self.redis.get(gen_key)
        if gen_value is None:
            # Initialize at generation 1
            generation = 1
            self.redis.set(gen_key, generation)
        else:
            generation = int(gen_value)

        # Update local cache
        self._gen_cache[entity] = GenerationInfo(
            entity=entity, generation=generation, updated_at=now
        )
        return generation

    def _build_key(self, entity: str, identifier: str) -> str:
        """Build key with entity-specific generation."""
        generation = self._get_generation(entity)
        return f"{entity}:g{generation}:{identifier}"

    def get(self, entity: str, identifier: str) -> Optional[Any]:
        """Get value from cache."""
        key = self._build_key(entity, identifier)
        data = self.redis.get(key)
        if data is None:
            return None
        return json.loads(data)

    def set(self, entity: str, identifier: str, value: Any,
            ttl: Optional[int] = None) -> None:
        """Set value in cache."""
        key = self._build_key(entity, identifier)
        ttl = ttl or self.default_ttl
        self.redis.setex(key, ttl, json.dumps(value))

    def delete(self, entity: str, identifier: str) -> bool:
        """Delete specific entry."""
        key = self._build_key(entity, identifier)
        return self.redis.delete(key) > 0

    def invalidate_entity(self, entity: str) -> int:
        """
        Invalidate all entries for an entity type.

        Bumps the generation counter, instantly making all existing
        entries stale. Returns the new generation number.
        """
        gen_key = f"{self.GENERATION_PREFIX}{entity}"

        # Atomic increment
        new_generation = self.redis.incr(gen_key)

        # Update local cache
        self._gen_cache[entity] = GenerationInfo(
            entity=entity, generation=new_generation, updated_at=time.time()
        )
        return new_generation

    def get_generation_info(self) -> Dict[str, int]:
        """Get all known entity generations (for monitoring)."""
        result = {}
        # Scan for generation keys
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(
                cursor, match=f"{self.GENERATION_PREFIX}*", count=100
            )
            for key in keys:
                key_str = key.decode('utf-8')
                entity = key_str[len(self.GENERATION_PREFIX):]
                gen_value = self.redis.get(key)
                if gen_value:
                    result[entity] = int(gen_value)
            if cursor == 0:
                break
        return result


# Usage:
cache = GenerationalCache(redis.Redis())

# Store data for different entities
cache.set("product", "123", {"name": "Widget", "price": 19.99})
cache.set("product", "456", {"name": "Gadget", "price": 29.99})
cache.set("user", "789", {"name": "Alice", "email": "alice@example.com"})

# Get data
product = cache.get("product", "123")  # Returns Widget
user = cache.get("user", "789")        # Returns Alice

# Product catalog updated - invalidate all products
new_gen = cache.invalidate_entity("product")
print(f"Products now at generation {new_gen}")

# Products are now cache misses
cache.get("product", "123")  # Returns None (cache miss)

# Users are unaffected
cache.get("user", "789")  # Still returns Alice

# Monitor generations
print(cache.get_generation_info())
# {'product': 2, 'user': 1}
```

Choosing a granularity:
- Global version: simplest, but invalidates everything. Use for deployments.
- Per-entity generation: selective invalidation, moderate complexity. Use for data updates.
- Per-ID generation: maximum precision, highest overhead. Use only for hot/critical entities.
When the structure of cached data changes (not just values), you need schema versioning. This is critical during:
The Problem:
```python
# Before deployment:
cached_product = {"id": 123, "name": "Widget", "price": 19.99}

# After deployment, code expects:
expected_product = {
    "id": 123,
    "name": "Widget",
    "price": {"amount": 19.99, "currency": "USD"}  # Changed!
}

# Old cached data causes runtime errors:
price_amount = cached_product["price"]["amount"]  # TypeError!
```
Schema versioning ensures old-format data is never read by new code.
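The guard itself is a one-line check on an envelope field. A minimal sketch, assuming the `__schema_version__`/`data` envelope shape used later on this page:

```python
CURRENT_SCHEMA = 2

def read_envelope(envelope: dict):
    """Return the payload only if its schema matches; otherwise a miss."""
    if envelope.get("__schema_version__", 1) != CURRENT_SCHEMA:
        return None  # treat mismatched schema as a cache miss
    return envelope["data"]

old = {"__schema_version__": 1,
       "data": {"id": 123, "price": 19.99}}
new = {"__schema_version__": 2,
       "data": {"id": 123, "price": {"amount": 19.99, "currency": "USD"}}}

assert read_envelope(old) is None                      # old format never read
assert read_envelope(new)["price"]["amount"] == 19.99  # new format safe to access
```

Entries written before the envelope existed default to version 1, so they are also rejected rather than misinterpreted.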
```python
import redis
import json
from typing import Any, Optional, Callable, Dict


class SchemaVersionedCache:
    """
    Cache that embeds schema version in cached data.

    Each cached value includes its schema version. On read:
    - If version matches current: return data
    - If version is older: optionally migrate, or treat as miss
    - If version is newer: treat as miss (rollback scenario)

    This handles deployments where data structure changes.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        current_schema: int = 1,
        migrators: Optional[Dict[int, Callable[[dict], dict]]] = None,
        default_ttl: int = 300
    ):
        """
        Args:
            redis_client: Redis connection
            current_schema: Current schema version number
            migrators: Dict mapping old_version → migration function
            default_ttl: Default TTL in seconds
        """
        self.redis = redis_client
        self.current_schema = current_schema
        self.migrators = migrators or {}
        self.default_ttl = default_ttl

    def get(self, key: str, allow_migration: bool = True) -> Optional[Any]:
        """
        Get value, handling schema version mismatches.

        Args:
            key: Cache key
            allow_migration: If True, attempt to migrate old schema data

        Returns:
            Value if found and schema matches (or migrated), None otherwise
        """
        raw = self.redis.get(key)
        if raw is None:
            return None

        try:
            envelope = json.loads(raw)
        except json.JSONDecodeError:
            # Corrupted data, treat as miss
            self.redis.delete(key)
            return None

        stored_schema = envelope.get("__schema_version__", 1)
        data = envelope.get("data")

        if stored_schema == self.current_schema:
            # Perfect match
            return data

        if stored_schema > self.current_schema:
            # Future schema (rollback scenario) - treat as miss.
            # Don't delete; the future version might roll forward again.
            return None

        if not allow_migration:
            # Old schema, migration disabled
            return None

        # Attempt migration
        return self._migrate(key, data, stored_schema)

    def _migrate(self, key: str, data: dict, from_version: int) -> Optional[dict]:
        """
        Attempt to migrate data from old schema to current.

        Applies migrators in sequence: v1 → v2 → v3 → ... → current
        """
        current_data = data
        current_version = from_version

        while current_version < self.current_schema:
            migrator = self.migrators.get(current_version)
            if migrator is None:
                # No migrator for this version, can't proceed.
                # Treat as cache miss.
                return None
            try:
                current_data = migrator(current_data)
                current_version += 1
            except Exception as e:
                # Migration failed, treat as miss
                print(f"Migration from v{current_version} failed: {e}")
                return None

        # Migration successful - update cache with new schema
        self.set(key, current_data)
        return current_data

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value with current schema version embedded."""
        envelope = {
            "__schema_version__": self.current_schema,
            "data": value
        }
        ttl = ttl or self.default_ttl
        self.redis.setex(key, ttl, json.dumps(envelope))

    def delete(self, key: str) -> bool:
        """Delete key."""
        return self.redis.delete(key) > 0

    def get_cache_stats(self) -> dict:
        """
        Analyze schema versions in cache (for monitoring).

        Warning: Uses SCAN, slow on large caches.
        """
        stats = {"total": 0, "by_version": {}, "corrupted": 0}
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, count=1000)
            for key in keys:
                if key.startswith(b"__"):
                    continue
                raw = self.redis.get(key)
                if raw:
                    try:
                        envelope = json.loads(raw)
                        version = envelope.get("__schema_version__", "unknown")
                        stats["by_version"][version] = stats["by_version"].get(version, 0) + 1
                        stats["total"] += 1
                    except json.JSONDecodeError:
                        stats["corrupted"] += 1
            if cursor == 0:
                break
        return stats


# Usage example with migrations:

# Define migrators for each schema transition
def migrate_v1_to_v2(data: dict) -> dict:
    """Migrate from flat price to price object."""
    price = data.get("price", 0)
    data["price"] = {"amount": price, "currency": "USD"}
    return data


def migrate_v2_to_v3(data: dict) -> dict:
    """Add created_at field."""
    data["created_at"] = None  # Unknown for migrated data
    return data


# Initialize cache with current schema and migrators
cache = SchemaVersionedCache(
    redis_client=redis.Redis(),
    current_schema=3,
    migrators={
        1: migrate_v1_to_v2,  # v1 → v2
        2: migrate_v2_to_v3,  # v2 → v3
    }
)

# Old cached data (v1 schema) will be automatically migrated on read
# New data is stored with v3 schema

product = cache.get("product:123")
# If cached as v1, migrates: v1 → v2 → v3, returns v3 format
# If cached as v3, returns directly
```

Inline migration adds latency to cache reads. For complex migrations or high-throughput paths, prefer bump and refresh: invalidate via version bump, then let reads repopulate with fresh data. Use inline migration only for backwards-compatible, low-overhead transformations.
Versioning naturally extends to namespace isolation—separating cache entries by tenant, environment, or application instance. This is essential for:
The Pattern:
Key format: {namespace}:{entity}:{id}
Examples:
tenant-acme:product:123 (Tenant ACME's product 123)
tenant-globex:product:123 (Tenant Globex's product 123 - different!)
staging:user:456 (Staging environment)
prod:user:456 (Production environment - different!)
feature-new-ui:session:789 (New UI feature flag users)
Changing a tenant's namespace version invalidates only that tenant's cache.
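A minimal in-memory sketch of namespace-scoped versioning (illustrative names; a dict stands in for Redis):

```python
# Dict standing in for the cache; one version counter per namespace.
store = {}
ns_versions = {"prod:acme": 1, "prod:globex": 1}

def nskey(namespace, key):
    """Key format: {namespace}:v{version}:{key}."""
    return f"{namespace}:v{ns_versions[namespace]}:{key}"

store[nskey("prod:acme", "config")] = {"theme": "dark"}
store[nskey("prod:globex", "config")] = {"theme": "light"}

ns_versions["prod:acme"] += 1  # invalidate only ACME's namespace

assert store.get(nskey("prod:acme", "config")) is None              # ACME invalidated
assert store.get(nskey("prod:globex", "config")) == {"theme": "light"}  # Globex untouched
```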
```python
import redis
import json
import time
from typing import Any, Optional, Dict
from dataclasses import dataclass
from contextvars import ContextVar

# Context variable for current tenant
current_tenant: ContextVar[str] = ContextVar('current_tenant', default='default')


@dataclass
class NamespaceConfig:
    version: int
    updated_at: float


class NamespacedCache:
    """
    Multi-tenant cache with per-namespace versioning.

    Each namespace (tenant, environment, etc.) has its own version.
    Invalidating a namespace only affects that namespace's entries.
    """

    NS_VERSION_PREFIX = "__ns_version__"

    def __init__(
        self,
        redis_client: redis.Redis,
        default_ttl: int = 300,
        environment: str = "prod"
    ):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.environment = environment
        # Local cache of namespace versions
        self._ns_cache: Dict[str, NamespaceConfig] = {}
        self._ns_cache_ttl = 5.0

    def _get_namespace(self) -> str:
        """Get current namespace from context."""
        tenant = current_tenant.get()
        return f"{self.environment}:{tenant}"

    def _get_ns_version(self, namespace: str) -> int:
        """Get version for namespace."""
        now = time.time()
        if namespace in self._ns_cache:
            config = self._ns_cache[namespace]
            if now - config.updated_at < self._ns_cache_ttl:
                return config.version

        version_key = f"{self.NS_VERSION_PREFIX}{namespace}"
        version = self.redis.get(version_key)
        if version is None:
            version = 1
            self.redis.set(version_key, version)
        else:
            version = int(version)

        self._ns_cache[namespace] = NamespaceConfig(
            version=version, updated_at=now
        )
        return version

    def _build_key(self, key: str) -> str:
        """Build fully qualified key with namespace and version."""
        namespace = self._get_namespace()
        version = self._get_ns_version(namespace)
        return f"{namespace}:v{version}:{key}"

    def get(self, key: str) -> Optional[Any]:
        """Get value from current namespace."""
        full_key = self._build_key(key)
        data = self.redis.get(full_key)
        return json.loads(data) if data else None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in current namespace."""
        full_key = self._build_key(key)
        ttl = ttl or self.default_ttl
        self.redis.setex(full_key, ttl, json.dumps(value))

    def delete(self, key: str) -> bool:
        """Delete key from current namespace."""
        full_key = self._build_key(key)
        return self.redis.delete(full_key) > 0

    def invalidate_namespace(self, namespace: Optional[str] = None) -> int:
        """
        Invalidate all entries in a namespace.

        If namespace not specified, uses current namespace.
        Returns new version number.
        """
        if namespace is None:
            namespace = self._get_namespace()

        version_key = f"{self.NS_VERSION_PREFIX}{namespace}"
        new_version = self.redis.incr(version_key)

        # Update local cache
        self._ns_cache[namespace] = NamespaceConfig(
            version=new_version, updated_at=time.time()
        )
        return new_version

    def invalidate_tenant(self, tenant: str) -> int:
        """Invalidate all entries for a specific tenant."""
        namespace = f"{self.environment}:{tenant}"
        return self.invalidate_namespace(namespace)

    def get_namespace_versions(self) -> Dict[str, int]:
        """Get all namespace versions (for monitoring)."""
        result = {}
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(
                cursor, match=f"{self.NS_VERSION_PREFIX}*", count=100
            )
            for key in keys:
                key_str = key.decode('utf-8')
                namespace = key_str[len(self.NS_VERSION_PREFIX):]
                version = self.redis.get(key)
                if version:
                    result[namespace] = int(version)
            if cursor == 0:
                break
        return result


# Usage example:

cache = NamespacedCache(redis.Redis(), environment="prod")

# Set tenant context (typically done in request middleware)
token = current_tenant.set("acme")
try:
    # All operations now scoped to tenant "acme"
    cache.set("config", {"theme": "dark"})
    cache.set("limits", {"max_users": 100})

    # Get data (namespace-scoped)
    config = cache.get("config")  # Returns ACME's config
finally:
    current_tenant.reset(token)

# Different tenant sees nothing
token = current_tenant.set("globex")
try:
    config = cache.get("config")  # Returns None - different namespace
    cache.set("config", {"theme": "light"})  # Globex's config
finally:
    current_tenant.reset(token)

# Invalidate specific tenant's cache (e.g., after data import)
cache.invalidate_tenant("acme")
# Only ACME's cache is invalidated; Globex unaffected

# Monitor namespace versions
print(cache.get_namespace_versions())
# {'prod:acme': 2, 'prod:globex': 1}
```

Python's contextvars provide thread-safe, async-safe context propagation. Set the tenant context once at request entry (in middleware), and all cache operations automatically scope to that tenant. This pattern prevents accidental cross-tenant data leaks.
Deployments are the most common trigger for version-based invalidation. But naive version bumping causes a cold cache storm—all instances simultaneously experience cache misses, overwhelming the database.
The Cold Cache Storm:
Deployment timeline:
T=0: Version bumped from v1 to v2
T=0.1s: Instance 1 sees v2, all keys are misses
T=0.2s: Instance 2 sees v2, all keys are misses
T=0.3s: Instance 3 sees v2, all keys are misses
...
T=1s: All N instances hammering database
Database load: 0 → 100x normal in seconds
Result: Database overload, cascading failures
Mitigation Strategies:
Strategy 1: Gradual Version Rollout
Don't bump version globally. Instead, roll out the new version gradually:
T=0: 10% of instances see v2
T=5m: 25% of instances see v2
T=10m: 50% of instances see v2
T=15m: 100% of instances see v2
The cache warms gradually as each wave of instances populates v2 entries.
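The per-instance rollout decision can be sketched as a hash bucket check, so each instance stays in the same cohort as the percentage ramps up. This sketch uses SHA-256 rather than Python's salted built-in `hash()` so the cohort assignment is stable across processes (a design choice of this sketch, not from the original):

```python
import hashlib

def uses_new_version(instance_id: str, rollout_percent: int) -> bool:
    """Map the instance to a stable bucket 0-99; buckets below the
    rollout percentage use the new version."""
    bucket = int(hashlib.sha256(instance_id.encode()).hexdigest(), 16) % 100
    return bucket < rollout_percent

instances = [f"instance-{i}" for i in range(1000)]
at_10 = {i for i in instances if uses_new_version(i, 10)}
at_50 = {i for i in instances if uses_new_version(i, 50)}

assert at_10 <= at_50            # cohorts only grow; no instance flip-flops
assert uses_new_version("anything", 100)  # 100% means everyone
```

Because the bucket is a pure function of the instance ID, raising the percentage never moves an already-migrated instance back to the old version.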
Strategy 2: Pre-Warming
Before bumping version, pre-populate critical cache entries with the new version:
```python
# Before deployment
for key in CRITICAL_KEYS:
    data = fetch_from_database(key)
    cache.set_with_version("v2", key, data)  # Pre-warm v2

# During deployment
cache.bump_version("v2")  # v2 entries already warm!
```
Strategy 3: Stale-While-Revalidate
Allow serving stale v1 data while asynchronously populating v2.
```python
import redis
import json
import time
import threading
from typing import Any, Optional, Callable, List
from concurrent.futures import ThreadPoolExecutor


class DeploymentCache:
    """
    Cache with graceful deployment transitions.

    Supports:
    - Pre-warming new version before cutover
    - Gradual rollout with percentage control
    - Fallback to old version during transition
    - Automatic warmup of critical keys
    """

    VERSION_KEY = "__deploy_cache_version__"
    OLD_VERSION_KEY = "__deploy_cache_old_version__"
    TRANSITION_STATE_KEY = "__deploy_transition_state__"
    ROLLOUT_PERCENT_KEY = "__deploy_rollout_percent__"

    def __init__(
        self,
        redis_client: redis.Redis,
        default_ttl: int = 300,
        instance_id: Optional[str] = None
    ):
        self.redis = redis_client
        self.default_ttl = default_ttl
        self.instance_id = instance_id or str(id(self))

    def _get_state(self) -> dict:
        """Get current transition state."""
        pipe = self.redis.pipeline()
        pipe.get(self.VERSION_KEY)
        pipe.get(self.OLD_VERSION_KEY)
        pipe.get(self.TRANSITION_STATE_KEY)
        pipe.get(self.ROLLOUT_PERCENT_KEY)
        results = pipe.execute()
        return {
            "current_version": results[0].decode() if results[0] else "v1",
            "old_version": results[1].decode() if results[1] else None,
            "state": results[2].decode() if results[2] else "stable",
            "rollout_percent": int(results[3]) if results[3] else 100,
        }

    def _should_use_new_version(self, state: dict) -> bool:
        """Determine if this request should use new version."""
        if state["state"] == "stable":
            return True
        # During migration, use rollout percentage.
        # Hash instance ID for consistent assignment.
        instance_hash = hash(self.instance_id) % 100
        return instance_hash < state["rollout_percent"]

    def _build_key(self, key: str, version: str) -> str:
        """Build versioned key."""
        return f"{version}:{key}"

    def get(self, key: str) -> Optional[Any]:
        """
        Get value with transition-aware logic.

        During migration:
        - Check new version first (if in rollout)
        - Fall back to old version if new version misses
        """
        state = self._get_state()
        use_new = self._should_use_new_version(state)

        if use_new or state["state"] == "stable":
            # Try current version
            versioned_key = self._build_key(key, state["current_version"])
            data = self.redis.get(versioned_key)
            if data:
                return json.loads(data)

        # During migration, try old version as fallback
        if state["state"] == "migrating" and state["old_version"]:
            old_key = self._build_key(key, state["old_version"])
            data = self.redis.get(old_key)
            if data:
                old_data = json.loads(data)
                # Optionally: copy to new version (async warmup)
                if use_new:
                    self._async_copy_to_new(
                        key, old_data, state["current_version"]
                    )
                return old_data

        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in current version."""
        state = self._get_state()
        versioned_key = self._build_key(key, state["current_version"])
        ttl = ttl or self.default_ttl
        self.redis.setex(versioned_key, ttl, json.dumps(value))

    def _async_copy_to_new(self, key: str, data: Any, new_version: str):
        """Asynchronously copy data to new version for warmup."""
        def copy():
            new_key = self._build_key(key, new_version)
            self.redis.setex(new_key, self.default_ttl, json.dumps(data))
        threading.Thread(target=copy, daemon=True).start()

    # ---- Deployment Operations ----

    def start_migration(self, new_version: str) -> None:
        """
        Begin migration to new version.

        Sets up dual-version state for gradual transition.
        """
        state = self._get_state()
        pipe = self.redis.pipeline()
        pipe.set(self.OLD_VERSION_KEY, state["current_version"])
        pipe.set(self.VERSION_KEY, new_version)
        pipe.set(self.TRANSITION_STATE_KEY, "migrating")
        pipe.set(self.ROLLOUT_PERCENT_KEY, 0)  # Start at 0%
        pipe.execute()

    def set_rollout_percent(self, percent: int) -> None:
        """Set percentage of traffic using new version."""
        percent = max(0, min(100, percent))
        self.redis.set(self.ROLLOUT_PERCENT_KEY, percent)

    def complete_migration(self) -> None:
        """Finalize migration, commit to new version only."""
        pipe = self.redis.pipeline()
        pipe.set(self.ROLLOUT_PERCENT_KEY, 100)
        pipe.set(self.TRANSITION_STATE_KEY, "stable")
        pipe.delete(self.OLD_VERSION_KEY)
        pipe.execute()

    def rollback_migration(self) -> None:
        """Abort migration, revert to old version."""
        state = self._get_state()
        if not state["old_version"]:
            raise ValueError("No old version to rollback to")
        pipe = self.redis.pipeline()
        pipe.set(self.VERSION_KEY, state["old_version"])
        pipe.set(self.TRANSITION_STATE_KEY, "stable")
        pipe.set(self.ROLLOUT_PERCENT_KEY, 100)
        pipe.delete(self.OLD_VERSION_KEY)
        pipe.execute()

    def prewarm_keys(
        self,
        keys: List[str],
        fetch_fn: Callable[[str], Any],
        new_version: str,
        parallelism: int = 10
    ) -> int:
        """
        Pre-warm specified keys in new version before migration.

        Returns count of keys successfully pre-warmed.
        """
        def warm_key(key: str) -> int:
            try:
                data = fetch_fn(key)
                if data is not None:
                    versioned_key = self._build_key(key, new_version)
                    self.redis.setex(
                        versioned_key, self.default_ttl, json.dumps(data)
                    )
                    return 1
            except Exception as e:
                print(f"Failed to warm {key}: {e}")
            return 0

        with ThreadPoolExecutor(max_workers=parallelism) as executor:
            return sum(executor.map(warm_key, keys))


# Deployment workflow:

cache = DeploymentCache(redis.Redis())

# 1. Pre-warm new version with critical keys
def fetch_from_db(key: str):
    # Your database fetch logic
    return {"data": f"value for {key}"}

critical_keys = ["config:global", "products:featured", "users:admin"]
warmed = cache.prewarm_keys(critical_keys, fetch_from_db, "v2")
print(f"Pre-warmed {warmed} keys")

# 2. Start migration
cache.start_migration("v2")

# 3. Gradually increase rollout
cache.set_rollout_percent(10)   # 10% on new version
time.sleep(300)                 # Monitor for 5 minutes
cache.set_rollout_percent(50)   # 50% on new version
time.sleep(300)                 # Monitor
cache.set_rollout_percent(100)  # Full rollout

# 4. Finalize (or rollback if issues)
everything_ok = True  # Placeholder: replace with your health checks
if everything_ok:
    cache.complete_migration()
else:
    cache.rollback_migration()
```

Identify your "critical keys"—the 1% of keys that serve 99% of traffic. Pre-warm these before the version bump. Use access logs, monitoring data, or explicit hot-key tracking to identify them. Pre-warming even 1000 keys can prevent most cold cache impact.
Cache versioning is powerful but requires careful implementation. Here are production-tested best practices:
| Scenario | Recommended Strategy | Key Considerations |
|---|---|---|
| Code deployment | Global version bump | Pre-warm critical keys, gradual rollout |
| Database migration | Schema version + bump | May need data migration, not just invalidation |
| Single tenant data import | Tenant-specific bump | Other tenants unaffected |
| Configuration change | Entity-type generation bump | Only config entries invalidated |
| Cache corruption recovery | Global version bump + flush old | Immediate bump, async cleanup |
| A/B test rollout | Namespace isolation | Separate caches per treatment group |
During version transitions, you temporarily have two versions of data in cache. Monitor Redis memory and set appropriate maxmemory-policy to evict old entries under pressure. If memory is tight, explicitly clean up old version entries after transition completes.
Cache versioning provides a powerful complement to entry-level invalidation strategies. Let's consolidate the key insights:
- Key prefix versioning: one global version gives instant, everything-at-once invalidation without touching individual entries.
- Generation counters: per-entity versions allow selective invalidation while unrelated entries stay valid.
- Schema versioning: embedding a schema number in cached values ensures old-format data is never read by new code, with optional inline migration.
- Namespace isolation: per-tenant and per-environment versions scope invalidation to a single namespace.
- Deployment transitions: pre-warming, gradual rollout, and rollback avoid the cold cache storm that a naive global bump would cause.
Module Complete:
You've now mastered the complete toolkit of cache invalidation strategies, from entry-level techniques (expiring keys, evicting entries, invalidating tags) to the bulk, version-based transitions covered on this page.
These strategies combine to form robust, production-ready caching architectures that balance freshness, performance, and operational simplicity.
Congratulations! You've completed the Cache Invalidation Strategies module. You now have deep expertise in the full spectrum of invalidation techniques—from fine-grained TTL tuning to bulk version-based transitions. You're equipped to design and operate cache invalidation systems that can handle the notorious "two hard things in computer science."