If events are the source of truth, then the event store is the vault that protects that truth. An event store is not just another database—it's a specialized storage system designed around the unique requirements of event sourcing: append-only writes, ordered reads, stream partitioning, and optimistic concurrency.
Choosing or designing an event store is one of the most consequential architectural decisions in an event-sourced system. Get it right, and you have a foundation that scales elegantly. Get it wrong, and you'll spend years fighting storage limitations, ordering bugs, and performance bottlenecks.
By the end of this page, you will understand the core requirements of an event store, different storage models and their tradeoffs, partitioning and ordering strategies, how to implement optimistic concurrency, and the critical decision of building versus buying an event store solution.
An event store must satisfy several critical requirements that distinguish it from general-purpose databases. Understanding these requirements is essential whether you're building a custom solution or evaluating existing products.
| Operation | Traditional Database | Event Store |
|---|---|---|
| Create | INSERT row | APPEND event(s) to stream |
| Read current state | SELECT row | READ stream + FOLD events |
| Update | UPDATE row | APPEND new event(s) |
| Delete | DELETE row | APPEND 'deleted' event |
| Concurrency | Pessimistic locks or OCC on row version | OCC on stream version |
| Query by field | WHERE clause on any column | Requires separate projection/index |
| Historical state | Not supported without audit table | Built-in: read up to point in time |
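The "READ stream + FOLD" row is the heart of the model: current state is never stored directly, it is computed by replaying events through a reducer. A minimal sketch, assuming a hypothetical account stream (the event and state shapes are illustrative, not part of any schema in this page):

```typescript
// Hypothetical events for a single account stream
interface AccountEvent {
  type: 'Opened' | 'Deposited' | 'Withdrawn';
  amount: number;
}

interface AccountState {
  exists: boolean;
  balance: number;
}

const initialState: AccountState = { exists: false, balance: 0 };

// One step of the fold: apply a single event to the running state
function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'Opened':
      return { exists: true, balance: event.amount };
    case 'Deposited':
      return { ...state, balance: state.balance + event.amount };
    case 'Withdrawn':
      return { ...state, balance: state.balance - event.amount };
  }
}

// "Read current state" = READ stream + FOLD events
function currentState(stream: AccountEvent[]): AccountState {
  return stream.reduce(evolve, initialState);
}

const stream: AccountEvent[] = [
  { type: 'Opened', amount: 100 },
  { type: 'Deposited', amount: 50 },
  { type: 'Withdrawn', amount: 30 },
];
console.log(currentState(stream).balance); // 120
```

Note that the fold is a pure function of the event history, which is what makes point-in-time reads trivial: fold over a prefix of the stream instead of the whole thing.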
You can implement event sourcing on PostgreSQL or any RDBMS—and many teams do successfully. However, general-purpose databases require careful schema design, manual ordering guarantees, and custom subscription mechanisms. Purpose-built event stores provide these out of the box with optimized implementations.
Event stores can be implemented using various underlying storage models. Each has different tradeoffs for write throughput, read patterns, scalability, and operational complexity.
Single Table Model
The simplest approach: store all events in one large table with stream ID, sequence number, and event data columns.
Schema:

```sql
CREATE TABLE events (
    global_position  BIGSERIAL PRIMARY KEY,
    stream_id        VARCHAR(255) NOT NULL,
    stream_position  INTEGER NOT NULL,
    event_type       VARCHAR(255) NOT NULL,
    event_data       JSONB NOT NULL,
    metadata         JSONB,
    created_at       TIMESTAMPTZ DEFAULT NOW(),
    -- Enforces the optimistic-concurrency guard; the unique index it creates
    -- also covers ordered reads of a single stream, so no separate index on
    -- (stream_id, stream_position) is needed.
    UNIQUE (stream_id, stream_position)
);
```
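A useful property of this schema is that the `UNIQUE (stream_id, stream_position)` constraint doubles as the optimistic-concurrency check: a writer computes the next position from the version it read, and a racing writer inserting the same position hits a unique violation. A minimal in-memory sketch of that mechanism (the class and names are illustrative; a real implementation would catch the database's unique-violation error instead):

```typescript
// In-memory stand-in for the events table
type Row = { streamId: string; streamPosition: number; eventData: string };

class SingleTable {
  private rows: Row[] = [];

  // Append fails if (streamId, streamPosition) already exists - the same
  // guarantee the UNIQUE constraint gives in SQL.
  append(streamId: string, expectedVersion: number, eventData: string): boolean {
    const position = expectedVersion + 1;
    const conflict = this.rows.some(
      r => r.streamId === streamId && r.streamPosition === position
    );
    if (conflict) return false; // unique violation => WrongExpectedVersion
    this.rows.push({ streamId, streamPosition: position, eventData });
    return true;
  }
}

const table = new SingleTable();
console.log(table.append('Order-1', 0, 'OrderPlaced'));    // true
console.log(table.append('Order-1', 0, 'OrderCancelled')); // false: lost the race
```

The loser of the race reloads the stream, rebuilds its state, and retries with the new expected version, exactly the retry loop shown later in this page.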
Pros:
- Simple to implement and operate: one table, one sequence
- `global_position` provides a total order across all streams for free
- Global subscriptions are trivial: read forward from a position

Cons:
- Table and index growth is unbounded, so archiving and retention need care
- The global sequence serializes writes, capping peak throughput
- Sharding later means giving up the single `global_position` order
How you organize and name streams has significant implications for querying, access control, and evolution. A well-designed stream naming convention provides natural categorization and enables efficient filtering.
```typescript
// Stream naming conventions

// Pattern 1: {AggregateType}-{AggregateId}
// Simple and widely used
const orderStream = "Order-ord_12345";
const accountStream = "Account-acct_67890";
const inventoryStream = "Inventory-sku_abc123";

// Pattern 2: {Category}/{AggregateType}/{AggregateId}
// Enables category-based projections and access control
const orderStream2 = "commerce/Order/ord_12345";
const shippingStream = "logistics/Shipment/ship_99999";
const userStream = "identity/User/usr_44444";

// Pattern 3: Hierarchical with tenant isolation
// Multi-tenant systems benefit from a tenant prefix
const tenantOrderStream = "tenant_acme/Order/ord_12345";
const tenantUserStream = "tenant_globex/User/usr_88888";

// Category streams (virtual streams aggregating by type)
// EventStoreDB provides this via $ce-{category} system projections
const allOrdersCategory = "$ce-Order"; // All Order events
const allAccountsCategory = "$ce-Account";

// Utility functions for stream management
interface StreamId {
  aggregateType: string;
  aggregateId: string;
  category?: string;
  tenant?: string;
}

function buildStreamId(params: StreamId): string {
  const parts: string[] = [];
  if (params.tenant) {
    parts.push(params.tenant);
  }
  if (params.category) {
    parts.push(params.category);
  }
  parts.push(params.aggregateType);
  parts.push(params.aggregateId);
  return parts.join('/');
}

function parseStreamId(streamId: string): StreamId {
  const parts = streamId.split('/');

  // Handle different formats
  if (parts.length === 1 && parts[0].includes('-')) {
    // Simple: Type-Id
    const [type, id] = parts[0].split('-');
    return { aggregateType: type, aggregateId: id };
  }
  if (parts.length === 2) {
    // Simple: Type/Id
    return { aggregateType: parts[0], aggregateId: parts[1] };
  }
  if (parts.length === 3) {
    // With category: category/Type/Id
    return {
      category: parts[0],
      aggregateType: parts[1],
      aggregateId: parts[2],
    };
  }
  if (parts.length === 4) {
    // With tenant: tenant/category/Type/Id
    return {
      tenant: parts[0],
      category: parts[1],
      aggregateType: parts[2],
      aggregateId: parts[3],
    };
  }
  throw new Error(`Invalid stream ID format: ${streamId}`);
}

// Stream filtering by pattern
async function readStreamsByPattern(
  eventStore: EventStore,
  pattern: string
): Promise<Event[][]> {
  // Examples:
  //   "Order-*"       - all Order streams
  //   "tenant_acme/*" - all streams for a tenant
  //   "*/Shipment/*"  - all shipments across tenants
  const matchingStreamIds = await eventStore.listStreams({ pattern });
  return Promise.all(
    matchingStreamIds.map(id => eventStore.readStream(id))
  );
}
```

Use consistent, hierarchical naming that enables pattern-based queries. Include tenant/category prefixes if you'll need to filter by them. Avoid embedding mutable data (like user names) in stream IDs; use stable identifiers. Consider URL-style naming (slashes) for readability and tooling compatibility.
Optimistic concurrency is the mechanism that enables event sourcing to work correctly in concurrent environments. When appending events, the writer specifies the expected current version of the stream. If another writer has appended events in the meantime, the append fails, forcing the writer to reload and retry.
This is fundamentally different from pessimistic locking (which blocks writers) and is essential for the high-throughput, low-latency characteristics of event-sourced systems.
```typescript
// Optimistic concurrency implementation

interface AppendResult {
  success: boolean;
  newVersion?: number;
  error?: 'WrongExpectedVersion' | 'StreamNotFound' | 'Unknown';
  currentVersion?: number;
}

interface EventStore {
  append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number // See the ExpectedVersion constants below for special values
  ): Promise<AppendResult>;

  readStream(streamId: string): Promise<{ events: DomainEvent[]; version: number }>;
}

// Command handler with optimistic concurrency and retry
async function handleCommand<TAggregate extends Aggregate>(
  eventStore: EventStore,
  aggregateType: new (id: string) => TAggregate,
  aggregateId: string,
  command: Command,
  maxRetries: number = 3
): Promise<void> {
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      // 1. Load current state
      const { events, version } = await eventStore.readStream(aggregateId);

      // 2. Rehydrate aggregate
      const aggregate = new aggregateType(aggregateId);
      aggregate.loadFromHistory(events);

      // 3. Execute command (produces new events)
      aggregate.handle(command);

      // 4. Get uncommitted events
      const newEvents = aggregate.getUncommittedEvents();
      if (newEvents.length === 0) {
        return; // No changes needed
      }

      // 5. Attempt to append with expected version
      const result = await eventStore.append(
        aggregateId,
        newEvents,
        version // Expected version from our read
      );

      if (result.success) {
        console.log(`Command succeeded, new version: ${result.newVersion}`);
        return;
      }

      if (result.error === 'WrongExpectedVersion') {
        console.log(
          `Concurrency conflict on attempt ${attempt}. ` +
          `Expected: ${version}, Current: ${result.currentVersion}. Retrying...`
        );
        lastError = new Error('WrongExpectedVersion');
        continue; // Retry with fresh state
      }

      throw new Error(result.error);
    } catch (error) {
      lastError = error as Error;
      if (attempt === maxRetries) break;
    }
  }

  throw new Error(
    `Failed to handle command after ${maxRetries} attempts: ${lastError?.message}`
  );
}

// Special version constants
const ExpectedVersion = {
  // Stream must not exist (for creating new aggregates)
  NoStream: -1,
  // Any version is acceptable (dangerous: skips concurrency check)
  Any: -2,
  // Stream must exist (any version)
  StreamExists: -4,
} as const;

// Usage examples
async function examples(eventStore: EventStore) {
  // Create new aggregate (stream must not exist)
  await eventStore.append(
    'Order-new123',
    [orderCreatedEvent],
    ExpectedVersion.NoStream
  );

  // Append to existing (with concurrency check)
  const { version } = await eventStore.readStream('Order-existing456');
  await eventStore.append(
    'Order-existing456',
    [orderUpdatedEvent],
    version
  );

  // Unchecked append (when you don't care about conflicts)
  // Use sparingly - usually indicates a design issue
  await eventStore.append(
    'Order-any789',
    [logEvent],
    ExpectedVersion.Any
  );
}
```

Using `ExpectedVersion.Any` bypasses concurrency protection and can lead to data corruption in business aggregates. It's acceptable for append-only logs (telemetry, analytics) but should be avoided for domain aggregates where business invariants matter.
An event store isn't just a write destination—it's also a source for downstream consumers that build read models, trigger integrations, and synchronize external systems. This is accomplished through subscriptions: mechanisms for efficiently reading new events as they're written.
```typescript
// Subscription patterns for event processing

interface Subscription {
  start(): Promise<void>;
  stop(): Promise<void>;
}

interface SubscriptionOptions {
  fromPosition: number | 'start' | 'end';
  batchSize?: number;
  onEvent: (event: StoredEvent) => Promise<void>;
  onError: (error: Error) => void;
  checkpointInterval?: number;
}

// Catch-up subscription with checkpointing
class CatchUpSubscription implements Subscription {
  private position = 0;
  private running = false;
  private checkpointStore: CheckpointStore;

  constructor(
    private eventStore: EventStore,
    private streamPattern: string,
    private options: SubscriptionOptions,
    checkpointStore: CheckpointStore
  ) {
    this.checkpointStore = checkpointStore;
  }

  async start(): Promise<void> {
    // Load last checkpoint, falling back to the configured start position
    this.position = await this.checkpointStore.getPosition(
      this.streamPattern
    ) ?? (
      this.options.fromPosition === 'start' ? 0 :
      this.options.fromPosition === 'end' ? await this.eventStore.getLatestPosition() :
      this.options.fromPosition
    );

    this.running = true;
    await this.processLoop();
  }

  private async processLoop(): Promise<void> {
    let eventsProcessed = 0;

    while (this.running) {
      try {
        // Read batch of events
        const events = await this.eventStore.readForward(
          this.streamPattern,
          this.position + 1,
          this.options.batchSize ?? 100
        );

        if (events.length === 0) {
          // No new events, wait before polling again
          await sleep(100);
          continue;
        }

        // Process each event
        for (const event of events) {
          await this.options.onEvent(event);
          this.position = event.globalPosition;
          eventsProcessed++;

          // Checkpoint periodically
          if (eventsProcessed % (this.options.checkpointInterval ?? 100) === 0) {
            await this.checkpointStore.savePosition(
              this.streamPattern,
              this.position
            );
          }
        }
      } catch (error) {
        this.options.onError(error as Error);
        await sleep(1000); // Back off on error
      }
    }
  }

  async stop(): Promise<void> {
    this.running = false;
    // Final checkpoint
    await this.checkpointStore.savePosition(
      this.streamPattern,
      this.position
    );
  }
}

// Projection manager using subscriptions
class ProjectionManager {
  private projections = new Map<string, Projection>();

  constructor(
    private eventStore: EventStore,
    private checkpointStore: CheckpointStore
  ) {}

  registerProjection(projection: Projection): void {
    this.projections.set(projection.name, projection);
  }

  async rebuildProjection(name: string): Promise<void> {
    const projection = this.projections.get(name);
    if (!projection) throw new Error(`Unknown projection: ${name}`);

    console.log(`Rebuilding projection: ${name}`);

    // Clear existing read model
    await projection.reset();

    // Replay all events from the beginning
    const subscription = new CatchUpSubscription(
      this.eventStore,
      projection.streamPattern,
      {
        fromPosition: 'start',
        onEvent: async (event) => {
          await projection.apply(event);
        },
        onError: (error) => {
          console.error(`Projection error: ${error.message}`);
        },
      },
      this.checkpointStore
    );

    // Catches up from the start, then keeps polling for live events
    await subscription.start();
    console.log(`Projection ${name} rebuilt and running live`);
  }
}

// Example: Building an order summary read model
const orderSummaryProjection: Projection = {
  name: 'OrderSummary',
  streamPattern: '$ce-Order', // All Order category events

  async apply(event: StoredEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderPlaced': {
        const data = event.payload as OrderPlacedPayload;
        await db.insert('order_summaries').values({
          orderId: event.aggregateId,
          customerId: data.customerId,
          status: 'pending',
          totalAmount: data.totalAmount,
          itemCount: data.items.length,
          createdAt: event.occurredAt,
          lastUpdated: event.occurredAt,
        });
        break;
      }
      case 'PaymentReceived':
        await db.update('order_summaries')
          .set({ status: 'paid', lastUpdated: event.occurredAt })
          .where({ orderId: event.aggregateId });
        break;
      case 'OrderShipped':
        await db.update('order_summaries')
          .set({ status: 'shipped', lastUpdated: event.occurredAt })
          .where({ orderId: event.aggregateId });
        break;
    }
  },

  async reset(): Promise<void> {
    await db.delete('order_summaries').execute();
  },
};
```

Design projections to be idempotent: processing the same event multiple times should produce the same result. This allows safe replay after failures. Use upsert operations where possible, and store event position with projection rows to detect duplicates.
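The idempotency advice can be sketched as a projection that records the last applied position per row and treats replayed or duplicate events as no-ops (all names here are illustrative, not from a specific library):

```typescript
// A read-model row carries the position of the last event applied to it
interface SummaryRow {
  status: string;
  lastPosition: number;
}

class OrderSummaryProjection {
  private rows = new Map<string, SummaryRow>();

  apply(orderId: string, status: string, globalPosition: number): void {
    const row = this.rows.get(orderId);
    // Duplicate or replayed delivery: the position check makes re-apply a no-op
    if (row && globalPosition <= row.lastPosition) return;
    this.rows.set(orderId, { status, lastPosition: globalPosition });
  }

  statusOf(orderId: string): string | undefined {
    return this.rows.get(orderId)?.status;
  }
}

const proj = new OrderSummaryProjection();
proj.apply('ord_1', 'pending', 10);
proj.apply('ord_1', 'paid', 12);
proj.apply('ord_1', 'pending', 10); // replayed duplicate: ignored
console.log(proj.statusOf('ord_1')); // paid
```

In a SQL-backed projection the same check becomes a conditional upsert, e.g. `... WHERE last_position < $newPosition`, so a crash between processing and checkpointing cannot corrupt the read model.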
Ordering is critical in event sourcing—events must be processed in the correct sequence to reconstruct valid state. However, different ordering guarantees have vastly different scalability implications.
| Level | Guarantee | Scalability | Use Case |
|---|---|---|---|
| Per-Stream | Events ordered within single stream | Excellent: horizontal scaling | Default for aggregate streams |
| Per-Category | Events ordered within category (e.g., all Orders) | Moderate: limited parallelism | Projections needing cross-aggregate ordering |
| Global | All events globally ordered | Limited: single writer bottleneck | Full audit logs, compliance |
Per-stream ordering is almost always sufficient for aggregate operations and most projections. Global ordering is expensive and rarely necessary.
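One reason per-stream ordering scales so well: streams can be distributed across partitions, as long as every event of a given stream lands on the same partition. A minimal sketch of that routing rule (the hash function and partition count are illustrative):

```typescript
// Route a stream to a partition by hashing its ID. Because the mapping is
// deterministic, all events of one stream stay on one partition, preserving
// per-stream order even with many parallel writers and consumers.
function partitionFor(streamId: string, partitionCount: number): number {
  let hash = 0;
  for (const ch of streamId) {
    hash = (hash * 31 + ch.charCodeAt(0)) | 0; // simple 32-bit rolling hash
  }
  return Math.abs(hash) % partitionCount;
}

// Every event for the same stream maps to the same partition
const a = partitionFor('Order-123', 8);
const b = partitionFor('Order-123', 8);
console.log(a === b); // true
```

Global ordering, by contrast, cannot be partitioned this way: some single sequencer must assign positions, which is exactly the bottleneck the table above points at.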
Causation and correlation IDs help when you need to understand relationships between events across streams without strict global ordering. The correlation ID ties together every event produced by one logical operation, while the causation ID records which command or event directly produced each new event. With these, you can reconstruct the causal chain without global ordering.
```typescript
// Causal ordering through metadata

interface EventMetadata {
  // Unique ID for this event
  eventId: string;
  // Links related events across a distributed operation
  // All events from the same user action share a correlationId
  correlationId: string;
  // ID of the command or event that directly caused this event
  // Creates a DAG of causation
  causationId: string;
  // Stream position for this specific stream
  streamPosition: number;
  // Global position in the event store (if available)
  globalPosition?: number;
}

// Example: Order flow with causal chain
// User places order -> Payment processed -> Inventory reserved -> Shipment created

const events = [
  {
    eventId: "evt_001",
    eventType: "OrderPlaced",
    streamId: "Order-123",
    metadata: {
      correlationId: "corr_user_request_abc",
      causationId: "cmd_place_order_xyz", // The command that triggered this
      streamPosition: 1,
      globalPosition: 1001,
    },
  },
  {
    eventId: "evt_002",
    eventType: "PaymentProcessed",
    streamId: "Payment-456",
    metadata: {
      correlationId: "corr_user_request_abc", // Same correlation
      causationId: "evt_001", // Caused by OrderPlaced
      streamPosition: 1,
      globalPosition: 1005, // Later global position
    },
  },
  {
    eventId: "evt_003",
    eventType: "InventoryReserved",
    streamId: "Inventory-789",
    metadata: {
      correlationId: "corr_user_request_abc",
      causationId: "evt_002", // Caused by PaymentProcessed
      streamPosition: 15,
      globalPosition: 1008,
    },
  },
  {
    eventId: "evt_004",
    eventType: "ShipmentCreated",
    streamId: "Shipment-999",
    metadata: {
      correlationId: "corr_user_request_abc",
      causationId: "evt_003", // Caused by InventoryReserved
      streamPosition: 1,
      globalPosition: 1012,
    },
  },
];

// Reconstruct causal chain for debugging
function buildCausalChain(
  events: StoredEvent[],
  correlationId: string
): Map<string, StoredEvent[]> {
  // Group events by correlation
  const correlated = events.filter(
    e => e.metadata.correlationId === correlationId
  );

  // Build causation tree: causing event ID -> events it caused
  const chain = new Map<string, StoredEvent[]>();
  for (const event of correlated) {
    const cause = event.metadata.causationId;
    if (!chain.has(cause)) {
      chain.set(cause, []);
    }
    chain.get(cause)!.push(event);
  }
  return chain;
}

// Print causal chain for debugging
function printCausalChain(
  chain: Map<string, StoredEvent[]>,
  rootId: string,
  indent = 0
): void {
  const effects = chain.get(rootId) || [];
  for (const event of effects) {
    console.log(`${' '.repeat(indent)}-> [${event.streamId}] ${event.eventType}`);
    printCausalChain(chain, event.eventId, indent + 1);
  }
}

// Output:
// -> [Order-123] OrderPlaced
//  -> [Payment-456] PaymentProcessed
//   -> [Inventory-789] InventoryReserved
//    -> [Shipment-999] ShipmentCreated
```

One of the most significant decisions is whether to build your own event store on a general-purpose database or adopt a purpose-built solution. Both paths have legitimate use cases.
| Factor | Build (RDBMS) | Buy (EventStoreDB) | Recommendation |
|---|---|---|---|
| Team experience | Strong SQL, new to ES | Willing to learn new tool | Start with SQL if team is new to ES |
| Scale expectations | <10M events/year | >100M events/year | Purpose-built scales easier |
| Subscription criticality | Simple, batch OK | Real-time, competing consumers | EventStoreDB excels here |
| Operational constraints | Must minimize infra | Can add new databases | SQL if infra is constrained |
| Budget | Tight | Available | SQL is cheaper initially |
| Timeline | Prototype/MVP | Production system | SQL for speed, migrate later |
If uncertain, start with PostgreSQL. The schema is simple, and you can migrate events to a purpose-built store later if needed. The event format is what matters; the storage engine underneath can be swapped. Many teams run production event-sourced systems on PostgreSQL-backed stores.
We've covered the essential aspects of designing and selecting an event store: its core requirements, storage models and their tradeoffs, stream naming, optimistic concurrency, subscriptions, and ordering guarantees, along with the build-versus-buy decision.
What's next:
With events stored durably, we need to reconstruct current state efficiently. The next page explores rebuilding state from events—the mechanics of rehydration, the cost of long event streams, and strategies for handling aggregates with thousands of events.
You now understand the architecture and design considerations for event stores. Whether building on PostgreSQL or adopting EventStoreDB, you know the essential requirements: append-only storage, ordered reads, optimistic concurrency, and efficient subscriptions. Next, we'll explore how to efficiently rebuild state from these event streams.