In the previous pages, we established a critical truth: at-least-once delivery means messages may be delivered multiple times. This is not a bug—it's the fundamental trade-off for ensuring no message is lost.
The question then becomes: How do we design consumers that handle duplicate messages correctly?
The answer is idempotency—designing operations such that executing them multiple times produces the same result as executing them once. Idempotent consumers are the cornerstone of reliable message processing in distributed systems.
This page explores idempotency in depth: what it means formally, how to design idempotent operations, the patterns for implementing idempotency, and the trade-offs involved.
By the end of this page, you will understand the formal definition of idempotency, recognize naturally idempotent vs non-idempotent operations, master multiple patterns for making non-idempotent operations idempotent, and be able to implement production-ready idempotent consumers for complex business workflows.
Idempotency comes from mathematics, where an operation is idempotent if applying it multiple times has the same effect as applying it once.
Formal Definition:
An operation f is idempotent if and only if: f(f(x)) = f(x) for all x in the domain.
Or more generally for repeated applications:
f(x) = f(f(x)) = f(f(f(x))) = ... (any number of applications)
In the context of message processing:
A consumer is idempotent if processing the same message N times (N ≥ 1) produces the same observable system state as processing it exactly once.
```typescript
/**
 * Mathematical examples of idempotent operations:
 */

// Idempotent: max(x, x) = x
Math.max(5, 5); // Always 5, no matter how many times applied

// Idempotent: absolute value |x| = ||x||
Math.abs(-5); // Always 5
Math.abs(Math.abs(-5)); // Still 5

// NOT idempotent: increment
let x = 5;
x++; // x = 6
x++; // x = 7 (different result!)

// NOT idempotent: append
let arr: number[] = [1, 2, 3];
arr.push(4); // [1, 2, 3, 4]
arr.push(4); // [1, 2, 3, 4, 4] (different result!)

/**
 * In message processing:
 */

// Idempotent consumer: Same order produces same result
class IdempotentOrderProcessor {
  async process(orderId: string, orderData: OrderData): Promise<void> {
    // Create or update - same data results in same state
    await this.db.orders.upsert({
      where: { orderId },
      create: { orderId, ...orderData },
      update: { ...orderData }, // Update to same values = no change
    });
  }
}

// Non-idempotent consumer: Each call changes state
class NonIdempotentPaymentProcessor {
  async process(orderId: string, amount: number): Promise<void> {
    // DANGER: Each call adds another payment!
    await this.db.payments.create({
      data: { orderId, amount },
    });
  }
}
```

Key Insight: Idempotency is About Observable State
An operation doesn't need to be a no-op on subsequent calls—it just needs to produce the same observable outcome. The distinction is subtle but important:
| Scenario | First Call | Second Call | Idempotent? |
|---|---|---|---|
| Set user email to 'x@y.com' | Email becomes 'x@y.com' | Email is already 'x@y.com', no change | Yes ✓ |
| Create user with ID 123 | User created with ID 123 | Error: 'User already exists' | Yes ✓ (state unchanged) |
| Create user with ID 123 | User created with ID 123 | Another user created with ID 124 | No ✗ |
| Add $10 to account | Balance increases by $10 | Balance increases by $10 again | No ✗ |
| Set balance to $100 | Balance becomes $100 | Balance is already $100, no change | Yes ✓ |
| Send welcome email | Email sent | Another email sent | No ✗ |
| Record that welcome email was sent | Flag set to true | Flag already true, no change | Yes ✓ |
Ask yourself: 'If I process this message 10 times instead of once, will my system be in a different state?' If yes, the operation is not idempotent and needs redesign.
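One way to make the "process it 10 times" question concrete is a small test harness. This is an illustrative sketch with in-memory state standing in for a real database; `setEmail`, `appendLog`, and `isIdempotent` are hypothetical helpers, not part of any framework:

```typescript
// Hypothetical in-memory store standing in for a real database.
type Store = Map<string, string>;

// An idempotent handler: sets the user's email to an absolute value.
function setEmail(store: Store, userId: string, email: string): void {
  store.set(userId, email);
}

// A non-idempotent handler for contrast: appends to a log on every call.
function appendLog(log: string[], entry: string): void {
  log.push(entry);
}

// Generic check: process the same message N times and compare state snapshots.
function isIdempotent<S>(
  apply: () => void,
  snapshot: () => S,
  times: number = 10
): boolean {
  apply();
  const afterOnce = JSON.stringify(snapshot());
  for (let i = 1; i < times; i++) apply();
  return JSON.stringify(snapshot()) === afterOnce;
}

const store: Store = new Map();
const log: string[] = [];

console.log(isIdempotent(() => setEmail(store, "u1", "x@y.com"), () => [...store.entries()])); // true
console.log(isIdempotent(() => appendLog(log, "welcome sent"), () => [...log])); // false
```

Running a check like this against every handler in a test suite catches accidental non-idempotency before duplicates do.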
Some operations are idempotent by nature. These are the easiest to work with because they require no special handling for duplicate messages.
```typescript
/**
 * Pattern 1: Upsert with Full State
 *
 * Instead of "add item to cart", pass the full cart state.
 */
class IdempotentCartService {
  async setCart(userId: string, cartItems: CartItem[]): Promise<void> {
    // Delete existing and recreate (or use upsert)
    await this.db.$transaction([
      this.db.cartItems.deleteMany({ where: { userId } }),
      this.db.cartItems.createMany({
        data: cartItems.map(item => ({ userId, ...item })),
      }),
    ]);
  }
}

// Message format supports idempotency:
// { userId: "123", cartItems: [{ sku: "ABC", qty: 2 }, { sku: "XYZ", qty: 1 }] }
// Processing this multiple times results in the same cart state

/**
 * Pattern 2: Absolute Value Sets
 *
 * Instead of "increment counter", set to a specific value.
 */
class IdempotentMetricsService {
  async setCounter(name: string, value: number, timestamp: number): Promise<void> {
    // Upsert with timestamp to handle ordering
    await this.db.metrics.upsert({
      where: { name },
      create: { name, value, timestamp },
      update: {
        // Only update if this is a newer value
        value: { set: value },
        timestamp: { set: timestamp },
        // Using a conditional update based on timestamp:
        // UPDATE metrics SET value = $1, timestamp = $2
        // WHERE name = $3 AND timestamp < $2
      },
    });
  }
}

// Message: { counter: "page_views", value: 15234, timestamp: 1704700000000 }
// Duplicate processing sets the same value—no change

/**
 * Pattern 3: Idempotent Delete
 *
 * Delete by unique identifier is naturally idempotent.
 */
class IdempotentDeletionService {
  async deleteAccount(accountId: string): Promise<void> {
    // Deleting an already-deleted account is a no-op
    const result = await this.db.accounts.deleteMany({
      where: { id: accountId },
    });
    // result.count will be 0 if already deleted, 1 if deleted
    // Either way, the end state is the same: account doesn't exist
  }
}
```

When designing message schemas and APIs, prefer naturally idempotent patterns. 'Set cart to [items]' is better than 'Add item to cart'. 'Set inventory to 100' is better than 'Decrement inventory by 1'.
This design philosophy eliminates entire categories of duplicate-handling complexity.
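As a sketch of the difference, here is a hypothetical inventory message modeled both ways. The `asOf` timestamp guard is one common way to make the absolute form safe against out-of-order delivery; all names and types here are illustrative:

```typescript
// Two ways to model the same inventory change in a message schema.

// Delta (non-idempotent): replaying the message drifts the state.
interface DecrementInventory {
  sku: string;
  delta: number; // "decrement by 1"
}

// Absolute (idempotent): replaying the message converges to the same state.
interface SetInventory {
  sku: string;
  quantity: number; // "set to 99"
  asOf: number;     // timestamp guards against out-of-order updates
}

function applySet(
  stock: Map<string, { qty: number; asOf: number }>,
  msg: SetInventory
): void {
  const current = stock.get(msg.sku);
  if (current && current.asOf >= msg.asOf) return; // stale or duplicate: no-op
  stock.set(msg.sku, { qty: msg.quantity, asOf: msg.asOf });
}
```

Processing a `SetInventory` message two or ten times lands on the same quantity; a `DecrementInventory` message processed twice silently loses stock.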
Not all operations are naturally idempotent. Many critical business operations—payments, transfers, sends—are inherently non-idempotent. These require explicit patterns to handle safely.
The Core Technique: Idempotency Keys
An idempotency key is a unique identifier for an operation. By tracking which keys have been processed, we can detect and safely ignore duplicates.
```typescript
/**
 * The Idempotency Key Pattern
 *
 * Core idea: Every message has a unique ID. Track processed IDs.
 * If we see the same ID again, skip processing.
 */

interface IdempotencyRecord {
  key: string;
  processedAt: Date;
  result: any; // Optionally store the result for returning to caller
}

class IdempotentProcessor {
  private db: Database;
  private ttlDays: number = 7; // How long to remember processed keys

  /**
   * Process an operation idempotently.
   *
   * If this key was already processed, return the cached result.
   * If not, execute the operation and record the key.
   */
  async processOnce<R>(
    idempotencyKey: string,
    operation: () => Promise<R>
  ): Promise<{ isNew: boolean; result: R }> {
    // Step 1: Check if already processed
    const existing = await this.db.idempotencyRecords.findUnique({
      where: { key: idempotencyKey },
    });

    if (existing) {
      // Duplicate detected!
      console.log(`Duplicate key ${idempotencyKey}, returning cached result`);
      return { isNew: false, result: existing.result as R };
    }

    // Step 2: Execute the operation and record atomically
    // This MUST be atomic to prevent race conditions
    return await this.db.$transaction(async (tx) => {
      // Double-check inside transaction (for race conditions)
      const doubleCheck = await tx.idempotencyRecords.findUnique({
        where: { key: idempotencyKey },
      });

      if (doubleCheck) {
        return { isNew: false, result: doubleCheck.result as R };
      }

      // Execute the operation
      const result = await operation();

      // Record that we processed this key
      await tx.idempotencyRecords.create({
        data: {
          key: idempotencyKey,
          processedAt: new Date(),
          result: result as any,
        },
      });

      return { isNew: true, result };
    });
  }

  /**
   * Cleanup old idempotency records to prevent unbounded growth
   */
  async cleanup(): Promise<number> {
    const cutoff = new Date();
    cutoff.setDate(cutoff.getDate() - this.ttlDays);

    const result = await this.db.idempotencyRecords.deleteMany({
      where: { processedAt: { lt: cutoff } },
    });

    return result.count;
  }
}
```

Using Idempotency Keys in Practice:
```typescript
class PaymentProcessor {
  private idempotent: IdempotentProcessor;
  private paymentGateway: PaymentGateway;

  async processPaymentMessage(message: PaymentMessage): Promise<void> {
    // Use message ID as idempotency key
    // Same message = same key = no duplicate charge
    const idempotencyKey = `payment-${message.messageId}`;

    const { isNew, result } = await this.idempotent.processOnce(
      idempotencyKey,
      async () => {
        // This block executes AT MOST ONCE per idempotencyKey
        const paymentResult = await this.paymentGateway.charge({
          customerId: message.customerId,
          amount: message.amount,
          orderId: message.orderId,
        });

        // Record in our database
        await this.db.payments.create({
          data: {
            paymentId: paymentResult.id,
            orderId: message.orderId,
            amount: message.amount,
            status: 'completed',
          },
        });

        return paymentResult;
      }
    );

    if (isNew) {
      console.log(`Payment processed: ${result.id}`);
    } else {
      console.log(`Duplicate payment message, already processed: ${result.id}`);
    }

    // Acknowledge the message in either case
    await message.ack();
  }
}
```

Alternative: Using Natural Business Keys
Sometimes the message content itself provides a natural idempotency key:
```typescript
class OrderProcessor {
  async processOrderMessage(message: OrderMessage): Promise<void> {
    // Use orderId as natural idempotency key
    // Same order should only be processed once
    const existingOrder = await this.db.orders.findUnique({
      where: { orderId: message.orderId },
    });

    if (existingOrder) {
      // Order already exists - this is a duplicate message
      if (existingOrder.status === 'completed') {
        console.log(`Order ${message.orderId} already completed`);
        await message.ack();
        return;
      }
      // Order exists but isn't complete - might be stuck
      // Handle partial completion case...
    }

    // Create the order
    await this.db.orders.create({
      data: {
        orderId: message.orderId,
        customerId: message.customerId,
        items: message.items,
        status: 'pending',
      },
    });

    // Process the order...
    await this.fulfillOrder(message.orderId);

    await this.db.orders.update({
      where: { orderId: message.orderId },
      data: { status: 'completed' },
    });

    await message.ack();
  }
}
```

The simple 'check then act' pattern has a race condition: two messages with the same key could both pass the check before either records. Always use database transactions or unique constraints to make the check-and-record atomic.
Database unique constraints provide the most robust foundation for idempotency. They guarantee atomicity at the database level, preventing race conditions that application-level checks might miss.
```sql
-- Schema with unique constraints for idempotency

-- Processed messages table
CREATE TABLE processed_messages (
  id SERIAL PRIMARY KEY,
  message_id VARCHAR(255) NOT NULL UNIQUE, -- Unique constraint!
  processed_at TIMESTAMP NOT NULL DEFAULT NOW(),
  topic VARCHAR(255) NOT NULL,
  consumer_group VARCHAR(255) NOT NULL,
  result JSONB
);

-- Create index for TTL cleanup
CREATE INDEX idx_processed_messages_time ON processed_messages(processed_at);

-- Payments with idempotency key
CREATE TABLE payments (
  id SERIAL PRIMARY KEY,
  idempotency_key VARCHAR(255) NOT NULL UNIQUE, -- Unique constraint!
  order_id VARCHAR(255) NOT NULL,
  amount DECIMAL(10, 2) NOT NULL,
  status VARCHAR(50) NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Orders with natural idempotency (order_id is unique)
CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  order_id VARCHAR(255) NOT NULL UNIQUE, -- Natural idempotency key!
  customer_id VARCHAR(255) NOT NULL,
  items JSONB NOT NULL,
  status VARCHAR(50) NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
```

```typescript
class DatabaseIdempotentProcessor {
  private db: Database;

  /**
   * Process using database unique constraint.
   *
   * The unique constraint makes duplicates FAIL atomically,
   * which we catch and treat as "already processed".
   */
  async processOnce(
    messageId: string,
    operation: () => Promise<void>
  ): Promise<{ isNew: boolean }> {
    try {
      // Attempt to record this message as processed
      // AND execute the operation in a single transaction
      await this.db.$transaction(async (tx) => {
        // Insert the processed message record
        // Will fail if messageId already exists (unique constraint)
        await tx.processedMessages.create({
          data: {
            messageId,
            processedAt: new Date(),
          },
        });

        // Execute the operation
        await operation();
      });

      return { isNew: true };
    } catch (error: any) {
      // Check if this is a unique constraint violation
      if (this.isUniqueConstraintError(error)) {
        // Duplicate message - this is expected, not an error
        console.log(`Duplicate message ${messageId} detected`);
        return { isNew: false };
      }

      // Some other error - rethrow
      throw error;
    }
  }

  private isUniqueConstraintError(error: any): boolean {
    // PostgreSQL unique violation code is 23505
    // MySQL duplicate entry code is 1062
    return error.code === 'P2002' ||      // Prisma unique constraint
           error.code === '23505' ||      // PostgreSQL
           error.code === 'ER_DUP_ENTRY'; // MySQL
  }
}

// Usage
consumer.on('message', async (message) => {
  const { isNew } = await processor.processOnce(
    message.id,
    async () => {
      await orderService.createOrder(message.payload);
    }
  );

  // Acknowledge regardless - duplicate handling is complete
  await message.ack();
});
```

| Strategy | Pros | Cons | Best For |
|---|---|---|---|
| Application-level check | Simple to understand | Race conditions possible | Low-volume, single consumer |
| Transaction + check | Atomic, no race conditions | Database overhead | Most use cases |
| Unique constraint | Absolute atomicity, fast | Constraint violation noise | High-volume, critical operations |
| External dedup service | Scales independently | Additional infrastructure | Very high volume, microservices |
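For the "external dedup service" row, one common building block is Redis's atomic `SET key value NX EX ttl`, which combines check and record in a single operation. The sketch below uses a hypothetical `DedupStore` interface with an in-memory stand-in so the logic is self-contained and runnable; a real implementation would issue the Redis command instead:

```typescript
// Minimal interface covering the one operation we need:
// atomically set a key only if it is absent, with a TTL.
interface DedupStore {
  setIfAbsent(key: string, ttlSeconds: number): Promise<boolean>;
}

// In-memory stand-in for a Redis client (hypothetical; a real version
// would call SET key "1" NX EX ttl and check for a non-null reply).
class InMemoryDedupStore implements DedupStore {
  private seen = new Map<string, number>(); // key -> expiry (ms epoch)

  async setIfAbsent(key: string, ttlSeconds: number): Promise<boolean> {
    const now = Date.now();
    const expiry = this.seen.get(key);
    if (expiry !== undefined && expiry > now) return false; // already present
    this.seen.set(key, now + ttlSeconds * 1000);
    return true;
  }
}

// Consumer-side dedup: the atomic set-if-absent IS the check-and-record,
// so there is no check-then-act race between competing consumers.
async function processIfNew(
  store: DedupStore,
  messageId: string,
  handler: () => Promise<void>
): Promise<boolean> {
  const isNew = await store.setIfAbsent(`idem:${messageId}`, 7 * 24 * 3600);
  if (!isNew) return false; // duplicate — skip processing
  await handler();
  return true;
}
```

One caveat with this shape: if the handler fails after the key is set, the message is never processed. Production variants delete the key on failure, or store a pending/completed status instead of a bare flag.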
Many developers see constraint violations as 'errors' to be avoided. For idempotency, they're the mechanism. A unique constraint violation means 'already processed'—exactly what we want. Design your error handling to treat this as a normal case, not an exception.
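Here is a runnable illustration of "the violation is the mechanism", using a toy in-memory table that enforces a unique constraint. The `UniqueViolation` class and its `23505` code mimic PostgreSQL's SQLSTATE for a unique violation; all names here are illustrative:

```typescript
// Error thrown by our toy table, mimicking a unique-constraint violation
// (PostgreSQL reports SQLSTATE 23505 for this).
class UniqueViolation extends Error {
  code = "23505";
}

// Toy table with a unique constraint on its key column.
class UniqueTable {
  private rows = new Set<string>();

  insert(key: string): void {
    if (this.rows.has(key)) throw new UniqueViolation(`duplicate key: ${key}`);
    this.rows.add(key);
  }
}

// Insert-first idempotency: the INSERT doubles as the duplicate check.
// A unique violation is not a failure — it means "already processed".
function recordOnce(table: UniqueTable, messageId: string): boolean {
  try {
    table.insert(messageId);
    return true; // first time seeing this message
  } catch (err) {
    if (err instanceof UniqueViolation) return false; // duplicate: normal case
    throw err; // anything else is a real error
  }
}
```

The key design choice is that only the unique-violation error is swallowed; every other error still propagates, so genuine failures are never mistaken for duplicates.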
Real-world message handlers often perform multiple operations. Making complex workflows idempotent requires additional patterns.
Pattern 1: All-or-Nothing Transaction
Wrap all operations in a single transaction. Either everything succeeds, or nothing does.
```typescript
class OrderFulfillmentProcessor {
  async processOrderFulfillment(message: FulfillmentMessage): Promise<void> {
    const idempotencyKey = `fulfill-${message.messageId}`;

    await this.db.$transaction(async (tx) => {
      // Check idempotency
      const existing = await tx.processedMessages.findUnique({
        where: { messageId: idempotencyKey },
      });

      if (existing) {
        return; // Already processed
      }

      // All operations in single transaction
      // Either all succeed or none do

      // 1. Update order status
      await tx.orders.update({
        where: { orderId: message.orderId },
        data: { status: 'fulfilled' },
      });

      // 2. Update inventory
      for (const item of message.items) {
        await tx.inventory.update({
          where: { sku: item.sku },
          data: { quantity: { decrement: item.quantity } },
        });
      }

      // 3. Create shipment record
      await tx.shipments.create({
        data: {
          orderId: message.orderId,
          carrier: message.carrier,
          trackingNumber: message.trackingNumber,
        },
      });

      // 4. Record as processed (idempotency)
      await tx.processedMessages.create({
        data: { messageId: idempotencyKey },
      });
    });

    await message.ack();
  }
}
```

Pattern 2: Resumable Workflow with Status Tracking
For operations that can't be wrapped in a single transaction (external APIs, cross-service calls), track progress and resume from the last completed step.
```typescript
interface WorkflowState {
  id: string;
  orderId: string;
  currentStep: 'pending' | 'payment_charged' | 'inventory_reserved' | 'shipped' | 'completed';
  paymentId?: string;
  shipmentId?: string;
  completedAt?: Date;
}

class ResumableOrderWorkflow {
  private db: Database;

  async processOrder(message: OrderMessage): Promise<void> {
    // Get or create workflow state
    let state = await this.db.workflows.findUnique({
      where: { id: message.orderId },
    });

    if (!state) {
      state = await this.db.workflows.create({
        data: {
          id: message.orderId,
          orderId: message.orderId,
          currentStep: 'pending',
        },
      });
    }

    // Resume from current step (intentional fall-through between cases)
    switch (state.currentStep) {
      case 'pending':
        await this.chargePayment(state, message);
        // Fall through to next step
      case 'payment_charged':
        await this.reserveInventory(state, message);
      case 'inventory_reserved':
        await this.createShipment(state, message);
      case 'shipped':
        await this.completeWorkflow(state);
        break; // workflow finished on this pass; don't log "duplicate"
      case 'completed':
        // Already done - this is a duplicate message
        console.log(`Order ${message.orderId} already completed`);
        break;
    }

    await message.ack();
  }

  private async chargePayment(state: WorkflowState, message: OrderMessage): Promise<void> {
    // Charge payment with idempotency key
    const paymentResult = await this.paymentGateway.charge({
      amount: message.amount,
      idempotencyKey: `payment-${message.orderId}`,
    });

    // Update state
    await this.db.workflows.update({
      where: { id: state.id },
      data: {
        currentStep: 'payment_charged',
        paymentId: paymentResult.id,
      },
    });
    state.currentStep = 'payment_charged';
    state.paymentId = paymentResult.id;
  }

  private async reserveInventory(state: WorkflowState, message: OrderMessage): Promise<void> {
    // Reserve inventory (idempotent by orderId)
    await this.inventoryService.reserve({
      orderId: message.orderId,
      items: message.items,
    });

    await this.db.workflows.update({
      where: { id: state.id },
      data: { currentStep: 'inventory_reserved' },
    });
    state.currentStep = 'inventory_reserved';
  }

  private async createShipment(state: WorkflowState, message: OrderMessage): Promise<void> {
    // Create shipment (idempotent by orderId)
    const shipment = await this.shippingService.createShipment({
      orderId: message.orderId,
      address: message.shippingAddress,
    });

    await this.db.workflows.update({
      where: { id: state.id },
      data: {
        currentStep: 'shipped',
        shipmentId: shipment.id,
      },
    });
    state.currentStep = 'shipped';
    state.shipmentId = shipment.id;
  }

  private async completeWorkflow(state: WorkflowState): Promise<void> {
    await this.db.workflows.update({
      where: { id: state.id },
      data: {
        currentStep: 'completed',
        completedAt: new Date(),
      },
    });
  }
}

/**
 * Why this is idempotent:
 *
 * 1. Each step transitions state before proceeding
 * 2. On duplicate message, we resume from current step
 * 3. Each external call uses idempotency key
 * 4. 'completed' step is a no-op
 *
 * Example: Message processed twice
 *
 * First time:
 *   pending → charge → payment_charged → reserve → inventory_reserved →
 *   ship → shipped → complete → completed
 *
 * Second time (duplicate):
 *   state = completed → "Already completed" → return
 */
```

The resumable workflow pattern handles crashes mid-workflow. But what about failures that require undoing completed steps? For that, you need the Saga pattern with compensating actions—a topic covered separately. The key is: make each step either idempotent or compensatable.
The design of idempotency keys significantly impacts correctness and operational simplicity. Here are the key considerations:
```typescript
import { createHash } from 'crypto';

/**
 * Idempotency Key Generation Patterns
 */

// Pattern 1: Broker message ID (preferred when available)
function keyFromMessageId(message: BrokerMessage): string {
  return `process-${message.id}`;
}

// Pattern 2: Content hash (for messages without ID)
function keyFromContent(message: any): string {
  const content = JSON.stringify(message);
  const hash = createHash('sha256').update(content).digest('hex');
  return `hash-${hash.substring(0, 16)}`; // First 16 chars sufficient
}

// Pattern 3: Business key composite
function keyFromBusinessData(operation: string, data: any): string {
  // Example: "create-order-ORD123-v2"
  return `${operation}-${data.orderId}-v2`;
}

// Pattern 4: Scoped keys for multiple operations
class ScopedKeyGenerator {
  generateKey(scope: string, identifier: string): string {
    return `${scope}:${identifier}`;
  }
}

const keyGen = new ScopedKeyGenerator();

// Same orderId, different operations = different keys
keyGen.generateKey('payment', 'ORD123');      // "payment:ORD123"
keyGen.generateKey('fulfillment', 'ORD123');  // "fulfillment:ORD123"
keyGen.generateKey('notification', 'ORD123'); // "notification:ORD123"
```

Idempotency Record Retention:
Idempotency records must be retained long enough to catch duplicates, but can't grow unbounded. Consider:
| Scenario | Suggested Retention | Rationale |
|---|---|---|
| Normal operations | 7 days | Covers weekend + batch processing delays |
| High-volume, real-time | 24-48 hours | Balance storage vs safety |
| Financial transactions | 90+ days | Regulatory requirements, dispute resolution |
| Audit-critical | Years | Compliance, legal requirements |
```typescript
class IdempotencyRecordManager {
  private db: Database;

  /**
   * Clean up old idempotency records.
   * Run periodically (e.g., daily cron job).
   */
  async cleanupOldRecords(retentionDays: number = 7): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

    const result = await this.db.processedMessages.deleteMany({
      where: {
        processedAt: { lt: cutoffDate },
      },
    });

    console.log(`Cleaned up ${result.count} old idempotency records`);
    return result.count;
  }

  /**
   * For very high volume systems, use partitioned tables
   * and drop entire partitions.
   */
  async dropOldPartition(partitionDate: Date): Promise<void> {
    // PostgreSQL partition management example
    const partitionName = `processed_messages_${partitionDate.toISOString().slice(0, 10).replace(/-/g, '_')}`;

    await this.db.$executeRaw`
      ALTER TABLE processed_messages DETACH PARTITION ${partitionName};
    `;
    await this.db.$executeRaw`
      DROP TABLE ${partitionName};
    `;
  }
}

// Schedule cleanup
cron.schedule('0 3 * * *', async () => {
  // Run at 3 AM daily
  await idempotencyManager.cleanupOldRecords(7);
});
```

Your retention period must exceed your maximum possible duplicate window. If a message could theoretically be redelivered after 5 days (dead letter queue retry, for example), 7-day retention provides safety margin. Deleting records too early can allow duplicates to sneak through.
Idempotency checks add overhead to the processing of every message. Keeping that overhead small matters for high-throughput systems.
```typescript
class CachedIdempotencyChecker {
  private localCache: LRUCache<string, boolean>;
  private db: Database;
  private redis: Redis;

  constructor() {
    // Local in-memory cache for recent messages
    this.localCache = new LRUCache({
      max: 100_000,        // 100K entries
      ttl: 1000 * 60 * 60, // 1 hour TTL
    });
  }

  async checkAndRecord(messageId: string): Promise<boolean> {
    // Layer 1: Local cache (fastest)
    if (this.localCache.has(messageId)) {
      return true; // Definitely processed
    }

    // Layer 2: Distributed cache (fast)
    const redisCached = await this.redis.exists(`idem:${messageId}`);
    if (redisCached) {
      this.localCache.set(messageId, true);
      return true; // Definitely processed
    }

    // Layer 3: Database (authoritative)
    const dbRecord = await this.db.processedMessages.findUnique({
      where: { messageId },
    });

    if (dbRecord) {
      // Found in DB - update caches
      await this.redis.setex(`idem:${messageId}`, 3600, '1');
      this.localCache.set(messageId, true);
      return true;
    }

    // Not processed - record it
    await this.db.processedMessages.create({
      data: { messageId, processedAt: new Date() },
    });

    // Update caches
    await this.redis.setex(`idem:${messageId}`, 3600, '1');
    this.localCache.set(messageId, true);

    return false; // New message
  }
}

// Typical hit rates:
// - Local cache: ~50-80% (recently processed messages)
// - Redis: ~15-40% of remaining
// - Database: ~5-20% (first time or very old duplicates)
//
// Result: ~90% of checks never hit the database
```

```typescript
import { BloomFilter } from 'bloom-filters';

class BloomFilterIdempotency {
  private bloomFilter: BloomFilter;
  private db: Database;

  constructor() {
    // 1 million entries, 0.1% false positive rate
    // Size: ~1.2 MB memory
    this.bloomFilter = BloomFilter.create(1_000_000, 0.001);
  }

  async checkAndRecord(messageId: string): Promise<boolean> {
    // Fast check: If bloom filter says "not present",
    // we're 100% certain it's new (no false negatives)
    if (!this.bloomFilter.has(messageId)) {
      // Definitely new - record it
      await this.recordProcessed(messageId);
      return false;
    }

    // Bloom filter says "maybe present"
    // Could be false positive (0.1% chance) - must check DB
    const dbCheck = await this.db.processedMessages.findUnique({
      where: { messageId },
    });

    if (dbCheck) {
      return true; // Already processed
    }

    // False positive - it's actually new
    await this.recordProcessed(messageId);
    return false;
  }

  private async recordProcessed(messageId: string): Promise<void> {
    this.bloomFilter.add(messageId);
    await this.db.processedMessages.create({
      data: { messageId, processedAt: new Date() },
    });
  }
}

/**
 * Bloom filter trade-offs:
 *
 * Pros:
 * - Constant-time checks
 * - Tiny memory footprint
 * - No false negatives (100% reliable for "not seen")
 *
 * Cons:
 * - False positives require DB fallback
 * - Can't remove entries (needs rebuild on cleanup)
 * - Needs careful sizing
 */
```

Don't over-engineer idempotency. At 100 messages/second, a simple database check adds <1ms. At 100,000 messages/second, you need caching layers. Match complexity to volume.
Idempotent consumers are the foundation of reliable message processing in distributed systems. Let's consolidate the key insights:
The Idempotent Consumer Checklist:
For every message handler, verify:

- Every message carries a stable idempotency key (broker message ID, content hash, or natural business key).
- The duplicate check and the "processed" record are written atomically (transaction or unique constraint).
- Operations are expressed as absolute state ('set to X') rather than deltas wherever possible.
- External calls (payment gateways, shipping APIs) pass their own idempotency keys.
- Idempotency records are retained longer than the maximum possible duplicate window.
- A detected duplicate is handled as a normal case: skip the work, log it, and acknowledge the message.
You now understand idempotent consumer design—the critical pattern that enables exactly-once processing semantics in distributed systems. In the next page, we'll explore deduplication strategies—the infrastructure-level patterns for detecting and filtering duplicate messages before they reach your consumers.