So far, we've focused on the Command Pattern's ability to decouple invokers from receivers and enable undo/redo. But the pattern's power extends much further. When requests become objects, they become data—and data can be queued, persisted, transmitted, replayed, and analyzed.
Consider these scenarios:

- A data layer that groups hundreds of individual writes into a single bulk insert.
- A job system that runs work overnight, or retries a failed API call with backoff.
- A compliance team that needs a complete, queryable record of who did what, and when.
- A support engineer who replays a user's session to reproduce a bug.

All of these are natural extensions of the Command Pattern. This page explores command queues and logging in depth.
By the end of this page, you will understand how to build command queues for batch and asynchronous execution, implement comprehensive operation logging for audit trails, serialize commands for persistence and transmission, and leverage command logs for debugging and analytics.
A command queue is a data structure that holds commands for deferred execution. Rather than executing commands immediately, the invoker adds them to a queue, and a separate executor processes them—potentially asynchronously, in batches, or on a schedule.
```typescript
interface QueueableCommand {
  execute(): Promise<void>;
  getDescription(): string;
  getPriority(): number; // Higher = more important
}

class CommandQueue {
  private queue: QueueableCommand[] = [];
  private isProcessing: boolean = false;
  private isPaused: boolean = false;

  /**
   * Add a command to the queue.
   * Commands are sorted by priority (highest first).
   */
  enqueue(command: QueueableCommand): void {
    this.queue.push(command);
    this.queue.sort((a, b) => b.getPriority() - a.getPriority());

    // Start processing if not already running
    if (!this.isProcessing && !this.isPaused) {
      this.processQueue();
    }
  }

  /**
   * Process commands one at a time, in priority order.
   */
  private async processQueue(): Promise<void> {
    if (this.isProcessing) return;
    this.isProcessing = true;

    while (this.queue.length > 0 && !this.isPaused) {
      const command = this.queue.shift()!;
      try {
        console.log(`Executing: ${command.getDescription()}`);
        await command.execute();
      } catch (error) {
        console.error(`Failed: ${command.getDescription()}`, error);
        // Could re-queue, log, or invoke error handler
      }
    }

    this.isProcessing = false;
  }

  pause(): void {
    this.isPaused = true;
  }

  resume(): void {
    this.isPaused = false;
    if (!this.isProcessing) {
      this.processQueue();
    }
  }

  clear(): void {
    this.queue = [];
  }

  getQueueLength(): number {
    return this.queue.length;
  }

  getQueueSnapshot(): string[] {
    return this.queue.map(c => c.getDescription());
  }
}
```

Different applications require different processing strategies:
| Strategy | Description | Use Case |
|---|---|---|
| FIFO | First In, First Out | Simple task queues, message passing |
| Priority | Highest priority first | Mixed-urgency tasks, VIP processing |
| LIFO/Stack | Last In, First Out | Undo stacks, recent-item prioritization |
| Scheduled | Execute at specific time | Cron jobs, reminders, delayed actions |
| Batch | Accumulate, then process together | Database writes, API calls, notifications |
| Rate-limited | Cap execution at a fixed rate | API rate limits, resource protection (see the sketch below) |
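Rate limiting is the one strategy in this table that the examples on this page don't otherwise cover, so here is a minimal sketch built on the `QueueableCommand` interface above. The fixed-interval pacing (at most one command per `minIntervalMs`) is just one simple way to enforce a throughput cap, not the only one.

```typescript
// Minimal sketch: execute at most one command per minIntervalMs.
class RateLimitedCommandQueue {
  private queue: QueueableCommand[] = [];
  private draining = false;

  constructor(private minIntervalMs: number) {}

  enqueue(command: QueueableCommand): void {
    this.queue.push(command);
    if (!this.draining) {
      this.drain();
    }
  }

  private async drain(): Promise<void> {
    this.draining = true;
    while (this.queue.length > 0) {
      const started = Date.now();
      const command = this.queue.shift()!;
      try {
        await command.execute();
      } catch (error) {
        console.error(`Failed: ${command.getDescription()}`, error);
      }
      // Wait out the remainder of the interval before the next command
      const elapsed = Date.now() - started;
      if (elapsed < this.minIntervalMs) {
        await new Promise(resolve =>
          setTimeout(resolve, this.minIntervalMs - elapsed)
        );
      }
    }
    this.draining = false;
  }
}
```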
Command queues are the foundation of message-driven architectures. The producer (invoker) and consumer (executor) don't need to know about each other. They only share the command interface. This enables independent scaling, failure isolation, and system evolution.
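To make that decoupling concrete, here is a short usage sketch of the `CommandQueue` above. The `SendEmailCommand` class and its email details are hypothetical; the point is that the queue only ever sees the `QueueableCommand` interface.

```typescript
// Hypothetical command: the queue never learns what "sending an email" means.
class SendEmailCommand implements QueueableCommand {
  constructor(private to: string, private subject: string) {}

  async execute(): Promise<void> {
    // A real receiver (e.g., an email service client) would be called here
    console.log(`Sending "${this.subject}" to ${this.to}`);
  }

  getDescription(): string {
    return `Email to ${this.to}`;
  }

  getPriority(): number {
    return 1;
  }
}

// The producer depends only on the queue and the command interface.
const queue = new CommandQueue();
queue.enqueue(new SendEmailCommand('user@example.com', 'Welcome!'));
queue.enqueue(new SendEmailCommand('admin@example.com', 'Daily report'));
```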
Many operations are more efficient when batched. Database inserts, API calls, and network requests all benefit from grouping. Command queues make batching natural.
```typescript
interface BatchableCommand {
  execute(): Promise<void>;
  getDescription(): string;
  getBatchKey(): string; // Commands with same key can batch
}

interface BatchExecutor {
  executeBatch(commands: BatchableCommand[]): Promise<void>;
}

class BatchCommandQueue {
  private batches: Map<string, BatchableCommand[]> = new Map();
  private batchExecutors: Map<string, BatchExecutor> = new Map();
  private batchSize: number;
  private batchTimeout: number;
  private timeoutHandles: Map<string, NodeJS.Timeout> = new Map();

  constructor(options: {
    batchSize?: number;
    batchTimeoutMs?: number;
  } = {}) {
    this.batchSize = options.batchSize ?? 100;
    this.batchTimeout = options.batchTimeoutMs ?? 1000;
  }

  registerExecutor(batchKey: string, executor: BatchExecutor): void {
    this.batchExecutors.set(batchKey, executor);
  }

  enqueue(command: BatchableCommand): void {
    const key = command.getBatchKey();

    // Get or create batch
    let batch = this.batches.get(key);
    if (!batch) {
      batch = [];
      this.batches.set(key, batch);

      // Start timeout for this batch
      const handle = setTimeout(
        () => this.flushBatch(key),
        this.batchTimeout
      );
      this.timeoutHandles.set(key, handle);
    }

    batch.push(command);

    // Flush if batch is full
    if (batch.length >= this.batchSize) {
      this.flushBatch(key);
    }
  }

  private async flushBatch(key: string): Promise<void> {
    const batch = this.batches.get(key);
    const executor = this.batchExecutors.get(key);

    if (!batch || batch.length === 0) return;

    // Clear batch and timeout
    this.batches.delete(key);
    const timeout = this.timeoutHandles.get(key);
    if (timeout) {
      clearTimeout(timeout);
      this.timeoutHandles.delete(key);
    }

    // Execute batch
    if (executor) {
      try {
        console.log(`Executing batch [${key}]: ${batch.length} commands`);
        await executor.executeBatch(batch);
      } catch (error) {
        console.error(`Batch [${key}] failed:`, error);
        // Could retry individually, dead-letter, etc.
      }
    } else {
      // No batch executor, run individually
      for (const command of batch) {
        await command.execute();
      }
    }
  }

  async flushAll(): Promise<void> {
    const keys = Array.from(this.batches.keys());
    await Promise.all(keys.map(key => this.flushBatch(key)));
  }
}

// Example: Batch database inserts (assumes a `db` client exposing insert/bulkInsert)
class InsertRecordCommand implements BatchableCommand {
  constructor(
    private tableName: string,
    // record is public so the bulk executor below can read it
    public readonly record: Record<string, unknown>
  ) {}

  async execute(): Promise<void> {
    await db.insert(this.tableName, this.record);
  }

  getDescription(): string {
    return `Insert into ${this.tableName}`;
  }

  getBatchKey(): string {
    return `db:insert:${this.tableName}`;
  }
}

class BulkInsertExecutor implements BatchExecutor {
  constructor(private tableName: string) {}

  async executeBatch(commands: BatchableCommand[]): Promise<void> {
    const records = (commands as InsertRecordCommand[])
      .map(c => c.record);

    // Single batch insert instead of N individual inserts
    await db.bulkInsert(this.tableName, records);
  }
}
```
Scheduling is another natural fit: commands can be queued for execution at a specific time, optionally recurring.

```typescript
interface ScheduledCommand {
  command: QueueableCommand;
  executeAt: Date;
  id: string;
  recurring?: {
    interval: number; // milliseconds
    maxOccurrences?: number;
  };
}

class ScheduledCommandQueue {
  private scheduled: ScheduledCommand[] = [];
  private executedCount: Map<string, number> = new Map();
  private checkInterval: NodeJS.Timeout | null = null;

  constructor(private checkIntervalMs: number = 1000) {
    this.startScheduler();
  }

  /**
   * Schedule a command for future execution.
   */
  schedule(
    command: QueueableCommand,
    executeAt: Date,
    recurring?: { interval: number; maxOccurrences?: number }
  ): string {
    const id = crypto.randomUUID();

    this.scheduled.push({
      command,
      executeAt,
      id,
      recurring,
    });

    // Sort by execution time
    this.scheduled.sort((a, b) =>
      a.executeAt.getTime() - b.executeAt.getTime()
    );

    return id;
  }

  /**
   * Schedule a command to run after a delay.
   */
  scheduleAfter(
    command: QueueableCommand,
    delayMs: number
  ): string {
    const executeAt = new Date(Date.now() + delayMs);
    return this.schedule(command, executeAt);
  }

  /**
   * Cancel a scheduled command.
   */
  cancel(id: string): boolean {
    const index = this.scheduled.findIndex(s => s.id === id);
    if (index !== -1) {
      this.scheduled.splice(index, 1);
      return true;
    }
    return false;
  }

  private startScheduler(): void {
    this.checkInterval = setInterval(
      () => this.processScheduled(),
      this.checkIntervalMs
    );
  }

  private async processScheduled(): Promise<void> {
    const now = new Date();

    // Find all commands due for execution
    while (
      this.scheduled.length > 0 &&
      this.scheduled[0].executeAt <= now
    ) {
      const scheduled = this.scheduled.shift()!;

      try {
        await scheduled.command.execute();

        // Handle recurring
        if (scheduled.recurring) {
          const count = (this.executedCount.get(scheduled.id) ?? 0) + 1;
          this.executedCount.set(scheduled.id, count);

          const maxOccurrences = scheduled.recurring.maxOccurrences;
          if (!maxOccurrences || count < maxOccurrences) {
            // Schedule next occurrence
            const nextExecuteAt = new Date(
              scheduled.executeAt.getTime() + scheduled.recurring.interval
            );
            this.scheduled.push({
              ...scheduled,
              executeAt: nextExecuteAt,
            });
            this.scheduled.sort((a, b) =>
              a.executeAt.getTime() - b.executeAt.getTime()
            );
          }
        }
      } catch (error) {
        console.error(
          `Scheduled command failed: ${scheduled.command.getDescription()}`,
          error
        );
      }
    }
  }

  stop(): void {
    if (this.checkInterval) {
      clearInterval(this.checkInterval);
      this.checkInterval = null;
    }
  }

  getScheduled(): Array<{ id: string; description: string; executeAt: Date }> {
    return this.scheduled.map(s => ({
      id: s.id,
      description: s.command.getDescription(),
      executeAt: s.executeAt,
    }));
  }
}
```

For production workloads, use dedicated job schedulers like RabbitMQ with delayed messages, AWS SQS with delay queues, or specialized tools like Temporal.io or Bull. These provide persistence, distribution, monitoring, and resilience that in-memory queues cannot.
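As a brief usage sketch of the in-memory scheduler above (the `ReminderCommand` class is hypothetical):

```typescript
// Hypothetical command used to illustrate scheduling.
class ReminderCommand implements QueueableCommand {
  constructor(private message: string) {}

  async execute(): Promise<void> {
    console.log(`Reminder: ${this.message}`);
  }

  getDescription(): string {
    return `Reminder "${this.message}"`;
  }

  getPriority(): number {
    return 1;
  }
}

const scheduler = new ScheduledCommandQueue();

// Run once, five minutes from now
scheduler.scheduleAfter(new ReminderCommand('Stand-up starts soon'), 5 * 60 * 1000);

// Run every hour, at most 8 times, starting one hour from now
scheduler.schedule(
  new ReminderCommand('Hourly check-in'),
  new Date(Date.now() + 60 * 60 * 1000),
  { interval: 60 * 60 * 1000, maxOccurrences: 8 }
);
```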
Real-world command execution fails: networks time out, databases lock, services become unavailable. Robust command queues need retry strategies and error handling.
```typescript
interface RetryableCommand extends QueueableCommand {
  getMaxRetries(): number;
  shouldRetry(error: Error, attempt: number): boolean;
  onRetry?(attempt: number, error: Error): void;
  onFinalFailure?(error: Error, attempts: number): void;
}

interface RetryPolicy {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffMultiplier: number;
  retryableErrors?: string[]; // Error types to retry
}

class RetryingCommandQueue {
  private queue: Array<{
    command: RetryableCommand;
    attempts: number;
  }> = [];

  private deadLetterQueue: Array<{
    command: RetryableCommand;
    error: Error;
    attempts: number;
    failedAt: Date;
  }> = [];

  private defaultPolicy: RetryPolicy;

  constructor(defaultPolicy: RetryPolicy = {
    maxRetries: 3,
    initialDelayMs: 1000,
    maxDelayMs: 30000,
    backoffMultiplier: 2,
  }) {
    this.defaultPolicy = defaultPolicy;
  }

  async enqueue(command: RetryableCommand): Promise<void> {
    this.queue.push({ command, attempts: 0 });
    await this.processNext();
  }

  private async processNext(): Promise<void> {
    while (this.queue.length > 0) {
      const { command, attempts } = this.queue.shift()!;

      try {
        await command.execute();
        console.log(`✓ ${command.getDescription()}`);
      } catch (error) {
        const err = error as Error;
        const newAttempts = attempts + 1;

        if (this.shouldRetry(command, err, newAttempts)) {
          const delay = this.calculateDelay(newAttempts);
          console.log(
            `↻ Retrying ${command.getDescription()} ` +
            `in ${delay}ms (attempt ${newAttempts})`
          );
          command.onRetry?.(newAttempts, err);

          // Re-queue with delay
          await this.sleep(delay);
          this.queue.unshift({ command, attempts: newAttempts });
        } else {
          // Move to dead letter queue
          console.error(
            `✗ Failed permanently: ${command.getDescription()}`
          );
          command.onFinalFailure?.(err, newAttempts);
          this.deadLetterQueue.push({
            command,
            error: err,
            attempts: newAttempts,
            failedAt: new Date(),
          });
        }
      }
    }
  }

  private shouldRetry(
    command: RetryableCommand,
    error: Error,
    attempt: number
  ): boolean {
    const maxRetries = command.getMaxRetries();
    if (attempt > maxRetries) {
      return false;
    }
    return command.shouldRetry(error, attempt);
  }

  private calculateDelay(attempt: number): number {
    const delay = this.defaultPolicy.initialDelayMs *
      Math.pow(this.defaultPolicy.backoffMultiplier, attempt - 1);

    // Add jitter (±20%)
    const jitter = delay * 0.2 * (Math.random() * 2 - 1);

    return Math.min(delay + jitter, this.defaultPolicy.maxDelayMs);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }

  /**
   * Get failed commands for manual review or retry.
   */
  getDeadLetterQueue(): Array<{
    command: RetryableCommand;
    error: Error;
    attempts: number;
    failedAt: Date;
  }> {
    return [...this.deadLetterQueue];
  }

  /**
   * Retry all dead-letter commands (e.g., after infrastructure fix).
   */
  async retryDeadLetters(): Promise<void> {
    const toRetry = [...this.deadLetterQueue];
    this.deadLetterQueue = [];

    for (const { command } of toRetry) {
      await this.enqueue(command);
    }
  }
}

// Example: API call with retries
class ApiCallCommand implements RetryableCommand {
  constructor(
    private endpoint: string,
    private payload: unknown
  ) {}

  async execute(): Promise<void> {
    const response = await fetch(this.endpoint, {
      method: 'POST',
      body: JSON.stringify(this.payload),
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
  }

  getDescription(): string {
    return `API call to ${this.endpoint}`;
  }

  getPriority(): number {
    return 1;
  }

  getMaxRetries(): number {
    return 5;
  }

  shouldRetry(error: Error, attempt: number): boolean {
    // Retry on network errors and 5xx server errors
    const message = error.message;
    if (message.includes('network') || message.includes('timeout')) {
      return true;
    }
    if (message.startsWith('HTTP 5')) {
      return true;
    }
    // Don't retry client errors (4xx)
    if (message.startsWith('HTTP 4')) {
      return false;
    }
    return true;
  }

  onRetry(attempt: number, error: Error): void {
    console.log(`  Retry #${attempt} due to: ${error.message}`);
  }

  onFinalFailure(error: Error, attempts: number): void {
    // Could log to monitoring, send alert, etc.
    console.error(`  Final failure after ${attempts} attempts`);
  }
}
```

When retrying commands, ensure operations are idempotent—executing them multiple times produces the same result as executing once. Use idempotency keys, unique identifiers, or upsert semantics to prevent duplicate effects when the executor doesn't know if a previous attempt succeeded.
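One common way to get that safety is to attach a fixed idempotency key to the command so the receiver can recognize duplicates. A minimal sketch, assuming the target API honors an `Idempotency-Key` header (many payment and messaging APIs offer something similar, but the header name and semantics here are an assumption):

```typescript
// Sketch: the same key is sent on every retry of this command instance,
// so the server can recognize and ignore duplicate executions.
class IdempotentApiCallCommand implements RetryableCommand {
  private readonly idempotencyKey = crypto.randomUUID(); // fixed for the command's lifetime

  constructor(private endpoint: string, private payload: unknown) {}

  async execute(): Promise<void> {
    const response = await fetch(this.endpoint, {
      method: 'POST',
      headers: { 'Idempotency-Key': this.idempotencyKey },
      body: JSON.stringify(this.payload),
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
  }

  getDescription(): string {
    return `Idempotent API call to ${this.endpoint}`;
  }

  getPriority(): number {
    return 1;
  }

  getMaxRetries(): number {
    return 5;
  }

  shouldRetry(error: Error): boolean {
    // Safe to retry anything except client errors, since duplicates are deduplicated
    return !error.message.startsWith('HTTP 4');
  }
}
```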
Commands are perfect for creating audit logs. Since each command encapsulates a complete operation description, logging becomes trivial—and the log becomes a complete record of system activity.
```typescript
interface LoggableCommand {
  execute(): Promise<void>;
  getDescription(): string;
  getType(): string;
  getParameters(): Record<string, unknown>;
  getUserId(): string;
  getSensitiveFields(): string[]; // Fields to redact
}

interface CommandLogEntry {
  id: string;
  timestamp: Date;
  userId: string;
  commandType: string;
  description: string;
  parameters: Record<string, unknown>;
  duration: number;
  success: boolean;
  errorMessage?: string;
  correlationId: string;
  sessionId: string;
  ipAddress?: string;
  userAgent?: string;
}

class AuditingCommandExecutor {
  private logger: CommandLogger;
  private correlationId: string;
  private sessionContext: {
    sessionId: string;
    ipAddress?: string;
    userAgent?: string;
  };

  constructor(
    logger: CommandLogger,
    sessionContext: {
      sessionId: string;
      ipAddress?: string;
      userAgent?: string;
    }
  ) {
    this.logger = logger;
    this.correlationId = crypto.randomUUID();
    this.sessionContext = sessionContext;
  }

  async execute(command: LoggableCommand): Promise<void> {
    const startTime = Date.now();
    const logId = crypto.randomUUID();

    // Redact sensitive fields before logging
    const parameters = this.redactSensitiveFields(
      command.getParameters(),
      command.getSensitiveFields()
    );

    let success = true;
    let errorMessage: string | undefined;

    try {
      await command.execute();
    } catch (error) {
      success = false;
      errorMessage = (error as Error).message;
      throw error;
    } finally {
      const duration = Date.now() - startTime;

      const entry: CommandLogEntry = {
        id: logId,
        timestamp: new Date(),
        userId: command.getUserId(),
        commandType: command.getType(),
        description: command.getDescription(),
        parameters,
        duration,
        success,
        errorMessage,
        correlationId: this.correlationId,
        sessionId: this.sessionContext.sessionId,
        ipAddress: this.sessionContext.ipAddress,
        userAgent: this.sessionContext.userAgent,
      };

      await this.logger.log(entry);
    }
  }

  private redactSensitiveFields(
    params: Record<string, unknown>,
    sensitiveFields: string[]
  ): Record<string, unknown> {
    const redacted = { ...params };

    for (const field of sensitiveFields) {
      if (field in redacted) {
        redacted[field] = '[REDACTED]';
      }
    }

    // Also redact common sensitive field names
    const commonSensitive = [
      'password', 'token', 'secret', 'apiKey',
      'creditCard', 'ssn', 'dob'
    ];
    for (const key of Object.keys(redacted)) {
      if (commonSensitive.some(s =>
        key.toLowerCase().includes(s.toLowerCase())
      )) {
        redacted[key] = '[REDACTED]';
      }
    }

    return redacted;
  }
}

interface CommandLogger {
  log(entry: CommandLogEntry): Promise<void>;
  query(criteria: LogQueryCriteria): Promise<CommandLogEntry[]>;
}

interface LogQueryCriteria {
  userId?: string;
  sessionId?: string; // needed so the replayer (later on this page) can filter by session
  commandType?: string;
  startTime?: Date;
  endTime?: Date;
  success?: boolean;
  correlationId?: string;
  limit?: number;
}

// Database-backed logger
class DatabaseCommandLogger implements CommandLogger {
  constructor(private db: Database) {}

  async log(entry: CommandLogEntry): Promise<void> {
    await this.db.insert('command_logs', {
      ...entry,
      parameters: JSON.stringify(entry.parameters),
    });
  }

  async query(criteria: LogQueryCriteria): Promise<CommandLogEntry[]> {
    let query = this.db.from('command_logs');

    if (criteria.userId) {
      query = query.where('userId', '=', criteria.userId);
    }
    if (criteria.sessionId) {
      query = query.where('sessionId', '=', criteria.sessionId);
    }
    if (criteria.commandType) {
      query = query.where('commandType', '=', criteria.commandType);
    }
    if (criteria.startTime) {
      query = query.where('timestamp', '>=', criteria.startTime);
    }
    if (criteria.endTime) {
      query = query.where('timestamp', '<=', criteria.endTime);
    }
    if (criteria.success !== undefined) {
      query = query.where('success', '=', criteria.success);
    }
    if (criteria.correlationId) {
      query = query.where('correlationId', '=', criteria.correlationId);
    }

    query = query.orderBy('timestamp', 'desc');

    if (criteria.limit) {
      query = query.limit(criteria.limit);
    }

    const rows = await query.execute();
    return rows.map(row => ({
      ...row,
      parameters: JSON.parse(row.parameters),
    }));
  }
}
```

For persistence, transmission, or cross-system communication, commands must be serializable. Serialization enables persistent queues that survive restarts, transmission of work between processes and services, and durable command logs that can later be replayed.
```typescript
interface SerializableCommand {
  execute(): Promise<void>;
  serialize(): SerializedCommand;
}

interface SerializedCommand {
  type: string;
  version: number;
  payload: Record<string, unknown>;
  metadata: {
    createdAt: string;
    userId: string;
    correlationId: string;
  };
}

// Command registry for deserialization
class CommandRegistry {
  private factories: Map<string, CommandFactory> = new Map();

  register<T extends SerializableCommand>(
    type: string,
    factory: CommandFactory<T>
  ): void {
    this.factories.set(type, factory);
  }

  deserialize(serialized: SerializedCommand): SerializableCommand {
    const factory = this.factories.get(serialized.type);
    if (!factory) {
      throw new Error(`Unknown command type: ${serialized.type}`);
    }

    // Handle version migration if needed
    const migratedPayload = factory.migrate?.(
      serialized.payload,
      serialized.version
    ) ?? serialized.payload;

    return factory.create(migratedPayload, serialized.metadata);
  }
}

type CommandFactory<T extends SerializableCommand = SerializableCommand> = {
  create: (
    payload: Record<string, unknown>,
    metadata: SerializedCommand['metadata']
  ) => T;
  migrate?: (
    payload: Record<string, unknown>,
    fromVersion: number
  ) => Record<string, unknown>;
};

// Example: Transfer money command
class TransferMoneyCommand implements SerializableCommand {
  private static readonly TYPE = 'banking:transfer';
  private static readonly VERSION = 2;

  constructor(
    private sourceAccountId: string,
    private targetAccountId: string,
    private amount: number,
    private currency: string,
    private metadata: {
      userId: string;
      correlationId: string;
    }
  ) {}

  async execute(): Promise<void> {
    await bankingService.transfer(
      this.sourceAccountId,
      this.targetAccountId,
      this.amount,
      this.currency
    );
  }

  serialize(): SerializedCommand {
    return {
      type: TransferMoneyCommand.TYPE,
      version: TransferMoneyCommand.VERSION,
      payload: {
        sourceAccountId: this.sourceAccountId,
        targetAccountId: this.targetAccountId,
        amount: this.amount,
        currency: this.currency,
      },
      metadata: {
        createdAt: new Date().toISOString(),
        userId: this.metadata.userId,
        correlationId: this.metadata.correlationId,
      },
    };
  }

  // Factory for the registry
  static factory: CommandFactory<TransferMoneyCommand> = {
    create: (payload, metadata) => new TransferMoneyCommand(
      payload.sourceAccountId as string,
      payload.targetAccountId as string,
      payload.amount as number,
      payload.currency as string,
      { userId: metadata.userId, correlationId: metadata.correlationId }
    ),

    // Handle version migration
    migrate: (payload, fromVersion) => {
      if (fromVersion === 1) {
        // V1 had 'from' and 'to' instead of 'sourceAccountId'/'targetAccountId'
        return {
          sourceAccountId: payload.from,
          targetAccountId: payload.to,
          amount: payload.amount,
          currency: payload.currency ?? 'USD', // V1 didn't have currency
        };
      }
      return payload;
    },
  };
}

// Setup registry
const registry = new CommandRegistry();
registry.register('banking:transfer', TransferMoneyCommand.factory);

// Serialize for storage
const command = new TransferMoneyCommand(
  'ACC-001',
  'ACC-002',
  100.00,
  'USD',
  { userId: 'user-123', correlationId: 'txn-456' }
);

const serialized = command.serialize();
const json = JSON.stringify(serialized);
await persistentQueue.push(json);

// Deserialize for execution
const loadedJson = await persistentQueue.pop();
const loadedSerialized = JSON.parse(loadedJson);
const loadedCommand = registry.deserialize(loadedSerialized);
await loadedCommand.execute();
```

Command schemas evolve. When you add, remove, or rename fields, old serialized commands must still deserialize correctly. Use explicit version numbers and migration functions. This is especially important for long-lived queues or event sourcing systems.
One of the most powerful applications of command logging is replay—re-executing a sequence of commands to reproduce bugs, analyze behavior, or rebuild state.
```typescript
class CommandReplayer {
  private registry: CommandRegistry;
  private logger: CommandLogger;
  private hooks: ReplayHooks;

  constructor(
    registry: CommandRegistry,
    logger: CommandLogger,
    hooks: ReplayHooks = {}
  ) {
    this.registry = registry;
    this.logger = logger;
    this.hooks = hooks;
  }

  /**
   * Replay commands for a specific session to reproduce a bug.
   */
  async replaySession(
    sessionId: string,
    options: ReplayOptions = {}
  ): Promise<ReplayResult> {
    const logs = await this.logger.query({
      sessionId,
      success: undefined, // Include failures too
      limit: options.maxCommands,
    });

    // Sort by timestamp ascending
    logs.sort((a, b) =>
      new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
    );

    return this.replayLogs(logs, options);
  }

  /**
   * Replay commands between two timestamps.
   */
  async replayTimeRange(
    startTime: Date,
    endTime: Date,
    options: ReplayOptions = {}
  ): Promise<ReplayResult> {
    const logs = await this.logger.query({
      startTime,
      endTime,
      limit: options.maxCommands,
    });

    logs.sort((a, b) =>
      new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()
    );

    return this.replayLogs(logs, options);
  }

  private async replayLogs(
    logs: CommandLogEntry[],
    options: ReplayOptions
  ): Promise<ReplayResult> {
    const result: ReplayResult = {
      totalCommands: logs.length,
      successfulReplays: 0,
      failedReplays: 0,
      skipped: 0,
      errors: [],
    };

    this.hooks.onReplayStart?.(logs.length);

    for (let i = 0; i < logs.length; i++) {
      const log = logs[i];

      // Check filter
      if (options.filter && !options.filter(log)) {
        result.skipped++;
        continue;
      }

      // Speed control
      if (options.speedMultiplier && i > 0) {
        const prevTime = new Date(logs[i - 1].timestamp).getTime();
        const currTime = new Date(log.timestamp).getTime();
        const realDelay = currTime - prevTime;
        const scaledDelay = realDelay / options.speedMultiplier;
        if (scaledDelay > 0) {
          await this.sleep(scaledDelay);
        }
      }

      try {
        // Reconstruct command from log
        const serialized: SerializedCommand = {
          type: log.commandType,
          version: 1, // May need to store version in log
          payload: log.parameters,
          metadata: {
            createdAt: log.timestamp.toISOString(),
            userId: log.userId,
            correlationId: log.correlationId,
          },
        };

        const command = this.registry.deserialize(serialized);

        this.hooks.beforeCommand?.(log, command);

        if (!options.dryRun) {
          await command.execute();
        }

        this.hooks.afterCommand?.(log, command, true);
        result.successfulReplays++;
      } catch (error) {
        const err = error as Error;
        this.hooks.afterCommand?.(log, undefined, false, err);
        result.failedReplays++;
        result.errors.push({
          logEntry: log,
          error: err.message,
        });

        if (options.stopOnError) {
          break;
        }
      }
    }

    this.hooks.onReplayEnd?.(result);
    return result;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

interface ReplayOptions {
  maxCommands?: number;
  speedMultiplier?: number; // 10 = 10x speed, 0.5 = half speed
  dryRun?: boolean; // Log without executing
  stopOnError?: boolean;
  filter?: (log: CommandLogEntry) => boolean;
}

interface ReplayHooks {
  onReplayStart?: (totalCommands: number) => void;
  beforeCommand?: (log: CommandLogEntry, command: unknown) => void;
  afterCommand?: (
    log: CommandLogEntry,
    command: unknown | undefined,
    success: boolean,
    error?: Error
  ) => void;
  onReplayEnd?: (result: ReplayResult) => void;
}

interface ReplayResult {
  totalCommands: number;
  successfulReplays: number;
  failedReplays: number;
  skipped: number;
  errors: Array<{
    logEntry: CommandLogEntry;
    error: string;
  }>;
}
```

For reliable replay, commands must be deterministic—given the same inputs, they produce the same outputs. Watch out for: random numbers (seed them), current time (use logged timestamps), external API calls (mock them), and database state (reset between replays).
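One lightweight way to tame the time and randomness problems is to have commands read both from an injected context rather than calling `Date.now()` or `Math.random()` directly. A sketch of that idea; the `ReplayContext` shape, the tiny seeded generator, and `AssignDiscountCommand` are illustrative assumptions:

```typescript
// Sketch: commands read time and randomness from an injected context,
// so a replay can supply the logged timestamp and a fixed seed.
interface ReplayContext {
  now(): Date;
  random(): number; // deterministic when seeded
}

function liveContext(): ReplayContext {
  return { now: () => new Date(), random: () => Math.random() };
}

function replayContext(loggedTimestamp: Date, seed: number): ReplayContext {
  let state = seed;
  return {
    now: () => loggedTimestamp,
    random: () => {
      // Tiny linear congruential generator: same seed, same sequence
      state = (state * 1664525 + 1013904223) % 0x100000000;
      return state / 0x100000000;
    },
  };
}

// A command then takes the context as a dependency instead of reading globals.
class AssignDiscountCommand {
  constructor(private userId: string, private ctx: ReplayContext) {}

  async execute(): Promise<void> {
    const percent = Math.floor(this.ctx.random() * 10) + 5; // 5 to 14 percent
    console.log(`[${this.ctx.now().toISOString()}] ${this.userId} gets ${percent}% off`);
  }
}
```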
The Command Pattern naturally leads to Event Sourcing—an architectural pattern where application state is derived from a sequence of events (produced by commands) rather than directly mutated.
In event sourcing:

- Every state change is captured as an immutable event rather than applied as an in-place update.
- Current state is rebuilt by replaying the event history (as `loadFromHistory` does below).
- The event log is the source of truth, which also makes it a complete audit trail.
```typescript
// Event (result of a command)
interface DomainEvent {
  eventId: string;
  aggregateId: string;
  eventType: string;
  timestamp: Date;
  payload: Record<string, unknown>;
  version: number;
}

// Validation result shape assumed by the commands below
interface ValidationResult {
  valid: boolean;
  errors: string[];
  warnings: string[];
}

// Command that produces events
interface EventProducingCommand {
  execute(): DomainEvent[];
  validate(): ValidationResult;
}

// Event-sourced aggregate
abstract class EventSourcedAggregate {
  protected id: string;
  protected version: number = 0;
  private uncommittedEvents: DomainEvent[] = [];

  constructor(id: string) {
    this.id = id;
  }

  /**
   * Apply an event to update state.
   * Override in concrete aggregates.
   */
  protected abstract applyEvent(event: DomainEvent): void;

  /**
   * Handle a command by validating, executing, and recording events.
   */
  handleCommand(command: EventProducingCommand): void {
    const validation = command.validate();
    if (!validation.valid) {
      throw new Error(`Invalid command: ${validation.errors.join(', ')}`);
    }

    const events = command.execute();

    for (const event of events) {
      // Apply to current state
      this.applyEvent(event);

      // Track for persistence
      this.version++;
      this.uncommittedEvents.push({
        ...event,
        version: this.version,
      });
    }
  }

  /**
   * Rebuild state from event history.
   */
  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.applyEvent(event);
      this.version = event.version;
    }
  }

  /**
   * Get events to persist, then clear.
   */
  getUncommittedEvents(): DomainEvent[] {
    const events = [...this.uncommittedEvents];
    this.uncommittedEvents = [];
    return events;
  }
}

// Example: Bank Account Aggregate
class BankAccountAggregate extends EventSourcedAggregate {
  private balance: number = 0;
  private status: 'active' | 'closed' = 'active';

  protected applyEvent(event: DomainEvent): void {
    switch (event.eventType) {
      case 'AccountOpened':
        this.balance = event.payload.initialBalance as number;
        this.status = 'active';
        break;
      case 'MoneyDeposited':
        this.balance += event.payload.amount as number;
        break;
      case 'MoneyWithdrawn':
        this.balance -= event.payload.amount as number;
        break;
      case 'AccountClosed':
        this.status = 'closed';
        break;
    }
  }

  getBalance(): number {
    return this.balance;
  }
}

// Deposit command producing an event
class DepositCommand implements EventProducingCommand {
  constructor(
    private accountId: string,
    private amount: number
  ) {}

  validate(): ValidationResult {
    if (this.amount <= 0) {
      return { valid: false, errors: ['Amount must be positive'], warnings: [] };
    }
    return { valid: true, errors: [], warnings: [] };
  }

  execute(): DomainEvent[] {
    return [{
      eventId: crypto.randomUUID(),
      aggregateId: this.accountId,
      eventType: 'MoneyDeposited',
      timestamp: new Date(),
      payload: { amount: this.amount },
      version: 0, // Will be set by aggregate
    }];
  }
}
```

In CQRS/ES terminology, Commands are intentions ("deposit $100") while Events are facts ("$100 was deposited"). Commands may be rejected; events are immutable history. The Command Pattern provides the structure for commands; Event Sourcing provides the persistence model for the events they produce.
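To tie the pieces together, a short usage sketch using the classes above; the in-memory `eventStore` array stands in for a real event store:

```typescript
// Handle commands and collect the events they produce
const account = new BankAccountAggregate('ACC-001');
account.handleCommand(new DepositCommand('ACC-001', 250));
account.handleCommand(new DepositCommand('ACC-001', 75));

// Persist the new events (here: an in-memory stand-in for an event store)
const eventStore: DomainEvent[] = [];
eventStore.push(...account.getUncommittedEvents());

// Later: rebuild current state purely from the event history
const rehydrated = new BankAccountAggregate('ACC-001');
rehydrated.loadFromHistory(eventStore);
console.log(rehydrated.getBalance()); // 325
```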
We've explored how the Command Pattern extends beyond simple invocation decoupling to enable powerful queuing, logging, and replay capabilities. Let's consolidate the key concepts:

- Command queues decouple producing a request from executing it, enabling deferred, prioritized, batched, scheduled, and rate-limited processing.
- Retry policies with exponential backoff and dead-letter queues make execution resilient; idempotency keeps retries safe.
- Because each command carries its own type, description, and parameters, audit logging (with redaction of sensitive fields) is straightforward.
- Serializable, versioned commands can be persisted, transmitted, and replayed, and they lead naturally to event sourcing.
Module Conclusion:
The Command Pattern is far more than a way to decouple invokers from receivers. It's a foundational pattern for building flexible, maintainable, and powerful systems. By treating requests as objects, we gain:

- Decoupling between the code that issues a request and the code that carries it out.
- First-class undo/redo support.
- Queuing, scheduling, batching, retries, and rate limiting.
- Audit trails, serialization, replay, and a path to event sourcing.
Master this pattern, and you'll find it applicable across domains—from GUIs to distributed systems, from transaction processing to game engines.
You now have a comprehensive understanding of the Command Pattern. From the fundamental problem of request coupling, through the complete pattern structure, undo/redo implementation, to advanced applications in queuing and logging—you're equipped to apply this pattern in production systems.