Theory becomes valuable when applied. The patterns we've covered—point-to-point messaging, queue semantics, acknowledgment, and dead letter queues—aren't abstract concepts. They're the building blocks of systems you interact with every day.
When you upload a video and see "Processing..." — that's a message queue. When you buy a product and get an order confirmation before it ships — that's a message queue. When your bank processes overnight transactions — message queues.
In this final page, we'll explore real-world use cases in depth, showing how message queues solve specific problems in production systems. Each use case demonstrates pattern selection, implementation considerations, and the architectural reasoning behind queue-based solutions.
By the end of this page, you will understand when message queues are the right solution, how they apply in e-commerce, media processing, notification systems, analytics pipelines, and inter-service communication, plus the specific patterns and configurations for each use case.
Before diving into specific use cases, let's establish a decision framework. Message queues aren't always the right solution—they add complexity. But they're essential for certain workload patterns.
Ask: 'Can this operation complete after the user request returns?' If yes, it's a candidate for queuing. If the user is staring at a loading spinner waiting for this specific operation, keep it synchronous.
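To make the heuristic concrete, here is a minimal sketch of that split. The names (`handleUpload`, `generate-thumbnail`) and the in-memory queue are illustrative stand-ins, not a real broker client:

```typescript
// Hypothetical sketch: keep only the work the user is waiting on
// synchronous; queue everything that can finish after the response.
type Job = { type: string; payload: unknown };

const queue: Job[] = []; // stand-in for a real message queue client

function enqueue(job: Job): void {
  queue.push(job); // a real client would send this to the broker
}

// The user waits only for the upload record to be created.
function handleUpload(fileId: string): { fileId: string; status: string } {
  // Synchronous: the user needs this to see a confirmation
  const record = { fileId, status: 'RECEIVED' };

  // Asynchronous: thumbnailing can complete after the request returns
  enqueue({ type: 'generate-thumbnail', payload: { fileId } });

  return record;
}
```

The user sees "RECEIVED" immediately; the thumbnail job is picked up by a background worker whenever capacity allows.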
E-commerce order processing is the canonical message queue use case. When a customer clicks "Place Order," multiple downstream operations must occur: payment processing, inventory reservation, fulfillment initiation, email confirmation, analytics tracking. These operations have different latencies, failure modes, and scaling characteristics.
The Problem: If order placement synchronously waits for all downstream operations, the user faces a 10+ second wait and any failure cancels the order.
The Solution: Accept the order, queue downstream work, return immediately to the user. Background workers process each concern independently.
Messaging Model: Often a combination of point-to-point and pub-sub.
Delivery Guarantee: At-least-once with idempotency. Losing an order is unacceptable. Duplicate processing must be handled.
Ordering: Orders for the same customer may need to be processed in order (FIFO with customer ID as group key) to prevent race conditions in inventory.
```typescript
// E-Commerce Order Processing Queue Architecture

// Order placed - immediate response to user
async function placeOrder(orderRequest: OrderRequest): Promise<OrderResponse> {
  // Synchronous: Create order record
  const order = await db.orders.create({
    id: generateOrderId(),
    customerId: orderRequest.customerId,
    items: orderRequest.items,
    status: 'PENDING',
    createdAt: new Date()
  });

  // Asynchronous: Queue for processing
  await orderTopic.publish({
    orderId: order.id,
    customerId: order.customerId,
    items: order.items,
    totalAmount: calculateTotal(order.items),
    timestamp: new Date().toISOString()
  });

  // Return immediately - user sees confirmation
  return {
    orderId: order.id,
    status: 'RECEIVED',
    message: 'Your order is being processed'
  };
}

// Payment Worker (point-to-point, exactly-once processing)
async function processPayment(message: OrderMessage): Promise<void> {
  // Idempotency check
  const existing = await db.payments.findByOrderId(message.orderId);
  if (existing) {
    console.log('Payment already processed, skipping');
    return;
  }

  const result = await paymentGateway.charge({
    orderId: message.orderId,
    amount: message.totalAmount,
    customerId: message.customerId
  });

  await db.payments.create({
    orderId: message.orderId,
    transactionId: result.transactionId,
    status: result.status
  });

  if (result.status === 'FAILED') {
    // Publish failure event for compensation
    await orderFailedTopic.publish({
      orderId: message.orderId,
      reason: 'PAYMENT_FAILED'
    });
  }
}

// Email Worker (can be slower, less critical)
async function sendConfirmationEmail(message: OrderMessage): Promise<void> {
  const customer = await db.customers.findById(message.customerId);

  await emailService.send({
    to: customer.email,
    template: 'order-confirmation',
    data: {
      orderId: message.orderId,
      items: message.items,
      estimatedDelivery: calculateDeliveryDate()
    }
  });
}
```

| Queue/Topic | Type | Delivery | Max Retries | Notes |
|---|---|---|---|---|
| order-events | Topic (SNS) | Fan-out | N/A | Broadcasts to all consumers |
| payment-queue | FIFO Queue | Exactly-once | 3 | Critical: no duplicate charges |
| inventory-queue | FIFO Queue | At-least-once | 5 | Dedupe on order+item ID |
| notification-queue | Standard Queue | At-least-once | 3 | Higher throughput ok |
| analytics-queue | Standard Queue | At-most-once | 1 | Loss acceptable |
When users upload videos, images, or documents, processing often takes seconds to minutes. Synchronous processing would result in request timeouts and poor user experience. Queues enable upload-and-confirm workflows where processing happens in the background.
The Problem: Video transcoding, image resizing, and document conversion are CPU-intensive and time-consuming. Users can't wait for completion.
The Solution: Accept the upload, create a processing job, queue it for background workers. Notify users (via websocket/push) when complete.
Complex media processing often involves multiple stages: thumbnail generation, multiple resolution transcodes, metadata extraction. Each stage can be a separate queue, enabling pipeline parallelism.
```typescript
// Video Processing Multi-Stage Pipeline

interface VideoJob {
  jobId: string;
  videoId: string;
  sourceUrl: string;
  userId: string;
  requestedAt: Date;
}

// Stage 1: Video Validation & Metadata Extraction
async function validateVideo(job: VideoJob): Promise<void> {
  const metadata = await ffprobe(job.sourceUrl);

  // Validate video meets requirements
  if (metadata.duration > MAX_DURATION) {
    await rejectVideo(job, 'Video exceeds maximum duration');
    return;
  }

  // Store metadata
  await db.videos.update(job.videoId, {
    duration: metadata.duration,
    resolution: `${metadata.width}x${metadata.height}`,
    codec: metadata.codec
  });

  // Queue for thumbnail generation and transcoding
  await thumbnailQueue.send({ ...job, metadata });

  // Queue for each target resolution
  for (const resolution of ['1080p', '720p', '480p']) {
    await transcodeQueue.send({ ...job, metadata, targetResolution: resolution });
  }
}

// Stage 2: Thumbnail Generation (fast, parallel)
async function generateThumbnails(
  job: VideoJob & { metadata: VideoMetadata }
): Promise<void> {
  const timestamps = calculateThumbnailTimestamps(job.metadata.duration, 5);

  const thumbnails = await Promise.all(
    timestamps.map(ts => ffmpeg.extractFrame(job.sourceUrl, ts))
  );

  const urls = await Promise.all(
    thumbnails.map((thumb, i) =>
      storage.upload(`${job.videoId}/thumb_${i}.jpg`, thumb)
    )
  );

  await db.videos.update(job.videoId, { thumbnails: urls });
  await checkCompletion(job.jobId);
}

// Stage 3: Transcode (slow, resource-intensive)
async function transcodeVideo(
  job: VideoJob & { targetResolution: string; receiptHandle: string }
): Promise<void> {
  // Long-running operation - extend visibility periodically
  const heartbeat = startHeartbeat(job.receiptHandle, 60);

  try {
    const outputPath = await ffmpeg.transcode({
      input: job.sourceUrl,
      resolution: job.targetResolution,
      codec: 'h264',
      preset: 'medium'
    });

    const url = await storage.upload(
      `${job.videoId}/${job.targetResolution}.mp4`,
      outputPath
    );

    await db.videoRenditions.create({
      videoId: job.videoId,
      resolution: job.targetResolution,
      url,
      status: 'READY'
    });

    await checkCompletion(job.jobId);
  } finally {
    heartbeat.stop();
  }
}

// Aggregation: Check if all stages complete
async function checkCompletion(jobId: string): Promise<void> {
  const job = await db.jobs.findById(jobId);
  const renditions = await db.videoRenditions.findByVideo(job.videoId);

  const expectedRenditions = 3; // 1080p, 720p, 480p
  const completedRenditions = renditions.filter(r => r.status === 'READY').length;

  if (completedRenditions === expectedRenditions) {
    await db.videos.update(job.videoId, { status: 'READY' });

    // Notify user
    await notificationService.send(job.userId, {
      type: 'VIDEO_READY',
      videoId: job.videoId,
      message: 'Your video is ready to view'
    });
  }
}
```

Video transcoding can take 30+ minutes. Set the initial visibility timeout to 5 minutes, then extend it every 2-3 minutes during processing. This prevents duplicate transcoding while still allowing recovery from actual failures.
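The `startHeartbeat` helper used by the transcode worker isn't defined in the snippet. A minimal sketch, with the broker call injected as a callback (the original passes a receipt handle to e.g. SQS `ChangeMessageVisibility`; here it is abstracted away for clarity):

```typescript
// Periodically extend a message's visibility timeout so the broker
// doesn't redeliver it while long-running work is still in progress.
function startHeartbeat(
  extendVisibility: () => void, // wraps e.g. SQS ChangeMessageVisibility
  intervalMs: number
): { stop: () => void } {
  const timer: ReturnType<typeof setInterval> = setInterval(
    extendVisibility,
    intervalMs
  );

  return {
    // Always call stop() in a finally block: if the worker crashes
    // instead, the extension stops and the message becomes visible
    // again, so another worker can recover the job.
    stop: () => clearInterval(timer)
  };
}
```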
Sending notifications (email, SMS, push) is naturally asynchronous. Users don't need instant confirmation that a notification was sent—they need the notification itself to arrive eventually. Queues provide rate limiting, retry logic, and failure isolation for high-volume notification systems.
The Problem: Sending millions of notifications without overwhelming email providers, respecting rate limits, and handling delivery failures.
The Solution: Queue notifications, process at sustainable rates, retry failures with exponential backoff, dead-letter undeliverables for investigation.
```typescript
// High-Volume Notification System with Rate Limiting

interface NotificationRequest {
  userId: string;
  templateId: string;
  channels: ('email' | 'push' | 'sms')[];
  data: Record<string, unknown>;
  priority: 'high' | 'normal' | 'low';
}

// Router: Fan out to channel-specific queues
async function routeNotification(request: NotificationRequest): Promise<void> {
  const user = await db.users.findById(request.userId);
  const template = await db.templates.findById(request.templateId);

  // Check user preferences and channel availability
  for (const channel of request.channels) {
    if (!user.preferences[channel]?.enabled) continue;

    const channelQueue = getQueueForChannel(channel);
    await channelQueue.send({
      ...request,
      channel,
      recipient: getRecipient(user, channel),
      content: renderTemplate(template, channel, request.data)
    });
  }
}

// Email Worker with rate limiting
class EmailWorker {
  private rateLimiter = new RateLimiter({
    tokensPerSecond: 50, // stay below the email provider's sending rate limit
    bucketSize: 100
  });

  async process(notification: EmailNotification): Promise<void> {
    // Wait for rate limit token
    await this.rateLimiter.acquire();

    try {
      const result = await ses.sendEmail({
        Source: 'noreply@example.com',
        Destination: { ToAddresses: [notification.recipient] },
        Message: {
          Subject: { Data: notification.content.subject },
          Body: { Html: { Data: notification.content.html } }
        }
      }).promise();

      await db.notificationLogs.create({
        notificationId: notification.id,
        channel: 'email',
        status: 'SENT',
        providerMessageId: result.MessageId
      });
    } catch (error) {
      if (isPermanentError(error)) {
        // Bad email, unsubscribed, etc. - don't retry
        await db.notificationLogs.create({
          notificationId: notification.id,
          status: 'FAILED_PERMANENT',
          error: error.message
        });
        // Optionally disable email for this user
      } else {
        // Transient error - will be retried
        throw error;
      }
    }
  }
}

// Batch sending for broadcasts
async function sendBroadcast(
  templateId: string,
  filters: UserFilters
): Promise<BroadcastJob> {
  const job = await db.broadcastJobs.create({
    templateId,
    status: 'QUEUING',
    startedAt: new Date()
  });

  // Stream users matching filters, queue in batches
  let queued = 0;
  const userStream = db.users.streamByFilters(filters);

  for await (const userBatch of chunk(userStream, 100)) {
    const messages = userBatch.map(user => ({
      userId: user.id,
      templateId,
      channels: user.preferences.channels,
      data: { broadcastJobId: job.id },
      priority: 'low' as const
    }));

    await notificationQueue.sendBatch(messages);
    queued += messages.length;
    await db.broadcastJobs.update(job.id, { queued });
  }

  await db.broadcastJobs.update(job.id, {
    status: 'QUEUED',
    totalQueued: queued
  });

  return job;
}
```

Use separate queues or priority levels for different notification types. Password reset emails should skip ahead of marketing newsletters. Implement high/normal/low priority queues, or use native priority-queue features where your broker offers them.
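One way to sketch that priority scheme, with in-memory arrays standing in for three broker queues: a worker always drains higher-priority queues before touching lower ones.

```typescript
type Priority = 'high' | 'normal' | 'low';

// In-memory stand-ins for three real broker queues
const queues: Record<Priority, string[]> = { high: [], normal: [], low: [] };

function enqueue(priority: Priority, message: string): void {
  queues[priority].push(message);
}

// Poll order: high before normal before low, so a password reset
// never waits behind a backlog of marketing newsletters.
function pollNext(): string | undefined {
  for (const p of ['high', 'normal', 'low'] as Priority[]) {
    const msg = queues[p].shift();
    if (msg !== undefined) return msg;
  }
  return undefined; // all queues empty
}
```

Note the trade-off: a strict drain order can starve the low queue under sustained high-priority load; weighted polling is a common refinement.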
Analytics systems collect massive volumes of events—page views, clicks, transactions—from thousands of sources simultaneously. Direct writes to analytics databases would overwhelm them during traffic spikes. Queues buffer the firehose.
The Problem: Handle 100,000+ events per second during peak traffic without losing data or overwhelming the data warehouse.
The Solution: Queue all events immediately, batch-process into data warehouse, accept eventual consistency for analytics.
```typescript
// High-Volume Analytics Event Pipeline

interface AnalyticsEvent {
  eventId: string;
  eventType: string;
  userId?: string;
  sessionId: string;
  timestamp: Date;
  properties: Record<string, unknown>;
  context: {
    userAgent: string;
    ip: string;
    page: string;
  };
}

// Collector: High-throughput event ingestion
class EventCollector {
  private buffer: AnalyticsEvent[] = [];
  private flushInterval: NodeJS.Timeout;

  constructor(private queue: MessageQueue) {
    // Batch events for efficient queue writes
    this.flushInterval = setInterval(() => this.flush(), 100);
  }

  track(event: AnalyticsEvent): void {
    this.buffer.push(event);

    // Flush immediately if buffer is large
    if (this.buffer.length >= 500) {
      this.flush();
    }
  }

  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;

    const events = this.buffer.splice(0, 500);

    try {
      // Send as single batch message
      await this.queue.send({
        events,
        collectorId: COLLECTOR_ID,
        batchTimestamp: new Date().toISOString()
      });
    } catch (error) {
      // On failure, write to local disk for recovery
      await this.writeToFallback(events);
      console.error('Queue unavailable, wrote to fallback');
    }
  }
}

// Batch Processor: Write to data warehouse in efficient batches
class AnalyticsBatchProcessor {
  private eventBuffer: AnalyticsEvent[] = [];
  private lastFlush = Date.now();

  async process(message: { events: AnalyticsEvent[] }): Promise<void> {
    this.eventBuffer.push(...message.events);

    // Write to warehouse every 10K events or 60 seconds
    if (this.eventBuffer.length >= 10000 || Date.now() - this.lastFlush > 60000) {
      await this.writeToWarehouse();
    }
  }

  private async writeToWarehouse(): Promise<void> {
    const events = this.eventBuffer.splice(0, 10000);
    if (events.length === 0) return;

    // Batch insert to data warehouse
    await dataWarehouse.batchInsert('events', events, {
      partitionKey: 'timestamp',
      format: 'parquet'
    });

    this.lastFlush = Date.now();
    metrics.increment('events_written', events.length);
  }
}

// Stream Processor: Real-time aggregations
class RealTimeProcessor {
  private counters = new Map<string, number>();

  async process(message: { events: AnalyticsEvent[] }): Promise<void> {
    for (const event of message.events) {
      // Real-time counters
      const key = `${event.eventType}:${getMinuteBucket(event.timestamp)}`;
      this.counters.set(key, (this.counters.get(key) || 0) + 1);
    }

    // Periodically flush to real-time store
    await this.flushCounters();
  }

  private async flushCounters(): Promise<void> {
    for (const [key, count] of this.counters) {
      await redis.incrBy(`analytics:${key}`, count);
    }
    this.counters.clear();
  }
}
```

| Concern | Configuration | Rationale |
|---|---|---|
| Delivery | At-most-once | Minor data loss acceptable for analytics; prioritize throughput |
| Retention | 24-48 hours | Enough time to recover from processing outages |
| Batching | 500 events/msg | Reduce queue overhead, increase write efficiency |
| Ordering | Not required | Analytics aggregations are order-independent |
| DLQ | Optional | Dead-letter for investigation but don't block pipeline |
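The `getMinuteBucket` helper used by the real-time processor above isn't shown. A plausible sketch: truncate the timestamp to the start of its minute, producing a stable key so all events in the same minute increment the same counter.

```typescript
// Truncate a timestamp to the start of its UTC minute, returning a
// bucket key such as "2024-05-01T12:34" for use in counter keys.
function getMinuteBucket(timestamp: Date): string {
  const d = new Date(timestamp.getTime()); // copy - don't mutate the caller's Date
  d.setUTCSeconds(0, 0);                   // zero out seconds and milliseconds
  // Keep "YYYY-MM-DDTHH:MM" - enough to identify the minute
  return d.toISOString().slice(0, 16);
}
```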
Many applications need to run tasks at specific times or after delays: sending reminder emails 24 hours after signup, expiring temporary data, or processing scheduled reports. Queues with delay capabilities handle this elegantly.
The Problem: Cron jobs are single points of failure and don't distribute work. Database polling is inefficient and hard to scale.
The Solution: Queue tasks with delay/schedule time. Workers pick up tasks when their scheduled time arrives.
```typescript
// Delayed and Scheduled Task System

interface ScheduledTask {
  taskId: string;
  taskType: string;
  payload: unknown;
  scheduledFor: Date;
  retryCount: number;
  maxRetries: number;
  status?: 'pending' | 'cancelled';
}

// Scheduling a delayed task
async function scheduleReminder(userId: string, eventId: string): Promise<void> {
  const event = await db.events.findById(eventId);
  const reminderTime = new Date(event.startTime.getTime() - 24 * 60 * 60 * 1000);

  await taskQueue.send({
    taskId: generateTaskId(),
    taskType: 'send-event-reminder',
    payload: { userId, eventId },
    scheduledFor: reminderTime,
    retryCount: 0,
    maxRetries: 3
  }, {
    // AWS SQS: DelaySeconds, max 15 minutes
    // For longer delays, use SQS + DynamoDB pattern or Step Functions
    delaySeconds: calculateDelay(reminderTime)
  });
}

// Pattern for delays > 15 minutes (AWS SQS limitation)
// Use a delay loop queue or scheduled re-queue
async function scheduleWithLongDelay(task: ScheduledTask): Promise<void> {
  const delay = task.scheduledFor.getTime() - Date.now();
  const maxDelay = 15 * 60 * 1000; // 15 minutes in ms

  if (delay <= maxDelay) {
    // Within SQS delay limit, schedule directly
    await taskQueue.send(task, { delaySeconds: Math.floor(delay / 1000) });
  } else {
    // Store in DynamoDB, schedule checkpoint
    await db.scheduledTasks.create(task);

    // Queue a "check" message that will re-evaluate
    await delayCheckQueue.send(
      { taskId: task.taskId },
      { delaySeconds: maxDelay / 1000 }
    );
  }
}

// Delay check worker
async function checkDelayedTask(message: { taskId: string }): Promise<void> {
  const task = await db.scheduledTasks.findById(message.taskId);
  if (!task || task.status === 'cancelled') return;

  const now = Date.now();
  const delay = task.scheduledFor.getTime() - now;

  if (delay <= 0) {
    // Time to execute
    await executeTaskQueue.send(task);
    await db.scheduledTasks.delete(task.taskId);
  } else {
    // Re-schedule the check
    await scheduleWithLongDelay(task);
  }
}

// Task execution with retry logic
async function executeTask(task: ScheduledTask): Promise<void> {
  const handler = taskHandlers[task.taskType];
  if (!handler) {
    console.error('Unknown task type:', task.taskType);
    return;
  }

  try {
    await handler(task.payload);
    metrics.increment('tasks_completed', { type: task.taskType });
  } catch (error) {
    if (task.retryCount < task.maxRetries) {
      // Exponential backoff retry
      const backoffSeconds = Math.pow(2, task.retryCount) * 60;
      await taskQueue.send(
        { ...task, retryCount: task.retryCount + 1 },
        { delaySeconds: backoffSeconds }
      );
    } else {
      // Max retries exceeded
      await dlq.send({ task, error: error.message });
      metrics.increment('tasks_failed', { type: task.taskType });
    }
  }
}
```

For complex scheduling needs, consider AWS Step Functions, Azure Durable Functions, or Temporal. They handle delays, retries, and state management natively. Queues with delays work for simple cases; workflow engines excel at complex orchestration.
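The `calculateDelay` helper used by `scheduleReminder` isn't defined in the snippet. A sketch that converts a target time into seconds and clamps to SQS's 900-second `DelaySeconds` maximum (waits beyond that fall back to the re-queue loop):

```typescript
const MAX_DELAY_SECONDS = 900; // SQS DelaySeconds upper bound (15 minutes)

// Convert a target wall-clock time into an SQS-compatible delay,
// clamped to [0, 900] seconds. Times already in the past become 0
// so the message is delivered immediately.
function calculateDelay(scheduledFor: Date, now: Date = new Date()): number {
  const deltaSeconds = Math.floor(
    (scheduledFor.getTime() - now.getTime()) / 1000
  );
  return Math.min(Math.max(deltaSeconds, 0), MAX_DELAY_SECONDS);
}
```

The `now` parameter is an assumption added here to make the function testable; production code can rely on the default.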
In microservices architectures, services must communicate while remaining loosely coupled. Synchronous HTTP calls create tight coupling and propagate failures. Message queues enable event-driven integration where services react to events without direct dependencies.
The Problem: Service A calling Service B synchronously means A fails when B fails. Changes to B's API break A.
The Solution: Service A publishes events. Service B subscribes to events it cares about. Neither knows about the other directly.
```typescript
// Event-Driven Microservices Integration

// Domain Events - Shared contract
interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  occurredAt: Date;
  version: number;
  payload: unknown;
}

interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  aggregateType: 'Order';
  payload: {
    customerId: string;
    items: OrderItem[];
    totalAmount: number;
    shippingAddress: Address;
  };
}

// Order Service: Publishes events
class OrderService {
  async createOrder(command: CreateOrderCommand): Promise<Order> {
    // Business logic
    const order = Order.create(command);
    await this.orderRepository.save(order);

    // Publish event (outbox pattern for reliability)
    const event: OrderCreatedEvent = {
      eventId: generateEventId(),
      eventType: 'OrderCreated',
      aggregateId: order.id,
      aggregateType: 'Order',
      occurredAt: new Date(),
      version: 1,
      payload: {
        customerId: order.customerId,
        items: order.items,
        totalAmount: order.total,
        shippingAddress: order.shippingAddress
      }
    };

    await this.eventPublisher.publish('order-events', event);
    return order;
  }
}

// Inventory Service: Subscribes and reacts
class InventoryEventHandler {
  @EventHandler('OrderCreated')
  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Reserve inventory for the order
    for (const item of event.payload.items) {
      await this.inventoryService.reserve({
        orderId: event.aggregateId,
        productId: item.productId,
        quantity: item.quantity
      });
    }

    // Publish our own event
    await this.eventPublisher.publish('inventory-events', {
      eventType: 'InventoryReserved',
      aggregateId: event.aggregateId,
      payload: { success: true }
    });
  }

  @EventHandler('OrderCancelled')
  async onOrderCancelled(event: OrderCancelledEvent): Promise<void> {
    // Release previously reserved inventory
    await this.inventoryService.releaseReservation(event.aggregateId);
  }
}

// Shipping Service: Subscribes to multiple event types
class ShippingEventHandler {
  @EventHandler('PaymentCompleted')
  async onPaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
    // Payment done, create shipment
    const order = await this.orderClient.getOrder(event.aggregateId);
    await this.shippingService.createShipment({
      orderId: event.aggregateId,
      address: order.shippingAddress,
      items: order.items
    });
  }
}

// Event subscription setup
async function setupEventSubscriptions(): Promise<void> {
  // Each service has its own queue subscribed to shared topics
  await eventBus.subscribe('order-events', 'inventory-service-queue', {
    filter: { eventType: ['OrderCreated', 'OrderCancelled'] }
  });

  await eventBus.subscribe('order-events', 'notification-service-queue', {
    filter: { eventType: ['OrderCreated', 'OrderShipped'] }
  });

  await eventBus.subscribe('payment-events', 'shipping-service-queue', {
    filter: { eventType: ['PaymentCompleted'] }
  });
}
```

Message queues are the backbone of reliable, scalable distributed systems. They appear wherever work can be deferred, load needs buffering, or services need decoupling.
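The `OrderService` comment mentions the outbox pattern but the snippet doesn't show it. A minimal in-memory sketch of the idea (a real implementation writes the outbox row in the same database transaction as the order, and a separate relay process publishes and marks rows):

```typescript
interface OutboxRow {
  eventId: string;
  eventType: string;
  payload: unknown;
  published: boolean;
}

// In-memory stand-ins for a database table and a message broker
const outbox: OutboxRow[] = [];
const broker: OutboxRow[] = [];

// Step 1: in the same transaction that saves the order, append the
// event to the outbox table instead of publishing to the broker directly.
// This way the order and its event are committed (or rolled back) together.
function saveOrderWithEvent(orderId: string): void {
  // ...save the order row here, inside the same transaction...
  outbox.push({
    eventId: `evt-${orderId}`,
    eventType: 'OrderCreated',
    payload: { orderId },
    published: false
  });
}

// Step 2: a relay loop publishes unpublished rows and marks them.
// If a publish succeeds but the mark fails, the row is published again,
// which is why consumers must be idempotent (at-least-once delivery).
function relayOutbox(): number {
  let published = 0;
  for (const row of outbox) {
    if (row.published) continue;
    broker.push(row); // stand-in for eventPublisher.publish(...)
    row.published = true;
    published++;
  }
  return published;
}
```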
| Use Case | Primary Pattern | Key Configuration | Success Metric |
|---|---|---|---|
| E-Commerce Orders | Fan-out + P2P | At-least-once, FIFO per customer | Zero lost orders |
| Media Processing | Pipeline stages | Long visibility, heartbeat | Completion rate > 99.9% |
| Notifications | Priority queues | Rate limiting, retry backoff | Delivery latency SLA |
| Analytics | High-throughput batch | At-most-once, minimal latency | Events/second at peak |
| Scheduled Tasks | Delayed delivery | Long delays, idempotency | Tasks executed on time |
| Microservices | Event-driven | Outbox pattern, filtering | Service independence |
You've completed the Message Queues module! You now understand point-to-point messaging, queue semantics, acknowledgment patterns, dead letter queues, and real-world applications. These patterns form the foundation for building reliable, scalable asynchronous systems.