At the heart of the Outbox Pattern lies a deceptively simple idea: leverage the atomicity guarantees that databases already provide. Every relational database worth its salt has been perfecting ACID transactions for decades. The transactional outbox doesn't reinvent distributed coordination—it cleverly reframes the problem to use single-database transactions, which are rock-solid and well-understood.
But implementing a transactional outbox correctly requires understanding the subtle details of how database transactions work. What isolation level do you need? How do you handle concurrent writes? What about database-specific behaviors that could break your guarantees?
This page takes you deep into the mechanics of the transactional outbox, ensuring you can implement it correctly across different database engines and handle the edge cases that trip up less careful implementations.
By the end of this page, you will understand how to implement a transactional outbox with proper isolation guarantees, handle concurrent writers correctly, work with different database engines (PostgreSQL, MySQL, MongoDB), and avoid common pitfalls that break atomicity.
Let's dissect exactly what happens when you write business data and an outbox event in the same transaction. Understanding the sequence of operations clarifies why this approach provides the guarantees we need.
The Transaction Lifecycle:
┌─────────────────────────────────────────────────────────────────┐
│ DATABASE TRANSACTION │
│ │
│ BEGIN ─────────────────────────────────────────────────────► │
│ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │
│ ├─►│ INSERT INTO orders (id, customer_id, ...) │ │
│ │ │ VALUES ('ord-123', 'cust-456', ...) │ │
│ │ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌─────────────────────────────────────────────────────┐ │
│ ├─►│ INSERT INTO outbox_events (...) │ │
│ │ │ VALUES ('evt-789', 'Order', 'ord-123', │ │
│ │ │ 'OrderCreated', '{...}') │ │
│ │ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ COMMIT (atomic durability flush) ─────────────────────────► │
│ │
│ Both writes become visible simultaneously to other │
│ transactions. Either both exist or neither exists. │
└─────────────────────────────────────────────────────────────────┘
Key Properties:

- Atomicity: the order row and the outbox event commit together or not at all; there is no window where one exists without the other.
- Isolation: neither write is visible to other transactions (including the publisher) until COMMIT.
- Durability: a single commit flush makes both writes permanent, with no cross-system coordination required.

Here is what a production-grade write path looks like:
```typescript
// PRODUCTION-GRADE TRANSACTIONAL OUTBOX WRITE
// (Database, CreateOrderCommand, RequestContext, and Order are
// application-level types defined elsewhere.)

import { v4 as uuidv4 } from 'uuid';

interface TransactionContext {
  query<T>(sql: string, params: unknown[]): Promise<T>;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

interface OutboxEvent {
  id: string;
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  eventVersion: number;
  payload: unknown;
  metadata: EventMetadata;
}

interface EventMetadata {
  correlationId: string;
  causationId?: string;
  userId?: string;
  timestamp: Date;
}

class OrderService {
  constructor(private db: Database) {}

  async createOrder(
    command: CreateOrderCommand,
    context: RequestContext
  ): Promise<Order> {
    // Generate IDs upfront - before the transaction.
    // This enables idempotency and correlation.
    const orderId = uuidv4();
    const eventId = uuidv4();

    const order = await this.db.transaction(async (tx: TransactionContext) => {
      // === STEP 1: INSERT BUSINESS DATA ===
      const [insertedOrder] = await tx.query<Order[]>(`
        INSERT INTO orders (
          id, customer_id, status, shipping_address, billing_address,
          subtotal, tax, shipping_cost, total, created_at, updated_at
        ) VALUES (
          $1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW()
        )
        RETURNING *
      `, [
        orderId,
        command.customerId,
        'PENDING',
        JSON.stringify(command.shippingAddress),
        JSON.stringify(command.billingAddress),
        command.subtotal,
        command.tax,
        command.shippingCost,
        command.subtotal + command.tax + command.shippingCost
      ]);

      // === STEP 2: INSERT ORDER ITEMS ===
      for (const item of command.items) {
        await tx.query(`
          INSERT INTO order_items (
            id, order_id, product_id, quantity, unit_price, line_total
          ) VALUES ($1, $2, $3, $4, $5, $6)
        `, [
          uuidv4(),
          orderId,
          item.productId,
          item.quantity,
          item.unitPrice,
          item.quantity * item.unitPrice
        ]);
      }

      // === STEP 3: INSERT OUTBOX EVENT (SAME TRANSACTION) ===
      const event: OutboxEvent = {
        id: eventId,
        aggregateType: 'Order',
        aggregateId: orderId,
        eventType: 'OrderCreated',
        eventVersion: 1,
        payload: {
          orderId: orderId,
          customerId: command.customerId,
          items: command.items.map(i => ({
            productId: i.productId,
            productName: i.productName,
            quantity: i.quantity,
            unitPrice: i.unitPrice
          })),
          shippingAddress: command.shippingAddress,
          subtotal: command.subtotal,
          tax: command.tax,
          shippingCost: command.shippingCost,
          total: insertedOrder.total,
          currency: 'USD'
        },
        metadata: {
          correlationId: context.correlationId,
          causationId: context.requestId,
          userId: context.userId,
          timestamp: new Date()
        }
      };

      await tx.query(`
        INSERT INTO outbox_events (
          id, aggregate_type, aggregate_id, event_type, event_version,
          payload, metadata, correlation_id, causation_id, created_at
        ) VALUES (
          $1, $2, $3, $4, $5, $6, $7, $8, $9, NOW()
        )
      `, [
        event.id,
        event.aggregateType,
        event.aggregateId,
        event.eventType,
        event.eventVersion,
        JSON.stringify(event.payload),
        JSON.stringify(event.metadata),
        event.metadata.correlationId,
        event.metadata.causationId
      ]);

      return insertedOrder;
    });

    // Transaction committed - order and event both exist
    return order;
  }
}

// KEY POINTS:
//
// 1. IDs generated BEFORE the transaction:
//    - Enables idempotency key usage
//    - Allows correlation before commit
//    - No need to retrieve database-generated IDs
//
// 2. All writes in a SINGLE transaction:
//    - Order insert
//    - Order items inserts
//    - Outbox event insert
//    - All commit together or all roll back
//
// 3. Event payload constructed from the SAME data:
//    - No chance of mismatch between order and event
//    - Event accurately reflects committed state
//
// 4. Metadata captured at creation time:
//    - Correlation ID for distributed tracing
//    - Causation ID links event to triggering action
//    - Timestamp is event creation time, not publish time
```

Always generate event IDs (using UUIDs) before starting the transaction. This allows the event ID to be used as an idempotency key, enables correlation tracking before the transaction commits, and avoids the need to query back database-generated IDs.
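One payoff of pre-generating the event ID: consumers can use it as a deduplication key. A minimal sketch, assuming node-postgres (`pg`) and a hypothetical `processed_events` table with a UNIQUE `event_id` column:

```typescript
// Consumer-side deduplication keyed on the pre-generated event ID.
// Assumes a processed_events table with a UNIQUE event_id column;
// table name and handler are hypothetical.
import { Pool } from 'pg';

const pool = new Pool(); // Connection settings from environment

async function handleOnce(
  eventId: string,
  handler: () => Promise<void>
): Promise<void> {
  // ON CONFLICT DO NOTHING turns the insert into an atomic dedup gate
  const res = await pool.query(
    `INSERT INTO processed_events (event_id) VALUES ($1)
     ON CONFLICT (event_id) DO NOTHING`,
    [eventId]
  );
  if (res.rowCount === 0) {
    return; // Duplicate delivery - this event was already handled
  }
  await handler();
}
```

In a real consumer, the dedup insert and the business writes should share one transaction so a crash between them cannot strand a half-processed event.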
Database isolation levels affect when the outbox publisher can see newly inserted events. Understanding this interaction is crucial for correct implementation.
Isolation Level Review:
| Level | Prevents Dirty Reads | Prevents Non-Repeatable Reads | Prevents Phantom Reads | Description |
|---|---|---|---|---|
| READ UNCOMMITTED | ✗ | ✗ | ✗ | Can see uncommitted data (dangerous) |
| READ COMMITTED | ✓ | ✗ | ✗ | Only sees committed data (PostgreSQL default) |
| REPEATABLE READ | ✓ | ✓ | ✗ | Snapshot at transaction start (MySQL InnoDB default) |
| SERIALIZABLE | ✓ | ✓ | ✓ | Full isolation, as if transactions ran sequentially |
```sql
-- SCENARIO: Transaction T1 inserts order + outbox event
--           Publisher P1 polls for unpublished events
--           What does P1 see?

-- === TIMELINE WITH READ COMMITTED ===

-- T=0: P1 starts polling (READ COMMITTED)
-- T=1: T1 begins transaction
-- T=2: T1 inserts order
-- T=3: T1 inserts outbox event
-- T=4: P1 queries outbox (sees NOTHING - T1 not committed)
-- T=5: T1 commits
-- T=6: P1 queries outbox (sees the event - T1 committed)

-- READ COMMITTED is SAFE - publisher never sees uncommitted events

-- === TIMELINE WITH REPEATABLE READ ===

-- T=0: P1 starts polling (REPEATABLE READ)
-- T=0: [P1's snapshot taken here]
-- T=1: T1 begins transaction
-- T=2: T1 inserts order
-- T=3: T1 inserts outbox event
-- T=4: T1 commits
-- T=5: P1 queries outbox (sees NOTHING despite T1 committed!)
-- T=6: P1's transaction ends
-- T=7: P1 starts new query (NOW sees the event)

-- REPEATABLE READ can DELAY visibility but is SAFE
-- Just start a new transaction for each poll iteration

-- === TIMELINE WITH READ UNCOMMITTED (DANGEROUS!) ===

-- T=0: P1 starts polling (READ UNCOMMITTED)
-- T=1: T1 begins transaction
-- T=2: T1 inserts order
-- T=3: T1 inserts outbox event
-- T=4: P1 queries outbox (SEES uncommitted event!)
-- T=5: P1 publishes to message broker
-- T=6: T1 ROLLBACK (order never committed)
--
-- DISASTER: Event published for an order that doesn't exist!
-- NEVER use READ UNCOMMITTED for the outbox publisher

-- === RECOMMENDED: Explicit READ COMMITTED for the publisher ===

-- PostgreSQL: set per-transaction
BEGIN;
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT * FROM outbox_events
WHERE published_at IS NULL
ORDER BY sequence_number
LIMIT 100
FOR UPDATE SKIP LOCKED;  -- Concurrent publisher safety
COMMIT;

-- MySQL: set per-session or per-transaction
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION;
SELECT * FROM outbox_events
WHERE published_at IS NULL
ORDER BY sequence_number
LIMIT 100
FOR UPDATE SKIP LOCKED;
COMMIT;
```

MySQL with InnoDB defaults to REPEATABLE READ. This is safe but means the publisher should use short-lived transactions. Alternatively, explicitly set READ COMMITTED for publisher connections to match PostgreSQL behavior.
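If the publisher runs at REPEATABLE READ, each poll must be its own short transaction so the next iteration gets a fresh snapshot. A minimal sketch of that loop, assuming node-postgres (`pg`) with connection settings from the environment; `pollOnce` is a name chosen here:

```typescript
// One short READ COMMITTED transaction per poll iteration.
// Assumes node-postgres (pg) and the outbox_events schema on this page.
import { Pool } from 'pg';

const pool = new Pool(); // Connection settings from environment

async function pollOnce(batchSize: number): Promise<number> {
  const client = await pool.connect();
  try {
    // A fresh transaction per poll, so each iteration sees events
    // committed since the last one (important under REPEATABLE READ).
    await client.query('BEGIN ISOLATION LEVEL READ COMMITTED');
    const { rows } = await client.query(
      `SELECT * FROM outbox_events
       WHERE published_at IS NULL
       ORDER BY sequence_number
       LIMIT $1
       FOR UPDATE SKIP LOCKED`,
      [batchSize]
    );
    // ... publish rows to the broker, then mark them published ...
    await client.query('COMMIT');
    return rows.length;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```

Passing the isolation level explicitly in BEGIN makes the publisher behave identically regardless of the server's default.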
In production systems, you typically want multiple publisher instances for high availability. But concurrent publishers create a new challenge: ensuring each event is published exactly once (or at least that duplicates are minimized).
The Problem:
┌─────────────────────────────────────────────────────────────────┐
│ Publisher 1 Publisher 2 │
│ ─────────── ─────────── │
│ │
│ SELECT * FROM outbox SELECT * FROM outbox │
│ WHERE published_at IS NULL WHERE published_at IS NULL │
│ ──────────▼────────── ──────────▼────────── │
│ [event-1, event-2] [event-1, event-2] │
│ │
│ Publish event-1 to Kafka Publish event-1 to Kafka │
│ (DUPLICATE!) (DUPLICATE!) │
│ │
│ UPDATE published_at UPDATE published_at │
│ (Both update same row) (One succeeds, one no-ops) │
└─────────────────────────────────────────────────────────────────┘
Solution: Row-Level Locking with SKIP LOCKED
Modern databases provide FOR UPDATE SKIP LOCKED, which locks selected rows and skips any rows already locked by other transactions. This effectively partitions work across publishers without coordination.
```typescript
// CONCURRENT-SAFE OUTBOX PUBLISHER
// (Database, MessageQueue, PublisherConfig, and OrchestratorConfig are
// application-level types defined elsewhere.)

class ConcurrentOutboxPublisher {
  constructor(
    private db: Database,
    private messageQueue: MessageQueue,
    private config: PublisherConfig
  ) {}

  async processBatch(): Promise<number> {
    // Use explicit READ COMMITTED for consistent behavior
    return await this.db.transaction(async (tx) => {
      // Set isolation level (PostgreSQL syntax)
      await tx.query('SET TRANSACTION ISOLATION LEVEL READ COMMITTED');

      // Select events with row-level locking.
      // FOR UPDATE: lock selected rows
      // SKIP LOCKED: skip rows locked by other publishers
      // (assumes the driver maps snake_case columns to camelCase
      // fields and returns payload/metadata as JSON strings)
      const events = await tx.query<OutboxEvent[]>(`
        SELECT
          id, aggregate_type, aggregate_id, event_type, event_version,
          payload, metadata, correlation_id, causation_id,
          created_at, sequence_number
        FROM outbox_events
        WHERE published_at IS NULL
        ORDER BY sequence_number ASC
        LIMIT $1
        FOR UPDATE SKIP LOCKED
      `, [this.config.batchSize]);

      if (events.length === 0) {
        return 0;
      }

      // Track successfully published event IDs
      const publishedIds: string[] = [];
      const failedIds: string[] = [];

      for (const event of events) {
        try {
          // Construct message for broker
          const topic = this.getTopicName(event);
          const message = {
            key: event.aggregateId, // Partition key for ordering
            value: {
              eventId: event.id,
              eventType: event.eventType,
              eventVersion: event.eventVersion,
              aggregateType: event.aggregateType,
              aggregateId: event.aggregateId,
              payload: JSON.parse(event.payload),
              metadata: JSON.parse(event.metadata),
              timestamp: event.createdAt.toISOString()
            },
            headers: {
              'correlation-id': event.correlationId,
              'causation-id': event.causationId,
              'event-type': event.eventType
            }
          };

          // Publish with explicit acknowledgment
          await this.messageQueue.publish(topic, message, {
            acks: 'all', // Wait for all replicas
            timeout: this.config.publishTimeout
          });

          publishedIds.push(event.id);
        } catch (error) {
          // Log failure but continue with other events
          console.error(`Failed to publish event ${event.id}:`, error);
          failedIds.push(event.id);

          // Update retry count for the failed event
          await tx.query(`
            UPDATE outbox_events
            SET publish_attempts = publish_attempts + 1,
                last_attempt_at = NOW(),
                last_error = $1
            WHERE id = $2
          `, [String(error), event.id]);
        }
      }

      // Mark successfully published events
      if (publishedIds.length > 0) {
        await tx.query(`
          UPDATE outbox_events
          SET published_at = NOW()
          WHERE id = ANY($1)
        `, [publishedIds]);
      }

      return publishedIds.length;
    });
  }

  private getTopicName(event: OutboxEvent): string {
    // Convention: aggregate-type.event-type (lowercase, kebab-case)
    const aggregate = event.aggregateType.toLowerCase();
    const eventType = event.eventType
      .replace(/([A-Z])/g, '-$1')
      .toLowerCase()
      .replace(/^-/, '');
    return `${aggregate}.${eventType}`;
  }
}

// PUBLISHER ORCHESTRATION

class OutboxPublisherOrchestrator {
  private isRunning = false;
  private publishers: ConcurrentOutboxPublisher[] = [];

  constructor(
    private db: Database,
    private messageQueue: MessageQueue,
    private config: OrchestratorConfig
  ) {
    // Create publisher pool
    for (let i = 0; i < config.publisherCount; i++) {
      this.publishers.push(
        new ConcurrentOutboxPublisher(db, messageQueue, {
          batchSize: config.batchSize,
          publishTimeout: config.publishTimeout
        })
      );
    }
  }

  async start(): Promise<void> {
    this.isRunning = true;
    // Run all publishers concurrently
    await Promise.all(
      this.publishers.map((publisher, i) =>
        this.runPublisherLoop(publisher, i)
      )
    );
  }

  private async runPublisherLoop(
    publisher: ConcurrentOutboxPublisher,
    index: number
  ): Promise<void> {
    while (this.isRunning) {
      try {
        const processed = await publisher.processBatch();
        if (processed === 0) {
          // No events found - back off to avoid spinning
          await this.sleep(this.config.emptyPollDelay);
        } else {
          // Events processed - poll again quickly
          // (minimize latency for burst traffic)
          await this.sleep(this.config.busyPollDelay);
        }
      } catch (error) {
        console.error(`Publisher ${index} error:`, error);
        await this.sleep(this.config.errorDelay);
      }
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  async stop(): Promise<void> {
    this.isRunning = false;
  }
}
```

Locking Strategy Comparison:

| Strategy | Behavior | Use When |
|---|---|---|
| FOR UPDATE | Blocks until row is unlocked | Single publisher; need guaranteed ordering |
| FOR UPDATE NOWAIT | Immediately fails if row locked | Want fast failure; will retry elsewhere |
| FOR UPDATE SKIP LOCKED | Skips locked rows, returns unlocked | Multiple publishers; best parallelism |
| Advisory Locks | Application-level locking, not row-level | Need to lock conceptual resources, not rows |
When using SKIP LOCKED, events may be published out of sequence order if one publisher is slower than another. If strict global ordering is required, use a single publisher or partition events by aggregate (where per-aggregate ordering is sufficient).
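If you need the single-publisher option from the table above without external leader election, a database advisory lock is one lightweight approach: whichever instance holds the lock polls, and the rest stand by as hot spares. A sketch assuming PostgreSQL and node-postgres (`pg`); the lock key and the `pollOnce` callback are illustrative:

```typescript
// Single-active-publisher election via a PostgreSQL advisory lock.
// Assumes node-postgres (pg); the lock key (1001) is an arbitrary
// application-chosen constant, and pollOnce is supplied by the caller
// (e.g. the processBatch logic above). Shutdown handling is omitted.
import { Pool } from 'pg';

const pool = new Pool(); // Connection settings from environment

async function runExclusivePublisher(
  pollOnce: () => Promise<void>,
  lockKey = 1001
): Promise<void> {
  const client = await pool.connect(); // The lock is tied to this session
  try {
    // Spin until we win the election
    for (;;) {
      const { rows } = await client.query(
        'SELECT pg_try_advisory_lock($1) AS acquired',
        [lockKey]
      );
      if (rows[0].acquired) break;
      await new Promise(r => setTimeout(r, 5_000));
    }
    // We are the only active publisher while this session holds the lock
    for (;;) {
      await pollOnce();
      await new Promise(r => setTimeout(r, 200));
    }
  } finally {
    // Release explicitly: pooled connections outlive this function,
    // and session-level advisory locks persist until unlocked
    await client.query('SELECT pg_advisory_unlock($1)', [lockKey]);
    client.release();
  }
}
```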
Different databases have different capabilities and syntax for implementing the transactional outbox. Here's how to implement it correctly across the most common databases.
```sql
-- PostgreSQL: Full-featured outbox with all optimizations

-- Schema
CREATE TABLE outbox_events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    sequence_number BIGSERIAL NOT NULL UNIQUE,
    aggregate_type  VARCHAR(255) NOT NULL,
    aggregate_id    VARCHAR(255) NOT NULL,
    event_type      VARCHAR(255) NOT NULL,
    event_version   INTEGER NOT NULL DEFAULT 1,
    payload         JSONB NOT NULL,
    metadata        JSONB,
    correlation_id  VARCHAR(255),
    causation_id    VARCHAR(255),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ
);

-- Partial index for unpublished events (very efficient)
CREATE INDEX idx_outbox_pending
ON outbox_events (sequence_number)
WHERE published_at IS NULL;

-- Publisher query with SKIP LOCKED
SELECT * FROM outbox_events
WHERE published_at IS NULL
ORDER BY sequence_number
LIMIT 100
FOR UPDATE SKIP LOCKED;

-- PostgreSQL-specific: NOTIFY for low latency
-- (an alternative to polling)

-- In the business transaction:
INSERT INTO orders (...) VALUES (...);
INSERT INTO outbox_events (...) VALUES (...);
NOTIFY outbox_events_channel;  -- Wake up listeners
COMMIT;

-- Publisher listens:
LISTEN outbox_events_channel;
-- Woken up immediately when events are inserted
```
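On the receiving side of that NOTIFY, a publisher process can hold a dedicated connection that LISTENs on the channel and triggers an immediate drain. A minimal sketch, assuming node-postgres (`pg`); `drainOutbox` stands in for the batch-processing logic shown earlier:

```typescript
// Low-latency wakeup: LISTEN on the channel that writers NOTIFY.
// Assumes node-postgres (pg); drainOutbox() is a placeholder for the
// polling/publishing logic shown earlier on this page.
import { Client } from 'pg';

async function listenForOutboxEvents(
  drainOutbox: () => Promise<void>
): Promise<void> {
  const client = new Client(); // Dedicated connection just for LISTEN
  await client.connect();
  await client.query('LISTEN outbox_events_channel');

  client.on('notification', () => {
    // A transaction with outbox inserts just committed - poll now
    drainOutbox().catch(err => console.error('Drain failed:', err));
  });
}
```

Notifications are fire-and-forget (they can be coalesced and are lost if the listener is disconnected), so keep a slow polling loop as a fallback safety net.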
PostgreSQL Advantages:

- `gen_random_uuid()` for native UUID generation
- `FOR UPDATE SKIP LOCKED` for efficient concurrent publishing
- `LISTEN`/`NOTIFY` for near-real-time event notification

Event payloads can sometimes be large, containing entire documents, binary data, or aggregated information. Large payloads create challenges for both the outbox table and the message broker.
The Challenge:

- Outbox table bloat: large rows slow the publisher's polling query and make cleanup of published events expensive.
- Broker message limits: most brokers cap message size (Kafka defaults to roughly 1 MB), so oversized events are rejected outright.
- Memory and network pressure: the writer, publisher, and consumer must each buffer the full payload.
```typescript
// STRATEGY 1: Claim Check Pattern
// ───────────────────────────────
// Store large data in blob storage, put a reference in the event

import { createHash } from 'crypto';
import { v4 as uuidv4 } from 'uuid';

interface ClaimCheckEvent {
  eventId: string;
  eventType: 'OrderDocumentGenerated';
  // Instead of embedding the 50MB PDF:
  documentRef: {
    storageType: 's3';
    bucket: 'order-documents';
    key: string;         // e.g. 'orders/ord-123/invoice.pdf'
    contentType: 'application/pdf';
    sizeBytes: number;
    checksum: string;    // e.g. 'sha256:abc123...'
  };
  // Event is tiny; consumer fetches the document via the reference
}

function calculateSHA256(buf: Buffer): string {
  return 'sha256:' + createHash('sha256').update(buf).digest('hex');
}

class DocumentService {
  async generateInvoice(order: Order): Promise<void> {
    await this.db.transaction(async (tx) => {
      // Generate PDF (large)
      const pdfBuffer = await this.pdfGenerator.generateInvoice(order);

      // Store in blob storage (not part of the DB transaction;
      // a rollback may leave an orphaned blob, which is acceptable)
      const key = `orders/${order.id}/invoice.pdf`;
      await this.s3.putObject({
        Bucket: 'order-documents',
        Key: key,
        Body: pdfBuffer,
        ContentType: 'application/pdf'
      });

      // Store reference in outbox (small event)
      await tx.query(`
        INSERT INTO outbox_events (
          id, aggregate_type, aggregate_id, event_type, payload
        ) VALUES ($1, $2, $3, $4, $5)
      `, [
        uuidv4(),
        'Order',
        order.id,
        'InvoiceGenerated',
        JSON.stringify({
          orderId: order.id,
          documentRef: {
            storageType: 's3',
            bucket: 'order-documents',
            key: key,
            contentType: 'application/pdf',
            sizeBytes: pdfBuffer.length,
            checksum: calculateSHA256(pdfBuffer)
          }
        })
      ]);
    });
  }
}

// STRATEGY 2: Event Compression
// ─────────────────────────────
// Compress payload before storing

import { gzip, gunzip } from 'zlib';
import { promisify } from 'util';

const gzipAsync = promisify(gzip);
const gunzipAsync = promisify(gunzip);

class CompressedOutboxService {
  async addEvent(tx: Transaction, event: OutboxEvent): Promise<void> {
    const payloadJson = JSON.stringify(event.payload);

    // Compress if over threshold
    let storedPayload: Buffer | string;
    let isCompressed: boolean;

    if (payloadJson.length > 10_000) { // 10KB threshold
      storedPayload = await gzipAsync(payloadJson);
      isCompressed = true;
    } else {
      storedPayload = payloadJson;
      isCompressed = false;
    }

    await tx.query(`
      INSERT INTO outbox_events (
        id, aggregate_type, aggregate_id, event_type,
        payload, is_compressed
      ) VALUES ($1, $2, $3, $4, $5, $6)
    `, [
      event.id,
      event.aggregateType,
      event.aggregateId,
      event.eventType,
      storedPayload,
      isCompressed
    ]);
  }

  async getEvent(id: string): Promise<OutboxEvent> {
    const row = await this.db.queryOne(`
      SELECT * FROM outbox_events WHERE id = $1
    `, [id]);

    let payload: object;
    if (row.is_compressed) {
      const decompressed = await gunzipAsync(row.payload);
      payload = JSON.parse(decompressed.toString());
    } else {
      payload = JSON.parse(row.payload);
    }

    return { ...row, payload };
  }
}

// STRATEGY 3: Event Splitting
// ───────────────────────────
// Split a large event into header + multiple chunks

interface ChunkedEvent {
  eventId: string;
  chunkIndex: number;
  totalChunks: number;
  isLastChunk: boolean;
  payload: string; // Partial payload
}

class ChunkedEventPublisher {
  private readonly CHUNK_SIZE = 500_000; // 500KB chunks

  async publishLargeEvent(event: OutboxEvent): Promise<void> {
    const fullPayload = JSON.stringify(event.payload);

    if (fullPayload.length <= this.CHUNK_SIZE) {
      // Small enough - publish normally
      await this.messageQueue.publish(event);
      return;
    }

    // Split into chunks
    const totalChunks = Math.ceil(fullPayload.length / this.CHUNK_SIZE);

    for (let i = 0; i < totalChunks; i++) {
      const chunkPayload = fullPayload.slice(
        i * this.CHUNK_SIZE,
        (i + 1) * this.CHUNK_SIZE
      );

      const chunk: ChunkedEvent = {
        eventId: event.id,
        chunkIndex: i,
        totalChunks: totalChunks,
        isLastChunk: i === totalChunks - 1,
        payload: chunkPayload
      };

      await this.messageQueue.publish({
        topic: `${event.aggregateType.toLowerCase()}.chunked`,
        key: event.id, // All chunks go to the same partition
        value: chunk
      });
    }
  }
}

// Consumer reassembles chunks before processing
```

Strategy Comparison:

| Strategy | Pros | Cons | Best For |
|---|---|---|---|
| Claim Check | Minimal event size; blob storage is cheap | Extra fetch required; blob might be unavailable | Binary data (images, PDFs, videos) |
| Compression | Transparent; keeps data in event | CPU overhead; still limited by max size | Text-heavy payloads (JSON, XML) |
| Chunking | No size limit; ordered delivery | Complex consumer logic; reassembly required | Very large events that must be inline |
| Event Redesign | Simpler; aligned with event sourcing | Requires domain modeling changes | Events carrying too much data |
Often, large events indicate a design problem. Events should describe what happened, not carry entire entity snapshots. Consider: Does the consumer really need all this data, or can it fetch what it needs using an ID reference?
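To make that redesign concrete, here is an illustrative contrast between a snapshot-style event and a reference-style one; all field names are hypothetical:

```typescript
// Fat event: carries a whole aggregate snapshot (often a design smell).
interface CustomerUpdatedFat {
  eventType: 'CustomerUpdated';
  customer: {
    id: string;
    name: string;
    email: string;
    addresses: unknown[];    // Entire nested collections...
    orderHistory: unknown[]; // ...potentially enormous
  };
}

// Thin event: states what happened and which fields changed.
// Consumers that need more data fetch it by ID from the owning service.
interface CustomerUpdatedThin {
  eventType: 'CustomerUpdated';
  customerId: string;
  changedFields: string[]; // e.g. ['email']
  email?: string;          // Only the new value that changed
}
```

Thin events also keep the outbox table small and stay comfortably under broker message limits, often making the strategies above unnecessary.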
The transactional outbox is the core mechanism that makes reliable event publishing possible. By leveraging your database's ACID guarantees, you transform an impossible distributed coordination problem into a reliable single-database transaction.
You now understand the mechanics of transactional outbox implementation across different databases. Next, we'll explore the two main strategies for publishing events from the outbox: polling and Change Data Capture (CDC)—each with distinct latency, complexity, and operational characteristics.