Event sourcing stores events as the source of truth—but your application needs working state to make decisions. When a user tries to withdraw money, you need to know their current balance. When an order is placed, you need to verify inventory levels. The magic happens in state reconstruction: the process of transforming an immutable event stream into usable, current state.
This seemingly simple process—"just replay events"—contains deep practical challenges. What happens when an aggregate has 100,000 events? How do you handle events that were stored years ago with a different schema? How do you ensure consistency when multiple processes might be reconstructing the same aggregate?
This page explores the mechanics of state reconstruction in production systems, covering efficient replay strategies, snapshot optimization, event versioning, and the architectural patterns that make event sourcing practical at scale.
By the end of this page, you will understand: (1) the mechanics of event replay and state application, (2) how snapshots optimize reconstruction performance, (3) strategies for handling event schema evolution, and (4) concurrency considerations during reconstruction.
At its core, state reconstruction follows a simple algorithm:
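In sketch form, using a hypothetical bank-account domain (the `BankEvent`/`AccountState` names are illustrative, not from a real library):

```typescript
// Minimal sketch of the replay loop: start from initial state,
// apply each event in order, end with current state.
type BankEvent = { type: string; amount: number };
type AccountState = { balance: number };

const initialState: AccountState = { balance: 0 };

function applyEvent(state: AccountState, event: BankEvent): AccountState {
  switch (event.type) {
    case 'Deposited':
      return { balance: state.balance + event.amount };
    case 'Withdrawn':
      return { balance: state.balance - event.amount };
    default:
      return state; // unknown events are ignored
  }
}

const events: BankEvent[] = [
  { type: 'Deposited', amount: 100 },
  { type: 'Withdrawn', amount: 30 },
  { type: 'Deposited', amount: 50 },
];

const current = events.reduce(applyEvent, initialState);
console.log(current.balance); // 120
```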
This is a classic fold or reduce operation: `events.reduce(applyEvent, initialState)`.
The entire process must be deterministic: given the same events in the same order, you must always produce the same final state. Any randomness, time-sensitivity, or external dependencies in your apply function will break this guarantee.
```typescript
// ============================================================
// THE AGGREGATE: Encapsulating state and behavior
// ============================================================

interface AggregateRoot<TState, TEvent extends DomainEvent> {
  // The current state derived from events
  readonly state: TState;

  // The version (number of events applied)
  readonly version: number;

  // Apply an event to produce new state
  apply(event: TEvent): void;

  // Rehydrate from a stream of events
  loadFromHistory(events: TEvent[]): void;
}

// ============================================================
// CONCRETE EXAMPLE: Shopping Cart Aggregate
// ============================================================

interface CartState {
  cartId: string;
  customerId: string;
  items: Map<string, { productId: string; quantity: number; price: number }>;
  status: 'active' | 'checked_out' | 'abandoned';
  totalValue: number;
}

interface CartEvent {
  eventId: string;
  aggregateId: string;
  version: number;
  timestamp: Date;
  type: string;
  payload: unknown;
}

class ShoppingCart implements AggregateRoot<CartState, CartEvent> {
  private _state: CartState;
  private _version: number = 0;

  constructor() {
    // Initial state: an empty cart
    this._state = {
      cartId: '',
      customerId: '',
      items: new Map(),
      status: 'active',
      totalValue: 0,
    };
  }

  get state(): CartState {
    return this._state;
  }

  get version(): number {
    return this._version;
  }

  // ============================================================
  // THE APPLY FUNCTION: Pure state transformation
  // ============================================================
  apply(event: CartEvent): void {
    // CRITICAL: This function must be DETERMINISTIC
    // - No side effects (no API calls, no logging to external systems)
    // - No randomness (no UUID generation, no Date.now())
    // - No external dependencies (no reading from other aggregates)

    switch (event.type) {
      case 'CartCreated': {
        const payload = event.payload as { customerId: string };
        this._state = {
          ...this._state,
          cartId: event.aggregateId,
          customerId: payload.customerId,
          status: 'active',
        };
        break;
      }

      case 'ItemAdded': {
        const payload = event.payload as {
          productId: string;
          quantity: number;
          price: number;
        };
        const newItems = new Map(this._state.items);
        const existing = newItems.get(payload.productId);

        if (existing) {
          newItems.set(payload.productId, {
            ...existing,
            quantity: existing.quantity + payload.quantity,
          });
        } else {
          newItems.set(payload.productId, {
            productId: payload.productId,
            quantity: payload.quantity,
            price: payload.price,
          });
        }

        this._state = {
          ...this._state,
          items: newItems,
          totalValue: this.calculateTotal(newItems),
        };
        break;
      }

      case 'ItemRemoved': {
        const payload = event.payload as { productId: string; quantity: number };
        const newItems = new Map(this._state.items);
        const existing = newItems.get(payload.productId);

        if (existing) {
          if (existing.quantity <= payload.quantity) {
            newItems.delete(payload.productId);
          } else {
            newItems.set(payload.productId, {
              ...existing,
              quantity: existing.quantity - payload.quantity,
            });
          }
        }

        this._state = {
          ...this._state,
          items: newItems,
          totalValue: this.calculateTotal(newItems),
        };
        break;
      }

      case 'CartCheckedOut': {
        this._state = {
          ...this._state,
          status: 'checked_out',
        };
        break;
      }

      default:
        // Unknown event types are ignored (forward compatibility)
        break;
    }

    this._version = event.version;
  }

  // ============================================================
  // REHYDRATION: Loading state from event history
  // ============================================================
  loadFromHistory(events: CartEvent[]): void {
    // Events MUST be in order by version
    for (const event of events) {
      // Validate event ordering (defensive programming)
      if (event.version !== this._version + 1) {
        throw new Error(
          `Event version mismatch: expected ${this._version + 1}, got ${event.version}`
        );
      }
      this.apply(event);
    }
  }

  private calculateTotal(items: Map<string, { quantity: number; price: number }>): number {
    let total = 0;
    for (const item of items.values()) {
      total += item.quantity * item.price;
    }
    return total;
  }
}

// ============================================================
// REPOSITORY: Loading aggregates from the event store
// ============================================================

class ShoppingCartRepository {
  constructor(private eventStore: EventStore) {}

  async getById(cartId: string): Promise<ShoppingCart> {
    // Load all events for this aggregate
    const events = await this.eventStore.loadStream(`cart-${cartId}`);

    // Create a new aggregate and rehydrate from events
    const cart = new ShoppingCart();
    cart.loadFromHistory(events);

    return cart;
  }

  async save(cart: ShoppingCart, newEvents: CartEvent[]): Promise<void> {
    // Append new events to the stream
    await this.eventStore.append(
      `cart-${cart.state.cartId}`,
      newEvents,
      cart.version // Expected version for optimistic concurrency
    );
  }
}

// ============================================================
// EXAMPLE USAGE
// ============================================================

async function processCheckout(cartId: string) {
  const repository = new ShoppingCartRepository(eventStore);

  // Load cart by replaying all its events
  const cart = await repository.getById(cartId);

  // Business logic validation using current state
  if (cart.state.status !== 'active') {
    throw new Error('Cannot checkout: cart is not active');
  }
  if (cart.state.items.size === 0) {
    throw new Error('Cannot checkout: cart is empty');
  }

  // Generate new event based on validated state
  const checkoutEvent: CartEvent = {
    eventId: generateEventId(),
    aggregateId: cartId,
    version: cart.version + 1,
    timestamp: new Date(),
    type: 'CartCheckedOut',
    payload: {
      totalAmount: cart.state.totalValue,
      itemCount: cart.state.items.size,
    },
  };

  // Persist the new event
  await repository.save(cart, [checkoutEvent]);
}
```

The apply function must be absolutely deterministic. If replaying the same events produces different states on different machines or at different times, your system's integrity is compromised.
Never use `Date.now()`, `Math.random()`, or any external calls in your apply function. All necessary information must come from the event itself.
The naive replay approach has an obvious problem: as aggregates accumulate events, reconstruction becomes slower. A bank account with 10 years of transactions might have 50,000 events. Replaying all of them for every read would be prohibitively expensive.
Snapshots solve this problem. A snapshot is a captured state at a specific version, stored alongside the event stream. To reconstruct current state, you load the most recent snapshot, initialize the aggregate from it, and then replay only the events recorded after the snapshot's version.
Instead of replaying 50,000 events, you might replay only 100 (events since the last snapshot).
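The key correctness property is that both paths arrive at the same state. A minimal sketch, using a toy additive domain (all names illustrative):

```typescript
// Property check: replaying from a snapshot must equal a full replay.
type Ev = { amount: number };
const apply = (balance: number, e: Ev): number => balance + e.amount;

// Ten events with amounts 1..10
const events: Ev[] = Array.from({ length: 10 }, (_, i) => ({ amount: i + 1 }));

// Path A: full replay from the beginning
const full = events.reduce(apply, 0);

// Path B: snapshot taken at version 7, then replay only the tail
const snapshotState = events.slice(0, 7).reduce(apply, 0);
const fromSnapshot = events.slice(7).reduce(apply, snapshotState);

console.log(full === fromSnapshot); // true — both paths yield 55
```

Because the apply function is deterministic, the snapshot is purely an optimization: if it is ever lost or corrupted, a full replay recovers the identical state.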
```typescript
// ============================================================
// SNAPSHOT STORAGE
// ============================================================

interface Snapshot<TState> {
  aggregateId: string;
  version: number;   // The event version this snapshot represents
  timestamp: Date;   // When the snapshot was taken
  state: TState;     // The serialized aggregate state
}

interface SnapshotStore<TState> {
  // Get the most recent snapshot for an aggregate
  getLatest(aggregateId: string): Promise<Snapshot<TState> | null>;

  // Save a new snapshot
  save(snapshot: Snapshot<TState>): Promise<void>;
}

// ============================================================
// REPOSITORY WITH SNAPSHOT OPTIMIZATION
// ============================================================

class SnapshottedCartRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore<CartState>,
    private snapshotThreshold: number = 100 // Snapshot every 100 events
  ) {}

  async getById(cartId: string): Promise<ShoppingCart> {
    const streamId = `cart-${cartId}`;

    // Step 1: Try to load a snapshot
    const snapshot = await this.snapshotStore.getLatest(streamId);

    let cart: ShoppingCart;
    let fromVersion: number;

    if (snapshot) {
      // Step 2a: Initialize from snapshot
      cart = new ShoppingCart();
      cart.initializeFromSnapshot(snapshot.state, snapshot.version);
      fromVersion = snapshot.version + 1;
      console.log(`Loaded snapshot at version ${snapshot.version}`);
    } else {
      // Step 2b: No snapshot, start from scratch
      cart = new ShoppingCart();
      fromVersion = 1;
      console.log('No snapshot found, replaying from beginning');
    }

    // Step 3: Load events after the snapshot (or from beginning)
    const events = await this.eventStore.loadStream(streamId, fromVersion);
    console.log(`Replaying ${events.length} events from version ${fromVersion}`);

    // Step 4: Apply remaining events
    cart.loadFromHistory(events);

    return cart;
  }

  async save(cart: ShoppingCart, newEvents: CartEvent[]): Promise<void> {
    const streamId = `cart-${cart.state.cartId}`;

    // Append new events
    await this.eventStore.append(streamId, newEvents, cart.version);

    // Apply events to get new state
    for (const event of newEvents) {
      cart.apply(event);
    }

    // Check if we should create a new snapshot
    await this.maybeCreateSnapshot(cart);
  }

  private async maybeCreateSnapshot(cart: ShoppingCart): Promise<void> {
    const currentSnapshot = await this.snapshotStore.getLatest(
      `cart-${cart.state.cartId}`
    );

    const eventsSinceSnapshot = currentSnapshot
      ? cart.version - currentSnapshot.version
      : cart.version;

    if (eventsSinceSnapshot >= this.snapshotThreshold) {
      const snapshot: Snapshot<CartState> = {
        aggregateId: `cart-${cart.state.cartId}`,
        version: cart.version,
        timestamp: new Date(),
        state: cart.state,
      };
      await this.snapshotStore.save(snapshot);
      console.log(`Created snapshot at version ${cart.version}`);
    }
  }
}

// ============================================================
// AGGREGATE WITH SNAPSHOT SUPPORT
// ============================================================

class ShoppingCart {
  private _state: CartState;
  private _version: number = 0;

  // ... (other methods from before)

  // Initialize from a snapshot instead of empty state
  initializeFromSnapshot(state: CartState, version: number): void {
    // Deep clone the state to avoid mutation issues
    this._state = {
      ...state,
      items: new Map(state.items), // Clone the Map
    };
    this._version = version;
  }

  // Serialize state for snapshot storage
  toSnapshot(): CartState {
    return {
      ...this._state,
      items: new Map(this._state.items),
    };
  }
}

// ============================================================
// SNAPSHOT STRATEGIES
// ============================================================

interface SnapshotStrategy {
  shouldSnapshot(
    aggregate: AggregateRoot<unknown, unknown>,
    eventsSinceLastSnapshot: number
  ): boolean;
}

// Strategy 1: Fixed interval (every N events)
class FixedIntervalStrategy implements SnapshotStrategy {
  constructor(private interval: number) {}

  shouldSnapshot(
    aggregate: AggregateRoot<unknown, unknown>,
    eventsSinceLastSnapshot: number
  ): boolean {
    return eventsSinceLastSnapshot >= this.interval;
  }
}

// Strategy 2: Time-based (snapshot if older than N hours)
class TimeBasedStrategy implements SnapshotStrategy {
  constructor(
    private maxAgeHours: number,
    private snapshotStore: SnapshotStore<unknown>
  ) {}

  async shouldSnapshot(aggregate: AggregateRoot<unknown, unknown>): Promise<boolean> {
    const snapshot = await this.snapshotStore.getLatest(aggregate.id);
    if (!snapshot) return true;
    const ageHours = (Date.now() - snapshot.timestamp.getTime()) / (1000 * 60 * 60);
    return ageHours > this.maxAgeHours;
  }
}

// Strategy 3: Adaptive (based on load time)
class AdaptiveStrategy implements SnapshotStrategy {
  constructor(private maxLoadTimeMs: number) {}

  shouldSnapshot(
    aggregate: AggregateRoot<unknown, unknown>,
    eventsSinceLastSnapshot: number
  ): boolean {
    // Estimate load time based on events (assume 0.1ms per event)
    const estimatedLoadTime = eventsSinceLastSnapshot * 0.1;
    return estimatedLoadTime > this.maxLoadTimeMs;
  }
}
```

Think of snapshots as caches.
Like all caches, they can become stale or corrupted. The powerful guarantee of event sourcing—that you can always reconstruct state from events—means you can always regenerate snapshots if needed. This is your safety net.
Events are immutable—but your understanding of the domain evolves. Over years of operation, you might need to add new fields to events, rename event types, or restructure payloads entirely.
Since you can't modify historical events, you need strategies to handle schema evolution—reading old events with new code, and vice versa.
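The core idea can be shown in a few runnable lines before the full strategy catalog below. Here, a hypothetical v1 `ItemAdded` event lacks a `currency` field that v2 introduced; an upcaster fills a default at read time (the shapes and default are illustrative assumptions):

```typescript
// Mini upcaster: transform an old event to the current schema on read.
type RawEvent = { type: string; schemaVersion: number; payload: Record<string, unknown> };

function upcastItemAdded(e: RawEvent): RawEvent {
  // v1 ItemAdded had no 'currency'; default old events to USD (assumed policy)
  if (e.type === 'ItemAdded' && e.schemaVersion === 1) {
    return { ...e, schemaVersion: 2, payload: { ...e.payload, currency: 'USD' } };
  }
  return e; // already current
}

const stored: RawEvent = { type: 'ItemAdded', schemaVersion: 1, payload: { price: 10 } };
const upcasted = upcastItemAdded(stored);

console.log(upcasted.schemaVersion, upcasted.payload.currency); // 2 'USD'
```

Note the stored event is never modified; the transformation happens in memory, every time the event is loaded.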
```typescript
// ============================================================
// STRATEGY 1: Event Upcasting
// ============================================================

// Transform old event versions to the current version during load
// The event store returns raw events; an upcaster transforms them

interface EventUpcaster {
  canUpcast(eventType: string, version: number): boolean;
  upcast(event: RawEvent): RawEvent;
}

// Example: ItemAdded v1 → v2 (added 'currency' field)
class ItemAddedV1ToV2Upcaster implements EventUpcaster {
  canUpcast(eventType: string, version: number): boolean {
    return eventType === 'ItemAdded' && version === 1;
  }

  upcast(event: RawEvent): RawEvent {
    const payload = event.payload as { productId: string; quantity: number; price: number };
    return {
      ...event,
      schemaVersion: 2,
      payload: {
        ...payload,
        currency: 'USD', // Default for old events without currency
      },
    };
  }
}

// Example: Renamed event type
class ProductAddedToItemAddedUpcaster implements EventUpcaster {
  canUpcast(eventType: string, version: number): boolean {
    return eventType === 'ProductAdded'; // Old name
  }

  upcast(event: RawEvent): RawEvent {
    return {
      ...event,
      type: 'ItemAdded', // New name
    };
  }
}

// Upcaster chain applies all necessary transformations
class UpcasterChain {
  private upcasters: EventUpcaster[] = [];

  register(upcaster: EventUpcaster): void {
    this.upcasters.push(upcaster);
  }

  upcast(event: RawEvent): RawEvent {
    let current = event;

    // Apply upcasters until no more match
    let changed = true;
    while (changed) {
      changed = false;
      for (const upcaster of this.upcasters) {
        if (upcaster.canUpcast(current.type, current.schemaVersion || 1)) {
          current = upcaster.upcast(current);
          changed = true;
          break; // Restart loop with updated event
        }
      }
    }

    return current;
  }
}

// ============================================================
// STRATEGY 2: Versioned Event Types
// ============================================================

// Store version explicitly in event type
type ItemAddedV1 = {
  type: 'ItemAdded_v1';
  payload: {
    productId: string;
    quantity: number;
    price: number;
  };
};

type ItemAddedV2 = {
  type: 'ItemAdded_v2';
  payload: {
    productId: string;
    quantity: number;
    priceAmount: number;
    priceCurrency: string;
  };
};

// Apply function handles all versions
function applyItemAdded(state: CartState, event: ItemAddedV1 | ItemAddedV2): CartState {
  let productId: string;
  let quantity: number;
  let price: number;

  if (event.type === 'ItemAdded_v1') {
    productId = event.payload.productId;
    quantity = event.payload.quantity;
    price = event.payload.price; // Assume USD
  } else {
    productId = event.payload.productId;
    quantity = event.payload.quantity;
    price = convertToUSD(event.payload.priceAmount, event.payload.priceCurrency);
  }

  // Common logic for all versions
  const newItems = new Map(state.items);
  // ... apply the item
  return { ...state, items: newItems };
}

// ============================================================
// STRATEGY 3: Weak Schema (JSON with optional fields)
// ============================================================

interface FlexibleEvent {
  type: string;
  aggregateId: string;
  version: number;
  timestamp: Date;
  payload: Record<string, unknown>; // Flexible schema
}

// Apply function uses optional chaining and defaults
function applyFlexibleItemAdded(state: CartState, event: FlexibleEvent): CartState {
  const payload = event.payload;

  // Use optional chaining and defaults for backward/forward compatibility
  const productId = (payload.productId as string | undefined) ?? (payload.product_id as string);
  const quantity = (payload.quantity as number) ?? 1;
  const price = (payload.price as number) ?? (payload.priceAmount as number) ?? 0;
  const currency = (payload.currency as string) ?? (payload.priceCurrency as string) ?? 'USD';

  // Proceed with normalized values
  // ...
  return state;
}

// ============================================================
// STRATEGY 4: Event Store-Level Transformation
// ============================================================

// Some event stores support server-side transformations
class TransformingEventStore implements EventStore {
  constructor(
    private underlying: EventStore,
    private upcasterChain: UpcasterChain
  ) {}

  async loadStream(streamId: string, fromVersion?: number): Promise<DomainEvent[]> {
    const rawEvents = await this.underlying.loadStream(streamId, fromVersion);

    // Transform all events to current schema before returning
    return rawEvents.map(event => this.upcasterChain.upcast(event));
  }

  async append(streamId: string, events: DomainEvent[]): Promise<void> {
    // Store events with current schema version
    const versionedEvents = events.map(e => ({
      ...e,
      schemaVersion: CURRENT_SCHEMA_VERSION,
    }));
    await this.underlying.append(streamId, versionedEvents);
  }
}

// ============================================================
// MIGRATION: Rewriting historical events (use sparingly!)
// ============================================================

// In extreme cases, you may need to migrate the event store itself
// This is a significant operation and should be rare

async function migrateEventStore(
  sourceStore: EventStore,
  targetStore: EventStore,
  upcasterChain: UpcasterChain
): Promise<void> {
  const allStreamIds = await sourceStore.getAllStreamIds();

  for (const streamId of allStreamIds) {
    const oldEvents = await sourceStore.loadStream(streamId);

    // Transform all events to new schema
    const newEvents = oldEvents.map(e => upcasterChain.upcast(e));

    // Write to new store
    await targetStore.append(streamId, newEvents);

    console.log(`Migrated stream ${streamId}: ${oldEvents.length} events`);
  }
}

// WARNING: Event migration changes the source of truth
// - Requires downtime or careful dual-write/dual-read
// - Original events are no longer available
// - Snapshots may be invalidated
// Only do this when upcasting becomes unsustainable
```

| Strategy | Complexity | Runtime Cost | Best For |
|---|---|---|---|
| Upcasting | Medium | Per-event transformation on read | Additive changes, field defaults |
| Versioned Types | Low | Switch statement in apply function | Clear breaking changes, few versions |
| Weak Schema | Low | Optional chaining at read time | Gradual evolution, tolerant readers |
| Store Migration | High | One-time migration cost | Major restructuring, accumulated debt |
Make events additive by default: add new fields rather than changing existing ones. Use optional fields liberally. Include a schema version from day one. These practices make upcasting trivial and rare.
State reconstruction isn't just a single-threaded read operation. In production systems, you must handle concurrent writers appending to the same stream, concurrent readers reconstructing the same aggregate, and caches of reconstructed state that can go stale.
Event sourcing has natural properties that help, but careful design is still required.
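The heart of the mechanism is the expected-version check on append. A minimal in-memory sketch (the `InMemoryStream` class and its API are illustrative, not a real library):

```typescript
// Sketch of optimistic concurrency: append succeeds only if the stream
// is still at the version the writer observed when it loaded.
class InMemoryStream {
  private events: string[] = [];

  get version(): number {
    return this.events.length;
  }

  append(newEvents: string[], expectedVersion: number): void {
    if (this.version !== expectedVersion) {
      throw new Error(
        `Concurrency conflict: expected ${expectedVersion}, actual ${this.version}`
      );
    }
    this.events.push(...newEvents);
  }
}

const stream = new InMemoryStream();
stream.append(['CartCreated'], 0); // ok: version 0 → 1

// A second writer that loaded at version 0 now tries a stale append:
try {
  stream.append(['ItemAdded'], 0); // conflict: actual version is 1
} catch (e) {
  console.log((e as Error).message);
}
```

The failed writer can then reload the aggregate, re-validate its business rules against the fresh state, and retry, which is exactly the flow the full example below implements.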
```typescript
// ============================================================
// OPTIMISTIC CONCURRENCY CONTROL
// ============================================================

// The key: include expected version when appending events
interface EventStore {
  // Load events from a stream
  loadStream(streamId: string, fromVersion?: number): Promise<DomainEvent[]>;

  // Append events, but only if current version matches expected
  // Throws ConcurrencyError if versions don't match
  append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number // <-- KEY PARAMETER
  ): Promise<void>;
}

class ConcurrencyError extends Error {
  constructor(
    public streamId: string,
    public expectedVersion: number,
    public actualVersion: number
  ) {
    super(
      `Concurrency conflict on ${streamId}: ` +
      `expected version ${expectedVersion}, actual ${actualVersion}`
    );
  }
}

// ============================================================
// THE OPTIMISTIC FLOW
// ============================================================

async function transferMoney(
  fromAccountId: string,
  toAccountId: string,
  amount: number
): Promise<void> {
  const repository = new AccountRepository(eventStore);

  // Step 1: Load both accounts (this records their current versions)
  const fromAccount = await repository.getById(fromAccountId);
  const toAccount = await repository.getById(toAccountId);

  // Step 2: Validate business rules against current state
  if (fromAccount.state.balance < amount) {
    throw new InsufficientFundsError();
  }

  // Step 3: Generate events
  const withdrawEvent = createWithdrawEvent(fromAccountId, amount, fromAccount.version + 1);
  const depositEvent = createDepositEvent(toAccountId, amount, toAccount.version + 1);

  // Step 4: Try to append events with expected versions
  try {
    // In a real system, these should be in a transaction or saga
    await repository.save(fromAccount, [withdrawEvent]);
    await repository.save(toAccount, [depositEvent]);
  } catch (error) {
    if (error instanceof ConcurrencyError) {
      // Someone else modified the account while we were processing
      // Options:
      // 1. Retry the entire operation
      // 2. Return an error to the user
      // 3. Implement automatic conflict resolution
      console.log(`Concurrency conflict detected, retrying...`);
      return transferMoney(fromAccountId, toAccountId, amount); // Retry
    }
    throw error;
  }
}

// ============================================================
// IMPLEMENTATION: Version checking in the event store
// ============================================================

class PostgresEventStore implements EventStore {
  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

      // Lock the stream to prevent concurrent appends
      // (or use a unique constraint on stream_id + version)
      const result = await client.query(
        `SELECT MAX(version) as current_version
         FROM events
         WHERE stream_id = $1
         FOR UPDATE`,
        [streamId]
      );

      const currentVersion = result.rows[0]?.current_version ?? 0;

      // Check optimistic concurrency
      if (currentVersion !== expectedVersion) {
        await client.query('ROLLBACK');
        throw new ConcurrencyError(streamId, expectedVersion, currentVersion);
      }

      // Append the new events
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        await client.query(
          `INSERT INTO events (stream_id, version, event_type, payload, timestamp)
           VALUES ($1, $2, $3, $4, $5)`,
          [streamId, expectedVersion + i + 1, event.type, event.payload, event.timestamp]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }
}

// ============================================================
// HANDLING CONCURRENT READS: Snapshots and Caching
// ============================================================

class CachedAggregateRepository<T extends Aggregate> {
  private cache: Map<string, { aggregate: T; loadedAt: Date }> = new Map();
  private maxCacheAge = 5000; // 5 seconds

  async getById(id: string): Promise<T> {
    const cached = this.cache.get(id);

    // Return from cache if fresh
    if (cached && Date.now() - cached.loadedAt.getTime() < this.maxCacheAge) {
      return cached.aggregate;
    }

    // Load from event store
    const aggregate = await this.loadFromEventStore(id);

    // Cache for subsequent reads
    this.cache.set(id, { aggregate, loadedAt: new Date() });

    return aggregate;
  }

  // Invalidate cache on write
  async save(aggregate: T, events: DomainEvent[]): Promise<void> {
    await this.eventStore.append(aggregate.id, events, aggregate.version);
    this.cache.delete(aggregate.id);
  }
}

// ============================================================
// KEY INSIGHT: Immutability simplifies concurrency
// ============================================================

// Because events are immutable, concurrent READS are always safe
// - Event #47 will always be event #47
// - No locks needed for reading
// - Snapshots can be cached indefinitely (they're immutable too)

// The only contention is on WRITES:
// - Multiple writers to the same aggregate must be serialized
// - Optimistic concurrency detects conflicts on append
// - Retry or error handling decides what to do
```

Event sourcing provides a significant concurrency advantage: immutable events can be read without locks. Multiple processes can read the same events simultaneously with no risk of corruption. Contention only occurs on the append operation—and optimistic concurrency handles this elegantly.
While aggregates reconstruct state for command processing, projections provide optimized read models for queries. A projection subscribes to events and maintains a denormalized view—often in a different database technology optimized for the query pattern.
Projections embrace eventual consistency: they may lag slightly behind the event stream, but they provide fast, scalable reads that don't require replaying events.
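A projection is, at its smallest, a handler plus a checkpoint. A minimal in-memory sketch (event shapes, positions, and the `statusByOrder` read model are all illustrative):

```typescript
// Tiny projection: fold order events into a read model, track a checkpoint.
type OrderEvent = {
  globalPosition: number;
  type: 'OrderPlaced' | 'OrderShipped';
  orderId: string;
};

const statusByOrder = new Map<string, string>(); // the denormalized read model
let checkpoint = 0;                              // last processed position

function handle(e: OrderEvent): void {
  if (e.type === 'OrderPlaced') statusByOrder.set(e.orderId, 'pending');
  if (e.type === 'OrderShipped') statusByOrder.set(e.orderId, 'shipped');
  checkpoint = e.globalPosition; // a real system persists this durably
}

const log: OrderEvent[] = [
  { globalPosition: 1, type: 'OrderPlaced', orderId: 'A' },
  { globalPosition: 2, type: 'OrderPlaced', orderId: 'B' },
  { globalPosition: 3, type: 'OrderShipped', orderId: 'A' },
];

// Resume from the checkpoint: only events after it are processed
log.filter(e => e.globalPosition > checkpoint).forEach(handle);

console.log(statusByOrder.get('A'), statusByOrder.get('B'), checkpoint);
// 'shipped' 'pending' 3
```

Because the checkpoint records how far the projection has read, a crashed projection resumes where it left off, and resetting the checkpoint to zero rebuilds the read model from scratch.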
```typescript
// ============================================================
// PROJECTION: Subscription-based read model
// ============================================================

interface Projection {
  // Process a single event
  handle(event: DomainEvent): Promise<void>;

  // Get the last processed position (for resumption)
  getCheckpoint(): Promise<number>;

  // Update the checkpoint after successful processing
  setCheckpoint(position: number): Promise<void>;
}

// ============================================================
// EXAMPLE: Order Summary Projection
// ============================================================

// Read model stored in PostgreSQL (or Redis, Elasticsearch, etc.)
interface OrderSummaryRow {
  orderId: string;
  customerId: string;
  customerName: string;
  status: string;
  totalAmount: number;
  itemCount: number;
  createdAt: Date;
  lastUpdatedAt: Date;
}

class OrderSummaryProjection implements Projection {
  constructor(
    private readDb: Database, // Could be different from event store
    private checkpointStore: CheckpointStore
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    if (!event.type.startsWith('Order')) {
      return; // Ignore non-order events
    }

    switch (event.type) {
      case 'OrderPlaced':
        await this.handleOrderPlaced(event);
        break;
      case 'PaymentReceived':
        await this.handlePaymentReceived(event);
        break;
      case 'OrderShipped':
        await this.handleOrderShipped(event);
        break;
      case 'OrderCancelled':
        await this.handleOrderCancelled(event);
        break;
    }
  }

  private async handleOrderPlaced(event: DomainEvent): Promise<void> {
    const payload = event.payload as {
      customerId: string;
      customerName: string;
      items: Array<{ productId: string; quantity: number; price: number }>;
    };

    const totalAmount = payload.items.reduce(
      (sum, item) => sum + (item.quantity * item.price),
      0
    );

    // INSERT into read-optimized view
    await this.readDb.query(`
      INSERT INTO order_summaries
        (order_id, customer_id, customer_name, status, total_amount, item_count, created_at, last_updated_at)
      VALUES ($1, $2, $3, 'pending', $4, $5, $6, $6)
      ON CONFLICT (order_id) DO UPDATE SET
        customer_name = EXCLUDED.customer_name,
        total_amount = EXCLUDED.total_amount,
        item_count = EXCLUDED.item_count,
        last_updated_at = EXCLUDED.last_updated_at
    `, [
      event.aggregateId,
      payload.customerId,
      payload.customerName,
      totalAmount,
      payload.items.length,
      event.timestamp
    ]);
  }

  private async handleOrderShipped(event: DomainEvent): Promise<void> {
    await this.readDb.query(`
      UPDATE order_summaries
      SET status = 'shipped', last_updated_at = $2
      WHERE order_id = $1
    `, [event.aggregateId, event.timestamp]);
  }

  // ... other handlers

  async getCheckpoint(): Promise<number> {
    return this.checkpointStore.get('order-summary-projection');
  }

  async setCheckpoint(position: number): Promise<void> {
    await this.checkpointStore.set('order-summary-projection', position);
  }
}

// ============================================================
// PROJECTION RUNNER: Consuming events
// ============================================================

class ProjectionRunner {
  constructor(
    private eventStore: EventStore,
    private projections: Projection[]
  ) {}

  async start(): Promise<void> {
    for (const projection of this.projections) {
      this.runProjection(projection);
    }
  }

  private async runProjection(projection: Projection): Promise<void> {
    // Get last processed position
    let lastPosition = await projection.getCheckpoint();
    console.log(`Starting projection from position ${lastPosition}`);

    // Subscribe to event stream from that position
    const subscription = this.eventStore.subscribeFromPosition(lastPosition);

    for await (const event of subscription) {
      try {
        // Process the event
        await projection.handle(event);

        // Update checkpoint
        await projection.setCheckpoint(event.globalPosition);
      } catch (error) {
        console.error(`Projection failed on event ${event.eventId}:`, error);
        // Options:
        // 1. Retry the event
        // 2. Skip and continue (with logging)
        // 3. Stop the projection and alert
        // 4. Dead-letter for manual intervention
        throw error; // Fail for now
      }
    }
  }

  // Rebuild a projection from scratch
  async rebuild(projection: Projection): Promise<void> {
    console.log('Starting projection rebuild...');

    // Reset checkpoint to beginning
    await projection.setCheckpoint(0);

    // Process all historical events
    const allEvents = await this.eventStore.loadAll();
    for (const event of allEvents) {
      await projection.handle(event);
      await projection.setCheckpoint(event.globalPosition);
    }

    console.log(`Rebuild complete: processed ${allEvents.length} events`);

    // Then continue with live subscription
    await this.runProjection(projection);
  }
}

// ============================================================
// QUERY API: Using the projection
// ============================================================

class OrderQueryService {
  constructor(private readDb: Database) {}

  // Fast query - single table lookup
  async getOrderSummary(orderId: string): Promise<OrderSummaryRow | null> {
    const result = await this.readDb.query(
      'SELECT * FROM order_summaries WHERE order_id = $1',
      [orderId]
    );
    return result.rows[0] || null;
  }

  // Complex query - still fast because it's a read-optimized view
  async getCustomerOrderHistory(customerId: string): Promise<OrderSummaryRow[]> {
    const result = await this.readDb.query(
      'SELECT * FROM order_summaries WHERE customer_id = $1 ORDER BY created_at DESC',
      [customerId]
    );
    return result.rows;
  }

  // Aggregation query
  async getOrdersByStatus(): Promise<Array<{ status: string; count: number }>> {
    const result = await this.readDb.query(
      'SELECT status, COUNT(*) as count FROM order_summaries GROUP BY status'
    );
    return result.rows;
  }
}
```

We've explored the practical mechanics of turning immutable event streams into usable application state. Let's consolidate the key insights:
What's next:
With the mechanics of event sourcing understood, we'll examine the trade-offs. Event sourcing isn't free—it adds complexity, changes your mental model, and requires different infrastructure. Understanding when the benefits outweigh the costs is critical for sound architectural decisions.
You now understand the practical mechanics of state reconstruction: replaying events, optimizing with snapshots, handling schema evolution, managing concurrency, and building projections for efficient reads.