The publish-subscribe pattern isn't just an academic concept—it's the backbone of modern distributed systems at companies like Netflix, Uber, LinkedIn, and Shopify. Understanding where and why pub-sub excels helps you recognize opportunities to apply it in your own systems.
This page explores the most impactful use cases for pub-sub, examining real-world architectures and the specific characteristics that make pub-sub the right choice. We'll move from common patterns to advanced applications, building a comprehensive understanding of when and how to leverage this powerful paradigm.
By the end of this page, you will understand how pub-sub enables event-driven microservices, real-time analytics pipelines, CQRS architectures, notification systems, system integration, and IoT data ingestion. You'll see concrete examples and learn to identify when pub-sub is the right solution.
The most foundational use case for pub-sub is enabling loosely coupled microservices that communicate through events rather than direct API calls. Companies like Uber (using Kafka for ride events) and Shopify (using Kafka for order processing) build on this architecture.
```typescript
// Event-Driven Order Processing Architecture

// Domain Events: The contract between services
interface OrderCreatedEvent {
  eventType: 'ORDER_CREATED';
  orderId: string;
  customerId: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
  totalAmount: number;
  shippingAddress: Address;
  createdAt: string;
}

interface PaymentCompletedEvent {
  eventType: 'PAYMENT_COMPLETED';
  orderId: string;
  paymentId: string;
  amount: number;
  method: 'card' | 'paypal' | 'bank';
  completedAt: string;
}

interface FulfillmentStartedEvent {
  eventType: 'FULFILLMENT_STARTED';
  orderId: string;
  warehouseId: string;
  estimatedShipDate: string;
  trackingNumber?: string;
}

// Order Service: Publishes order events (knows nothing about consumers)
class OrderService {
  async createOrder(orderData: CreateOrderRequest): Promise<Order> {
    const order = await this.orderRepository.create(orderData);

    // Publish event - fire and forget
    await this.eventBus.publish('order-events', {
      eventType: 'ORDER_CREATED',
      orderId: order.id,
      customerId: order.customerId,
      items: order.items,
      totalAmount: order.totalAmount,
      shippingAddress: order.shippingAddress,
      createdAt: new Date().toISOString(),
    });

    return order; // Return immediately; downstream processing is async
  }
}

// Payment Service: Subscribes to order events, publishes payment events
class PaymentEventHandler {
  @Subscribe('order-events', { filter: "eventType = 'ORDER_CREATED'" })
  async handleOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Get saved payment method for customer
    const paymentMethod = await this.paymentMethodService.getDefault(event.customerId);

    // Process payment
    const payment = await this.paymentGateway.charge({
      customerId: event.customerId,
      amount: event.totalAmount,
      method: paymentMethod,
      orderId: event.orderId,
    });

    // Publish payment result
    await this.eventBus.publish('payment-events', {
      eventType: 'PAYMENT_COMPLETED',
      orderId: event.orderId,
      paymentId: payment.id,
      amount: event.totalAmount,
      method: paymentMethod.type,
      completedAt: new Date().toISOString(),
    });
  }
}

// Fulfillment Service: Subscribes to payment events
class FulfillmentEventHandler {
  @Subscribe('payment-events', { filter: "eventType = 'PAYMENT_COMPLETED'" })
  async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
    const order = await this.orderService.getById(event.orderId);

    // Find optimal warehouse and reserve inventory
    const warehouse = await this.inventoryService.reserveAndAllocate(order.items);

    // Start fulfillment process
    await this.fulfillmentService.start({
      orderId: event.orderId,
      warehouseId: warehouse.id,
      items: order.items,
      shippingAddress: order.shippingAddress,
    });
  }
}
```

This example shows choreography: services react to events independently. For complex workflows with error handling and compensation, consider orchestration (e.g., saga pattern) where a central coordinator manages the flow. Pub-sub supports both patterns.
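To make the orchestration alternative concrete, here is a minimal sketch of a saga coordinator. It is an illustration under stated assumptions, not a production design: `SagaStep`, `runSaga`, and `buildOrderSaga` are hypothetical names, the steps run in-process, and a real coordinator would persist progress and talk to services through the event bus.

```typescript
// Hypothetical saga step: a forward action plus the action that undoes it.
interface SagaStep {
  name: string;
  action: () => Promise<void>;
  compensate: () => Promise<void>;
}

interface SagaResult {
  ok: boolean;
  completed: string[];   // Steps whose action succeeded
  compensated: string[]; // Steps rolled back, in rollback order
  error?: string;
}

// Runs steps in order. On the first failure, compensates the steps that
// already succeeded, in reverse order, and reports what happened.
async function runSaga(steps: SagaStep[]): Promise<SagaResult> {
  const done: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.action();
      done.push(step);
    } catch (err) {
      const compensated: string[] = [];
      for (const prev of [...done].reverse()) {
        await prev.compensate();
        compensated.push(prev.name);
      }
      return {
        ok: false,
        completed: done.map((s) => s.name),
        compensated,
        error: err instanceof Error ? err.message : String(err),
      };
    }
  }
  return { ok: true, completed: done.map((s) => s.name), compensated: [] };
}

// Illustrative order workflow. `failAt` simulates a failing step so the
// compensation path can be observed in `log`.
function buildOrderSaga(log: string[], failAt?: string): SagaStep[] {
  const step = (name: string): SagaStep => ({
    name,
    action: async () => {
      if (name === failAt) throw new Error(`${name} failed`);
      log.push(`do:${name}`);
    },
    compensate: async () => {
      log.push(`undo:${name}`);
    },
  });
  return [step("reserve-inventory"), step("charge-payment"), step("start-fulfillment")];
}
```

Calling `runSaga(buildOrderSaga(log, "start-fulfillment"))` runs the first two steps, then undoes the payment and the inventory reservation in that order. Compare this with the choreographed version, where no single component knows the whole flow.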
Pub-sub is the de facto standard for feeding real-time analytics systems. Every user interaction, system event, and business transaction can flow through a pub-sub backbone to multiple analytics destinations simultaneously.
Why Pub-Sub for Analytics:
Common Analytics Event Schema (Example):
```typescript
// Unified analytics event schema for multi-destination processing

interface AnalyticsEvent {
  // Event identification
  eventId: string;        // Unique, idempotent key
  eventType: string;      // page_view, click, purchase, etc.
  eventVersion: string;   // v1.2 - for schema evolution

  // Timing
  timestamp: string;      // ISO 8601, event occurrence time
  processedAt?: string;   // When ingested into pipeline

  // User context
  userId?: string;        // Authenticated user ID
  anonymousId: string;    // Device/session identifier
  sessionId: string;      // Current session

  // Device/environment
  platform: 'web' | 'ios' | 'android' | 'api';
  userAgent?: string;
  ipAddress?: string;     // May be hashed for privacy
  geolocation?: { country: string; region?: string; city?: string };

  // Event payload (varies by eventType)
  properties: Record<string, unknown>;

  // Context for analysis
  context: {
    page?: { url: string; title: string; referrer?: string };
    campaign?: { source: string; medium: string; name: string };
    experiment?: { name: string; variant: string };
  };
}

// Example: Page view event
const pageViewEvent: AnalyticsEvent = {
  eventId: 'evt_7k3j4h5g',
  eventType: 'page_view',
  eventVersion: 'v2.1',
  timestamp: '2024-01-15T14:30:00.123Z',
  userId: 'user_abc123',
  anonymousId: 'anon_xyz789',
  sessionId: 'sess_456def',
  platform: 'web',
  userAgent: 'Mozilla/5.0...',
  geolocation: { country: 'US', region: 'CA', city: 'San Francisco' },
  properties: {
    pageType: 'product',
    productId: 'prod_123',
    productCategory: 'electronics',
  },
  context: {
    page: { url: '/products/123', title: 'Widget Pro', referrer: '/search' },
    campaign: { source: 'google', medium: 'cpc', name: 'spring_sale' },
    experiment: { name: 'checkout_redesign', variant: 'B' },
  },
};
```

Use a schema registry (Confluent Schema Registry, AWS Glue) to manage analytics event schemas. This enables schema validation at publish time, automatic schema evolution, and compatibility checking when consumers upgrade. This is critical for pipelines processing billions of events.
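The following sketch illustrates what validation at publish time and compatibility checking can look like. It is a toy in-memory stand-in, not the API of Confluent Schema Registry or AWS Glue: `InMemorySchemaRegistry`, `EventSchema`, and `publishValidated` are hypothetical names, and the compatibility rule (a new version may not add required fields or change their types) is a deliberately simplified assumption.

```typescript
type FieldType = "string" | "number" | "boolean" | "object";

// Hypothetical schema shape: field name -> expected JavaScript type.
interface EventSchema {
  required: Record<string, FieldType>;
  optional?: Record<string, FieldType>;
}

class InMemorySchemaRegistry {
  private schemas = new Map<string, EventSchema[]>();

  // Registers a new version. Simplified compatibility rule (an assumption):
  // every required field must already be required, with the same type,
  // in the previous version. New optional fields are always allowed.
  register(subject: string, schema: EventSchema): number {
    const versions = this.schemas.get(subject) ?? [];
    const previous = versions[versions.length - 1];
    if (previous) {
      for (const [field, type] of Object.entries(schema.required)) {
        if (!(field in previous.required)) {
          throw new Error(`incompatible: new required field "${field}"`);
        }
        if (previous.required[field] !== type) {
          throw new Error(`incompatible: field "${field}" changed type`);
        }
      }
    }
    versions.push(schema);
    this.schemas.set(subject, versions);
    return versions.length; // 1-based version number
  }

  // Returns a list of problems; an empty list means the event is valid
  // against the latest registered version.
  validate(subject: string, event: Record<string, unknown>): string[] {
    const versions = this.schemas.get(subject);
    if (!versions || versions.length === 0) {
      return [`no schema registered for "${subject}"`];
    }
    const latest = versions[versions.length - 1];
    const errors: string[] = [];
    for (const [field, type] of Object.entries(latest.required)) {
      if (!(field in event)) errors.push(`missing required field "${field}"`);
      else if (typeof event[field] !== type) errors.push(`field "${field}" should be ${type}`);
    }
    for (const [field, type] of Object.entries(latest.optional ?? {})) {
      if (event[field] !== undefined && typeof event[field] !== type) {
        errors.push(`field "${field}" should be ${type}`);
      }
    }
    return errors;
  }
}

// Rejects invalid events before they reach the broker. `send` stands in
// for whatever publish call the real client provides.
function publishValidated(
  registry: InMemorySchemaRegistry,
  subject: string,
  event: Record<string, unknown>,
  send: (event: Record<string, unknown>) => void,
): void {
  const errors = registry.validate(subject, event);
  if (errors.length > 0) throw new Error(`rejected: ${errors.join("; ")}`);
  send(event);
}
```

Rejecting a malformed event at the producer keeps it out of every downstream destination at once, which is cheaper than cleaning it up in each consumer.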
Command Query Responsibility Segregation (CQRS) separates read and write models. Event Sourcing stores state changes as events rather than current state. Pub-sub is the natural connective tissue for both patterns, enabling write-side events to propagate to read-side projections.
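As a rough illustration of the event-sourcing idea, the sketch below rebuilds a read model purely by replaying an event log. The names `ProductLogEvent`, `ProductView`, `applyEvent`, and `replay` are hypothetical, and the log is a plain in-memory array standing in for a durable event stream.

```typescript
// Simplified product events, stored as the source of truth.
type ProductLogEvent =
  | { type: "PRODUCT_CREATED"; productId: string; name: string; price: number }
  | { type: "PRODUCT_UPDATED"; productId: string; name?: string; price?: number }
  | { type: "PRODUCT_STOCK_CHANGED"; productId: string; stockDelta: number }
  | { type: "PRODUCT_DISCONTINUED"; productId: string };

// Read-side projection: current state derived from the events.
interface ProductView {
  name: string;
  price: number;
  stock: number;
  active: boolean;
}

// Applies one event to the projection in place.
function applyEvent(state: Map<string, ProductView>, event: ProductLogEvent): void {
  if (event.type === "PRODUCT_CREATED") {
    state.set(event.productId, { name: event.name, price: event.price, stock: 0, active: true });
    return;
  }
  const current = state.get(event.productId);
  if (!current) return; // This sketch ignores events for unknown products
  switch (event.type) {
    case "PRODUCT_UPDATED":
      if (event.name !== undefined) current.name = event.name;
      if (event.price !== undefined) current.price = event.price;
      break;
    case "PRODUCT_STOCK_CHANGED":
      current.stock += event.stockDelta;
      break;
    case "PRODUCT_DISCONTINUED":
      current.active = false;
      break;
  }
}

// Rebuilds the whole projection from scratch by folding over the log.
function replay(log: ProductLogEvent[]): Map<string, ProductView> {
  const state = new Map<string, ProductView>();
  for (const event of log) applyEvent(state, event);
  return state;
}
```

Because the projection is a pure function of the log, a new read model can be added later and populated by replaying the same events from the beginning.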
How It Works:
Why Pub-Sub Enables CQRS:
```typescript
// CQRS: Multiple read projections from one event stream

// Domain events (published after command handling)
type ProductEvent =
  | { type: 'PRODUCT_CREATED'; productId: string; name: string; price: number; category: string }
  | { type: 'PRODUCT_UPDATED'; productId: string; name?: string; price?: number }
  | { type: 'PRODUCT_STOCK_CHANGED'; productId: string; stockDelta: number }
  | { type: 'PRODUCT_DISCONTINUED'; productId: string };

// Projection 1: Product List (for catalog browsing)
class ProductListProjection {
  private db: PostgreSQL;

  async handleEvent(event: ProductEvent): Promise<void> {
    switch (event.type) {
      case 'PRODUCT_CREATED':
        await this.db.query(`
          INSERT INTO product_list (id, name, price, category, stock, active)
          VALUES ($1, $2, $3, $4, 0, true)
        `, [event.productId, event.name, event.price, event.category]);
        break;

      case 'PRODUCT_UPDATED': {
        // Build a parameterized SET clause; event values are never
        // interpolated into the SQL text (avoids SQL injection)
        const sets: string[] = [];
        const params: unknown[] = [event.productId];
        if (event.name !== undefined) {
          params.push(event.name);
          sets.push(`name = $${params.length}`);
        }
        if (event.price !== undefined) {
          params.push(event.price);
          sets.push(`price = $${params.length}`);
        }
        if (sets.length === 0) break; // Nothing to update
        await this.db.query(`
          UPDATE product_list SET ${sets.join(', ')} WHERE id = $1
        `, params);
        break;
      }

      case 'PRODUCT_STOCK_CHANGED':
        await this.db.query(`
          UPDATE product_list SET stock = stock + $1 WHERE id = $2
        `, [event.stockDelta, event.productId]);
        break;

      case 'PRODUCT_DISCONTINUED':
        await this.db.query(`
          UPDATE product_list SET active = false WHERE id = $1
        `, [event.productId]);
        break;
    }
  }
}

// Projection 2: Search Index (for full-text search)
class ProductSearchProjection {
  private elastic: ElasticsearchClient;

  async handleEvent(event: ProductEvent): Promise<void> {
    switch (event.type) {
      case 'PRODUCT_CREATED':
        await this.elastic.index({
          index: 'products',
          id: event.productId,
          body: {
            name: event.name,
            price: event.price,
            category: event.category,
            active: true,
          },
        });
        break;

      case 'PRODUCT_UPDATED':
        await this.elastic.update({
          index: 'products',
          id: event.productId,
          body: { doc: { name: event.name, price: event.price } },
        });
        break;

      case 'PRODUCT_DISCONTINUED':
        await this.elastic.update({
          index: 'products',
          id: event.productId,
          body: { doc: { active: false } },
        });
        break;
    }
  }
}

// Projection 3: Cache (for fast product lookups)
class ProductCacheProjection {
  private redis: RedisClient;

  async handleEvent(event: ProductEvent): Promise<void> {
    const key = `product:${event.productId}`;

    switch (event.type) {
      case 'PRODUCT_CREATED':
        await this.redis.hset(key, {
          name: event.name,
          price: event.price.toString(),
          category: event.category,
        });
        break;

      case 'PRODUCT_DISCONTINUED':
        await this.redis.del(key);
        break;

      // Stock and price updates handled similarly
    }
  }
}
```

When a single event needs to trigger multiple notification channels (email, push notification, SMS, in-app message, Slack), pub-sub provides a clean abstraction for multi-channel delivery without coupling the origin system to delivery mechanisms.
Notification System Architecture:
Key Design Decisions:
```typescript
// Multi-Channel Notification System with Pub-Sub

interface NotificationRequest {
  notificationId: string;
  type: 'order_shipped' | 'security_alert' | 'social_activity' | 'marketing';
  userId: string;
  priority: 'urgent' | 'high' | 'normal' | 'low';
  data: Record<string, unknown>;
  scheduledFor?: string; // Optional delayed delivery
}

// Origin: Order Service publishes notification request
async function notifyOrderShipped(order: Order): Promise<void> {
  await pubsub.publish('notification-requests', {
    notificationId: `notif_${order.id}_shipped`,
    type: 'order_shipped',
    userId: order.customerId,
    priority: 'high',
    data: {
      orderId: order.id,
      trackingNumber: order.trackingNumber,
      carrier: order.carrier,
      estimatedDelivery: order.estimatedDelivery,
      items: order.items.map(i => ({ name: i.name, quantity: i.quantity })),
    },
  });
  // Origin service done - doesn't know about emails, push, etc.
}

// Notification Router: Determines channels and dispatches
class NotificationRouter {
  @Subscribe('notification-requests')
  async handleNotification(request: NotificationRequest): Promise<void> {
    // 1. Look up user preferences
    const prefs = await this.userService.getNotificationPreferences(request.userId);
    const user = await this.userService.getById(request.userId);

    // 2. Apply template for each channel
    const template = this.templateEngine.getTemplate(request.type);

    // 3. Dispatch to enabled channels
    const dispatches: Promise<void>[] = [];

    if (prefs.email.enabled && this.shouldSend(prefs.email, request)) {
      dispatches.push(this.publishToChannel('email', {
        ...request,
        to: user.email,
        subject: template.emailSubject(request.data),
        body: template.emailBody(request.data),
      }));
    }

    // pushTokens may be undefined, so default to an empty list
    const pushTokens = user.pushTokens ?? [];
    if (prefs.push.enabled && pushTokens.length > 0) {
      for (const token of pushTokens) {
        dispatches.push(this.publishToChannel('push', {
          ...request,
          token,
          title: template.pushTitle(request.data),
          body: template.pushBody(request.data),
          deepLink: template.deepLink(request.data),
        }));
      }
    }

    if (prefs.sms.enabled && request.priority === 'urgent') {
      dispatches.push(this.publishToChannel('sms', {
        ...request,
        phone: user.phone,
        message: template.smsMessage(request.data),
      }));
    }

    await Promise.all(dispatches);
  }

  private async publishToChannel(channel: string, payload: any): Promise<void> {
    await pubsub.publish(`notifications.${channel}`, payload);
  }
}

// Channel Handler: Email
class EmailNotificationHandler {
  @Subscribe('notifications.email')
  async handleEmail(notification: EmailNotification): Promise<void> {
    await this.sendGrid.send({
      to: notification.to,
      subject: notification.subject,
      html: notification.body,
    });

    await this.trackingService.recordDelivery({
      notificationId: notification.notificationId,
      channel: 'email',
      status: 'delivered',
    });
  }
}
```

When integrating disparate systems (legacy applications, third-party SaaS, partner systems, different cloud environments), pub-sub serves as a universal API that enables asynchronous, resilient communication without tight coupling.
Define a canonical event model that represents domain concepts (Customer, Order, Product) independent of any specific system's schema. Adapters translate between system-specific formats and canonical events. This prevents your event bus from becoming polluted with system-specific quirks.
```typescript
// Event Bridge: Salesforce Adapter Example

// Canonical customer event (system-agnostic)
interface CustomerUpdatedEvent {
  eventType: 'CUSTOMER_UPDATED';
  source: string;
  customerId: string;
  timestamp: string;
  changes: {
    email?: string;
    phone?: string;
    address?: {
      street: string;
      city: string;
      state: string;
      country: string;
      postalCode: string;
    };
    segment?: 'enterprise' | 'midmarket' | 'smb';
  };
}

// Salesforce-specific webhook payload
interface SalesforceAccountUpdate {
  attributes: { type: 'Account'; url: string };
  Id: string;
  Name: string;
  BillingStreet: string;
  BillingCity: string;
  BillingState: string;
  BillingCountry: string;
  BillingPostalCode: string;
  Phone: string;
  Industry: string;
  Type: string; // Enterprise, Channel Partner, etc.
}

class SalesforceAdapter {
  // Inbound: Salesforce webhook → Canonical event
  async handleSalesforceWebhook(payload: SalesforceAccountUpdate): Promise<void> {
    const canonicalEvent: CustomerUpdatedEvent = {
      eventType: 'CUSTOMER_UPDATED',
      source: 'salesforce',
      customerId: this.mapSalesforceIdToCanonical(payload.Id),
      timestamp: new Date().toISOString(),
      changes: {
        phone: payload.Phone,
        address: {
          street: payload.BillingStreet,
          city: payload.BillingCity,
          state: payload.BillingState,
          country: payload.BillingCountry,
          postalCode: payload.BillingPostalCode,
        },
        segment: this.mapSalesforceTypeToSegment(payload.Type),
      },
    };

    await this.eventBus.publish('customer-events', canonicalEvent);
  }

  // Outbound: Canonical event → Salesforce API
  @Subscribe('customer-events', { filter: "source != 'salesforce'" })
  async syncToSalesforce(event: CustomerUpdatedEvent): Promise<void> {
    // Don't echo back events that originated from Salesforce
    const salesforceId = this.mapCanonicalIdToSalesforce(event.customerId);

    await this.salesforceClient.updateAccount(salesforceId, {
      Phone: event.changes.phone,
      BillingStreet: event.changes.address?.street,
      BillingCity: event.changes.address?.city,
      // ... map other fields
    });
  }
}
```

Internet of Things (IoT) deployments generate massive volumes of telemetry data from distributed devices. Pub-sub provides the scalable ingestion layer that buffers, routes, and processes this firehose of data.
IoT Ingestion Challenges Solved by Pub-Sub:
```typescript
// IoT Telemetry Processing Pipeline

interface TelemetryEvent {
  deviceId: string;
  deviceType: 'sensor' | 'vehicle' | 'meter' | 'wearable';
  timestamp: string;
  readings: {
    metric: string;
    value: number;
    unit: string;
  }[];
  metadata: {
    firmwareVersion: string;
    batteryLevel?: number;
    signalStrength?: number;
  };
}

// Stream Processor: Real-time aggregation and routing
class TelemetryProcessor {
  @Subscribe('telemetry.raw')
  async processTelemetry(event: TelemetryEvent): Promise<void> {
    // Enrich with device metadata
    const device = await this.deviceRegistry.get(event.deviceId);

    // Check for anomalies
    for (const reading of event.readings) {
      const threshold = await this.thresholdService.get(device.type, reading.metric);

      if (reading.value > threshold.critical) {
        // Publish to alerts topic
        await this.eventBus.publish('alerts.critical', {
          alertId: generateId(),
          deviceId: event.deviceId,
          metric: reading.metric,
          value: reading.value,
          threshold: threshold.critical,
          severity: 'critical',
          timestamp: event.timestamp,
        });
      }
    }

    // Aggregate for time-series storage
    const aggregatedReading = {
      deviceId: event.deviceId,
      timestamp: event.timestamp,
      readings: event.readings.reduce((acc, r) => ({
        ...acc,
        [r.metric]: r.value,
      }), {}),
    };

    await this.timeSeriesDb.write(aggregatedReading);

    // Publish processed event for other consumers
    await this.eventBus.publish('telemetry.processed', {
      ...event,
      deviceMeta: device,
      processedAt: new Date().toISOString(),
    });
  }
}

// Anomaly Detection: Subscribes to processed telemetry
class AnomalyDetector {
  @Subscribe('telemetry.processed')
  async detectAnomalies(event: TelemetryEvent): Promise<void> {
    // Run ML model for predictive maintenance
    const prediction = await this.mlModel.predict({
      deviceId: event.deviceId,
      readings: event.readings,
      historicalAvg: await this.getHistoricalAverages(event.deviceId),
    });

    if (prediction.maintenanceNeeded && prediction.confidence > 0.85) {
      await this.eventBus.publish('maintenance.predicted', {
        deviceId: event.deviceId,
        predictedFailureDate: prediction.estimatedFailureDate,
        confidence: prediction.confidence,
        recommendedAction: prediction.action,
      });
    }
  }
}
```

Pub-sub is powerful, but it's not the right tool for every situation. Understanding its limitations helps you avoid misapplying the pattern.
| Scenario | Recommended Pattern | Why |
|---|---|---|
| Get user profile by ID | Synchronous API | Need immediate response; simple query |
| Process order asynchronously | Pub-Sub | Multiple consumers; resilience needed |
| Distribute image resize jobs | Message Queue | Single consumer group; work distribution |
| Real-time game state updates | WebSocket / direct | Very low latency required; a broker hop adds delay |
| Batch ETL processing | Pub-Sub (replay) | Large volume; replay for recovery |
| Two-phase commit transaction | Orchestrated calls | Atomic guarantee required |
Most real systems use multiple patterns. Synchronous APIs for user-facing queries, pub-sub for cross-service integration, queues for background job processing. Choose the right tool for each specific communication need rather than forcing everything into one pattern.
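The table above can be read as a rough decision procedure. The sketch below encodes it as a function, purely as a thinking aid: `CommunicationNeed`, `recommendPattern`, and the order in which the checks run are assumptions made for illustration, not rules from any framework.

```typescript
type Pattern =
  | "synchronous-api"
  | "pub-sub"
  | "message-queue"
  | "direct-connection"
  | "orchestrated-calls";

// Hypothetical description of one communication need between components.
interface CommunicationNeed {
  needsAtomicGuarantee: boolean;   // All-or-nothing outcome across services
  latencyCritical: boolean;        // e.g. real-time game state updates
  needsImmediateResponse: boolean; // Caller blocks on the answer
  consumerGroups: number;          // Independent groups that each need every message
  needsReplay: boolean;            // Consumers must be able to re-read history
}

// Checks the most restrictive requirements first, then falls back to
// work distribution through a queue.
function recommendPattern(need: CommunicationNeed): Pattern {
  if (need.needsAtomicGuarantee) return "orchestrated-calls";
  if (need.latencyCritical) return "direct-connection";
  if (need.needsImmediateResponse) return "synchronous-api";
  if (need.consumerGroups > 1 || need.needsReplay) return "pub-sub";
  return "message-queue";
}
```

For example, an order event consumed by payment, analytics, and notification services has `consumerGroups: 3` and maps to pub-sub, while an image resize job with a single worker pool maps to a message queue.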
We've explored the major production use cases for publish-subscribe messaging. Let's consolidate the key insights:
Module Complete:
You've now completed the comprehensive study of publish-subscribe messaging. From the foundational concepts of one-to-many messaging through topics, subscriptions, fan-out patterns, filtering, and real-world use cases, you have the knowledge to design and implement robust pub-sub architectures for any scale.