Loading content...
In the world of software architecture, few patterns are as fundamental and universally applicable as the Publish-Subscribe (Pub-Sub) pattern. At its core, Pub-Sub is elegantly simple: one component publishes information, and other components subscribe to receive that information—without either side needing to know about the other directly.
This deceptively simple concept forms the backbone of everything from GUI event handling to distributed microservices communication. Understanding the precise roles, responsibilities, and design principles of publishers and subscribers is essential for building systems that are scalable, maintainable, and resilient to change.
By the end of this page, you will have a complete understanding of publisher and subscriber roles: their responsibilities, contracts, design principles, and how they achieve complete decoupling. You'll be able to design components that participate correctly in Pub-Sub systems at any scale.
A Publisher is any component that produces events or messages and makes them available to interested parties. The publisher's defining characteristic is that it has no knowledge of who, if anyone, will receive its messages. This is the fundamental principle that enables loose coupling.
A publisher commits to a specific contract that defines what it will communicate and how:
Understanding this contract is crucial because consumers depend on it. Changing a publisher's contract without careful consideration can break subscribers across the entire system.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
/** * Publisher Interface: The Contract for Event Producers * * A publisher is responsible for: * 1. Producing well-structured events * 2. Determining when events should be published * 3. Ensuring events contain all necessary information * 4. NOT knowing who receives the events */interface IPublisher<TEvent> { /** * Publishes an event to all interested subscribers. * The publisher does NOT maintain a list of subscribers directly. * Instead, it delegates to an intermediary (event bus/dispatcher). */ publish(event: TEvent): void;} /** * Domain Event: A well-structured event with required metadata */interface DomainEvent { readonly eventId: string; // Unique identifier readonly eventType: string; // Type discriminator readonly occurredAt: Date; // When the event happened readonly aggregateId: string; // Source entity identifier readonly version: number; // Event schema version readonly payload: unknown; // Event-specific data} /** * Example: OrderService as a Publisher * * This service publishes events about order lifecycle changes. * It has NO knowledge of who is listening. */class OrderService implements IPublisher<DomainEvent> { constructor( private readonly eventBus: IEventBus, private readonly orderRepository: IOrderRepository ) {} async placeOrder(command: PlaceOrderCommand): Promise<Order> { // Business logic: create and validate order const order = new Order(command.customerId, command.items); order.calculateTotals(); order.validate(); // Persist the order await this.orderRepository.save(order); // Publish event - publisher doesn't know or care who receives this this.publish({ eventId: generateUUID(), eventType: "OrderPlaced", occurredAt: new Date(), aggregateId: order.id, version: 1, payload: { orderId: order.id, customerId: order.customerId, items: order.items, totalAmount: order.totalAmount, currency: order.currency } }); return order; } publish(event: DomainEvent): void { // Delegate to event bus - maintains single responsibility this.eventBus.publish(event); }}A publisher's sole event-related responsibility is to create and emit events that accurately represent what happened. It should never contain logic about what should happen as a result of the event—that's the subscriber's domain.
Designing effective publishers requires adherence to several key principles that ensure the events they produce are useful, reliable, and maintainable.
Publishers should emit events that describe what happened (past tense), not what should happen (imperative). This subtle distinction has profound implications for system design.
SendEmailToCustomerUpdateInventoryStockNotifyWarehouseCalculateShippingCostOrderPlacedPaymentReceivedItemShippedCustomerRegisteredEvents should be self-contained. Subscribers should not need to call back to the publisher or perform additional queries to understand the event. This principle, sometimes called event enrichment, reduces coupling and improves performance.
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
// ❌ BAD: Insufficient context - subscribers must query for detailsinterface OrderPlacedEventPoor { orderId: string; // Only an ID - subscribers need to query for everything else} // ✅ GOOD: Self-contained event with full contextinterface OrderPlacedEvent { // Core identifiers orderId: string; customerId: string; // Order details - no need to query items: Array<{ productId: string; productName: string; // Include name, not just ID quantity: number; unitPrice: number; currency: string; }>; // Calculated totals subtotal: number; taxAmount: number; shippingCost: number; totalAmount: number; currency: string; // Customer context for personalization customerEmail: string; customerName: string; shippingAddress: Address; // Temporal information orderedAt: Date; estimatedDelivery: Date; // Metadata orderSource: 'web' | 'mobile' | 'api' | 'phone'; promotionCodes: string[];} // ✅ EVEN BETTER: Snapshot of state at event timeinterface OrderPlacedEventWithSnapshot { // The event itself orderId: string; occurredAt: Date; // Snapshot of order at this point in time orderSnapshot: OrderSnapshot; // Snapshot of customer at this point in time // Critical: prices and addresses may change later customerSnapshot: { id: string; name: string; email: string; membershipTier: string; address: Address; }; // Snapshot of catalog prices at order time itemSnapshots: Array<{ productId: string; name: string; priceAtOrderTime: number; // Captured price, not current price catalogPriceAtOrderTime: number; }>;} /** * Why snapshots matter: * * If a subscriber processes an OrderPlaced event days later * (due to backlog, retry, or replay), the current product * prices and customer address may have changed. The snapshot * ensures the event represents the exact state at occurrence time. */Once an event is created and published, it should never be modified. Events represent historical facts—and history doesn't change. This immutability is crucial for:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
/** * Immutable Event Design * * All properties are readonly, ensuring the event cannot be * modified after creation. This is enforced at the type level. */interface ImmutableOrderPlacedEvent { readonly eventId: string; readonly eventType: 'OrderPlaced'; readonly occurredAt: Date; readonly aggregateId: string; readonly version: number; readonly payload: Readonly<{ orderId: string; customerId: string; items: ReadonlyArray<Readonly<OrderItem>>; totalAmount: number; }>;} /** * Factory function to create immutable events * Uses Object.freeze for runtime immutability enforcement */function createOrderPlacedEvent( orderId: string, customerId: string, items: OrderItem[], totalAmount: number): ImmutableOrderPlacedEvent { const event = { eventId: generateUUID(), eventType: 'OrderPlaced' as const, occurredAt: new Date(), aggregateId: orderId, version: 1, payload: { orderId, customerId, items: Object.freeze([...items]), // Deep freeze array totalAmount } }; // Freeze the entire object tree return deepFreeze(event);} function deepFreeze<T extends object>(obj: T): Readonly<T> { Object.keys(obj).forEach(key => { const value = (obj as any)[key]; if (value && typeof value === 'object') { deepFreeze(value); } }); return Object.freeze(obj);} // Attempting to modify throws in strict modeconst event = createOrderPlacedEvent('order-123', 'cust-456', [], 100);// event.payload.totalAmount = 200; // ❌ TypeScript error + runtime errorPublishers must version their events. When the event structure needs to change, create a new version rather than modifying the existing one. This allows gradual migration and prevents breaking existing subscribers.
A Subscriber (also called a listener, handler, or consumer) is a component that expresses interest in specific events and reacts when those events occur. Like publishers, subscribers have no direct knowledge of who produces the events they consume.
A subscriber commits to:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
/** * Subscriber Interface: The Contract for Event Consumers * * A subscriber is responsible for: * 1. Declaring which events it handles * 2. Processing events when they arrive * 3. Handling failures gracefully * 4. Being idempotent when necessary * 5. NOT knowing who produces the events */interface ISubscriber<TEvent> { /** * Returns the event types this subscriber handles. * Used by the event bus for routing. */ subscribedEventTypes(): string[]; /** * Handles an incoming event. * Must be idempotent for reliable message delivery. */ handle(event: TEvent): Promise<void>; /** * Optional: Priority for ordering among subscribers * Lower numbers = higher priority = earlier execution */ priority?: number;} /** * Example: EmailNotificationSubscriber * * Listens for order events and sends appropriate emails. * Has NO knowledge of who publishes these events. */class EmailNotificationSubscriber implements ISubscriber<DomainEvent> { constructor( private readonly emailService: IEmailService, private readonly templateEngine: ITemplateEngine, private readonly customerRepository: ICustomerRepository ) {} subscribedEventTypes(): string[] { return [ 'OrderPlaced', 'OrderShipped', 'OrderDelivered', 'OrderCancelled' ]; } async handle(event: DomainEvent): Promise<void> { // Route to specific handler based on event type switch (event.eventType) { case 'OrderPlaced': await this.handleOrderPlaced(event); break; case 'OrderShipped': await this.handleOrderShipped(event); break; case 'OrderDelivered': await this.handleOrderDelivered(event); break; case 'OrderCancelled': await this.handleOrderCancelled(event); break; } } private async handleOrderPlaced(event: DomainEvent): Promise<void> { const payload = event.payload as OrderPlacedPayload; // Use event data directly - no need to query publisher const html = await this.templateEngine.render('order-confirmation', { orderId: payload.orderId, customerName: payload.customerName, items: payload.items, totalAmount: payload.totalAmount, estimatedDelivery: payload.estimatedDelivery }); await this.emailService.send({ to: payload.customerEmail, subject: `Order Confirmation #${payload.orderId}`, html }); } private async handleOrderShipped(event: DomainEvent): Promise<void> { const payload = event.payload as OrderShippedPayload; const html = await this.templateEngine.render('order-shipped', { orderId: payload.orderId, trackingNumber: payload.trackingNumber, carrier: payload.carrier, estimatedDelivery: payload.estimatedDelivery }); await this.emailService.send({ to: payload.customerEmail, subject: `Your Order #${payload.orderId} Has Shipped!`, html }); } // Additional handlers... private async handleOrderDelivered(event: DomainEvent): Promise<void> { /* ... */ } private async handleOrderCancelled(event: DomainEvent): Promise<void> { /* ... */ }}Well-designed subscribers are essential for robust event-driven systems. Several principles guide their design.
In distributed systems, events may be delivered more than once due to network issues, retries, or at-least-once delivery semantics. Subscribers must handle duplicate events gracefully.
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
/** * Idempotent Subscriber Pattern * * Uses event ID tracking to ensure each event is processed exactly once, * even if delivered multiple times. */class IdempotentInventorySubscriber implements ISubscriber<DomainEvent> { constructor( private readonly inventoryService: IInventoryService, private readonly processedEvents: IProcessedEventStore ) {} subscribedEventTypes(): string[] { return ['OrderPlaced', 'OrderCancelled']; } async handle(event: DomainEvent): Promise<void> { // Idempotency check: have we processed this exact event before? if (await this.processedEvents.exists(event.eventId)) { console.log(`Event ${event.eventId} already processed, skipping`); return; // Skip duplicate } try { // Process the event await this.processEvent(event); // Mark as processed (with TTL for storage efficiency) await this.processedEvents.markProcessed(event.eventId, { processedAt: new Date(), eventType: event.eventType, ttlDays: 7 // Keep for 7 days for deduplication }); } catch (error) { // Don't mark as processed - allow retry throw error; } } private async processEvent(event: DomainEvent): Promise<void> { switch (event.eventType) { case 'OrderPlaced': await this.reserveInventory(event); break; case 'OrderCancelled': await this.releaseInventory(event); break; } } private async reserveInventory(event: DomainEvent): Promise<void> { const payload = event.payload as OrderPlacedPayload; for (const item of payload.items) { // Idempotent operation: uses orderId as reservation key // Multiple calls with same orderId won't double-reserve await this.inventoryService.reserve({ productId: item.productId, quantity: item.quantity, reservationKey: `${payload.orderId}:${item.productId}`, expiresAt: addHours(new Date(), 24) }); } } private async releaseInventory(event: DomainEvent): Promise<void> { const payload = event.payload as OrderCancelledPayload; // Release by reservation key - idempotent await this.inventoryService.releaseByOrderId(payload.orderId); }} /** * Processed Event Store: Tracks which events have been handled */interface IProcessedEventStore { exists(eventId: string): Promise<boolean>; markProcessed(eventId: string, metadata: ProcessedMetadata): Promise<void>;} // Implementation options:// - Redis SET with TTL// - Database table with unique constraint on eventId// - Bloom filter for approximate deduplication (memory-efficient)Each subscriber should have one clear purpose. Avoid creating "kitchen sink" subscribers that handle many unrelated concerns. This makes testing, maintenance, and scaling much easier.
| Approach | Pattern | Pros | Cons |
|---|---|---|---|
| Single Purpose | One subscriber per concern (Email, Inventory, Analytics separately) | Easy to test, modify, and scale independently | More classes to manage |
| Multi-Purpose | One subscriber handles email, inventory, and analytics | Fewer classes | Hard to test, violates SRP, can't scale concerns independently |
A failing subscriber should not affect other subscribers. Each subscriber operates in isolation—if email sending fails, inventory updates should still proceed.
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
/** * Resilient Subscriber with Proper Error Handling * * Implements retry logic, circuit breaking, and graceful degradation. */class ResilientAnalyticsSubscriber implements ISubscriber<DomainEvent> { private readonly maxRetries = 3; private readonly retryDelayMs = 1000; constructor( private readonly analyticsClient: IAnalyticsClient, private readonly deadLetterQueue: IDeadLetterQueue, private readonly circuitBreaker: ICircuitBreaker, private readonly logger: ILogger ) {} subscribedEventTypes(): string[] { return ['OrderPlaced', 'OrderShipped', 'OrderDelivered']; } async handle(event: DomainEvent): Promise<void> { // Check circuit breaker state if (!this.circuitBreaker.isAllowed()) { this.logger.warn(`Circuit open, queueing event ${event.eventId} for later`); await this.deadLetterQueue.enqueue(event, 'circuit-open'); return; } let lastError: Error | null = null; for (let attempt = 1; attempt <= this.maxRetries; attempt++) { try { await this.trackEvent(event); this.circuitBreaker.recordSuccess(); return; // Success - exit retry loop } catch (error) { lastError = error as Error; this.circuitBreaker.recordFailure(); if (attempt < this.maxRetries) { // Exponential backoff await this.sleep(this.retryDelayMs * Math.pow(2, attempt - 1)); } } } // All retries exhausted - send to dead letter queue this.logger.error(`Failed to process event ${event.eventId} after ${this.maxRetries} attempts`, lastError); await this.deadLetterQueue.enqueue(event, lastError?.message || 'unknown-error'); // Don't throw - other subscribers should continue processing // The event is safely stored in DLQ for later investigation/replay } private async trackEvent(event: DomainEvent): Promise<void> { const payload = event.payload as OrderEventPayload; await this.analyticsClient.track({ eventName: event.eventType, userId: payload.customerId, properties: { orderId: payload.orderId, amount: payload.totalAmount, currency: payload.currency, itemCount: payload.items?.length }, timestamp: event.occurredAt }); } private sleep(ms: number): Promise<void> { return new Promise(resolve => setTimeout(resolve, ms)); }}A Dead Letter Queue (DLQ) stores events that couldn't be processed after all retries. This prevents event loss and allows for manual investigation, bug fixes, and replay. Never silently discard failed events in production systems.
The relationship between publishers and subscribers is fundamentally asymmetric and mediated by an intermediary (the event bus or message broker). This architectural boundary is what enables the pattern's key benefits.
| Characteristic | Publisher | Subscriber |
|---|---|---|
| Knowledge Direction | Knows event structure | Knows event structure |
| Knows Other Party? | No - publishes blindly | No - receives from bus |
| Cardinality | Usually 1 per event type | Many per event type |
| Timing Control | Controls when events fire | Reacts to events |
| Data Flow | Produces data | Consumes data |
| Failure Impact | Failures don't affect delivery | Failures affect only that subscriber |
A single published event can trigger multiple independent reactions. This is fundamentally different from traditional method calls, where the caller explicitly invokes each callee.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
/** * One-to-Many Broadcasting Example * * OrderService publishes once. Multiple subscribers react independently. */ // Publisher: Publishes OrderPlaced event ONCEclass OrderService { async placeOrder(command: PlaceOrderCommand): Promise<Order> { const order = await this.createOrder(command); // Single publish - doesn't know who listens this.eventBus.publish(createOrderPlacedEvent(order)); return order; }} // Subscriber 1: Sends confirmation emailclass EmailSubscriber implements ISubscriber<DomainEvent> { async handle(event: DomainEvent): Promise<void> { if (event.eventType === 'OrderPlaced') { await this.sendConfirmationEmail(event.payload); } }} // Subscriber 2: Reserves inventoryclass InventorySubscriber implements ISubscriber<DomainEvent> { async handle(event: DomainEvent): Promise<void> { if (event.eventType === 'OrderPlaced') { await this.reserveItems(event.payload.items); } }} // Subscriber 3: Tracks analyticsclass AnalyticsSubscriber implements ISubscriber<DomainEvent> { async handle(event: DomainEvent): Promise<void> { if (event.eventType === 'OrderPlaced') { await this.trackPurchase(event.payload); } }} // Subscriber 4: Notifies warehouseclass WarehouseSubscriber implements ISubscriber<DomainEvent> { async handle(event: DomainEvent): Promise<void> { if (event.eventType === 'OrderPlaced') { await this.createPickList(event.payload); } }} // Subscriber 5: Updates search indexclass SearchIndexSubscriber implements ISubscriber<DomainEvent> { async handle(event: DomainEvent): Promise<void> { if (event.eventType === 'OrderPlaced') { await this.updateCustomerOrderHistory(event.payload); } }} // Subscriber 6: Fraud detectionclass FraudDetectionSubscriber implements ISubscriber<DomainEvent> { async handle(event: DomainEvent): Promise<void> { if (event.eventType === 'OrderPlaced') { await this.analyzeForFraud(event.payload); } }} /** * RESULT: * * OrderService publishes ONE event. * SIX different subsystems react: * - Email sent * - Inventory reserved * - Analytics tracked * - Warehouse notified * - Search updated * - Fraud analyzed * * OrderService knows about NONE of these subscribers. * Adding a 7th subscriber requires ZERO changes to OrderService. */Notice that the OrderService has no idea about emails, inventory, analytics, or fraud detection. It simply announces 'an order was placed'. This is true decoupling—you can add, remove, or modify subscribers without touching the publisher.
Understanding common mistakes helps avoid them. Here are critical anti-patterns in publisher and subscriber design.
SendEmail or ReserveInventory couple publisher to subscriber actions.123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
// ❌ ANTI-PATTERN: Publisher knows about subscribersclass CoupledOrderService { constructor( private readonly emailService: EmailService, // Direct coupling! private readonly inventoryService: InventoryService, private readonly analyticsService: AnalyticsService ) {} async placeOrder(command: PlaceOrderCommand): Promise<Order> { const order = await this.createOrder(command); // Direct calls to subscribers - defeats pub-sub entirely await this.emailService.sendConfirmation(order); await this.inventoryService.reserve(order.items); await this.analyticsService.track('OrderPlaced', order); return order; }} // ❌ ANTI-PATTERN: Non-idempotent subscriberclass DangerousInventorySubscriber { async handle(event: DomainEvent): Promise<void> { // No duplicate check! If event is delivered twice: // - First delivery: reserves 5 units // - Second delivery: reserves 5 MORE units // Result: 10 units reserved instead of 5! await this.inventoryService.decrementStock( event.payload.productId, event.payload.quantity ); }} // ❌ ANTI-PATTERN: Subscriber queries publisherclass TightlyCoupledSubscriber { constructor(private readonly orderService: OrderService) {} async handle(event: DomainEvent): Promise<void> { // Event only contains orderId - must query for details // This is temporal coupling and creates a dependency const order = await this.orderService.getOrder(event.payload.orderId); // If order data changed since publish, we get wrong data! await this.processOrder(order); }} // ❌ ANTI-PATTERN: Subscriber swallows failuresclass DangerousSubscriber { async handle(event: DomainEvent): Promise<void> { try { await this.riskyOperation(event); } catch (error) { // Silently swallowed - event appears processed but wasn't! // No logging, no retry, no dead letter queue // Data is now permanently inconsistent } }}Understanding publisher and subscriber roles is foundational to building effective event-driven systems. Let's consolidate the key principles:
What's next:
With a solid understanding of publisher and subscriber roles, we'll explore the critical component that mediates between them: the Event Bus (or Dispatcher). This intermediary is what actually enables the decoupling we've discussed.
You now understand the fundamental roles in the Publish-Subscribe pattern. Publishers create and emit events without knowing who listens. Subscribers react to events without knowing who publishes them. This mutual ignorance is the source of the pattern's power.