We've explored the roles (publishers, subscribers), the infrastructure (event bus), and the benefits (decoupling). Now comes the practical question: how do you actually implement this?
There's no single "right" implementation. The best approach depends on your scale, consistency requirements, team expertise, and infrastructure constraints. This page covers the full spectrum of implementation approaches, from simple patterns you can add to any application today to enterprise-grade distributed messaging systems.
By the end of this page, you will understand multiple implementation approaches for pub-sub systems, when to use each, and how to evolve from simple to sophisticated as your needs grow. You'll have practical code examples for each approach.
Pub-Sub implementations exist on a spectrum from simple to complex, each with different trade-offs.
| Approach | Complexity | Durability | Distribution | Best For |
|---|---|---|---|---|
| Observer Pattern | 🟢 Minimal | None | In-process only | UI events, simple callbacks |
| In-Memory Event Bus | 🟢 Low | None | In-process only | Monoliths, domain events |
| Database-Backed Queue | 🟡 Medium | Transactional | Multi-process | Transactional outbox pattern |
| Redis Pub/Sub | 🟡 Medium | At-most-once | Multi-service | Real-time notifications |
| RabbitMQ | 🟠 Higher | Durable queues | Multi-service | Work queues, task processing |
| Apache Kafka | 🔴 High | Log-based | Multi-data center | Event streaming, analytics |
| Cloud Services | 🟡 Medium | Managed | Global | Serverless, managed infra |
Don't start with Kafka. Most applications should begin with an in-memory event bus and add infrastructure only when concrete needs arise. Premature complexity is a common pitfall.
The Observer Pattern is the simplest form of pub-sub and the foundation for more sophisticated approaches. A subject maintains a list of observers and notifies them of state changes.
```typescript
/**
 * Observer Pattern Implementation
 *
 * The subject directly manages its observers.
 * Simpler than full pub-sub but more coupled.
 */

// Generic observer interface
interface Observer<T> {
  update(data: T): void;
}

// Generic subject (observable)
class Subject<T> {
  private observers: Set<Observer<T>> = new Set();

  attach(observer: Observer<T>): void {
    this.observers.add(observer);
  }

  detach(observer: Observer<T>): void {
    this.observers.delete(observer);
  }

  protected notify(data: T): void {
    for (const observer of this.observers) {
      observer.update(data);
    }
  }
}

// Concrete subject: Stock price tracker
interface PriceUpdate {
  symbol: string;
  price: number;
  timestamp: Date;
}

class StockTicker extends Subject<PriceUpdate> {
  private prices: Map<string, number> = new Map();

  updatePrice(symbol: string, price: number): void {
    this.prices.set(symbol, price);
    // Notify all observers of the price change
    this.notify({ symbol, price, timestamp: new Date() });
  }

  getPrice(symbol: string): number | undefined {
    return this.prices.get(symbol);
  }
}

// Concrete observers
class PriceDisplay implements Observer<PriceUpdate> {
  update(data: PriceUpdate): void {
    console.log(`[DISPLAY] ${data.symbol}: $${data.price.toFixed(2)}`);
  }
}

class PriceLogger implements Observer<PriceUpdate> {
  private log: PriceUpdate[] = [];

  update(data: PriceUpdate): void {
    this.log.push(data);
    console.log(`[LOG] Recorded price update #${this.log.length}`);
  }
}

class PriceAlertService implements Observer<PriceUpdate> {
  constructor(private readonly thresholds: Map<string, number>) {}

  update(data: PriceUpdate): void {
    const threshold = this.thresholds.get(data.symbol);
    if (threshold && data.price < threshold) {
      console.log(`[ALERT] ${data.symbol} dropped below $${threshold}!`);
    }
  }
}

// Usage
const ticker = new StockTicker();

// Attach observers
const display = new PriceDisplay();
const logger = new PriceLogger();
const alerts = new PriceAlertService(new Map([['AAPL', 150]]));

ticker.attach(display);
ticker.attach(logger);
ticker.attach(alerts);

// Price updates notify all observers
ticker.updatePrice('AAPL', 155); // All observers notified; no alert fires
ticker.updatePrice('AAPL', 145); // All observers notified; alert fires (below $150)

// Detach when done
ticker.detach(display);
```

In Observer, the subject knows its observers directly. In Pub-Sub, an intermediary (event bus) handles routing. Observer is simpler but more coupled. Use Observer for simple cases; use Pub-Sub as complexity grows.
An in-memory event bus provides full pub-sub decoupling within a single process. Publishers and subscribers don't know each other—they communicate through the bus.
```typescript
/**
 * Production-Ready In-Memory Event Bus
 *
 * Features:
 * - Type-safe event routing
 * - Async handler execution
 * - Error isolation
 * - Middleware support
 * - Graceful shutdown
 */

type EventHandler<T = unknown> = (event: T) => Promise<void> | void;
type Middleware = (event: unknown, next: () => Promise<void>) => Promise<void>;
type Unsubscribe = () => void;

interface DomainEvent {
  eventType: string;
  eventId: string;
  occurredAt: Date;
  [key: string]: unknown;
}

class InMemoryEventBus {
  private handlers = new Map<string, Set<EventHandler>>();
  private wildcardHandlers = new Set<EventHandler>();
  private middlewares: Middleware[] = [];
  private isShuttingDown = false;
  private pendingHandlers = 0;

  /**
   * Register middleware to run on every event.
   */
  use(middleware: Middleware): this {
    this.middlewares.push(middleware);
    return this;
  }

  /**
   * Subscribe to a specific event type.
   */
  on<T extends DomainEvent>(eventType: string, handler: EventHandler<T>): Unsubscribe {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, new Set());
    }
    this.handlers.get(eventType)!.add(handler as EventHandler);

    return () => {
      this.handlers.get(eventType)?.delete(handler as EventHandler);
    };
  }

  /**
   * Subscribe to all events (useful for logging, metrics).
   */
  onAny(handler: EventHandler<DomainEvent>): Unsubscribe {
    this.wildcardHandlers.add(handler);
    return () => this.wildcardHandlers.delete(handler);
  }

  /**
   * Publish an event to all interested handlers.
   */
  async publish<T extends DomainEvent>(event: T): Promise<void> {
    if (this.isShuttingDown) {
      throw new Error('Event bus is shutting down');
    }

    const typeHandlers = this.handlers.get(event.eventType) || new Set();
    const allHandlers = [...typeHandlers, ...this.wildcardHandlers];

    if (allHandlers.length === 0) {
      return; // No handlers, nothing to do
    }

    // Build middleware chain
    const executeHandlers = async () => {
      this.pendingHandlers += allHandlers.length;
      const results = await Promise.allSettled(
        allHandlers.map(async handler => {
          try {
            await handler(event);
          } finally {
            this.pendingHandlers--;
          }
        })
      );

      // Log failures but don't throw (isolation)
      for (const result of results) {
        if (result.status === 'rejected') {
          console.error('Handler failed:', result.reason);
        }
      }
    };

    // Execute through middleware chain
    await this.executeWithMiddleware(event, executeHandlers);
  }

  private async executeWithMiddleware(
    event: unknown,
    finalHandler: () => Promise<void>
  ): Promise<void> {
    let index = 0;

    const next = async (): Promise<void> => {
      if (index < this.middlewares.length) {
        const middleware = this.middlewares[index++];
        await middleware(event, next);
      } else {
        await finalHandler();
      }
    };

    await next();
  }

  /**
   * Gracefully shut down, waiting for pending handlers.
   */
  async shutdown(timeoutMs = 5000): Promise<void> {
    this.isShuttingDown = true;

    const start = Date.now();
    while (this.pendingHandlers > 0) {
      if (Date.now() - start > timeoutMs) {
        console.warn(`Shutdown timeout with ${this.pendingHandlers} pending handlers`);
        break;
      }
      await new Promise(resolve => setTimeout(resolve, 100));
    }

    this.handlers.clear();
    this.wildcardHandlers.clear();
  }
}

// ========== Usage Example ==========
// (metrics, auditLog, sendConfirmationEmail, and the OrderPlacedEvent type
// are assumed to be defined elsewhere in the application)

const eventBus = new InMemoryEventBus();

// Add logging middleware
eventBus.use(async (event, next) => {
  const e = event as DomainEvent;
  console.log(`[EVENT] ${e.eventType} - ${e.eventId}`);
  const start = Date.now();
  await next();
  console.log(`[EVENT] ${e.eventType} completed in ${Date.now() - start}ms`);
});

// Add metrics middleware
eventBus.use(async (event, next) => {
  const e = event as DomainEvent;
  metrics.increment(`events.${e.eventType}.published`);
  await next();
});

// Subscribe to specific events
const unsubscribe = eventBus.on<OrderPlacedEvent>('OrderPlaced', async (event) => {
  console.log(`Processing order ${event.orderId}`);
  await sendConfirmationEmail(event);
});

// Subscribe to all events (for audit log)
eventBus.onAny(async (event) => {
  await auditLog.record(event);
});

// Publish events
await eventBus.publish({
  eventType: 'OrderPlaced',
  eventId: 'evt-123',
  occurredAt: new Date(),
  orderId: 'order-456',
  customerId: 'cust-789'
});

// Cleanup
unsubscribe();
await eventBus.shutdown();
```

The Transactional Outbox Pattern solves a critical problem: how to ensure that database changes and event publishing are atomic. Without this, you risk publishing events for transactions that later roll back, or committing transactions without publishing events.
```typescript
/**
 * The Dual Write Problem
 *
 * Publishing to a message broker and saving to the database
 * are two separate operations that can fail independently.
 */

// ❌ PROBLEM: Dual writes can lead to inconsistency
async function placeOrderDangerous(command: PlaceOrderCommand): Promise<Order> {
  const order = new Order(command);

  // Step 1: Save to database
  await orderRepository.save(order); // Succeeds

  // Step 2: Publish to message broker
  await messageBroker.publish(orderPlacedEvent); // FAILS!

  // Result: Order is saved but event was never published.
  // Downstream systems never learn about the order.
  // Database and event stream are now INCONSISTENT.
  return order;
}

// Also problematic in the reverse order:
async function placeOrderStillDangerous(command: PlaceOrderCommand): Promise<Order> {
  const order = new Order(command);

  // Step 1: Publish first
  await messageBroker.publish(orderPlacedEvent); // Succeeds

  // Step 2: Save to database
  await orderRepository.save(order); // FAILS!

  // Result: Event published but order never saved.
  // Downstream systems process a non-existent order.
  // Even worse inconsistency!
  return order;
}
```

Store events in an "outbox" table within the same database transaction as the domain change. A separate process reads the outbox and publishes to the message broker.
```typescript
/**
 * Transactional Outbox Pattern
 *
 * Events are stored in the database as part of the same transaction.
 * A separate publisher process reads and publishes them.
 */

// Outbox table schema (SQL)
/*
CREATE TABLE outbox (
  id UUID PRIMARY KEY,
  aggregate_type VARCHAR(255) NOT NULL,
  aggregate_id VARCHAR(255) NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  payload JSONB NOT NULL,
  created_at TIMESTAMP DEFAULT NOW(),
  published_at TIMESTAMP NULL
);

CREATE INDEX idx_outbox_unpublished
ON outbox (created_at)
WHERE published_at IS NULL;
*/

interface OutboxEntry {
  id: string;
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  payload: object;
  createdAt: Date;
  publishedAt: Date | null;
}

class OrderService {
  constructor(
    private readonly db: Database,
    private readonly orderRepository: IOrderRepository
  ) {}

  async placeOrder(command: PlaceOrderCommand): Promise<Order> {
    // Single transaction for both operations
    return this.db.transaction(async (tx) => {
      // Create and save order
      const order = new Order(command);
      await this.orderRepository.save(order, tx);

      // Create event for outbox (same transaction!)
      const event = {
        eventType: 'OrderPlaced',
        eventId: generateUUID(),
        occurredAt: new Date(),
        aggregateId: order.id,
        payload: {
          orderId: order.id,
          customerId: order.customerId,
          items: order.items,
          totalAmount: order.totalAmount
        }
      };

      // Write to outbox table (same transaction!)
      await tx.query(`
        INSERT INTO outbox (id, aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4, $5)
      `, [
        event.eventId,
        'Order',
        order.id,
        event.eventType,
        JSON.stringify(event.payload)
      ]);

      // Transaction commits: order AND outbox entry are persisted atomically
      return order;
    });
  }
}

/**
 * Outbox Publisher: background process that reads the outbox and publishes
 */
class OutboxPublisher {
  private isRunning = false;

  constructor(
    private readonly db: Database,
    private readonly messageBroker: IMessageBroker
  ) {}

  async start(): Promise<void> {
    this.isRunning = true;

    while (this.isRunning) {
      await this.publishPendingEvents();
      await this.sleep(100); // Poll interval
    }
  }

  async stop(): Promise<void> {
    this.isRunning = false;
  }

  private async publishPendingEvents(): Promise<void> {
    // Fetch unpublished events
    const entries = await this.db.query<OutboxEntry>(`
      SELECT * FROM outbox
      WHERE published_at IS NULL
      ORDER BY created_at
      LIMIT 100
      FOR UPDATE SKIP LOCKED -- Allow concurrent publishers
    `);

    for (const entry of entries) {
      try {
        // Publish to message broker
        await this.messageBroker.publish({
          eventType: entry.eventType,
          eventId: entry.id,
          aggregateId: entry.aggregateId,
          payload: entry.payload
        });

        // Mark as published
        await this.db.query(`
          UPDATE outbox SET published_at = NOW() WHERE id = $1
        `, [entry.id]);
      } catch (error) {
        console.error(`Failed to publish ${entry.id}:`, error);
        // Will retry on next poll
      }
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

/**
 * Cleanup: periodically delete old published events
 */
async function cleanupOutbox(db: Database): Promise<void> {
  await db.query(`
    DELETE FROM outbox
    WHERE published_at IS NOT NULL
      AND published_at < NOW() - INTERVAL '7 days'
  `);
}
```

The outbox pattern guarantees that if the domain change commits, the event WILL be published (eventually). If the transaction rolls back, no event is written. This is at-least-once delivery at the database level.
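At-least-once delivery has a flip side: if the publisher crashes after sending to the broker but before marking the entry as published, the same event is delivered again on the next poll. Subscribers therefore need to be idempotent. A minimal sketch of a deduplicating consumer (the class and field names here are illustrative, not from a library):

```typescript
// Idempotent consumer: at-least-once delivery means duplicates are possible,
// so track processed event IDs and skip repeats.

interface InboundEvent {
  eventId: string;
  eventType: string;
  payload: unknown;
}

class IdempotentConsumer {
  // Illustrative in-memory set; in production this would be a DB table
  private processed = new Set<string>();
  public handledCount = 0;

  async handle(event: InboundEvent): Promise<void> {
    if (this.processed.has(event.eventId)) {
      return; // Duplicate delivery: already handled, safely ignore
    }
    // ... apply the event's effects here ...
    this.handledCount++;
    this.processed.add(event.eventId);
  }
}
```

Ideally the processed-ID record is written in the same transaction as the handler's own database writes, so the deduplication check survives restarts and the outbox publisher can retry freely.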
When you need events to flow between services or processes, you need a message broker—a dedicated piece of infrastructure that handles routing, persistence, and delivery. Here are the major options.
Redis provides simple pub-sub for real-time, ephemeral messaging. Events are not persisted—if no subscriber is listening, the message is lost.
```typescript
/**
 * Redis Pub/Sub Implementation
 *
 * Best for: Real-time notifications, ephemeral events
 * NOT for: Durable messaging, guaranteed delivery
 */

import Redis from 'ioredis';

class RedisPubSubEventBus {
  private publisher: Redis;
  private subscriber: Redis;
  private handlers = new Map<string, Set<(event: unknown) => void>>();

  constructor(redisUrl: string) {
    // Need separate connections for pub and sub
    this.publisher = new Redis(redisUrl);
    this.subscriber = new Redis(redisUrl);

    // Handle incoming messages
    this.subscriber.on('message', (channel, message) => {
      const handlers = this.handlers.get(channel);
      if (handlers) {
        const event = JSON.parse(message);
        handlers.forEach(handler => handler(event));
      }
    });
  }

  async publish<T extends { eventType: string }>(event: T): Promise<void> {
    const channel = event.eventType;
    const message = JSON.stringify(event);
    await this.publisher.publish(channel, message);
  }

  async subscribe(
    eventType: string,
    handler: (event: unknown) => void
  ): Promise<() => void> {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, new Set());
      await this.subscriber.subscribe(eventType);
    }
    this.handlers.get(eventType)!.add(handler);

    return () => {
      this.handlers.get(eventType)?.delete(handler);
    };
  }

  async shutdown(): Promise<void> {
    await this.subscriber.quit();
    await this.publisher.quit();
  }
}

// Usage
const eventBus = new RedisPubSubEventBus('redis://localhost:6379');

await eventBus.subscribe('OrderPlaced', async (event) => {
  console.log('Order placed:', event);
});

await eventBus.publish({
  eventType: 'OrderPlaced',
  orderId: '123',
  customerId: '456'
});
```

RabbitMQ is a traditional message broker with durable queues, acknowledgments, and sophisticated routing via exchanges.
```typescript
/**
 * RabbitMQ Implementation
 *
 * Best for: Work queues, durable messaging, complex routing
 * Features: Message acknowledgment, dead letter queues, TTL
 */

import amqp from 'amqplib';

class RabbitMQEventBus {
  private connection: amqp.Connection | null = null;
  private channel: amqp.Channel | null = null;
  private readonly exchangeName = 'domain-events';

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Create topic exchange for flexible routing
    await this.channel.assertExchange(this.exchangeName, 'topic', {
      durable: true // Survives broker restart
    });
  }

  async publish<T extends { eventType: string; eventId: string }>(
    event: T
  ): Promise<void> {
    const routingKey = event.eventType;
    const message = Buffer.from(JSON.stringify(event));

    this.channel!.publish(this.exchangeName, routingKey, message, {
      persistent: true, // Survives broker restart
      contentType: 'application/json',
      messageId: event.eventId,
      timestamp: Date.now()
    });
  }

  async subscribe(
    eventPattern: string, // e.g., 'Order.*' or 'Order.Placed'
    queueName: string,
    handler: (event: unknown) => Promise<void>
  ): Promise<void> {
    // Create durable queue
    await this.channel!.assertQueue(queueName, {
      durable: true,
      deadLetterExchange: 'dlx' // Failed messages go here
    });

    // Bind queue to exchange with routing pattern
    await this.channel!.bindQueue(queueName, this.exchangeName, eventPattern);

    // Consume messages
    await this.channel!.consume(queueName, async (msg) => {
      if (!msg) return;

      try {
        const event = JSON.parse(msg.content.toString());
        await handler(event);

        // Acknowledge successful processing
        this.channel!.ack(msg);
      } catch (error) {
        console.error('Handler failed:', error);
        // Negative ack without requeue: the message is routed
        // to the dead letter exchange rather than retried forever
        this.channel!.nack(msg, false, false);
      }
    });
  }

  async shutdown(): Promise<void> {
    await this.channel?.close();
    await this.connection?.close();
  }
}

// Usage
const eventBus = new RabbitMQEventBus();
await eventBus.connect('amqp://localhost');

// Subscribe with pattern matching
await eventBus.subscribe('Order.*', 'order-notifications', async (event) => {
  console.log('Order event:', event);
});

// This matches the 'Order.*' pattern
await eventBus.publish({
  eventType: 'Order.Placed',
  eventId: 'evt-123',
  orderId: '456'
});
```

Kafka is a distributed event streaming platform designed for high throughput and durability. Events are stored in an append-only log that consumers read from.
```typescript
/**
 * Apache Kafka Implementation
 *
 * Best for: Event streaming, high throughput, event sourcing
 * Features: Persistent log, replay capability, partitioning
 */

import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';

class KafkaEventBus {
  private kafka: Kafka;
  private producer: Producer;
  private consumers: Map<string, Consumer> = new Map();

  constructor(brokers: string[]) {
    this.kafka = new Kafka({ clientId: 'my-app', brokers });
    this.producer = this.kafka.producer();
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publish<T extends { eventType: string; aggregateId: string }>(
    event: T,
    topic = 'domain-events'
  ): Promise<void> {
    await this.producer.send({
      topic,
      messages: [{
        // Partition by aggregateId for ordering guarantees
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers: { 'event-type': event.eventType }
      }]
    });
  }

  async subscribe(
    topic: string,
    groupId: string,
    handler: (event: unknown) => Promise<void>
  ): Promise<void> {
    const consumer = this.kafka.consumer({ groupId });
    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: false });

    await consumer.run({
      eachMessage: async ({ message }: EachMessagePayload) => {
        const event = JSON.parse(message.value!.toString());

        try {
          await handler(event);
          // Offset is committed automatically after successful processing
        } catch (error) {
          console.error('Handler failed:', error);
          // In production, implement retry logic or a dead letter topic
          throw error; // Causes reprocessing
        }
      }
    });

    this.consumers.set(groupId, consumer);
  }

  async shutdown(): Promise<void> {
    for (const consumer of this.consumers.values()) {
      await consumer.disconnect();
    }
    await this.producer.disconnect();
  }
}

// Usage
const eventBus = new KafkaEventBus(['localhost:9092']);
await eventBus.connect();

// Multiple instances share work via consumer group
await eventBus.subscribe('domain-events', 'order-processors', async (event) => {
  console.log('Processing event:', event);
});

// Events partitioned by aggregateId for ordering
await eventBus.publish({
  eventType: 'OrderPlaced',
  aggregateId: 'order-123',
  customerId: 'cust-456'
});
```

| Feature | Redis Pub/Sub | RabbitMQ | Kafka |
|---|---|---|---|
| Durability | ❌ None | ✅ Queue-based | ✅ Log-based |
| Ordering | ✅ Per channel | ✅ Per queue | ✅ Per partition |
| Replay | ❌ No | ❌ No | ✅ Yes |
| Throughput | 🟢 High | 🟡 Medium | 🟢 Very High |
| Complexity | 🟢 Low | 🟡 Medium | 🔴 High |
| Best Use Case | Real-time, ephemeral | Work queues, RPC | Event streaming |
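The Replay row is the key differentiator. A toy sketch (not Kafka's actual API) of why log-based storage supports replay while an acknowledged queue does not: messages in a log are retained and addressed by offset, so a consumer can re-read from any point, whereas a queue deletes messages once they are acknowledged.

```typescript
// Toy append-only log illustrating the Kafka storage model.
// Each consumer simply tracks its own offset into the log.
class AppendOnlyLog<T> {
  private entries: T[] = [];

  append(event: T): number {
    this.entries.push(event);
    return this.entries.length - 1; // Offset of the appended event
  }

  // Read everything from a given offset.
  // Calling again with the same offset replays the same events.
  readFrom(offset: number): T[] {
    return this.entries.slice(offset);
  }
}
```

This is why a new Kafka consumer group can process a year of historical events, while a new RabbitMQ queue only sees messages published after it was bound.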
Cloud providers offer managed pub-sub services that eliminate operational overhead. These are often the best choice for teams without dedicated infrastructure expertise.
```typescript
/**
 * AWS SNS (topics) + SQS (queues) Implementation
 *
 * SNS: Fan-out to multiple subscribers
 * SQS: Durable queues with at-least-once delivery
 */

import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';

class AWSSNSSQSEventBus {
  private sns: SNSClient;
  private sqs: SQSClient;

  constructor(region: string) {
    this.sns = new SNSClient({ region });
    this.sqs = new SQSClient({ region });
  }

  async publish<T extends { eventType: string }>(
    event: T,
    topicArn: string
  ): Promise<void> {
    await this.sns.send(new PublishCommand({
      TopicArn: topicArn,
      Message: JSON.stringify(event),
      MessageAttributes: {
        eventType: {
          DataType: 'String',
          StringValue: event.eventType
        }
      }
    }));
  }

  async consume(
    queueUrl: string,
    handler: (event: unknown) => Promise<void>
  ): Promise<void> {
    while (true) {
      const response = await this.sqs.send(new ReceiveMessageCommand({
        QueueUrl: queueUrl,
        WaitTimeSeconds: 20, // Long polling
        MaxNumberOfMessages: 10
      }));

      for (const message of response.Messages || []) {
        try {
          // SNS wraps the original event in an envelope
          const snsMessage = JSON.parse(message.Body!);
          const event = JSON.parse(snsMessage.Message);

          await handler(event);

          // Delete after successful processing
          await this.sqs.send(new DeleteMessageCommand({
            QueueUrl: queueUrl,
            ReceiptHandle: message.ReceiptHandle
          }));
        } catch (error) {
          console.error('Handler failed:', error);
          // Message becomes visible again after the visibility timeout;
          // after max receives, it goes to the dead letter queue
        }
      }
    }
  }
}
```
```typescript
/**
 * Google Cloud Pub/Sub Implementation
 *
 * Fully managed, globally distributed, at-least-once delivery
 */

import { PubSub } from '@google-cloud/pubsub';

class GCPPubSubEventBus {
  private pubsub: PubSub;

  constructor(projectId: string) {
    this.pubsub = new PubSub({ projectId });
  }

  async publish<T extends { eventType: string }>(
    event: T,
    topicName: string
  ): Promise<void> {
    const topic = this.pubsub.topic(topicName);

    await topic.publishMessage({
      json: event,
      attributes: { eventType: event.eventType }
    });
  }

  async subscribe(
    subscriptionName: string,
    handler: (event: unknown) => Promise<void>
  ): Promise<void> {
    const subscription = this.pubsub.subscription(subscriptionName);

    subscription.on('message', async (message) => {
      try {
        const event = JSON.parse(message.data.toString());
        await handler(event);
        message.ack(); // Acknowledge success
      } catch (error) {
        console.error('Handler failed:', error);
        message.nack(); // Retry later
      }
    });

    subscription.on('error', (error) => {
      console.error('Subscription error:', error);
    });
  }
}
```

Cloud pub-sub services handle scaling, durability, and availability automatically. They're often more cost-effective than running your own Kafka or RabbitMQ clusters, especially at smaller scales.
With so many options, how do you choose? Here's a decision framework based on common scenarios.
| If you need... | Consider... |
|---|---|
| Simple in-app decoupling | In-Memory Event Bus |
| Guaranteed consistency with DB | Transactional Outbox Pattern |
| Real-time notifications (lossy OK) | Redis Pub/Sub or WebSockets |
| Work queues with retries | RabbitMQ or SQS |
| Event replay and streaming | Apache Kafka |
| Minimal ops overhead | Cloud Pub/Sub (SNS/SQS, GCP, Azure) |
| Cross-region distribution | Kafka or Cloud Pub/Sub |
The Publish-Subscribe pattern can be implemented at many levels of sophistication. Choose the simplest approach that meets your needs and evolve as requirements grow.
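One way to keep that evolution cheap is to have application code depend on a small bus interface, so the in-memory implementation can later be replaced by a Redis-, RabbitMQ-, or Kafka-backed one without touching publishers or subscribers. A sketch (the interface and class names are illustrative):

```typescript
// Application code depends only on this interface; the concrete bus
// (in-memory today, a broker tomorrow) is chosen at composition time.
interface IEventBus {
  publish(eventType: string, event: unknown): Promise<void>;
  subscribe(eventType: string, handler: (event: unknown) => Promise<void> | void): () => void;
}

// Simplest implementation: in-process fan-out
class SimpleEventBus implements IEventBus {
  private handlers = new Map<string, Set<(event: unknown) => Promise<void> | void>>();

  async publish(eventType: string, event: unknown): Promise<void> {
    for (const handler of this.handlers.get(eventType) ?? []) {
      await handler(event);
    }
  }

  subscribe(eventType: string, handler: (event: unknown) => Promise<void> | void): () => void {
    if (!this.handlers.has(eventType)) this.handlers.set(eventType, new Set());
    this.handlers.get(eventType)!.add(handler);
    return () => void this.handlers.get(eventType)?.delete(handler);
  }
}
```

A broker-backed class implementing the same interface can then be dropped in wherever `SimpleEventBus` was constructed, which keeps the migration from monolith to distributed messaging a configuration change rather than a rewrite.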
You now have a comprehensive understanding of the Publish-Subscribe pattern: the roles of publishers and subscribers, the event bus that mediates between them, how pub-sub achieves decoupling, and the spectrum of implementation approaches from simple to sophisticated.