The semaphore-based bounded buffer we explored in the previous page is correct and widely applicable. However, real-world systems have diverse requirements—high throughput, low latency, distributed deployment, persistence—that demand specialized implementation strategies.
This page explores the spectrum of Producer-Consumer implementations, from high-performance circular buffers to lock-free algorithms to distributed message queues. Understanding these strategies equips you to select the right implementation for your specific context, whether you're building a high-frequency trading system or a cloud-native microservices architecture.
This page covers implementation strategies across the performance spectrum: circular buffers for cache efficiency, lock-free queues for ultra-low latency, partitioned buffers for parallelism, and distributed queues for scale and durability.
The circular buffer (also called ring buffer) is the most common implementation for bounded Producer-Consumer buffers. It uses a fixed-size array with wrap-around indexing, avoiding the memory allocation overhead of linked lists.
```typescript
/**
 * Thread-safe Circular Buffer (Ring Buffer)
 *
 * Uses modulo arithmetic for wrap-around indexing.
 * Fixed memory footprint, cache-friendly access pattern.
 */
class CircularBuffer<T> {
  private readonly buffer: (T | undefined)[];
  private readonly capacity: number;

  // Head points to next slot to read
  // Tail points to next slot to write
  private head = 0;
  private tail = 0;
  private count = 0;

  // Synchronization
  private readonly lock = new Mutex();
  private readonly notFull: ConditionVariable;
  private readonly notEmpty: ConditionVariable;

  constructor(capacity: number) {
    // Power-of-2 capacity allows bitwise AND instead of modulo
    // (optimization: index & (capacity - 1) instead of index % capacity)
    this.capacity = capacity;
    this.buffer = new Array(capacity);
    this.notFull = new ConditionVariable(this.lock);
    this.notEmpty = new ConditionVariable(this.lock);
  }

  async produce(item: T): Promise<void> {
    await this.lock.acquire();
    try {
      // Wait while buffer is full
      while (this.count === this.capacity) {
        await this.notFull.wait();
      }

      // Add item at tail position
      this.buffer[this.tail] = item;
      // Advance tail with wrap-around
      this.tail = (this.tail + 1) % this.capacity;
      this.count++;

      // Signal consumers
      this.notEmpty.signal();
    } finally {
      await this.lock.release();
    }
  }

  async consume(): Promise<T> {
    await this.lock.acquire();
    try {
      // Wait while buffer is empty
      while (this.count === 0) {
        await this.notEmpty.wait();
      }

      // Remove item from head position
      const item = this.buffer[this.head]!;
      this.buffer[this.head] = undefined; // Help GC
      // Advance head with wrap-around
      this.head = (this.head + 1) % this.capacity;
      this.count--;

      // Signal producers
      this.notFull.signal();

      return item;
    } finally {
      await this.lock.release();
    }
  }

  // Visualization of buffer state:
  // capacity=8, head=2, tail=6, count=4
  //
  // Index:    0    1    2    3    4    5    6    7
  // Buffer:  [ ]  [ ]  [A]  [B]  [C]  [D]  [ ]  [ ]
  //                    ^head                ^tail
  //
  // Items A, B, C, D are in the buffer (count=4)
  // Consumer will read A next (at head)
  // Producer will write the next item at position 6 (at tail)
}
```

When capacity is a power of 2 (8, 16, 32, ...), you can replace modulo (`%`) with bitwise AND (`&`). Example: `index % 16` becomes `index & 15`. This is significantly faster because modulo requires integer division while AND is a single CPU instruction.
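As a quick sanity check, the two expressions agree for any non-negative index whenever the capacity is a power of two. A minimal standalone sketch:

```typescript
// For any non-negative index and power-of-2 capacity:
//   index % capacity === index & (capacity - 1)
const capacity = 16;
const mask = capacity - 1; // 15, i.e. binary 0b1111

for (const index of [0, 5, 15, 16, 17, 100]) {
  // Both columns print the same value, e.g. 17 % 16 === 1 and 17 & 15 === 1
  console.log(index, index % capacity, index & mask);
}
```

The trick only works for powers of two: for capacity 10, `index & 9` masks individual bits rather than computing a remainder, which is why high-performance ring buffers commonly require power-of-2 sizes.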
For extreme performance requirements—microsecond latency, millions of operations per second—lock-based synchronization introduces unacceptable overhead. Lock-free implementations use atomic operations (compare-and-swap) instead of locks, eliminating blocking and associated latency.
The simplest lock-free design applies when exactly one producer and one consumer are guaranteed: the single-producer, single-consumer (SPSC) queue.
```typescript
/**
 * Single-Producer, Single-Consumer (SPSC) Lock-Free Queue
 *
 * When exactly one producer and one consumer are guaranteed,
 * we can eliminate ALL locking. The indices provide natural
 * synchronization via visibility ordering.
 *
 * This is the fastest possible bounded queue implementation.
 */
class SPSCQueue<T> {
  private readonly buffer: (T | undefined)[];
  private readonly mask: number; // capacity - 1 for power of 2

  // Volatile indices - changes visible across threads
  // Each is ONLY written by one thread:
  //   - head written by consumer only
  //   - tail written by producer only
  private head = 0; // Consumer reads from here
  private tail = 0; // Producer writes to here

  constructor(capacity: number) {
    // Capacity MUST be a power of 2 for this implementation
    if ((capacity & (capacity - 1)) !== 0) {
      throw new Error("Capacity must be power of 2");
    }
    this.buffer = new Array(capacity);
    this.mask = capacity - 1;
  }

  /**
   * Producer: Try to add item
   * Returns false if queue is full (non-blocking)
   */
  tryProduce(item: T): boolean {
    const currentTail = this.tail;
    const nextTail = (currentTail + 1) & this.mask;

    // Check if queue is full
    // Full when: nextTail == head (would overwrite unread item)
    if (nextTail === this.head) {
      return false; // Full, don't block
    }

    // Write item before updating tail
    // Memory barrier ensures item is visible before tail update
    this.buffer[currentTail] = item;

    // Update tail (signals consumer that item is ready)
    this.tail = nextTail;
    return true;
  }

  /**
   * Consumer: Try to remove item
   * Returns undefined if queue is empty (non-blocking)
   */
  tryConsume(): T | undefined {
    const currentHead = this.head;

    // Check if queue is empty
    // Empty when: head == tail (nothing to read)
    if (currentHead === this.tail) {
      return undefined; // Empty, don't block
    }

    // Read item before updating head
    const item = this.buffer[currentHead];
    this.buffer[currentHead] = undefined; // Help GC

    // Update head (signals producer that slot is free)
    this.head = (currentHead + 1) & this.mask;
    return item;
  }
}

// Why this works without locks:
// 1. Only producer writes tail, only consumer writes head
// 2. Producer reads head (to check full), consumer reads tail (to check empty)
// 3. No simultaneous writes to any variable
// 4. Memory ordering ensures writes are visible before index updates
```

The SPSC queue shown above only works correctly with exactly ONE producer thread and ONE consumer thread. Multiple producers or consumers will cause data races. For MPMC (multi-producer, multi-consumer), you need more complex algorithms such as the Michael-Scott queue.
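The compare-and-swap primitive that MPMC algorithms build on is exposed in JavaScript/TypeScript as `Atomics.compareExchange` over a `SharedArrayBuffer`. A minimal sketch of a CAS retry loop, shown here as a lock-free counter increment rather than a full queue:

```typescript
// Lock-free increment via a CAS retry loop.
// Atomics.compareExchange(arr, i, expected, replacement) atomically stores
// `replacement` only if the slot still holds `expected`, and returns the
// value it actually found there.
const shared = new Int32Array(new SharedArrayBuffer(4));

function casIncrement(counter: Int32Array, index: number): number {
  while (true) {
    const current = Atomics.load(counter, index);
    // Succeeds only if no other thread changed the slot in between;
    // on failure we simply observe the newer value and retry.
    if (Atomics.compareExchange(counter, index, current, current + 1) === current) {
      return current + 1;
    }
  }
}

casIncrement(shared, 0);
casIncrement(shared, 0);
console.log(Atomics.load(shared, 0)); // 2
```

No thread ever blocks: a losing thread retries immediately with fresh state, which is exactly the progress property that distinguishes lock-free algorithms from lock-based ones.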
When a single queue becomes a bottleneck, partitioning distributes load across multiple independent queues. This is the secret behind high-throughput messaging systems like Apache Kafka.
The core routing rule is: `partition = hash(key) % numPartitions`
```typescript
/**
 * Partitioned Buffer for High-Throughput Processing
 *
 * Distributes load across multiple independent queues.
 * Each partition can be consumed in parallel.
 */
class PartitionedBuffer<K, V> {
  private readonly partitions: CircularBuffer<V>[];
  private readonly numPartitions: number;

  constructor(numPartitions: number, partitionCapacity: number) {
    this.numPartitions = numPartitions;
    this.partitions = Array.from(
      { length: numPartitions },
      () => new CircularBuffer<V>(partitionCapacity)
    );
  }

  /**
   * Produce to a specific partition based on key
   * Messages with the same key always go to the same partition
   */
  async produce(key: K, value: V): Promise<void> {
    const partitionId = this.getPartition(key);
    await this.partitions[partitionId].produce(value);
  }

  /**
   * Consume from a specific partition
   * Each consumer is assigned one or more partitions
   */
  async consumeFromPartition(partitionId: number): Promise<V> {
    if (partitionId < 0 || partitionId >= this.numPartitions) {
      throw new Error(`Invalid partition: ${partitionId}`);
    }
    return await this.partitions[partitionId].consume();
  }

  /**
   * Hash key to partition ID
   * The same key always maps to the same partition
   */
  private getPartition(key: K): number {
    // Simple hash function (in production, use murmur3 or similar)
    const hash = this.hashCode(key);
    // Ensure non-negative with bitwise AND
    return (hash & 0x7fffffff) % this.numPartitions;
  }

  private hashCode(key: K): number {
    const str = String(key);
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
      hash = ((hash << 5) - hash) + str.charCodeAt(i);
      hash = hash & hash; // Convert to 32-bit integer
    }
    return hash;
  }

  getNumPartitions(): number {
    return this.numPartitions;
  }
}

// Usage: Order processing with customer ID as partition key
const orderQueue = new PartitionedBuffer<string, Order>(8, 1000);

// Producer: Orders for the same customer go to the same partition
async function processNewOrder(order: Order) {
  await orderQueue.produce(order.customerId, order);
}

// Consumers: Each consumer handles specific partitions
async function consumerForPartition(partitionId: number) {
  while (true) {
    const order = await orderQueue.consumeFromPartition(partitionId);
    await processOrder(order);
    // Orders for the same customer are processed by the same consumer,
    // maintaining per-customer ordering
  }
}

// Start 8 consumers, one per partition
for (let i = 0; i < orderQueue.getNumPartitions(); i++) {
  consumerForPartition(i);
}
```

| Strategy | How It Works | Use Case |
|---|---|---|
| Hash Partitioning | partition = hash(key) % N | Even distribution, order per key |
| Range Partitioning | key ranges mapped to partitions | Range queries, time-series data |
| Round-Robin | Cycle through partitions | Load balancing, no ordering needed |
| Custom Partitioning | Application-specific logic | Geographic routing, priority lanes |
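Hash partitioning is implemented in the code above; round-robin is even simpler. A minimal sketch, where the `RoundRobinPartitioner` name is illustrative rather than taken from any library:

```typescript
// Round-robin partition assignment: cycle through partitions in order.
// Balances load evenly, but provides no per-key ordering guarantee,
// so it suits workloads where message order does not matter.
class RoundRobinPartitioner {
  private next = 0;

  constructor(private readonly numPartitions: number) {}

  nextPartition(): number {
    const id = this.next;
    this.next = (this.next + 1) % this.numPartitions;
    return id;
  }
}

const rr = new RoundRobinPartitioner(4);
console.log([
  rr.nextPartition(), rr.nextPartition(), rr.nextPartition(),
  rr.nextPartition(), rr.nextPartition()
]); // [0, 1, 2, 3, 0]
```

Note the trade-off against hash partitioning: two messages with the same key may land on different partitions and be consumed out of order.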
When producers and consumers run on different machines, or when durability and fault-tolerance are required, distributed message queues implement the Producer-Consumer pattern across a network.
| System | Architecture | Ordering | Best For |
|---|---|---|---|
| Apache Kafka | Append-only log, partitioned | Per-partition FIFO | High-throughput event streaming, log aggregation |
| RabbitMQ | Traditional broker with exchanges | Queue-level FIFO | Task queues, RPC, flexible routing |
| Amazon SQS | Managed cloud queue service | Best-effort (standard), FIFO (FIFO queues) | AWS integration, serverless |
| Redis Streams | In-memory log with persistence | Per-stream FIFO | Real-time, low-latency, caching integration |
| Apache Pulsar | Segment-based storage | Per-partition FIFO | Multi-tenancy, geo-replication |
```typescript
// Kafka Producer-Consumer Example

// Producer: Send order events to Kafka
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']
});

const producer = kafka.producer();

async function sendOrder(order: Order) {
  await producer.connect();
  await producer.send({
    topic: 'orders',
    messages: [{
      // Key determines partition (orders for same customer → same partition)
      key: order.customerId,
      value: JSON.stringify(order),
      // Optional headers for metadata
      headers: {
        'source': 'checkout-service',
        'timestamp': Date.now().toString()
      }
    }]
  });
}

// Consumer: Process orders from Kafka
const consumer = kafka.consumer({
  groupId: 'order-processing-group'
});

async function startConsumer() {
  await consumer.connect();

  // Subscribe to topic
  await consumer.subscribe({
    topic: 'orders',
    fromBeginning: false // Start from current offset
  });

  // Process messages
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const order = JSON.parse(message.value!.toString());
      console.log(`Processing order from partition ${partition}`);
      await processOrder(order);
      // Kafka auto-commits offset after processing
      // (configurable for at-least-once vs exactly-once)
    }
  });
}

// Start multiple consumers in the same group for parallel processing
// Kafka automatically assigns partitions to consumers
```

When producers outpace consumers, the system must handle the overflow. Backpressure is the mechanism by which a system signals capacity constraints upstream and handles excess load. Different strategies suit different requirements.
| Strategy | Description | Data Loss? | Best For |
|---|---|---|---|
| Blocking | Producer blocks until buffer has space | No | Durable systems, can afford latency |
| Drop Newest | Reject new items when buffer is full | Yes (new) | Latest-wins scenarios, telemetry |
| Drop Oldest | Discard oldest items to make room | Yes (old) | Real-time feeds, live data |
| Buffering | Use unbounded buffer (with limits) | Eventually | Bursty traffic with eventual catch-up |
| Sampling | Accept only a percentage of items | Yes | Monitoring, analytics, high volume |
| Load Shedding | Reject with error, let caller retry | Controlled | APIs, interactive systems |
```typescript
/**
 * Backpressure Policies for Bounded Buffers
 */

type BackpressurePolicy = 'BLOCK' | 'DROP_NEWEST' | 'DROP_OLDEST' | 'ERROR';

class PolicyBuffer<T> {
  private buffer: T[] = [];
  private readonly capacity: number;
  private readonly policy: BackpressurePolicy;

  constructor(capacity: number, policy: BackpressurePolicy) {
    this.capacity = capacity;
    this.policy = policy;
  }

  produce(item: T): ProduceResult {
    if (this.buffer.length < this.capacity) {
      this.buffer.push(item);
      return { success: true };
    }

    // Buffer is full - apply backpressure policy
    switch (this.policy) {
      case 'BLOCK':
        // In an async version, would await until space is available
        throw new Error('Blocking not shown in sync example');

      case 'DROP_NEWEST':
        // Reject the new item, keep buffer unchanged
        return {
          success: false,
          dropped: item,
          reason: 'Buffer full, dropped newest'
        };

      case 'DROP_OLDEST': {
        // Remove oldest item, add new item
        const dropped = this.buffer.shift();
        this.buffer.push(item);
        return {
          success: true,
          dropped,
          reason: 'Buffer full, dropped oldest'
        };
      }

      case 'ERROR':
        // Throw error - caller must handle
        throw new BufferFullError('Buffer capacity exceeded');

      default:
        throw new Error(`Unknown policy: ${this.policy}`);
    }
  }
}

// Usage Examples:

// Logging: Drop oldest, keep the most recent logs
const logBuffer = new PolicyBuffer<LogEntry>(1000, 'DROP_OLDEST');

// Metrics: Drop newest, existing samples are valuable
const metricsBuffer = new PolicyBuffer<Metric>(5000, 'DROP_NEWEST');

// Critical transactions: Error, never lose data silently
const transactionBuffer = new PolicyBuffer<Transaction>(100, 'ERROR');
```

The right backpressure policy depends on your data. Financial transactions: never drop, error loudly. Live video frames: drop oldest, the newest matters. Metrics: sample or drop newest, trends matter more than every point. Know your data's value profile.
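Sampling, the one strategy from the table not covered by a policy buffer, can be sketched as a probabilistic gate in front of the queue. The `SamplingGate` name and the injectable `random` parameter are assumptions made here for testability, not part of any library:

```typescript
// Sampling backpressure: admit only a fixed fraction of incoming items.
// Suits high-volume telemetry where trends matter more than every point.
class SamplingGate<T> {
  constructor(
    private readonly rate: number,                       // fraction to keep, 0..1
    private readonly random: () => number = Math.random  // injectable for tests
  ) {}

  // Returns the item if it should be processed, undefined if sampled out
  admit(item: T): T | undefined {
    return this.random() < this.rate ? item : undefined;
  }
}

// Keep roughly 10% of metrics; a deterministic `random` makes the demo repeatable
const gate = new SamplingGate<number>(0.1, () => 0.05);
console.log(gate.admit(42)); // 42 (0.05 < 0.1, so the item is admitted)
```

Unlike DROP_NEWEST, which only sheds load once the buffer fills, sampling sheds a constant fraction up front, so the downstream buffer rarely reaches capacity at all.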
With so many implementation options, how do you choose? Here's a decision framework based on your requirements:
| Requirement | Recommended Implementation |
|---|---|
| Simple in-process buffering | Language's built-in blocking queue (ArrayBlockingQueue, queue.Queue) |
| Ultra-low latency (<1μs) | Lock-free SPSC ring buffer |
| High throughput, moderate latency | Lock-free MPMC queue or partitioned buffers |
| Durability required | Distributed queue (Kafka, RabbitMQ, SQS) |
| Cross-service communication | Distributed queue with appropriate guarantees |
| Exactly-once processing | Kafka with transactions, or idempotent consumers |
| Global ordering | Single partition (limits throughput) |
| Per-key ordering | Partitioned queue with key-based routing |
| Fire-and-forget | Async queue with DROP_NEWEST policy |
What's next:
The final page explores use cases and examples—concrete scenarios where Producer-Consumer patterns solve real problems, from web servers to data pipelines to game engines.
You now understand the spectrum of Producer-Consumer implementations: from simple circular buffers to lock-free queues to distributed messaging systems. This knowledge enables you to select and implement the right solution for any concurrency coordination challenge.