The Producer-Consumer pattern is one of the most widely used patterns in software engineering. Once you learn to recognize it, you'll see it everywhere, from the operating system kernel to cloud-scale distributed systems. This page explores concrete, real-world examples across diverse domains.
For each use case, we'll examine who plays the producer and consumer roles, how the buffer between them is designed, and which trade-offs drive the design.
These examples will cement your understanding and help you identify when to apply this pattern in your own work.
This page covers real-world Producer-Consumer applications: web server request handling, logging systems, data pipelines, streaming media, event-driven architectures, and print spooling. Each example illustrates specific pattern variations and design considerations.
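The code examples on this page share a small set of queue abstractions (`BlockingQueue`, `CircularBuffer`, `RingBuffer`, `PriorityBlockingQueue`) rather than any built-in library. To keep the later snippets concrete, here is a minimal sketch of the `BlockingQueue` interface they assume: promise-based `put` and `take`, plus an `offer` with a timeout. This is a simplification; a production version would park waiting producers properly instead of polling, and would handle cancellation and fairness.

```typescript
/**
 * Minimal sketch of the BlockingQueue abstraction used below:
 * a bounded queue with promise-based take/put and a timed offer.
 * The polling in offer() keeps the sketch short; a real version
 * would park producers on a wait list instead.
 */
class BlockingQueue<T> {
  private items: T[] = [];
  private takers: Array<(item: T) => void> = [];

  constructor(private readonly capacity: number) {}

  // Try to enqueue; resolves false if no space frees up in time.
  async offer(item: T, timeoutMs: number): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (this.items.length >= this.capacity && this.takers.length === 0) {
      if (Date.now() >= deadline) return false;
      await new Promise(r => setTimeout(r, 10)); // crude backoff
    }
    this.deliver(item);
    return true;
  }

  // Enqueue, waiting as long as it takes for space.
  async put(item: T): Promise<void> {
    while (!(await this.offer(item, 1000))) { /* keep waiting */ }
  }

  // Dequeue the next item, waiting if the queue is empty.
  take(): Promise<T> {
    if (this.items.length > 0) {
      return Promise.resolve(this.items.shift()!);
    }
    return new Promise(resolve => this.takers.push(resolve));
  }

  private deliver(item: T): void {
    const taker = this.takers.shift();
    if (taker) taker(item); // hand straight to a waiting consumer
    else this.items.push(item);
  }
}
```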
Every web server implements the Producer-Consumer pattern. The network stack accepts incoming connections (producer), and worker threads process requests (consumers). This is perhaps the most common real-world application of the pattern.
| Component | Role | Key Considerations |
|---|---|---|
| Accept Thread | Producer - accepts incoming TCP connections | Single thread, very fast, never blocks on workers |
| Connection Queue | Buffer - holds pending connections | Bounded to prevent memory exhaustion under DDoS |
| Worker Pool | Consumers - process HTTP requests | Fixed or elastic pool size based on load |
| Response Path | Return path to client | May use separate buffers for writes |

```typescript
/**
 * Simplified Web Server using Producer-Consumer Pattern
 *
 * Demonstrates the core structure of production servers
 * like Nginx, Tomcat, or Node.js worker threads.
 */
import net from 'net';
import { Worker } from 'worker_threads';

class WebServer {
  private readonly requestQueue: BlockingQueue<net.Socket>;
  private readonly workers: Worker[];
  private readonly acceptSocket: net.Server;

  constructor(
    private readonly port: number,
    private readonly workerCount: number,
    private readonly maxPendingConnections: number
  ) {
    // Bounded queue prevents memory exhaustion under load
    this.requestQueue = new BlockingQueue<net.Socket>(maxPendingConnections);

    // Create worker pool (consumers)
    this.workers = Array.from({ length: workerCount }, (_, i) => {
      const worker = new Worker('./request-handler.js', {
        workerData: { workerId: i }
      });
      this.startConsumer(worker);
      return worker;
    });

    // Create accept socket (producer)
    this.acceptSocket = net.createServer();
  }

  start(): void {
    // Producer: Accept incoming connections
    this.acceptSocket.on('connection', async (socket) => {
      try {
        // Attempt to queue the connection;
        // waits up to 5s if the queue is full (backpressure!)
        const queued = await this.requestQueue.offer(socket, 5000);

        if (!queued) {
          // Queue full - reject connection gracefully
          socket.write('HTTP/1.1 503 Service Unavailable\r\n\r\n');
          socket.end();
          console.log('Connection rejected: queue full');
        }
      } catch (error) {
        socket.destroy();
      }
    });

    this.acceptSocket.listen(this.port, () => {
      console.log(`Server listening on port ${this.port}`);
      console.log(`Worker pool size: ${this.workerCount}`);
      console.log(`Max pending connections: ${this.maxPendingConnections}`);
    });
  }

  private startConsumer(worker: Worker): void {
    // Consumer: Process requests from queue
    const processLoop = async () => {
      while (true) {
        // Take next connection from queue (blocks if empty)
        const socket = await this.requestQueue.take();

        // Parse and handle the HTTP request
        try {
          await this.handleRequest(socket, worker);
        } catch (error) {
          console.error('Request handling failed:', error);
          socket.destroy();
        }
      }
    };
    processLoop();
  }

  private async handleRequest(socket: net.Socket, worker: Worker): Promise<void> {
    // Read HTTP request (readRequest and processInWorker elided for brevity)
    const request = await this.readRequest(socket);

    // Process in worker thread
    const response = await this.processInWorker(worker, request);

    // Send response
    socket.write(`HTTP/1.1 200 OK\r\nContent-Length: ${response.length}\r\n\r\n`);
    socket.write(response);
    socket.end();
  }
}

// Configuration for different scenarios:

// High-concurrency API server
const apiServer = new WebServer(
  8080,  // port
  100,   // workers (CPU cores * ~10 for I/O-bound work)
  10000  // queue size (handle bursts)
);

// Low-latency service
const lowLatencyServer = new WebServer(
  8081,
  16,   // fewer workers to reduce context switching
  100   // small queue to fail fast
);
```

Production web servers use bounded queues specifically for backpressure. When the queue fills, new connections get 503 errors or the accept thread blocks. This is intentional: it's better to reject requests cleanly than to accept them and crash from memory exhaustion.
Logging is a perfect Producer-Consumer application. Application code (often in critical paths) produces log entries, and a background consumer writes them to files, networks, or aggregation services. This decoupling is essential for performance.

```typescript
/**
 * Asynchronous Logger with Producer-Consumer Pattern
 *
 * Log calls never block application code.
 * A background consumer loop handles disk I/O.
 */
import * as fs from 'fs';

interface LogEntry {
  timestamp: Date;
  level: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';
  message: string;
  context?: Record<string, any>;
}

class AsyncLogger {
  private readonly buffer: CircularBuffer<LogEntry>;
  private readonly writeStream: NodeJS.WritableStream;
  private running = true;

  // Metrics
  private logsWritten = 0;
  private logsDropped = 0;

  constructor(filePath: string, bufferSize: number = 10000) {
    this.buffer = new CircularBuffer<LogEntry>(bufferSize);
    this.writeStream = fs.createWriteStream(filePath, { flags: 'a' });

    // Start consumer loop
    this.startConsumer();
  }

  // Producer: Application calls this (synchronous, non-blocking)
  log(level: LogEntry['level'], message: string, context?: Record<string, any>): void {
    const entry: LogEntry = { timestamp: new Date(), level, message, context };

    // Try to add to buffer (non-blocking)
    const success = this.buffer.tryProduce(entry);

    if (!success) {
      // Buffer full - drop log entry (acceptable for logs)
      this.logsDropped++;

      // In production, might want to increment a metric
      // or write to stderr every Nth drop
      if (this.logsDropped % 1000 === 0) {
        console.error(`Logger dropped ${this.logsDropped} entries`);
      }
    }
  }

  // Convenience methods
  debug(message: string, context?: Record<string, any>): void { this.log('DEBUG', message, context); }
  info(message: string, context?: Record<string, any>): void { this.log('INFO', message, context); }
  warn(message: string, context?: Record<string, any>): void { this.log('WARN', message, context); }
  error(message: string, context?: Record<string, any>): void { this.log('ERROR', message, context); }

  // Consumer: Background writer
  private async startConsumer(): Promise<void> {
    const batchSize = 100;
    const maxWaitMs = 100; // Flush at least every 100ms

    while (this.running) {
      const batch: LogEntry[] = [];
      const deadline = Date.now() + maxWaitMs;

      // Collect batch of entries
      while (batch.length < batchSize && Date.now() < deadline) {
        const entry = await this.buffer.tryConsumeWithTimeout(deadline - Date.now());
        if (entry) {
          batch.push(entry);
        }
      }

      // Write batch to file
      if (batch.length > 0) {
        await this.writeBatch(batch);
      }
    }
  }

  private async writeBatch(batch: LogEntry[]): Promise<void> {
    // Format all entries
    const lines = batch.map(entry =>
      `[${entry.timestamp.toISOString()}] ${entry.level}: ${entry.message}` +
      (entry.context ? ` ${JSON.stringify(entry.context)}` : '')
    ).join('\n') + '\n';

    // Single write for entire batch (efficient!)
    await new Promise<void>((resolve, reject) => {
      this.writeStream.write(lines, (err) => {
        if (err) reject(err);
        else {
          this.logsWritten += batch.length;
          resolve();
        }
      });
    });
  }

  // Graceful shutdown - flush remaining logs
  async shutdown(): Promise<void> {
    this.running = false;

    // Drain remaining entries
    while (true) {
      const entry = this.buffer.tryConsume();
      if (!entry) break;
      await this.writeBatch([entry]);
    }

    this.writeStream.end();
    console.log(`Logger shutdown: ${this.logsWritten} written, ${this.logsDropped} dropped`);
  }
}

// Usage in application code:
const logger = new AsyncLogger('/var/log/app.log', 50000);

// These never block, even during disk I/O stalls
logger.info('Request received', { path: '/api/users', method: 'GET' });
logger.debug('Database query', { sql: 'SELECT...', duration: 45 });
logger.error('Request failed', { error: 'Connection reset' });
```

Async loggers can lose log entries during crashes or ungraceful shutdowns. Critical logs (errors, audit trails) may need synchronous writing or flush-on-write. Most loggers offer per-message control over sync vs async.
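What per-message control can look like is sketched below: a hypothetical `errorSync` method (the name and class are illustrative, not taken from any particular library) that bypasses the in-memory buffer and resolves only when Node's write callback fires. Note that the callback means the data has been handed to the OS, not fsynced; full durability would need an additional fsync step.

```typescript
import * as fs from 'fs';

// Hypothetical synchronous-path variant for critical entries:
// skips the buffer entirely and resolves only after the write
// callback fires, trading latency for durability.
class CriticalLogger {
  private readonly stream: NodeJS.WritableStream;

  constructor(filePath: string) {
    this.stream = fs.createWriteStream(filePath, { flags: 'a' });
  }

  errorSync(message: string): Promise<void> {
    const line = `[${new Date().toISOString()}] ERROR: ${message}\n`;
    return new Promise((resolve, reject) => {
      this.stream.write(line, err => (err ? reject(err) : resolve()));
    });
  }
}

// Audit-trail entries wait for the write; ordinary logs do not:
// await criticalLogger.errorSync('payment reconciliation mismatch');
```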
Data pipelines chain multiple Producer-Consumer stages together. Each stage consumes from the previous stage's output queue and produces to the next stage's input queue. This creates a pipeline architecture that enables parallel processing and decoupled scaling.
A typical ETL (Extract-Transform-Load) pipeline:
| Stage | Consumes From | Produces To | Parallelism |
|---|---|---|---|
| Extract | External source (database, API) | Raw data queue | Limited by source |
| Validate | Raw data queue | Valid records queue + error queue | Highly parallel |
| Transform | Valid records queue | Transformed data queue | CPU-bound, parallel |
| Enrich | Transformed data queue | Enriched data queue | I/O-bound (lookups) |
| Load | Enriched data queue | Destination (database, warehouse) | Limited by destination |
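The Validate row above produces to two queues: valid records flow onward, while failures go to a side channel for inspection or replay. The pipeline code below models filtering by returning `null`; a sketch of explicit error-queue branching (reusing the `RawOrder` and `ValidatedOrder` types from that example, with a hypothetical `errorQueue`) might look like this:

```typescript
// Sketch of a validation stage that routes to two queues:
// valid records continue down the pipeline, failures go to a
// side channel. Queue names here are illustrative.
async function validateWorker(
  rawQueue: BlockingQueue<RawOrder>,
  validQueue: BlockingQueue<ValidatedOrder>,
  errorQueue: BlockingQueue<{ raw: RawOrder; reason: string }>
): Promise<void> {
  while (true) {
    const raw = await rawQueue.take();
    try {
      const parsed = JSON.parse(raw.data);
      if (!isValidOrder(parsed)) {
        await errorQueue.put({ raw, reason: 'schema validation failed' });
        continue;
      }
      await validQueue.put({ id: raw.id, ...parsed });
    } catch {
      await errorQueue.put({ raw, reason: 'malformed JSON' });
    }
  }
}
```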

```typescript
/**
 * Chained Producer-Consumer Pipeline
 *
 * Each stage is independently scalable.
 * Buffers between stages handle rate mismatches.
 */
interface PipelineStage<TIn, TOut> {
  name: string;
  process(item: TIn): Promise<TOut | null>; // null = filter out
  parallelism: number;
}

class Pipeline<TIn, TOut> {
  private stages: Array<{
    stage: PipelineStage<any, any>;
    outputQueue: BlockingQueue<any>; // the queue this stage writes to
    workers: Promise<void>[];
  }> = [];
  private running = true;

  constructor(
    private inputQueue: BlockingQueue<TIn>,
    private outputQueue: BlockingQueue<TOut>,
    private bufferSize: number = 1000
  ) {}

  addStage<TMid>(stage: PipelineStage<any, TMid>): Pipeline<TIn, TOut> {
    // Create the buffer between this stage and the next
    const outputQueue = new BlockingQueue<TMid>(this.bufferSize);
    this.stages.push({ stage, outputQueue, workers: [] });
    return this;
  }

  async start(): Promise<void> {
    // Start workers for each stage
    for (let i = 0; i < this.stages.length; i++) {
      const { stage, outputQueue } = this.stages[i];

      // First stage reads the pipeline input;
      // later stages read the previous stage's output queue
      const stageInputQueue = i === 0
        ? this.inputQueue
        : this.stages[i - 1].outputQueue;

      // Last stage writes to the final output queue
      const stageOutputQueue = i === this.stages.length - 1
        ? this.outputQueue
        : outputQueue;

      // Start parallel workers for this stage
      for (let w = 0; w < stage.parallelism; w++) {
        this.stages[i].workers.push(
          this.runWorker(stage, stageInputQueue, stageOutputQueue, w)
        );
      }

      console.log(`Started ${stage.parallelism} workers for stage: ${stage.name}`);
    }
  }

  private async runWorker<TStageIn, TStageOut>(
    stage: PipelineStage<TStageIn, TStageOut>,
    inputQueue: BlockingQueue<TStageIn>,
    outputQueue: BlockingQueue<TStageOut>,
    workerId: number
  ): Promise<void> {
    while (this.running) {
      try {
        const item = await inputQueue.take();
        const result = await stage.process(item);
        if (result !== null) {
          await outputQueue.put(result);
        }
      } catch (error) {
        console.error(`[${stage.name}] Worker ${workerId} error:`, error);
      }
    }
  }
}

// Example: Order Processing Pipeline
// (Domain types like Item, Customer, InventoryStatus and the
// queues/services referenced below are assumed defined elsewhere.)
interface RawOrder { id: string; data: string; }
interface ValidatedOrder { id: string; items: Item[]; customer: Customer; }
interface EnrichedOrder extends ValidatedOrder { inventory: InventoryStatus[]; }
interface ProcessedOrder extends EnrichedOrder { status: 'ready' | 'backordered'; }

const orderPipeline = new Pipeline<RawOrder, ProcessedOrder>(
  rawOrderQueue,   // Input: raw orders from API
  processedQueue,  // Output: ready for fulfillment
  5000             // Buffer size between stages
);

orderPipeline
  .addStage<ValidatedOrder>({
    name: 'Validate',
    parallelism: 4, // CPU-bound, match cores
    async process(raw: RawOrder) {
      const parsed = JSON.parse(raw.data);
      if (!isValidOrder(parsed)) return null; // Filter invalid
      return { id: raw.id, ...parsed };
    }
  })
  .addStage<EnrichedOrder>({
    name: 'Enrich',
    parallelism: 16, // I/O-bound (inventory lookup), more parallelism
    async process(order: ValidatedOrder) {
      const inventory = await inventoryService.check(order.items);
      return { ...order, inventory };
    }
  })
  .addStage<ProcessedOrder>({
    name: 'Process',
    parallelism: 4,
    async process(order: EnrichedOrder) {
      const allAvailable = order.inventory.every(i => i.available);
      return { ...order, status: allAvailable ? 'ready' : 'backordered' };
    }
  });

await orderPipeline.start();
```

Video and audio streaming are classic Producer-Consumer applications. Network data arrives in bursts (producer), while playback must be smooth and continuous (consumer). The buffer absorbs network variability to provide seamless playback.

```typescript
/**
 * Video Player Buffer with Adaptive Strategies
 *
 * Demonstrates real-time Producer-Consumer with
 * timing constraints.
 */
interface VideoFrame {
  sequenceNumber: number;
  timestamp: number; // Playback timestamp in ms
  data: Uint8Array;
  keyFrame: boolean;
}

// Quality levels normally come from the stream manifest
interface QualityLevel {
  name: string;
  bitrate: number; // bytes per second, to match downloadRate below
}

class VideoPlayerBuffer {
  private readonly frameBuffer: RingBuffer<VideoFrame>;
  private readonly targetBufferMs: number; // Target buffer duration
  private readonly minBufferMs: number;    // Minimum before rebuffer

  private currentPlaybackTime = 0;
  private state: 'buffering' | 'playing' | 'paused' = 'buffering';

  // Adaptive bitrate control
  private downloadRate = 0; // bytes per second
  private readonly qualityLevels: QualityLevel[] = []; // populated from manifest (elided)
  private currentQuality = 0;

  constructor(
    bufferCapacity: number = 300,  // ~300 frames = 10 seconds at 30fps
    targetBufferMs: number = 5000, // 5 seconds buffer target
    minBufferMs: number = 1000     // 1 second minimum
  ) {
    this.frameBuffer = new RingBuffer<VideoFrame>(bufferCapacity);
    this.targetBufferMs = targetBufferMs;
    this.minBufferMs = minBufferMs;
  }

  // Producer: Network downloader adds frames
  async receiveFrame(frame: VideoFrame): Promise<void> {
    // Add frame to buffer
    const added = this.frameBuffer.tryAdd(frame);

    if (!added) {
      // Buffer full - skip frame (shouldn't happen if balanced)
      console.warn('Frame buffer full, dropping frame:', frame.sequenceNumber);
      return;
    }

    // Check if we have enough to start/resume playback
    if (this.state === 'buffering') {
      const bufferedMs = this.getBufferedDuration();
      if (bufferedMs >= this.targetBufferMs) {
        this.state = 'playing';
        this.onPlaybackReady();
      } else {
        // Update UI with buffering progress
        this.onBufferingProgress(bufferedMs / this.targetBufferMs);
      }
    }

    // Update download rate estimate (for adaptive bitrate)
    this.updateDownloadRate(frame.data.length);
  }

  // Consumer: Playback loop consumes frames at constant rate
  startPlaybackLoop(): void {
    const frameInterval = 1000 / 30; // 30fps = 33.33ms per frame

    setInterval(() => {
      if (this.state !== 'playing') return;

      // Get next frame
      const frame = this.frameBuffer.tryRemove();

      if (!frame) {
        // Buffer underrun! Need to rebuffer
        this.state = 'buffering';
        this.onRebuffer();
        return;
      }

      // Render the frame
      this.renderFrame(frame);
      this.currentPlaybackTime = frame.timestamp;

      // Check buffer health
      this.checkBufferHealth();
    }, frameInterval);
  }

  private checkBufferHealth(): void {
    const bufferedMs = this.getBufferedDuration();

    if (bufferedMs < this.minBufferMs) {
      // Buffer critically low - consider:
      // 1. Dropping to lower quality
      // 2. Warning user
      this.onBufferLow(bufferedMs);
      this.maybeReduceQuality();
    } else if (bufferedMs > this.targetBufferMs * 1.5) {
      // Buffer healthy - can try higher quality
      this.maybeIncreaseQuality();
    }
  }

  private maybeReduceQuality(): void {
    // Adaptive Bitrate: Switch to lower quality if buffer draining
    if (this.currentQuality > 0) {
      this.currentQuality--;
      const newQuality = this.qualityLevels[this.currentQuality];
      this.requestQualityChange(newQuality);
      console.log('Reducing quality to', newQuality.name);
    }
  }

  private maybeIncreaseQuality(): void {
    // Can we sustain higher quality?
    const nextQuality = this.qualityLevels[this.currentQuality + 1];
    if (nextQuality && this.downloadRate > nextQuality.bitrate * 1.2) {
      this.currentQuality++;
      this.requestQualityChange(nextQuality);
      console.log('Increasing quality to', nextQuality.name);
    }
  }

  private getBufferedDuration(): number {
    if (this.frameBuffer.isEmpty()) return 0;
    const firstFrame = this.frameBuffer.peek()!;
    const lastFrame = this.frameBuffer.peekLast()!;
    return lastFrame.timestamp - firstFrame.timestamp;
  }

  // Event callbacks
  private onPlaybackReady(): void { /* Start playing */ }
  private onBufferingProgress(progress: number): void { /* Update UI */ }
  private onRebuffer(): void { /* Show buffering spinner */ }
  private onBufferLow(ms: number): void { /* Warning state */ }
  private renderFrame(frame: VideoFrame): void { /* Actually render */ }
  private requestQualityChange(quality: QualityLevel): void { /* Request new stream */ }
  private updateDownloadRate(bytes: number): void { /* Moving average */ }
}

// The key insight:
// - Producer (network): Variable, bursty delivery
// - Buffer: Absorbs variability, maintains 5s cushion
// - Consumer (playback): Strict 30fps constant rate
// - Adaptive: Adjusts quality based on buffer health
```

The buffer in media streaming is often called a 'jitter buffer' because its primary purpose is to absorb network jitter (variability in packet arrival times). A larger jitter buffer means smoother playback but higher latency. Live streams use smaller buffers than on-demand content.
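As a rough illustration of that trade-off, a player might pick its buffer targets by stream type. The numbers here are illustrative, not drawn from any particular player:

```typescript
// Illustrative buffer targets: live streams keep the cushion small
// to stay close to real time; on-demand playback can afford a
// larger cushion for smoothness. Numbers are made up for the sketch.
function chooseBufferTargets(kind: 'live' | 'on-demand') {
  return kind === 'live'
    ? { targetBufferMs: 1500, minBufferMs: 500 }    // stays ~1.5s behind live
    : { targetBufferMs: 10000, minBufferMs: 2000 }; // smoother, +10s startup latency
}

const { targetBufferMs, minBufferMs } = chooseBufferTargets('live');
const liveBuffer = new VideoPlayerBuffer(300, targetBufferMs, minBufferMs);
```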
Modern microservices architectures often use event-driven patterns, where the Producer-Consumer pattern enables loose coupling between services. Events are published to topics (produced), and interested services subscribe and process them (consumed).

```typescript
/**
 * Event-Driven Order Processing System
 *
 * Multiple microservices communicate through events.
 * Each service is a consumer of some events and producer of others.
 * (Helpers like generateOrderId, saveOrder, processPayment, and
 * sendEmail are elided for brevity.)
 */

// Event Types
interface OrderCreated {
  type: 'OrderCreated';
  orderId: string;
  customerId: string;
  items: Item[];
  timestamp: Date;
}

interface PaymentProcessed {
  type: 'PaymentProcessed';
  orderId: string;
  success: boolean;
  transactionId: string;
  timestamp: Date;
}

interface InventoryReserved {
  type: 'InventoryReserved';
  orderId: string;
  items: { sku: string; reserved: boolean }[];
  timestamp: Date;
}

interface OrderShipped {
  type: 'OrderShipped';
  orderId: string;
  trackingNumber: string;
  carrier: string;
  timestamp: Date;
}

type OrderEvent = OrderCreated | PaymentProcessed | InventoryReserved | OrderShipped;

// Event Bus (Kafka-like abstraction)
class EventBus {
  constructor(private readonly kafka: KafkaClient) {}

  async publish(topic: string, event: OrderEvent): Promise<void> {
    await this.kafka.produce(topic, {
      key: event.orderId, // Same order → same partition → ordering
      value: JSON.stringify(event),
      headers: {
        'event-type': event.type,
        'timestamp': event.timestamp.toISOString()
      }
    });
  }

  subscribe(topic: string, groupId: string, handler: (event: OrderEvent) => Promise<void>): void {
    this.kafka.consume(topic, groupId, async (message) => {
      const event = JSON.parse(message.value) as OrderEvent;
      await handler(event);
    });
  }
}

// Order Service (Producer of OrderCreated)
class OrderService {
  constructor(private eventBus: EventBus) {}

  async createOrder(customerId: string, items: Item[]): Promise<string> {
    const orderId = generateOrderId();

    // Save to database
    await this.saveOrder(orderId, customerId, items);

    // Publish event (produce)
    await this.eventBus.publish('orders', {
      type: 'OrderCreated',
      orderId,
      customerId,
      items,
      timestamp: new Date()
    });

    return orderId;
  }
}

// Payment Service (Consumer of OrderCreated, Producer of PaymentProcessed)
class PaymentService {
  constructor(private eventBus: EventBus) {
    // Subscribe to OrderCreated events
    this.eventBus.subscribe('orders', 'payment-service', async (event) => {
      if (event.type === 'OrderCreated') {
        await this.handleOrderCreated(event);
      }
    });
  }

  private async handleOrderCreated(event: OrderCreated): Promise<void> {
    // Process payment
    const result = await this.processPayment(event.customerId, event.items);

    // Publish result (produce)
    await this.eventBus.publish('payments', {
      type: 'PaymentProcessed',
      orderId: event.orderId,
      success: result.success,
      transactionId: result.transactionId,
      timestamp: new Date()
    });
  }
}

// Inventory Service (Consumer of PaymentProcessed, Producer of InventoryReserved)
class InventoryService {
  constructor(private eventBus: EventBus) {
    this.eventBus.subscribe('payments', 'inventory-service', async (event) => {
      if (event.type === 'PaymentProcessed' && event.success) {
        await this.handlePaymentSuccess(event);
      }
    });
  }

  private async handlePaymentSuccess(event: PaymentProcessed): Promise<void> {
    const order = await this.getOrder(event.orderId);
    const reservations = await this.reserveInventory(order.items);

    await this.eventBus.publish('inventory', {
      type: 'InventoryReserved',
      orderId: event.orderId,
      items: reservations,
      timestamp: new Date()
    });
  }
}

// Notification Service (Consumer of multiple event types)
class NotificationService {
  constructor(private eventBus: EventBus) {
    // Listen to all order-related events
    this.eventBus.subscribe('orders', 'notification-service', this.handleEvent.bind(this));
    this.eventBus.subscribe('payments', 'notification-service', this.handleEvent.bind(this));
    this.eventBus.subscribe('shipments', 'notification-service', this.handleEvent.bind(this));
  }

  private async handleEvent(event: OrderEvent): Promise<void> {
    switch (event.type) {
      case 'OrderCreated':
        await this.sendEmail('Order Confirmed', event.orderId);
        break;
      case 'PaymentProcessed':
        if (!event.success) {
          await this.sendEmail('Payment Failed', event.orderId);
        }
        break;
      case 'OrderShipped':
        await this.sendEmail('Order Shipped', event.orderId, event.trackingNumber);
        break;
    }
  }
}

// The pattern:
// - Each service consumes events it cares about
// - Each service produces events about its domain
// - Events flow through a message broker (Kafka)
// - Services are completely decoupled
// - New services can subscribe without modifying producers
```

| Event | Produced By | Consumed By (examples) |
|---|---|---|
| OrderCreated | Order Service | Payment Service, Inventory Check, Analytics |
| PaymentProcessed | Payment Service | Inventory Service, Order Service, Notifications |
| InventoryReserved | Inventory Service | Shipping Service, Order Service |
| OrderShipped | Shipping Service | Notification Service, Order Service, Analytics |
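The fan-out visible in this table falls out of consumer-group semantics in the Kafka-style `EventBus` above: every group ID keeps its own offset on a topic, so each group sees every event, while instances within one group share the load. A sketch, with hypothetical handler functions (`sendConfirmationEmail`, `recordOrderMetrics`):

```typescript
// Fan-out via consumer groups: each groupId gets its own cursor
// on the topic, so every group receives every event independently.
eventBus.subscribe('orders', 'email-service', async (event) => {
  if (event.type === 'OrderCreated') await sendConfirmationEmail(event);
});

eventBus.subscribe('orders', 'analytics-service', async (event) => {
  if (event.type === 'OrderCreated') await recordOrderMetrics(event);
});

// Both groups see every OrderCreated event; adding the analytics
// subscription required no change to OrderService (the producer).
```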
The print spooler is a classic operating system example of Producer-Consumer. Applications produce print jobs; the spooler queues them; the printer consumes them one at a time. This decoupling allows applications to return immediately while the slow printer works.

```typescript
/**
 * Print Spooler: Classic Producer-Consumer Example
 *
 * Demonstrates OS-level coordination pattern.
 */
interface PrintJob {
  id: string;
  submittedAt: Date;
  priority: 'low' | 'normal' | 'high';
  document: Buffer;
  copies: number;
  user: string;
  status: 'queued' | 'printing' | 'completed' | 'failed';
}

class PrintSpooler {
  // Priority queue - high priority jobs first
  private readonly jobQueue: PriorityBlockingQueue<PrintJob>;
  private readonly printer: Printer;
  private readonly jobStore: Map<string, PrintJob> = new Map();
  private running = true;

  constructor(printer: Printer, maxQueueSize: number = 1000) {
    this.jobQueue = new PriorityBlockingQueue<PrintJob>(
      maxQueueSize,
      (a, b) => this.comparePriority(a, b)
    );
    this.printer = printer;
    this.startSpoolLoop();
  }

  // Producer: Application submits print job
  async submitJob(job: Omit<PrintJob, 'id' | 'submittedAt' | 'status'>): Promise<string> {
    const printJob: PrintJob = {
      ...job,
      id: generateJobId(),
      submittedAt: new Date(),
      status: 'queued'
    };

    // Try to add to queue
    const queued = await this.jobQueue.offer(printJob, 5000);
    if (!queued) {
      throw new Error('Print queue is full. Please try again later.');
    }

    this.jobStore.set(printJob.id, printJob);
    console.log(`Job ${printJob.id} queued by ${job.user}`);
    return printJob.id;
  }

  // Check job status
  getJobStatus(jobId: string): PrintJob | undefined {
    return this.jobStore.get(jobId);
  }

  // Cancel job if still queued
  cancelJob(jobId: string): boolean {
    const job = this.jobStore.get(jobId);
    if (job && job.status === 'queued') {
      this.jobQueue.remove(job);
      job.status = 'failed';
      return true;
    }
    return false;
  }

  // Consumer: Background print loop
  private async startSpoolLoop(): Promise<void> {
    while (this.running) {
      // Wait for next job (blocks if queue empty)
      const job = await this.jobQueue.take();

      job.status = 'printing';
      console.log(`Printing job ${job.id}...`);

      try {
        // Actually send to printer
        for (let copy = 0; copy < job.copies; copy++) {
          await this.printer.print(job.document);
        }
        job.status = 'completed';
        console.log(`Job ${job.id} completed`);
      } catch (error) {
        job.status = 'failed';
        console.error(`Job ${job.id} failed:`, error);
        // Optionally requeue for retry
        // await this.jobQueue.offer(job, 0);
      }
    }
  }

  private comparePriority(a: PrintJob, b: PrintJob): number {
    const priorityOrder = { high: 0, normal: 1, low: 2 };
    const priorityDiff = priorityOrder[a.priority] - priorityOrder[b.priority];
    if (priorityDiff !== 0) return priorityDiff;
    // Same priority: FIFO by submission time
    return a.submittedAt.getTime() - b.submittedAt.getTime();
  }
}

// Usage:
const spooler = new PrintSpooler(officePrinter);

// Applications produce print jobs
const jobId = await spooler.submitJob({
  document: generatePDF(),
  copies: 2,
  priority: 'normal',
  user: 'alice'
});

// Check status
console.log(spooler.getJobStatus(jobId));
// { id: '...', status: 'queued', ... }
```

The print spooler demonstrates a priority queue variant of Producer-Consumer. Instead of strict FIFO, items are ordered by priority. This is common in task scheduling, where urgent work should preempt batch processing.
The Producer-Consumer pattern appears in countless forms across software engineering. Recognizing it helps you apply proven solutions rather than reinventing coordination mechanisms.
You have mastered the Producer-Consumer pattern: the fundamental problem of coordinating producers and consumers, the bounded buffer solution with semaphores and condition variables, various implementation strategies from circular buffers to distributed queues, and real-world applications across diverse domains. This pattern is foundational for concurrent system design.