TTL-based expiration makes a probabilistic bet: after N seconds, cached data is likely stale enough to refresh. But what if you can't tolerate that uncertainty? What if serving stale data for even a few seconds causes real harm—incorrect prices on a trading platform, outdated inventory shown to customers, or security tokens remaining valid after revocation?
Event-based invalidation fundamentally changes the approach. Instead of waiting for time to pass, the cache is actively notified when source data changes. The moment a database record updates, a message propagates to all caches holding that data, instructing them to invalidate or refresh.
This approach trades simplicity for precision. You gain tighter consistency guarantees, but you must build the infrastructure to detect changes, route them to caches, and handle the inevitable edge cases of distributed messaging.
By the end of this page, you will understand the architectural patterns for event-driven cache invalidation: publish-subscribe systems, database change data capture, cache invalidation buses, and the consistency guarantees you can (and cannot) achieve. You'll also learn the failure modes and mitigations essential for production reliability.
TTL is reactive—the cache reacts to expiration by fetching new data. Event-based invalidation is proactive—the system actively notifies caches to invalidate before they would naturally expire.
The Event Invalidation Flow:
1. Data Mutation: Application writes to database
2. Event Generation: Change event is created (what changed, when, by whom)
3. Event Routing: Event is sent to messaging system
4. Cache Subscription: Caches subscribe to relevant events
5. Invalidation: On receiving event, cache invalidates matching entries
6. Refresh (optional): The next access fetches fresh data, or the cache is proactively refreshed with the new value
This flow requires infrastructure that TTL doesn't need: event buses, subscriptions, and coordination between writers and caches.
```
┌─────────────────────────────────────────────────────────────────┐
│                    EVENT-BASED INVALIDATION                      │
└─────────────────────────────────────────────────────────────────┘

┌──────────┐     ┌──────────────┐     ┌────────────────────┐
│   App    │────▶│   Database   │────▶│  Change Detection  │
│  Server  │     │   (Write)    │     │   (Trigger/CDC)    │
└──────────┘     └──────────────┘     └─────────┬──────────┘
                                                │
                                       ┌────────▼────────┐
                                       │   Message Bus   │
                                       │  (Kafka/Redis/  │
                                       │    RabbitMQ)    │
                                       └────────┬────────┘
                                                │
              ┌─────────────────────────────────┼──────────────────────────┐
              │                                 │                          │
        ┌─────▼────┐                       ┌────▼─────┐               ┌────▼─────┐
        │  Cache   │                       │  Cache   │               │  Cache   │
        │  Node 1  │                       │  Node 2  │               │  Node N  │
        │ (Redis)  │                       │ (Redis)  │               │ (Redis)  │
        └──────────┘                       └──────────┘               └──────────┘

Legend:
  ────▶  Write operation
  ────▶  Event propagation (async)
```

Why Event-Based?
Consider these scenarios where TTL alone is insufficient:
E-commerce inventory: A product shows "in stock" with a 5-minute TTL. User adds to cart. In reality, the last unit sold 3 minutes ago. User sees error at checkout. Bad experience.
Security token revocation: Admin revokes a user's access token. With 30-minute TTL, user retains access for up to 30 more minutes. Unacceptable security risk.
Price changes: Flash sale starts. Prices drop 50%. With 10-minute TTL, some users see old prices for 10 minutes. Leads to complaints, refund requests, legal issues in some jurisdictions.
In each case, the business requirement is "data must update within seconds of change"—a requirement TTL fundamentally cannot satisfy with short, predictable latency.
Event-based invalidation reduces staleness latency but doesn't guarantee strong consistency. Events can be delayed, lost, or processed out of order. For true strong consistency, you need different patterns (read-through with cache-aside, or no caching). Event invalidation provides eventual consistency with bounded staleness.
Publish-Subscribe (Pub/Sub) is the foundational pattern for event-based invalidation. Publishers (data writers) emit events when data changes. Subscribers (caches) listen for relevant events and act on them.
Key Characteristics:
Decoupling: Publishers don't know about subscribers. They simply publish events to topics/channels. This allows caches to be added/removed without modifying write logic.
Fan-out: One event reaches all subscribers. When product X updates, all 20 cache nodes holding product X can be simultaneously notified.
Asynchronous: Publishing doesn't wait for subscribers to process. This keeps write latency low.
At-least-once delivery: Most pub/sub systems guarantee messages are delivered at least once (but possibly duplicated). Invalidation logic must be idempotent.
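One common way to keep handlers idempotent under at-least-once delivery is to remember processed event IDs. Below is a minimal sketch of that idea, assuming each event carries a unique id field; the key prefix and deduplication TTL are illustrative, not part of any standard.

```python
import redis

r = redis.Redis()
DEDUP_TTL = 3600  # remember processed event IDs for an hour (illustrative)


def handle_invalidation(event_id: str, cache_key: str) -> None:
    """Idempotent invalidation: duplicate deliveries of the same event are harmless."""
    # SET with nx=True returns None if this event ID was already recorded
    first_time = r.set(f"seen_event:{event_id}", 1, nx=True, ex=DEDUP_TTL)
    if not first_time:
        return  # duplicate delivery; nothing to do

    # Plain deletes are naturally idempotent; the dedup guard matters mostly
    # when handlers also do proactive refreshes or other side effects.
    r.delete(cache_key)
```

The fuller example below wires this kind of handler into a Redis Pub/Sub listener and a matching publisher.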
```python
import redis
import json
import time
import threading
from typing import Callable, Dict, Any


class CacheInvalidationListener:
    """
    Listens for cache invalidation events via Redis Pub/Sub.
    When events arrive, invalidates matching keys in local cache.
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        local_cache: Dict[str, Any],
        channels: list[str]
    ):
        self.redis = redis_client
        self.local_cache = local_cache
        self.pubsub = self.redis.pubsub()
        self.channels = channels
        self._running = False

    def start(self):
        """Start listening for invalidation events."""
        self.pubsub.subscribe(*self.channels)
        self._running = True
        # Run listener in background thread
        self._thread = threading.Thread(target=self._listen, daemon=True)
        self._thread.start()

    def _listen(self):
        """Main listener loop."""
        for message in self.pubsub.listen():
            if not self._running:
                break
            if message['type'] != 'message':
                continue
            try:
                event = json.loads(message['data'])
                self._handle_event(event)
            except Exception as e:
                print(f"Error processing invalidation event: {e}")

    def _handle_event(self, event: dict):
        """
        Process an invalidation event.

        Event format:
        {
            "type": "invalidate" | "update",
            "entity": "product" | "user" | "order",
            "id": "12345",
            "timestamp": 1699999999,
            "data": {...}  # Optional: new value for update events
        }
        """
        event_type = event.get('type')
        entity = event.get('entity')
        entity_id = event.get('id')
        cache_key = f"{entity}:{entity_id}"

        if event_type == 'invalidate':
            # Simply remove from cache; next access will fetch fresh
            if cache_key in self.local_cache:
                del self.local_cache[cache_key]
                print(f"Invalidated cache key: {cache_key}")

        elif event_type == 'update':
            # Proactively update cache with new value
            new_data = event.get('data')
            if new_data:
                self.local_cache[cache_key] = new_data
                print(f"Updated cache key: {cache_key}")

    def stop(self):
        """Stop the listener."""
        self._running = False
        self.pubsub.unsubscribe()


class CacheInvalidationPublisher:
    """
    Publishes cache invalidation events when data changes.
    Call from your data write path to notify all caches.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def invalidate(self, entity: str, entity_id: str):
        """Publish an invalidation event."""
        event = {
            "type": "invalidate",
            "entity": entity,
            "id": entity_id,
            "timestamp": int(time.time())
        }
        channel = f"cache:invalidation:{entity}"
        self.redis.publish(channel, json.dumps(event))

    def update(self, entity: str, entity_id: str, new_data: dict):
        """Publish an update event with new data."""
        event = {
            "type": "update",
            "entity": entity,
            "id": entity_id,
            "timestamp": int(time.time()),
            "data": new_data
        }
        channel = f"cache:invalidation:{entity}"
        self.redis.publish(channel, json.dumps(event))


# Usage Example:
#
# # In your data layer, after database write:
# def update_product(product_id: str, new_price: float):
#     db.execute("UPDATE products SET price = %s WHERE id = %s", (new_price, product_id))
#
#     # Publish invalidation event
#     publisher.invalidate("product", product_id)
#
# # In your cache server:
# listener = CacheInvalidationListener(
#     redis_client=redis.Redis(),
#     local_cache=my_cache,
#     channels=["cache:invalidation:product", "cache:invalidation:user"]
# )
# listener.start()
```

Redis Pub/Sub is fire-and-forget: if a subscriber is offline when an event is published, it misses the event permanently. For production systems requiring guaranteed delivery, use Redis Streams, Kafka, or dedicated message queues instead. Redis Pub/Sub is suitable for best-effort invalidation combined with a TTL fallback.
Change Data Capture (CDC) is a technique that detects and captures changes made to a database, typically by reading the database's transaction log (binlog in MySQL, WAL in PostgreSQL). CDC provides a reliable, ordered stream of all data mutations without modifying application code.
Why CDC for Cache Invalidation?
No application changes: CDC reads database logs directly. Your application doesn't need to publish events explicitly—every database change is automatically captured.
Complete coverage: CDC captures all changes, including those made by batch jobs, migrations, or direct SQL updates that bypass your application.
Guaranteed ordering: Transaction logs maintain strict ordering. If A happens before B in the database, the CDC stream reflects this order.
Minimal overhead: Reading logs is typically more efficient than firing application-level events on every write.
```
┌─────────────────────────────────────────────────────────────────────────┐
│                       CDC-BASED CACHE INVALIDATION                       │
└─────────────────────────────────────────────────────────────────────────┘

┌──────────┐     ┌──────────────┐     ┌─────────────────────┐
│   App    │────▶│   Database   │────▶│   Transaction Log   │
│  Server  │     │  (MySQL/PG)  │     │    (binlog/WAL)     │
└──────────┘     └──────────────┘     └──────────┬──────────┘
                                                 │ (reads log)
                                        ┌────────▼────────┐
                                        │   Debezium /    │
                                        │   Maxwell /     │
                                        │   Custom CDC    │
                                        └────────┬────────┘
                                                 │
                                        ┌────────▼────────┐
                                        │  Kafka Topics   │
                                        │ (one per table) │
                                        └────────┬────────┘
                                                 │
            ┌────────────────────────────────────┼───────────────────────────┐
            │                                    │                           │
     ┌──────▼──────┐                      ┌──────▼──────┐              ┌─────▼───────┐
     │ Invalidator │                      │ Invalidator │              │  Analytics  │
     │  Service 1  │                      │  Service 2  │              │  Consumer   │
     └──────┬──────┘                      └──────┬──────┘              └─────────────┘
            │                                    │
     ┌──────▼──────┐                      ┌──────▼──────┐
     │    Cache    │                      │    Cache    │
     │  Cluster A  │                      │  Cluster B  │
     └─────────────┘                      └─────────────┘
```

Popular CDC Tools:
Debezium: Open-source CDC platform built on Kafka Connect. Supports MySQL, PostgreSQL, MongoDB, SQL Server, Oracle, and more. Provides JSON events with before/after row state. Production-ready, widely adopted.
Maxwell: Lightweight MySQL-only CDC tool. Simpler than Debezium, good for smaller deployments. Outputs to Kafka, Redis, or custom sinks.
pg_logical (PostgreSQL): Native PostgreSQL logical decoding. Can output changes in various formats. Good for PostgreSQL-heavy environments.
DynamoDB Streams: AWS-native CDC for DynamoDB. Integrates with Lambda for serverless processing.
MongoDB Change Streams: Native real-time change notification in MongoDB. Allows subscribing to changes at collection or database level.
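For databases with native change streams, invalidation can be driven without a separate CDC service. Below is a minimal sketch using pymongo and Redis; the connection string, database and collection names, and key format are placeholders, and change streams require a replica set or sharded cluster.

```python
import json
import redis
from pymongo import MongoClient

cache = redis.Redis()
client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
products = client["shop"]["products"]              # hypothetical database/collection

# watch() tails the collection's change stream; each event describes one mutation
with products.watch(full_document="updateLookup") as stream:
    for change in stream:
        op = change["operationType"]            # "insert", "update", "replace", "delete", ...
        doc_id = change["documentKey"]["_id"]
        cache_key = f"product:{doc_id}"

        if op in ("insert", "update", "replace", "delete"):
            # Invalidate and let the next read repopulate from MongoDB
            cache.delete(cache_key)
            print(f"[{op}] invalidated {cache_key}")
```

The Debezium-based pipeline below shows the same idea at larger scale, with Kafka between the database and the invalidators.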
```python
from kafka import KafkaConsumer
import json
import redis
from typing import Optional


class CDCCacheInvalidator:
    """
    Consumes Debezium CDC events and invalidates corresponding cache entries.
    Handles insert, update, and delete operations from the database.
    """

    def __init__(
        self,
        kafka_brokers: list[str],
        cache: redis.Redis,
        table_topic_mapping: dict[str, str]
    ):
        """
        Args:
            kafka_brokers: Kafka broker addresses
            cache: Redis cache client
            table_topic_mapping: Maps table names to Kafka topics
                e.g., {"products": "dbserver.inventory.products"}
        """
        self.cache = cache
        self.table_mapping = table_topic_mapping
        self.consumer = KafkaConsumer(
            *table_topic_mapping.values(),
            bootstrap_servers=kafka_brokers,
            group_id='cache-invalidator',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            enable_auto_commit=True
        )

    def run(self):
        """Main processing loop."""
        print("Starting CDC cache invalidator...")
        for message in self.consumer:
            try:
                self._process_event(message.topic, message.value)
            except Exception as e:
                print(f"Error processing CDC event: {e}")

    def _process_event(self, topic: str, event: dict):
        """
        Process a single Debezium CDC event.

        Debezium event structure:
        {
            "before": {...} or null,      # Row state before change
            "after": {...} or null,       # Row state after change
            "source": {...},              # Source database info
            "op": "c" | "u" | "d" | "r",  # Operation type
            "ts_ms": 1699999999000        # Timestamp
        }

        Operations:
        - "c" = create (INSERT)
        - "u" = update (UPDATE)
        - "d" = delete (DELETE)
        - "r" = read (snapshot/initial load)
        """
        payload = event.get('payload', event)  # Handle envelope vs payload
        operation = payload.get('op')
        before = payload.get('before')
        after = payload.get('after')

        # Determine entity ID (from after for inserts/updates, before for deletes)
        entity_data = after if operation in ('c', 'u', 'r') else before
        if not entity_data:
            return

        entity_id = entity_data.get('id')
        if not entity_id:
            return

        # Derive cache key from topic and entity ID
        table_name = topic.split('.')[-1]  # e.g., "products" from "dbserver.inventory.products"
        cache_key = f"{table_name}:{entity_id}"

        if operation == 'd':
            # DELETE: Remove from cache
            self.cache.delete(cache_key)
            print(f"[DELETE] Invalidated {cache_key}")

        elif operation in ('c', 'u', 'r'):
            # INSERT/UPDATE/SNAPSHOT: Invalidate or update
            # Option 1: Just invalidate, let next read populate
            self.cache.delete(cache_key)
            print(f"[{operation.upper()}] Invalidated {cache_key}")

            # Option 2: Proactively update cache with new data
            # self.cache.setex(cache_key, 300, json.dumps(after))
            # print(f"[{operation.upper()}] Updated {cache_key}")

    def _build_cache_key(self, table: str, entity_id: str,
                         extra_keys: Optional[list] = None) -> list[str]:
        """
        Build all cache keys that need invalidation for a given entity.

        Some entities have multiple cache representations:
        - products:123 (by ID)
        - products:sku:ABC123 (by SKU)
        - products:category:electronics (listing)

        This method returns all keys that should be invalidated.
        """
        keys = [f"{table}:{entity_id}"]
        if extra_keys:
            keys.extend(extra_keys)
        return keys


# Usage:
# invalidator = CDCCacheInvalidator(
#     kafka_brokers=['localhost:9092'],
#     cache=redis.Redis(host='localhost', port=6379),
#     table_topic_mapping={
#         'products': 'dbserver.inventory.products',
#         'users': 'dbserver.userdb.users',
#         'orders': 'dbserver.orderdb.orders'
#     }
# )
# invalidator.run()
```

Start with invalidation, not updating: it's safer to delete cache entries and let the next read repopulate than to try to keep the cache in sync via CDC events. Proactive updates risk version conflicts if events arrive out of order.

Use idempotent operations: CDC may deliver duplicate events during failures. Deleting a non-existent key is safe; updating with stale data is not.
A critical design decision is the granularity of invalidation: how precisely do you target which cache entries to invalidate? Finer granularity preserves more cache hits; coarser granularity is simpler and safer.
| Granularity | Description | Cache Key Example | Trade-offs |
|---|---|---|---|
| Exact Key | Invalidate only the specific cache entry for the changed record | product:12345 | Highest precision, lowest blast radius, but must track exact cache keys per entity |
| Entity Type | Invalidate all cache entries for a type when any instance changes | product:* (wildcard) | Simpler, but over-invalidates; one product change clears all products |
| Related Entities | Invalidate entity and all logically related entries | product:123, category:45, search:electronics | Comprehensive, but requires modeling entity relationships |
| Full Cache Flush | Clear entire cache on any significant change | FLUSHDB | Simplest, guarantees freshness, but destroys hit rate temporarily |
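For the coarser entity-type granularity, note that Redis has no built-in wildcard delete, so one common approach is to scan for matching keys and delete them in batches. This is a sketch, not a drop-in utility; the pattern and batch size are illustrative, and SCAN walks the whole keyspace, so use it sparingly on large instances.

```python
import redis

r = redis.Redis()


def invalidate_by_pattern(pattern: str, batch_size: int = 500) -> int:
    """Delete all keys matching a glob pattern, e.g. 'product:*'. Returns count deleted."""
    deleted = 0
    batch = []
    # scan_iter walks the keyspace incrementally without blocking the server
    for key in r.scan_iter(match=pattern, count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            deleted += r.delete(*batch)
            batch = []
    if batch:
        deleted += r.delete(*batch)
    return deleted


# e.g. invalidate every cached product after a bulk price import
# invalidate_by_pattern("product:*")
```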
The Related Entity Problem:
Real applications have complex data relationships. A single database write may affect multiple cached representations:
Scenario: Product price changes
Affected cache entries:
1. product:12345 (direct entity cache)
2. category:electronics (listing includes this product)
3. search:"laptop" (search results include this product)
4. homepage:featured (product is featured on homepage)
5. user:67890:recommendations (personalized recommendations)
6. cart:abc123 (cart containing this product)
Tracking all these relationships is the hard problem of cache invalidation. Solutions include:
1. Tagging systems: Each cache entry is tagged with related entity IDs. Invalidation queries tags, not keys.
cache.set("search:laptop", results, tags=["product:123", "product:456"])
# Later:
cache.invalidate_by_tag("product:123") # Invalidates all entries tagged with product:123
2. Dependency tracking: Maintain an explicit dependency graph between cache entries and their source entities. When an entity changes, traverse the graph to find all dependent entries (see the sketch after this list).
3. Conservative over-invalidation: Accept that some invalidations will be broader than necessary. A category cache is invalidated when any product in that category changes, even if the listing wouldn't visually change.
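To make dependency tracking concrete, here is a minimal in-memory sketch; the entity identifiers and cache keys are illustrative strings, and a real system would persist this map rather than hold it in one process.

```python
from collections import defaultdict


class DependencyTracker:
    """Maps source entities to the cache keys that were built from them."""

    def __init__(self):
        self._dependents: dict[str, set[str]] = defaultdict(set)

    def record(self, cache_key: str, depends_on: list[str]) -> None:
        """Register that cache_key was derived from these entities."""
        for entity in depends_on:
            self._dependents[entity].add(cache_key)

    def affected_keys(self, entity: str) -> set[str]:
        """All cache keys that must be invalidated when entity changes."""
        return self._dependents.pop(entity, set())


tracker = DependencyTracker()
tracker.record("search:laptop", ["product:123", "product:456"])
tracker.record("homepage:featured", ["product:123"])

# When product 123 changes, invalidate everything derived from it
for key in tracker.affected_keys("product:123"):
    print(f"invalidate {key}")  # e.g. search:laptop, homepage:featured
```

In practice this mapping usually lives in Redis itself, which is essentially what the tag-based implementation below does.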
```python
import redis
import json
from typing import Set, Any, Optional
import time


class TaggedCache:
    """
    Cache with tag-based invalidation support.
    Each cache entry can be associated with multiple tags.
    Invalidating a tag invalidates all entries with that tag.
    """

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def set(self, key: str, value: Any, tags: list[str] = None, ttl: int = None) -> None:
        """
        Store a value with associated tags.

        Args:
            key: Cache key
            value: Value to cache (will be JSON serialized)
            tags: List of tags to associate with this entry
            ttl: Time-to-live in seconds
        """
        ttl = ttl or self.default_ttl
        tags = tags or []

        # Store the value
        self.redis.setex(f"cache:{key}", ttl, json.dumps(value))

        # For each tag, add this key to the tag's set
        # Also track which tags belong to this key for cleanup
        if tags:
            pipe = self.redis.pipeline()

            # Store tags associated with this key
            key_tags_key = f"cache_tags:{key}"
            pipe.delete(key_tags_key)
            pipe.sadd(key_tags_key, *tags)
            pipe.expire(key_tags_key, ttl + 60)  # Slightly longer than value TTL

            # Add key to each tag's member set
            for tag in tags:
                tag_members_key = f"tag_members:{tag}"
                pipe.sadd(tag_members_key, key)
                # Don't expire tag sets; they're cleaned up on access

            pipe.execute()

    def get(self, key: str) -> Optional[Any]:
        """Retrieve a cached value."""
        data = self.redis.get(f"cache:{key}")
        return json.loads(data) if data else None

    def invalidate_by_tag(self, tag: str) -> int:
        """
        Invalidate all cache entries associated with a tag.
        Returns the number of entries invalidated.
        """
        tag_members_key = f"tag_members:{tag}"

        # Get all keys with this tag
        members = self.redis.smembers(tag_members_key)
        if not members:
            return 0

        # Delete all cache entries
        pipe = self.redis.pipeline()
        count = 0
        for key_bytes in members:
            key = key_bytes.decode('utf-8') if isinstance(key_bytes, bytes) else key_bytes
            pipe.delete(f"cache:{key}")
            pipe.delete(f"cache_tags:{key}")
            count += 1

        # Clear the tag's member set
        pipe.delete(tag_members_key)
        pipe.execute()

        return count

    def invalidate_by_tags(self, tags: list[str]) -> int:
        """Invalidate by multiple tags. Union of all tagged entries."""
        total = 0
        for tag in tags:
            total += self.invalidate_by_tag(tag)
        return total


# Usage example:
cache = TaggedCache(redis.Redis())

# Cache a search result, tagged with all product IDs in results
search_results = {"products": [{"id": 123}, {"id": 456}, {"id": 789}]}
cache.set(
    key="search:laptops:page1",
    value=search_results,
    tags=["product:123", "product:456", "product:789", "category:laptops"]
)

# Cache a product detail page
cache.set(
    key="product:123",
    value={"id": 123, "name": "Laptop", "price": 999},
    tags=["product:123", "category:laptops"]
)

# When product 123 is updated, invalidate all related entries
updated_count = cache.invalidate_by_tag("product:123")
print(f"Invalidated {updated_count} cache entries")  # Invalidates both entries above
```

Tag-based invalidation adds storage overhead (tag membership sets) and write overhead (updating tag sets on every cache write). For high-write workloads, consider whether the precision benefits outweigh these costs. Sometimes broader invalidation is more efficient.
Event-based invalidation introduces several subtle consistency challenges that don't exist with simple TTL. Understanding these is critical for building reliable systems.
Challenge 1: Race Between Cache Write and Invalidation
Consider this sequence:
T1: Request A reads from DB, gets value V1
T2: Write B updates DB to V2
T3: Invalidation event for B is published
T4: Request A writes V1 to cache (stale!)
T5: Invalidation event processed, deletes cache
T6: Next request correctly gets V2
The problem: between T4 and T5, the cache holds stale V1. If the invalidation event is delayed, this window can be significant.
Challenge 2: Out-of-Order Events
In distributed systems, event delivery order isn't guaranteed:
T1: Write sets value to V1
T2: Invalidation event E1 published
T3: Write sets value to V2
T4: Invalidation event E2 published
T5: E2 arrives at cache, processed (invalidates V1 entry)
T6: E1 arrives at cache, processed (no-op, already invalidated)
This works fine. But consider:
T1: Write sets value to V1
T2: Refresh event E1 published (carrying V1)
T3: Write sets value to V2
T4: Refresh event E2 published (carrying V2)
T5: E2 arrives, cache updates to V2
T6: E1 arrives LATE, cache regresses to V1 (STALE!)
If invalidation events carry new values (proactive refresh), out-of-order delivery can regress cache to older versions.
```python
import redis
import json
from typing import Any, Optional


class VersionedCache:
    """
    Cache that tracks version numbers to prevent stale updates.
    Only accepts updates strictly newer than the current version.
    Invalidations always work (delete doesn't need a version check).
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get(self, key: str) -> Optional[dict]:
        """Get value with version metadata."""
        data = self.redis.hgetall(f"vcache:{key}")
        if not data:
            return None
        return {
            'value': json.loads(data[b'value']),
            'version': int(data[b'version'])
        }

    def set_if_newer(self, key: str, value: Any, version: int, ttl: int = 300) -> bool:
        """
        Set value only if the provided version is newer than current.
        Uses a Lua script for an atomic compare-and-set.
        Returns True if value was updated, False if rejected (stale).
        """
        lua_script = """
        local key = KEYS[1]
        local new_value = ARGV[1]
        local new_version = tonumber(ARGV[2])
        local ttl = tonumber(ARGV[3])

        -- Get current version
        local current_version = redis.call('HGET', key, 'version')

        if current_version then
            current_version = tonumber(current_version)
            if new_version <= current_version then
                -- New version is not newer; reject update
                return 0
            end
        end

        -- Version is newer (or no current version); update
        redis.call('HSET', key, 'value', new_value, 'version', new_version)
        redis.call('EXPIRE', key, ttl)
        return 1
        """

        result = self.redis.eval(
            lua_script,
            1,                   # Number of keys
            f"vcache:{key}",     # KEYS[1]
            json.dumps(value),   # ARGV[1]
            version,             # ARGV[2]
            ttl                  # ARGV[3]
        )
        return result == 1

    def invalidate(self, key: str) -> bool:
        """
        Invalidate (delete) a cache entry.
        Deletion always succeeds regardless of version.
        """
        return self.redis.delete(f"vcache:{key}") > 0


# Usage in CDC event processor:
#
# def process_cdc_event(event):
#     version = event['source']['lsn']  # Use log sequence number as version
#     entity_id = event['after']['id']
#     new_data = event['after']
#
#     # This rejects the update if an event with a higher LSN was already processed
#     updated = cache.set_if_newer(
#         key=f"product:{entity_id}",
#         value=new_data,
#         version=version
#     )
#
#     if updated:
#         print(f"Cache updated to version {version}")
#     else:
#         print(f"Rejected stale update (version {version})")
```

Challenge 3: Invalidation During Cache Population
Another race condition:
T1: Cache miss occurs
T2: Read from database (slow query)
T3: While query running, data changes in DB
T4: Invalidation event processed (cache is empty, no-op)
T5: Query completes, stale result written to cache
T6: Cache now holds stale data with no pending invalidation
Mitigation: Cache-Aside with Version Validation
Before writing to cache, validate that the data is still current. One approach: include a version/timestamp in the cached data and check against the source before writing.
Alternatively, use a very short "guard" TTL on freshly written entries so they expire quickly even if an invalidation was missed.
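Below is a minimal sketch of both mitigations combined, assuming the source row carries a version column that the read path can re-check cheaply. The helpers db.fetch_product and db.fetch_version, the key format, and the guard TTL are all illustrative.

```python
import json
import redis

cache = redis.Redis()
GUARD_TTL = 15  # short "guard" TTL for freshly populated entries (illustrative)


def read_product(product_id: str, db) -> dict:
    """Cache-aside read that validates the row version before caching."""
    cache_key = f"product:{product_id}"
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    row = db.fetch_product(product_id)               # hypothetical slow query
    current_version = db.fetch_version(product_id)   # hypothetical cheap version re-check

    if row["version"] == current_version:
        # Data still current: cache it, but only with a short guard TTL so a
        # missed invalidation cannot linger for long
        cache.setex(cache_key, GUARD_TTL, json.dumps(row))
    # If versions differ, skip caching; the next read will retry

    return row
```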
These race conditions are inherent to eventually consistent systems. You can minimize their window (faster event delivery, shorter queries) or mitigate their impact (version checks, short fallback TTL) but cannot eliminate them entirely. If you require absolute consistency, you may need to avoid caching or use synchronous invalidation (which has its own latency costs).
Distributed systems fail. Networks partition, services crash, queues overflow. Event-based invalidation must be resilient to these failures while maintaining acceptable consistency bounds.
Failure Mode 1: Message Bus Unavailable
If Kafka/Redis/RabbitMQ goes down, invalidation events can't be delivered.
Mitigation: keep a TTL on every cache entry so staleness stays bounded while events cannot flow, alert on bus downtime, and for the most critical entities consider shortening TTLs or bypassing the cache until the bus recovers.
Failure Mode 2: Consumer Crash/Slowdown
A cache invalidator service crashes or falls behind processing events.
Mitigation: run invalidators in a consumer group so a healthy instance takes over crashed partitions, commit offsets only after processing so missed events are replayed on restart, and monitor consumer lag with alerting (as in the example below).
Failure Mode 3: Event Loss
With at-most-once delivery systems (Redis Pub/Sub), events can be lost.
Mitigation: use an at-least-once transport (Kafka, Redis Streams) for invalidations that matter, keep invalidation handlers idempotent so redelivery is harmless, and rely on the TTL fallback to heal anything a lost event leaves stale.
```python
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import redis
import json
import time
import logging
from typing import Optional
from dataclasses import dataclass
from prometheus_client import Counter, Gauge, Histogram

# Metrics for monitoring
EVENTS_PROCESSED = Counter('cache_invalidation_events_processed_total',
                           'Total events processed', ['status'])
CONSUMER_LAG = Gauge('cache_invalidation_consumer_lag', 'Consumer lag in messages')
EVENT_PROCESSING_TIME = Histogram('cache_invalidation_processing_seconds',
                                  'Event processing time')


@dataclass
class InvalidationEvent:
    entity_type: str
    entity_id: str
    operation: str
    timestamp: int
    version: Optional[int] = None


class ResilientInvalidationConsumer:
    """
    Kafka consumer for cache invalidation with failure handling.

    Features:
    - Automatic reconnection on broker failures
    - Dead letter queue for failed events
    - Lag monitoring and alerting
    - Graceful shutdown
    """

    def __init__(
        self,
        kafka_brokers: list[str],
        topics: list[str],
        cache: redis.Redis,
        dlq_topic: str = 'cache-invalidation-dlq',
        max_retries: int = 3
    ):
        self.brokers = kafka_brokers
        self.topics = topics
        self.cache = cache
        self.dlq_topic = dlq_topic
        self.max_retries = max_retries
        self.running = False
        self.logger = logging.getLogger(__name__)
        self.consumer = None
        self._init_consumer()

    def _init_consumer(self):
        """Initialize Kafka consumer with resilient settings."""
        self.consumer = KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.brokers,
            group_id='cache-invalidator',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',   # Don't skip events on restart
            enable_auto_commit=False,       # Manual commit after processing
            max_poll_interval_ms=300000,    # 5 min max processing time
            session_timeout_ms=45000,
            heartbeat_interval_ms=15000
        )

    def run(self):
        """Main processing loop with error recovery."""
        self.running = True
        while self.running:
            try:
                self._process_batch()
            except KafkaError as e:
                self.logger.error(f"Kafka error: {e}")
                EVENTS_PROCESSED.labels(status='kafka_error').inc()
                self._reconnect()
            except Exception as e:
                self.logger.error(f"Unexpected error: {e}")
                EVENTS_PROCESSED.labels(status='error').inc()
                time.sleep(1)  # Brief pause before retry

    def _process_batch(self):
        """Process a batch of messages with error handling per message."""
        messages = self.consumer.poll(timeout_ms=1000, max_records=100)

        for topic_partition, records in messages.items():
            for record in records:
                try:
                    with EVENT_PROCESSING_TIME.time():
                        self._process_single(record)
                    EVENTS_PROCESSED.labels(status='success').inc()
                except Exception as e:
                    self.logger.error(f"Failed to process event: {e}")
                    self._handle_failed_event(record, e)
                    EVENTS_PROCESSED.labels(status='failed').inc()

        # Commit after successful processing
        if messages:
            self.consumer.commit()

        # Update lag metric
        self._update_lag_metric()

    def _process_single(self, record):
        """Process a single invalidation event."""
        event = self._parse_event(record.value)
        if event.operation in ('delete', 'update', 'create'):
            cache_key = f"{event.entity_type}:{event.entity_id}"
            self.cache.delete(cache_key)
            self.logger.debug(f"Invalidated cache key: {cache_key}")

    def _parse_event(self, raw: dict) -> InvalidationEvent:
        """Parse raw event into structured format."""
        payload = raw.get('payload', raw)
        return InvalidationEvent(
            entity_type=payload.get('entity_type') or payload.get('source', {}).get('table'),
            entity_id=str(payload.get('entity_id') or (payload.get('after') or {}).get('id')),
            operation=payload.get('op', 'update'),
            timestamp=payload.get('ts_ms', int(time.time() * 1000)),
            version=payload.get('version')
        )

    def _handle_failed_event(self, record, error: Exception):
        """Send failed event to dead letter queue for manual review."""
        # In production, publish to DLQ topic
        # For now, log with full context
        self.logger.error(
            f"Event failed after retries. "
            f"Topic: {record.topic}, Partition: {record.partition}, "
            f"Offset: {record.offset}, Error: {error}"
        )

    def _update_lag_metric(self):
        """Calculate and update consumer lag metric."""
        try:
            end_offsets = self.consumer.end_offsets(self.consumer.assignment())

            total_lag = 0
            for tp in self.consumer.assignment():
                end = end_offsets.get(tp, 0)
                committed_offset = self.consumer.committed(tp)
                if committed_offset is not None:
                    total_lag += end - committed_offset

            CONSUMER_LAG.set(total_lag)
        except Exception as e:
            self.logger.warning(f"Failed to calculate lag: {e}")

    def _reconnect(self):
        """Reconnect to Kafka after failure."""
        self.logger.info("Attempting to reconnect to Kafka...")
        time.sleep(5)  # Backoff before reconnect
        try:
            self.consumer.close()
        except Exception:
            pass
        self._init_consumer()
        self.logger.info("Reconnected to Kafka")

    def shutdown(self):
        """Graceful shutdown."""
        self.running = False
        if self.consumer:
            self.consumer.close()
```

Never rely solely on event-based invalidation. Always layer a TTL underneath events. Events provide fast invalidation in the happy path; TTL provides the safety net when events fail. This "belt and suspenders" approach is standard in production caching architectures.
Understanding when to use each invalidation strategy is crucial. Most production systems use both, but the balance depends on specific requirements.
| Factor | TTL-based | Event-based | Winner |
|---|---|---|---|
| Implementation Complexity | Simple (just a timestamp) | Complex (pub/sub, CDC, consumers) | TTL |
| Operational Overhead | Minimal | Significant (message bus, monitoring) | TTL |
| Consistency Guarantee | Bounded staleness (≤ TTL) | Near real-time (<1s typical) | Event |
| Infrastructure Required | None (built into cache) | Message bus, CDC, consumers | TTL |
| Scalability of Writes | Independent of cache count | Events fan out to all caches | TTL |
| Failure Handling | Automatic (time passes) | Requires explicit handling | TTL |
| Data Freshness | Eventually (on expiration) | Immediately (on change) | Event |
| Debugging | Simple (check TTL values) | Complex (trace event flow) | TTL |
Decision Framework:
Use TTL-only when: bounded staleness of seconds to minutes is acceptable for the data, you want minimal infrastructure and operational overhead, or you have no message bus or CDC pipeline to build on.
Add event-based invalidation when: serving stale data causes real harm (inventory, prices, security tokens), freshness requirements are measured in seconds, or changes arrive through paths your application cannot intercept (which makes CDC especially attractive).
Common Hybrid Patterns: event-based invalidation for the handful of critical entities plus plain TTL for everything else, and events layered on top of a fallback TTL so that a lost event never causes unbounded staleness.
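One way to keep that split explicit is a small per-entity policy table. This is only a sketch: the entity names, TTL values, and flag names below are illustrative, not a prescribed schema.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CachePolicy:
    ttl_seconds: int           # fallback TTL (always present as the safety net)
    event_invalidation: bool   # also subscribe to change events for this entity?


# The minority of data where staleness hurts gets events plus a short safety-net TTL;
# everything else relies on TTL alone.
POLICIES = {
    "product_price":  CachePolicy(ttl_seconds=60,   event_invalidation=True),
    "inventory":      CachePolicy(ttl_seconds=30,   event_invalidation=True),
    "auth_token":     CachePolicy(ttl_seconds=300,  event_invalidation=True),
    "user_profile":   CachePolicy(ttl_seconds=900,  event_invalidation=False),
    "static_content": CachePolicy(ttl_seconds=3600, event_invalidation=False),
}
```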
In most systems, 80% of data can tolerate TTL-based staleness. Focus event-based invalidation on the 20% where it truly matters. This keeps complexity bounded while providing strong guarantees where needed.
Event-based invalidation provides the precision that TTL cannot, but at the cost of significant complexity. Let's consolidate the key insights:
1. Pub/sub decouples writers from caches, but delivery is at-least-once, so invalidation logic must be idempotent.
2. CDC captures every change, including batch jobs and direct SQL, in transaction-log order and without touching application code.
3. Invalidation granularity is a trade-off: exact keys preserve hit rate, tags and dependency tracking cover related entries, and coarse flushes are simple but expensive.
4. Races and out-of-order events mean event invalidation yields bounded staleness, not strong consistency; version checks and guard TTLs shrink the window.
5. Always layer a TTL underneath events as the safety net for lost or delayed messages.
What's Next:
Event invalidation decides when entries become invalid; we also need strategies for choosing which entries to evict when the cache runs out of memory. The next page explores LRU (Least Recently Used) eviction, the most common algorithm for selecting which cached entries to discard when memory pressure demands capacity management.
You now understand the architecture, implementation, and trade-offs of event-based cache invalidation. Combined with TTL knowledge from the previous page, you can design invalidation strategies that balance freshness, complexity, and reliability for any production workload.