CQRS and Event Sourcing are often discussed together because they form a natural partnership. While CQRS can be implemented without event sourcing (and event sourcing without CQRS), combining them creates an architecture greater than the sum of its parts.
Event Sourcing changes how we persist data: instead of storing current state, we store the sequence of events that led to that state. The current state is derived by replaying events.
Combined with CQRS, this creates an architecture in which the command side appends events to an append-only event store as the source of truth, while the query side projects those events into purpose-built read models.
This combination is particularly powerful for systems requiring complete audit trails, temporal queries, or the ability to evolve read models without data migration.
By the end of this page, you will understand how event sourcing works, why CQRS and event sourcing complement each other, implementation patterns for event-sourced CQRS, the unique benefits this combination provides, and when the added complexity is justified.
Before exploring the combination, let's ensure a solid understanding of event sourcing itself.
Traditional State Storage:
Order Table:
```
┌──────────┬───────────┬────────────┬────────┐
│ order_id │ customer  │ status     │ total  │
├──────────┼───────────┼────────────┼────────┤
│ order-1  │ cust-123  │ shipped    │ 150.00 │
└──────────┴───────────┴────────────┴────────┘
```
We know the current state, but not HOW we got here.
Was it: Created → Confirmed → Shipped?
Or: Created → Cancelled → Reopened → Confirmed → Shipped?
Event-Sourced Storage:
Order Events:
```
┌───────┬──────────┬─────────────────────┬───────────────────────────┐
│ seq   │ order_id │ event_type          │ event_data                │
├───────┼──────────┼─────────────────────┼───────────────────────────┤
│ 1     │ order-1  │ OrderPlaced         │ {customer: cust-123, ...} │
│ 2     │ order-1  │ ItemAdded           │ {product: prod-A, qty: 2} │
│ 3     │ order-1  │ DiscountApplied     │ {amount: 10, code: SAVE10}│
│ 4     │ order-1  │ OrderConfirmed      │ {confirmedAt: ...}        │
│ 5     │ order-1  │ OrderShipped        │ {tracking: TRACK123}      │
└───────┴──────────┴─────────────────────┴───────────────────────────┘
```
We have the COMPLETE history. Current state = replay all events.
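At its core, replay is just a left fold over the event list. A minimal sketch, using simplified stand-in types rather than the fuller aggregate that follows:

```typescript
// Minimal sketch: current state is a left fold over the event history.
// Event and OrderState here are simplified stand-ins for this illustration,
// not the types used in the fuller aggregate example below.
type Event =
  | { type: 'OrderPlaced'; customerId: string }
  | { type: 'OrderConfirmed' }
  | { type: 'OrderShipped'; tracking: string };

interface OrderState {
  customerId?: string;
  status: 'none' | 'pending' | 'confirmed' | 'shipped';
  tracking?: string;
}

const initial: OrderState = { status: 'none' };

function apply(state: OrderState, event: Event): OrderState {
  switch (event.type) {
    case 'OrderPlaced':
      return { ...state, customerId: event.customerId, status: 'pending' };
    case 'OrderConfirmed':
      return { ...state, status: 'confirmed' };
    case 'OrderShipped':
      return { ...state, status: 'shipped', tracking: event.tracking };
  }
}

const events: Event[] = [
  { type: 'OrderPlaced', customerId: 'cust-123' },
  { type: 'OrderConfirmed' },
  { type: 'OrderShipped', tracking: 'TRACK123' }
];

// Current state = replay (fold over) the full history
const currentState = events.reduce(apply, initial);
// { customerId: 'cust-123', status: 'shipped', tracking: 'TRACK123' }
```

The event-sourced aggregate below expresses the same idea with factory methods and per-event apply handlers.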
```typescript
// EVENT-SOURCED ORDER AGGREGATE

// Events are immutable facts
interface OrderEvent {
  readonly eventId: string;
  readonly eventType: string;
  readonly orderId: string;
  readonly timestamp: Date;
  readonly version: number;
}

class OrderPlacedEvent implements OrderEvent {
  readonly eventType = 'OrderPlaced';
  constructor(
    public readonly eventId: string,
    public readonly orderId: string,
    public readonly customerId: string,
    public readonly items: OrderItemData[],
    public readonly timestamp: Date,
    public readonly version: number
  ) {}
}

class OrderConfirmedEvent implements OrderEvent {
  readonly eventType = 'OrderConfirmed';
  constructor(
    public readonly eventId: string,
    public readonly orderId: string,
    public readonly confirmedAt: Date,
    public readonly timestamp: Date,
    public readonly version: number
  ) {}
}

// The aggregate derives state from events
class Order {
  private id: string;
  private customerId: string;
  private items: OrderItem[] = [];
  private status: OrderStatus;
  private version: number = 0;

  // Uncommitted events from current operation
  private uncommittedEvents: OrderEvent[] = [];

  // Private constructor - use factory methods
  private constructor() {}

  // FACTORY: Create from event history (reconstitution)
  static fromEvents(events: OrderEvent[]): Order {
    const order = new Order();
    for (const event of events) {
      order.apply(event, false); // Don't track as uncommitted
    }
    return order;
  }

  // FACTORY: Create new order (generates events)
  static create(command: CreateOrderCommand): Order {
    const order = new Order();

    // Generate and apply creation event
    const event = new OrderPlacedEvent(
      uuid(),
      command.orderId ?? uuid(),
      command.customerId,
      command.items,
      new Date(),
      1 // First event for this aggregate
    );

    order.apply(event, true); // Track as uncommitted
    return order;
  }

  // COMMAND: Confirm the order
  confirm(): void {
    // Validate business rules
    if (this.status !== OrderStatus.PENDING) {
      throw new InvalidStateTransitionError(
        `Cannot confirm order in status ${this.status}`
      );
    }

    // Apply the event
    const event = new OrderConfirmedEvent(
      uuid(),
      this.id,
      new Date(),
      new Date(),
      this.version + 1
    );

    this.apply(event, true);
  }

  // Core: Apply an event to update state
  private apply(event: OrderEvent, isNew: boolean): void {
    // Update state based on event type
    switch (event.eventType) {
      case 'OrderPlaced':
        this.applyOrderPlaced(event as OrderPlacedEvent);
        break;
      case 'OrderConfirmed':
        this.applyOrderConfirmed(event as OrderConfirmedEvent);
        break;
      case 'OrderShipped':
        this.applyOrderShipped(event as OrderShippedEvent);
        break;
      // ... other event types
    }

    this.version = event.version;

    if (isNew) {
      this.uncommittedEvents.push(event);
    }
  }

  private applyOrderPlaced(event: OrderPlacedEvent): void {
    this.id = event.orderId;
    this.customerId = event.customerId;
    this.items = event.items.map(i => new OrderItem(i));
    this.status = OrderStatus.PENDING;
  }

  private applyOrderConfirmed(event: OrderConfirmedEvent): void {
    this.status = OrderStatus.CONFIRMED;
  }

  private applyOrderShipped(event: OrderShippedEvent): void {
    this.status = OrderStatus.SHIPPED;
  }

  // Accessors for persistence
  getUncommittedEvents(): OrderEvent[] {
    return [...this.uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }

  getId(): string {
    return this.id;
  }

  getVersion(): number {
    return this.version;
  }
}
```

Events describe something that happened (past tense): OrderPlaced, OrderConfirmed, OrderShipped. They are immutable facts. Commands describe what we want to happen (imperative): PlaceOrder, ConfirmOrder, ShipOrder.
The aggregate validates commands and, if valid, produces events.
When combined, CQRS and Event Sourcing create a distinctive architecture:
```
                      CQRS + EVENT SOURCING

COMMAND SIDE
┌─────────────┐     ┌─────────────────┐     ┌──────────────────┐
│   Command   │────▶│    Aggregate    │────▶│   Event Store    │
│   Handler   │     │ (Event-Sourced) │     │  (Append-Only)   │
└─────────────┘     └─────────────────┘     └─────────┬────────┘
                                                      │
                                              Events Published
                                                      ▼
                                            ┌───────────────────┐
                                            │     Event Bus     │
                                            └─────────┬─────────┘
QUERY SIDE                                            │
        ┌───────────────────┬──────────────────────┬──┘
        ▼                   ▼                      ▼
┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐
│  Projector   │    │  Projector   │    │      Projector       │
│ (List View)  │    │   (Search)   │    │     (Analytics)      │
└───────┬──────┘    └───────┬──────┘    └──────────┬───────────┘
        │                   │                      │
        ▼                   ▼                      ▼
┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐
│  PostgreSQL  │    │ Elasticsearch│    │      ClickHouse      │
│ (List Table) │    │   (Index)    │    │    (Time-Series)     │
└──────────────┘    └──────────────┘    └──────────────────────┘
```
Key characteristics: the event store is the single source of truth on the command side; every state change is captured as an appended event; read models are disposable projections derived from the event stream; and any projection can be rebuilt at any time by replaying events from the beginning. The event store below shows what this looks like in practice.
```typescript
// EVENT STORE: Append-only event log

interface StoredEvent {
  eventId: string;
  streamId: string;       // Aggregate ID
  streamType: string;     // Aggregate type (Order, Customer, etc.)
  eventType: string;
  eventData: string;      // JSON payload
  metadata: string;       // Correlation IDs, causation, user info
  version: number;        // Position in stream
  globalPosition: number; // Position across all streams
  createdAt: Date;
}

class EventStore {
  constructor(private readonly database: Database) {}

  // Append events to a stream (aggregate)
  async append(
    streamId: string,
    streamType: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    await this.database.transaction(async (tx) => {
      // Optimistic concurrency check
      const currentVersion = await tx.queryScalar<number>(`
        SELECT COALESCE(MAX(version), 0) FROM events WHERE stream_id = $1
      `, [streamId]);

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, found ${currentVersion}`
        );
      }

      // Append each event
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        const version = expectedVersion + i + 1;

        await tx.execute(`
          INSERT INTO events (
            event_id, stream_id, stream_type, event_type,
            event_data, metadata, version, created_at
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
        `, [
          event.eventId,
          streamId,
          streamType,
          event.eventType,
          JSON.stringify(event),
          JSON.stringify(event.metadata ?? {}),
          version
        ]);
      }
    });
  }

  // Load all events for a stream (to reconstitute aggregate)
  async loadStream(streamId: string): Promise<StoredEvent[]> {
    return this.database.query<StoredEvent>(`
      SELECT * FROM events
      WHERE stream_id = $1
      ORDER BY version ASC
    `, [streamId]);
  }

  // Load events from a global position (for projections)
  async loadFromPosition(
    fromPosition: number,
    batchSize: number = 1000
  ): Promise<StoredEvent[]> {
    return this.database.query<StoredEvent>(`
      SELECT * FROM events
      WHERE global_position > $1
      ORDER BY global_position ASC
      LIMIT $2
    `, [fromPosition, batchSize]);
  }

  // Subscribe to new events (for live projections)
  async subscribe(
    fromPosition: number,
    handler: (event: StoredEvent) => Promise<void>
  ): Promise<Subscription> {
    // Implementation depends on tech:
    // - PostgreSQL: LISTEN/NOTIFY or polling
    // - EventStoreDB: Native subscriptions
    // - Kafka: Consumer groups
  }
}

// REPOSITORY: Load and save aggregates via event store

class EventSourcedOrderRepository {
  constructor(private readonly eventStore: EventStore) {}

  async getById(orderId: string): Promise<Order | null> {
    const events = await this.eventStore.loadStream(orderId);

    if (events.length === 0) {
      return null;
    }

    // Reconstitute aggregate from events
    const domainEvents = events.map(e =>
      this.deserializeEvent(e.eventType, e.eventData)
    );

    return Order.fromEvents(domainEvents);
  }

  async save(order: Order): Promise<void> {
    const uncommittedEvents = order.getUncommittedEvents();

    if (uncommittedEvents.length === 0) {
      return; // Nothing to save
    }

    const expectedVersion = order.getVersion() - uncommittedEvents.length;

    await this.eventStore.append(
      order.getId(),
      'Order',
      uncommittedEvents,
      expectedVersion
    );

    order.clearUncommittedEvents();
  }

  private deserializeEvent(type: string, data: string): DomainEvent {
    const parsed = JSON.parse(data);

    // Map to specific event classes
    switch (type) {
      case 'OrderPlaced':
        return new OrderPlacedEvent(parsed);
      case 'OrderConfirmed':
        return new OrderConfirmedEvent(parsed);
      // ... etc
    }
  }
}
```

The event store is append-only. Events are never updated or deleted (except for GDPR compliance via crypto-shredding).
This immutability provides the foundation for audit trails, temporal queries, and the ability to rebuild read models from scratch at any time.
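To tie the write-side pieces together, here is a minimal sketch of a command handler that loads the aggregate through the repository above, executes a command, and publishes the resulting events. The ConfirmOrderCommand and EventBus shapes are assumptions made for this example, not a prescribed API:

```typescript
// Sketch: wiring a command through the event-sourced write side.
// Assumes the Order aggregate and EventSourcedOrderRepository shown above;
// ConfirmOrderCommand and EventBus are illustrative types for this example.
interface ConfirmOrderCommand {
  orderId: string;
}

interface EventBus {
  publish(events: OrderEvent[]): Promise<void>;
}

class ConfirmOrderHandler {
  constructor(
    private readonly repository: EventSourcedOrderRepository,
    private readonly eventBus: EventBus
  ) {}

  async handle(command: ConfirmOrderCommand): Promise<void> {
    // 1. Reconstitute the aggregate by replaying its event stream
    const order = await this.repository.getById(command.orderId);
    if (!order) {
      throw new Error(`Order ${command.orderId} not found`);
    }

    // 2. Execute the command; the aggregate validates and records new events
    order.confirm();

    // 3. Capture the new events before saving (save clears them)
    const newEvents = order.getUncommittedEvents();

    // 4. Append the new events to the event store (optimistic concurrency inside)
    await this.repository.save(order);

    // 5. Publish so projections and other consumers can update
    await this.eventBus.publish(newEvents);
  }
}
```

In many production systems the publish step is driven from the event store itself (for example via an outbox or catch-up subscription) rather than from the handler, which guarantees events are never published without first being stored.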
In CQRS+ES, read models are built by projecting events into queryable structures. Projections are the bridge between the event stream and the data shapes needed by queries.
A projection typically runs in two modes: catch-up, where it replays stored events from its last checkpoint, and live, where it processes new events as they are appended. The projection below implements both, and a projection manager coordinates and rebuilds multiple projections.
```typescript
// PROJECTION: Order List View

class OrderListProjection {
  private lastProcessedPosition: number = 0;

  constructor(
    private readonly eventStore: EventStore,
    private readonly readDatabase: Database,
    private readonly positionStore: PositionStore
  ) {}

  // Start the projection
  async start(): Promise<void> {
    // Resume from last checkpoint
    this.lastProcessedPosition = await this.positionStore.get('order_list');

    // Catch up on missed events
    await this.catchUp();

    // Subscribe to live events
    await this.subscribeLive();
  }

  private async catchUp(): Promise<void> {
    let hasMore = true;

    while (hasMore) {
      const events = await this.eventStore.loadFromPosition(
        this.lastProcessedPosition,
        1000 // Batch size
      );

      if (events.length === 0) {
        hasMore = false;
        continue;
      }

      for (const event of events) {
        await this.processEvent(event);
      }

      console.log(`Caught up to position ${this.lastProcessedPosition}`);
    }
  }

  private async subscribeLive(): Promise<void> {
    await this.eventStore.subscribe(
      this.lastProcessedPosition,
      async (event) => {
        await this.processEvent(event);
      }
    );
  }

  private async processEvent(event: StoredEvent): Promise<void> {
    // Route to appropriate handler
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event);
        break;
      case 'OrderConfirmed':
        await this.handleOrderConfirmed(event);
        break;
      case 'OrderShipped':
        await this.handleOrderShipped(event);
        break;
      // Ignore events we don't care about
    }

    // Update checkpoint
    this.lastProcessedPosition = event.globalPosition;
    await this.positionStore.set('order_list', this.lastProcessedPosition);
  }

  private async handleOrderPlaced(event: StoredEvent): Promise<void> {
    const data = JSON.parse(event.eventData);

    await this.readDatabase.upsert('order_list_view', {
      order_id: data.orderId,
      customer_id: data.customerId,
      customer_name: data.customerName,
      item_count: data.items.length,
      total_amount: this.calculateTotal(data.items),
      status: 'pending',
      created_at: event.createdAt,
      last_event_position: event.globalPosition
    });
  }

  private async handleOrderConfirmed(event: StoredEvent): Promise<void> {
    const data = JSON.parse(event.eventData);

    // Conditional update to handle out-of-order delivery
    await this.readDatabase.execute(`
      UPDATE order_list_view
      SET status = 'confirmed',
          confirmed_at = $1,
          last_event_position = $2
      WHERE order_id = $3
        AND last_event_position < $2
    `, [data.confirmedAt, event.globalPosition, data.orderId]);
  }
}

// MANAGING MULTIPLE PROJECTIONS

class ProjectionManager {
  private projections: Map<string, Projection> = new Map();

  register(name: string, projection: Projection): void {
    this.projections.set(name, projection);
  }

  async startAll(): Promise<void> {
    await Promise.all(
      Array.from(this.projections.values()).map(p => p.start())
    );
  }

  // Rebuild a specific projection from scratch
  async rebuild(name: string): Promise<void> {
    const projection = this.projections.get(name);
    if (!projection) throw new Error(`Unknown projection: ${name}`);

    console.log(`Rebuilding projection: ${name}`);

    // 1. Stop the projection
    await projection.stop();

    // 2. Clear its data
    await projection.truncate();

    // 3. Reset position to 0
    await this.positionStore.set(name, 0);

    // 4. Restart (will catch up from beginning)
    await projection.start();

    console.log(`Rebuild complete: ${name}`);
  }

  // Status for monitoring
  async getStatus(): Promise<ProjectionStatus[]> {
    const globalHead = await this.eventStore.getGlobalPosition();

    return Promise.all(
      Array.from(this.projections.entries()).map(async ([name, p]) => ({
        name,
        lastPosition: await this.positionStore.get(name),
        globalHead,
        lag: globalHead - await this.positionStore.get(name),
        isRunning: p.isRunning()
      }))
    );
  }
}
```

Adding New Projections Retroactively:
One of the most powerful benefits of event sourcing is the ability to create new read models from historical events. If you decide you need a new view or analytics model, you can build it by replaying all past events.
```typescript
// SCENARIO: 6 months after launch, marketing wants order analytics

// 1. Define the new projection
class OrderAnalyticsProjection {
  async handleOrderPlaced(event: StoredEvent): Promise<void> {
    const data = JSON.parse(event.eventData);

    await this.analyticsDb.insert('order_facts', {
      order_id: data.orderId,
      customer_id: data.customerId,
      order_date: event.createdAt,
      order_hour: new Date(event.createdAt).getHours(),
      day_of_week: new Date(event.createdAt).getDay(),
      item_count: data.items.length,
      total_amount: this.calculateTotal(data.items),
      product_ids: data.items.map(i => i.productId)
    });
  }

  async handleOrderConfirmed(event: StoredEvent): Promise<void> {
    const data = JSON.parse(event.eventData);

    // Calculate confirmation latency
    const orderFact = await this.analyticsDb.findOne('order_facts', {
      order_id: data.orderId
    });

    const confirmationLatency =
      new Date(data.confirmedAt).getTime() -
      new Date(orderFact.order_date).getTime();

    await this.analyticsDb.update('order_facts',
      { order_id: data.orderId },
      {
        confirmed_at: data.confirmedAt,
        confirmation_latency_ms: confirmationLatency
      }
    );
  }
}

// 2. Replay all historical events
async function buildAnalyticsProjection(): Promise<void> {
  const projection = new OrderAnalyticsProjection(analyticsDb);

  console.log('Building analytics from historical events...');

  let position = 0;
  let eventsProcessed = 0;

  while (true) {
    const events = await eventStore.loadFromPosition(position, 10000);
    if (events.length === 0) break;

    for (const event of events) {
      // Filter to relevant event types
      if (['OrderPlaced', 'OrderConfirmed', 'OrderShipped'].includes(event.eventType)) {
        await projection.processEvent(event);
      }
      position = event.globalPosition;
      eventsProcessed++;
    }

    console.log(`Processed ${eventsProcessed} events...`);
  }

  console.log('Historical replay complete. Switching to live subscription...');

  // Now subscribe to new events
  await eventStore.subscribe(position, event => projection.processEvent(event));
}

// RESULT: Full analytics with 6 months of historical data
// No backfill scripts, no data migration: just replay events
```

With event sourcing, you're not limited to the reports you thought of at launch. New analytics requirements? New search capabilities? New aggregate views? Just create a new projection and replay historical events. The past is never lost.
The combination of CQRS and Event Sourcing provides capabilities that are difficult or impossible to achieve with other architectures.
Complete Audit Trail:
Every state change is captured as an immutable event. This provides a complete, tamper-evident history of everything that happened in the system.
```typescript
// AUDIT TRAIL: Complete history of any entity

async function getOrderAuditTrail(orderId: string): Promise<AuditEntry[]> {
  const events = await eventStore.loadStream(orderId);

  return events.map(event => ({
    timestamp: event.createdAt,
    action: event.eventType,
    actor: event.metadata.userId,
    details: event.eventData,
    correlationId: event.metadata.correlationId
  }));
}

// Result:
// [
//   { timestamp: "2024-01-15T10:00:00Z", action: "OrderPlaced", actor: "customer-123", ... },
//   { timestamp: "2024-01-15T10:05:00Z", action: "ItemAdded", actor: "customer-123", ... },
//   { timestamp: "2024-01-15T10:30:00Z", action: "DiscountApplied", actor: "promo-system", ... },
//   { timestamp: "2024-01-15T11:00:00Z", action: "OrderConfirmed", actor: "customer-123", ... },
//   { timestamp: "2024-01-16T14:00:00Z", action: "OrderShipped", actor: "warehouse-user-5", ... }
// ]

// COMPLIANCE: Export full audit for regulators
async function exportAuditReport(
  fromDate: Date,
  toDate: Date
): Promise<AuditReport> {
  const events = await eventStore.loadByDateRange(fromDate, toDate);

  return {
    generatedAt: new Date(),
    period: { from: fromDate, to: toDate },
    totalEvents: events.length,
    events: events.map(e => ({
      ...e,
      // Cryptographic proof of ordering
      hash: crypto.hash(e.eventData + e.globalPosition.toString())
    }))
  };
}
```

Temporal Queries:
Since we have the complete history, we can reconstruct the state of any entity at any point in time.
```typescript
// TEMPORAL QUERY: What was the order state at a specific time?

async function getOrderAtTime(
  orderId: string,
  asOfTime: Date
): Promise<Order> {
  // Load only events up to the specified time
  const events = await eventStore.loadStream(orderId);
  const eventsUpToTime = events.filter(e => e.createdAt <= asOfTime);

  if (eventsUpToTime.length === 0) {
    throw new Error(`Order ${orderId} did not exist at ${asOfTime}`);
  }

  return Order.fromEvents(eventsUpToTime);
}

// Usage:
const orderNow = await getOrderAtTime('order-123', new Date());
// status: 'shipped', items: [A, B, C]

const orderYesterday = await getOrderAtTime('order-123', yesterday);
// status: 'confirmed', items: [A, B]

const orderLastWeek = await getOrderAtTime('order-123', lastWeek);
// status: 'pending', items: [A]

// BI-TEMPORAL QUERIES: What did we KNOW at a specific time?
//
// This is different from "what was true" - it's "what was recorded"
// Useful for understanding delayed event processing, corrections, etc.

async function getOrderAsKnownAt(
  orderId: string,
  asOfTime: Date,
  knownAtTime: Date
): Promise<Order> {
  // Events that were both:
  // 1. About states before asOfTime (business time)
  // 2. Recorded before knownAtTime (system time)
  const events = await eventStore.loadStream(orderId);
  const relevantEvents = events.filter(e =>
    e.timestamp <= asOfTime &&   // When it happened
    e.createdAt <= knownAtTime   // When we recorded it
  );

  return Order.fromEvents(relevantEvents);
}
```

What-If Analysis and Debugging:
With the full event stream, you can replay events to understand how bugs manifested or analyze what would happen if rules change.
```typescript
// DEBUGGING: Replay to understand how a bug affected state

async function debugOrderState(orderId: string): Promise<void> {
  const events = await eventStore.loadStream(orderId);

  console.log(`Replaying ${events.length} events for order ${orderId}:\n`);

  let order: Order | null = null;

  for (const event of events) {
    console.log(`--- Event ${event.version}: ${event.eventType} ---`);
    console.log(`Timestamp: ${event.createdAt}`);
    console.log(`Data: ${event.eventData}\n`);

    if (order === null) {
      order = Order.fromEvents([event]);
    } else {
      order.apply(event);
    }

    console.log(`State after event:`);
    console.log(`  Status: ${order.getStatus()}`);
    console.log(`  Total: ${order.getTotal()}`);
    console.log(`  Items: ${order.getItemCount()}\n`);
  }
}

// WHAT-IF: If we change a discount rule, what orders would be affected?

async function analyzeDiscountRuleChange(
  newDiscountLogic: (event: DiscountAppliedEvent) => Money
): Promise<ImpactAnalysis> {
  const discountEvents = await eventStore.loadByType('DiscountApplied');

  const impacts: OrderImpact[] = [];

  for (const event of discountEvents) {
    const originalDiscount = event.amount;
    const newDiscount = newDiscountLogic(event);

    if (!originalDiscount.equals(newDiscount)) {
      impacts.push({
        orderId: event.orderId,
        originalDiscount,
        newDiscount,
        difference: newDiscount.subtract(originalDiscount)
      });
    }
  }

  return {
    affectedOrders: impacts.length,
    totalDifference: impacts.reduce((sum, i) => sum + i.difference, 0),
    details: impacts
  };
}
```

| Benefit | How It Works | Business Value |
|---|---|---|
| Complete Audit Trail | Every event is immutable, timestamped, and attributed | Compliance, fraud detection, dispute resolution |
| Temporal Queries | Reconstruct any entity at any point in time | Historical analysis, debugging, rollback understanding |
| Retroactive Projections | New read models built from historical events | New features without data migration, changing requirements |
| What-If Analysis | Replay events with modified logic | Impact analysis, testing rule changes safely |
| Event Replay for Debugging | Step through history to find bugs | Faster root cause analysis, reproducible issues |
| Natural Decoupling | Events as the integration contract | Independent team evolution, technology flexibility |
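The "natural decoupling" row deserves a brief illustration: because events are the published contract, a downstream service can react to them without touching the order service's internals. A minimal sketch, where NotificationService, the emailClient shape, and the customerEmail/tracking payload fields are assumptions made for this example; the actual subscription mechanism is whatever your event bus or store provides:

```typescript
// Sketch: a downstream service consuming events as its integration contract.
// NotificationService, emailClient, and the event payload fields are
// illustrative assumptions; the subscription mechanism depends on your
// event bus (Kafka consumer group, EventStoreDB subscription, etc.).
class NotificationService {
  constructor(
    private readonly emailClient: { send(to: string, body: string): Promise<void> }
  ) {}

  async onEvent(event: StoredEvent): Promise<void> {
    if (event.eventType !== 'OrderShipped') return; // only react to shipments

    const data = JSON.parse(event.eventData);
    await this.emailClient.send(
      data.customerEmail, // assumed to be carried on the event for this sketch
      `Your order ${event.streamId} has shipped. Tracking: ${data.tracking}`
    );
  }
}

// Each consumer tracks its own checkpoint, independently of every other
// consumer, so teams can evolve at their own pace behind the event contract.
async function startNotificationConsumer(
  eventStore: EventStore,
  positionStore: PositionStore,
  emailClient: { send(to: string, body: string): Promise<void> }
): Promise<void> {
  const service = new NotificationService(emailClient);
  const from = await positionStore.get('notifications');
  await eventStore.subscribe(from, (e) => service.onEvent(e));
}
```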
CQRS with Event Sourcing adds substantial complexity beyond CQRS alone. This combination is powerful but should be adopted judiciously.
Strong indicators for CQRS + ES include regulatory or business demands for a complete, tamper-evident audit trail; the need for temporal (point-in-time) queries; read-model requirements that keep changing and benefit from retroactive projections; and domains where the history of changes is itself valuable business data.
CQRS + Event Sourcing is one of the most complex architectural patterns. It requires understanding of event design, projection management, snapshot strategies, event schema evolution, and distributed systems concepts. Adopt it only when the benefits clearly outweigh the learning and maintenance costs.
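One of those concerns, snapshot strategies, is worth a concrete sketch: long-lived streams make full replay expensive, so the aggregate state is periodically snapshotted and replay resumes from the latest snapshot. The SnapshotStore interface, Order.fromSnapshot factory, and loadStreamFrom method below are assumptions for illustration; the EventStore shown earlier would need a "load from version" variant for this to work:

```typescript
// Sketch: snapshotting to cap replay cost on long streams.
// SnapshotStore, Order.fromSnapshot, and loadStreamFrom are assumptions made
// for this example; they are not part of the code shown earlier.
interface Snapshot {
  streamId: string;
  version: number; // version of the last event folded into the snapshot
  state: string;   // serialized aggregate state
}

interface SnapshotStore {
  getLatest(streamId: string): Promise<Snapshot | null>;
  save(snapshot: Snapshot): Promise<void>;
}

class SnapshottingOrderRepository {
  constructor(
    private readonly eventStore: EventStore,
    private readonly snapshots: SnapshotStore,
    private readonly snapshotEvery: number = 100 // events between snapshots
  ) {}

  async getById(orderId: string): Promise<Order | null> {
    const snapshot = await this.snapshots.getLatest(orderId);

    // Replay only the events appended after the snapshot (or all, if none)
    const events = snapshot
      ? await this.eventStore.loadStreamFrom(orderId, snapshot.version + 1)
      : await this.eventStore.loadStream(orderId);

    if (!snapshot && events.length === 0) return null;

    const order = snapshot
      ? Order.fromSnapshot(snapshot, events.map(e => this.deserialize(e)))
      : Order.fromEvents(events.map(e => this.deserialize(e)));

    // Opportunistically snapshot once the stream has grown enough
    if (order.getVersion() - (snapshot?.version ?? 0) >= this.snapshotEvery) {
      await this.snapshots.save({
        streamId: orderId,
        version: order.getVersion(),
        state: JSON.stringify(order) // simplified serialization for the sketch
      });
    }

    return order;
  }

  private deserialize(e: StoredEvent): OrderEvent {
    // Same event-type mapping as the repository shown earlier (simplified here)
    return JSON.parse(e.eventData);
  }
}
```

Snapshots are purely an optimization: the event stream remains the source of truth, and snapshots can always be discarded and rebuilt.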
Adoption Spectrum:
You don't have to adopt everything at once:
| Level | Description | Complexity |
|---|---|---|
| 1 | CQRS only, same database | Low |
| 2 | CQRS with event publishing (not stored) | Medium |
| 3 | CQRS + Event Log (events stored but not primary) | Medium-High |
| 4 | Full CQRS + Event Sourcing | High |
Consider starting at a lower level and evolving upward only when concrete needs emerge.
CQRS and Event Sourcing form a powerful combination that enables capabilities beyond what either pattern provides alone: complete audit trails, temporal queries, retroactive read models, and replay-based debugging, in exchange for significant added complexity.
Module Complete:
You've now completed a comprehensive exploration of the CQRS pattern, from its foundational principles through implementation complexity to its powerful combination with event sourcing.
This knowledge enables you to make informed architectural decisions and implement CQRS successfully when the pattern fits your requirements.
Congratulations! You've mastered the CQRS pattern—one of the most powerful and nuanced patterns in distributed system design. You're now equipped to evaluate when CQRS provides value, implement it correctly, and combine it with event sourcing when the situation demands. This knowledge places you among engineers who truly understand how to build scalable, maintainable distributed systems.