Throughout this module, we've explored how events cross boundaries, how they're serialized, and how they evolve over time. Now we address the final piece: integration patterns—proven architectural solutions for the recurring challenges of event-driven integration.
These patterns address fundamental distributed systems problems:

- Dual writes: a database update and a message publish cannot be made atomic across two separate systems
- Partial failures: a business process spanning multiple services can fail midway, leaving earlier steps committed
- Duplicate delivery: at-least-once messaging means consumers may receive the same event more than once
- Ordering: once processing is parallelized, events can arrive out of order
The patterns in this page are battle-tested solutions used by organizations like Netflix, Uber, and Amazon to build reliable event-driven systems at scale.
By the end of this page, you will understand: the Transactional Outbox pattern for reliable publishing, the Saga pattern for distributed transactions, Message Channel patterns for event routing, and strategies for idempotent event processing and dead letter handling.
Before diving into patterns, we must understand the fundamental problem they solve: the dual-write problem.
When a service needs to update its database AND publish an event, two separate systems must be updated. Without special handling, these operations can become inconsistent:
```typescript
// DANGEROUS: Dual-write with no atomicity guarantee
class OrderService {
  async placeOrder(command: PlaceOrderCommand): Promise<Order> {
    // Step 1: Save to database
    const order = Order.create(command);
    await this.orderRepository.save(order); // ← Commits

    // Step 2: Publish event
    await this.eventBus.publish(
      new OrderPlacedEvent(order)
    ); // ← What if this fails?

    return order;
  }
}

// FAILURE SCENARIO 1: Event publish fails
// - Database has the order (committed)
// - Event never sent
// - Other services never know about the order
// - System is inconsistent!

// FAILURE SCENARIO 2: Service crashes after DB commit
// - Order is saved
// - Service crashes before publish
// - Event lost forever
// - Same inconsistency!

// WRONG FIX: Publish first, then save
async placeOrderWrongFix(command: PlaceOrderCommand): Promise<Order> {
  const order = Order.create(command);

  // Publish first
  await this.eventBus.publish(new OrderPlacedEvent(order));

  // Then save
  await this.orderRepository.save(order); // ← What if THIS fails?
  // Now: Event was sent, but order wasn't saved!
  // Other services act on non-existent order!
  return order;
}
```

The database and message broker are separate systems. Without special patterns, you cannot atomically update both. Either the DB update or the event publish might fail independently. This is the core problem that integration patterns solve.
The Transactional Outbox pattern solves the dual-write problem by leveraging database transactions. Instead of publishing directly to a message broker, the service writes events to an "outbox" table in the same database transaction as the domain change. A separate process reads the outbox and publishes to the message broker.
How It Works:

1. Within one database transaction, the service saves the domain change and inserts an event row into the outbox table.
2. The transaction commits, making the state change and the pending event atomic.
3. A separate relay process reads unpublished outbox rows and publishes them to the message broker.
4. Once the broker acknowledges, the relay marks the row as published.
```typescript
// Outbox table entity
interface OutboxMessage {
  id: string;               // Unique message ID
  aggregateType: string;    // e.g., "Order"
  aggregateId: string;      // e.g., order ID
  eventType: string;        // e.g., "order.placed"
  payload: string;          // JSON serialized event
  createdAt: Date;          // When event was created
  publishedAt: Date | null; // When event was published (null = pending)
}

// Service writes to outbox in same transaction
class OrderService {
  constructor(
    private db: TransactionManager,
    private orderRepository: OrderRepository,
    private outboxRepository: OutboxRepository,
  ) {}

  async placeOrder(command: PlaceOrderCommand): Promise<Order> {
    // Single database transaction wraps everything
    return await this.db.transaction(async (tx) => {
      // Create and save order
      const order = Order.create(command);
      await this.orderRepository.save(order, tx);

      // Instead of publishing, write to outbox
      const event = this.createOrderPlacedEvent(order);
      await this.outboxRepository.insert({
        id: generateUuid(),
        aggregateType: "Order",
        aggregateId: order.id,
        eventType: "order.placed",
        payload: JSON.stringify(event),
        createdAt: new Date(),
        publishedAt: null, // Pending
      }, tx);

      return order;
      // Transaction commits: order AND outbox message are atomic
    });
  }
}

// Separate relay process publishes from outbox
class OutboxRelay {
  constructor(
    private outboxRepository: OutboxRepository,
    private eventBus: EventBus,
  ) {}

  async run(): Promise<void> {
    // Poll for unpublished messages
    while (true) {
      const messages = await this.outboxRepository.findUnpublished(
        { limit: 100 }
      );

      for (const message of messages) {
        try {
          // Publish to message broker
          await this.eventBus.publish({
            eventId: message.id,
            eventType: message.eventType,
            payload: JSON.parse(message.payload),
          });

          // Mark as published
          await this.outboxRepository.markPublished(message.id);
        } catch (error) {
          // Will retry on next poll
          console.error(`Failed to publish ${message.id}`, error);
        }
      }

      // Brief pause before next poll
      await sleep(100);
    }
  }
}
```
```sql
-- Outbox table design
CREATE TABLE outbox_messages (
  id UUID PRIMARY KEY,
  aggregate_type VARCHAR(255) NOT NULL,
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
  published_at TIMESTAMP WITH TIME ZONE
);

-- Index for polling unpublished messages efficiently
-- Partial index only includes unpublished rows
CREATE INDEX outbox_unpublished_idx
  ON outbox_messages (created_at)
  WHERE published_at IS NULL;

-- Alternative: Use a sequence number for ordering
CREATE TABLE outbox_messages_v2 (
  sequence_number BIGSERIAL PRIMARY KEY,
  aggregate_type VARCHAR(255) NOT NULL,
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Relay tracks last processed sequence_number
-- No need for published_at column
```

Instead of a polling relay, many teams use Change Data Capture tools like Debezium. CDC monitors the database transaction log and automatically publishes outbox inserts to Kafka. This eliminates polling, reduces latency, and handles ordering correctly. Debezium + PostgreSQL + Kafka is a popular combination.
When a business process spans multiple services, we can't use traditional database transactions. The Saga pattern provides an alternative: a sequence of local transactions, each publishing events that trigger the next step. If a step fails, compensating transactions undo previous work.
Example: Order Fulfillment Saga

A typical flow runs as a chain of local transactions: create the order, process payment, reserve inventory, create the shipment. If any step fails after previous steps succeeded, we must compensate: release the inventory reservation, refund the payment, and cancel the order, in reverse order.
| Style | Description | Pros | Cons |
|---|---|---|---|
| Choreography | Each service listens for events and reacts | Simple, loosely coupled, no central point of failure | Hard to track saga state, difficult to debug, implicit flow |
| Orchestration | Central orchestrator directs each step | Explicit flow, easy to track state, centralized logic | Central point of failure, orchestrator can become complex |
```typescript
// CHOREOGRAPHY: Each service reacts to events

// Step 1: Order Service creates order, publishes event
class OrderService {
  async createOrder(command: CreateOrderCommand): Promise<void> {
    const order = Order.createPending(command);
    await this.orderRepository.save(order);
    await this.publish(new OrderCreatedEvent(order));
    // Now waiting for payment processing...
  }

  // Listens for payment results
  async onPaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
    const order = await this.orderRepository.get(event.orderId);
    order.markPaymentReceived();
    await this.orderRepository.save(order);
    // Inventory service listens and reacts...
  }

  async onPaymentFailed(event: PaymentFailedEvent): Promise<void> {
    const order = await this.orderRepository.get(event.orderId);
    order.cancel("Payment failed");
    await this.orderRepository.save(order);
    await this.publish(new OrderCancelledEvent(order));
  }
}

// Step 2: Payment Service listens for orders
class PaymentService {
  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    try {
      const result = await this.processPayment(
        event.customerId,
        event.totalAmount
      );
      await this.publish(new PaymentCompletedEvent({
        orderId: event.orderId,
        paymentId: result.id,
      }));
    } catch (error) {
      await this.publish(new PaymentFailedEvent({
        orderId: event.orderId,
        reason: error.message,
      }));
    }
  }

  // Compensating transaction
  async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    const payment = await this.paymentRepository.findByOrderId(
      event.orderId
    );
    if (payment) {
      await this.refundPayment(payment);
      await this.publish(new PaymentRefundedEvent({
        orderId: event.orderId,
        paymentId: payment.id,
      }));
    }
  }
}

// Step 3: Inventory Service listens for payment
class InventoryService {
  async onPaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
    try {
      await this.reserveInventory(event.orderId);
      await this.publish(new InventoryReservedEvent({
        orderId: event.orderId,
      }));
    } catch (error) {
      // Trigger compensation: refund needed
      await this.publish(new InventoryReservationFailedEvent({
        orderId: event.orderId,
        reason: error.message,
      }));
    }
  }

  // Compensating transaction
  async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    await this.releaseReservation(event.orderId);
  }
}
```
```typescript
// ORCHESTRATION: Central coordinator manages the saga

interface SagaStep {
  readonly name: string;
  execute(context: SagaContext): Promise<void>;
  compensate(context: SagaContext): Promise<void>;
}

class OrderFulfillmentSaga {
  private readonly steps: SagaStep[] = [
    new CreateOrderStep(),
    new ProcessPaymentStep(),
    new ReserveInventoryStep(),
    new CreateShipmentStep(),
  ];

  async execute(command: CreateOrderCommand): Promise<void> {
    const context = new SagaContext(command);
    const completedSteps: SagaStep[] = [];

    try {
      for (const step of this.steps) {
        await step.execute(context);
        completedSteps.push(step);
        // Persist saga state for recovery
        await this.saveSagaState(context, completedSteps);
      }
      context.markCompleted();
      await this.saveSagaState(context, completedSteps);
    } catch (error) {
      // Compensate in reverse order
      context.markFailed(error);
      for (const step of completedSteps.reverse()) {
        try {
          await step.compensate(context);
        } catch (compensationError) {
          // Log but continue compensating
          console.error(
            `Compensation failed for ${step.name}`,
            compensationError
          );
        }
      }
      await this.saveSagaState(context, completedSteps);
      throw error;
    }
  }
}

// Each step has execute and compensate logic
class ProcessPaymentStep implements SagaStep {
  readonly name = "ProcessPayment";

  async execute(context: SagaContext): Promise<void> {
    const result = await this.paymentService.processPayment({
      orderId: context.orderId,
      amount: context.totalAmount,
      customerId: context.customerId,
    });
    context.setPaymentId(result.paymentId);
  }

  async compensate(context: SagaContext): Promise<void> {
    if (context.paymentId) {
      await this.paymentService.refundPayment(context.paymentId);
    }
  }
}

// Saga orchestrator runs as a separate service
class SagaOrchestrator {
  async start(): Promise<void> {
    // Listen for saga initiation requests
    await this.eventBus.subscribe(
      "order.create.requested",
      async (event) => {
        const saga = new OrderFulfillmentSaga();
        await saga.execute(event.command);
      }
    );

    // Handle saga recovery on startup
    await this.recoverIncompleteSagas();
  }

  private async recoverIncompleteSagas(): Promise<void> {
    const incompleteSagas = await this.sagaRepository.findIncomplete();
    for (const sagaState of incompleteSagas) {
      await this.resumeOrCompensate(sagaState);
    }
  }
}
```

Sagas provide eventual consistency, not ACID transactions. Between saga steps, the system may be in an intermediate state visible to users. Design your domain to tolerate this: use status fields (PENDING, PROCESSING, COMPLETED), design UIs to show in-progress states, and ensure compensations are idempotent.
How events flow from producers to consumers depends on message channel architecture. Different patterns serve different needs:
Key considerations include how many consumers should receive each event, how load is balanced across consumer instances, and whether routing depends on event content:
| Pattern | Delivery | Use Case | Example |
|---|---|---|---|
| Point-to-Point | One event → one consumer (exactly one) | Work queues, task processing | Job queue: each job processed by exactly one worker |
| Publish-Subscribe | One event → all subscribers (broadcast) | Notifications, event propagation | Order placed: inventory, shipping, analytics all receive |
| Competing Consumers | One event → one of N consumers (load balanced) | Horizontal scaling | Multiple order processors share the load |
| Content-Based Router | Event routed based on content | Conditional processing | Priority orders to express queue, standard orders to normal |
```typescript
// POINT-TO-POINT: Each message goes to exactly one consumer
class TaskQueue {
  async enqueue(task: Task): Promise<void> {
    await this.redis.lpush("tasks", JSON.stringify(task));
  }

  async dequeue(): Promise<Task | null> {
    const data = await this.redis.brpoplpush(
      "tasks",            // Source queue
      "tasks:processing", // Processing queue (for recovery)
      30                  // Timeout seconds
    );
    return data ? JSON.parse(data) : null;
  }

  async acknowledge(task: Task): Promise<void> {
    await this.redis.lrem("tasks:processing", 1, JSON.stringify(task));
  }
}

// PUBLISH-SUBSCRIBE: All consumers receive all events
class EventBroadcaster {
  private readonly subscribers = new Map<string, EventHandler[]>();

  subscribe(eventType: string, handler: EventHandler): void {
    const handlers = this.subscribers.get(eventType) || [];
    handlers.push(handler);
    this.subscribers.set(eventType, handlers);
  }

  async publish(event: Event): Promise<void> {
    const handlers = this.subscribers.get(event.type) || [];
    // All subscribers receive the event
    await Promise.all(
      handlers.map(handler => handler.handle(event))
    );
  }
}

// COMPETING CONSUMERS: Consumer groups in Kafka
class KafkaCompetingConsumers {
  async configureConsumerGroup(): Promise<void> {
    const consumer = this.kafka.consumer({
      groupId: "order-processors", // Multiple instances share this group
    });

    await consumer.subscribe({
      topic: "orders",
      fromBeginning: false,
    });

    // Kafka automatically balances partitions across group members
    // Each message goes to exactly one consumer in the group
    await consumer.run({
      eachMessage: async ({ message }) => {
        await this.processOrder(message);
      },
    });
  }
}

// CONTENT-BASED ROUTER: Route based on event content
class ContentBasedRouter {
  private readonly routes: Array<{
    condition: (event: Event) => boolean;
    channel: EventChannel;
  }> = [];

  addRoute(
    condition: (event: Event) => boolean,
    channel: EventChannel
  ): void {
    this.routes.push({ condition, channel });
  }

  async route(event: Event): Promise<void> {
    for (const route of this.routes) {
      if (route.condition(event)) {
        await route.channel.send(event);
        return; // First match wins
      }
    }
    // Default channel for unmatched events
    await this.defaultChannel.send(event);
  }
}

// Usage: Route orders by priority
const router = new ContentBasedRouter();
router.addRoute(
  (e) => e.payload.priority === "EXPRESS",
  expressOrderChannel
);
router.addRoute(
  (e) => e.payload.priority === "STANDARD",
  standardOrderChannel
);
```

Kafka's consumer group model elegantly solves competing consumers AND pub-sub: consumers in the same group compete (each message to one consumer), while consumers in different groups each receive all messages. A single topic can support both patterns simultaneously.
In distributed systems, events may be delivered more than once (network retries, consumer restarts, rebalancing). Handlers must be idempotent: processing the same event multiple times produces the same result as processing it once.
Why Duplicates Occur:

- At-least-once delivery: brokers redeliver when an acknowledgment is lost
- Network retries: a producer resends after a timeout even if the first attempt actually succeeded
- Consumer restarts and rebalancing: unacknowledged messages are redelivered, possibly to another instance
```typescript
// PATTERN 1: Event ID tracking (deduplication table)
class IdempotentEventHandler {
  constructor(
    private processedEvents: ProcessedEventRepository,
    private businessLogic: BusinessLogic,
  ) {}

  async handle(event: Event): Promise<void> {
    // Check if already processed
    const wasProcessed = await this.processedEvents.exists(event.eventId);
    if (wasProcessed) {
      console.log(`Skipping duplicate event: ${event.eventId}`);
      return;
    }

    // Process in transaction with deduplication record
    await this.db.transaction(async (tx) => {
      // Record that we're processing this event
      await this.processedEvents.insert(event.eventId, tx);
      // Execute business logic
      await this.businessLogic.process(event, tx);
    });
  }
}

// PATTERN 2: Natural idempotency via domain design
class NaturallyIdempotentHandler {
  async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    // Check current state rather than tracking event IDs
    const existingReservation = await this.inventoryRepository
      .findReservationByOrderId(event.payload.orderId);

    if (existingReservation) {
      // Already processed - idempotent by design
      console.log(`Inventory already reserved for order ${event.payload.orderId}`);
      return;
    }

    // First time: create reservation
    await this.inventoryRepository.createReservation({
      orderId: event.payload.orderId,
      items: event.payload.items,
    });
  }
}

// PATTERN 3: Idempotent operations (database level)
class IdempotentDatabaseOperations {
  // UPSERT instead of INSERT
  async setCustomerTier(event: CustomerTierChangedEvent): Promise<void> {
    // Runs identical whether first or duplicate
    await this.db.query(`
      INSERT INTO customer_tiers (customer_id, tier, updated_at)
      VALUES ($1, $2, $3)
      ON CONFLICT (customer_id)
      DO UPDATE SET tier = EXCLUDED.tier, updated_at = EXCLUDED.updated_at
    `, [event.customerId, event.newTier, event.occurredAt]);
  }

  // Conditional update with version check
  async updateBalance(event: PaymentReceivedEvent): Promise<void> {
    const result = await this.db.query(`
      UPDATE accounts
      SET balance = balance + $1,
          last_transaction_id = $2
      WHERE id = $3
        AND (last_transaction_id IS NULL OR last_transaction_id < $2)
    `, [event.amount, event.transactionId, event.accountId]);

    if (result.rowCount === 0) {
      // Either account doesn't exist, or already processed
      console.log(`Payment ${event.transactionId} already applied or account missing`);
    }
  }
}

// PATTERN 4: Idempotency keys for API calls
class ExternalApiIdempotency {
  async processRefund(event: RefundRequestedEvent): Promise<void> {
    // Use event ID as idempotency key for payment provider
    const response = await this.paymentGateway.createRefund({
      amount: event.amount,
      transactionId: event.originalTransactionId,
      idempotencyKey: event.eventId, // Payment gateway deduplicates
    });
    // If duplicate, payment gateway returns existing refund
    // Either way, we get the right result
  }
}
```

The best idempotency comes from domain design, not tracking tables. If your handler checks 'does this order already exist?' rather than 'have I seen this event?', the logic is clearer and the deduplication is implicit. Reserve event ID tracking for cases where natural idempotency isn't possible.
When event processing fails, what happens to the event? Infinite retries can block the queue; discarding loses data. Dead Letter Queues (DLQ) provide a solution: after exhausting retries, move failed events to a separate queue for investigation and manual handling.
DLQ Best Practices:

- Retry transient failures with exponential backoff before dead-lettering; don't retry permanent failures such as validation errors
- Capture context alongside the message: failure reason, stack trace, retry count, timestamps, and source queue
- Alert the on-call team when messages land in the DLQ
- Monitor DLQ depth and age, and build tooling to review, fix, and replay messages
```typescript
interface DeadLetterMessage {
  originalEvent: Event;
  failureReason: string;
  failureStack?: string;
  retryCount: number;
  firstFailedAt: Date;
  lastFailedAt: Date;
  sourceQueue: string;
}

class ResilientEventProcessor {
  private readonly maxRetries = 5;
  private readonly baseDelayMs = 1000;

  async process(event: Event): Promise<void> {
    let lastError: Error | null = null;

    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        await this.handler.handle(event);
        return; // Success!
      } catch (error) {
        lastError = error as Error;

        // Is this a transient or permanent failure?
        if (this.isPermanentFailure(error)) {
          break; // Don't retry permanent failures
        }

        // Exponential backoff
        const delay = this.baseDelayMs * Math.pow(2, attempt - 1);
        console.log(
          `Retry ${attempt}/${this.maxRetries} for ${event.eventId}, waiting ${delay}ms`
        );
        await sleep(delay);
      }
    }

    // Exhausted retries or permanent failure
    await this.sendToDeadLetter(event, lastError!);
  }

  private isPermanentFailure(error: unknown): boolean {
    // Validation errors, business rule violations, etc.
    return error instanceof ValidationError
      || error instanceof BusinessRuleViolation
      || error instanceof DeserializationError;
  }

  private async sendToDeadLetter(event: Event, error: Error): Promise<void> {
    const dlqMessage: DeadLetterMessage = {
      originalEvent: event,
      failureReason: error.message,
      failureStack: error.stack,
      retryCount: this.maxRetries,
      firstFailedAt: new Date(),
      lastFailedAt: new Date(),
      sourceQueue: this.sourceQueueName,
    };

    await this.deadLetterQueue.send(dlqMessage);

    // Alert on-call team
    await this.alerting.notify({
      severity: "warning",
      title: `Event sent to DLQ: ${event.eventType}`,
      details: error.message,
      eventId: event.eventId,
    });
  }
}

// DLQ monitoring and management
class DeadLetterManager {
  async getDeadLetterStats(): Promise<DLQStats> {
    const messages = await this.dlq.peek(1000);
    return {
      totalCount: messages.length,
      byEventType: this.groupBy(messages, m => m.originalEvent.eventType),
      byFailureReason: this.groupBy(messages, m => m.failureReason),
      oldestMessage: messages[0]?.firstFailedAt,
    };
  }

  async replay(messageId: string): Promise<void> {
    const dlqMessage = await this.dlq.get(messageId);

    // Send back to original queue
    await this.eventBus.publish(
      dlqMessage.originalEvent,
      { topic: dlqMessage.sourceQueue }
    );

    // Remove from DLQ
    await this.dlq.delete(messageId);
    console.log(`Replayed message ${messageId}`);
  }

  async replayAll(filter?: { eventType?: string }): Promise<number> {
    let replayed = 0;
    const messages = await this.dlq.getAll(filter);
    for (const message of messages) {
      await this.replay(message.id);
      replayed++;
    }
    return replayed;
  }
}
```

A DLQ is a holding area, not a trash can. Every message in the DLQ represents data that couldn't be processed—potential lost orders, failed notifications, or broken integrations. Build operational processes to review, fix, and replay DLQ contents regularly.
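For reference, the retry schedule produced by ResilientEventProcessor's constants (maxRetries = 5, baseDelayMs = 1000) can be computed directly:

```typescript
// Exponential backoff: delay doubles each attempt (base * 2^(attempt-1)).
const baseDelayMs = 1000;
const maxRetries = 5;
const delays = Array.from(
  { length: maxRetries },
  (_, i) => baseDelayMs * Math.pow(2, i)
);
// delays: [1000, 2000, 4000, 8000, 16000], about 31 seconds of waiting in total
```

Knowing the total wait matters operationally: it bounds how long a poison message can occupy a consumer before it is dead-lettered.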
Some business processes require events to be processed in order. An order cannot be shipped before it's placed; a balance update shouldn't apply if an earlier transaction is still pending.
Ordering Challenges:

- Parallel consumers process events concurrently, so global order is lost
- Network delays and producer retries can deliver events out of order
- Redelivery means a consumer may see an old event after it has already processed a newer one
| Strategy | Guarantee | Scalability | When to Use |
|---|---|---|---|
| Single consumer | Total ordering | None (1 consumer) | Low volume, strict ordering required |
| Partitioning by key | Ordering within partition | High (N partitions) | Order matters per entity, not globally |
| Sequence numbers | Consumer reorders as needed | High | Out-of-order detection and handling |
| Causal ordering | Happens-before preserved | Medium | Distributed systems with dependencies |
```typescript
// STRATEGY 1: Partition by aggregate ID (Kafka style)
class PartitionedEventPublisher {
  async publish(event: AggregateEvent): Promise<void> {
    await this.kafka.send({
      topic: "order-events",
      messages: [{
        // Key determines partition
        // All events for same order go to same partition
        // Same partition = same consumer = ordering preserved
        key: event.aggregateId,
        value: JSON.stringify(event),
      }],
    });
  }
}

// STRATEGY 2: Sequence numbers for out-of-order detection
interface SequencedEvent extends Event {
  sequenceNumber: number;   // Per-aggregate sequence
  aggregateVersion: number; // Current aggregate version
}

class SequenceAwareHandler {
  private readonly lastProcessed = new Map<string, number>();

  async handle(event: SequencedEvent): Promise<void> {
    const lastSeq = this.lastProcessed.get(event.aggregateId) ?? 0;

    if (event.sequenceNumber <= lastSeq) {
      // Duplicate or old event - skip
      console.log(`Skipping old event: seq ${event.sequenceNumber} <= ${lastSeq}`);
      return;
    }

    if (event.sequenceNumber > lastSeq + 1) {
      // Gap detected - events arrived out of order
      console.warn(
        `Gap detected: expected ${lastSeq + 1}, got ${event.sequenceNumber}`
      );
      // Options: wait for missing event, fetch from source, or process anyway
      await this.handleGap(event, lastSeq);
      return;
    }

    // Correct sequence - process
    await this.processEvent(event);
    this.lastProcessed.set(event.aggregateId, event.sequenceNumber);
  }

  private async handleGap(event: SequencedEvent, lastSeq: number): Promise<void> {
    // Option 1: Buffer and wait
    await this.bufferForLater(event);

    // Option 2: Fetch missing events from source
    const missing = await this.eventStore.getEvents(
      event.aggregateId,
      { fromSequence: lastSeq + 1, toSequence: event.sequenceNumber - 1 }
    );
    for (const missedEvent of missing) {
      await this.handle(missedEvent);
    }
    await this.handle(event); // Retry original event
  }
}

// STRATEGY 3: Version-based optimistic handling
class VersionAwareHandler {
  async handleOrderStatusChange(event: OrderStatusChangedEvent): Promise<void> {
    const result = await this.db.query(`
      UPDATE orders
      SET status = $1, version = $2, updated_at = NOW()
      WHERE id = $3 AND version < $4
    `, [
      event.newStatus,
      event.aggregateVersion,
      event.orderId,
      event.aggregateVersion, // Only update if our version is newer
    ]);

    if (result.rowCount === 0) {
      // Either order doesn't exist, or we have stale event
      console.log(
        `Skipping stale status update for order ${event.orderId}`
      );
    }
  }
}
```

Before adding ordering complexity, ask: do we really need global ordering, or just ordering per entity? Kafka's partitioning often suffices: all events for one order are ordered, even if events across orders are not. True global ordering (one consumer, no parallelism) is rarely worth the scalability cost.
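The per-entity guarantee described above rests on one property: a deterministic key-to-partition mapping. A minimal sketch (a simple rolling hash as a stand-in for Kafka's actual partitioner, which uses murmur2):

```typescript
// Deterministic key -> partition mapping: the same aggregate ID always
// lands on the same partition, so its events keep their relative order.
function partitionFor(key: string, partitionCount: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0; // 32-bit rolling hash
  }
  return hash % partitionCount;
}

// Usage: every event for order-42 maps to the same partition,
// while different orders may spread across partitions.
const p1 = partitionFor("order-42", 12);
const p2 = partitionFor("order-42", 12);
// p1 === p2: ordering is preserved per order, with no global ordering needed
```

One caveat this sketch makes visible: changing the partition count changes the mapping, which is why repartitioning a topic breaks per-key ordering guarantees for in-flight events.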
Event-driven integration introduces distributed systems challenges that cannot be solved reliably without proven patterns. The patterns in this page form the foundation of robust event-driven architectures.
Module Complete:
Congratulations! You've completed the Event-Driven Integration module. You now understand how events cross boundaries, how they're serialized and versioned, and the architectural patterns that make event-driven integration reliable.
With this knowledge, you can design event-driven systems that:

- publish events reliably, even when the broker is briefly unavailable (Transactional Outbox)
- coordinate multi-service business processes without distributed transactions (Sagas)
- tolerate duplicate delivery through idempotent processing
- route events through the channel pattern each use case needs
- handle failures operationally with retries and dead letter queues
- preserve ordering where the domain requires it
You've mastered the core patterns of event-driven integration: Transactional Outbox for reliable publishing, Sagas for distributed coordination, idempotent processing for duplicate handling, and dead letter queues for error management. These patterns are the building blocks of resilient, scalable event-driven systems used by industry leaders worldwide.