On the previous page, we explored idempotent consumers—application-level handling of duplicate messages. But what if we could prevent duplicates from reaching consumers in the first place?
Deduplication strategies operate at the infrastructure level, intercepting and filtering duplicate messages before they propagate through the system. This provides defense-in-depth: even if a consumer's idempotency mechanism has a bug, the infrastructure layer catches duplicates.
This page explores the full spectrum of deduplication approaches: where to deduplicate, how to identify duplicates, time-windowed deduplication, content-based deduplication, and distributed deduplication at scale.
By the end of this page, you will understand the layers at which deduplication can be applied, the trade-offs between different deduplication strategies, how to implement time-windowed and content-based deduplication, and how to design distributed deduplication systems that scale to billions of messages.
Deduplication can be implemented at multiple layers of a messaging system. Each layer offers different trade-offs in terms of reliability, performance, and complexity.
| Layer | Deduplication Point | Scope | Pros | Cons |
|---|---|---|---|---|
| Producer | Before sending to broker | Single producer instance | Prevents network retries | Doesn't catch broker-level duplicates |
| Broker | At message acceptance | All messages in topic/queue | Centralized, comprehensive | Broker performance impact |
| Gateway | Before consumer delivery | All consumers in group | Protects all downstream consumers | Additional infrastructure component |
| Consumer | At message processing | Single consumer logic | Full control, business-aware | Repeated work per consumer |
Defense-in-Depth: Multiple Layers
Production systems often implement deduplication at multiple layers. This provides redundancy—if one layer misses a duplicate, another catches it.
A typical defense-in-depth strategy:
```typescript
/**
 * Multi-layer deduplication architecture
 *
 * Each layer catches different types of duplicates:
 * - Producer: Network retry duplicates
 * - Broker: Producer restart duplicates
 * - Consumer: Broker redelivery duplicates
 */

// Layer 1: Idempotent Producer
class IdempotentProducer {
  private sequenceNumber: number = 0;
  private pendingMessages: Map<number, Message> = new Map();

  async send(message: Message): Promise<void> {
    const seqNum = ++this.sequenceNumber;

    // Broker tracks (producerId, sequenceNumber) tuples
    // If we retry and send same sequence, broker ignores
    await this.broker.send({
      ...message,
      producerId: this.producerId,
      sequenceNumber: seqNum,
    });
  }
}

// Layer 2: Broker Deduplication
class BrokerDeduplication {
  private recentMessageIds: Map<string, number> = new Map(); // messageId → timestamp
  private windowMs: number = 5 * 60 * 1000; // 5-minute window

  acceptMessage(message: Message): boolean {
    const now = Date.now();

    // Check if we've seen this message ID recently
    const lastSeen = this.recentMessageIds.get(message.id);
    if (lastSeen && (now - lastSeen) < this.windowMs) {
      console.log(`Broker: Duplicate message ${message.id} rejected`);
      return false; // Duplicate within window
    }

    // Accept and record
    this.recentMessageIds.set(message.id, now);
    this.cleanupOldEntries(now);
    return true;
  }

  private cleanupOldEntries(now: number): void {
    for (const [id, timestamp] of this.recentMessageIds) {
      if (now - timestamp > this.windowMs) {
        this.recentMessageIds.delete(id);
      }
    }
  }
}

// Layer 3: Consumer Deduplication
class DedupConsumer {
  async processMessage(message: Message): Promise<void> {
    // Final layer: Database-backed deduplication
    const processed = await this.db.processedMessages.findUnique({
      where: { messageId: message.id },
    });

    if (processed) {
      console.log(`Consumer: Duplicate ${message.id}, skipping`);
      await message.ack();
      return;
    }

    // Process and record atomically
    await this.db.$transaction([
      this.businessLogic(message),
      this.db.processedMessages.create({
        data: { messageId: message.id },
      }),
    ]);

    await message.ack();
  }
}
```

Earlier deduplication is more efficient (prevents unnecessary work downstream) but less reliable (limited context). Later deduplication is more reliable (full context, persistent tracking) but more expensive (wasted processing up to that point). Use earlier layers for efficiency and later layers for correctness.
The most common deduplication strategy uses unique message identifiers. Each message has an ID, and the system tracks which IDs have been processed.
Message ID Sources:
Message IDs can come from various sources, each with different characteristics:
| Source | Example | Uniqueness Guarantee | When to Use |
|---|---|---|---|
| Broker-assigned | Kafka offset, RabbitMQ delivery-tag | Unique within partition/queue | When broker provides stable IDs |
| Producer-assigned UUID | 550e8400-e29b-41d4-a716-446655440000 | Globally unique (practically) | Default approach for most systems |
| Business key | order-12345 | Unique per business entity | When natural keys exist |
| Content hash | sha256(message_body) | Unique per content | When same content = same message |
| Composite | producer-1:seq-1234:ts-1704700000 | Context-dependent | Complex deduplication requirements |
```typescript
import { v4 as uuidv4 } from 'uuid';

/**
 * Identifier-based deduplication service
 */
class IdentifierDeduplication {
  private storage: DeduplicationStorage;
  private ttl: number;

  constructor(storage: DeduplicationStorage, ttlSeconds: number = 86400) {
    this.storage = storage;
    this.ttl = ttlSeconds;
  }

  /**
   * Check if a message is a duplicate and record it if not.
   *
   * @returns true if duplicate (should skip), false if new (should process)
   */
  async isDuplicate(messageId: string): Promise<boolean> {
    // Atomic check-and-set operation
    const wasSet = await this.storage.setIfNotExists(
      `dedup:${messageId}`,
      Date.now().toString(),
      this.ttl
    );

    // setIfNotExists returns true if we set it (new message)
    // Returns false if it already existed (duplicate)
    return !wasSet;
  }

  /**
   * Generate a dedup-friendly message ID
   */
  static generateMessageId(producerId: string): string {
    // Format: {producerId}-{timestamp}-{uuid}
    // Includes producer for debugging, timestamp for ordering,
    // UUID for guaranteed uniqueness
    return `${producerId}-${Date.now()}-${uuidv4()}`;
  }
}

// Storage implementations
class RedisDeduplicationStorage implements DeduplicationStorage {
  private redis: Redis;

  async setIfNotExists(key: string, value: string, ttlSeconds: number): Promise<boolean> {
    // Redis SET with NX + EX - atomic check-and-set
    const result = await this.redis.set(key, value, 'EX', ttlSeconds, 'NX');
    return result === 'OK';
  }
}

class PostgresDeduplicationStorage implements DeduplicationStorage {
  private db: Database;

  async setIfNotExists(key: string, value: string, ttlSeconds: number): Promise<boolean> {
    try {
      await this.db.deduplicationRecords.create({
        data: {
          messageId: key,
          processedAt: new Date(),
          expiresAt: new Date(Date.now() + ttlSeconds * 1000),
        },
      });
      return true; // Successfully inserted (new message)
    } catch (error: any) {
      if (error.code === 'P2002') {
        return false; // Unique constraint violation (duplicate)
      }
      throw error;
    }
  }
}
```

Kafka's Built-in Deduplication:
Kafka provides identifier-based deduplication through its idempotent producer feature:
```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092'],
});

/**
 * Kafka's idempotent producer provides automatic deduplication
 * at the broker level.
 *
 * How it works:
 * 1. Producer gets a unique ProducerId (PID) from the broker
 * 2. Each message includes (PID, sequenceNumber)
 * 3. Broker tracks the last sequenceNumber per PID per partition
 * 4. Duplicate (PID, seq) pairs are silently ignored
 */
const producer = kafka.producer({
  idempotent: true, // Enable broker-level deduplication

  // kafkajs allows at most 1 in-flight request with idempotence
  // (the Java client allows up to 5)
  maxInFlightRequests: 1,
});

await producer.connect();

// Even if this is retried multiple times due to network issues,
// the broker ensures the message appears exactly once in the log
await producer.send({
  topic: 'orders',
  acks: -1, // Wait for all in-sync replicas (required for idempotence)
  messages: [{
    key: orderId,
    value: JSON.stringify(order),
  }],
});

/**
 * Kafka idempotent producer dedup scope:
 *
 * ✓ Catches: Network retry duplicates (same producer session)
 * ✗ Misses: Producer restart duplicates (new PID, new sequence)
 * ✗ Misses: Consumer-side duplicates (redelivery after crash)
 *
 * For complete coverage, also need consumer-side deduplication
 */
```

Kafka's idempotent producer uses a Producer ID (PID) assigned at startup. When the producer restarts, it gets a new PID, and the broker cannot correlate new messages with old ones. Producer restarts can still cause duplicates that consumer-side deduplication must handle.
Sometimes two messages with different IDs represent the same logical event. Content-based deduplication identifies duplicates by examining the message payload itself, typically using cryptographic hashes.
When Content-Based Deduplication is Needed:

Identifier-based deduplication fails when duplicates carry different IDs—for example, when a producer regenerates the message ID on retry, or when multiple upstream systems emit the same logical event independently. In those cases, the payload itself must serve as the message's identity:
```typescript
import { createHash } from 'crypto';

class ContentBasedDeduplication {
  private storage: DeduplicationStorage;
  private ttl: number;

  constructor(storage: DeduplicationStorage, ttlSeconds: number) {
    this.storage = storage;
    this.ttl = ttlSeconds;
  }

  /**
   * Generate a content hash for a message.
   *
   * Includes only the fields that define message identity,
   * excluding metadata like timestamps or message IDs.
   */
  generateContentHash(message: any): string {
    // Extract identity-defining fields only
    const contentToHash = {
      type: message.type,
      entityId: message.entityId,
      operation: message.operation,
      payload: message.payload,
      // Explicitly exclude: message.id, message.timestamp, message.metadata
    };

    const serialized = this.canonicalSerialize(contentToHash);
    return createHash('sha256').update(serialized).digest('hex');
  }

  /**
   * Canonical serialization ensures identical objects produce identical strings
   * regardless of property order.
   */
  private canonicalSerialize(obj: any): string {
    if (obj === null || obj === undefined) {
      return 'null';
    }

    if (typeof obj !== 'object') {
      return JSON.stringify(obj);
    }

    if (Array.isArray(obj)) {
      return '[' + obj.map(item => this.canonicalSerialize(item)).join(',') + ']';
    }

    // Sort keys for consistent ordering
    const sortedKeys = Object.keys(obj).sort();
    const pairs = sortedKeys.map(key =>
      `"${key}":${this.canonicalSerialize(obj[key])}`
    );
    return '{' + pairs.join(',') + '}';
  }

  async isDuplicate(message: any): Promise<boolean> {
    const contentHash = this.generateContentHash(message);
    return await this.storage.isDuplicate(`content:${contentHash}`);
  }
}

// Usage
const dedup = new ContentBasedDeduplication(storage, 3600);

// These two messages have different IDs but identical content
// Content-based dedup will catch this as a duplicate
const message1 = {
  id: 'msg-001',
  timestamp: 1704700000000,
  type: 'ORDER_CREATED',
  entityId: 'order-123',
  payload: { items: ['A', 'B'], total: 99.99 }
};

const message2 = {
  id: 'msg-002',            // Different ID!
  timestamp: 1704700001000, // Different timestamp!
  type: 'ORDER_CREATED',
  entityId: 'order-123',
  payload: { items: ['A', 'B'], total: 99.99 }
};

await dedup.isDuplicate(message1); // false (new)
await dedup.isDuplicate(message2); // true (duplicate by content)
```

Semantic Deduplication:
Some systems require even more sophisticated deduplication based on business semantics rather than exact content match:
```typescript
/**
 * Semantic deduplication considers business meaning,
 * not just byte-level content equality.
 */
class SemanticDeduplication {
  /**
   * Extract the semantic identity of a message.
   * Two messages are duplicates if they have the same semantic identity.
   */
  getSemanticId(message: any): string {
    switch (message.type) {
      case 'ORDER_CREATED':
        // For orders, the orderId defines identity
        // Multiple ORDER_CREATED for same orderId = duplicate
        return `order-created:${message.payload.orderId}`;

      case 'PAYMENT_COMPLETED':
        // For payments, orderId + amount defines identity
        // (same order could have multiple partial payments)
        return `payment:${message.payload.orderId}:${message.payload.amount}`;

      case 'USER_PROFILE_UPDATED':
        // For profile updates, user + version defines identity
        // Only care about latest version
        return `profile:${message.payload.userId}:${message.payload.version}`;

      case 'INVENTORY_CHANGED':
        // For inventory, sku + final quantity defines identity
        // Multiple messages with same final state = duplicate
        return `inventory:${message.payload.sku}:${message.payload.finalQuantity}`;

      default:
        // Fallback to content hash
        return `unknown:${this.hashContent(message)}`;
    }
  }

  async isDuplicate(message: any): Promise<boolean> {
    const semanticId = this.getSemanticId(message);
    return await this.storage.isDuplicate(semanticId);
  }
}

/**
 * Example: These messages are semantically equivalent
 * (same order creation, different message structure)
 */
const msg1 = {
  type: 'ORDER_CREATED',
  payload: { orderId: '12345', items: ['A', 'B'], createdBy: 'user-1' }
};

const msg2 = {
  type: 'ORDER_CREATED',
  payload: { orderId: '12345', items: ['A', 'B'], createdBy: 'user-1', notes: 'Added later' }
};

// Both produce semanticId "order-created:12345"
// Second message is treated as duplicate despite different content
```

While SHA-256 hash collisions are practically impossible (roughly 10^38 messages before a 50% collision probability), always use a robust, collision-resistant hashing algorithm. Never use MD5 or short hashes for deduplication in production systems.
Deduplication requires storing processed message IDs. Storing them forever isn't practical—the storage would grow unboundedly. Time-windowed deduplication keeps track of messages only for a defined window, after which duplicates are no longer detected.
The critical question: How long should the deduplication window be?
Window Size Considerations:
The window must be longer than the maximum possible time between a message's first delivery and its duplicate's arrival. This includes broker retry backoff, consumer redelivery after crashes, and delayed replays from a dead-letter queue.
| Use Case | Recommended Window | Rationale |
|---|---|---|
| Real-time streaming | 5-15 minutes | Short retry windows, fast processing |
| Standard message queues | 1-4 hours | Covers typical outage + recovery |
| Batch processing | 24-48 hours | Daily jobs with retry windows |
| Financial/compliance | 7-90 days | Regulatory requirements, dispute resolution |
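The sizing guidance above can be turned into a quick back-of-the-envelope calculation. The sketch below is illustrative (the retry parameters are hypothetical, not values from this page): it estimates a minimum window as the sum of worst-case retry backoff plus replay delay, scaled by a safety factor.

```typescript
// Rough estimate of a minimum deduplication window.
// The window must cover the worst-case gap between a message's first
// delivery and the arrival of its last possible duplicate.
interface RetryProfile {
  maxAttempts: number;        // total delivery attempts
  baseBackoffMs: number;      // first retry delay
  backoffMultiplier: number;  // exponential backoff factor
  dlqReplayDelayMs: number;   // worst-case manual/DLQ replay delay
  safetyFactor: number;       // headroom for clock skew, outages, etc.
}

function minDedupWindowMs(p: RetryProfile): number {
  // Sum the exponential backoff delays across all retries
  let backoffTotal = 0;
  let delay = p.baseBackoffMs;
  for (let i = 1; i < p.maxAttempts; i++) {
    backoffTotal += delay;
    delay *= p.backoffMultiplier;
  }
  return Math.ceil((backoffTotal + p.dlqReplayDelayMs) * p.safetyFactor);
}

// Example: 5 attempts with 1s base backoff doubling each retry,
// up to 1 hour of DLQ replay delay, and a 2x safety margin.
// Backoff = 1s + 2s + 4s + 8s = 15s; (15s + 1h) * 2 ≈ 2 hours.
const windowMs = minDedupWindowMs({
  maxAttempts: 5,
  baseBackoffMs: 1_000,
  backoffMultiplier: 2,
  dlqReplayDelayMs: 60 * 60 * 1000,
  safetyFactor: 2,
});
console.log(`minimum window: ${windowMs / 60000} minutes`);
```

The point of the safety factor is that outages rarely respect your configured backoff schedule; doubling the computed minimum is cheap insurance compared to undetected duplicates.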
```typescript
class TimeWindowedDeduplication {
  /**
   * Redis-based time-windowed deduplication.
   * Messages are tracked for a fixed time window,
   * then automatically expire.
   */
  private redis: Redis;
  private windowSeconds: number;
  private keyPrefix: string;

  constructor(redis: Redis, windowMinutes: number, keyPrefix: string = 'dedup') {
    this.redis = redis;
    this.windowSeconds = windowMinutes * 60;
    this.keyPrefix = keyPrefix;
  }

  async isDuplicate(messageId: string): Promise<boolean> {
    const key = `${this.keyPrefix}:${messageId}`;

    // SET with NX + EX - atomic check and set
    // Returns null if key already exists (duplicate)
    // Returns 'OK' if key was set (new message)
    const result = await this.redis.set(
      key,
      '1',
      'EX',
      this.windowSeconds,
      'NX' // Only set if not exists
    );

    // If result is null, the key existed → duplicate
    return result === null;
  }

  /**
   * Check multiple message IDs at once (for batch processing)
   */
  async checkBatch(messageIds: string[]): Promise<Map<string, boolean>> {
    const pipeline = this.redis.pipeline();

    for (const id of messageIds) {
      pipeline.set(
        `${this.keyPrefix}:${id}`,
        '1',
        'EX',
        this.windowSeconds,
        'NX'
      );
    }

    const results = await pipeline.exec();
    const duplicates = new Map<string, boolean>();

    results?.forEach((result, index) => {
      // result[1] is the value returned by SET ... NX
      // null means key existed (duplicate)
      duplicates.set(messageIds[index], result[1] === null);
    });

    return duplicates;
  }
}

// Usage with different windows for different message types
const realTimeDedup = new TimeWindowedDeduplication(redis, 15, 'rt');    // 15 min
const batchDedup = new TimeWindowedDeduplication(redis, 1440, 'batch');  // 24 hours
const auditDedup = new TimeWindowedDeduplication(redis, 10080, 'audit'); // 7 days
```

```typescript
/**
 * Sliding window deduplication with first-seen timestamp tracking.
 * This allows us to know when a message was first seen,
 * useful for debugging and analytics.
 */
class SlidingWindowDeduplication {
  private redis: Redis;
  private windowMs: number;

  async checkAndRecord(messageId: string): Promise<{
    isDuplicate: boolean;
    firstSeenAt?: Date;
  }> {
    const key = `dedup:${messageId}`;
    const now = Date.now();

    // Use HSETNX to store first-seen timestamp atomically
    const wasSet = await this.redis.hsetnx(key, 'firstSeen', now.toString());

    if (wasSet) {
      // New message - set expiry
      await this.redis.pexpire(key, this.windowMs);
      return { isDuplicate: false };
    }

    // Duplicate - retrieve first-seen time
    const firstSeen = await this.redis.hget(key, 'firstSeen');
    return {
      isDuplicate: true,
      firstSeenAt: firstSeen ? new Date(parseInt(firstSeen)) : undefined,
    };
  }
}

// Usage
const result = await dedup.checkAndRecord('msg-12345');
if (result.isDuplicate) {
  console.log(`Duplicate! First seen: ${result.firstSeenAt}`);
  // Useful for monitoring duplicate rates and debugging
}
```

If your deduplication window is 1 hour but a message is retried after 2 hours (from a DLQ, for example), the duplicate won't be caught. Always size your window for worst-case scenarios, not typical cases. When in doubt, err on the side of longer windows.
At scale, deduplication becomes a distributed systems problem. When you're processing millions of messages per second across dozens of consumer instances, a single deduplication store becomes a bottleneck.
Scaling Challenges:
```typescript
/**
 * Partitioned Deduplication
 *
 * Distribute dedup checks across multiple shards based on message ID.
 * Each shard handles a subset of the ID space.
 */
class PartitionedDeduplication {
  private shards: Redis[];
  private numShards: number;

  constructor(shardConnections: Redis[]) {
    this.shards = shardConnections;
    this.numShards = shardConnections.length;
  }

  /**
   * Determine which shard owns a given message ID.
   * Uses consistent hashing to distribute evenly.
   */
  private getShardIndex(messageId: string): number {
    // Simple hash-based partitioning
    // For production, use consistent hashing (e.g., jump hash)
    let hash = 0;
    for (let i = 0; i < messageId.length; i++) {
      hash = ((hash << 5) - hash) + messageId.charCodeAt(i);
      hash = hash & hash; // Convert to 32-bit integer
    }
    return Math.abs(hash) % this.numShards;
  }

  async isDuplicate(messageId: string): Promise<boolean> {
    const shardIndex = this.getShardIndex(messageId);
    const shard = this.shards[shardIndex];

    const result = await shard.set(
      `dedup:${messageId}`,
      '1',
      'EX',
      3600,
      'NX'
    );

    return result === null;
  }

  /**
   * Batch check across shards.
   * Groups IDs by shard, executes in parallel.
   */
  async checkBatch(messageIds: string[]): Promise<Map<string, boolean>> {
    // Group IDs by shard
    const shardGroups: Map<number, string[]> = new Map();
    for (const id of messageIds) {
      const shardIndex = this.getShardIndex(id);
      if (!shardGroups.has(shardIndex)) {
        shardGroups.set(shardIndex, []);
      }
      shardGroups.get(shardIndex)!.push(id);
    }

    // Execute checks in parallel across shards
    const results = new Map<string, boolean>();
    const promises: Promise<void>[] = [];

    for (const [shardIndex, ids] of shardGroups) {
      const promise = this.checkShardBatch(shardIndex, ids, results);
      promises.push(promise);
    }

    await Promise.all(promises);
    return results;
  }

  private async checkShardBatch(
    shardIndex: number,
    ids: string[],
    results: Map<string, boolean>
  ): Promise<void> {
    const shard = this.shards[shardIndex];
    const pipeline = shard.pipeline();

    for (const id of ids) {
      pipeline.set(`dedup:${id}`, '1', 'EX', 3600, 'NX');
    }

    const pipelineResults = await pipeline.exec();
    pipelineResults?.forEach((result, index) => {
      results.set(ids[index], result[1] === null);
    });
  }
}

// Deploy: 16 Redis shards across 4 servers
// Each shard handles 1/16th of the ID space
// Scales to ~160K dedup checks/second (10K per shard)
```

```typescript
/**
 * Two-tier deduplication: Local cache + distributed store.
 *
 * Local cache catches recent duplicates instantly.
 * Distributed store provides durability and cross-instance dedup.
 */
import { LRUCache } from 'lru-cache';

class HybridDeduplication {
  private localCache: LRUCache<string, boolean>;
  private distributedStore: PartitionedDeduplication;

  constructor(distributedStore: PartitionedDeduplication) {
    this.localCache = new LRUCache({
      max: 100_000,        // 100K entries
      ttl: 1000 * 60 * 5,  // 5 minute local TTL
    });
    this.distributedStore = distributedStore;
  }

  async isDuplicate(messageId: string): Promise<boolean> {
    // Layer 1: Check local cache (fastest)
    if (this.localCache.has(messageId)) {
      return true; // Definitely duplicate (cached locally)
    }

    // Layer 2: Check distributed store
    const isDup = await this.distributedStore.isDuplicate(messageId);

    if (!isDup) {
      // New message - add to local cache
      this.localCache.set(messageId, true);
    }

    return isDup;
  }
}

// Performance characteristics:
// - Local cache hit: ~0.001ms (microseconds)
// - Redis hit: ~0.5-2ms (network round-trip)
// - With 80% local cache hit rate, average latency drops from 1ms to 0.2ms
// - 5x improvement in dedup check latency
```

Cross-Region Deduplication:
For globally distributed systems, deduplication across regions requires special handling:
```typescript
/**
 * Cross-region deduplication strategies.
 *
 * Challenge: We want duplicates caught even when processing
 * moves between regions, but cross-region latency is expensive.
 */

// Strategy 1: Synchronous Cross-Region Check (Latency-heavy)
class SyncCrossRegionDedup {
  async isDuplicate(messageId: string): Promise<boolean> {
    // Check all regions - high latency (50-200ms per region)
    const results = await Promise.all([
      this.usEastStore.isDuplicate(messageId),
      this.euWestStore.isDuplicate(messageId),
      this.apSouthStore.isDuplicate(messageId),
    ]);
    return results.some(isDup => isDup);
  }
}

// Strategy 2: Async Replication (Eventual consistency)
class AsyncReplicatedDedup {
  private localStore: DeduplicationStore;
  private replicationQueue: MessageQueue;

  async isDuplicate(messageId: string): Promise<boolean> {
    // Check only local store (fast)
    const isDup = await this.localStore.isDuplicate(messageId);

    if (!isDup) {
      // New message - replicate to other regions asynchronously
      await this.replicationQueue.send({
        type: 'DEDUP_RECORD',
        messageId,
        region: process.env.REGION,
        timestamp: Date.now(),
      });
    }

    return isDup;
  }
}

// Strategy 3: Region-Affine Message IDs (Recommended)
class RegionAffineDedup {
  /**
   * Include region in message ID or routing key.
   * All duplicates of a message go to the same region.
   * No cross-region dedup needed.
   */
  private localStore: DeduplicationStore;
  private currentRegion: string;

  async isDuplicate(messageId: string): Promise<boolean> {
    // Extract region from message routing
    const messageRegion = this.extractRegion(messageId);

    if (messageRegion !== this.currentRegion) {
      // Message belongs to different region - not our responsibility
      // This shouldn't happen if routing is correct
      throw new Error(`Message ${messageId} routed to wrong region`);
    }

    // Check only local store
    return await this.localStore.isDuplicate(messageId);
  }

  extractRegion(messageId: string): string {
    // Example: "us-east:msg-12345" → "us-east"
    return messageId.split(':')[0];
  }
}

/**
 * Recommendation:
 *
 * Prefer region-affine routing where possible.
 * - Route messages to fixed regions based on partition key
 * - No cross-region dedup needed
 * - No cross-region latency
 * - Simplest to operate
 *
 * Use async replication only when region affinity is impossible.
 */
```

The best cross-region deduplication strategy is to avoid it. Use consistent partitioning to ensure related messages always go to the same region, eliminating the need for cross-region coordination.
For extremely high-volume systems where storing all message IDs is impractical, probabilistic data structures offer space-efficient alternatives. These structures trade perfect accuracy for dramatically reduced memory usage.
Bloom Filters for Deduplication:
Bloom filters are probabilistic structures that answer the question "Have I seen this ID before?" They have no false negatives (if the filter says an ID is new, it definitely is), a small tunable false-positive rate, and a fixed memory footprint chosen at creation time:
```typescript
import { BloomFilter } from 'bloom-filters';

class BloomFilterDeduplication {
  private filter: BloomFilter;
  private backupStore: DeduplicationStore;
  private falsePositiveRate: number;

  constructor(
    backupStore: DeduplicationStore,
    expectedItems: number,
    falsePositiveRate: number = 0.001 // 0.1%
  ) {
    // Calculate optimal size for target false positive rate
    // 1 billion items at 0.1% FP rate ≈ 1.8 GB memory
    this.filter = BloomFilter.create(expectedItems, falsePositiveRate);
    this.backupStore = backupStore;
    this.falsePositiveRate = falsePositiveRate;
  }

  async isDuplicate(messageId: string): Promise<boolean> {
    // Fast check: Bloom filter
    if (!this.filter.has(messageId)) {
      // DEFINITELY new - bloom filters have no false negatives
      this.filter.add(messageId);
      return false;
    }

    // MAYBE duplicate - could be false positive
    // Fall back to authoritative store
    const confirmed = await this.backupStore.isDuplicate(messageId);

    if (!confirmed) {
      // False positive! Actually new message.
      // Already in bloom filter, record in backup store
      await this.backupStore.record(messageId);
    }

    return confirmed;
  }

  /**
   * Memory usage calculation:
   *
   * For n items with false positive rate p:
   *   bits = -n * ln(p) / (ln(2))^2
   *
   * Examples (≈14.4 bits per item at 0.1% FP):
   * - 1M items, 0.1% FP: ~1.8 MB
   * - 100M items, 0.1% FP: ~180 MB
   * - 1B items, 0.1% FP: ~1.8 GB
   *
   * Compare to storing 1B 36-char UUIDs: ~36 GB
   */
}
```

Cuckoo Filters:
Cuckoo filters are an improvement over Bloom filters that support deletion (Bloom filters don't). This is useful for time-windowed deduplication where old entries must be removed.
```typescript
import { CuckooFilter } from 'bloom-filters';

class TimeWindowedCuckooDedup {
  private currentWindow: CuckooFilter;
  private previousWindow: CuckooFilter;
  private expectedItems: number;
  private windowDurationMs: number;
  private lastRotation: number;

  constructor(
    expectedItems: number,
    windowMinutes: number = 60
  ) {
    // Two windows for time-windowed dedup with rotation
    this.expectedItems = expectedItems;
    this.currentWindow = CuckooFilter.create(expectedItems);
    this.previousWindow = CuckooFilter.create(expectedItems);
    this.windowDurationMs = windowMinutes * 60 * 1000;
    this.lastRotation = Date.now();
  }

  async isDuplicate(messageId: string): Promise<boolean> {
    await this.maybeRotate();

    // Check both windows
    if (this.currentWindow.has(messageId) || this.previousWindow.has(messageId)) {
      return true;
    }

    // New message - add to current window
    this.currentWindow.add(messageId);
    return false;
  }

  private async maybeRotate(): Promise<void> {
    const now = Date.now();
    if (now - this.lastRotation > this.windowDurationMs) {
      // Rotate windows: current becomes previous, fresh filter becomes current
      this.previousWindow = this.currentWindow;
      this.currentWindow = CuckooFilter.create(this.expectedItems);
      this.lastRotation = now;
    }
  }
}

/**
 * Cuckoo filter advantages:
 * - Supports deletion (can remove expired entries)
 * - Better space efficiency than Bloom for false positive rates < 3%
 * - Faster lookups than Bloom filter
 *
 * Trade-offs:
 * - More complex implementation
 * - Insertion can fail if filter is too full
 */
```

| Approach | Memory (1B messages) | Accuracy | Deletion Support | Best For |
|---|---|---|---|---|
| Hash Set | ~36 GB | 100% | Yes | Moderate scale |
| Sorted Set (Redis) | ~40 GB | 100% | Yes | Time-windowed dedup |
| Bloom Filter | ~1.8 GB | 99.9% | No | High volume, immutable |
| Cuckoo Filter | ~1.1 GB | 99.9% | Yes | High volume, time-windowed |
| HyperLogLog | ~12 KB | ~98% | No | Approximate counting only |
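The Bloom filter memory figures above follow from the standard sizing formula. The sketch below is a plain calculation, not a library API: it computes the required bits `m = -n·ln(p)/(ln 2)²` and the optimal hash-function count `k = (m/n)·ln 2`.

```typescript
// Bloom filter sizing: bits and hash functions needed for
// n items at a target false-positive rate p.
function bloomFilterSize(nItems: number, falsePositiveRate: number) {
  // m = -n * ln(p) / (ln 2)^2
  const bits = Math.ceil(
    (-nItems * Math.log(falsePositiveRate)) / (Math.LN2 * Math.LN2)
  );
  // k = (m / n) * ln 2, rounded to the nearest whole hash function
  const hashes = Math.round((bits / nItems) * Math.LN2);
  return { bits, bytes: Math.ceil(bits / 8), hashes };
}

// 1 billion IDs at a 0.1% false-positive rate:
// ≈14.4 bits per item → on the order of 2 GB of filter,
// versus ~36 GB to store a billion 36-char UUIDs outright.
const { bytes, hashes } = bloomFilterSize(1_000_000_000, 0.001);
console.log(`${(bytes / 1e9).toFixed(2)} GB with ${hashes} hash functions`);
```

Running the numbers like this before deployment tells you whether the filter fits in memory on one node or needs to be sharded like the partitioned store above.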
While probabilistic structures are space-efficient, false positives mean some legitimate new messages may be incorrectly flagged as duplicates. For critical messages, always combine with a deterministic fallback store. Use probabilistic structures as a first-tier filter, not the only dedup mechanism.
Deduplication systems require careful monitoring to ensure they're working correctly and to catch issues before they cause duplicate processing or missed messages.
```typescript
class MonitoredDeduplication {
  private metrics: MetricsClient;
  private underlying: DeduplicationStore;

  async isDuplicate(messageId: string): Promise<boolean> {
    const startTime = Date.now();

    try {
      const result = await this.underlying.isDuplicate(messageId);

      // Record latency
      const latency = Date.now() - startTime;
      this.metrics.histogram('dedup.check_latency_ms', latency);

      // Record duplicate rate
      this.metrics.increment('dedup.checks_total');
      if (result) {
        this.metrics.increment('dedup.duplicates_found');
      }

      return result;
    } catch (error) {
      this.metrics.increment('dedup.check_errors');
      throw error;
    }
  }

  /**
   * Periodic health check and reporting
   */
  async reportHealth(): Promise<void> {
    // Report store size
    const storeSize = await this.underlying.getSize();
    this.metrics.gauge('dedup.store_size', storeSize);

    // Report memory usage
    const memoryUsage = await this.underlying.getMemoryUsage();
    this.metrics.gauge('dedup.memory_bytes', memoryUsage);

    // Check for anomalies
    const duplicateRate = await this.calculateDuplicateRate();

    if (duplicateRate > 0.05) { // >5% duplicate rate
      console.warn(`High duplicate rate: ${(duplicateRate * 100).toFixed(2)}%`);
      this.metrics.increment('dedup.high_duplicate_rate_alert');
    }

    if (duplicateRate < 0.0001) { // <0.01% duplicate rate
      console.warn(`Suspiciously low duplicate rate: ${(duplicateRate * 100).toFixed(4)}%`);
      // This might indicate dedup is not working correctly
      this.metrics.increment('dedup.low_duplicate_rate_alert');
    }
  }

  private async calculateDuplicateRate(): Promise<number> {
    const total = await this.metrics.getCounter('dedup.checks_total');
    const duplicates = await this.metrics.getCounter('dedup.duplicates_found');
    return total > 0 ? duplicates / total : 0;
  }
}
```

```typescript
class DeduplicationMaintenance {
  private db: Database;
  private redis: Redis;
  private metrics: MetricsClient;
  private dedupStore: DeduplicationStore;

  /**
   * Cleanup expired deduplication records.
   * Should run periodically (e.g., hourly).
   */
  async cleanupExpiredRecords(): Promise<void> {
    console.log('Starting dedup cleanup...');
    const startTime = Date.now();

    // Database cleanup
    const dbDeleted = await this.db.dedupRecords.deleteMany({
      where: {
        expiresAt: { lt: new Date() },
      },
    });
    console.log(`Deleted ${dbDeleted.count} expired DB records`);
    this.metrics.gauge('dedup.cleanup.db_deleted', dbDeleted.count);

    // Redis cleanup happens automatically via TTL,
    // but we can force cleanup of keys without TTL
    await this.cleanupRedisOrphans();

    const duration = Date.now() - startTime;
    console.log(`Cleanup completed in ${duration}ms`);
    this.metrics.histogram('dedup.cleanup.duration_ms', duration);
  }

  /**
   * Verify deduplication is working correctly.
   * Run periodically as a health check.
   */
  async verifyDeduplication(): Promise<boolean> {
    const testId = `test-${Date.now()}-${Math.random()}`;

    // First check should return false (new)
    const firstCheck = await this.dedupStore.isDuplicate(testId);
    if (firstCheck) {
      console.error('Dedup verification FAILED: New ID flagged as duplicate');
      this.metrics.increment('dedup.verification.failed');
      return false;
    }

    // Second check should return true (duplicate)
    const secondCheck = await this.dedupStore.isDuplicate(testId);
    if (!secondCheck) {
      console.error('Dedup verification FAILED: Duplicate not detected');
      this.metrics.increment('dedup.verification.failed');
      return false;
    }

    console.log('Dedup verification PASSED');
    this.metrics.increment('dedup.verification.passed');
    return true;
  }

  /**
   * Handle dedup store failure gracefully.
   * When dedup fails, we must decide: skip or process?
   */
  async handleDedupFailure(
    messageId: string,
    error: Error,
    message: Message
  ): Promise<'process' | 'skip'> {
    console.error(`Dedup check failed for ${messageId}: ${error.message}`);
    this.metrics.increment('dedup.failures');

    // Decision depends on business requirements:

    // Option 1: Fail-open (process anyway, risk duplicates)
    // Use when: Duplicates are tolerable, message loss is not
    // return 'process';

    // Option 2: Fail-closed (skip, rely on redelivery)
    // Use when: Duplicates are dangerous
    // return 'skip';

    // Option 3: Circuit breaker (fail fast if too many failures)
    if (await this.isDedupCircuitOpen()) {
      throw new Error('Dedup circuit breaker open');
    }

    // Default: process, log for investigation
    return 'process';
  }
}
```

Run regular verification tests to ensure deduplication is working. A subtle misconfiguration (wrong TTL, incorrect key format) can cause the entire system to silently fail at catching duplicates. Automated verification catches these issues before they cause production incidents.
Deduplication is a critical defense layer in reliable messaging systems. Let's consolidate the key insights:
The Complete Picture:
With this page, we've completed our exploration of delivery guarantees. The full pattern for reliable, exactly-once-processing messaging combines at-least-once delivery (so no message is lost), infrastructure-level deduplication (so most duplicates never reach consumers), and idempotent consumers (so any duplicates that slip through are harmless).
Together, these three components provide the foundation for building reliable distributed messaging systems at any scale.
You've completed Module 4: Delivery Guarantees. You now understand the full spectrum of message delivery semantics—from at-most-once to at-least-once to exactly-once—and the practical patterns (idempotent consumers, deduplication) that enable reliable message processing in distributed systems. These concepts are fundamental to building robust event-driven and microservices architectures.