Storing events is only half the story. To serve a user request, process a command, or render a UI, we need current state—not a history of what happened. The magic of event sourcing lies in the transformation: events go in, actionable state comes out.
This transformation isn't magic, though—it's engineering. We need clear patterns for reconstituting aggregates from their streams, building read-optimized projections, keeping those projections up to date, and rebuilding them when requirements change.
This page explores these patterns in depth, showing how the immutable event log becomes the dynamic, queryable state your applications need.
By the end of this page, you will understand:

- The left-fold pattern for deriving state from events
- How aggregates and projections differ in purpose and implementation
- Projection update strategies (synchronous vs. asynchronous)
- Handling projection rebuilds and schema evolution
- The performance characteristics of different reconstitution approaches
At its core, deriving state from events is a left-fold (also called reduce) operation. We start with an initial state and apply each event in sequence, producing a new state at each step. The final state after all events have been applied is the current state.
Mathematically:
State₀ → Event₁ → State₁ → Event₂ → State₂ → ... → Eventₙ → Stateₙ
Or in functional terms:
currentState = events.reduce(applyEvent, initialState)
This pattern is simple but has profound implications: state derivation is deterministic (the same events always produce the same state), any historical state can be recovered by folding a prefix of the stream, and the application function can be tested in complete isolation.
```typescript
/**
 * The left-fold pattern for event application
 */

// Generic event application signature
type EventApplier<TState, TEvent> = (state: TState, event: TEvent) => TState;

// Generic fold implementation
function foldEvents<TState, TEvent>(
  events: TEvent[],
  initialState: TState,
  apply: EventApplier<TState, TEvent>
): TState {
  return events.reduce(apply, initialState);
}

// Concrete example: Shopping Cart
interface CartState {
  cartId: string;
  customerId: string | null;
  items: Map<string, CartItem>;
  status: 'active' | 'checked_out' | 'abandoned';
  createdAt: Date | null;
  lastModified: Date | null;
}

interface CartItem {
  productId: string;
  productName: string;
  quantity: number;
  unitPrice: number;
}

// Initial state factory
const createEmptyCartState = (cartId: string): CartState => ({
  cartId,
  customerId: null,
  items: new Map(),
  status: 'active',
  createdAt: null,
  lastModified: null,
});

// Event application - pure function, no side effects
function applyCartEvent(state: CartState, event: CartEvent): CartState {
  switch (event.type) {
    case 'CartCreated':
      return {
        ...state,
        customerId: event.data.customerId,
        createdAt: event.timestamp,
        lastModified: event.timestamp,
      };

    case 'ItemAdded': {
      const newItems = new Map(state.items);
      const existing = newItems.get(event.data.productId);
      newItems.set(event.data.productId, {
        productId: event.data.productId,
        productName: event.data.productName,
        quantity: (existing?.quantity ?? 0) + event.data.quantity,
        unitPrice: event.data.unitPrice,
      });
      return { ...state, items: newItems, lastModified: event.timestamp };
    }

    case 'ItemRemoved': {
      const newItems = new Map(state.items);
      const existing = newItems.get(event.data.productId);
      if (existing) {
        const newQuantity = existing.quantity - event.data.quantity;
        if (newQuantity <= 0) {
          newItems.delete(event.data.productId);
        } else {
          newItems.set(event.data.productId, {
            ...existing,
            quantity: newQuantity,
          });
        }
      }
      return { ...state, items: newItems, lastModified: event.timestamp };
    }

    case 'ItemQuantityChanged': {
      const newItems = new Map(state.items);
      const existing = newItems.get(event.data.productId);
      if (existing) {
        newItems.set(event.data.productId, {
          ...existing,
          quantity: event.data.newQuantity,
        });
      }
      return { ...state, items: newItems, lastModified: event.timestamp };
    }

    case 'CartCheckedOut':
      return { ...state, status: 'checked_out', lastModified: event.timestamp };

    case 'CartAbandoned':
      return { ...state, status: 'abandoned', lastModified: event.timestamp };

    default:
      // Unknown event type - log and continue
      console.warn(`Unknown cart event type: ${(event as any).type}`);
      return state;
  }
}

// Reconstituting a cart from events
function reconstitute(cartId: string, events: CartEvent[]): CartState {
  const initialState = createEmptyCartState(cartId);
  return foldEvents(events, initialState, applyCartEvent);
}

// Usage
const events: CartEvent[] = [
  { type: 'CartCreated', timestamp: new Date('2024-03-01'), data: { customerId: 'cust-1' } },
  { type: 'ItemAdded', timestamp: new Date('2024-03-01'), data: { productId: 'prod-1', productName: 'Widget', quantity: 2, unitPrice: 29.99 } },
  { type: 'ItemAdded', timestamp: new Date('2024-03-02'), data: { productId: 'prod-2', productName: 'Gadget', quantity: 1, unitPrice: 49.99 } },
  { type: 'ItemRemoved', timestamp: new Date('2024-03-03'), data: { productId: 'prod-1', quantity: 1 } },
];

const currentState = reconstitute('cart-123', events);
// Result: Cart with 1 Widget, 1 Gadget
```

The event application function should be pure—no side effects, no mutations. Return a new state object rather than modifying the input. This enables easy testing, simplifies debugging (you can inspect any intermediate state), and is essential for functional programming patterns.
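Purity pays off most visibly in tests: folding a fixture array of events and asserting on the result requires no mocks and no database. A minimal sketch using simplified stand-ins for the cart model above (the type and function names here are illustrative, not part of the cart code):

```typescript
// Simplified stand-ins for the cart model (illustrative)
type CartEvt =
  | { type: 'ItemAdded'; productId: string; quantity: number }
  | { type: 'ItemRemoved'; productId: string; quantity: number };

type Quantities = Map<string, number>; // productId -> quantity

// Pure applier: copies the input, never mutates it
const applyEvt = (state: Quantities, event: CartEvt): Quantities => {
  const next = new Map(state);
  const current = next.get(event.productId) ?? 0;
  const delta = event.type === 'ItemAdded' ? event.quantity : -event.quantity;
  const qty = current + delta;
  if (qty <= 0) next.delete(event.productId);
  else next.set(event.productId, qty);
  return next;
};

// A "test" is just a fold over a fixture array
const history: CartEvt[] = [
  { type: 'ItemAdded', productId: 'p1', quantity: 2 },
  { type: 'ItemAdded', productId: 'p2', quantity: 1 },
  { type: 'ItemRemoved', productId: 'p1', quantity: 2 },
];

const final = history.reduce(applyEvt, new Map());
console.log(final.has('p1')); // → false (fully removed)
console.log(final.get('p2')); // → 1
```

Because the applier is deterministic, any intermediate state is equally easy to inspect: fold `history.slice(0, n)` instead of the whole array.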
In Domain-Driven Design (DDD), an Aggregate is a cluster of domain objects treated as a single unit for data changes. In event sourcing, aggregates are reconstituted from their event streams to process commands.
The Aggregate Lifecycle:

1. Load the aggregate's event stream from the store
2. Replay the events to reconstitute current state
3. Validate the incoming command against the aggregate's invariants
4. If valid, emit one or more new events
5. Append the new events to the stream (guarded by an optimistic concurrency check)

This pattern differs from traditional ORM-based approaches where you load a record, modify fields, and save. In event sourcing, you never modify—you only append new facts.
```typescript
/**
 * Base class for event-sourced aggregates
 */
abstract class Aggregate<TEvent extends DomainEvent> {
  private uncommittedEvents: TEvent[] = [];
  private _version = -1; // -1 = doesn't exist yet

  abstract readonly aggregateType: string;

  get id(): string {
    return this._id;
  }

  get version(): number {
    return this._version;
  }

  constructor(private readonly _id: string) {}

  /**
   * Apply an event to update internal state.
   * Called both during reconstitution and for new events.
   */
  protected abstract apply(event: TEvent): void;

  /**
   * Record a new event (typically called by command handlers).
   * Applies the event immediately and queues for persistence.
   */
  protected recordEvent(event: TEvent): void {
    // Apply to self immediately
    this.apply(event);
    // Queue for persistence
    this.uncommittedEvents.push(event);
  }

  /**
   * Get events that haven't been persisted yet.
   */
  getUncommittedEvents(): TEvent[] {
    return [...this.uncommittedEvents];
  }

  /**
   * Mark all uncommitted events as persisted.
   */
  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }

  /**
   * Reconstitute aggregate from stored events.
   */
  loadFromHistory(events: TEvent[]): void {
    for (const event of events) {
      this.apply(event);
      this._version++;
    }
  }

  /**
   * Set version explicitly (used by repository).
   */
  setVersion(version: number): void {
    this._version = version;
  }
}

/**
 * Concrete aggregate example: Order
 */
class OrderAggregate extends Aggregate<OrderEvent> {
  readonly aggregateType = 'Order';

  private customerId: string | null = null;
  private items: OrderItem[] = [];
  private status: OrderStatus = 'draft';
  private shippingAddress: Address | null = null;
  private total = 0;

  // Command: Place the order
  placeOrder(customerId: string, items: OrderItemData[]): void {
    // Invariant checks
    if (this.status !== 'draft') {
      throw new InvalidOperationError(
        `Cannot place order in ${this.status} status`
      );
    }
    if (items.length === 0) {
      throw new InvalidOperationError('Order must have at least one item');
    }

    // Calculate total
    const total = items.reduce(
      (sum, item) => sum + item.quantity * item.unitPrice,
      0
    );

    // Record the event
    this.recordEvent({
      eventType: 'OrderPlaced',
      aggregateId: this.id,
      timestamp: new Date(),
      payload: { customerId, items, total },
    });
  }

  // Command: Set shipping address
  setShippingAddress(address: Address): void {
    if (this.status === 'shipped' || this.status === 'delivered') {
      throw new InvalidOperationError(
        'Cannot change address for shipped orders'
      );
    }

    this.recordEvent({
      eventType: 'ShippingAddressSet',
      aggregateId: this.id,
      timestamp: new Date(),
      payload: { address },
    });
  }

  // Command: Ship the order
  ship(trackingNumber: string, carrier: string): void {
    if (this.status !== 'confirmed') {
      throw new InvalidOperationError(
        `Cannot ship order in ${this.status} status`
      );
    }
    if (!this.shippingAddress) {
      throw new InvalidOperationError('Shipping address not set');
    }

    this.recordEvent({
      eventType: 'OrderShipped',
      aggregateId: this.id,
      timestamp: new Date(),
      payload: { trackingNumber, carrier },
    });
  }

  // Event application - updates internal state
  protected apply(event: OrderEvent): void {
    switch (event.eventType) {
      case 'OrderPlaced':
        this.customerId = event.payload.customerId;
        this.items = event.payload.items.map(i => ({
          productId: i.productId,
          quantity: i.quantity,
          unitPrice: i.unitPrice,
        }));
        this.total = event.payload.total;
        this.status = 'pending_payment';
        break;
      case 'PaymentReceived':
        this.status = 'confirmed';
        break;
      case 'ShippingAddressSet':
        this.shippingAddress = event.payload.address;
        break;
      case 'OrderShipped':
        this.status = 'shipped';
        break;
      case 'OrderDelivered':
        this.status = 'delivered';
        break;
      case 'OrderCancelled':
        this.status = 'cancelled';
        break;
    }
  }

  // Getters for queries (if needed within aggregate)
  getStatus(): OrderStatus {
    return this.status;
  }

  getTotal(): number {
    return this.total;
  }
}

/**
 * Repository pattern for loading/saving aggregates
 */
class OrderRepository {
  constructor(private eventStore: EventStore) {}

  async load(orderId: string): Promise<OrderAggregate | null> {
    const stream = `order-${orderId}`;
    const result = await this.eventStore.readStream(stream);

    if (result.events.length === 0) {
      return null;
    }

    const order = new OrderAggregate(orderId);
    order.loadFromHistory(result.events as OrderEvent[]);
    order.setVersion(result.version);
    return order;
  }

  async save(order: OrderAggregate): Promise<void> {
    const uncommitted = order.getUncommittedEvents();
    if (uncommitted.length === 0) {
      return;
    }

    const stream = `order-${order.id}`;
    await this.eventStore.append(stream, order.version, uncommitted);
    order.clearUncommittedEvents();
  }
}
```

Commands are imperative ('PlaceOrder')—they represent intentions that might fail. Events are past tense ('OrderPlaced')—they represent facts that have occurred. Aggregates validate commands and, if successful, emit events. This distinction is fundamental to event sourcing's clarity around what's uncertain (commands) versus what's settled (events).
While aggregates serve the write side, projections serve the read side. A projection consumes events and maintains a read-optimized view—denormalized, pre-aggregated, or structured specifically for a particular query pattern.
Why Projections?
The aggregate structure optimized for command validation is rarely optimal for queries. Consider an e-commerce system: answering "show all orders for customer X" from aggregates alone would mean replaying every order stream, while a CustomerOrders table, updated by order events, enables instant lookups by customer.

Projections decouple query optimization from write model design—the core insight of CQRS.
| Aspect | Aggregate | Projection |
|---|---|---|
| Purpose | Command validation; state transitions | Query optimization; read models |
| Data Source | Single stream (one aggregate instance) | Multiple streams; cross-aggregate |
| Consistency | Strongly consistent (optimistic concurrency) | Eventually consistent (typical) |
| Structure | Domain model structure | Query-specific denormalized structure |
| Multiplicity | One per aggregate | Many per aggregate type (different views) |
| Rebuild | Fast (single stream) | Slow (all relevant streams) |
```typescript
/**
 * Projection framework for building read models
 */
interface Projection<TReadModel> {
  readonly projectionName: string;

  // Handle a single event
  handle(event: StoredEvent): Promise<void>;

  // Build or rebuild from scratch
  rebuild(): Promise<void>;

  // Get current projection state
  getState(): Promise<TReadModel>;
}

/**
 * Example: Customer Orders Summary Projection
 *
 * Maintains a denormalized view of order counts and totals per customer
 */
interface CustomerOrderSummary {
  customerId: string;
  totalOrders: number;
  totalSpent: number;
  lastOrderDate: Date | null;
  orderStatuses: Record<OrderStatus, number>;
}

class CustomerOrdersProjection
  implements Projection<Map<string, CustomerOrderSummary>> {
  readonly projectionName = 'customer-orders-summary';

  constructor(
    private eventStore: EventStore,
    private checkpointStore: CheckpointStore,
    private readModelStore: ReadModelStore
  ) {}

  async handle(event: StoredEvent): Promise<void> {
    // Only care about order events
    if (!event.streamId.startsWith('order-')) {
      return;
    }

    switch (event.eventType) {
      case 'OrderPlaced': {
        const { customerId, total } = event.data as OrderPlacedData;
        const current = await this.getCustomerSummary(customerId);

        await this.readModelStore.upsert('customer_orders', customerId, {
          ...current,
          totalOrders: current.totalOrders + 1,
          totalSpent: current.totalSpent + total,
          lastOrderDate: event.timestamp,
          orderStatuses: {
            ...current.orderStatuses,
            pending_payment: (current.orderStatuses.pending_payment ?? 0) + 1,
          },
        });
        break;
      }

      case 'PaymentReceived': {
        const { customerId } = await this.getOrderData(event.streamId);
        const current = await this.getCustomerSummary(customerId);

        await this.readModelStore.upsert('customer_orders', customerId, {
          ...current,
          orderStatuses: {
            ...current.orderStatuses,
            pending_payment: Math.max(0, (current.orderStatuses.pending_payment ?? 0) - 1),
            confirmed: (current.orderStatuses.confirmed ?? 0) + 1,
          },
        });
        break;
      }

      case 'OrderCancelled': {
        // Similar status transition logic
        break;
      }
    }

    // Update checkpoint
    await this.checkpointStore.saveCheckpoint({
      subscriptionId: this.projectionName,
      position: event.globalPosition + 1,
      updatedAt: new Date(),
    });
  }

  async rebuild(): Promise<void> {
    console.log(`Rebuilding projection: ${this.projectionName}`);

    // Clear existing state
    await this.readModelStore.clear('customer_orders');
    await this.checkpointStore.saveCheckpoint({
      subscriptionId: this.projectionName,
      position: 0,
      updatedAt: new Date(),
    });

    // Replay all events
    let position = 0;
    const batchSize = 1000;

    while (true) {
      const result = await this.eventStore.readAll({
        fromPosition: position,
        count: batchSize,
      });

      if (result.events.length === 0) {
        break;
      }

      for (const event of result.events) {
        await this.handle(event);
      }

      position = result.events[result.events.length - 1].globalPosition + 1;
      console.log(`Projection rebuild progress: position ${position}`);
    }

    console.log(`Projection rebuild complete: ${this.projectionName}`);
  }

  async getState(): Promise<Map<string, CustomerOrderSummary>> {
    const all = await this.readModelStore.getAll<CustomerOrderSummary>('customer_orders');
    return new Map(all.map(item => [item.customerId, item]));
  }

  // Read-through to the read model store so updates build on current values
  private async getCustomerSummary(customerId: string): Promise<CustomerOrderSummary> {
    const existing = await this.readModelStore.get<CustomerOrderSummary>(
      'customer_orders',
      customerId
    );
    return existing ?? {
      customerId,
      totalOrders: 0,
      totalSpent: 0,
      lastOrderDate: null,
      orderStatuses: {} as Record<OrderStatus, number>,
    };
  }

  private async getOrderData(streamId: string): Promise<{ customerId: string }> {
    // Read from stream or cached order data
    const result = await this.eventStore.readStream(streamId, { count: 1 });
    const orderPlaced = result.events.find(e => e.eventType === 'OrderPlaced');
    return { customerId: orderPlaced?.data.customerId };
  }
}

/**
 * Query handler that uses the projection
 */
class CustomerQueryHandler {
  constructor(private readModelStore: ReadModelStore) {}

  async getCustomerDashboard(customerId: string): Promise<CustomerDashboard> {
    const summary = await this.readModelStore.get<CustomerOrderSummary>(
      'customer_orders',
      customerId
    );

    if (!summary) {
      return {
        customerId,
        hasOrders: false,
        stats: null,
      };
    }

    return {
      customerId,
      hasOrders: true,
      stats: {
        orderCount: summary.totalOrders,
        totalSpent: summary.totalSpent,
        lastOrderDate: summary.lastOrderDate,
        pendingOrders: summary.orderStatuses.pending_payment ?? 0,
        completedOrders: summary.orderStatuses.delivered ?? 0,
      },
    };
  }
}
```

How projections receive and process events significantly impacts system behavior. The two primary approaches are synchronous (inline with the write) and asynchronous (via subscription after the write).
Synchronous Projection Updates
The projection is updated in the same transaction as the event write. This guarantees immediate consistency but couples write and read paths.
Asynchronous Projection Updates
Events are written first; projections subscribe to the event stream and update independently. This decouples paths but introduces eventual consistency.
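A practical consequence of the asynchronous approach is that projection lag becomes an observable quantity worth monitoring: the gap between the store's head position and a projection's checkpoint is exactly how many events that projection still has to process. A minimal sketch, assuming the checkpoint shape used elsewhere on this page (`projectionLag` and the literal positions are illustrative):

```typescript
// Checkpoint as persisted by a projection subscription
interface Checkpoint {
  subscriptionId: string;
  position: number; // next global position the projection will process
}

// Events still waiting to be processed by this projection
function projectionLag(headPosition: number, checkpoint: Checkpoint): number {
  return Math.max(0, headPosition - checkpoint.position);
}

const checkpoint: Checkpoint = {
  subscriptionId: 'customer-orders-summary',
  position: 9_950,
};

console.log(projectionLag(10_000, checkpoint)); // → 50 events behind
```

Alerting when this number grows (or stops shrinking) is often the first line of defense against a stuck subscription.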
```typescript
/**
 * Synchronous projection update - coupled with write transaction
 */
class SynchronousProjectionUpdater {
  constructor(
    private eventStore: TransactionalEventStore,
    private projections: Projection<unknown>[]
  ) {}

  async appendWithProjections(
    streamId: string,
    expectedVersion: number,
    events: NewEvent[]
  ): Promise<void> {
    await this.eventStore.withTransaction(async (tx) => {
      // 1. Append events (will fail if version mismatch)
      const storedEvents = await tx.append(streamId, expectedVersion, events);

      // 2. Update all projections (in same transaction)
      for (const projection of this.projections) {
        for (const event of storedEvents) {
          await projection.handle(event);
        }
      }

      // Transaction commits only if all succeed
    });
  }
}

/**
 * Asynchronous projection update - subscription-based
 */
class AsyncProjectionManager {
  private subscriptions = new Map<string, Subscription>();

  constructor(
    private eventStore: EventStore,
    private checkpointStore: CheckpointStore
  ) {}

  registerProjection(projection: Projection<unknown>): void {
    // Create catch-up subscription for this projection
    this.subscriptions.set(
      projection.projectionName,
      new ProjectionSubscription(
        this.eventStore,
        this.checkpointStore,
        projection
      )
    );
  }

  async startAll(): Promise<void> {
    for (const [name, subscription] of this.subscriptions) {
      console.log(`Starting projection: ${name}`);
      await subscription.start();
    }
  }

  async stopAll(): Promise<void> {
    for (const [, subscription] of this.subscriptions) {
      await subscription.stop();
    }
  }

  async rebuildProjection(projectionName: string): Promise<void> {
    const subscription = this.subscriptions.get(projectionName);
    if (!subscription) {
      throw new Error(`Unknown projection: ${projectionName}`);
    }

    // Stop, rebuild, restart
    await subscription.stop();
    await subscription.projection.rebuild();
    await subscription.start();
  }
}

/**
 * Handling eventual consistency in the application layer
 */
class OrderCommandHandler {
  constructor(
    private orderRepo: OrderRepository,
    private readModelStore: ReadModelStore
  ) {}

  async placeOrder(command: PlaceOrderCommand): Promise<PlaceOrderResult> {
    // Execute command against aggregate
    const order = new OrderAggregate(command.orderId);
    order.placeOrder(command.customerId, command.items);
    await this.orderRepo.save(order);

    // Return command result - don't wait for projections
    return {
      orderId: command.orderId,
      status: 'placed',
      // Indicate to client that full details may take a moment
      message: 'Order placed successfully. Details will be available shortly.',
    };
  }

  async getOrderDetails(orderId: string): Promise<OrderDetails | null> {
    // First, try the read model (fast, but maybe stale)
    const cached = await this.readModelStore.get<OrderDetails>('orders', orderId);
    if (cached) {
      return cached;
    }

    // If not in read model yet, fall back to aggregate reconstitution.
    // This handles the case where projections haven't caught up.
    const order = await this.orderRepo.load(orderId);
    if (!order) {
      return null;
    }

    // Build response from aggregate (slower but always current)
    return order.toOrderDetails();
  }
}

/**
 * Consider-Then-React pattern for UI handling eventual consistency
 */
interface OrderConfirmationPage {
  // UI shows: "Your order #123 is being processed"
  // Poll or websocket to update when projection catches up
  orderId: string;
  status: 'processing' | 'confirmed';
  details?: OrderDetails;
  lastChecked: Date;
}
```

When projections update asynchronously, UIs must handle the lag gracefully. Common patterns include: (1) optimistic UI updates that assume success, (2) polling with exponential backoff, (3) WebSocket notifications when projections complete, and (4) clear messaging like 'Processing...' or 'Changes may take a moment to appear'. The key is setting proper user expectations.
One of event sourcing's superpowers is the ability to rebuild projections from scratch. Since all events are retained, you can delete a projection entirely and reconstruct it by replaying all relevant events. This enables fixing bugs in projection logic (a replay produces corrected data), adding entirely new read models over historical events, and evolving read-side schemas without one-off data migrations.
Rebuilding Considerations
| Strategy | Description | When to Use |
|---|---|---|
| Full Rebuild | Delete and replay from event 0 | Bug fixes; schema changes; small event stores |
| Parallel Rebuild | Build new version alongside old; switch when complete | Zero-downtime migrations; large event stores |
| Incremental Rebuild | Update specific records based on criteria | Targeted fixes; performance optimization |
| Blue-Green Deploy | New projection version live; old available as fallback | Critical projections needing rollback capability |
```typescript
/**
 * Parallel rebuild strategy for zero-downtime projection migrations
 */
class ParallelRebuildManager {
  constructor(
    private eventStore: EventStore,
    private readModelStore: ReadModelStore
  ) {}

  async parallelRebuild(projection: Projection<unknown>): Promise<void> {
    const currentTable = `${projection.projectionName}_current`;
    const newTable = `${projection.projectionName}_rebuild`;
    const tempTable = `${projection.projectionName}_old`;

    console.log(`Starting parallel rebuild of ${projection.projectionName}`);

    // 1. Create new table with updated schema
    await this.readModelStore.createTable(newTable, projection.schema);

    // 2. Replay all events into new table
    const rebuildProjection = projection.withTable(newTable);
    let position = 0;
    let eventCount = 0;
    const startTime = Date.now();

    while (true) {
      const result = await this.eventStore.readAll({
        fromPosition: position,
        count: 1000,
      });

      if (result.events.length === 0) {
        break;
      }

      for (const event of result.events) {
        await rebuildProjection.handle(event);
        eventCount++;
      }

      position = result.events[result.events.length - 1].globalPosition + 1;

      // Progress logging
      if (eventCount % 10000 === 0) {
        const elapsed = (Date.now() - startTime) / 1000;
        console.log(
          `Rebuild progress: ${eventCount} events, ${Math.round(eventCount / elapsed)} events/sec`
        );
      }
    }

    // 3. Catch up any events written during rebuild
    const catchUpStart = position;
    console.log(`Catching up from position ${catchUpStart}`);

    // Brief pause to let any in-flight writes complete
    await sleep(1000);

    const catchUpResult = await this.eventStore.readAll({
      fromPosition: position,
      count: 10000, // Catch-up should be small
    });

    for (const event of catchUpResult.events) {
      await rebuildProjection.handle(event);
    }

    // 4. Atomic table swap
    console.log('Performing atomic table swap');
    await this.readModelStore.withTransaction(async (tx) => {
      await tx.renameTable(currentTable, tempTable);
      await tx.renameTable(newTable, currentTable);
    });

    // 5. Cleanup old table (after verification period)
    console.log(`Rebuild complete. Old table ${tempTable} can be dropped after verification.`);
    // Optionally, schedule deferred cleanup
    // await this.scheduleCleanup(tempTable, { after: '1 hour' });
  }
}

/**
 * Handling projection version mismatches during rebuild
 */
interface ProjectionMetadata {
  version: number; // Schema/logic version
  lastPosition: number;
  rebuildInProgress: boolean;
  rebuildStartedAt?: Date;
}

class VersionedProjection {
  static readonly CURRENT_VERSION = 3;

  constructor(
    private readonly projectionName: string,
    private eventStore: EventStore,
    private metadataStore: MetadataStore
  ) {}

  async initialize(): Promise<void> {
    const metadata = await this.metadataStore.get<ProjectionMetadata>(
      'projection_meta',
      this.projectionName
    );

    if (!metadata) {
      // First time - full rebuild
      console.log(`Initializing projection ${this.projectionName} v${VersionedProjection.CURRENT_VERSION}`);
      await this.rebuild();
      return;
    }

    if (metadata.version < VersionedProjection.CURRENT_VERSION) {
      // Version mismatch - need rebuild
      console.log(
        `Projection version changed (${metadata.version} -> ${VersionedProjection.CURRENT_VERSION}). Rebuilding...`
      );
      await this.rebuild();
      return;
    }

    if (metadata.rebuildInProgress) {
      // Previous rebuild was interrupted
      console.log('Resuming interrupted rebuild');
      await this.rebuild(); // Or resume from metadata.lastPosition
      return;
    }

    // Normal startup - continue from checkpoint
    console.log(`Resuming from position ${metadata.lastPosition}`);
  }

  async rebuild(): Promise<void> {
    // Mark rebuild in progress
    await this.metadataStore.upsert('projection_meta', this.projectionName, {
      version: VersionedProjection.CURRENT_VERSION,
      lastPosition: 0,
      rebuildInProgress: true,
      rebuildStartedAt: new Date(),
    });

    try {
      // ... full rebuild logic, tracking finalPosition ...

      // Mark rebuild complete
      await this.metadataStore.upsert('projection_meta', this.projectionName, {
        version: VersionedProjection.CURRENT_VERSION,
        lastPosition: finalPosition,
        rebuildInProgress: false,
      });
    } catch (error) {
      console.error('Rebuild failed:', error);
      throw error;
    }
  }
}
```

For large event stores (millions of events), rebuilds can take hours or days. Plan accordingly: (1) benchmark rebuild time before you need it, (2) consider snapshot-based acceleration, (3) implement progress tracking and resumability, (4) use parallel rebuilds for zero downtime. The best time to discover your rebuild takes 12 hours is not during an incident.
Understanding where consistency is guaranteed—and where it's not—is crucial for building correct event-sourced applications. The key insight is that aggregates define consistency boundaries.

Within an Aggregate: Strong Consistency

All events for one aggregate live in a single stream, and appends are guarded by an optimistic concurrency check, so the invariants an aggregate enforces hold at the moment its events are committed.
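The strong-consistency guarantee comes from the optimistic concurrency check on append: a writer states which stream version its decision was based on, and the store rejects the append if the stream has since moved. A minimal in-memory sketch (the store and error types here are illustrative, not a real event store client):

```typescript
type StoredEvt = { type: string; data: unknown };

class ConcurrencyError extends Error {}

// Toy in-memory event store with per-stream version checks
class InMemoryEventStore {
  private streams = new Map<string, StoredEvt[]>();

  // Append succeeds only if the caller saw the latest version of the stream
  append(streamId: string, expectedVersion: number, events: StoredEvt[]): number {
    const stream = this.streams.get(streamId) ?? [];
    const currentVersion = stream.length - 1; // -1 = stream doesn't exist yet

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, stream is at ${currentVersion}`
      );
    }

    this.streams.set(streamId, [...stream, ...events]);
    return currentVersion + events.length;
  }
}

// Two writers load the same (empty) stream, both at version -1:
const store = new InMemoryEventStore();
store.append('order-1', -1, [{ type: 'OrderPlaced', data: {} }]); // wins

let conflict = false;
try {
  store.append('order-1', -1, [{ type: 'OrderPlaced', data: {} }]); // stale
} catch (e) {
  conflict = e instanceof ConcurrencyError;
}
console.log(conflict); // → true: the losing writer must reload and retry
```

The losing writer reloads the aggregate (replaying the new events), re-validates its command, and retries the append against the fresh version.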
Across Aggregates: Eventual Consistency

No transaction spans multiple aggregates. Other aggregates and read models observe events after the fact, so cross-aggregate processes must tolerate lag and recover from partial failure with compensating actions.
```typescript
/**
 * Patterns for handling consistency in event-sourced systems
 */

// PATTERN 1: Read-Your-Writes with fallback
// When you need immediate consistency after a write

class ReadYourWritesHandler {
  constructor(
    private aggregateRepo: AggregateRepository,
    private readModelStore: ReadModelStore
  ) {}

  async getOrderAfterUpdate(orderId: string): Promise<OrderView> {
    // Try read model first (fast path)
    const readModel = await this.readModelStore.get<OrderView>('orders', orderId);

    // Check if read model is current enough
    // (e.g., compare lastUpdateCommand with what we just wrote)
    if (readModel && this.isFreshEnough(readModel)) {
      return readModel;
    }

    // Fallback to aggregate reconstitution (slow but consistent)
    const aggregate = await this.aggregateRepo.load(orderId);
    return aggregate.toView();
  }

  private isFreshEnough(readModel: OrderView): boolean {
    // Could check timestamp, version, or command correlation
    const maxLag = 5000; // 5 seconds
    return Date.now() - readModel.lastUpdated.getTime() < maxLag;
  }
}

// PATTERN 2: Saga for cross-aggregate operations
// When a business process spans multiple aggregates

interface OrderSagaState {
  orderId: string;
  customerId: string;
  items: OrderItemData[];
  total: number;
  step: 'started' | 'payment_reserved' | 'inventory_reserved' | 'completed' | 'compensating' | 'failed';
  failures: string[];
}

class OrderPlacementSaga {
  constructor(
    private sagaStore: SagaStore,
    private commandBus: CommandBus
  ) {}

  // Saga reacts to events and issues commands
  async handle(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced': {
        // Start the saga
        await this.sagaStore.create<OrderSagaState>({
          sagaId: `order-saga-${event.payload.orderId}`,
          state: {
            orderId: event.payload.orderId,
            customerId: event.payload.customerId,
            items: event.payload.items,
            total: event.payload.total,
            step: 'started',
            failures: [],
          },
        });

        // Issue next command
        await this.commandBus.send({
          type: 'ReservePayment',
          customerId: event.payload.customerId,
          amount: event.payload.total,
          orderId: event.payload.orderId,
        });
        break;
      }

      case 'PaymentReserved': {
        const saga = await this.sagaStore.get<OrderSagaState>(
          `order-saga-${event.payload.orderId}`
        );
        await this.sagaStore.update(saga.sagaId, {
          step: 'payment_reserved',
        });

        // Next step: reserve inventory
        await this.commandBus.send({
          type: 'ReserveInventory',
          orderId: event.payload.orderId,
          items: saga.state.items,
        });
        break;
      }

      case 'InventoryReserved': {
        const saga = await this.sagaStore.get<OrderSagaState>(
          `order-saga-${event.payload.orderId}`
        );
        await this.sagaStore.update(saga.sagaId, {
          step: 'completed',
        });

        // Saga complete - trigger notification
        await this.commandBus.send({
          type: 'NotifyOrderConfirmed',
          orderId: event.payload.orderId,
          customerId: saga.state.customerId,
        });
        break;
      }

      case 'InventoryReservationFailed': {
        // Compensating action - release payment
        await this.compensate(event.payload.orderId, 'Inventory unavailable');
        break;
      }
    }
  }

  private async compensate(orderId: string, reason: string): Promise<void> {
    const saga = await this.sagaStore.get<OrderSagaState>(`order-saga-${orderId}`);
    const previousStep = saga.state.step;

    await this.sagaStore.update(saga.sagaId, {
      step: 'compensating',
      failures: [...saga.state.failures, reason],
    });

    // Issue compensating commands based on completed steps
    if (previousStep === 'payment_reserved' || previousStep === 'inventory_reserved') {
      await this.commandBus.send({
        type: 'ReleasePaymentReservation',
        orderId,
        customerId: saga.state.customerId,
      });
    }

    // Mark order as failed
    await this.commandBus.send({
      type: 'FailOrder',
      orderId,
      reason,
    });
  }
}
```

Event-sourced systems have different performance profiles than traditional CRUD applications. Understanding these characteristics helps you design for your workload.
Write Performance
Writes in event sourcing are typically fast—they're append-only operations with no need to read before writing (except for version checks). The absence of complex JOIN updates or cascading triggers simplifies the write path.
Read Performance
Read performance depends heavily on whether you're reading from projections (fast, pre-computed) or reconstituting aggregates (slower, requires event replay). The key optimization is ensuring that common read patterns are served by projections, while aggregate reconstitution is reserved for command processing.
| Operation | CRUD | Event Sourcing | Optimization Strategy |
|---|---|---|---|
| Single Record Write | UPDATE (read-modify-write) | APPEND (no read) | ES faster due to append-only |
| Aggregate Load | JOIN or multiple SELECTs | Replay N events | Snapshots reduce N |
| Simple Query | SELECT on denormalized table | Query projection | Projections match CRUD |
| Historical Query | Complex if supported | Replay to timestamp | ES intrinsically supports |
| Bulk Analytics | Query data warehouse | Query projections or replay | Purpose-built projections |
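The "replay to timestamp" row deserves a concrete illustration. A minimal sketch (the `TimedEvent` shape is an assumption for illustration) shows how a historical query is just the same left-fold, cut off at the requested point in time:

```typescript
// Temporal queries fall out of the log: replay only the events recorded
// at or before the requested point in time.
interface TimedEvent<TState> {
  timestamp: Date;
  apply: (state: TState) => TState;
}

function stateAt<TState>(
  events: TimedEvent<TState>[],
  initial: TState,
  asOf: Date
): TState {
  return events
    .filter((e) => e.timestamp.getTime() <= asOf.getTime())
    .reduce((state, e) => e.apply(state), initial);
}
```

A CRUD system would need dedicated history tables or audit triggers to answer "what was the balance on February 15th?"; here the answer is one filtered fold over events that already exist.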
```typescript
/**
 * Performance optimization patterns
 */

// 1. Snapshot-based aggregate loading
interface SnapshotConfig {
  // Create snapshot every N events
  snapshotInterval: number;
  // Keep last N snapshots for fallback
  retainedSnapshots: number;
}

const DEFAULT_SNAPSHOT_CONFIG: SnapshotConfig = {
  snapshotInterval: 100,
  retainedSnapshots: 3,
};

class OptimizedAggregateRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private config: SnapshotConfig = DEFAULT_SNAPSHOT_CONFIG
  ) {}

  async load<T extends Aggregate>(
    aggregateId: string,
    factory: AggregateFactory<T>
  ): Promise<T | null> {
    // Load snapshot if available
    const snapshot = await this.snapshotStore.getLatest(aggregateId);

    // Calculate events to replay
    const fromPosition = snapshot ? snapshot.position + 1 : 0;
    const events = await this.eventStore.readStream(aggregateId, { fromPosition });

    if (!snapshot && events.length === 0) {
      return null;
    }

    // Reconstitute from snapshot + events
    const aggregate = snapshot
      ? factory.fromSnapshot(aggregateId, snapshot.state)
      : factory.create(aggregateId);

    aggregate.loadFromHistory(events);

    // Log warning if too many events since snapshot
    if (events.length > this.config.snapshotInterval * 2) {
      console.warn(
        `Aggregate ${aggregateId} has ${events.length} events since last snapshot. Consider snapshotting.`
      );
    }

    return aggregate;
  }

  async save<T extends Aggregate>(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents();
    if (events.length === 0) return;

    await this.eventStore.append(
      aggregate.id,
      aggregate.version,
      events
    );

    // Check if we should create a snapshot
    const newVersion = aggregate.version + events.length;
    const eventsSinceSnapshot = await this.countEventsSinceSnapshot(aggregate.id);

    if (eventsSinceSnapshot >= this.config.snapshotInterval) {
      await this.createSnapshot(aggregate, newVersion);
    }

    aggregate.clearUncommittedEvents();
  }

  private async createSnapshot<T extends Aggregate>(
    aggregate: T,
    position: number
  ): Promise<void> {
    await this.snapshotStore.save({
      aggregateId: aggregate.id,
      aggregateType: aggregate.aggregateType,
      position,
      state: aggregate.getSnapshotState(),
      createdAt: new Date(),
    });

    // Cleanup old snapshots
    await this.snapshotStore.pruneOldSnapshots(
      aggregate.id,
      this.config.retainedSnapshots
    );
  }
}

// 2. Batch projection updates
class BatchingProjection {
  private pendingEvents: StoredEvent[] = [];
  private flushTimer: NodeJS.Timeout | null = null;

  constructor(
    private baseProjection: Projection<unknown>,
    private batchSize: number = 100,
    private flushIntervalMs: number = 1000
  ) {}

  async handle(event: StoredEvent): Promise<void> {
    this.pendingEvents.push(event);

    if (this.pendingEvents.length >= this.batchSize) {
      await this.flush();
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(() => this.flush(), this.flushIntervalMs);
    }
  }

  private async flush(): Promise<void> {
    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    if (this.pendingEvents.length === 0) return;

    const batch = this.pendingEvents;
    this.pendingEvents = [];

    // Process batch with a single transaction
    await this.baseProjection.handleBatch(batch);
  }
}

// 3. Materialized view with TTL-based refresh
class TTLProjection {
  private cache = new Map<string, { data: unknown; expiresAt: number }>();

  constructor(
    private ttlMs: number,
    private computeFn: (id: string) => Promise<unknown>
  ) {}

  async get(id: string): Promise<unknown> {
    const cached = this.cache.get(id);
    if (cached && cached.expiresAt > Date.now()) {
      return cached.data;
    }

    const data = await this.computeFn(id);
    this.cache.set(id, {
      data,
      expiresAt: Date.now() + this.ttlMs,
    });
    return data;
  }

  invalidate(id: string): void {
    this.cache.delete(id);
  }
}
```

Performance characteristics vary significantly based on event store technology, projection complexity, and workload patterns, so benchmark your specific use case. Common metrics to track: aggregate load time (p50, p95, p99), events processed per second, projection lag time, and storage growth rate.
We've covered how events become working state—the mechanics that transform an append-only log into a responsive application. Let's consolidate the key patterns:
What's Next:
With the mechanics of state derivation understood, the next page explores Benefits and Challenges—an honest assessment of where event sourcing excels and where it introduces complexity. We'll examine audit trails, debugging capabilities, integration patterns, and the operational challenges of managing event stores and projections at scale.
You now understand how events become actionable state—through aggregates for command processing and projections for queries. You've seen patterns for consistency boundaries, projection updates, rebuilding, and performance optimization. Next, we'll honestly assess the benefits and challenges of event sourcing to help you make informed architectural decisions.