Loading content...
You now understand the Outbox Pattern's theory: why dual writes fail, how transactional outbox solves atomicity, and the trade-offs between polling and CDC. But theory alone doesn't ship features.
This page bridges the gap from conceptual understanding to production-ready code. We'll implement the Outbox Pattern using popular technologies, address real-world concerns like testing and observability, and explore cloud-native solutions that handle the infrastructure for you.
By the end, you'll have copy-pastable implementations for your specific stack, plus the knowledge to adapt these patterns to any technology.
By the end of this page, you will have production-ready Outbox Pattern implementations for Node.js/TypeScript with Prisma, TypeORM, and native SQL, understand how to integrate with Kafka using Debezium, and know when to use cloud-managed solutions like AWS EventBridge with DynamoDB Streams.
Prisma is one of the most popular ORMs for Node.js/TypeScript. Here's a complete, production-ready Outbox Pattern implementation.
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
// prisma/schema.prisma

generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

// Main business entity
model Order {
  id           String      @id @default(uuid())
  customerId   String      @map("customer_id")
  status       OrderStatus @default(PENDING)
  subtotal     Decimal     @db.Decimal(10, 2)
  tax          Decimal     @db.Decimal(10, 2)
  shippingCost Decimal     @map("shipping_cost") @db.Decimal(10, 2)
  total        Decimal     @db.Decimal(10, 2)
  items        OrderItem[]
  createdAt    DateTime    @default(now()) @map("created_at")
  updatedAt    DateTime    @updatedAt @map("updated_at")

  @@map("orders")
}

model OrderItem {
  id          String  @id @default(uuid())
  orderId     String  @map("order_id")
  productId   String  @map("product_id")
  productName String  @map("product_name")
  quantity    Int
  unitPrice   Decimal @map("unit_price") @db.Decimal(10, 2)
  lineTotal   Decimal @map("line_total") @db.Decimal(10, 2)
  order       Order   @relation(fields: [orderId], references: [id])

  @@map("order_items")
}

enum OrderStatus {
  PENDING
  CONFIRMED
  SHIPPED
  DELIVERED
  CANCELLED
}

// OUTBOX TABLE
// Written in the same transaction as the business entity; drained by
// a separate publisher process (polling) or by Debezium (CDC).
model OutboxEvent {
  id              String    @id @default(uuid())
  // Monotonic ordering key for the publisher's ORDER BY.
  sequenceNumber  BigInt    @default(autoincrement()) @map("sequence_number")
  aggregateType   String    @map("aggregate_type")
  aggregateId     String    @map("aggregate_id")
  eventType       String    @map("event_type")
  eventVersion    Int       @default(1) @map("event_version")
  payload         Json
  metadata        Json?
  correlationId   String?   @map("correlation_id")
  causationId     String?   @map("causation_id")
  createdAt       DateTime  @default(now()) @map("created_at")
  // NULL means "not yet published" — the publisher's pending filter.
  publishedAt     DateTime? @map("published_at")

  // Retry tracking
  publishAttempts Int       @default(0) @map("publish_attempts")
  lastAttemptAt   DateTime? @map("last_attempt_at")
  lastError       String?   @map("last_error")

  @@index([sequenceNumber], map: "idx_outbox_pending")
  @@map("outbox_events")
}

// Processed events (for consumer idempotency)
model ProcessedEvent {
  eventId     String   @id @map("event_id")
  processedAt DateTime @default(now()) @map("processed_at")

  @@map("processed_events")
}
// src/outbox/outbox-service.ts import { PrismaClient, Prisma } from '@prisma/client';import { v4 as uuidv4 } from 'uuid'; // Event type definitionsinterface BaseEvent<T extends string, P> { eventId: string; eventType: T; eventVersion: number; aggregateType: string; aggregateId: string; payload: P; metadata: EventMetadata; createdAt: Date;} interface EventMetadata { correlationId: string; causationId?: string; userId?: string; source: string;} // Type-safe event builderexport class OutboxService { constructor(private prisma: PrismaClient) {} /** * Add an event to the outbox within an existing transaction */ async addEvent<P>( tx: Prisma.TransactionClient, params: { aggregateType: string; aggregateId: string; eventType: string; eventVersion?: number; payload: P; metadata: EventMetadata; } ): Promise<string> { const eventId = uuidv4(); await tx.outboxEvent.create({ data: { id: eventId, aggregateType: params.aggregateType, aggregateId: params.aggregateId, eventType: params.eventType, eventVersion: params.eventVersion ?? 
1, payload: params.payload as Prisma.JsonObject, metadata: params.metadata as Prisma.JsonObject, correlationId: params.metadata.correlationId, causationId: params.metadata.causationId, }, }); return eventId; }} // Order Service using Outboxexport class OrderService { constructor( private prisma: PrismaClient, private outbox: OutboxService ) {} async createOrder( command: CreateOrderCommand, context: RequestContext ): Promise<Order> { // Generate order ID upfront for event payload const orderId = uuidv4(); // Single transaction: order + outbox event const result = await this.prisma.$transaction(async (tx) => { // Calculate totals const subtotal = command.items.reduce( (sum, item) => sum + item.quantity * item.unitPrice, 0 ); const tax = subtotal * 0.1; // 10% tax const shippingCost = 9.99; const total = subtotal + tax + shippingCost; // Create order with items const order = await tx.order.create({ data: { id: orderId, customerId: command.customerId, status: 'PENDING', subtotal, tax, shippingCost, total, items: { create: command.items.map((item) => ({ id: uuidv4(), productId: item.productId, productName: item.productName, quantity: item.quantity, unitPrice: item.unitPrice, lineTotal: item.quantity * item.unitPrice, })), }, }, include: { items: true }, }); // Add event to outbox (SAME TRANSACTION) await this.outbox.addEvent(tx, { aggregateType: 'Order', aggregateId: order.id, eventType: 'OrderCreated', eventVersion: 1, payload: { orderId: order.id, customerId: order.customerId, status: order.status, items: order.items.map((i) => ({ productId: i.productId, productName: i.productName, quantity: i.quantity, unitPrice: Number(i.unitPrice), })), subtotal: Number(order.subtotal), tax: Number(order.tax), shippingCost: Number(order.shippingCost), total: Number(order.total), }, metadata: { correlationId: context.correlationId, causationId: context.requestId, userId: context.userId, source: 'order-service', }, }); return order; }); return result; } async shipOrder( orderId: 
string, trackingNumber: string, context: RequestContext ): Promise<void> { await this.prisma.$transaction(async (tx) => { // Update order status const order = await tx.order.update({ where: { id: orderId, status: 'CONFIRMED' }, data: { status: 'SHIPPED' }, }); // Add event to outbox await this.outbox.addEvent(tx, { aggregateType: 'Order', aggregateId: orderId, eventType: 'OrderShipped', payload: { orderId, trackingNumber, shippedAt: new Date().toISOString(), }, metadata: { correlationId: context.correlationId, userId: context.userId, source: 'order-service', }, }); }); }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
// src/outbox/outbox-publisher.ts import { PrismaClient } from "C:/learn-101/src/generated/prisma";import { Kafka, Producer } from 'kafkajs'; interface PublisherConfig { batchSize: number; pollIntervalMs: number; emptyPollDelayMs: number; maxRetries: number;} export class PrismaOutboxPublisher { private isRunning = false; private producer: Producer; constructor( private prisma: PrismaClient, private kafka: Kafka, private config: PublisherConfig ) { this.producer = kafka.producer({ idempotent: true, // Exactly-once producer semantics maxInFlightRequests: 5, transactionalId: 'outbox-publisher', // Enable transactions }); } async start(): Promise<void> { await this.producer.connect(); this.isRunning = true; console.log('Outbox publisher started'); while (this.isRunning) { try { const count = await this.publishBatch(); if (count === 0) { await this.sleep(this.config.emptyPollDelayMs); } else { // Immediately poll again if we found events await this.sleep(this.config.pollIntervalMs); } } catch (error) { console.error('Publisher error:', error); await this.sleep(5000); // Error backoff } } } private async publishBatch(): Promise<number> { // Prisma doesn't support FOR UPDATE SKIP LOCKED directly // We use raw SQL for this critical section const events = await this.prisma.$queryRaw<OutboxEvent[]>` SELECT id, sequence_number as "sequenceNumber", aggregate_type as "aggregateType", aggregate_id as "aggregateId", event_type as "eventType", event_version as "eventVersion", payload, metadata, correlation_id as "correlationId", created_at as "createdAt" FROM outbox_events WHERE published_at IS NULL AND (publish_attempts < ${this.config.maxRetries} OR last_attempt_at < NOW() - INTERVAL '5 minutes') ORDER BY sequence_number ASC LIMIT ${this.config.batchSize} FOR UPDATE SKIP LOCKED `; if (events.length === 0) { return 0; } const publishedIds: string[] = []; const failedEvents: Map<string, string> = new Map(); // Publish to Kafka with transactions for exactly-once const transaction 
= await this.producer.transaction(); try { for (const event of events) { const topic = this.buildTopicName(event); await transaction.send({ topic, messages: [{ key: event.aggregateId, value: JSON.stringify({ eventId: event.id, eventType: event.eventType, eventVersion: event.eventVersion, aggregateType: event.aggregateType, aggregateId: event.aggregateId, payload: event.payload, metadata: event.metadata, timestamp: event.createdAt.toISOString(), }), headers: { 'event-id': event.id, 'event-type': event.eventType, 'correlation-id': event.correlationId || '', 'sequence-number': String(event.sequenceNumber), }, }], }); publishedIds.push(event.id); } await transaction.commit(); } catch (error) { await transaction.abort(); console.error('Kafka transaction failed:', error); // Mark all as failed for (const event of events) { failedEvents.set(event.id, String(error)); } } // Update database with results if (publishedIds.length > 0) { await this.prisma.outboxEvent.updateMany({ where: { id: { in: publishedIds } }, data: { publishedAt: new Date() }, }); } for (const [eventId, error] of failedEvents) { await this.prisma.outboxEvent.update({ where: { id: eventId }, data: { publishAttempts: { increment: 1 }, lastAttemptAt: new Date(), lastError: error, }, }); } return publishedIds.length; } private buildTopicName(event: { aggregateType: string; eventType: string }): string { const aggregate = event.aggregateType.toLowerCase(); const eventType = event.eventType .replace(/([a-z])([A-Z])/g, '$1-$2') .toLowerCase(); return `${aggregate}.${eventType}`; } private sleep(ms: number): Promise<void> { return new Promise((resolve) => setTimeout(resolve, ms)); } async stop(): Promise<void> { this.isRunning = false; await this.producer.disconnect(); console.log('Outbox publisher stopped'); }} // Usageconst publisher = new PrismaOutboxPublisher(prisma, kafka, { batchSize: 100, pollIntervalMs: 100, emptyPollDelayMs: 1000, maxRetries: 5,}); // Start as background 
// Start as a background process
publisher.start().catch(console.error);

// Graceful shutdown
process.on('SIGTERM', () => publisher.stop());

Prisma doesn't natively support FOR UPDATE SKIP LOCKED, so we use $queryRaw for the publisher's critical section. This is acceptable since the publisher query is simple and stable. The service layer uses standard Prisma methods.
TypeORM provides richer transaction control, including native support for row locking. Here's a complete implementation.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
// src/entities/outbox-event.entity.ts import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, Index,} from 'typeorm'; @Entity('outbox_events')export class OutboxEvent { @PrimaryGeneratedColumn('uuid') id: string; @Column({ type: 'bigint', generated: 'increment' }) @Index('idx_outbox_sequence') sequenceNumber: number; @Column({ name: 'aggregate_type' }) aggregateType: string; @Column({ name: 'aggregate_id' }) aggregateId: string; @Column({ name: 'event_type' }) eventType: string; @Column({ name: 'event_version', default: 1 }) eventVersion: number; @Column({ type: 'jsonb' }) payload: Record<string, unknown>; @Column({ type: 'jsonb', nullable: true }) metadata: Record<string, unknown>; @Column({ name: 'correlation_id', nullable: true }) correlationId: string; @Column({ name: 'causation_id', nullable: true }) causationId: string; @CreateDateColumn({ name: 'created_at' }) createdAt: Date; @Column({ name: 'published_at', nullable: true }) publishedAt: Date | null; @Column({ name: 'publish_attempts', default: 0 }) publishAttempts: number; @Column({ name: 'last_attempt_at', nullable: true }) lastAttemptAt: Date | null; @Column({ name: 'last_error', nullable: true }) lastError: string | null;} // src/outbox/outbox.repository.ts import { DataSource, EntityManager, LessThan, IsNull } from 'typeorm';import { OutboxEvent } from '../entities/outbox-event.entity'; export class OutboxRepository { constructor(private dataSource: DataSource) {} /** * Add event to outbox within transaction */ async addEvent( manager: EntityManager, event: Partial<OutboxEvent> ): Promise<OutboxEvent> { const outboxEvent = manager.create(OutboxEvent, event); return manager.save(outboxEvent); } /** * Get pending events with row locking for concurrent publishers */ async getPendingEventsWithLock( manager: EntityManager, limit: number, maxRetries: number ): Promise<OutboxEvent[]> { // TypeORM supports setLock directly return manager .createQueryBuilder(OutboxEvent, 'event') 
.where('event.publishedAt IS NULL') .andWhere( '(event.publishAttempts < :maxRetries OR event.lastAttemptAt < :retryAfter)', { maxRetries, retryAfter: new Date(Date.now() - 5 * 60 * 1000), // 5 minutes ago } ) .orderBy('event.sequenceNumber', 'ASC') .limit(limit) .setLock('pessimistic_write_or_fail') // SKIP LOCKED equivalent .getMany(); } /** * Mark events as published */ async markPublished( manager: EntityManager, eventIds: string[] ): Promise<void> { await manager.update(OutboxEvent, eventIds, { publishedAt: new Date(), }); } /** * Record failed publish attempt */ async recordFailure( manager: EntityManager, eventId: string, error: string ): Promise<void> { await manager.increment(OutboxEvent, { id: eventId }, 'publishAttempts', 1); await manager.update(OutboxEvent, eventId, { lastAttemptAt: new Date(), lastError: error, }); }} // src/services/order.service.ts import { DataSource } from 'typeorm';import { Order } from '../entities/order.entity';import { OutboxRepository } from '../outbox/outbox.repository';import { v4 as uuidv4 } from 'uuid'; export class OrderService { constructor( private dataSource: DataSource, private outboxRepo: OutboxRepository ) {} async createOrder( command: CreateOrderCommand, context: RequestContext ): Promise<Order> { return this.dataSource.transaction(async (manager) => { // Create order const order = manager.create(Order, { id: uuidv4(), customerId: command.customerId, status: 'PENDING', // ... other fields }); await manager.save(order); // Add outbox event (same transaction) await this.outboxRepo.addEvent(manager, { id: uuidv4(), aggregateType: 'Order', aggregateId: order.id, eventType: 'OrderCreated', eventVersion: 1, payload: { orderId: order.id, customerId: order.customerId, // ... 
other fields }, metadata: { source: 'order-service', }, correlationId: context.correlationId, causationId: context.requestId, }); return order; }); }}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
// src/outbox/outbox-publisher.ts import { DataSource } from 'typeorm';import { OutboxRepository } from './outbox.repository';import { KafkaProducer } from '../kafka/producer'; export class TypeORMOutboxPublisher { private isRunning = false; constructor( private dataSource: DataSource, private outboxRepo: OutboxRepository, private kafkaProducer: KafkaProducer, private config: { batchSize: number; pollInterval: number; maxRetries: number; } ) {} async start(): Promise<void> { this.isRunning = true; while (this.isRunning) { const count = await this.processNextBatch(); if (count === 0) { await this.sleep(this.config.pollInterval); } } } private async processNextBatch(): Promise<number> { // Use transaction for atomic read-process-update return this.dataSource.transaction(async (manager) => { // Get events with row locking const events = await this.outboxRepo.getPendingEventsWithLock( manager, this.config.batchSize, this.config.maxRetries ); if (events.length === 0) return 0; const publishedIds: string[] = []; for (const event of events) { try { await this.kafkaProducer.send({ topic: `${event.aggregateType.toLowerCase()}.${this.toKebab(event.eventType)}`, messages: [{ key: event.aggregateId, value: JSON.stringify({ eventId: event.id, eventType: event.eventType, aggregateType: event.aggregateType, aggregateId: event.aggregateId, payload: event.payload, metadata: event.metadata, timestamp: event.createdAt.toISOString(), }), }], }); publishedIds.push(event.id); } catch (error) { await this.outboxRepo.recordFailure( manager, event.id, String(error) ); } } if (publishedIds.length > 0) { await this.outboxRepo.markPublished(manager, publishedIds); } return publishedIds.length; }); } private toKebab(str: string): string { return str.replace(/([a-z])([A-Z])/g, '$1-$2').toLowerCase(); } private sleep(ms: number): Promise<void> { return new Promise(resolve => setTimeout(resolve, ms)); } stop(): void { this.isRunning = false; }}For near-real-time event delivery, Debezium's Outbox 
Event Router provides a complete CDC solution. Here's how to integrate it.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
# docker-compose.yml - Complete Debezium + Kafka Stack version: '3.8' services: # PostgreSQL with logical replication enabled postgres: image: postgres:15 environment: POSTGRES_USER: app POSTGRES_PASSWORD: secret POSTGRES_DB: orders_db command: - "postgres" - "-c" - "wal_level=logical" - "-c" - "max_replication_slots=4" - "-c" - "max_wal_senders=4" ports: - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data # Zookeeper for Kafka zookeeper: image: confluentinc/cp-zookeeper:7.5.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 # Kafka kafka: image: confluentinc/cp-kafka:7.5.0 depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # Kafka Connect with Debezium connect: image: debezium/connect:2.4 depends_on: - kafka - postgres ports: - "8083:8083" environment: BOOTSTRAP_SERVERS: kafka:29092 GROUP_ID: debezium-connect CONFIG_STORAGE_TOPIC: connect_configs OFFSET_STORAGE_TOPIC: connect_offsets STATUS_STORAGE_TOPIC: connect_statuses CONFIG_STORAGE_REPLICATION_FACTOR: 1 OFFSET_STORAGE_REPLICATION_FACTOR: 1 STATUS_STORAGE_REPLICATION_FACTOR: 1 # Kafka UI for debugging kafka-ui: image: provectuslabs/kafka-ui:latest depends_on: - kafka - connect ports: - "8080:8080" environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: debezium KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083 volumes: postgres_data:1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
#!/bin/bash# Register Debezium PostgreSQL connector with Outbox Event Router curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "orders-outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "app", "database.password": "secret", "database.dbname": "orders_db", "database.server.name": "orders", "slot.name": "outbox_slot", "plugin.name": "pgoutput", "publication.name": "outbox_publication", "table.include.list": "public.outbox_events", "tombstones.on.delete": "false", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.route.by.field": "aggregate_type", "transforms.outbox.route.topic.replacement": "events.${'"'"'routedByValue'"'"' }", "transforms.outbox.table.field.event.id": "id", "transforms.outbox.table.field.event.key": "aggregate_id", "transforms.outbox.table.field.event.type": "event_type", "transforms.outbox.table.field.event.payload": "payload", "transforms.outbox.table.field.event.timestamp": "created_at", "transforms.outbox.table.fields.additional.placement": "event_version:envelope:eventVersion,correlation_id:header:correlationId,causation_id:header:causationId", "transforms.outbox.table.expand.json.payload": "true", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false" } }' echo "Connector registered!" # Verify connector statuscurl http://localhost:8083/connectors/orders-outbox-connector/status | jqWith Debezium, you don't need a custom publisher. Debezium reads the PostgreSQL WAL and automatically publishes events to Kafka. Your application only needs to:
The Outbox Event Router transformation formats the events properly, so consumers receive clean business events, not raw database change events.
Monitor the replication slot lag carefully. If Debezium falls behind or stops, the PostgreSQL WAL cannot be cleaned, causing disk to fill. Set up alerts for slot lag and have runbooks for connector recovery.
Cloud providers offer managed services that can implement the Outbox Pattern with less operational overhead. Here are patterns for AWS.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
// AWS DynamoDB Streams + Lambda Outbox Pattern// // DynamoDB Streams provides built-in CDC - every table change// triggers a stream event that can invoke Lambda. import { DynamoDBClient, TransactWriteItemsCommand } from '@aws-sdk/client-dynamodb';import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';import { marshall } from '@aws-sdk/util-dynamodb';import { v4 as uuidv4 } from 'uuid'; // DynamoDB table design:// - Single table design with PK/SK// - Orders stored as: PK=ORDER#<id>, SK=ORDER#<id>// - Outbox events stored as: PK=OUTBOX#<date>, SK=<sequence>//// DynamoDB Streams captures writes to outbox items const dynamodb = new DynamoDBClient({}); export class DynamoDBOrderService { async createOrder(command: CreateOrderCommand): Promise<Order> { const orderId = uuidv4(); const timestamp = new Date().toISOString(); // TransactWriteItems ensures atomicity await dynamodb.send(new TransactWriteItemsCommand({ TransactItems: [ // Write the order { Put: { TableName: 'OrdersTable', Item: marshall({ PK: `ORDER#${orderId}`, SK: `ORDER#${orderId}`, entityType: 'Order', id: orderId, customerId: command.customerId, status: 'PENDING', items: command.items, total: command.total, createdAt: timestamp, }), }, }, // Write the outbox event { Put: { TableName: 'OrdersTable', Item: marshall({ PK: `OUTBOX#${timestamp.slice(0, 10)}`, // Partition by date SK: `${timestamp}#${orderId}`, entityType: 'OutboxEvent', eventId: uuidv4(), aggregateType: 'Order', aggregateId: orderId, eventType: 'OrderCreated', payload: { orderId, customerId: command.customerId, items: command.items, total: command.total, }, createdAt: timestamp, TTL: Math.floor(Date.now() / 1000) + 7 * 24 * 60 * 60, // 7 days }), }, }, ], })); return { id: orderId, ...command }; }} // Lambda function triggered by DynamoDB Streams// Publishes to EventBridge import { DynamoDBStreamEvent, Context } from 'aws-lambda';import { unmarshall } from '@aws-sdk/util-dynamodb'; const eventbridge = new 
EventBridgeClient({}); export async function streamHandler( event: DynamoDBStreamEvent, context: Context): Promise<void> { const events: PutEventsRequestEntry[] = []; for (const record of event.Records) { // Only process INSERTs to outbox if (record.eventName !== 'INSERT') continue; const item = unmarshall(record.dynamodb?.NewImage || {}); if (item.entityType !== 'OutboxEvent') continue; events.push({ Source: 'orders-service', DetailType: item.eventType, Detail: JSON.stringify({ eventId: item.eventId, aggregateType: item.aggregateType, aggregateId: item.aggregateId, payload: item.payload, timestamp: item.createdAt, }), EventBusName: 'orders-events', }); } if (events.length > 0) { await eventbridge.send(new PutEventsCommand({ Entries: events })); console.log(`Published ${events.length} events to EventBridge`); }} // Benefits of DynamoDB Streams:// - Automatic CDC, no separate connector// - Exactly-once processing within Lambda// - Serverless - scales automatically// - TTL auto-cleans old outbox events//// Limitations:// - DynamoDB transaction limits (100 items/25KB)// - Stream retention only 24 hours// - Lambda concurrency limits123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
// AWS EventBridge Pipes: Managed CDC for RDS//// EventBridge Pipes can connect RDS PostgreSQL changes// directly to EventBridge without custom code. // CDK Infrastructureimport * as cdk from 'aws-cdk-lib';import * as rds from 'aws-cdk-lib/aws-rds';import * as pipes from 'aws-cdk-lib/aws-pipes';import * as events from 'aws-cdk-lib/aws-events'; export class OutboxPipeStack extends cdk.Stack { constructor(scope: cdk.App, id: string) { super(scope, id); // RDS PostgreSQL with logical replication const database = new rds.DatabaseInstance(this, 'OrdersDB', { engine: rds.DatabaseInstanceEngine.postgres({ version: rds.PostgresEngineVersion.VER_15, }), parameters: { 'rds.logical_replication': '1', }, }); // EventBridge Bus for order events const eventBus = new events.EventBus(this, 'OrderEventBus', { eventBusName: 'orders-events', }); // EventBridge Pipe: RDS → EventBridge // (Simplified - actual implementation requires more setup) new pipes.CfnPipe(this, 'OutboxPipe', { name: 'orders-outbox-pipe', source: database.instanceArn, // RDS as source target: eventBus.eventBusArn, // EventBridge as target sourceParameters: { // Configure to read from outbox_events table }, targetParameters: { eventBridgeEventBusParameters: { detailType: '$.eventType', source: 'orders-service', }, }, }); }} // With EventBridge Pipes:// - No custom publisher code needed// - AWS manages the CDC connector// - Scales automatically// - Pay-per-event pricing//// Currently limited to specific source types.// Check AWS docs for current RDS support status.| Solution | Database | Event Target | Latency | Complexity |
|---|---|---|---|---|
| DynamoDB Streams + Lambda | DynamoDB | EventBridge/SQS/SNS | ~100ms | Low |
| RDS + Debezium on EC2/ECS | RDS PostgreSQL/MySQL | Kafka/MSK | ~10-50ms | High |
| Aurora + EventBridge Pipes | Aurora | EventBridge | ~100-500ms | Low |
| DocumentDB + Change Streams | DocumentDB | Lambda/Kinesis | ~100ms | Medium |
| Custom Polling Lambda | Any RDS | Any | Poll interval | Medium |
Testing distributed systems is notoriously difficult. Here are strategies for testing the Outbox Pattern at different levels.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
// src/outbox/__tests__/outbox-service.test.ts import { PrismaClient } from "C:/learn-101/src/generated/prisma";import { OrderService } from '../order-service';import { OutboxService } from '../outbox-service'; describe('OrderService with Outbox', () => { let prisma: PrismaClient; let orderService: OrderService; beforeAll(async () => { prisma = new PrismaClient(); orderService = new OrderService(prisma, new OutboxService(prisma)); }); beforeEach(async () => { // Clean state for each test await prisma.outboxEvent.deleteMany(); await prisma.orderItem.deleteMany(); await prisma.order.deleteMany(); }); afterAll(async () => { await prisma.$disconnect(); }); describe('createOrder', () => { it('should create order and outbox event atomically', async () => { const command = { customerId: 'cust-123', items: [ { productId: 'prod-1', productName: 'Widget', quantity: 2, unitPrice: 10 }, ], }; const context = { correlationId: 'corr-456', requestId: 'req-789', userId: 'user-1' }; const order = await orderService.createOrder(command, context); // Verify order created const savedOrder = await prisma.order.findUnique({ where: { id: order.id }, include: { items: true }, }); expect(savedOrder).not.toBeNull(); expect(savedOrder?.status).toBe('PENDING'); expect(savedOrder?.items).toHaveLength(1); // Verify outbox event created const outboxEvents = await prisma.outboxEvent.findMany({ where: { aggregateId: order.id }, }); expect(outboxEvents).toHaveLength(1); expect(outboxEvents[0].eventType).toBe('OrderCreated'); expect(outboxEvents[0].publishedAt).toBeNull(); expect(outboxEvents[0].correlationId).toBe('corr-456'); }); it('should rollback both order and event on constraint violation', async () => { // Create order with invalid data that will fail constraint const command = { customerId: '', // Empty - assuming NOT NULL constraint items: [], }; await expect( orderService.createOrder(command, { correlationId: 'x', requestId: 'y', userId: 'z' }) ).rejects.toThrow(); // Verify nothing was 
created const orders = await prisma.order.findMany(); const events = await prisma.outboxEvent.findMany(); expect(orders).toHaveLength(0); expect(events).toHaveLength(0); }); it('should include correct payload in outbox event', async () => { const command = { customerId: 'cust-123', items: [ { productId: 'prod-1', productName: 'Widget', quantity: 2, unitPrice: 10 }, { productId: 'prod-2', productName: 'Gadget', quantity: 1, unitPrice: 25 }, ], }; const order = await orderService.createOrder(command, { correlationId: 'corr-1', requestId: 'req-1', userId: 'user-1', }); const event = await prisma.outboxEvent.findFirst({ where: { aggregateId: order.id }, }); const payload = event?.payload as any; expect(payload.orderId).toBe(order.id); expect(payload.customerId).toBe('cust-123'); expect(payload.items).toHaveLength(2); expect(payload.total).toBe(45); // 2*10 + 1*25 }); });}); // Publisher integration testsdescribe('OutboxPublisher', () => { let publisher: OutboxPublisher; let mockKafka: jest.Mocked<KafkaProducer>; beforeEach(() => { mockKafka = { send: jest.fn().mockResolvedValue(undefined), } as any; publisher = new OutboxPublisher(prisma, mockKafka, { batchSize: 10, maxRetries: 3, }); }); it('should publish pending events and mark as published', async () => { // Create test event await prisma.outboxEvent.create({ data: { aggregateType: 'Order', aggregateId: 'order-1', eventType: 'OrderCreated', payload: { orderId: 'order-1' }, correlationId: 'corr-1', }, }); await publisher.processBatch(); // Verify Kafka was called expect(mockKafka.send).toHaveBeenCalledTimes(1); expect(mockKafka.send).toHaveBeenCalledWith( expect.objectContaining({ topic: 'order.order-created', }) ); // Verify marked as published const event = await prisma.outboxEvent.findFirst(); expect(event?.publishedAt).not.toBeNull(); }); it('should not republish already published events', async () => { await prisma.outboxEvent.create({ data: { aggregateType: 'Order', aggregateId: 'order-1', eventType: 
'OrderCreated', payload: {}, publishedAt: new Date(), // Already published }, }); await publisher.processBatch(); expect(mockKafka.send).not.toHaveBeenCalled(); });});For reliable integration testing, use Testcontainers to spin up real PostgreSQL, Kafka, and other dependencies. This catches issues that mocks miss, like transaction isolation bugs and SQL syntax differences.
You have now mastered the Outbox Pattern—from understanding why dual writes fail to implementing production-ready solutions across multiple technologies. This pattern is fundamental to building reliable distributed systems and event-driven architectures. Apply it whenever you need to reliably publish events alongside database changes.