CQRS's benefits come with significant implementation complexity. Teams that underestimate this complexity often discover it during production incidents, when the cost of learning is highest.
This page examines the real challenges of CQRS implementation: synchronization between models, handling the inevitable failures, testing strategies for eventually consistent systems, and the operational overhead that persists for the lifetime of the system.
Understanding these costs upfront enables informed decisions and proper planning.
By the end of this page, you will understand synchronization strategies between read and write models, how to handle synchronization failures and ensure eventual consistency, testing approaches for CQRS systems, and the ongoing operational considerations of running CQRS in production.
The core challenge of CQRS with separate data stores is keeping read models synchronized with the write model. Several strategies exist, each with distinct trade-offs.
Strategy 1: Synchronous Update (Transactional)
Update the read model within the same transaction as the write. Provides strong consistency but couples the paths.
```typescript
// SYNCHRONOUS UPDATE: Same transaction

class OrderCommandService {
  async confirmOrder(orderId: string): Promise<void> {
    await this.database.transaction(async (tx) => {
      // 1. Update write model
      const order = await this.orderRepository.getById(orderId, tx);
      order.confirm();
      await this.orderRepository.save(order, tx);

      // 2. Update read model (SAME transaction)
      await tx.execute(`
        UPDATE order_summaries
        SET status = 'Confirmed', confirmed_at = NOW()
        WHERE order_id = $1
      `, [orderId]);

      await tx.execute(`
        UPDATE order_details
        SET data = jsonb_set(data, '{status}', '"Confirmed"')
        WHERE order_id = $1
      `, [orderId]);
    });
  }
}

// PROS:
// ✓ Strong consistency - read model always reflects write model
// ✓ Simple failure handling - rollback covers both models
// ✓ No synchronization lag

// CONS:
// ✗ Single database required (can't use different technologies)
// ✗ Write performance impacted by read model update overhead
// ✗ Harder to scale independently
// ✗ Not true CQRS - paths are coupled

// WHEN TO USE:
// - Level 1 CQRS (same database, different models)
// - Strong consistency is required for reads
// - Limited scale requirements
```

Strategy 2: Transactional Outbox
Write events to an outbox table within the same transaction, then asynchronously process them to update read models. Guarantees eventual delivery without distributed transactions.
```typescript
// TRANSACTIONAL OUTBOX: Reliable async synchronization

class OrderCommandService {
  async confirmOrder(orderId: string): Promise<void> {
    await this.database.transaction(async (tx) => {
      // 1. Update write model
      const order = await this.orderRepository.getById(orderId, tx);
      order.confirm();
      await this.orderRepository.save(order, tx);

      // 2. Write events to outbox (SAME transaction)
      for (const event of order.getUncommittedEvents()) {
        await tx.execute(`
          INSERT INTO outbox (id, event_type, payload, created_at)
          VALUES ($1, $2, $3, NOW())
        `, [uuid(), event.type, JSON.stringify(event)]);
      }
      order.clearUncommittedEvents();
    });
    // Transaction commits - write model AND outbox atomically updated
  }
}

// OUTBOX PROCESSOR: Runs separately
class OutboxProcessor {
  async processOutbox(): Promise<void> {
    while (true) {
      const events = await this.database.query(`
        SELECT * FROM outbox
        WHERE processed_at IS NULL
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
      `);

      for (const event of events) {
        try {
          // Publish to message bus for read model updates
          await this.messageBus.publish(event.event_type, event.payload);

          // Mark as processed
          await this.database.execute(`
            UPDATE outbox SET processed_at = NOW() WHERE id = $1
          `, [event.id]);
        } catch (error) {
          // Log and continue - will retry on next iteration
          this.logger.error('Failed to process outbox event', { event, error });
        }
      }

      await this.sleep(100); // Poll interval
    }
  }
}

// READ MODEL UPDATER: Subscribes to events
class OrderReadModelUpdater {
  @EventHandler('OrderConfirmedEvent')
  async handleOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
    await this.readDatabase.update('order_summaries',
      { order_id: event.orderId },
      { status: 'Confirmed', confirmed_at: event.timestamp }
    );

    await this.searchIndex.update('orders', event.orderId, {
      status: 'confirmed'
    });
  }
}

// PROS:
// ✓ No distributed transactions
// ✓ Guaranteed eventual consistency
// ✓ Can use different read technologies
// ✓ Write path not blocked by read model update

// CONS:
// ✗ Eventual consistency (read model lags behind)
// ✗ Additional operational complexity (outbox processor)
// ✗ Need to handle duplicate events (idempotency)
// ✗ Outbox table grows and needs cleanup
```

Strategy 3: Change Data Capture (CDC)
Use database log-based CDC to capture changes and stream them to read model updaters. No application code needed for event publishing.
```typescript
// CDC WITH DEBEZIUM: Database log-based synchronization

// 1. Configure Debezium connector for PostgreSQL
const debeziumConfig = {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "orders-db",
  "database.port": "5432",
  "database.dbname": "orders",
  "table.include.list": "public.orders,public.order_items",
  "slot.name": "orders_cdc",
  "publication.name": "orders_publication",
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": ".*",
  "transforms.route.replacement": "order-events"
};

// 2. Command service writes ONLY to primary database
class OrderCommandService {
  async confirmOrder(orderId: string): Promise<void> {
    const order = await this.orderRepository.getById(orderId);
    order.confirm();
    await this.orderRepository.save(order);
    // That's it! No event publishing, no outbox
    // CDC captures the database change automatically
  }
}

// 3. CDC consumer updates read models
class CDCEventConsumer {
  @KafkaConsumer('order-events')
  async handleOrderChange(message: DebeziumChangeEvent): Promise<void> {
    const { before, after, op } = message.payload;

    switch (op) {
      case 'c': // Create
        await this.handleOrderCreated(after);
        break;
      case 'u': // Update
        await this.handleOrderUpdated(before, after);
        break;
      case 'd': // Delete
        await this.handleOrderDeleted(before);
        break;
    }
  }

  private async handleOrderUpdated(
    before: OrderRow,
    after: OrderRow
  ): Promise<void> {
    // Derive events from row diffs
    if (before.status !== after.status) {
      if (after.status === 'confirmed') {
        await this.updateOrderConfirmed(after.id, after.confirmed_at);
      } else if (after.status === 'shipped') {
        await this.updateOrderShipped(after.id, after.tracking_number);
      }
    }
  }
}

// PROS:
// ✓ Zero application code for event capture
// ✓ Captures ALL database changes (even direct SQL)
// ✓ Transactional - tied to database commit
// ✓ Works with legacy systems

// CONS:
// ✗ Events are database rows, not domain events (coarser)
// ✗ Need to derive business meaning from row diffs
// ✗ Additional infrastructure (Debezium, Kafka)
// ✗ Schema changes require connector reconfiguration
// ✗ Not suitable for complex domain events
```

| Aspect | Synchronous | Transactional Outbox | CDC |
|---|---|---|---|
| Consistency | Strong | Eventual | Eventual |
| Latency | Zero | Milliseconds-Seconds | Seconds |
| Complexity | Low | Medium | High (Infrastructure) |
| Reliability | Transaction-based | At-least-once | At-least-once |
| Technology Flexibility | Single DB only | Multiple stores | Multiple stores |
| Domain Events | Limited | Full fidelity | Derived from rows |
| Operational Load | Low | Medium (processor) | High (CDC pipeline) |
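One recurring chore behind the outbox pattern's "medium" operational load is pruning processed rows, since the outbox table grows without bound. The following is a minimal sketch of a batched cleanup job; the `db.execute(sql, params)` client returning a `rowCount` is an assumed interface, and the table and column names simply follow the outbox examples above.

```typescript
// Hypothetical outbox cleanup job: deletes processed rows past a retention window.
// The OutboxDb interface is illustrative, not a specific library's API.

interface OutboxDb {
  execute(sql: string, params: unknown[]): Promise<{ rowCount: number }>;
}

// Compute the oldest processed_at timestamp we keep
function retentionCutoff(now: Date, retentionDays: number): Date {
  return new Date(now.getTime() - retentionDays * 24 * 60 * 60 * 1000);
}

async function cleanupOutbox(
  db: OutboxDb,
  retentionDays = 7,
  batchSize = 1000
): Promise<number> {
  let totalDeleted = 0;
  // Delete in batches to avoid long-running locks on a hot table
  for (;;) {
    const result = await db.execute(`
      DELETE FROM outbox
      WHERE id IN (
        SELECT id FROM outbox
        WHERE processed_at IS NOT NULL AND processed_at < $1
        LIMIT $2
      )
    `, [retentionCutoff(new Date(), retentionDays), batchSize]);
    totalDeleted += result.rowCount;
    if (result.rowCount < batchSize) break; // last partial batch - done
  }
  return totalDeleted;
}
```

Run on a schedule (cron or a background worker); the batch loop keeps each delete short so it doesn't contend with the outbox processor.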
In distributed systems, failures are inevitable. CQRS systems must handle synchronization failures gracefully to maintain eventual consistency.
Failure Types and Mitigation:
```typescript
// IDEMPOTENCY: Safely handle duplicate events

class OrderReadModelUpdater {
  // Use event ID for idempotency check
  async handleOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
    // Check if we've already processed this event
    const processed = await this.idempotencyStore.exists(event.eventId);
    if (processed) {
      this.logger.info('Skipping duplicate event', { eventId: event.eventId });
      return;
    }

    try {
      // Process the event
      await this.updateOrderStatus(event.orderId, 'confirmed');

      // Mark as processed
      await this.idempotencyStore.set(event.eventId, {
        processedAt: new Date(),
        ttl: 7 * 24 * 60 * 60 // Keep for 7 days
      });
    } catch (error) {
      // Don't mark as processed - will retry
      throw error;
    }
  }
}

// IDEMPOTENT UPDATE: Use upsert semantics
class ReadModelProjector {
  async projectOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Upsert instead of insert - idempotent by design
    await this.readDb.query(`
      INSERT INTO order_summaries (
        order_id, customer_id, status, total, created_at, last_event_seq
      ) VALUES ($1, $2, $3, $4, $5, $6)
      ON CONFLICT (order_id) DO UPDATE SET
        -- Only update if our event is newer
        status = CASE
          WHEN order_summaries.last_event_seq < EXCLUDED.last_event_seq
          THEN EXCLUDED.status
          ELSE order_summaries.status
        END,
        last_event_seq = GREATEST(order_summaries.last_event_seq, EXCLUDED.last_event_seq)
    `, [
      event.orderId, event.customerId, 'pending',
      event.total, event.timestamp, event.sequenceNumber
    ]);
  }
}

// CONDITIONAL UPDATES: Prevent stale overwrites
async projectOrderStatusChanged(event: StatusChangedEvent): Promise<void> {
  // Use sequence number to prevent out-of-order updates
  const result = await this.readDb.query(`
    UPDATE order_summaries
    SET status = $1, status_changed_at = $2, last_event_seq = $3
    WHERE order_id = $4
      AND last_event_seq < $3 -- Only if this event is newer
  `, [
    event.newStatus, event.timestamp,
    event.sequenceNumber, event.orderId
  ]);

  if (result.rowCount === 0) {
    // Either order doesn't exist, or we received an older event
    this.logger.debug('Skipped stale or duplicate event', { event });
  }
}
```

Dead Letter Queues and Retry Strategies:
When event processing fails repeatedly, events should be moved to a dead letter queue for investigation rather than blocking the entire pipeline.
```typescript
// RETRY WITH EXPONENTIAL BACKOFF

class ResilientEventProcessor {
  private readonly maxRetries = 5;
  private readonly baseDelayMs = 100;

  async processEvent(event: DomainEvent): Promise<void> {
    let attempt = 0;

    while (attempt < this.maxRetries) {
      try {
        await this.handler.handle(event);
        return; // Success!
      } catch (error) {
        attempt++;

        if (attempt >= this.maxRetries) {
          // Move to dead letter queue
          await this.deadLetterQueue.enqueue({
            event,
            error: error.message,
            attempts: attempt,
            failedAt: new Date()
          });

          this.alerting.notify({
            severity: 'warning',
            message: 'Event moved to DLQ after retries exhausted',
            context: { eventId: event.eventId, eventType: event.type }
          });
          return;
        }

        // Calculate backoff with jitter
        const delay = this.calculateBackoff(attempt);
        this.logger.warn(`Retry ${attempt}/${this.maxRetries} for event`, {
          eventId: event.eventId,
          nextRetryMs: delay
        });
        await this.sleep(delay);
      }
    }
  }

  private calculateBackoff(attempt: number): number {
    // Exponential backoff: 100ms, 200ms, 400ms, 800ms, 1600ms
    const exponentialDelay = this.baseDelayMs * Math.pow(2, attempt - 1);
    // Add jitter (±25%) to prevent thundering herd
    const jitter = exponentialDelay * 0.25 * (Math.random() - 0.5);
    return Math.min(exponentialDelay + jitter, 30000); // Cap at 30s
  }
}

// DEAD LETTER QUEUE PROCESSOR

class DeadLetterProcessor {
  // Manual reprocessing tool for ops
  async replayDLQEvents(criteria: DLQCriteria): Promise<ReplayResult> {
    const events = await this.deadLetterQueue.query(criteria);
    const results = { succeeded: 0, failed: 0, skipped: 0 };

    for (const dlqEntry of events) {
      // Check if the root cause was fixed
      if (!await this.shouldRetry(dlqEntry)) {
        results.skipped++;
        continue;
      }

      try {
        await this.eventProcessor.processEvent(dlqEntry.event);
        await this.deadLetterQueue.markResolved(dlqEntry.id);
        results.succeeded++;
      } catch (error) {
        // Update failure info, keep in DLQ
        await this.deadLetterQueue.updateFailure(dlqEntry.id, error);
        results.failed++;
      }
    }

    return results;
  }

  private async shouldRetry(entry: DLQEntry): Promise<boolean> {
    // Skip if too old (data might be stale)
    const maxAge = 7 * 24 * 60 * 60 * 1000; // 7 days
    if (Date.now() - entry.failedAt.getTime() > maxAge) {
      return false;
    }

    // Check if read model has been rebuilt since failure
    // (event might already be incorporated via rebuild)
    // ... additional checks
    return true;
  }
}
```

When read models become inconsistent due to bugs or lost events, you need the ability to rebuild them from scratch. This requires: (1) storing events durably (in an event store or event log), (2) replay capability in projectors, and (3) handling the load of bulk replays. Design for this from the start; adding it retroactively is painful.
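The three requirements above can be combined into a single replay loop. The sketch below is illustrative: the `EventStore` and `Projector` interfaces, and names like `rebuildReadModel`, are assumptions for this example, not a specific library's API. Batching the reads addresses requirement (3) by bounding memory use during a bulk replay.

```typescript
// Sketch of a read model rebuild: truncate, then replay from position 0.
// All interfaces here are hypothetical, introduced for illustration.

interface StoredEvent {
  position: number; // monotonically increasing log position
  type: string;
  payload: unknown;
}

interface EventStore {
  // Return up to batchSize events at or after the given position
  readFrom(position: number, batchSize: number): Promise<StoredEvent[]>;
}

interface Projector {
  truncate(): Promise<void>;            // requirement (1) assumed: events survive truncation
  handle(event: StoredEvent): Promise<void>; // requirement (2): replayable handler
}

async function rebuildReadModel(
  store: EventStore,
  projector: Projector,
  batchSize = 500 // requirement (3): bound the load per iteration
): Promise<number> {
  await projector.truncate(); // start from a clean read model
  let position = 0;
  let applied = 0;
  for (;;) {
    const batch = await store.readFrom(position, batchSize);
    if (batch.length === 0) break; // caught up with the event log
    for (const event of batch) {
      await projector.handle(event); // handlers must be idempotent (see above)
      applied++;
    }
    position = batch[batch.length - 1].position + 1;
  }
  return applied;
}
```

In production you would also pause live projection for the target model while rebuilding, then resume from the last replayed position, as the runbook later in this page describes.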
Eventual consistency means reads may not immediately reflect writes. This reality must be addressed at multiple levels: infrastructure, application, and user experience.
Measuring and Monitoring Lag:
```typescript
// MEASURE READ MODEL LAG

class ConsistencyLagMonitor {
  // Track last processed event timestamp per read model
  async recordEventProcessed(
    readModel: string,
    eventTimestamp: Date
  ): Promise<void> {
    const now = new Date();
    const lagMs = now.getTime() - eventTimestamp.getTime();

    // Emit metrics
    this.metrics.histogram('read_model_lag_ms', lagMs, { readModel });

    // Store watermark for dashboards
    await this.redis.set(
      `read_model:${readModel}:last_processed`,
      eventTimestamp.toISOString()
    );
  }

  // Alert if lag exceeds threshold
  async checkLagHealth(): Promise<void> {
    const models = ['order_summaries', 'order_search', 'order_analytics'];

    for (const model of models) {
      const lastProcessed = await this.redis.get(
        `read_model:${model}:last_processed`
      );
      if (!lastProcessed) continue;

      const lagMs = Date.now() - new Date(lastProcessed).getTime();

      // Different thresholds for different models
      const thresholds = {
        'order_summaries': 5000,   // 5 seconds
        'order_search': 30000,     // 30 seconds
        'order_analytics': 300000  // 5 minutes
      };

      if (lagMs > thresholds[model]) {
        await this.alerting.notify({
          severity: 'warning',
          message: `Read model ${model} lag exceeds threshold`,
          context: { lagMs, threshold: thresholds[model] }
        });
      }
    }
  }
}

// EXPOSE LAG TO API CONSUMERS

interface OrderListResponse {
  orders: OrderSummaryDto[];
  metadata: {
    // Let consumers know data freshness
    dataAsOf: string;      // "2024-01-15T10:30:00Z"
    maxLagSeconds: number; // 5
  };
}
```

UI Patterns for Eventual Consistency:
Users expect immediate feedback. Several UX patterns bridge the gap between command execution and read model update.
```tsx
// PATTERN 1: OPTIMISTIC UI UPDATE

function OrderConfirmButton({ order, onConfirm }: Props) {
  const [optimisticStatus, setOptimisticStatus] = useState<string | null>(null);

  const handleConfirm = async () => {
    // Immediately show confirmed status (optimistic)
    setOptimisticStatus('confirmed');
    try {
      await confirmOrder(order.id);
      // After success, let it stay optimistic until next data fetch
      // The read model will catch up
    } catch (error) {
      // Rollback optimistic update on failure
      setOptimisticStatus(null);
      showError('Failed to confirm order');
    }
  };

  const displayStatus = optimisticStatus ?? order.status;

  return (
    <div>
      <StatusBadge status={displayStatus} />
      {order.status === 'pending' && (
        <Button onClick={handleConfirm}>Confirm Order</Button>
      )}
    </div>
  );
}

// PATTERN 2: COMMAND RESPONSE WITH UPDATED VIEW
// Instead of returning just success/failure, return the new state

interface ConfirmOrderResponse {
  success: boolean;
  orderId: string;
  // Include the updated view data in command response
  updatedOrder: {
    status: 'confirmed';
    confirmedAt: string;
  };
}

// Client uses response data, not stale read model
function OrderPage({ orderId }: Props) {
  const [orderData, setOrderData] = useState<Order | null>(null);

  const handleConfirm = async () => {
    const response = await confirmOrder(orderId);
    // Merge command response into local state
    setOrderData(prev => ({ ...prev, ...response.updatedOrder }));
    // Later fetches will get consistent data from read model
  };
}

// PATTERN 3: POLLING WITH FRESH DATA INDICATOR

function OrderListPage() {
  const [orders, setOrders] = useState<Order[]>([]);
  const [lastUpdated, setLastUpdated] = useState<Date | null>(null);

  // Poll for updates
  useEffect(() => {
    const interval = setInterval(async () => {
      const data = await fetchOrders();
      setOrders(data.orders);
      setLastUpdated(new Date(data.metadata.dataAsOf));
    }, 5000);
    return () => clearInterval(interval);
  }, []);

  return (
    <div>
      <span className="text-muted">
        Updated {formatRelativeTime(lastUpdated)}
      </span>
      <OrderTable orders={orders} />
    </div>
  );
}

// PATTERN 4: SHOW PENDING INDICATOR FOR RECENT CHANGES

function RecentOrderCard({ order, commandTimestamps }: Props) {
  const recentlyChanged = commandTimestamps[order.id] &&
    (Date.now() - commandTimestamps[order.id] < 10000);

  return (
    <div className={recentlyChanged ? 'opacity-70' : ''}>
      <OrderCard order={order} />
      {recentlyChanged && (
        <span className="text-sm text-yellow-600">
          <Spinner size="sm" /> Updating...
        </span>
      )}
    </div>
  );
}
```

Users don't think in terms of 'eventual consistency.' They think: 'I clicked confirm, so it should be confirmed.' Design your UI to never contradict this expectation. If the read model hasn't caught up, show the expected state or clearly indicate that an update is in progress; never show stale data that contradicts the user's recent action.
CQRS introduces testing complexity beyond traditional architectures. You need to test commands, queries, event handling, and the eventual consistency of the full system.
Testing Layers:
```typescript
// UNIT TEST: Command handlers in isolation

describe('ConfirmOrderHandler', () => {
  let handler: ConfirmOrderHandler;
  let orderRepository: MockOrderRepository;
  let eventPublisher: MockEventPublisher;

  beforeEach(() => {
    orderRepository = new MockOrderRepository();
    eventPublisher = new MockEventPublisher();
    handler = new ConfirmOrderHandler(orderRepository, eventPublisher);
  });

  it('should confirm a pending order', async () => {
    // Arrange
    const order = Order.create({
      customerId: 'cust-123',
      items: [{ productId: 'prod-1', quantity: 2, price: 10 }]
    });
    orderRepository.setOrder(order);

    // Act
    await handler.handle(new ConfirmOrderCommand(order.id));

    // Assert
    const savedOrder = orderRepository.getLastSaved();
    expect(savedOrder.status).toBe('confirmed');

    const publishedEvents = eventPublisher.getPublishedEvents();
    expect(publishedEvents).toHaveLength(1);
    expect(publishedEvents[0]).toBeInstanceOf(OrderConfirmedEvent);
  });

  it('should reject confirming a shipped order', async () => {
    // Arrange
    const order = Order.create({...});
    order.confirm();
    order.ship('TRACK-123');
    orderRepository.setOrder(order);

    // Act & Assert
    await expect(
      handler.handle(new ConfirmOrderCommand(order.id))
    ).rejects.toThrow(InvalidStateTransitionError);

    expect(eventPublisher.getPublishedEvents()).toHaveLength(0);
  });

  it('should publish events atomically with persistence', async () => {
    // Arrange
    const order = Order.create({...});
    orderRepository.setOrder(order);
    orderRepository.failOnSave = true; // Simulate failure

    // Act & Assert
    await expect(
      handler.handle(new ConfirmOrderCommand(order.id))
    ).rejects.toThrow();

    // Events should NOT be published if save fails
    expect(eventPublisher.getPublishedEvents()).toHaveLength(0);
  });
});
```
```typescript
// UNIT TEST: Event projectors

describe('OrderReadModelProjector', () => {
  let projector: OrderReadModelProjector;
  let readDatabase: MockReadDatabase;

  beforeEach(() => {
    readDatabase = new MockReadDatabase();
    projector = new OrderReadModelProjector(readDatabase);
  });

  it('should create order summary on OrderPlacedEvent', async () => {
    // Arrange
    const event = new OrderPlacedEvent({
      orderId: 'order-123',
      customerId: 'cust-456',
      customerName: 'John Doe',
      items: [
        { productId: 'prod-1', name: 'Widget', quantity: 2, price: 25 }
      ],
      total: 50,
      timestamp: new Date('2024-01-15T10:00:00Z')
    });

    // Act
    await projector.handleOrderPlaced(event);

    // Assert
    const summary = readDatabase.findOne('order_summaries', {
      order_id: 'order-123'
    });
    expect(summary).toEqual({
      order_id: 'order-123',
      customer_name: 'John Doe',
      item_count: 1,
      total_amount: 50,
      status: 'pending',
      created_at: event.timestamp
    });
  });

  it('should be idempotent for duplicate events', async () => {
    // Arrange
    const event = new OrderPlacedEvent({
      eventId: 'evt-123',
      orderId: 'order-123',
      // ...
    });

    // Act - process same event twice
    await projector.handleOrderPlaced(event);
    await projector.handleOrderPlaced(event);

    // Assert - only one record created
    const summaries = readDatabase.findAll('order_summaries', {
      order_id: 'order-123'
    });
    expect(summaries).toHaveLength(1);
  });

  it('should ignore out-of-order events', async () => {
    // Arrange - create with sequence 5
    await projector.handleOrderPlaced(new OrderPlacedEvent({
      orderId: 'order-123',
      sequenceNumber: 5,
      // ...
    }));

    // Process status change with sequence 10
    await projector.handleOrderConfirmed(new OrderConfirmedEvent({
      orderId: 'order-123',
      sequenceNumber: 10,
      // ...
    }));

    // Act - receive late event with sequence 7 (out of order)
    await projector.handleOrderConfirmed(new OrderConfirmedEvent({
      orderId: 'order-123',
      sequenceNumber: 7,
      // ...
    }));

    // Assert - sequence 10 data preserved (not overwritten by 7)
    const summary = readDatabase.findOne('order_summaries', {
      order_id: 'order-123'
    });
    expect(summary.last_event_seq).toBe(10);
  });
});
```
```typescript
// INTEGRATION TEST: Full command-to-query flow

describe('Order CQRS Integration', () => {
  let app: TestApplication;

  beforeAll(async () => {
    // Start real databases and message broker in containers
    app = await TestApplication.create({
      writeDb: 'postgres',
      readDb: 'postgres',
      messageBroker: 'kafka'
    });
  });

  afterAll(async () => {
    await app.teardown();
  });

  it('should reflect command changes in queries after sync', async () => {
    // Arrange
    const customerId = 'test-customer-1';

    // Act - Execute command
    const { orderId } = await app.commandBus.execute(
      new CreateOrderCommand({
        customerId,
        items: [{ productId: 'prod-1', quantity: 1, price: 100 }]
      })
    );

    // Wait for read model synchronization (with timeout)
    await app.waitForReadModelSync(orderId, { timeoutMs: 5000 });

    // Assert - Query returns the order
    const orders = await app.queryService.getOrderList({
      customerId, page: 1, pageSize: 10
    });
    expect(orders.items).toHaveLength(1);
    expect(orders.items[0].orderId).toBe(orderId);
    expect(orders.items[0].status).toBe('pending');
  });

  it('should handle command failures gracefully', async () => {
    // Arrange - order in non-confirmable state
    const { orderId } = await app.createTestOrder('shipped');

    // Act & Assert - confirm should fail
    await expect(
      app.commandBus.execute(new ConfirmOrderCommand(orderId))
    ).rejects.toThrow(InvalidStateTransitionError);

    // Read model should be unchanged
    const order = await app.queryService.getOrderDetail(orderId);
    expect(order.status).toBe('shipped');
  });
});

// CONTRACT TEST: Event schema compatibility

describe('Event Schema Contracts', () => {
  it('OrderPlacedEvent should match consumer expectations', () => {
    const event = new OrderPlacedEvent({
      eventId: uuid(),
      orderId: 'order-123',
      customerId: 'cust-456',
      customerName: 'John',
      items: [{ productId: 'prod-1', quantity: 1, price: 10 }],
      total: 10,
      timestamp: new Date()
    });

    // Serialize/deserialize to verify schema
    const json = JSON.stringify(event);
    const restored = JSON.parse(json);

    // Assert required fields for consumers
    expect(restored).toHaveProperty('orderId');
    expect(restored).toHaveProperty('customerId');
    expect(restored).toHaveProperty('total');
    expect(restored).toHaveProperty('timestamp');
  });
});
```

Integration tests must account for synchronization delays. Use polling with timeouts rather than fixed delays, and assert on the eventual state, not intermediate states. Consider 'wait for condition' helpers that poll until the expected state is reached or a timeout expires.
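A 'wait for condition' helper of the kind just described can be very small. This is a sketch; the name `waitForCondition` and its options are illustrative, not part of any test framework.

```typescript
// Poll an async predicate until it returns true or the timeout elapses.
// Hypothetical helper for eventually-consistent integration tests.

async function waitForCondition(
  predicate: () => Promise<boolean>,
  { timeoutMs = 5000, intervalMs = 100 } = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await predicate()) return; // condition met - stop polling
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
  // Fail loudly so the test reports a timeout, not a vague assertion error
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}
```

A test would then assert on the eventual state, e.g. `await waitForCondition(async () => (await queryService.getOrderDetail(orderId)).status === 'confirmed')`, rather than sleeping a fixed amount and hoping the projection has caught up.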
CQRS systems have ongoing operational requirements beyond initial implementation. Teams must be prepared for sustained maintenance.
Infrastructure Requirements:
| Component | Purpose | Operational Needs |
|---|---|---|
| Write Database | Primary source of truth | Backup, replication, monitoring |
| Read Database(s) | Optimized query stores | Separate backup, scaling, monitoring per store |
| Message Broker | Event distribution | Cluster management, partition balancing, retention |
| Outbox Processor | Event relay (if using outbox) | Monitoring, scaling with volume |
| Projection Workers | Read model updates | Horizontal scaling, lag monitoring |
| Dead Letter Queues | Failed event storage | Alerting, reprocessing tools |
```typescript
// KEY METRICS TO MONITOR

interface CQRSMetrics {
  // Command side
  commandsPerSecond: Gauge;
  commandLatencyMs: Histogram;
  commandErrorRate: Counter;

  // Event pipeline
  eventsPublishedPerSecond: Gauge;
  eventPublishLatencyMs: Histogram;
  outboxQueueDepth: Gauge;

  // Projection side
  eventsProcessedPerSecond: Gauge;
  projectionLagMs: Histogram; // Time from event creation to projection complete
  projectionErrorRate: Counter;
  deadLetterQueueDepth: Gauge;

  // Read side
  queriesPerSecond: Gauge;
  queryLatencyMs: Histogram;
  readStoreAvailability: Gauge;
}

// ALERTING RULES

const alertingRules = [
  {
    name: 'HighProjectionLag',
    condition: 'avg(projectionLagMs) > 10000', // > 10 seconds
    severity: 'warning',
    description: 'Read models are falling behind write model'
  },
  {
    name: 'CriticalProjectionLag',
    condition: 'avg(projectionLagMs) > 60000', // > 1 minute
    severity: 'critical',
    description: 'Read models severely behind - user experience impacted'
  },
  {
    name: 'DeadLetterQueueGrowing',
    condition: 'deadLetterQueueDepth > 100',
    severity: 'warning',
    description: 'Events failing processing - investigate cause'
  },
  {
    name: 'OutboxBacklog',
    condition: 'outboxQueueDepth > 1000',
    severity: 'warning',
    description: 'Events not being relayed to consumers'
  },
  {
    name: 'ProjectorDown',
    condition: 'eventsProcessedPerSecond == 0 AND eventsPublishedPerSecond > 0',
    severity: 'critical',
    description: 'Projection workers not processing events'
  }
];

// RUNBOOK: Read Model Rebuild
/**
 * When to rebuild read models:
 * 1. Bug in projector caused incorrect data
 * 2. Schema migration requires re-projection
 * 3. New read model added retroactively
 *
 * Process:
 * 1. Stop projector for target read model
 * 2. Truncate read model tables
 * 3. Start replay from event store position 0
 * 4. Monitor progress and lag
 * 5. Resume normal projection when caught up
 */
```

CQRS's benefits come with real implementation and operational costs: synchronization machinery, failure handling, consistency management, testing infrastructure, and the monitoring and runbooks above all demand sustained investment for the lifetime of the system.
What's Next:
In the final page, we'll explore CQRS with Event Sourcing—the powerful combination where events become the source of truth, enabling features like complete audit trails, temporal queries, and easy read model evolution. Event sourcing is CQRS's most natural companion.
You now understand the real complexity of CQRS implementation: synchronization challenges, failure handling, consistency management, testing strategies, and operational requirements. This awareness enables realistic planning and prevents unpleasant production surprises.