A domain event is born when an aggregate performs a significant operation. But the event's journey doesn't end at creation: it must travel through the system, reaching all interested parties reliably and consistently.

This lifecycle involves critical decisions:
- When should events be dispatched: during the transaction or after?
- How should events flow: synchronously or asynchronously?
- What happens when handlers fail: should the original operation fail too?

The answers to these questions profoundly impact system reliability, consistency, and complexity. This page covers the patterns and trade-offs for managing the complete domain event lifecycle.
By the end of this page, you will understand how aggregates collect and raise events, the differences between synchronous and asynchronous dispatching, the critical timing of event dispatch relative to persistence, handler design patterns, and error handling strategies.
The first step in the event lifecycle is collection. As an aggregate performs operations that trigger domain events, it must collect these events for later dispatch.

Key Principle: Events are collected during the operation but not dispatched immediately. This separation is crucial because:

1. The operation might fail validation later
2. The aggregate might not be saved (transaction could fail)
3. Multiple events from a single operation should be dispatched together
4. Dispatching during construction creates complex initialization order issues
```typescript
/**
 * Base class that provides event collection capabilities to aggregates.
 *
 * Events are collected during domain operations and retrieved later
 * for dispatching after the aggregate is successfully persisted.
 */
abstract class AggregateRoot<TId = string> {
  private readonly _domainEvents: DomainEvent[] = [];

  /**
   * Protected method for aggregate operations to record events.
   * Events are added but not dispatched immediately.
   */
  protected addDomainEvent(event: DomainEvent): void {
    this._domainEvents.push(event);
    // Optional: Log or trace event creation for debugging
    this.onEventAdded(event);
  }

  /**
   * Hook for subclasses to react to event additions.
   * Useful for logging, metrics, or debugging.
   */
  protected onEventAdded(event: DomainEvent): void {
    // Default: no-op
  }

  /**
   * Returns all collected events for dispatching.
   * Returns a copy to prevent external modification.
   */
  getDomainEvents(): ReadonlyArray<DomainEvent> {
    return [...this._domainEvents];
  }

  /**
   * Clears collected events after successful dispatch.
   * Called by the infrastructure after persistence and dispatch.
   */
  clearDomainEvents(): void {
    this._domainEvents.length = 0;
  }

  /**
   * Check if aggregate has pending events.
   */
  hasPendingEvents(): boolean {
    return this._domainEvents.length > 0;
  }
}

/**
 * Order aggregate demonstrating event collection pattern.
 *
 * The constructor, calculateTotal(), and canBeCancelled() are
 * omitted here for brevity.
 */
class Order extends AggregateRoot<OrderId> {
  private _id: OrderId;
  private _status: OrderStatus;
  private _items: OrderItem[];

  /**
   * Place a new order - collects OrderPlaced event.
   */
  static place(
    id: OrderId,
    customerId: CustomerId,
    items: OrderItem[],
    context: OrderContext,
  ): Order {
    // Validate business rules first
    if (items.length === 0) {
      throw new EmptyOrderError('Cannot place an order with no items');
    }

    const order = new Order(id, customerId, items);
    order._status = OrderStatus.PLACED;

    // Event is collected, not dispatched
    order.addDomainEvent(new OrderPlaced(
      generateEventId(),
      new Date(),
      id.value,
      customerId.value,
      items.map(i => i.toSnapshot()),
      order.calculateTotal(),
      context.shippingAddress,
      context.customerEmail,
    ));

    return order;
  }

  /**
   * Ship the order - validates and collects OrderShipped event.
   */
  ship(shipment: ShipmentDetails): void {
    // Validate state transition
    if (this._status !== OrderStatus.PAID) {
      throw new InvalidOrderTransitionError(
        `Cannot ship order in status ${this._status}`
      );
    }

    this._status = OrderStatus.SHIPPED;

    // Event collected after successful state change
    this.addDomainEvent(new OrderShipped(
      generateEventId(),
      new Date(),
      this._id.value,
      shipment.shipmentId,
      shipment.trackingNumber,
      shipment.carrier,
      shipment.estimatedDelivery,
    ));
  }

  /**
   * Cancel the order - may collect OrderCancelled event.
   */
  cancel(reason: CancellationReason, actor: ActorId): void {
    if (!this.canBeCancelled()) {
      throw new OrderCannotBeCancelledError(this._id);
    }

    const previousStatus = this._status;
    this._status = OrderStatus.CANCELLED;

    this.addDomainEvent(new OrderCancelled(
      generateEventId(),
      new Date(),
      this._id.value,
      reason,
      actor.value,
      previousStatus, // Capture what state we cancelled from
    ));
  }
}
```

Notice that events are added only after the operation successfully completes. If validation throws an exception, no event is recorded. This ensures events always represent valid state transitions.
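The claim that a failed validation records no event can be checked with a small self-contained sketch. The names below (`CollectingAggregate`, `SimpleOrder`, `pullEvents`) are hypothetical, simplified stand-ins for the classes on this page, not part of any library.

```typescript
// Minimal sketch: all names here are hypothetical stand-ins.
interface DomainEvent {
  readonly eventType: string;
  readonly occurredAt: Date;
}

class CollectingAggregate {
  private readonly events: DomainEvent[] = [];

  protected record(event: DomainEvent): void {
    this.events.push(event);
  }

  // Return the pending events and clear the buffer in one step.
  pullEvents(): DomainEvent[] {
    return this.events.splice(0, this.events.length);
  }
}

type OrderState = "PAID" | "SHIPPED";

class SimpleOrder extends CollectingAggregate {
  constructor(private state: OrderState) {
    super();
  }

  ship(): void {
    // Validation comes first: a throw here leaves the event buffer untouched.
    if (this.state !== "PAID") {
      throw new Error(`Cannot ship order in status ${this.state}`);
    }
    this.state = "SHIPPED";
    this.record({ eventType: "OrderShipped", occurredAt: new Date() });
  }
}

const simpleOrder = new SimpleOrder("PAID");
simpleOrder.ship();
const firstPull = simpleOrder.pullEvents(); // one OrderShipped event

let rejected = false;
try {
  simpleOrder.ship(); // already SHIPPED, so this throws
} catch {
  rejected = true;
}
const secondPull = simpleOrder.pullEvents(); // empty: the failed call recorded nothing
```

Combining "get" and "clear" into a single `pullEvents()` call is one possible design choice; it removes the chance of dispatching the same buffered event twice, at the cost of losing the events if dispatch fails afterwards.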
The timing of event dispatch relative to aggregate persistence is one of the most important architectural decisions in event-driven systems. There are three main approaches, each with significant trade-offs.
| Strategy | When | Pros | Cons |
|---|---|---|---|
| Before Commit | Within the transaction, before committing | Handlers can participate in the transaction; strong consistency | Handler failures can roll back the entire operation; performance coupling |
| After Commit | Immediately after the transaction commits | Handlers can't fail the operation; aggregate state is guaranteed saved | If the process crashes or dispatch fails after the commit, events are lost (at-most-once delivery unless paired with an outbox) |
| Outbox Pattern | Events stored in the DB; a background process dispatches them | Guaranteed at-least-once delivery; transaction safety; effectively-once processing when handlers are idempotent | Additional complexity; added latency for consumers |
Recommendation: After Commit or Outbox Pattern

Dispatching before commit creates dangerous coupling: a misbehaving handler can prevent valid domain operations from completing. Most systems should dispatch after commit, with the Outbox pattern for reliability in distributed systems.
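To make the coupling risk concrete, here is a minimal in-memory sketch comparing the two timings with a handler that always fails. `FakeStore` and both `save...` functions are hypothetical illustrations; no real database or dispatcher is involved.

```typescript
// Hypothetical in-memory stand-ins, for illustration only.
type SideEffectHandler = (eventType: string) => void;

class FakeStore {
  committed: string[] = [];
}

function saveDispatchingBeforeCommit(
  store: FakeStore,
  orderId: string,
  handler: SideEffectHandler,
): void {
  const staged = [...store.committed, orderId]; // "open transaction"
  handler("OrderPlaced");                        // a throw here aborts before the commit
  store.committed = staged;                      // commit
}

function saveDispatchingAfterCommit(
  store: FakeStore,
  orderId: string,
  handler: SideEffectHandler,
): string[] {
  store.committed = [...store.committed, orderId]; // commit first
  const failures: string[] = [];
  try {
    handler("OrderPlaced");
  } catch (error) {
    // The order is already saved; the failure is reported, not propagated.
    failures.push(error instanceof Error ? error.message : String(error));
  }
  return failures;
}

const failingHandler: SideEffectHandler = () => {
  throw new Error("email service unavailable");
};

const storeA = new FakeStore();
let beforeCommitFailed = false;
try {
  saveDispatchingBeforeCommit(storeA, "order-1", failingHandler);
} catch {
  beforeCommitFailed = true; // the whole operation failed; nothing was saved
}

const storeB = new FakeStore();
const afterCommitFailures = saveDispatchingAfterCommit(storeB, "order-1", failingHandler);
// storeB.committed contains "order-1" even though the handler failed
```

In the before-commit variant a broken email service blocks order placement entirely; in the after-commit variant the order survives and only the side effect needs recovery.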
```typescript
/**
 * Pattern 1: Dispatch AFTER Commit
 *
 * Events are dispatched only after the transaction successfully commits.
 * This ensures aggregate state is persisted before handlers react.
 *
 * `repository` and `beginTransaction()` are assumed to be supplied
 * by the infrastructure layer and are not shown.
 */
class UnitOfWork {
  private aggregates: AggregateRoot[] = [];

  registerAggregate(aggregate: AggregateRoot): void {
    this.aggregates.push(aggregate);
  }

  async commit(eventDispatcher: EventDispatcher): Promise<void> {
    // Begin transaction
    const transaction = await this.beginTransaction();

    try {
      // Persist all aggregates
      for (const aggregate of this.aggregates) {
        await this.repository.save(aggregate, transaction);
      }

      // Commit transaction FIRST
      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }

    // THEN dispatch events (after commit succeeded).
    // This runs outside the try/catch above: the transaction is already
    // committed, so a dispatch failure must not trigger a rollback.
    for (const aggregate of this.aggregates) {
      const events = aggregate.getDomainEvents();
      for (const event of events) {
        await eventDispatcher.dispatch(event);
      }
      aggregate.clearDomainEvents();
    }
  }
}

/**
 * Pattern 2: Outbox Pattern
 *
 * Events are stored in an "outbox" table within the same transaction
 * as the aggregate. A background process reads the outbox and
 * dispatches events, marking them as processed.
 */
class OutboxUnitOfWork {
  private aggregates: AggregateRoot[] = [];

  async commit(): Promise<void> {
    const transaction = await this.beginTransaction();

    try {
      // Persist aggregates
      for (const aggregate of this.aggregates) {
        await this.repository.save(aggregate, transaction);

        // Store events in outbox table (same transaction!)
        const events = aggregate.getDomainEvents();
        for (const event of events) {
          await this.outboxRepository.insert({
            id: generateId(),
            eventType: event.eventType,
            payload: JSON.stringify(event),
            createdAt: new Date(),
            processedAt: null, // Not yet dispatched
          }, transaction);
        }
      }

      // Single atomic commit - both aggregate and outbox entries
      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }

    // Clear only after the commit succeeded, so a failed commit
    // leaves the events on the aggregates for a retry
    for (const aggregate of this.aggregates) {
      aggregate.clearDomainEvents();
    }
  }
}

/**
 * Background processor for Outbox Pattern.
 *
 * Continuously polls the outbox table and dispatches unprocessed events.
 */
class OutboxProcessor {
  async processOutbox(): Promise<void> {
    while (true) {
      const pendingEvents = await this.outboxRepository.findUnprocessed(
        { limit: 100 }
      );

      for (const outboxEntry of pendingEvents) {
        try {
          const event = JSON.parse(outboxEntry.payload);
          await this.eventDispatcher.dispatch(event);

          // Mark as processed
          await this.outboxRepository.markProcessed(outboxEntry.id);
        } catch (error) {
          // Log error, will retry on next poll
          this.logger.error(
            `Failed to process outbox entry ${outboxEntry.id}`,
            error
          );
          // Optionally: increment retry count, dead-letter after max retries
          await this.outboxRepository.incrementRetryCount(outboxEntry.id);
        }
      }

      // Wait before next poll
      await sleep(100);
    }
  }
}
```

If you dispatch after commit and the application crashes before dispatching, events are lost. The Outbox pattern solves this by storing events in the database atomically with the aggregate: if the aggregate is saved, so are its events.
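The atomicity and retry behaviour can be simulated without a real database. The sketch below is an assumption-laden illustration: `InMemoryDb`, `placeOrder`, and `drainOutbox` are hypothetical names, and the "transaction" is just a copy-then-swap over two arrays.

```typescript
// Hypothetical in-memory model of a database with an orders table and an outbox table.
interface OutboxEntry {
  id: number;
  eventType: string;
  payload: string;
  processed: boolean;
}

class InMemoryDb {
  orders: string[] = [];
  outbox: OutboxEntry[] = [];

  // Runs `work` against copies and swaps them in only if it returns normally,
  // imitating an all-or-nothing transaction.
  transaction(work: (orders: string[], outbox: OutboxEntry[]) => void): void {
    const orders = [...this.orders];
    const outbox = this.outbox.map(e => ({ ...e }));
    work(orders, outbox);
    this.orders = orders;
    this.outbox = outbox;
  }
}

function placeOrder(db: InMemoryDb, orderId: string): void {
  db.transaction((orders, outbox) => {
    orders.push(orderId);
    outbox.push({
      id: outbox.length + 1,
      eventType: "OrderPlaced",
      payload: JSON.stringify({ orderId }),
      processed: false,
    });
  });
}

// One polling pass: publish each pending entry, then mark it processed.
// If publish throws, the entry stays pending and is retried on the next pass.
function drainOutbox(db: InMemoryDb, publish: (entry: OutboxEntry) => void): number {
  let published = 0;
  for (const entry of db.outbox.filter(e => !e.processed)) {
    try {
      publish(entry);
      entry.processed = true;
      published++;
    } catch {
      // Leave the entry pending; a real processor would log and count retries.
    }
  }
  return published;
}

const outboxDb = new InMemoryDb();
placeOrder(outboxDb, "order-1");

// Simulate a broker outage on the first pass, then recovery on the second.
const delivered: string[] = [];
const firstPass = drainOutbox(outboxDb, () => { throw new Error("broker down"); });
const secondPass = drainOutbox(outboxDb, entry => { delivered.push(entry.payload); });
```

Note that if the process died between `publish` and marking the entry processed, the next pass would publish it again. This is why the outbox gives at-least-once delivery and why consumers need to be idempotent.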
After deciding when to dispatch events, you must decide how. The choice between synchronous and asynchronous dispatch has profound implications for system behavior and complexity.
Choosing the Right Approach:

- Synchronous is appropriate when:
  - Handlers must complete before returning to caller
  - You need transactional consistency across handlers
  - The system is monolithic or handlers are in-process
  - Debugging simplicity is paramount

- Asynchronous is appropriate when:
  - Handlers can be eventually consistent
  - You need scalability and resilience
  - Handlers are in different services/processes
  - Handler performance shouldn't affect response time
```typescript
/**
 * Synchronous Event Dispatcher
 *
 * Events are dispatched in-process, handlers execute sequentially.
 * Caller waits for all handlers to complete.
 */
class SynchronousEventDispatcher {
  private handlers = new Map<string, EventHandler<any>[]>();

  subscribe<T extends DomainEvent>(
    eventType: string,
    handler: EventHandler<T>,
  ): void {
    const existing = this.handlers.get(eventType) || [];
    this.handlers.set(eventType, [...existing, handler]);
  }

  async dispatch(event: DomainEvent): Promise<void> {
    const handlers = this.handlers.get(event.eventType) || [];

    // Execute each handler sequentially, in-process
    for (const handler of handlers) {
      try {
        await handler.handle(event);
      } catch (error) {
        // Option 1: Fail fast (all or nothing)
        throw new EventHandlerError(event, handler, error);

        // Option 2: Log and continue (best effort)
        // this.logger.error('Handler failed', { event, error });
      }
    }
  }
}

/**
 * Asynchronous Event Dispatcher
 *
 * Events are published to a message queue. Handlers are decoupled
 * and may run in different processes or services.
 */
class AsynchronousEventDispatcher {
  constructor(private messageQueue: MessageQueue) {}

  async dispatch(event: DomainEvent): Promise<void> {
    // Serialize event
    const message: QueueMessage = {
      id: generateId(),
      type: event.eventType,
      payload: JSON.stringify(event),
      metadata: {
        correlationId: event.correlationId,
        publishedAt: new Date().toISOString(),
      },
    };

    // Publish to queue - returns once the broker accepts the message
    await this.messageQueue.publish(
      this.getTopicForEvent(event),
      message,
    );

    // Caller continues without waiting for handlers
  }

  private getTopicForEvent(event: DomainEvent): string {
    // Route events to appropriate topics
    return `domain-events.${event.aggregateType}.${event.eventType}`;
  }
}

/**
 * Hybrid approach: Some events sync, some async
 *
 * Often, a system needs both. Critical handlers run synchronously,
 * while optional handlers (notifications, analytics) run async.
 */
type EventRoutingConfig = Record<
  string,
  { syncHandlers: string[]; asyncEnabled: boolean }
>;

class HybridEventDispatcher {
  constructor(
    private syncDispatcher: SynchronousEventDispatcher,
    private asyncDispatcher: AsynchronousEventDispatcher,
    private eventConfig: EventRoutingConfig,
  ) {}

  async dispatch(event: DomainEvent): Promise<void> {
    // The config is a plain object keyed by event type
    const config = this.eventConfig[event.eventType];
    if (!config) {
      // Fail with a clear message instead of a TypeError on undefined
      throw new Error(
        `No routing configured for event type ${event.eventType}`
      );
    }

    // Synchronous handlers (must complete)
    if (config.syncHandlers.length > 0) {
      await this.syncDispatcher.dispatch(event);
    }

    // Asynchronous handlers (caller does not wait for them to run)
    if (config.asyncEnabled) {
      await this.asyncDispatcher.dispatch(event);
    }
  }
}

// Usage: Configure routing per event type
const eventConfig: EventRoutingConfig = {
  'OrderPlaced': {
    syncHandlers: ['ReserveInventoryHandler'], // Must succeed
    asyncEnabled: true, // Also send to queue for analytics, email
  },
  'OrderShipped': {
    syncHandlers: [], // No sync handlers
    asyncEnabled: true, // All handlers are async
  },
};
```

Event handlers are the consumers of domain events. Well-designed handlers are focused, idempotent, and resilient. Let's explore the key patterns.
SendShippingNotificationHandler, UpdateInventoryOnOrderPlaced.
```typescript
/**
 * Interface for event handlers.
 * Generic type ensures type safety for the event payload.
 */
interface EventHandler<T extends DomainEvent> {
  /** The event types this handler processes */
  readonly eventTypes: string[];

  /** Handle the event */
  handle(event: T): Promise<void>;
}

/**
 * Handler for sending shipping notifications.
 *
 * Single responsibility: Only sends notification emails.
 */
class SendShippingNotificationHandler implements EventHandler<OrderShipped> {
  readonly eventTypes = ['OrderShipped'];

  constructor(
    private emailService: EmailService,
    private customerRepository: CustomerRepository,
    private processedEvents: ProcessedEventStore,
    private logger: Logger, // used below for skip and failure logging
  ) {}

  async handle(event: OrderShipped): Promise<void> {
    // Idempotency check - don't process same event twice
    if (await this.processedEvents.hasBeenProcessed(event.eventId)) {
      this.logger.info('Event already processed, skipping', {
        eventId: event.eventId,
      });
      return;
    }

    try {
      // Fetch additional data if needed
      const customer = await this.customerRepository.findById(
        event.customerId
      );

      // Perform the handler's single responsibility
      await this.emailService.send({
        to: customer.email,
        template: 'order-shipped',
        data: {
          orderNumber: event.orderNumber,
          trackingNumber: event.trackingNumber,
          carrier: event.carrier,
          estimatedDelivery: event.estimatedDelivery,
        },
      });

      // Mark as processed for idempotency
      await this.processedEvents.markProcessed(event.eventId);
    } catch (error) {
      // Log with context for debugging
      this.logger.error('Failed to send shipping notification', {
        eventId: event.eventId,
        orderId: event.orderId,
        error,
      });

      // Re-throw for retry mechanism to handle
      throw error;
    }
  }
}

/**
 * Handler for reserving inventory on order placement.
 *
 * Demonstrates command dispatch from handler.
 */
class ReserveInventoryHandler implements EventHandler<OrderPlaced> {
  readonly eventTypes = ['OrderPlaced'];

  constructor(
    private commandBus: CommandBus,
    private idempotencyStore: IdempotencyStore,
  ) {}

  async handle(event: OrderPlaced): Promise<void> {
    // Idempotency check
    const idempotencyKey = `reserve-inventory:${event.eventId}`;
    if (await this.idempotencyStore.exists(idempotencyKey)) {
      return;
    }

    // Handler dispatches a command to another aggregate/service
    // This maintains separation - handler doesn't directly modify inventory
    // Note: if a later item fails, a retry re-dispatches the earlier
    // commands too, so the command handler must itself be idempotent
    for (const item of event.items) {
      await this.commandBus.dispatch(new ReserveInventoryCommand(
        item.productId,
        item.quantity,
        event.orderId,
        event.correlationId, // Propagate correlation
      ));
    }

    await this.idempotencyStore.set(idempotencyKey);
  }
}

/**
 * Handler for updating analytics/projections.
 *
 * Async-only handler - doesn't need to be in sync path.
 */
class UpdateOrderAnalyticsHandler implements EventHandler<DomainEvent> {
  readonly eventTypes = [
    'OrderPlaced',
    'OrderShipped',
    'OrderCancelled',
    'OrderDelivered',
  ];

  constructor(private analyticsService: AnalyticsService) {}

  async handle(event: DomainEvent): Promise<void> {
    // Route to appropriate analytics update
    switch (event.eventType) {
      case 'OrderPlaced':
        await this.analyticsService.recordOrderPlaced(event);
        break;
      case 'OrderShipped':
        await this.analyticsService.recordOrderShipped(event);
        break;
      case 'OrderCancelled':
        await this.analyticsService.recordOrderCancelled(event);
        break;
      case 'OrderDelivered':
        await this.analyticsService.recordOrderDelivered(event);
        break;
    }
  }
}
```

A good rule: one handler per side effect. If OrderPlaced triggers email, inventory reservation, and analytics, create three handlers. Each can fail and retry independently. Each can be tested in isolation. Each can be scaled separately.
In distributed systems, the same event may be delivered multiple times. Network failures, retries, and partial failures all lead to duplicate delivery. Idempotent handlers ensure that processing an event multiple times has the same effect as processing it once.
```typescript
/**
 * Idempotency Pattern 1: Event ID Deduplication Store
 *
 * Simple and effective: store processed event IDs in a database or cache.
 */
class ProcessedEventStore {
  constructor(private redis: RedisClient) {}

  async hasBeenProcessed(eventId: string): Promise<boolean> {
    const exists = await this.redis.exists(
      `processed-event:${eventId}`
    );
    return exists === 1;
  }

  async markProcessed(eventId: string, ttlDays: number = 7): Promise<void> {
    await this.redis.setex(
      `processed-event:${eventId}`,
      ttlDays * 24 * 60 * 60, // TTL in seconds
      Date.now().toString(),
    );
  }
}

// Usage in a handler (method shown in isolation):
//
//   async handle(event: OrderShipped): Promise<void> {
//     if (await this.processedEvents.hasBeenProcessed(event.eventId)) {
//       return; // Already processed, exit silently
//     }
//
//     // ... process event ...
//
//     await this.processedEvents.markProcessed(event.eventId);
//   }

/**
 * Idempotency Pattern 2: Natural Idempotency
 *
 * Design operations so they're inherently idempotent.
 */
class WalletBalanceProjection {
  // ❌ NOT IDEMPOTENT: Incrementing
  async handlePaymentReceivedBad(event: PaymentReceived): Promise<void> {
    await this.db.query(
      'UPDATE wallets SET balance = balance + $1 WHERE id = $2',
      [event.amount, event.walletId]
    );
    // If event replays, balance increments again!
  }

  // ✅ IDEMPOTENT: Track each payment explicitly
  async handlePaymentReceivedGood(event: PaymentReceived): Promise<void> {
    // Insert payment record with unique ID
    // (requires a unique constraint on event_id)
    await this.db.query(
      `INSERT INTO wallet_transactions (id, wallet_id, amount, event_id)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (event_id) DO NOTHING`, // Skip if exists
      [generateId(), event.walletId, event.amount, event.eventId]
    );

    // Balance is calculated from transactions, not stored directly
    // SELECT SUM(amount) FROM wallet_transactions WHERE wallet_id = ?
  }
}

/**
 * Idempotency Pattern 3: Conditional Updates
 *
 * Use database features to ensure updates are safe.
 */
class InventoryProjection {
  async handleOrderPlaced(event: OrderPlaced): Promise<void> {
    for (const item of event.items) {
      // Only update if this reservation hasn't been processed.
      // Assumption: event IDs increase monotonically (for example a
      // sequence number). With random UUIDs the `<` comparison below is
      // meaningless; compare a sequence number or use Pattern 1 instead.
      const result = await this.db.query(
        `UPDATE inventory
         SET reserved_quantity = reserved_quantity + $1,
             last_reservation_event = $2
         WHERE product_id = $3
           AND (last_reservation_event IS NULL
                OR last_reservation_event < $2)`,
        [item.quantity, event.eventId, item.productId]
      );

      if (result.rowCount === 0) {
        // Either product doesn't exist or already processed
        this.logger.debug('Reservation skipped (already processed)', {
          productId: item.productId,
          eventId: event.eventId,
        });
      }
    }
  }
}

/**
 * Idempotency wrapper for any handler.
 *
 * Decorator pattern to add idempotency to existing handlers.
 */
class IdempotentHandler<T extends DomainEvent> implements EventHandler<T> {
  constructor(
    private inner: EventHandler<T>,
    private idempotencyStore: ProcessedEventStore,
  ) {}

  get eventTypes(): string[] {
    return this.inner.eventTypes;
  }

  async handle(event: T): Promise<void> {
    const key = this.generateIdempotencyKey(event);

    if (await this.idempotencyStore.hasBeenProcessed(key)) {
      return; // Already processed
    }

    await this.inner.handle(event);
    await this.idempotencyStore.markProcessed(key);
  }

  private generateIdempotencyKey(event: T): string {
    // Key includes handler type so different handlers can each
    // process the same event once
    return `${this.inner.constructor.name}:${event.eventId}`;
  }
}
```

Deduplication records can't be kept forever, because storage would grow without bound. Set a TTL longer than your maximum retry window. If you retry for up to 24 hours, keep dedup records for 7 days. In practice, an event older than the TTL will no longer be redelivered, so its record can safely expire.
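The TTL guidance can be illustrated with an in-memory store and a controllable clock. This is a sketch under stated assumptions: `ExpiringDedupStore` and its methods are hypothetical, and the injected `now` function exists only so the example can move time forward deterministically.

```typescript
// Hypothetical in-memory deduplication store with per-entry expiry.
class ExpiringDedupStore {
  private expiresAt: Record<string, number> = {};

  constructor(
    private readonly ttlMs: number,
    private readonly now: () => number = () => Date.now(),
  ) {}

  // Returns true the first time an ID is seen within the TTL, false for duplicates.
  tryMarkProcessed(eventId: string): boolean {
    const current = this.now();
    const expiry = this.expiresAt[eventId];
    if (expiry !== undefined && expiry > current) {
      return false; // still remembered: duplicate
    }
    this.expiresAt[eventId] = current + this.ttlMs;
    return true;
  }

  // Drops expired records so storage stays bounded; returns how many were removed.
  evictExpired(): number {
    const current = this.now();
    let removed = 0;
    for (const id of Object.keys(this.expiresAt)) {
      const expiry = this.expiresAt[id];
      if (expiry !== undefined && expiry <= current) {
        delete this.expiresAt[id];
        removed++;
      }
    }
    return removed;
  }
}

const HOUR_MS = 60 * 60 * 1000;
let fakeNow = 0;
const dedup = new ExpiringDedupStore(7 * 24 * HOUR_MS, () => fakeNow);

const firstDelivery = dedup.tryMarkProcessed("evt-1"); // true: first time seen
fakeNow = 24 * HOUR_MS;                                  // a retry arrives 24 hours later
const retryDelivery = dedup.tryMarkProcessed("evt-1"); // false: inside the 7-day TTL
fakeNow = 8 * 24 * HOUR_MS;                              // past the TTL
const evicted = dedup.evictExpired();                    // 1 record removed
```

Merging the check and the mark into one `tryMarkProcessed` call is a deliberate choice in this sketch: it avoids the window between a separate "has been processed" check and a later "mark processed" write, during which two concurrent deliveries could both pass the check.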
Handlers fail. Networks time out. Databases go down. External services become unavailable. Robust error handling ensures the system recovers gracefully from these inevitable failures.
```typescript
/**
 * Retry policy with exponential backoff
 */
interface RetryPolicy {
  maxRetries: number;
  baseDelayMs: number;
  maxDelayMs: number;
  jitterMs: number;
}

const defaultRetryPolicy: RetryPolicy = {
  maxRetries: 5,
  baseDelayMs: 100,
  maxDelayMs: 30000,
  jitterMs: 100,
};

class RetryingEventHandler<T extends DomainEvent> implements EventHandler<T> {
  constructor(
    private inner: EventHandler<T>,
    // Required parameter placed before the defaulted one so callers
    // never have to pass `undefined` to get the default policy
    private deadLetterQueue: DeadLetterQueue,
    private policy: RetryPolicy = defaultRetryPolicy,
  ) {}

  get eventTypes(): string[] {
    return this.inner.eventTypes;
  }

  async handle(event: T): Promise<void> {
    let lastError: Error | null = null;
    let attempts = 0;

    for (let attempt = 0; attempt <= this.policy.maxRetries; attempt++) {
      attempts = attempt + 1;
      try {
        await this.inner.handle(event);
        return; // Success!
      } catch (error) {
        lastError = error as Error;

        // Non-retryable error, fail immediately
        if (!this.isRetryable(error)) {
          break;
        }

        // Back off only if another attempt will follow
        if (attempt < this.policy.maxRetries) {
          await sleep(this.calculateDelay(attempt));
        }
      }
    }

    // Retries exhausted or error not retryable: send to dead letter queue
    await this.deadLetterQueue.send({
      event,
      error: lastError?.message,
      handler: this.inner.constructor.name,
      timestamp: new Date(),
      attempts, // actual number of attempts made
    });
  }

  private isRetryable(error: unknown): boolean {
    // Network errors, timeouts are retryable
    // Validation errors, business logic errors are not
    if (error instanceof NetworkError) return true;
    if (error instanceof TimeoutError) return true;
    if (error instanceof TransientDatabaseError) return true;
    if (error instanceof ValidationError) return false;
    if (error instanceof BusinessRuleViolation) return false;
    return false; // Unknown errors are treated as non-retryable
  }

  private calculateDelay(attempt: number): number {
    const exponentialDelay = this.policy.baseDelayMs * Math.pow(2, attempt);
    const jitter = Math.random() * this.policy.jitterMs;
    return Math.min(exponentialDelay + jitter, this.policy.maxDelayMs);
  }
}

/**
 * Circuit Breaker Pattern
 *
 * Prevents overwhelming a failing service with requests.
 */
class CircuitBreakerHandler<T extends DomainEvent> implements EventHandler<T> {
  private failureCount = 0;
  private lastFailureTime: Date | null = null;
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';

  constructor(
    private inner: EventHandler<T>,
    private failureThreshold: number = 5,
    private resetTimeoutMs: number = 30000,
  ) {}

  get eventTypes(): string[] {
    return this.inner.eventTypes;
  }

  async handle(event: T): Promise<void> {
    // Check if circuit should transition from OPEN to HALF_OPEN
    if (this.state === 'OPEN') {
      const timeSinceFailure =
        Date.now() - (this.lastFailureTime?.getTime() || 0);

      if (timeSinceFailure > this.resetTimeoutMs) {
        this.state = 'HALF_OPEN';
      } else {
        throw new CircuitBreakerOpenError(
          `Circuit breaker open for ${this.inner.constructor.name}`
        );
      }
    }

    try {
      await this.inner.handle(event);

      // Success - reset circuit breaker
      this.failureCount = 0;
      this.state = 'CLOSED';
    } catch (error) {
      this.failureCount++;
      this.lastFailureTime = new Date();

      // A failure in HALF_OPEN re-opens the circuit, because the
      // count is still at or above the threshold
      if (this.failureCount >= this.failureThreshold) {
        this.state = 'OPEN';
      }

      throw error;
    }
  }
}

/**
 * Graceful degradation for non-critical handlers
 *
 * Log error but don't fail the operation.
 */
class BestEffortHandler<T extends DomainEvent> implements EventHandler<T> {
  constructor(
    private inner: EventHandler<T>,
    private logger: Logger,
    private metrics: MetricsService,
  ) {}

  get eventTypes(): string[] {
    return this.inner.eventTypes;
  }

  async handle(event: T): Promise<void> {
    try {
      await this.inner.handle(event);
    } catch (error) {
      // Log but don't throw - allow other handlers to continue
      this.logger.error('Best-effort handler failed', {
        handler: this.inner.constructor.name,
        eventId: event.eventId,
        eventType: event.eventType,
        error,
      });

      this.metrics.increment('event_handler_failures', {
        handler: this.inner.constructor.name,
        eventType: event.eventType,
      });
    }
  }
}
```

Let's trace the complete lifecycle of a domain event from creation to handling, showing how all the pieces fit together.
```typescript
/**
 * Complete Domain Event Flow Example
 *
 * From command handling through event dispatch to handler execution.
 * Some constructors and helpers (for example deserialize() and
 * handleError()) are omitted where they are not essential to the flow.
 */

// 1. COMMAND arrives at application layer
class PlaceOrderCommand {
  constructor(
    readonly customerId: string,
    readonly items: OrderItemDto[],
    readonly shippingAddress: AddressDto,
  ) {}
}

// 2. COMMAND HANDLER orchestrates the operation
class PlaceOrderCommandHandler {
  constructor(
    private orderRepository: OrderRepository,
    private customerRepository: CustomerRepository,
    private unitOfWork: UnitOfWork,
  ) {}

  async handle(command: PlaceOrderCommand): Promise<OrderId> {
    // Load necessary aggregates
    const customer = await this.customerRepository.findById(
      command.customerId
    );

    if (!customer) {
      throw new CustomerNotFoundError(command.customerId);
    }

    // Perform domain operation - this collects events
    const order = Order.place(
      OrderId.generate(),
      customer.id,
      // Arrow function keeps `this` bound inside toOrderItem
      command.items.map(item => this.toOrderItem(item)),
      {
        shippingAddress: new Address(command.shippingAddress),
        customerEmail: customer.email,
      },
    );

    // Register for persistence
    this.unitOfWork.registerAggregate(order);

    // Commit - persists the aggregate and stores its events in the outbox
    await this.unitOfWork.commit();

    return order.id;
  }
}

// 3. UNIT OF WORK persists aggregates and outbox entries atomically
class UnitOfWork {
  private aggregates: AggregateRoot[] = [];

  constructor(
    private repository: Repository,
    private eventDispatcher: EventDispatcher,
    private outboxRepository: OutboxRepository,
    private db: Database, // assumed: a connection that can open transactions
  ) {}

  registerAggregate(aggregate: AggregateRoot): void {
    this.aggregates.push(aggregate);
  }

  async commit(): Promise<void> {
    const transaction = await this.db.beginTransaction();

    try {
      // Persist all registered aggregates
      for (const aggregate of this.aggregates) {
        await this.repository.save(aggregate, transaction);

        // Store events in outbox (same transaction)
        for (const event of aggregate.getDomainEvents()) {
          await this.outboxRepository.insert(event, transaction);
        }
      }

      await transaction.commit();
    } catch (error) {
      await transaction.rollback();
      throw error;
    }

    // Clear events from aggregates once the commit has succeeded
    for (const aggregate of this.aggregates) {
      aggregate.clearDomainEvents();
    }
  }
}

// 4. OUTBOX PROCESSOR dispatches events asynchronously
class OutboxProcessor {
  async processOutbox(): Promise<void> {
    const pendingEvents = await this.outboxRepository.findUnprocessed();

    for (const entry of pendingEvents) {
      const event = this.deserialize(entry.payload);

      // Dispatch to message queue
      await this.messageQueue.publish(event);

      // Mark as processed
      await this.outboxRepository.markProcessed(entry.id);
    }
  }
}

// 5. MESSAGE QUEUE delivers to HANDLERS
class EventConsumer {
  constructor(
    private messageQueue: MessageQueue,
    private handlers: EventHandler<any>[],
  ) {}

  async start(): Promise<void> {
    await this.messageQueue.subscribe(
      'domain-events.*',
      async (message) => {
        const event = this.deserialize(message.payload);

        // Find handlers for this event type
        const applicableHandlers = this.handlers.filter(
          h => h.eventTypes.includes(event.eventType)
        );

        // Execute each handler
        for (const handler of applicableHandlers) {
          try {
            await handler.handle(event);
          } catch (error) {
            // Error handling, DLQ, retries...
            await this.handleError(event, handler, error);
          }
        }

        // Acknowledge message
        await message.ack();
      }
    );
  }
}

// 6. HANDLER executes side effect
class SendShippingNotificationHandler implements EventHandler<OrderShipped> {
  readonly eventTypes = ['OrderShipped'];

  constructor(
    private emailService: EmailService,
    private idempotencyStore: IdempotencyStore,
  ) {}

  async handle(event: OrderShipped): Promise<void> {
    // Idempotency check
    if (await this.idempotencyStore.exists(event.eventId)) {
      return;
    }

    // Perform side effect
    await this.emailService.send({
      to: event.customerEmail,
      template: 'order-shipped',
      data: {
        trackingNumber: event.trackingNumber,
        carrier: event.carrier,
      },
    });

    // Mark as processed
    await this.idempotencyStore.set(event.eventId);
  }
}
```

Command → Aggregate (raises event) → Repository (persists) → Outbox (stores event) → Processor (dispatches) → Queue (delivers) → Handler (reacts). This flow ensures reliable, decoupled event processing.
Let's consolidate the key principles for raising and handling domain events:
What's next:

We've covered events within a bounded context. The final page explores events for integration: how domain events become the backbone of communication between bounded contexts and external systems.
You now understand the complete lifecycle of domain events—from collection in aggregates through dispatch to handler execution. You can implement reliable event processing with proper timing, idempotency, and error handling.