A customer places an order. The OrderPlaced event is published. Inventory is reserved. Payment is charged. But the shipping label is never created. No error was logged. No alert was triggered. The customer waits days for a shipment that will never arrive.
What happened? The PaymentCharged event—which should have triggered shipping—was lost somewhere in the system. It simply vanished. Lost events are among the most insidious failures in event-driven architectures because they are silent. Unlike errors that crash processes or exceptions that fill logs, lost events leave no trace. The system appears healthy while business processes remain incomplete.
This page explores how events get lost, how to detect loss, and how to build systems that remain correct even when events fail to arrive.
By the end of this page, you will understand the various ways events can be lost in distributed systems, the delivery guarantees provided by different architectures, strategies for detecting event loss, patterns for recovering from lost events, and how to design systems that are resilient to event loss.
Events can be lost at every stage of their journey from producer to consumer. Understanding these failure modes is the first step toward building resilient systems.
Stage 1: Production Failures
Events can fail to be produced in the first place.
| Failure Mode | Mechanism | Example |
|---|---|---|
| Fire-and-Forget Publishing | Producer doesn't wait for acknowledgment | Async publish without waiting for broker ACK |
| Producer Process Crash | Process dies before event publish completes | OOM kill between business logic and publish |
| Transaction Rollback | Business transaction fails after event sent | Database constraint violation after publish |
| Serialization Failure | Event cannot be serialized | Circular reference, unsupported type |
| Network Failure | Producer can't reach broker | DNS failure, firewall rules, broker unreachable |
```typescript
// Common production-time event loss scenarios

// ❌ BAD: Fire-and-forget publishing
async function dangerousHandler(order: Order) {
  // Save to database
  await db.orders.insert(order);

  // Fire and forget - no guarantee event was received
  producer.send({ topic: 'orders', value: JSON.stringify(order) });
  // ^^^ No await! Event might be lost if send fails
}

// ❌ BAD: Event published before transaction commits
async function raceConditionHandler(order: Order) {
  const tx = await db.beginTransaction();
  try {
    await tx.orders.insert(order);

    // Event published but transaction not committed yet
    await producer.send({ topic: 'orders', value: JSON.stringify(order) });

    // If this fails, event was sent but order doesn't exist!
    await tx.commit();
  } catch (error) {
    await tx.rollback();
    throw error;
  }
}

// ❌ BAD: Producer crash between DB commit and event publish
async function crashVulnerableHandler(order: Order) {
  // Save to database and commit
  await db.orders.insert(order);
  await db.commit();

  // If process crashes here, event is never published
  // But order exists in database - inconsistent state
  await producer.send({ topic: 'orders', value: JSON.stringify(order) });
}

// ✅ GOOD: Transactional outbox pattern
async function reliableHandler(order: Order) {
  await db.transaction(async (tx) => {
    // Insert order
    await tx.orders.insert(order);

    // Insert event into outbox (same transaction!)
    await tx.outbox.insert({
      eventType: 'OrderPlaced',
      payload: JSON.stringify(order),
      createdAt: new Date(),
    });
  });

  // Separate process polls outbox and publishes events
  // If publish fails, it retries. If DB fails, no event is created.
}
```

Stage 2: Broker Failures
Once an event reaches the message broker, it can still be lost.
| Failure Mode | Mechanism | Mitigation |
|---|---|---|
| ACK Before Replication | Primary fails after ACK but before replicating | Set acks=all (Kafka), wait for quorum writes |
| In-Memory Queue Loss | Broker crashes before flushing to disk | Use durable queues, synchronous disk writes |
| Partition Data Loss | Disk corruption or failure | Multi-replica storage, cross-AZ replication |
| Retention Policy | Event expires before consumption | Monitor consumer lag, adjust retention |
| Message Size Rejection | Event exceeds broker limits | Validate message size before publishing |
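Much of the broker-side mitigation in this table is configuration rather than application code. As a rough illustration, here is a sketch of creating a durability-oriented topic with the kafkajs admin client; the topic name, partition count, replica counts, and retention value are assumptions chosen for the example, not recommendations:

```typescript
import { Kafka } from 'kafkajs';

// Illustrative cluster settings; tune replica counts and retention for your deployment.
const kafka = new Kafka({ clientId: 'admin-tool', brokers: ['broker-1:9092'] });
const admin = kafka.admin();

async function createDurableTopic(): Promise<void> {
  await admin.connect();
  try {
    await admin.createTopics({
      waitForLeaders: true,
      topics: [{
        topic: 'orders',
        numPartitions: 12,
        replicationFactor: 3, // survive the loss of a broker or availability zone
        configEntries: [
          // Require at least 2 replicas to acknowledge writes made with acks=all
          { name: 'min.insync.replicas', value: '2' },
          // Never elect an out-of-sync replica as leader (avoids silent data loss)
          { name: 'unclean.leader.election.enable', value: 'false' },
          // Keep events long enough for lagging consumers to catch up
          { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) },
        ],
      }],
    });
  } finally {
    await admin.disconnect();
  }
}
```

Replication only helps if producers also wait for the replicas to acknowledge writes, which is exactly what the acks settings below control.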
Stage 3: Consumption Failures
Events successfully stored in the broker can still be lost at consumption time.
| Failure Mode | Mechanism | Mitigation |
|---|---|---|
| Auto-Commit Without Processing | Offset committed before processing completes | Use manual commit after successful processing |
| Silent Deserialization Failure | Event can't be deserialized, quietly dropped | Log and DLQ undeserializable events |
| Exception Swallowing | Handler catches exception, doesn't propagate | Ensure all failures either retry or DLQ |
| Filter Misconfiguration | Event filtered out incorrectly | Test filter logic, monitor filter rates |
| Consumer Group Misconfiguration | Consumer not subscribed to partition | Verify partition assignments |
```typescript
// Common consumption-time event loss scenarios

// ❌ BAD: Auto-commit before processing
const badConsumer = kafka.consumer({
  groupId: 'order-processing',
  enableAutoCommit: true, // Commits immediately after poll!
  autoCommitIntervalMs: 100,
});

await badConsumer.run({
  eachMessage: async ({ message }) => {
    // If this crashes, message is already committed - LOST
    await processOrder(JSON.parse(message.value.toString()));
  },
});

// ❌ BAD: Silent exception swallowing
await consumer.run({
  eachMessage: async ({ message }) => {
    try {
      await processOrder(JSON.parse(message.value.toString()));
    } catch (error) {
      // Log but continue - message effectively LOST
      console.error('Failed to process order:', error);
    }
  },
});

// ❌ BAD: Silent deserialization failure
await consumer.run({
  eachMessage: async ({ message }) => {
    const order = JSON.parse(message.value.toString());
    // What if JSON.parse throws? Message dropped!
    // What if order.items is undefined? Silently wrong behavior
    await processOrder(order);
  },
});

// ✅ GOOD: Proper error handling with DLQ
const goodConsumer = kafka.consumer({
  groupId: 'order-processing',
  enableAutoCommit: false, // Manual commit only
});

await goodConsumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      const order = parseOrderEvent(message.value);
      await processOrder(order);

      // Commit only after successful processing
      await goodConsumer.commitOffsets([
        { topic, partition, offset: (BigInt(message.offset) + 1n).toString() }
      ]);
    } catch (error) {
      if (isSerializationError(error)) {
        // Can't retry - send to DLQ
        await sendToDeadLetterQueue(message, error);
        await goodConsumer.commitOffsets([/*...*/]); // Move past bad message
      } else {
        // Transient error - don't commit, will retry
        throw error;
      }
    }
  },
});
```

The most common cause of event loss in production is exception swallowing—catching an exception, logging it, and continuing. This pattern turns every transient failure into permanent data loss. Always ensure that non-recoverable errors route to a dead letter queue, and retriable errors cause the message to be reprocessed.
Message delivery guarantees exist at multiple levels, and understanding these levels is critical for achieving desired reliability.
Producer → Broker Guarantee
This determines whether the broker has durably stored the message.
| acks Setting | Guarantee | Latency | Risk |
|---|---|---|---|
| acks=0 | Fire-and-forget | Lowest | Lost on any network or broker failure; producer never learns |
| acks=1 | Leader acknowledged | Medium | Lost if leader fails before replication |
| acks=all (or -1) | All in-sync replicas acknowledged | Highest | Lost only if all in-sync replicas fail; pair with min.insync.replicas >= 2 |
```typescript
// Kafka producer configuration for different durability levels

// ❌ DANGEROUS: Best throughput, but messages can be lost
const unsafeProducer = kafka.producer({
  acks: 0, // Don't wait for any acknowledgment
});

// ⚠️ RISK: Balances throughput and durability
const balancedProducer = kafka.producer({
  acks: 1, // Wait for leader acknowledgment only
});

// ✅ SAFE: Highest durability for critical events
const safeProducer = kafka.producer({
  acks: -1, // Wait for all in-sync replicas
  // Also need broker configuration:
  //   min.insync.replicas = 2 (or higher)
  //   unclean.leader.election.enable = false
});

// For critical business events:
const criticalProducer = kafka.producer({
  acks: -1,
  idempotent: true,          // Prevent duplicate sends
  maxInFlightRequests: 5,    // Required for idempotent producer
  retries: Number.MAX_VALUE, // Retry forever (with backoff)
  retryBackoff: 100,         // Start at 100ms
});
```

Broker → Consumer Guarantee
This determines when the broker considers a message successfully delivered to a consumer.
The consumer controls this through offset commit behavior:
Auto-commit (at-most-once): Offsets are committed periodically, regardless of processing success. Fast but lossy.
Manual commit after processing (at-least-once): Offsets are committed only after successful processing. Reliable but may cause duplicates on failure.
Transactional processing (exactly-once within Kafka): Process + commit in a transaction. Reliable but higher overhead.
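As a rough sketch of the transactional option, kafkajs supports consume-process-produce transactions in which the output message and the consumed offset commit succeed or fail together. The topic names and the enrichOrder step below are illustrative assumptions, not part of any specific system described on this page:

```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'order-enricher', brokers: ['broker-1:9092'] });
const consumer = kafka.consumer({ groupId: 'order-enrichment' });
// Transactional producer: the transactionalId must stay stable across restarts.
const producer = kafka.producer({
  transactionalId: 'order-enricher-tx',
  idempotent: true,
  maxInFlightRequests: 1,
});

// Hypothetical processing step used only for illustration.
function enrichOrder(order: { id: string }): { id: string; enrichedAt: string } {
  return { ...order, enrichedAt: new Date().toISOString() };
}

async function run(): Promise<void> {
  await Promise.all([consumer.connect(), producer.connect()]);
  await consumer.subscribe({ topics: ['orders'] });

  await consumer.run({
    autoCommit: false, // offsets are committed inside the transaction instead
    eachMessage: async ({ topic, partition, message }) => {
      if (!message.value) return;
      const transaction = await producer.transaction();
      try {
        const enriched = enrichOrder(JSON.parse(message.value.toString()));
        await transaction.send({
          topic: 'orders-enriched',
          messages: [{ value: JSON.stringify(enriched) }],
        });
        // Commit the consumed offset as part of the same transaction
        await transaction.sendOffsets({
          consumerGroupId: 'order-enrichment',
          topics: [{ topic, partitions: [{ partition, offset: (BigInt(message.offset) + 1n).toString() }] }],
        });
        await transaction.commit();
      } catch (error) {
        await transaction.abort(); // neither the output nor the offset becomes visible
        throw error;
      }
    },
  });
}
```

Note that this "exactly-once" guarantee only covers the Kafka-to-Kafka path; side effects outside Kafka (database writes, emails) still need idempotency.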
End-to-End Guarantee
The overall delivery guarantee is the weakest guarantee in the chain. If producers use acks=all but consumers auto-commit, the system has at-most-once semantics overall.
To achieve at-least-once end-to-end:
- acks=all with retries on the producer
- min.insync.replicas >= 2 on the brokers
- Manual offset commits only after successful processing on the consumer

Even with acks=all, there's a gap between your business transaction and the event publish. The Transactional Outbox pattern closes this gap: write events to a database table in the same transaction as your business logic, then have a separate process publish events from the outbox. This guarantees the event is produced if and only if the business transaction commits.
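The relay that moves outbox rows to the broker can be very small. A minimal sketch, assuming an outbox table with id, event_type, payload, and published_at columns (all names illustrative) and reusing the Database and EventPublisher types from the other examples on this page:

```typescript
// Minimal outbox relay: poll unpublished rows, publish them, then mark them as sent.
// Table and column names (outbox, id, event_type, payload, published_at) are assumptions.
async function relayOutboxBatch(db: Database, publisher: EventPublisher): Promise<number> {
  const result = await db.query(
    `SELECT id, event_type, payload
       FROM outbox
      WHERE published_at IS NULL
      ORDER BY id
      LIMIT 100`
  );

  for (const row of result.rows) {
    // Publish first, then mark. A crash between the two steps causes a duplicate
    // publish on restart (at-least-once), never a lost event.
    await publisher.publish({ type: row.event_type, payload: JSON.parse(row.payload) });
    await db.query(`UPDATE outbox SET published_at = NOW() WHERE id = $1`, [row.id]);
  }

  return result.rows.length;
}

// Run the relay on a short interval; a single instance or a distributed lock
// keeps multiple relays from competing over the same rows.
setInterval(() => relayOutboxBatch(db, publisher).catch(console.error), 1_000);
```

Because the failure mode is a duplicate rather than a gap, downstream consumers still need the idempotent handling discussed later in the replay section.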
Because lost events are silent, detecting them requires proactive monitoring strategies.
Strategy 1: Sequence Number Monitoring
If events carry sequence numbers, gaps indicate lost events.
```typescript
class SequenceGapDetector {
  private lastSequenceByEntity = new Map<string, number>();
  private gaps: Array<{ entityId: string; expected: number; received: number }> = [];

  constructor(
    private readonly alerter: AlertingService,
    private readonly maxGapAge: number = 60_000 // Alert if gap not filled in 60s
  ) {}

  checkSequence(entityId: string, sequenceNumber: number): void {
    const lastSequence = this.lastSequenceByEntity.get(entityId) ?? 0;

    if (sequenceNumber === lastSequence + 1) {
      // Expected sequence - update and check if it fills a gap
      this.lastSequenceByEntity.set(entityId, sequenceNumber);
      this.checkFilledGaps(entityId, sequenceNumber);
    } else if (sequenceNumber > lastSequence + 1) {
      // Gap detected!
      const gap = {
        entityId,
        expected: lastSequence + 1,
        received: sequenceNumber,
        detectedAt: Date.now(),
      };
      this.gaps.push(gap);

      console.warn(`Sequence gap detected for ${entityId}: expected ${lastSequence + 1}, got ${sequenceNumber}`);

      // Start timer to alert if gap not filled
      setTimeout(() => this.checkGapPersists(gap), this.maxGapAge);

      // Still update the sequence (might receive out-of-order)
      this.lastSequenceByEntity.set(entityId, sequenceNumber);
    }
    // sequenceNumber <= lastSequence means old/duplicate event - ignore
  }

  private checkFilledGaps(entityId: string, newSequence: number): void {
    // Remove gaps that are now filled
    this.gaps = this.gaps.filter(gap =>
      !(gap.entityId === entityId && gap.expected <= newSequence)
    );
  }

  private checkGapPersists(gap: { entityId: string; expected: number; received: number }): void {
    // Check if gap still exists after timeout
    const stillExists = this.gaps.some(g =>
      g.entityId === gap.entityId && g.expected === gap.expected
    );

    if (stillExists) {
      this.alerter.alert({
        severity: 'critical',
        message: `Event loss detected for ${gap.entityId}: missing sequence(s) ${gap.expected} to ${gap.received - 1}`,
        metadata: gap,
      });
    }
  }
}
```

Strategy 2: Heartbeat Events
Publish periodic 'heartbeat' events and verify they're received. This detects systemic event loss (e.g., broken topic, misconfigured consumer).
```typescript
// Producer side: publish heartbeats every 30 seconds
class HeartbeatProducer {
  private sequenceNumber = 0;

  constructor(
    private readonly producer: KafkaProducer,
    private readonly topic: string,
    private readonly serviceId: string,
    private readonly intervalMs: number = 30_000
  ) {}

  start(): void {
    setInterval(() => {
      const heartbeat = {
        type: 'SYSTEM.HEARTBEAT',
        serviceId: this.serviceId,
        timestamp: Date.now(),
        sequenceNumber: this.sequenceNumber++,
      };

      this.producer.send({
        topic: this.topic,
        messages: [{ value: JSON.stringify(heartbeat) }],
      });
    }, this.intervalMs);
  }
}

// Consumer side: verify heartbeats arrive
class HeartbeatMonitor {
  private lastHeartbeat = new Map<string, { timestamp: number; sequence: number }>();

  constructor(
    private readonly alerter: AlertingService,
    private readonly maxHeartbeatAge: number = 90_000 // 3x heartbeat interval
  ) {}

  receiveHeartbeat(serviceId: string, timestamp: number, sequence: number): void {
    const last = this.lastHeartbeat.get(serviceId);

    if (last && sequence !== last.sequence + 1) {
      // Missed heartbeat(s)!
      this.alerter.alert({
        severity: 'warning',
        message: `Missed heartbeat(s) from ${serviceId}: ${sequence - last.sequence - 1} missing`,
      });
    }

    this.lastHeartbeat.set(serviceId, { timestamp, sequence });
  }

  startChecking(): void {
    setInterval(() => {
      const now = Date.now();
      for (const [serviceId, last] of this.lastHeartbeat) {
        if (now - last.timestamp > this.maxHeartbeatAge) {
          this.alerter.alert({
            severity: 'critical',
            message: `No heartbeat from ${serviceId} for ${(now - last.timestamp) / 1000}s`,
          });
        }
      }
    }, 10_000); // Check every 10 seconds
  }
}
```

Strategy 3: Business-Level Reconciliation
Periodically compare derived state (built from events) with source data to detect discrepancies caused by lost events.
```typescript
// Example: Reconcile order counts between source and derived systems

class OrderReconciliation {
  constructor(
    private readonly sourceDb: SourceDatabase,   // Order service database
    private readonly derivedDb: DerivedDatabase, // Analytics database (built from events)
    private readonly alerter: AlertingService
  ) {}

  async reconcile(): Promise<ReconciliationReport> {
    const report: ReconciliationReport = {
      timestamp: new Date(),
      discrepancies: [],
    };

    // Compare order counts by date
    const sourceOrders = await this.sourceDb.query(`
      SELECT DATE(created_at) as date, COUNT(*) as count
      FROM orders
      WHERE created_at > NOW() - INTERVAL '7 days'
      GROUP BY DATE(created_at)
    `);

    const derivedOrders = await this.derivedDb.query(`
      SELECT date, order_count as count
      FROM daily_order_stats
      WHERE date > CURRENT_DATE - INTERVAL '7 days'
    `);

    // Find discrepancies
    for (const source of sourceOrders.rows) {
      const derived = derivedOrders.rows.find(d => d.date === source.date);

      if (!derived) {
        report.discrepancies.push({
          type: 'missing',
          date: source.date,
          sourceCount: source.count,
          derivedCount: 0,
        });
      } else if (Math.abs(source.count - derived.count) > 0) {
        report.discrepancies.push({
          type: 'mismatch',
          date: source.date,
          sourceCount: source.count,
          derivedCount: derived.count,
          difference: source.count - derived.count,
        });
      }
    }

    if (report.discrepancies.length > 0) {
      this.alerter.alert({
        severity: 'warning',
        message: `Order reconciliation found ${report.discrepancies.length} discrepancies`,
        metadata: report,
      });
    }

    return report;
  }
}

// Run reconciliation hourly
setInterval(() => reconciliation.reconcile(), 60 * 60 * 1000);
```

When reconciling, account for event propagation delays. If events take up to 5 minutes to process, don't reconcile data from the last 5 minutes—you'll get false positives. Use a 'reconciliation lag' window that exceeds your maximum expected processing time.
When event loss is detected (or suspected), you need recovery mechanisms to restore system consistency.
Pattern 1: Event Replay from Source
If the source system retains events (event store, CDC log), replay the missing events.
```typescript
class EventReplayService {
  constructor(
    private readonly eventStore: EventStore,
    private readonly publisher: EventPublisher
  ) {}

  /**
   * Replay events for a specific entity from a given sequence number
   */
  async replayFromSequence(
    entityId: string,
    fromSequence: number,
    toSequence?: number
  ): Promise<number> {
    const events = await this.eventStore.getEvents({
      entityId,
      fromSequence,
      toSequence,
    });

    console.log(`Replaying ${events.length} events for ${entityId}`);

    for (const event of events) {
      // Mark as replay so consumers can handle accordingly
      const replayEvent = {
        ...event,
        metadata: {
          ...event.metadata,
          isReplay: true,
          originalTimestamp: event.metadata.timestamp,
          replayTimestamp: new Date(),
        },
      };

      await this.publisher.publish(replayEvent);
    }

    return events.length;
  }

  /**
   * Replay all events for an entity (full rebuild)
   */
  async fullReplay(entityId: string): Promise<number> {
    return this.replayFromSequence(entityId, 0);
  }

  /**
   * Replay events for all entities modified in a time range
   */
  async replayTimeRange(from: Date, to: Date): Promise<number> {
    const events = await this.eventStore.getEventsByTimeRange(from, to);

    console.log(`Replaying ${events.length} events from ${from} to ${to}`);

    for (const event of events) {
      await this.publisher.publish({
        ...event,
        metadata: { ...event.metadata, isReplay: true },
      });
    }

    return events.length;
  }
}

// Consumer handling replay events
class ReplayAwareHandler {
  async handleEvent(event: DomainEvent): Promise<void> {
    if (event.metadata.isReplay) {
      // For replays, use upsert semantics instead of insert
      await this.upsertState(event);
    } else {
      // Normal processing
      await this.processEvent(event);
    }
  }

  private async upsertState(event: DomainEvent): Promise<void> {
    // Idempotent upsert that won't fail if data already exists
    await this.db.query(`
      INSERT INTO projections (id, data, version, last_event_id)
      VALUES ($1, $2, $3, $4)
      ON CONFLICT (id) DO UPDATE SET
        data = CASE WHEN projections.version < $3 THEN $2 ELSE projections.data END,
        version = GREATEST(projections.version, $3),
        last_event_id = CASE WHEN projections.version < $3 THEN $4 ELSE projections.last_event_id END
    `, [event.data.id, event.data, event.data.version, event.metadata.eventId]);
  }
}
```

Pattern 2: State Sync from Source System
When events aren't available for replay, directly sync state from the source system.
```typescript
class StateSyncService {
  constructor(
    private readonly sourceClient: SourceServiceClient,
    private readonly localDb: Database
  ) {}

  /**
   * Sync a single entity by fetching current state from source
   */
  async syncEntity(entityType: string, entityId: string): Promise<void> {
    // Fetch current state from source of truth
    const sourceState = await this.sourceClient.getEntity(entityType, entityId);

    // Replace local state entirely
    await this.localDb.query(`
      INSERT INTO ${entityType} (id, data, synced_at)
      VALUES ($1, $2, NOW())
      ON CONFLICT (id) DO UPDATE SET data = $2, synced_at = NOW()
    `, [entityId, JSON.stringify(sourceState)]);

    console.log(`Synced ${entityType}:${entityId} from source`);
  }

  /**
   * Bulk sync all entities modified after a timestamp
   */
  async syncSince(entityType: string, since: Date): Promise<number> {
    const cursor = await this.sourceClient.getEntitiesModifiedSince(entityType, since);
    let count = 0;

    for await (const entity of cursor) {
      await this.syncEntity(entityType, entity.id);
      count++;
    }

    return count;
  }

  /**
   * Full sync - nuclear option for major data loss
   */
  async fullSync(entityType: string): Promise<void> {
    console.warn(`Starting full sync of ${entityType} - this may take a while`);

    // Truncate local data
    await this.localDb.query(`TRUNCATE TABLE ${entityType}`);

    // Fetch all from source
    const cursor = await this.sourceClient.getAllEntities(entityType);
    let batch: unknown[] = [];
    const BATCH_SIZE = 1000;

    for await (const entity of cursor) {
      batch.push(entity);
      if (batch.length >= BATCH_SIZE) {
        await this.insertBatch(entityType, batch);
        batch = [];
      }
    }

    if (batch.length > 0) {
      await this.insertBatch(entityType, batch);
    }

    console.log(`Full sync of ${entityType} complete`);
  }
}
```

Pattern 3: Compensating Actions
When replay isn't possible and state sync is impractical, use business-level compensating actions.
```typescript
// Example: Recovering from lost shipping events

class ShippingRecoveryService {
  constructor(
    private readonly orderService: OrderServiceClient,
    private readonly shippingService: ShippingServiceClient,
    private readonly notificationService: NotificationServiceClient
  ) {}

  async recoverUnshippedOrders(): Promise<RecoveryReport> {
    // Find orders that should have shipped but didn't
    const problematicOrders = await this.orderService.findOrders({
      status: 'PAID',
      paidAt: { before: new Date(Date.now() - 24 * 60 * 60 * 1000) }, // > 24h ago
    });

    const shippedOrderIds = await this.shippingService.getShippedOrderIds(
      problematicOrders.map(o => o.id)
    );

    const unshippedOrders = problematicOrders.filter(
      o => !shippedOrderIds.includes(o.id)
    );

    console.log(`Found ${unshippedOrders.length} orders that should have shipped`);

    const report: RecoveryReport = { recovered: [], failed: [] };

    for (const order of unshippedOrders) {
      try {
        // Compensating action: manually trigger shipping
        await this.shippingService.createShipment({
          orderId: order.id,
          items: order.items,
          address: order.shippingAddress,
          priority: 'HIGH', // Expedite due to delay
        });

        // Notify customer about the delay
        await this.notificationService.sendApologyEmail(
          order.customerId,
          order.id,
          'We apologize for the delay in shipping your order.'
        );

        report.recovered.push(order.id);
      } catch (error) {
        report.failed.push({ orderId: order.id, error: (error as Error).message });
      }
    }

    return report;
  }
}
```

When replaying events, ensure that: (1) Consumers are idempotent and won't create duplicate side effects, (2) External actions (emails, payments) are skipped or deduplicated during replay, (3) Replay events are marked so consumers can handle them specially, (4) You have a way to stop/pause replay if issues are discovered.
A Dead Letter Queue (DLQ) is a secondary queue where messages that can't be processed are stored instead of being lost. Properly implemented DLQs turn silent failures into visible, actionable items.
When to Route to DLQ:

- Deserialization failures (the payload will never parse, no matter how many retries)
- Schema or validation errors (the event is structurally invalid)
- Business rule violations that cannot succeed on retry
- Messages that have exhausted their retry budget
NOT to Route to DLQ:

- Transient network errors and timeouts
- Temporary unavailability of a downstream service or database
- Rate limiting or throttling responses
- Any failure that is likely to succeed if the message is simply retried
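One way to encode this split is a small wrapper that retries transient errors with backoff and only hands permanent failures (or exhausted retries) to a DLQ handler like the one shown below. The error classification and retry limits here are illustrative assumptions:

```typescript
// Sketch: retry transient failures with exponential backoff; route permanent
// failures (or exhausted retries) to the DLQ. Limits and error names are illustrative.
async function processWithRetry(
  message: KafkaMessage,
  handler: (message: KafkaMessage) => Promise<void>,
  dlq: DeadLetterQueueHandler,
  context: { topic: string; partition: number },
  maxAttempts = 5
): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(message);
      return;
    } catch (error) {
      const err = error as Error;
      const permanent = err.name === 'ValidationError' || err.name === 'SerializationError';
      if (permanent || attempt === maxAttempts) {
        await dlq.routeToDLQ(message, err, { ...context, retryCount: attempt });
        return; // the offset can now be committed; the message is preserved in the DLQ
      }
      // Transient error: back off and try again (100ms, 200ms, 400ms, ...)
      await new Promise(resolve => setTimeout(resolve, 100 * 2 ** (attempt - 1)));
    }
  }
}
```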
```typescript
interface DLQEntry {
  originalTopic: string;
  originalPartition: number;
  originalOffset: string;
  originalMessage: unknown;
  errorType: string;
  errorMessage: string;
  errorStack?: string;
  failedAt: Date;
  retryCount: number;
  processingServiceId: string;
  correlationId?: string;
}

class DeadLetterQueueHandler {
  constructor(
    private readonly dlqProducer: KafkaProducer,
    private readonly dlqTopic: string,
    private readonly alerter: AlertingService
  ) {}

  async routeToDLQ(
    originalMessage: KafkaMessage,
    error: Error,
    context: { topic: string; partition: number; retryCount: number }
  ): Promise<void> {
    const dlqEntry: DLQEntry = {
      originalTopic: context.topic,
      originalPartition: context.partition,
      originalOffset: originalMessage.offset,
      originalMessage: this.safelyParseMessage(originalMessage),
      errorType: error.name,
      errorMessage: error.message,
      errorStack: error.stack,
      failedAt: new Date(),
      retryCount: context.retryCount,
      processingServiceId: process.env.SERVICE_ID!,
      correlationId: this.extractCorrelationId(originalMessage),
    };

    await this.dlqProducer.send({
      topic: this.dlqTopic,
      messages: [{
        key: originalMessage.key,
        value: JSON.stringify(dlqEntry),
        headers: {
          'dlq.original-topic': context.topic,
          'dlq.error-type': error.name,
        },
      }],
    });

    // Alert on DLQ routing (configurable by error type)
    await this.alertDLQEntry(dlqEntry);
  }

  private safelyParseMessage(message: KafkaMessage): unknown {
    try {
      return JSON.parse(message.value?.toString() ?? '');
    } catch {
      return { raw: message.value?.toString() };
    }
  }

  private extractCorrelationId(message: KafkaMessage): string | undefined {
    try {
      const parsed = JSON.parse(message.value?.toString() ?? '{}');
      return parsed.metadata?.correlationId;
    } catch {
      return undefined;
    }
  }

  private async alertDLQEntry(entry: DLQEntry): Promise<void> {
    // Alert immediately for certain error types
    if (entry.errorType === 'ValidationError' || entry.errorType === 'BusinessLogicError') {
      await this.alerter.alert({
        severity: 'warning',
        message: `Event routed to DLQ: ${entry.errorType}`,
        metadata: { ...entry, errorStack: undefined }, // Truncate for alert
      });
    }
  }
}
```

DLQ Processing and Reprocessing
DLQ messages should be regularly reviewed and either reprocessed (after fixing the underlying issue) or archived.
```typescript
class DLQProcessor {
  constructor(
    private readonly dlqConsumer: KafkaConsumer,
    private readonly mainProducer: KafkaProducer,
    private readonly archiveStore: ArchiveStore
  ) {}

  /**
   * Reprocess a specific DLQ message back to its original topic
   */
  async reprocess(dlqEntry: DLQEntry): Promise<void> {
    // Increment retry count to track reprocessing attempts
    const reprocessedMessage = {
      ...dlqEntry.originalMessage,
      metadata: {
        ...(dlqEntry.originalMessage as any).metadata,
        reprocessedAt: new Date(),
        reprocessCount: dlqEntry.retryCount + 1,
        originalDLQTimestamp: dlqEntry.failedAt,
      },
    };

    await this.mainProducer.send({
      topic: dlqEntry.originalTopic,
      messages: [{
        value: JSON.stringify(reprocessedMessage),
      }],
    });

    console.log(`Reprocessed DLQ entry to ${dlqEntry.originalTopic}`);
  }

  /**
   * Archive a DLQ message (won't be reprocessed)
   */
  async archive(dlqEntry: DLQEntry, reason: string): Promise<void> {
    await this.archiveStore.store({
      ...dlqEntry,
      archivedAt: new Date(),
      archiveReason: reason,
    });

    console.log(`Archived DLQ entry: ${reason}`);
  }

  /**
   * Dashboard query: get DLQ statistics
   */
  async getStats(): Promise<DLQStats> {
    const entries = await this.dlqConsumer.getAll();

    return {
      totalCount: entries.length,
      byErrorType: this.groupBy(entries, e => e.errorType),
      byTopic: this.groupBy(entries, e => e.originalTopic),
      oldestEntry: entries.sort((a, b) =>
        a.failedAt.getTime() - b.failedAt.getTime()
      )[0],
      recentRate: this.calculateRate(entries, 60 * 60 * 1000), // Last hour
    };
  }
}
```

The best approach to event loss is designing systems that can detect and recover from loss automatically.
Design Principle 1: Derivable State
Design derived state (read models, caches, projections) to be fully rebuildable from events or source data. Never store non-derivable data only in a projection.
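One concrete interpretation is that every projection gets both an apply path and a rebuild path, so it can be dropped and reconstructed at any time. A minimal sketch, reusing the illustrative EventStore, Database, DomainEvent, and projections table from earlier examples; the streamAll method is an assumption added for this sketch:

```typescript
// Sketch: a projection that can be truncated and rebuilt entirely from the event store.
// Types and table names follow the illustrative conventions used elsewhere on this page.
class RebuildableProjection {
  constructor(
    private readonly eventStore: EventStore,
    private readonly db: Database
  ) {}

  /** Apply a single event; used both for live consumption and for rebuilds. */
  async apply(event: DomainEvent): Promise<void> {
    await this.db.query(
      `INSERT INTO projections (id, data, version)
       VALUES ($1, $2, $3)
       ON CONFLICT (id) DO UPDATE SET data = $2, version = $3
       WHERE projections.version < $3`,
      [event.data.id, event.data, event.data.version]
    );
  }

  /** Rebuild from scratch: safe only because no non-derivable data lives in this table. */
  async rebuild(): Promise<void> {
    await this.db.query(`TRUNCATE TABLE projections`);
    for await (const event of this.eventStore.streamAll()) {
      await this.apply(event);
    }
  }
}
```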
Design Principle 2: Checksums and Verification Points
Periodically verify that derived state matches what events should have produced.
```typescript
// Verification events: published periodically by source systems
interface VerificationEvent {
  type: 'VERIFICATION.CHECKPOINT';
  data: {
    entityType: string;
    checkpointId: string;
    timestamp: Date;
    recordCount: number;
    checksum: string; // Hash of all entity IDs + versions
    sampleRecords: Array<{ id: string; version: number }>;
  };
}

// Verification consumer: compares local state against checkpoints
class VerificationConsumer {
  constructor(
    private readonly localDb: Database,
    private readonly alerter: AlertingService
  ) {}

  async handleVerificationCheckpoint(event: VerificationEvent): Promise<void> {
    const { entityType, recordCount, checksum, sampleRecords } = event.data;

    // Compare record count
    const localCount = await this.localDb.query(
      `SELECT COUNT(*) as count FROM ${entityType}`
    );

    if (localCount.rows[0].count !== recordCount) {
      await this.alerter.alert({
        severity: 'warning',
        message: `Verification failed: ${entityType} count mismatch`,
        metadata: {
          expected: recordCount,
          actual: localCount.rows[0].count,
          difference: recordCount - localCount.rows[0].count,
        },
      });
    }

    // Compare sample records
    for (const sample of sampleRecords) {
      const local = await this.localDb.query(
        `SELECT version FROM ${entityType} WHERE id = $1`,
        [sample.id]
      );

      if (!local.rows[0]) {
        await this.alerter.alert({
          severity: 'critical',
          message: `Verification failed: missing record ${sample.id}`,
        });
      } else if (local.rows[0].version !== sample.version) {
        await this.alerter.alert({
          severity: 'critical',
          message: `Verification failed: version mismatch for ${sample.id}`,
          metadata: { expected: sample.version, actual: local.rows[0].version },
        });
      }
    }

    // TODO: Compare full checksum for comprehensive verification
  }
}
```

Design Principle 3: Saga Timeout and Recovery
For multi-step workflows, implement timeouts that detect stalled sagas (potentially due to lost events) and trigger recovery.
```typescript
class SagaTimeoutMonitor {
  constructor(
    private readonly sagaStore: SagaStore,
    private readonly recoveryService: SagaRecoveryService,
    private readonly alerter: AlertingService
  ) {}

  async checkStalledSagas(): Promise<void> {
    // Find sagas that have been in-progress too long
    const stalledSagas = await this.sagaStore.findStalledSagas({
      states: ['AWAITING_PAYMENT', 'AWAITING_INVENTORY', 'AWAITING_SHIPPING'],
      maxAge: 30 * 60 * 1000, // 30 minutes
    });

    for (const saga of stalledSagas) {
      console.log(`Stalled saga detected: ${saga.id} in state ${saga.state}`);

      // Determine if we're missing an event
      const expectedEvent = this.getExpectedNextEvent(saga);
      const received = await this.checkEventReceived(saga.id, expectedEvent);

      if (!received) {
        // Event appears to be lost - trigger recovery
        await this.recoveryService.recoverSaga(saga);
      } else {
        // Event was received but saga didn't progress - different issue
        await this.alerter.alert({
          severity: 'critical',
          message: `Saga ${saga.id} received event but didn't progress`,
        });
      }
    }
  }

  private getExpectedNextEvent(saga: Saga): string {
    switch (saga.state) {
      case 'AWAITING_PAYMENT': return 'PaymentCompleted';
      case 'AWAITING_INVENTORY': return 'InventoryReserved';
      case 'AWAITING_SHIPPING': return 'ShipmentCreated';
      default: return 'Unknown';
    }
  }
}
```

Combine multiple strategies: (1) Strong production guarantees (outbox pattern, acks=all), (2) Monitoring for detection (heartbeats, sequence gaps, reconciliation), (3) DLQs for visibility, (4) Replay/recovery mechanisms, (5) Business-level compensating actions. Each layer catches failures the others might miss.
Event loss is a silent killer in event-driven systems. Unlike errors that crash processes, lost events leave no trace while business processes remain incomplete. Prevention, detection, and recovery must all be explicitly designed.
What's Next
Debugging challenges, ordering issues, duplicates, and lost events are all specific pitfalls. In the final page, we'll explore complexity management—how to keep event-driven systems understandable and maintainable as they grow, including documentation, governance, and organizational patterns.
You now understand how events get lost in distributed systems, strategies for detection, and patterns for recovery. These techniques transform silent failures into manageable, recoverable situations—essential for operating event-driven systems reliably in production.