In CQRS, the write side is the source of truth. The read side is a projection—a derived view optimized for queries. Between them lies the synchronization layer: the infrastructure that ensures changes on the write side eventually appear on the read side.
This synchronization is deceptively complex. It must be reliable (no lost updates), preserve event ordering, tolerate failures and duplicates, and keep replication lag low enough that users trust what they read.
This page explores the patterns and technologies that make robust synchronization possible at scale.
This page covers the major synchronization approaches: change data capture, event streaming, transaction outbox pattern, and dual-write strategies. You'll learn when to use each, their trade-offs, and implementation patterns for production systems.
There are several fundamental approaches to keeping read models synchronized with write models. Each has distinct characteristics, trade-offs, and appropriate use cases.
The core question: How do changes in the write store get propagated to the read store(s)?
Approach 1: Application-Level Event Publishing
The application explicitly publishes events to a message broker after writing to the database. Projections consume from the broker.
Approach 2: Change Data Capture (CDC)
A separate service monitors the write database's transaction log and publishes changes as events. No application code changes needed.
Approach 3: Transactional Outbox Pattern
Events are written to an 'outbox' table in the same transaction as the write. A separate process reads the outbox and publishes to the broker.
Approach 4: Dual Writes (Anti-pattern)
Writing to both the write store and read store directly. Generally problematic and should be avoided.
| Approach | Reliability | Complexity | Coupling | Use Case |
|---|---|---|---|---|
| Application Events | Medium (at-least-once) | Low | Tight to broker | Event-sourced systems |
| CDC | High (from DB log) | Medium | Decoupled from app | Legacy system integration |
| Transactional Outbox | High (transactional) | Medium-High | Requires outbox table | Financial/critical systems |
| Dual Writes | Low (no atomicity) | Low | Tight to both stores | AVOID in production |
Never write directly to both write and read stores in the same operation. Without distributed transactions, failures after the first write but before the second leave your stores permanently inconsistent. This is a common anti-pattern that causes subtle, hard-to-debug data corruption.
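To make the failure window concrete, here is a minimal sketch of the anti-pattern, using the same `PlaceOrderCommand`/`Order` types as the later examples; the two store clients are illustrative assumptions, not a specific library:

```typescript
// ANTI-PATTERN: dual write. If the process crashes or the second call throws
// after the first write succeeds, the two stores diverge with no record of it.
async function placeOrderDualWrite(
  command: PlaceOrderCommand,
  writeDb: { saveOrder(order: Order): Promise<void> },        // write store (assumed client)
  readDb: { upsertOrderView(order: Order): Promise<void> }    // read store (assumed client)
): Promise<Order> {
  const order = Order.create(command);

  await writeDb.saveOrder(order);       // write 1 commits...
  // A crash, timeout, or deploy here leaves the read store permanently behind.
  await readDb.upsertOrderView(order);  // ...write 2 may never happen

  return order;
}
```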
In this approach, your application code explicitly publishes events after successful writes. This is common in event-sourced systems where events are first-class citizens.
Architecture:
```
┌────────────────────────────────────────────────────────────────────────┐
│ COMMAND HANDLER                                                        │
│                                                                        │
│  1. Validate command                                                   │
│  2. Execute business logic                                             │
│  3. Persist to write store (transaction)                               │
│  4. Publish events to message broker  ←─── Can fail after step 3!      │
│                                                                        │
└─────────────────────────────┬──────────────────────────────────────────┘
                              │
                              ▼  Events
               ┌─────────────────────────────┐
               │       MESSAGE BROKER        │
               │   (Kafka, RabbitMQ, etc.)   │
               └──────────────┬──────────────┘
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
        ▼                     ▼                     ▼
┌───────────────┐    ┌───────────────┐     ┌───────────────┐
│ Projection 1  │    │ Projection 2  │     │     Other     │
│  (Product     │    │  (Analytics   │     │   Consumers   │
│  Read Model)  │    │   Service)    │     │               │
└───────────────┘    └───────────────┘     └───────────────┘
```
```typescript
class OrderCommandHandler {
  constructor(
    private orderRepository: OrderRepository,
    private eventPublisher: EventPublisher,
    private transactionManager: TransactionManager
  ) {}

  async placeOrder(command: PlaceOrderCommand): Promise<Order> {
    // Steps 1-3 execute within a database transaction
    const order = await this.transactionManager.transaction(async (tx) => {
      const order = Order.create(command);         // 1. Create domain entity
      await this.orderRepository.save(order, tx);  // 2. Persist to write store
      return order;                                // 3. Transaction commits here
    });

    // 4. Publish events AFTER the transaction commits.
    // PROBLEM: if the process crashes or the broker is unavailable at this
    // point, the order exists in the write store but its events are lost.
    const events = order.getUncommittedEvents();
    await this.eventPublisher.publishAll(events);
    order.markEventsAsCommitted();

    return order;
  }
}
```

The Reliability Problem:
With this approach, there's a window between committing to the database and publishing events where failures can cause lost events. If the process crashes after the database commit but before event publishing, the events are never sent.
Mitigations: retry the publish with backoff, run periodic reconciliation jobs that compare the write store against published events, or eliminate the gap entirely with the transactional outbox pattern described below.
In pure event sourcing, events are stored as the source of truth. A separate process can continuously scan the event store and publish to message brokers. This eliminates the dual-write problem because there's only one write—to the event store.
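As a rough sketch of that idea (the event store, broker, and checkpoint interfaces are assumed, not from a specific library), a publisher can poll the event store from the last published position:

```typescript
interface StoredEvent { type: string; globalPosition: number; payload: unknown; }

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

// Single source of truth (the event store) plus a forwarder to the broker.
// Checkpointing the position makes restarts safe (at-least-once delivery).
class EventStorePublisher {
  constructor(
    private eventStore: { readFrom(position: number, limit: number): Promise<StoredEvent[]> },
    private broker: { publish(topic: string, event: StoredEvent): Promise<void> },
    private checkpoints: { load(): Promise<number>; save(position: number): Promise<void> }
  ) {}

  async run(): Promise<void> {
    let position = await this.checkpoints.load();
    while (true) {
      const events = await this.eventStore.readFrom(position, 100);
      if (events.length === 0) { await sleep(100); continue; }

      for (const event of events) {
        await this.broker.publish(event.type, event); // may duplicate on crash; consumers deduplicate
        position = event.globalPosition;
        await this.checkpoints.save(position);
      }
    }
  }
}
```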
Change Data Capture (CDC) monitors the database's transaction log (WAL in PostgreSQL, binlog in MySQL) and streams changes as events. This approach is non-invasive—no application code changes required.
How CDC Works: the CDC connector tails the database's transaction log, turns each committed row change into an event (insert, update, or delete), and publishes it to a message broker; downstream consumers apply those events to their read models.
Popular CDC Tools:
| Tool | Databases Supported | Output Format | Key Features |
|---|---|---|---|
| Debezium | PostgreSQL, MySQL, MongoDB, SQL Server, Oracle | Kafka | Open source, mature, widely adopted |
| AWS DMS | Most major databases | Kinesis, Kafka | Managed service, migration + streaming |
| Fivetran / Airbyte | 200+ sources | Various destinations | SaaS, ELT focus |
| Logical decoding (pgoutput) | PostgreSQL only | Custom | Built-in PostgreSQL feature |
| Maxwell | MySQL only | Kafka, Kinesis, etc. | Lightweight, MySQL specific |
```
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│   PostgreSQL     │ WAL  │    Debezium      │      │      Kafka       │
│                  │─────▶│    Connector     │─────▶│                  │
│  orders          │      │                  │      │ dbserver.public. │
│  customers       │      │  • Reads WAL     │      │   orders         │
│  products        │      │  • Transforms    │      │                  │
│                  │      │  • Publishes     │      │ dbserver.public. │
└──────────────────┘      └──────────────────┘      │   customers      │
                                                    └────────┬─────────┘
                                                             │
                                      ┌──────────────────────┴────────────┐
                                      │                                   │
                                      ▼                                   ▼
                            ┌──────────────────┐              ┌──────────────────┐
                            │ Order Projection │              │  Search Indexer  │
                            │                  │              │                  │
                            │ Consumes events  │              │ Updates          │
                            │ Updates read DB  │              │ Elasticsearch    │
                            └──────────────────┘              └──────────────────┘
```
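A connector like the one in the diagram is registered through the Kafka Connect REST API. The sketch below assumes Debezium 2.x property names; the hostnames, credentials, and connector name are illustrative:

```typescript
// Sketch: register a Debezium PostgreSQL connector via Kafka Connect's REST API.
const connectorConfig = {
  name: 'orders-cdc',
  config: {
    'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
    'plugin.name': 'pgoutput',            // PostgreSQL's built-in logical decoding plugin
    'database.hostname': 'postgres',
    'database.port': '5432',
    'database.user': 'debezium',
    'database.password': 'secret',
    'database.dbname': 'ecommerce',
    'topic.prefix': 'dbserver',           // topics become dbserver.public.orders, etc.
    'table.include.list': 'public.orders,public.customers,public.products',
  },
};

await fetch('http://connect:8083/connectors', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify(connectorConfig),
});
```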
{ "schema": { /* Kafka Connect schema */ }, "payload": { "before": null, // Previous state (null for INSERT) "after": { // New state "id": 12345, "customer_id": 789, "total_amount": 99.99, "status": "placed", "created_at": 1704897600000 }, "source": { "version": "2.4.0", "connector": "postgresql", "name": "dbserver", "ts_ms": 1704897600123, // Transaction timestamp "snapshot": "false", "db": "ecommerce", "schema": "public", "table": "orders", "txId": 987654, // Transaction ID "lsn": 23456789, // Log sequence number "xmin": null }, "op": "c", // Operation: c=create, u=update, d=delete, r=read (snapshot) "ts_ms": 1704897600200, "transaction": { "id": "987654:23456789", "total_order": 1, "data_collection_order": 1 } }}Advantages of CDC:
Challenges with CDC: events mirror table rows rather than business intent, schema changes in the write database leak into every consumer, initial snapshots of existing data must be managed, and the connector itself is additional infrastructure to operate and monitor.
CDC captures data changes: 'Order table row inserted.' Domain events capture intent: 'Customer placed an order.' If your projections need rich business context (e.g., who initiated, why, related operations), domain events are preferable. CDC excels for technical synchronization without semantic enrichment.
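To make the distinction concrete, the sketch below contrasts the two shapes; the field names are illustrative, not from a specific schema:

```typescript
// A CDC event describes the row that changed...
const cdcEvent = {
  op: 'c',
  table: 'orders',
  after: { id: 12345, customer_id: 789, total_amount: 99.99, status: 'placed' },
};

// ...while a domain event describes what happened, by whom, and in what context.
const domainEvent = {
  type: 'OrderPlaced',
  orderId: '12345',
  placedBy: 'customer-789',                  // who initiated
  channel: 'web-checkout',                   // context / why
  items: [{ sku: 'WIDGET-1', quantity: 2 }], // related operations
  occurredAt: '2024-01-10T14:00:00Z',
};
```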
The transactional outbox pattern solves the dual-write problem by storing events in the same database transaction as the entity change. An outbox table holds pending events; a separate process publishes them to the message broker.
How It Works: the command handler writes the entity change and one row per domain event to an outbox table inside a single database transaction; a separate relay process reads unpublished outbox rows, publishes them to the message broker, and marks them as published.
This guarantees: if the entity is saved, the event is also saved. No events can be lost due to crashes between steps.
```sql
CREATE TABLE outbox (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Event metadata
    aggregate_type  VARCHAR(255) NOT NULL,   -- e.g., 'Order'
    aggregate_id    VARCHAR(255) NOT NULL,   -- e.g., order ID
    event_type      VARCHAR(255) NOT NULL,   -- e.g., 'OrderPlaced'

    -- Event payload
    payload         JSONB NOT NULL,

    -- Tracking
    created_at      TIMESTAMPTZ DEFAULT NOW(),
    published_at    TIMESTAMPTZ,             -- NULL until published

    -- For ordering and deduplication
    sequence_number BIGSERIAL
);

-- Indexes for relay processing (PostgreSQL requires separate CREATE INDEX statements)
CREATE INDEX idx_unpublished ON outbox (published_at) WHERE published_at IS NULL;
CREATE INDEX idx_aggregate   ON outbox (aggregate_type, aggregate_id);
```
```typescript
class OrderCommandHandler {
  async placeOrder(command: PlaceOrderCommand): Promise<Order> {
    return await this.transactionManager.transaction(async (tx) => {
      // 1. Create and save order
      const order = Order.create(command);
      await this.orderRepository.save(order, tx);

      // 2. Write events to outbox in SAME transaction
      const events = order.getUncommittedEvents();
      await this.outboxRepository.insertAll(events.map(event => ({
        aggregateType: 'Order',
        aggregateId: order.id,
        eventType: event.type,
        payload: event.toJSON()
      })), tx);

      // 3. Transaction commits both atomically
      // Events are now safely in the outbox; the relay will publish them
      return order;
    });
  }
}

// Outbox relay process (runs separately)
class OutboxRelay {
  async processOutbox(): Promise<void> {
    while (true) {
      const events = await this.outboxRepository.getUnpublished({ limit: 100 });

      if (events.length === 0) {
        await sleep(100); // Polling interval
        continue;
      }

      for (const event of events) {
        try {
          // Publish to message broker
          await this.messageBroker.publish(
            `${event.aggregateType}.${event.eventType}`,
            event.payload,
            {
              messageId: event.id, // For deduplication
              headers: {
                'aggregate-id': event.aggregateId,
                'sequence': event.sequenceNumber.toString()
              }
            }
          );

          // Mark as published
          await this.outboxRepository.markPublished(event.id);
        } catch (error) {
          // Log error; will retry on next poll
          console.error(`Failed to publish event ${event.id}: ${error}`);
        }
      }
    }
  }
}
```

Optimizations for Outbox Relay:
Use CDC on the outbox table: Instead of polling, use Debezium to capture outbox inserts and stream to Kafka. Eliminates polling latency and load.
Batched publishing: Publish multiple events in a single broker request.
Partitioned processing: Multiple relay instances process different aggregate types or ID ranges.
Cleanup job: Periodically delete old published events to prevent table bloat.
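For the cleanup job, a minimal sketch might look like the following, assuming a node-postgres style `db.query` helper and a 7-day retention window (both are assumptions for illustration):

```typescript
// Delete outbox rows that were published more than 7 days ago.
async function cleanupOutbox(
  db: { query(sql: string, params: unknown[]): Promise<unknown> }
): Promise<void> {
  await db.query(
    `DELETE FROM outbox
     WHERE published_at IS NOT NULL
       AND published_at < NOW() - $1::interval`,
    ['7 days']
  );
}
```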
For strong read-your-writes consistency, the writing service can be a consumer of its own events. After writing to outbox, it consumes from the broker and updates its local read cache. This ensures the same instance that wrote data can immediately read it, while other instances get eventual consistency.
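A rough sketch of that idea, assuming a kafkajs-style consumer and an in-process cache (the topic and event fields are illustrative):

```typescript
interface OrderView { orderId: string; status: string; }

// The writing service also consumes its own events and refreshes a local
// read cache, so the instance that handled the write can serve the result
// immediately; other instances converge via normal eventual consistency.
class LocalReadCacheUpdater {
  private cache = new Map<string, OrderView>();

  constructor(private consumer: import('kafkajs').Consumer) {}

  async start(): Promise<void> {
    await this.consumer.subscribe({ topic: 'order-events', fromBeginning: false });
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const event = JSON.parse(message.value!.toString());
        this.cache.set(event.orderId, { orderId: event.orderId, status: event.status });
      },
    });
  }

  getOrder(orderId: string): OrderView | undefined {
    return this.cache.get(orderId);
  }
}
```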
The message broker is the backbone of CQRS synchronization. Choosing the right broker significantly impacts reliability, performance, and operational complexity.
Key Selection Criteria: ordering guarantees, retention (can you replay history to rebuild a projection?), consumer-group support for multiple independent projections, throughput and latency, and operational overhead (managed service vs. self-hosted).
| Broker | Ordering | Retention | Consumer Groups | Best For |
|---|---|---|---|---|
| Apache Kafka | Per-partition | Configurable (days-forever) | Yes, with rebalancing | High-throughput, event sourcing |
| Amazon Kinesis | Per-shard | 24h - 365 days | Yes, with KCL | AWS-native, managed |
| Amazon SQS | FIFO mode only | Up to 14 days | No native groups | Simple queuing, low throughput |
| RabbitMQ | Per-queue | Until consumed (persistent) | Competing consumers | Traditional messaging, routing |
| Google Pub/Sub | Per-subscription | 7 days default | Yes | GCP-native, global |
| Azure Event Hubs | Per-partition | 1-7 days (standard) | Yes | Azure-native, Kafka-compatible |
| Pulsar | Per-topic/partition | Tiered (hot to cold) | Yes | Multi-tenancy, tiered storage |
Kafka Deep Dive (Most Common Choice):
Kafka is the de-facto standard for event streaming in CQRS systems. Key concepts: topics are split into partitions, and ordering is guaranteed only within a partition; consumer groups let each projection track its own offsets and scale horizontally; offsets can be rewound to replay history and rebuild projections; retention and compaction policies control how long events remain replayable.
```typescript
// Producer configuration for reliable publishing
const producer = kafka.producer({
  idempotent: true,          // Prevent duplicate messages on retry
  maxInFlightRequests: 5,    // Balance throughput and ordering
  transactionTimeout: 60000, // For exactly-once semantics
});

// Topic configuration for CQRS events (passed to admin.createTopics)
const topicConfig = {
  topic: 'order-events',
  numPartitions: 12,         // Parallelism for consumers
  replicationFactor: 3,      // Durability across broker failures
  configEntries: [
    // Compaction for event sourcing (keep latest per key)
    // OR deletion for pure event streaming
    { name: 'cleanup.policy', value: 'delete' },  // 'compact' for event store pattern
    { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) }, // 7 days retention
    { name: 'min.insync.replicas', value: '2' },  // Require 2 replicas for durability
  ],
};

// Consumer configuration for projections
const consumer = kafka.consumer({
  groupId: 'order-projection-v1', // Change the group ID to rebuild a projection
  // Tune for latency vs throughput
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxWaitTimeInMs: 500,
});

// Start from earliest to rebuild, or latest for new projections
await consumer.subscribe({ topic: 'order-events', fromBeginning: true });

await consumer.run({
  autoCommit: false, // Manual commits for at-least-once
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value.toString());

    // Process the event
    await projection.apply(event);

    // Commit offset after successful processing
    await consumer.commitOffsets([{
      topic,
      partition,
      offset: (Number(message.offset) + 1).toString(),
    }]);
  },
});
```

Events for the same aggregate MUST go to the same Kafka partition to guarantee ordering. Use the aggregate ID as the message key. If OrderPlaced and OrderShipped for the same order go to different partitions, they might be processed out of order.
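A small sketch of that rule, using the producer and topic configured above (the event object is illustrative): key every message by the aggregate ID when producing.

```typescript
// All events for one order share a key, so they land on the same partition
// and are consumed in order.
const orderEvent = { type: 'OrderShipped', orderId: 'order-123', shippedAt: Date.now() };

await producer.send({
  topic: 'order-events',
  messages: [{
    key: orderEvent.orderId,           // partition key = aggregate ID
    value: JSON.stringify(orderEvent),
  }],
});
```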
With at-least-once delivery, the same event may be delivered multiple times. Projections must be idempotent—processing the same event twice should produce the same result as processing it once.
Why Duplicates Happen: the producer retries after a timed-out acknowledgement that actually succeeded, a consumer crashes after applying an event but before committing its offset, or a consumer-group rebalance replays messages past the last committed offset.
```typescript
// Pattern 1: Event ID tracking
class IdempotentProjection {
  async apply(event: DomainEvent): Promise<void> {
    // Check if already processed
    const processed = await this.eventTracker.isProcessed(event.id);
    if (processed) {
      console.log(`Skipping duplicate event: ${event.id}`);
      return;
    }

    // Process event
    await this.doApply(event);

    // Mark as processed (atomically with the update if possible)
    await this.eventTracker.markProcessed(event.id);
  }
}

// Pattern 2: Version/sequence-based idempotency
class VersionedProjection {
  async apply(event: OrderEvent): Promise<void> {
    const current = await this.readStore.getOrder(event.orderId);

    // Version check: only apply if event is newer
    if (current && event.version <= current.lastAppliedVersion) {
      console.log(`Skipping outdated event: v${event.version} <= v${current.lastAppliedVersion}`);
      return;
    }

    // Apply and update version
    const updated = this.applyEvent(current, event);
    updated.lastAppliedVersion = event.version;
    await this.readStore.save(updated);
  }
}

// Pattern 3: Conditional/upsert operations
class ConditionalProjection {
  async handleOrderPlaced(event: OrderPlacedEvent): Promise<void> {
    // Use database upsert - naturally idempotent
    await this.readStore.upsertOrder({
      id: event.orderId,
      customerId: event.customerId,
      items: event.items,
      status: 'placed',
      placedAt: event.timestamp,
    });
  }

  async handleInventoryDecremented(event: InventoryDecrementedEvent): Promise<void> {
    // Use conditional update - only apply once
    const result = await this.readStore.updateInventoryIfEventNotApplied(
      event.productId,
      event.eventId,
      -event.quantity
    );

    if (result.rowsAffected === 0) {
      console.log(`Event already applied: ${event.eventId}`);
    }
  }
}
```

SQL implementation of the conditional update:

```sql
-- Table has an 'applied_events' JSONB column tracking processed event IDs
UPDATE inventory_read_model
SET quantity = quantity - $quantity,
    applied_events = applied_events || jsonb_build_array($eventId)
WHERE product_id = $productId
  AND NOT applied_events ? $eventId; -- Only if not already applied
```

Projections will fail. Database timeouts, malformed events, bugs in projection logic—all cause processing failures. Robust error handling prevents these from blocking the entire projection pipeline.
The Dead Letter Queue (DLQ) Pattern:
Events that repeatedly fail processing are moved to a 'dead letter queue' for manual investigation. This prevents one bad event from blocking all subsequent events.
```typescript
interface RetryPolicy {
  maxRetries: number;
  backoffMs: number;
  backoffMultiplier: number;
  maxBackoffMs: number;
}

class ResilientProjectionConsumer {
  private retryPolicy: RetryPolicy = {
    maxRetries: 5,
    backoffMs: 100,
    backoffMultiplier: 2,
    maxBackoffMs: 30000
  };

  async processEvent(event: DomainEvent): Promise<void> {
    let attempt = 0;
    let lastError: Error;

    while (attempt < this.retryPolicy.maxRetries) {
      try {
        await this.projection.apply(event);
        return; // Success
      } catch (error) {
        lastError = error;
        attempt++;

        // Classify error
        if (this.isPermanentFailure(error)) {
          // Don't retry permanent failures (e.g., validation errors)
          break;
        }

        if (attempt < this.retryPolicy.maxRetries) {
          // Exponential backoff
          const backoffMs = Math.min(
            this.retryPolicy.backoffMs * Math.pow(this.retryPolicy.backoffMultiplier, attempt - 1),
            this.retryPolicy.maxBackoffMs
          );
          console.log(`Retry ${attempt}/${this.retryPolicy.maxRetries} in ${backoffMs}ms for event ${event.id}`);
          await sleep(backoffMs);
        }
      }
    }

    // All retries exhausted - send to DLQ
    await this.sendToDeadLetterQueue(event, lastError);
  }

  private isPermanentFailure(error: Error): boolean {
    // Errors that won't succeed on retry
    return (
      error instanceof ValidationError ||
      error instanceof MalformedEventError ||
      error instanceof BusinessRuleViolation
    );
  }

  private async sendToDeadLetterQueue(event: DomainEvent, error: Error): Promise<void> {
    const dlqMessage = {
      originalEvent: event,
      error: {
        message: error.message,
        stack: error.stack,
        name: error.name
      },
      failedAt: new Date().toISOString(),
      projectionName: this.projection.name,
      attempts: this.retryPolicy.maxRetries
    };

    // Publish to DLQ topic
    await this.dlqProducer.publish('projection-dlq', dlqMessage);

    // Increment DLQ metric for alerting
    this.metrics.increment('projection.dlq.count', {
      projection: this.projection.name,
      eventType: event.type
    });

    console.error(`Event sent to DLQ: ${event.id}`, error);
  }
}

// DLQ processing service (manual or automated)
class DLQProcessor {
  async replayEvent(dlqMessageId: string): Promise<void> {
    const dlqMessage = await this.dlqStore.getMessage(dlqMessageId);

    // Attempt to reprocess (after fixing the underlying issue)
    await this.projectionConsumer.processEvent(dlqMessage.originalEvent);

    // If successful, remove from DLQ
    await this.dlqStore.markResolved(dlqMessageId);
  }

  async skipEvent(dlqMessageId: string, reason: string): Promise<void> {
    // Mark as intentionally skipped (e.g., corrupted event)
    await this.dlqStore.markSkipped(dlqMessageId, reason);
  }
}
```

Every event in the DLQ represents a gap in your read model. Set up alerts for DLQ depth and age. Old messages in the DLQ mean users are seeing stale data. Treat DLQ events as incidents requiring investigation and resolution.
We've covered the critical infrastructure that keeps read models synchronized with write models. The key insights: dual writes without atomicity are an anti-pattern; the transactional outbox and CDC both provide reliable, at-least-once propagation; ordering requires careful partition keying by aggregate ID; projections must be idempotent because duplicates are inevitable; and retries with a dead letter queue keep one bad event from stalling the pipeline.
What's Next:
With the mechanics of CQRS understood—command/query separation, read model optimization, eventual consistency, and synchronization—we can now step back and ask: When should you actually use CQRS? The final page explores decision criteria, anti-patterns, and real-world case studies to help you determine if CQRS is the right choice for your system.
You now understand the synchronization infrastructure that powers CQRS systems. From transactional outbox to CDC to message broker selection, you have the knowledge to build reliable pipelines that keep read models consistently synchronized with write models.