A bank account that's been active for 10 years might have 50,000 transaction events. An e-commerce inventory item could accumulate 100,000 adjustment events. Replaying all these events on every load becomes painfully slow—seconds of latency for what should be a millisecond operation.
Snapshots solve this problem elegantly. A snapshot is a checkpoint of the aggregate's state at a specific version. Instead of replaying all 50,000 events, you load the snapshot from event 49,900 and replay only the 100 events since. This reduces O(50,000) to O(100)—a 500x improvement.
By the end of this page, you will understand when snapshots are necessary, how to design snapshot storage and versioning, how to create and maintain snapshots, how to handle schema evolution in snapshots, and the tradeoffs between different snapshotting approaches.
Not every aggregate needs snapshots. Adding them prematurely complicates your system with additional storage, versioning, and maintenance concerns. Apply snapshots when the cost justifies the complexity.
| Aggregate Characteristic | Snapshots Likely Needed? | Rationale |
|---|---|---|
| Short-lived (hours to days) | No | Events naturally bounded; full replay is fast |
| Few events (<100) | No | Full replay takes <1ms; no benefit |
| Moderate events (100-1,000) | Maybe | Monitor latency; implement if SLA violated |
| Many events (1,000-10,000) | Yes | Noticeable latency (50-200ms) affects UX |
| Very many events (10,000+) | Essential | Multi-second latency unacceptable for real-time |
| High-frequency access pattern | Yes | Repeated rehydration amplifies costs |
| Batch processing pattern | Maybe | Latency less critical; consider lazy snapshotting |
Add instrumentation to measure actual rehydration times before implementing snapshots. You may find that aggregate redesign (splitting large aggregates) is a better solution than snapshots. Snapshots treat the symptom; redesign fixes the cause.
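As a concrete starting point, here is a minimal sketch of such instrumentation: a timing wrapper around whatever load function you already have. The `metrics` client is the same placeholder used in the other examples on this page, and the 100ms warning threshold is an arbitrary choice, not a recommendation.

```typescript
// Sketch: measure rehydration latency before deciding on snapshots.
// `metrics` is a placeholder metrics client (as elsewhere on this page);
// `load` is whatever function currently rehydrates your aggregate.
async function loadWithTiming<TState>(
  aggregateId: string,
  load: (id: string) => Promise<{ state: TState; version: number }>
): Promise<{ state: TState; version: number }> {
  const start = Date.now();
  const result = await load(aggregateId);
  const elapsedMs = Date.now() - start;

  // Record latency and stream length so the decision is based on data, not guesses
  metrics.gauge('aggregate.rehydration.ms', elapsedMs);
  metrics.gauge('aggregate.rehydration.events', result.version);

  if (elapsedMs > 100) {
    // Arbitrary threshold; tune to your SLA
    console.warn(`Slow rehydration: ${aggregateId} took ${elapsedMs}ms`);
  }

  return result;
}
```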
A snapshot contains the serialized state of an aggregate at a specific point in its event stream. The snapshot includes metadata that enables correct loading and validation.
```typescript
// Snapshot structure and storage

interface Snapshot<TState> {
  // Identity
  snapshotId: string;     // Unique ID for this snapshot
  aggregateId: string;    // Which aggregate this is a snapshot of
  aggregateType: string;  // Type for validation and routing

  // Version tracking
  version: number;        // Event stream version at snapshot time
  schemaVersion: number;  // State schema version for compatibility

  // Timing
  createdAt: Date;        // When snapshot was created

  // The actual state
  state: TState;          // Serialized aggregate state

  // Integrity
  checksum?: string;      // Hash for corruption detection
}

// Example snapshot for a bank account
interface BankAccountState {
  accountId: string;
  balance: number;
  status: 'active' | 'frozen' | 'closed';
  overdraftLimit: number;
  transactionCount: number;
  lastTransactionAt?: Date;
}

const exampleSnapshot: Snapshot<BankAccountState> = {
  snapshotId: 'snap_1234567890',
  aggregateId: 'BankAccount-acct_abc123',
  aggregateType: 'BankAccount',
  version: 5000,     // State as of event #5000
  schemaVersion: 3,  // Third iteration of state schema
  createdAt: new Date('2024-01-15T10:00:00Z'),
  state: {
    accountId: 'acct_abc123',
    balance: 15750.42,
    status: 'active',
    overdraftLimit: 500,
    transactionCount: 5000,
    lastTransactionAt: new Date('2024-01-15T09:45:00Z'),
  },
  checksum: 'sha256:a1b2c3d4...',
};

// Snapshot store interface
interface SnapshotStore {
  // Save a new snapshot
  save<TState>(snapshot: Snapshot<TState>): Promise<void>;

  // Get the latest snapshot for an aggregate
  getLatest<TState>(aggregateId: string): Promise<Snapshot<TState> | null>;

  // Get snapshot at or before a specific version
  getAtVersion<TState>(
    aggregateId: string,
    maxVersion: number
  ): Promise<Snapshot<TState> | null>;

  // Delete old snapshots (retain only N most recent)
  prune(aggregateId: string, keepCount: number): Promise<number>;
}

// Loading with snapshot
async function loadAggregate<TState, TEvent>(
  aggregateId: string,
  eventStore: EventStore,
  snapshotStore: SnapshotStore,
  initialState: TState,
  apply: (state: TState, event: TEvent) => TState
): Promise<{ state: TState; version: number }> {
  // 1. Try to load the latest snapshot
  const snapshot = await snapshotStore.getLatest<TState>(aggregateId);

  let currentState: TState;
  let fromVersion: number;

  if (snapshot) {
    // Start from snapshot
    currentState = snapshot.state;
    fromVersion = snapshot.version;
    console.log(`Loaded snapshot at version ${fromVersion}`);
  } else {
    // No snapshot, start from scratch
    currentState = initialState;
    fromVersion = 0;
  }

  // 2. Load events since snapshot
  const events = await eventStore.readStream<TEvent>(aggregateId, {
    fromVersion: fromVersion + 1,
  });

  console.log(`Replaying ${events.length} events from version ${fromVersion + 1}`);

  // 3. Apply events to reach current state
  for (const event of events) {
    currentState = apply(currentState, event);
  }

  const finalVersion = fromVersion + events.length;
  return { state: currentState, version: finalVersion };
}
```

Snapshots can be stored in various ways, each with different tradeoffs for simplicity, performance, and operational complexity.
Store Snapshots as Special Events
Snapshots are stored in the same event store, either in a separate stream or as specially-typed events in the main stream.
```
Stream: BankAccount-acct_123
  [1]   AccountOpened
  [2]   MoneyDeposited
  ...
  [100] MoneyWithdrawn
  [101] __Snapshot__ {state: {...}, version: 100}
  [102] MoneyDeposited
  ...
```
Pros:

- No additional storage infrastructure; snapshots live next to the events they summarize
- Snapshot writes share the event store's durability and transactional guarantees
- A single stream read can return both the latest snapshot and the events after it

Cons:

- Mixes mutable, disposable optimization data into an otherwise append-only, immutable stream
- Replay and projection logic must recognize and skip snapshot events
- Pruning old snapshots means deleting from the event store, which is normally append-only
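To make the in-stream approach concrete, here is a rough sketch of its load path. It assumes the event store can read a stream backwards (`readStreamBackwards` is an invented method returning events newest-first, not a specific product's API) and that snapshots are written as events with a reserved `__Snapshot__` type, as in the diagram above.

```typescript
// Sketch: load an aggregate when snapshots are stored as events in the same stream.
// readStreamBackwards and the event shape ({ type, data }) are assumptions for illustration.
async function loadWithInlineSnapshot<TState>(
  streamId: string,
  eventStore: EventStore,
  initialState: TState,
  apply: (state: TState, event: unknown) => TState
): Promise<TState> {
  let state = initialState;
  const eventsAfterSnapshot: unknown[] = [];

  // 1. Walk backwards until the most recent snapshot event (or the stream start)
  for await (const event of eventStore.readStreamBackwards(streamId)) {
    if (event.type === '__Snapshot__') {
      state = event.data.state as TState; // Resume from the embedded state
      break;
    }
    eventsAfterSnapshot.push(event);
  }

  // 2. Replay the events recorded after the snapshot, oldest first
  for (const event of eventsAfterSnapshot.reverse()) {
    state = apply(state, event);
  }

  return state;
}
```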
Begin with a single separate store (PostgreSQL table or Redis). Add tiers only when you have clear evidence of access patterns that justify the complexity. Premature optimization of snapshot storage is a common mistake.
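As an illustration of that starting point, a minimal PostgreSQL-backed version of the `SnapshotStore` interface might look roughly like the sketch below. It assumes the node-postgres (`pg`) package and a hypothetical `snapshots` table; only `save` and `getLatest` are shown.

```typescript
import { Pool } from 'pg';

// Assumed table (simplified):
//   CREATE TABLE snapshots (
//     snapshot_id    text PRIMARY KEY,
//     aggregate_id   text NOT NULL,
//     aggregate_type text NOT NULL,
//     version        bigint NOT NULL,
//     schema_version int NOT NULL,
//     created_at     timestamptz NOT NULL,
//     state          jsonb NOT NULL,
//     checksum       text
//   );
class PostgresSnapshotStore {
  constructor(private pool: Pool) {}

  async save<TState>(snapshot: Snapshot<TState>): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots
         (snapshot_id, aggregate_id, aggregate_type, version,
          schema_version, created_at, state, checksum)
       VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
      [
        snapshot.snapshotId,
        snapshot.aggregateId,
        snapshot.aggregateType,
        snapshot.version,
        snapshot.schemaVersion,
        snapshot.createdAt,
        JSON.stringify(snapshot.state),
        snapshot.checksum ?? null,
      ]
    );
  }

  async getLatest<TState>(aggregateId: string): Promise<Snapshot<TState> | null> {
    const { rows } = await this.pool.query(
      `SELECT * FROM snapshots
        WHERE aggregate_id = $1
        ORDER BY version DESC
        LIMIT 1`,
      [aggregateId]
    );
    if (rows.length === 0) return null;

    const row = rows[0];
    return {
      snapshotId: row.snapshot_id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      version: Number(row.version),
      schemaVersion: row.schema_version,
      createdAt: row.created_at,
      state: row.state as TState, // jsonb columns come back already parsed
      checksum: row.checksum ?? undefined,
    };
  }
}
```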
When should a snapshot be created? Too frequently wastes storage and I/O. Too infrequently leaves you replaying thousands of events. Finding the right balance depends on your access patterns.
```typescript
// Snapshot creation strategies

// Strategy 1: Every N events
class PeriodicSnapshotter {
  constructor(
    private snapshotStore: SnapshotStore,
    private interval: number = 100 // Snapshot every 100 events
  ) {}

  shouldSnapshot(previousVersion: number, currentVersion: number): boolean {
    // Snapshot if we've crossed an interval boundary
    const previousInterval = Math.floor(previousVersion / this.interval);
    const currentInterval = Math.floor(currentVersion / this.interval);
    return currentInterval > previousInterval;
  }

  async maybeSnapshot<TState>(
    aggregate: { id: string; state: TState; version: number },
    previousVersion: number
  ): Promise<void> {
    if (this.shouldSnapshot(previousVersion, aggregate.version)) {
      await this.snapshotStore.save({
        snapshotId: generateId(),
        aggregateId: aggregate.id,
        aggregateType: aggregate.constructor.name,
        version: aggregate.version,
        schemaVersion: getCurrentSchemaVersion(),
        createdAt: new Date(),
        state: aggregate.state,
      });
    }
  }
}

// Strategy 2: Time-based
class TimeBasedSnapshotter {
  private lastSnapshotTime = new Map<string, Date>();

  constructor(
    private snapshotStore: SnapshotStore,
    private minIntervalMs: number = 3600000 // 1 hour minimum
  ) {}

  shouldSnapshot(aggregateId: string): boolean {
    const lastTime = this.lastSnapshotTime.get(aggregateId);
    if (!lastTime) return true; // Never snapshotted

    const elapsed = Date.now() - lastTime.getTime();
    return elapsed >= this.minIntervalMs;
  }

  async maybeSnapshot<TState>(
    aggregate: { id: string; state: TState; version: number }
  ): Promise<void> {
    if (this.shouldSnapshot(aggregate.id)) {
      await this.snapshotStore.save({
        snapshotId: generateId(),
        aggregateId: aggregate.id,
        aggregateType: aggregate.constructor.name,
        version: aggregate.version,
        schemaVersion: getCurrentSchemaVersion(),
        createdAt: new Date(),
        state: aggregate.state,
      });
      this.lastSnapshotTime.set(aggregate.id, new Date());
    }
  }
}

// Strategy 3: Adaptive based on replay cost
class AdaptiveSnapshotter {
  constructor(
    private snapshotStore: SnapshotStore,
    private maxReplayEvents: number = 500
  ) {}

  async shouldSnapshot(
    aggregateId: string,
    currentVersion: number
  ): Promise<boolean> {
    const latestSnapshot = await this.snapshotStore.getLatest(aggregateId);
    const snapshotVersion = latestSnapshot?.version ?? 0;
    const eventsSinceSnapshot = currentVersion - snapshotVersion;
    return eventsSinceSnapshot >= this.maxReplayEvents;
  }
}

// Strategy 4: Background snapshotting via subscription
class BackgroundSnapshotter {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private snapshotThreshold: number = 100
  ) {}

  async start(): Promise<void> {
    // Subscribe to all events
    await this.eventStore.subscribeToAll(async (event) => {
      await this.checkAndSnapshot(event.aggregateId, event.streamPosition);
    });
  }

  private async checkAndSnapshot(
    aggregateId: string,
    currentVersion: number
  ): Promise<void> {
    const latestSnapshot = await this.snapshotStore.getLatest(aggregateId);
    const snapshotVersion = latestSnapshot?.version ?? 0;

    if (currentVersion - snapshotVersion >= this.snapshotThreshold) {
      // Create snapshot asynchronously
      await this.createSnapshot(aggregateId, currentVersion);
    }
  }

  private async createSnapshot(
    aggregateId: string,
    targetVersion: number
  ): Promise<void> {
    // Load aggregate up to target version
    const events = await this.eventStore.readStream(aggregateId, {
      maxCount: targetVersion,
    });
    const state = events.reduce(applyEvent, initialState);

    await this.snapshotStore.save({
      snapshotId: generateId(),
      aggregateId,
      aggregateType: inferAggregateType(aggregateId),
      version: targetVersion,
      schemaVersion: getCurrentSchemaVersion(),
      createdAt: new Date(),
      state,
    });
  }
}

// Strategy 5: On-demand / lazy snapshotting
class LazySnapshotter {
  constructor(
    private snapshotStore: SnapshotStore,
    private replayThreshold: number = 200
  ) {}

  // Only snapshot after a slow load
  async snapshotIfSlowLoad<TState>(
    aggregate: { id: string; state: TState; version: number },
    eventsReplayed: number,
    loadTimeMs: number
  ): Promise<void> {
    // Snapshot if we replayed many events or load was slow
    if (eventsReplayed >= this.replayThreshold || loadTimeMs > 100) {
      await this.snapshotStore.save({
        snapshotId: generateId(),
        aggregateId: aggregate.id,
        aggregateType: aggregate.constructor.name,
        version: aggregate.version,
        schemaVersion: getCurrentSchemaVersion(),
        createdAt: new Date(),
        state: aggregate.state,
      });
    }
  }
}
```

Creating snapshots in the command path adds latency. Background snapshotting via a subscription process is non-blocking: the main path remains fast, and snapshots are created asynchronously. The tradeoff is a brief window where the latest snapshot may be slightly stale.
Unlike events (which are immutable), snapshots contain the current state structure. When the state schema changes—new fields, renamed properties, type changes—old snapshots become incompatible. You need a migration strategy.
```typescript
// Snapshot schema evolution and migration

// Version-aware snapshot loading
interface SnapshotMigrator<TState> {
  currentVersion: number;
  migrate(snapshot: Snapshot<unknown>): TState;
}

class BankAccountMigrator implements SnapshotMigrator<BankAccountState> {
  currentVersion = 3;

  migrate(snapshot: Snapshot<unknown>): BankAccountState {
    let state = snapshot.state as Record<string, unknown>;
    let version = snapshot.schemaVersion;

    // Apply migrations sequentially
    if (version === 1) {
      state = this.migrateV1ToV2(state);
      version = 2;
    }
    if (version === 2) {
      state = this.migrateV2ToV3(state);
      version = 3;
    }

    if (version !== this.currentVersion) {
      throw new Error(
        `Cannot migrate from version ${snapshot.schemaVersion} to ${this.currentVersion}`
      );
    }

    return state as BankAccountState;
  }

  // V1 -> V2: Added 'overdraftLimit' field
  private migrateV1ToV2(state: Record<string, unknown>): Record<string, unknown> {
    return {
      ...state,
      overdraftLimit: 0, // Default value for existing accounts
    };
  }

  // V2 -> V3: Renamed 'balance' to 'currentBalance', added 'currency'
  private migrateV2ToV3(state: Record<string, unknown>): Record<string, unknown> {
    const { balance, ...rest } = state; // Drop the old field as part of the rename
    return {
      ...rest,
      currentBalance: balance,
      currency: 'USD', // Assume USD for legacy accounts
    };
  }
}

// Safe snapshot loading with migration
async function loadWithMigration<TState>(
  aggregateId: string,
  snapshotStore: SnapshotStore,
  eventStore: EventStore,
  migrator: SnapshotMigrator<TState>,
  initialState: TState,
  apply: (state: TState, event: unknown) => TState
): Promise<{ state: TState; version: number }> {
  const snapshot = await snapshotStore.getLatest<unknown>(aggregateId);

  let state: TState;
  let fromVersion: number;

  if (snapshot) {
    if (snapshot.schemaVersion === migrator.currentVersion) {
      // Current version, no migration needed
      state = snapshot.state as TState;
    } else if (snapshot.schemaVersion < migrator.currentVersion) {
      // Old version, migrate it
      console.log(
        `Migrating snapshot from v${snapshot.schemaVersion} to v${migrator.currentVersion}`
      );
      state = migrator.migrate(snapshot);

      // Optionally: save migrated snapshot
      await snapshotStore.save({
        ...snapshot,
        schemaVersion: migrator.currentVersion,
        state,
      });
    } else {
      // Snapshot is from the FUTURE? This is a problem.
      throw new Error(
        `Snapshot version ${snapshot.schemaVersion} is newer than ` +
        `current version ${migrator.currentVersion}. ` +
        `Is this service running old code?`
      );
    }
    fromVersion = snapshot.version;
  } else {
    state = initialState;
    fromVersion = 0;
  }

  // Replay events since snapshot
  const events = await eventStore.readStream(aggregateId, {
    fromVersion: fromVersion + 1,
  });

  for (const event of events) {
    state = apply(state, event);
  }

  return { state, version: fromVersion + events.length };
}

// Alternative: Invalidate old snapshots instead of migrating
class InvalidatingSnapshotStore implements SnapshotStore {
  constructor(
    private baseStore: SnapshotStore,
    private currentSchemaVersion: number
  ) {}

  async getLatest<TState>(aggregateId: string): Promise<Snapshot<TState> | null> {
    const snapshot = await this.baseStore.getLatest<TState>(aggregateId);

    if (snapshot && snapshot.schemaVersion !== this.currentSchemaVersion) {
      // Old schema version, treat as if no snapshot exists
      // Full replay will create a snapshot with new schema
      console.log(
        `Ignoring stale snapshot (v${snapshot.schemaVersion}) for ${aggregateId}`
      );
      return null;
    }

    return snapshot;
  }

  save<TState>(snapshot: Snapshot<TState>): Promise<void> {
    // Always save with current version
    return this.baseStore.save({
      ...snapshot,
      schemaVersion: this.currentSchemaVersion,
    });
  }

  // ... other methods delegate to baseStore
}
```

Always store the schema version with snapshots. Without it, you cannot safely evolve state structure. Even if you plan to always invalidate old snapshots, the version tells you which snapshots are stale.
Snapshots must accurately represent the aggregate state at their version. A corrupt or incorrect snapshot silently propagates wrong data. Implement safeguards to detect and recover from snapshot issues.
```typescript
// Snapshot verification and recovery

// Checksums for corruption detection
function createSnapshotWithChecksum<TState>(
  aggregateId: string,
  state: TState,
  version: number
): Snapshot<TState> {
  const stateJson = JSON.stringify(state);
  const checksum = crypto
    .createHash('sha256')
    .update(stateJson)
    .digest('hex');

  return {
    snapshotId: generateId(),
    aggregateId,
    aggregateType: inferType(aggregateId),
    version,
    schemaVersion: getCurrentSchemaVersion(),
    createdAt: new Date(),
    state,
    checksum: `sha256:${checksum}`,
  };
}

function verifySnapshotChecksum<TState>(snapshot: Snapshot<TState>): boolean {
  if (!snapshot.checksum) return true; // No checksum to verify

  const stateJson = JSON.stringify(snapshot.state);
  const expectedChecksum = crypto
    .createHash('sha256')
    .update(stateJson)
    .digest('hex');

  return snapshot.checksum === `sha256:${expectedChecksum}`;
}

// Verification during load
async function loadWithVerification<TState>(
  aggregateId: string,
  snapshotStore: SnapshotStore,
  eventStore: EventStore,
  initialState: TState,
  apply: (state: TState, event: unknown) => TState
): Promise<{ state: TState; version: number }> {
  const snapshot = await snapshotStore.getLatest<TState>(aggregateId);

  if (snapshot) {
    // Verify checksum
    if (!verifySnapshotChecksum(snapshot)) {
      console.error(`Snapshot corruption detected for ${aggregateId}`);
      metrics.increment('snapshot.corruption.detected');

      // Fall back to full replay
      return loadWithoutSnapshot(aggregateId, eventStore, initialState, apply);
    }
  }

  // Normal load path...
  return loadWithSnapshot(snapshot, aggregateId, eventStore, initialState, apply);
}

// Periodic snapshot verification job
async function verifyAllSnapshots(
  snapshotStore: SnapshotStore,
  eventStore: EventStore
): Promise<VerificationReport> {
  const report: VerificationReport = {
    totalChecked: 0,
    valid: 0,
    corrupted: 0,
    stale: 0,
    errors: [],
  };

  // Iterate all snapshots
  for await (const snapshot of snapshotStore.listAll()) {
    report.totalChecked++;

    try {
      // 1. Check checksum
      if (!verifySnapshotChecksum(snapshot)) {
        report.corrupted++;
        report.errors.push({
          aggregateId: snapshot.aggregateId,
          issue: 'checksum_mismatch',
        });
        continue;
      }

      // 2. Verify snapshot matches replay (expensive but thorough)
      const replayedState = await replayFromScratch(
        snapshot.aggregateId,
        eventStore,
        snapshot.version
      );

      if (!deepEqual(snapshot.state, replayedState)) {
        report.stale++;
        report.errors.push({
          aggregateId: snapshot.aggregateId,
          issue: 'state_mismatch',
          snapshotVersion: snapshot.version,
        });
        continue;
      }

      report.valid++;
    } catch (error) {
      report.errors.push({
        aggregateId: snapshot.aggregateId,
        issue: 'verification_error',
        error: (error as Error).message,
      });
    }
  }

  return report;
}

// Automatic snapshot regeneration for corrupted/stale snapshots
async function regenerateSnapshot(
  aggregateId: string,
  snapshotStore: SnapshotStore,
  eventStore: EventStore
): Promise<void> {
  console.log(`Regenerating snapshot for ${aggregateId}`);

  // Full replay to current version
  const stream = await eventStore.readStream(aggregateId);
  const state = stream.events.reduce(applyEvent, initialState);

  // Create new, verified snapshot
  const snapshot = createSnapshotWithChecksum(
    aggregateId,
    state,
    stream.version
  );

  await snapshotStore.save(snapshot);
  console.log(`Snapshot regenerated at version ${stream.version}`);
}
```

If a snapshot is corrupted or suspect, you can always regenerate it from events. This is the fundamental safety net of event sourcing: snapshots are an optimization, not the authority. Never modify events to match a snapshot—always do the opposite.
Snapshots accumulate over time. An aggregate with 10,000 events might have 100 snapshots (one per 100 events). Most are obsolete—only the latest is needed for normal operations. Implement pruning to control storage growth.
```typescript
// Snapshot retention and pruning strategies

interface RetentionPolicy {
  // How many snapshots to keep per aggregate
  keepLatestCount: number;

  // Maximum age for any snapshot
  maxAgeDays: number;

  // Keep snapshots at specific intervals for history
  keepAtIntervals?: {
    dailyCount: number;   // Keep N daily snapshots
    weeklyCount: number;  // Keep N weekly snapshots
    monthlyCount: number; // Keep N monthly snapshots
  };
}

const defaultRetentionPolicy: RetentionPolicy = {
  keepLatestCount: 2, // Keep 2 most recent
  maxAgeDays: 90,     // Delete anything older than 90 days
};

// Pruning implementation
class SnapshotPruner {
  constructor(
    private snapshotStore: SnapshotStore,
    private policy: RetentionPolicy
  ) {}

  async pruneAggregate(aggregateId: string): Promise<PruneResult> {
    const snapshots = await this.snapshotStore.listByAggregate(
      aggregateId,
      { orderBy: 'createdAt', direction: 'desc' }
    );

    const toDelete: string[] = [];
    const toKeep: string[] = [];
    const now = Date.now();

    for (let i = 0; i < snapshots.length; i++) {
      const snapshot = snapshots[i];
      const ageInDays = (now - snapshot.createdAt.getTime()) / (1000 * 60 * 60 * 24);

      // Always keep the N most recent
      if (i < this.policy.keepLatestCount) {
        toKeep.push(snapshot.snapshotId);
        continue;
      }

      // Delete if older than max age
      if (ageInDays > this.policy.maxAgeDays) {
        toDelete.push(snapshot.snapshotId);
        continue;
      }

      // Otherwise delete (we only keep keepLatestCount)
      toDelete.push(snapshot.snapshotId);
    }

    // Perform deletions
    for (const snapshotId of toDelete) {
      await this.snapshotStore.delete(snapshotId);
    }

    return {
      aggregateId,
      originalCount: snapshots.length,
      deletedCount: toDelete.length,
      keptCount: toKeep.length,
    };
  }

  async pruneAll(): Promise<PruneSummary> {
    const summary: PruneSummary = {
      aggregatesPruned: 0,
      snapshotsDeleted: 0,
      snapshotsKept: 0,
      bytesFreed: 0,
    };

    const aggregateIds = await this.snapshotStore.listAggregateIds();

    for (const aggregateId of aggregateIds) {
      const result = await this.pruneAggregate(aggregateId);
      summary.aggregatesPruned++;
      summary.snapshotsDeleted += result.deletedCount;
      summary.snapshotsKept += result.keptCount;
    }

    return summary;
  }
}

// Scheduled pruning job
class ScheduledSnapshotPruner {
  private timer: NodeJS.Timer | null = null;

  constructor(
    private pruner: SnapshotPruner,
    private intervalMs: number = 24 * 60 * 60 * 1000 // Daily
  ) {}

  start(): void {
    console.log('Starting scheduled snapshot pruning...');

    // Run immediately on start
    this.runPruning();

    // Schedule regular runs
    this.timer = setInterval(() => this.runPruning(), this.intervalMs);
  }

  stop(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  private async runPruning(): Promise<void> {
    console.log('Running snapshot pruning...');
    const startTime = Date.now();

    try {
      const summary = await this.pruner.pruneAll();

      console.log(
        `Snapshot pruning complete: ` +
        `${summary.aggregatesPruned} aggregates, ` +
        `${summary.snapshotsDeleted} deleted, ` +
        `${summary.snapshotsKept} kept, ` +
        `${Date.now() - startTime}ms`
      );

      metrics.gauge('snapshot.count', summary.snapshotsKept);
      metrics.increment('snapshot.pruned', summary.snapshotsDeleted);
    } catch (error) {
      console.error('Snapshot pruning failed:', error);
      metrics.increment('snapshot.pruning.error');
    }
  }
}
```

Always keep at least 2 snapshots per aggregate. If the latest snapshot is corrupted or from a bad schema version, you can fall back to the previous one rather than replaying from scratch.
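A rough sketch of that fallback is shown below, reusing `verifySnapshotChecksum` from the verification example and the `getAtVersion` query from the `SnapshotStore` interface; exactly how you fetch "the previous snapshot" depends on your store.

```typescript
// Sketch: prefer the latest snapshot, but fall back to the one before it
// if the latest fails verification or has an unexpected schema version.
async function loadUsableSnapshot<TState>(
  aggregateId: string,
  snapshotStore: SnapshotStore,
  currentSchemaVersion: number
): Promise<Snapshot<TState> | null> {
  const latest = await snapshotStore.getLatest<TState>(aggregateId);
  if (!latest) return null;

  if (verifySnapshotChecksum(latest) && latest.schemaVersion <= currentSchemaVersion) {
    return latest;
  }

  // Latest is corrupt or from an unknown schema: try the snapshot before it
  const previous = await snapshotStore.getAtVersion<TState>(
    aggregateId,
    latest.version - 1
  );
  if (previous && verifySnapshotChecksum(previous)) {
    return previous;
  }

  // No usable snapshot; the caller falls back to full replay
  return null;
}
```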
We've covered the main strategies for optimizing rehydration with snapshots. Let's consolidate the key insights:

- Snapshots are an optimization, not the source of truth; the event stream remains authoritative and any snapshot can be regenerated from it.
- Add snapshots only when measured rehydration latency justifies the extra storage, versioning, and maintenance.
- Store a schema version with every snapshot and plan for migration or invalidation when the state structure changes.
- Create snapshots off the command path (background or lazy strategies) so writes stay fast.
- Verify snapshots with checksums and prune obsolete ones, keeping at least the two most recent per aggregate.
What's next:
We've covered the core mechanics of event sourcing: events as source of truth, event store design, state reconstruction, and snapshot optimization. The final page explores when to use event sourcing—the decision framework for choosing this pattern, its benefits and costs, and common pitfalls to avoid.
You now understand how to use snapshots to keep event-sourced systems fast regardless of event count. Snapshots transform O(n) rehydration into O(1) + O(k) where k is the events since the last snapshot—a critical optimization for long-lived aggregates. Next, we'll synthesize everything into guidance on when event sourcing is the right choice.