Loading learning content...
In the previous page, we explored publishers and subscribers as independent actors who don't know about each other. But if they don't know each other, how do events actually flow from publishers to subscribers?
The answer is the Event Bus (also called an Event Dispatcher, Message Broker, or Event Channel). This is the central infrastructure component that:
The event bus is the crucial mediator that enables the decoupling we prize in event-driven systems. Without it, publishers would need to know about subscribers—destroying the pattern's benefits.
By the end of this page, you will understand event bus architecture deeply: its responsibilities, implementation patterns, delivery semantics, error handling strategies, and how to choose between in-process and distributed implementations.
An event bus has several core responsibilities that, when implemented correctly, create a robust foundation for event-driven systems.
| Responsibility | Description | Why It Matters |
|---|---|---|
| Subscription Management | Register subscribers and track which events each wants | Enables dynamic addition/removal of subscribers |
| Event Routing | Direct published events to correct subscribers | Ensures events reach interested parties only |
| Delivery Execution | Invoke subscriber handlers with events | Controls how and when handlers execute |
| Error Isolation | Prevent one subscriber's failure from affecting others | Maintains system resilience |
| Lifecycle Management | Handle startup, shutdown, and cleanup | Ensures no events lost during transitions |
At its core, an event bus exposes a very simple interface. This simplicity is intentional—it hides significant complexity behind a clean API.
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
/** * Event Bus Interface * * The contract for any event bus implementation. * Keeps the API simple while hiding complexity. */interface IEventBus { /** * Publishes an event to all interested subscribers. * * @param event The event to publish * @returns Promise that resolves when publishing completes * (what "completes" means depends on delivery semantics) */ publish<TEvent>(event: TEvent): Promise<void>; /** * Registers a subscriber to receive events. * * @param subscriber The subscriber to register * @returns A function to unsubscribe (for cleanup) */ subscribe(subscriber: ISubscriber<unknown>): () => void; /** * Unregisters a subscriber. * * @param subscriber The subscriber to remove */ unsubscribe(subscriber: ISubscriber<unknown>): void; /** * Gracefully shuts down the event bus. * Waits for in-flight events to complete. */ shutdown(): Promise<void>;} /** * Extended interface for observable event buses */interface IObservableEventBus extends IEventBus { /** * Returns current subscriber count for an event type. * Useful for monitoring and debugging. */ subscriberCount(eventType: string): number; /** * Returns all registered event types. */ registeredEventTypes(): string[]; /** * Checks if any subscriber is registered for an event type. */ hasSubscribers(eventType: string): boolean;}The simplest form of event bus operates within a single process, routing events between components in the same application. This is appropriate for:
Let's build a production-quality in-process event bus step by step.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
/** * In-Process Event Bus Implementation * * A production-quality event bus for single-process applications. * Features: * - Type-safe event routing * - Parallel subscriber execution * - Error isolation between subscribers * - Priority-based ordering * - Graceful shutdown */class InProcessEventBus implements IObservableEventBus { // Maps event types to their subscribers private readonly subscriptions: Map<string, Set<ISubscriber<unknown>>> = new Map(); // Pending operations for graceful shutdown private readonly pendingOperations: Set<Promise<void>> = new Set(); // Shutdown state private isShuttingDown = false; // Logger for debugging and monitoring private readonly logger: ILogger; constructor(logger: ILogger = console) { this.logger = logger; } /** * Registers a subscriber for its declared event types. */ subscribe(subscriber: ISubscriber<unknown>): () => void { const eventTypes = subscriber.subscribedEventTypes(); for (const eventType of eventTypes) { if (!this.subscriptions.has(eventType)) { this.subscriptions.set(eventType, new Set()); } this.subscriptions.get(eventType)!.add(subscriber); this.logger.debug?.(`Subscriber registered for ${eventType}`); } // Return unsubscribe function for cleanup return () => this.unsubscribe(subscriber); } /** * Removes a subscriber from all its event types. */ unsubscribe(subscriber: ISubscriber<unknown>): void { const eventTypes = subscriber.subscribedEventTypes(); for (const eventType of eventTypes) { this.subscriptions.get(eventType)?.delete(subscriber); // Clean up empty sets if (this.subscriptions.get(eventType)?.size === 0) { this.subscriptions.delete(eventType); } } } /** * Publishes an event to all interested subscribers. * Executes subscribers in parallel for performance. */ async publish<TEvent extends { eventType: string }>(event: TEvent): Promise<void> { if (this.isShuttingDown) { throw new Error('Event bus is shutting down, cannot publish'); } const eventType = event.eventType; const subscribers = this.subscriptions.get(eventType); if (!subscribers || subscribers.size === 0) { this.logger.debug?.(`No subscribers for event type: ${eventType}`); return; } // Sort by priority (lower = higher priority) const sortedSubscribers = [...subscribers].sort( (a, b) => (a.priority || 100) - (b.priority || 100) ); // Execute all subscribers in parallel const operation = this.executeSubscribers(sortedSubscribers, event); this.pendingOperations.add(operation); try { await operation; } finally { this.pendingOperations.delete(operation); } } /** * Executes subscribers with error isolation. * One subscriber's failure doesn't affect others. */ private async executeSubscribers<TEvent>( subscribers: ISubscriber<TEvent>[], event: TEvent ): Promise<void> { const results = await Promise.allSettled( subscribers.map(subscriber => this.executeWithErrorHandling(subscriber, event) ) ); // Log any failures for (let i = 0; i < results.length; i++) { const result = results[i]; if (result.status === 'rejected') { this.logger.error( `Subscriber ${subscribers[i].constructor.name} failed:`, result.reason ); } } } /** * Wraps subscriber execution with error handling. */ private async executeWithErrorHandling<TEvent>( subscriber: ISubscriber<TEvent>, event: TEvent ): Promise<void> { try { await subscriber.handle(event); } catch (error) { // Error is caught and isolated - other subscribers continue // Re-throw to be caught by Promise.allSettled throw error; } } /** * Graceful shutdown: wait for pending operations. */ async shutdown(): Promise<void> { this.isShuttingDown = true; if (this.pendingOperations.size > 0) { this.logger.info(`Waiting for ${this.pendingOperations.size} pending operations...`); await Promise.all(this.pendingOperations); } this.subscriptions.clear(); this.logger.info('Event bus shut down successfully'); } // Observable methods subscriberCount(eventType: string): number { return this.subscriptions.get(eventType)?.size || 0; } registeredEventTypes(): string[] { return [...this.subscriptions.keys()]; } hasSubscribers(eventType: string): boolean { return this.subscriberCount(eventType) > 0; }}Using Promise.allSettled (instead of Promise.all) is crucial. It runs all subscribers to completion even if some throw errors. Promise.all would fail fast on the first error, leaving other subscribers unexecuted.
How an event bus delivers events to subscribers significantly impacts system behavior. Understanding delivery semantics is crucial for building reliable systems.
The first major decision is whether publish() waits for all subscribers to complete.
publish() waits for all handlerspublish() returns immediately123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
/** * Synchronous Event Bus * * publish() awaits all subscriber completions. * Publisher blocks until all reactions finish. */class SynchronousEventBus implements IEventBus { async publish<TEvent>(event: TEvent): Promise<void> { const subscribers = this.getSubscribers(event); // Publisher waits for ALL subscribers to complete // This means publish() can take a long time await Promise.all( subscribers.map(s => s.handle(event)) ); }} // Usage: Publisher blocks until all handlers completeasync function placeOrder(command: PlaceOrderCommand): Promise<Order> { const order = await createOrder(command); // This line blocks until: // - Email is sent // - Inventory is reserved // - Analytics are tracked // - All other subscribers complete await eventBus.publish(orderPlacedEvent); return order; // Only returns after all reactions} /** * Asynchronous Event Bus * * publish() returns immediately after queuing. * Handlers execute in background. */class AsynchronousEventBus implements IEventBus { private readonly queue: AsyncQueue<{event: unknown, subscribers: ISubscriber<unknown>[]}>; constructor() { this.queue = new AsyncQueue(); this.startBackgroundProcessor(); } async publish<TEvent>(event: TEvent): Promise<void> { const subscribers = this.getSubscribers(event); // Queue the event and return immediately // Publisher is not blocked await this.queue.enqueue({ event, subscribers }); // Returns here without waiting for handlers } private async startBackgroundProcessor(): Promise<void> { while (true) { const { event, subscribers } = await this.queue.dequeue(); // Process in background - not blocking any publisher await Promise.allSettled( subscribers.map(s => s.handle(event)) ); } }} // Usage: Publisher returns immediatelyasync function placeOrder(command: PlaceOrderCommand): Promise<Order> { const order = await createOrder(command); // This returns IMMEDIATELY // Email, inventory, analytics happen in background await eventBus.publish(orderPlacedEvent); return order; // Returns before reactions complete} /** * Hybrid Event Bus * * Supports both modes based on configuration or event properties. * Best of both worlds for mixed requirements. */class HybridEventBus implements IEventBus { async publish<TEvent extends { async?: boolean }>(event: TEvent): Promise<void> { const subscribers = this.getSubscribers(event); if (event.async) { // Fire-and-forget for async events setImmediate(() => this.executeSubscribers(subscribers, event)); } else { // Wait for completion for sync events await this.executeSubscribers(subscribers, event); } } private async executeSubscribers<TEvent>( subscribers: ISubscriber<TEvent>[], event: TEvent ): Promise<void> { await Promise.allSettled( subscribers.map(s => s.handle(event)) ); }}In distributed systems, event buses must declare their delivery guarantees. This affects how subscribers are designed.
| Guarantee | Behavior | Use Cases | Subscriber Requirements |
|---|---|---|---|
| At-Most-Once | Event delivered 0 or 1 time. No retries on failure. | Logging, metrics, non-critical notifications | Accept potential data loss |
| At-Least-Once | Event delivered 1 or more times. Retries until acknowledged. | Most business processes, payments, orders | Must be idempotent |
| Exactly-Once | Event delivered exactly 1 time. Complex to implement. | Financial transactions requiring absolute precision | Relies on coordination (2PC, outbox pattern) |
Most production systems use at-least-once semantics because it balances reliability with implementation complexity. This means you should ALWAYS design subscribers to be idempotent—assume every event might arrive multiple times.
Production event buses often include additional features beyond basic pub-sub. These features improve observability, reliability, and flexibility.
Allow subscribers to filter events beyond just type matching:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
/** * Event bus with content-based filtering * * Subscribers can specify predicates beyond event type. */interface IFilterableSubscriber<TEvent> extends ISubscriber<TEvent> { /** * Optional filter predicate. * Returns true if this subscriber should handle the event. */ filter?(event: TEvent): boolean;} class FilterableEventBus implements IEventBus { async publish<TEvent>(event: TEvent): Promise<void> { const subscribers = this.getSubscribers(event); // Filter subscribers based on their predicates const eligibleSubscribers = subscribers.filter(sub => { const filterableSub = sub as IFilterableSubscriber<TEvent>; if (filterableSub.filter) { return filterableSub.filter(event); } return true; // No filter = receive all }); await this.executeSubscribers(eligibleSubscribers, event); }} // Example: Subscriber that only handles high-value ordersclass HighValueOrderSubscriber implements IFilterableSubscriber<OrderPlacedEvent> { subscribedEventTypes(): string[] { return ['OrderPlaced']; } // Only handle orders over $1000 filter(event: OrderPlacedEvent): boolean { return event.payload.totalAmount > 1000; } async handle(event: OrderPlacedEvent): Promise<void> { // This only runs for high-value orders await this.notifyVIPTeam(event); await this.schedulePriorityFulfillment(event); }} // Example: Subscriber filtered by customer segmentclass EnterpriseCustomerSubscriber implements IFilterableSubscriber<OrderPlacedEvent> { subscribedEventTypes(): string[] { return ['OrderPlaced']; } filter(event: OrderPlacedEvent): boolean { return event.payload.customerSegment === 'enterprise'; } async handle(event: OrderPlacedEvent): Promise<void> { await this.assignAccountManager(event); }}Middleware allows cross-cutting concerns like logging, metrics, and tracing to be applied uniformly:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
/** * Event Middleware: Cross-cutting concerns for all events */type EventMiddleware = ( event: unknown, next: () => Promise<void>) => Promise<void>; class MiddlewareEventBus implements IEventBus { private readonly middlewares: EventMiddleware[] = []; use(middleware: EventMiddleware): void { this.middlewares.push(middleware); } async publish<TEvent>(event: TEvent): Promise<void> { // Build middleware chain const executeSubscribers = async () => { const subscribers = this.getSubscribers(event); await Promise.allSettled( subscribers.map(s => s.handle(event)) ); }; // Execute middleware chain const chain = this.buildMiddlewareChain(event, executeSubscribers); await chain(); } private buildMiddlewareChain( event: unknown, finalHandler: () => Promise<void> ): () => Promise<void> { let current = finalHandler; // Build chain from end to start for (let i = this.middlewares.length - 1; i >= 0; i--) { const middleware = this.middlewares[i]; const next = current; current = () => middleware(event, next); } return current; }} // Example middlewaresconst loggingMiddleware: EventMiddleware = async (event, next) => { console.log(`[EVENT] Publishing: ${(event as any).eventType}`); const start = Date.now(); try { await next(); console.log(`[EVENT] Completed: ${(event as any).eventType} in ${Date.now() - start}ms`); } catch (error) { console.error(`[EVENT] Failed: ${(event as any).eventType}`, error); throw error; }}; const metricsMiddleware: EventMiddleware = async (event, next) => { metrics.increment(`events.published.${(event as any).eventType}`); const timer = metrics.timer(`events.duration.${(event as any).eventType}`); try { await next(); timer.stop(); } catch (error) { metrics.increment(`events.failed.${(event as any).eventType}`); throw error; }}; const tracingMiddleware: EventMiddleware = async (event, next) => { const span = tracer.startSpan(`event:${(event as any).eventType}`); span.setTag('event.id', (event as any).eventId); try { await next(); span.setStatus('ok'); } catch (error) { span.setStatus('error'); span.setTag('error.message', (error as Error).message); throw error; } finally { span.finish(); }}; // Usageconst eventBus = new MiddlewareEventBus();eventBus.use(loggingMiddleware);eventBus.use(metricsMiddleware);eventBus.use(tracingMiddleware);When events can't be processed after all retries, they should go to a dead letter queue (DLQ) for investigation:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
/** * Dead Letter Queue Interface * * Stores failed events for investigation and replay. */interface IDeadLetterQueue { /** * Stores a failed event with failure context. */ store(entry: DeadLetterEntry): Promise<void>; /** * Retrieves failed events for investigation. */ list(options: ListOptions): Promise<DeadLetterEntry[]>; /** * Replays a dead letter entry. */ replay(entryId: string): Promise<void>; /** * Removes an entry after investigation. */ remove(entryId: string): Promise<void>;} interface DeadLetterEntry { id: string; event: unknown; eventType: string; subscriberName: string; failedAt: Date; errorMessage: string; errorStack?: string; attemptCount: number; originalPublishTime: Date;} class EventBusWithDLQ implements IEventBus { private readonly maxRetries = 3; constructor( private readonly deadLetterQueue: IDeadLetterQueue, private readonly logger: ILogger ) {} async publish<TEvent extends DomainEvent>(event: TEvent): Promise<void> { const subscribers = this.getSubscribers(event); await Promise.allSettled( subscribers.map(sub => this.executeWithRetryAndDLQ(sub, event) ) ); } private async executeWithRetryAndDLQ<TEvent extends DomainEvent>( subscriber: ISubscriber<TEvent>, event: TEvent ): Promise<void> { let lastError: Error | null = null; for (let attempt = 1; attempt <= this.maxRetries; attempt++) { try { await subscriber.handle(event); return; // Success } catch (error) { lastError = error as Error; this.logger.warn( `Attempt ${attempt}/${this.maxRetries} failed for ${subscriber.constructor.name}`, error ); if (attempt < this.maxRetries) { await this.sleep(Math.pow(2, attempt) * 100); // Exponential backoff } } } // All retries exhausted - send to DLQ await this.deadLetterQueue.store({ id: generateUUID(), event: event, eventType: event.eventType, subscriberName: subscriber.constructor.name, failedAt: new Date(), errorMessage: lastError?.message || 'Unknown error', errorStack: lastError?.stack, attemptCount: this.maxRetries, originalPublishTime: event.occurredAt }); this.logger.error( `Event ${event.eventId} sent to DLQ for subscriber ${subscriber.constructor.name}` ); } private sleep(ms: number): Promise<void> { return new Promise(resolve => setTimeout(resolve, ms)); }}When events need to flow between separate processes, services, or machines, you need a distributed event bus—typically implemented using a message broker like RabbitMQ, Apache Kafka, AWS SNS/SQS, or Google Pub/Sub.
| Aspect | In-Process | Distributed |
|---|---|---|
| Scope | Single process/application | Multiple processes, services, machines |
| Latency | Microseconds (local method calls) | Milliseconds (network + serialization) |
| Serialization | None (objects passed by reference) | Required (JSON, Avro, Protobuf) |
| Durability | Events lost on process crash | Events persisted to disk |
| Ordering | Deterministic | Often requires partitioning |
| Scalability | Limited by single process | Horizontally scalable |
| Complexity | Simple to implement | Requires infrastructure |
A well-designed system abstracts the event bus behind an interface, allowing you to swap implementations without changing publishers or subscribers:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
/** * Event Bus Abstraction * * Same interface works for both in-process and distributed. * Swap implementations without changing business code. */ // Same interface for all implementationsinterface IEventBus { publish<TEvent>(event: TEvent): Promise<void>; subscribe(subscriber: ISubscriber<unknown>): () => void; shutdown(): Promise<void>;} // In-Process Implementationclass InProcessEventBus implements IEventBus { /* ... */ } // RabbitMQ Implementationclass RabbitMQEventBus implements IEventBus { constructor(private readonly connection: amqp.Connection) {} async publish<TEvent extends DomainEvent>(event: TEvent): Promise<void> { const channel = await this.connection.createChannel(); // Serialize event for network transport const message = JSON.stringify(event); // Publish to exchange await channel.publish( 'domain-events', // Exchange name event.eventType, // Routing key Buffer.from(message), { persistent: true, // Survive broker restart contentType: 'application/json', messageId: event.eventId, timestamp: event.occurredAt.getTime() } ); } subscribe(subscriber: ISubscriber<unknown>): () => void { // Create queue bound to relevant event types const eventTypes = subscriber.subscribedEventTypes(); this.setupConsumer(subscriber, eventTypes); return () => this.teardownConsumer(subscriber); } private async setupConsumer( subscriber: ISubscriber<unknown>, eventTypes: string[] ): Promise<void> { const channel = await this.connection.createChannel(); const queueName = `${subscriber.constructor.name}-queue`; // Create durable queue await channel.assertQueue(queueName, { durable: true }); // Bind to each event type for (const eventType of eventTypes) { await channel.bindQueue(queueName, 'domain-events', eventType); } // Consume messages await channel.consume(queueName, async (msg) => { if (!msg) return; try { const event = JSON.parse(msg.content.toString()); await subscriber.handle(event); channel.ack(msg); // Acknowledge success } catch (error) { channel.nack(msg, false, true); // Requeue for retry } }); } async shutdown(): Promise<void> { await this.connection.close(); }} // Kafka Implementationclass KafkaEventBus implements IEventBus { constructor( private readonly producer: KafkaProducer, private readonly consumerGroup: string ) {} async publish<TEvent extends DomainEvent>(event: TEvent): Promise<void> { await this.producer.send({ topic: 'domain-events', messages: [{ key: event.aggregateId, // Partition by aggregate for ordering value: JSON.stringify(event), headers: { 'event-type': event.eventType, 'event-id': event.eventId } }] }); } // Kafka consumer implementation...} // Dependency Injection allows swapping implementationsfunction configureEventBus(config: Config): IEventBus { switch (config.eventBusType) { case 'in-process': return new InProcessEventBus(); case 'rabbitmq': return new RabbitMQEventBus(config.rabbitConnection); case 'kafka': return new KafkaEventBus(config.kafkaProducer, config.consumerGroup); default: throw new Error(`Unknown event bus type: ${config.eventBusType}`); }}Many successful systems start with an in-process event bus and migrate to a distributed one as they grow. The abstraction makes this migration much easier—your publishers and subscribers don't change, only the infrastructure configuration.
Event-driven code requires specific testing strategies. The decoupling that makes pub-sub powerful also makes it trickier to test.
For unit testing, use a test double that captures published events for later assertion:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
/** * Test Double Event Bus * * Captures published events for assertions. * Optionally executes real subscribers for integration tests. */class TestEventBus implements IEventBus { private readonly publishedEvents: unknown[] = []; private readonly subscribers: Map<string, Set<ISubscriber<unknown>>> = new Map(); private executeSubscribers = false; /** * Enable subscriber execution for integration tests. */ withSubscriberExecution(): this { this.executeSubscribers = true; return this; } async publish<TEvent extends { eventType: string }>(event: TEvent): Promise<void> { // Capture the event this.publishedEvents.push(event); // Optionally execute subscribers if (this.executeSubscribers) { const subs = this.subscribers.get(event.eventType) || new Set(); await Promise.allSettled( [...subs].map(s => s.handle(event)) ); } } subscribe(subscriber: ISubscriber<unknown>): () => void { for (const eventType of subscriber.subscribedEventTypes()) { if (!this.subscribers.has(eventType)) { this.subscribers.set(eventType, new Set()); } this.subscribers.get(eventType)!.add(subscriber); } return () => this.unsubscribe(subscriber); } unsubscribe(subscriber: ISubscriber<unknown>): void { for (const eventType of subscriber.subscribedEventTypes()) { this.subscribers.get(eventType)?.delete(subscriber); } } async shutdown(): Promise<void> { // No-op for test double } // ============ Test Assertions ============ /** * Returns all published events. */ getPublishedEvents(): unknown[] { return [...this.publishedEvents]; } /** * Returns published events of a specific type. */ getEventsOfType<TEvent>(eventType: string): TEvent[] { return this.publishedEvents.filter( e => (e as any).eventType === eventType ) as TEvent[]; } /** * Asserts that an event of the given type was published. */ expectEventPublished(eventType: string): void { const found = this.publishedEvents.some( e => (e as any).eventType === eventType ); if (!found) { throw new Error(`Expected event '${eventType}' to be published, but it wasn't`); } } /** * Asserts that no event of the given type was published. */ expectEventNotPublished(eventType: string): void { const found = this.publishedEvents.some( e => (e as any).eventType === eventType ); if (found) { throw new Error(`Expected event '${eventType}' NOT to be published, but it was`); } } /** * Asserts the exact number of events of a type were published. */ expectEventCount(eventType: string, count: number): void { const actual = this.getEventsOfType(eventType).length; if (actual !== count) { throw new Error( `Expected ${count} '${eventType}' events, but found ${actual}` ); } } /** * Clears all captured events (for test isolation). */ reset(): void { this.publishedEvents.length = 0; }} // ============ Example Tests ============ describe('OrderService', () => { let eventBus: TestEventBus; let orderService: OrderService; beforeEach(() => { eventBus = new TestEventBus(); orderService = new OrderService(eventBus, mockOrderRepository); }); afterEach(() => { eventBus.reset(); }); it('publishes OrderPlaced event when order is created', async () => { // Act const order = await orderService.placeOrder({ customerId: 'cust-123', items: [{ productId: 'prod-1', quantity: 2 }] }); // Assert eventBus.expectEventPublished('OrderPlaced'); const events = eventBus.getEventsOfType<OrderPlacedEvent>('OrderPlaced'); expect(events).toHaveLength(1); expect(events[0].payload.orderId).toBe(order.id); expect(events[0].payload.customerId).toBe('cust-123'); }); it('does not publish OrderPlaced when validation fails', async () => { // Act & Assert await expect( orderService.placeOrder({ customerId: '', items: [] }) ).rejects.toThrow('Invalid order'); eventBus.expectEventNotPublished('OrderPlaced'); });}); describe('EmailSubscriber Integration', () => { let eventBus: TestEventBus; let emailService: MockEmailService; beforeEach(() => { eventBus = new TestEventBus().withSubscriberExecution(); emailService = new MockEmailService(); // Register real subscriber for integration test eventBus.subscribe(new EmailNotificationSubscriber(emailService)); }); it('sends email when OrderPlaced event is published', async () => { // Arrange const event = createOrderPlacedEvent({ orderId: 'order-123', customerEmail: 'test@example.com' }); // Act await eventBus.publish(event); // Assert expect(emailService.sentEmails).toHaveLength(1); expect(emailService.sentEmails[0].to).toBe('test@example.com'); });});Always call reset() between tests to clear captured events. Leaked state from previous tests causes flaky, hard-to-debug test failures.
The event bus is the critical infrastructure that enables the Publish-Subscribe pattern's decoupling benefits. Understanding its design and implementation is essential for building robust event-driven systems.
What's next:
With both the participants (publishers, subscribers) and the infrastructure (event bus) understood, we'll explore the deeper architectural benefit they provide: coupling and decoupling. Understanding these concepts is key to applying pub-sub effectively.
You now understand the event bus as the central mediator in publish-subscribe systems. You can implement an in-process event bus, understand delivery semantics, and know when to transition to distributed implementations.