The solution to the Producer-Consumer problem has been studied and refined for over 50 years, since Edsger Dijkstra first formalized it in 1965. The bounded buffer with proper synchronization remains the gold standard because it elegantly solves every challenge we identified in the previous page.
The core insight of the solution is deceptively simple: use counting to track available slots and items, and use condition variables to efficiently wait for those counts to change. This combination prevents race conditions, handles rate mismatches, avoids deadlock, and ensures fairness.
In this page, we'll build this solution from first principles, understanding not just what to implement but why each component is necessary. By the end, you'll be able to implement correct Producer-Consumer systems and recognize when existing implementations are flawed.
This page covers the canonical bounded buffer solution: understanding semaphores and condition variables, implementing the classic two-semaphore solution, handling edge cases correctly, and ensuring both correctness and efficiency.
A bounded buffer is a fixed-capacity data structure that sits between producers and consumers. Its bounded nature is critical—it prevents unbounded memory growth when producers are faster than consumers, and its fixed size allows us to reason precisely about its state.
A correctly implemented bounded buffer maintains these invariants at all times: the item count stays between 0 and capacity, every produced item is consumed exactly once (nothing lost, nothing duplicated), and only one thread modifies the buffer at a time.
At any moment, a bounded buffer is in one of three states:
| State | Condition | Producer Action | Consumer Action |
|---|---|---|---|
| Empty | item_count = 0 | Can produce immediately | Must wait for items |
| Partial | 0 < item_count < capacity | Can produce immediately | Can consume immediately |
| Full | item_count = capacity | Must wait for space | Can consume immediately |
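These three states can be observed directly with Java's `ArrayBlockingQueue`. The sketch below uses the non-blocking `offer`/`poll` variants, which return `false`/`null` at exactly the points where a blocking producer or consumer would have to wait (the capacity of 2 is an arbitrary choice for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BufferStates {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> buf = new ArrayBlockingQueue<>(2);

        // Empty: item_count = 0 -> a consumer must wait (poll returns null)
        System.out.println(buf.poll());     // null
        System.out.println(buf.offer("a")); // true: producer proceeds immediately

        // Partial: 0 < item_count < capacity -> both sides proceed
        System.out.println(buf.offer("b")); // true

        // Full: item_count = capacity -> a producer must wait (offer returns false)
        System.out.println(buf.offer("c")); // false
        System.out.println(buf.poll());     // "a": consumer proceeds immediately
    }
}
```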
The boundedness is not merely a practical constraint; it is essential for correctness. A fixed capacity caps memory use, applies natural backpressure when producers outpace consumers, and keeps the buffer's state space finite enough to reason about precisely.
Buffer size is a crucial design decision. Too small: producers block excessively, reducing throughput. Too large: high memory usage, increased latency, delayed backpressure. The optimal size depends on production/consumption rate variance. A common heuristic: buffer enough to absorb brief rate spikes without excessive memory use.
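That heuristic can be made concrete with a back-of-the-envelope calculation: size the buffer to absorb the surplus items produced during the longest expected rate spike. All figures below are illustrative assumptions, not measurements:

```java
public class BufferSizing {
    public static void main(String[] args) {
        // Hypothetical workload figures (assumptions for illustration only)
        double produceRate = 500.0; // items/second at peak
        double consumeRate = 400.0; // items/second sustained
        double burstSeconds = 2.0;  // longest expected rate spike

        // The buffer must hold the surplus accumulated during the burst
        int capacity = (int) Math.ceil((produceRate - consumeRate) * burstSeconds);
        System.out.println(capacity); // 200
    }
}
```

Real sizing should be validated against measured wait times and memory use, but this gives a defensible starting point.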
The classic solution to the Producer-Consumer problem uses semaphores, a synchronization primitive invented by Dijkstra. A semaphore is essentially a non-negative integer counter with two atomic operations.
wait() (also called P, down, or acquire): atomically decrement the counter; if the counter is already zero, block until another thread signals.
signal() (also called V, up, or release): atomically increment the counter; if any threads are blocked in wait(), wake one of them.
Semaphores solve the Producer-Consumer problem because they encode counting into the synchronization primitive. Instead of checking "is buffer full?" and then acting (which creates a race), we atomically "try to decrement the empty-slot counter and block if it's already zero."
```typescript
// Conceptual Semaphore Implementation (not production-ready)
class Semaphore {
  private permits: number;
  private waitQueue: Array<() => void> = [];

  constructor(initialPermits: number) {
    if (initialPermits < 0) throw new Error("Permits cannot be negative");
    this.permits = initialPermits;
  }

  /**
   * Acquire a permit (wait/P/down)
   * Blocks if no permits are available
   */
  async acquire(): Promise<void> {
    // In a real implementation, this entire method is atomic
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    // No permits available - block until signaled
    return new Promise<void>((resolve) => {
      this.waitQueue.push(resolve);
    });
  }

  /**
   * Release a permit (signal/V/up)
   * Wakes one waiting thread if any
   */
  release(): void {
    this.permits++;
    // Wake one waiting thread if any
    const waiter = this.waitQueue.shift();
    if (waiter) {
      this.permits--; // Give the permit to the waiter
      waiter();
    }
  }

  /**
   * Check available permits (for debugging only)
   */
  availablePermits(): number {
    return this.permits;
  }
}

// Usage:
// Semaphore with 3 permits = 3 threads can acquire simultaneously
// 4th thread blocks until one releases
```

The classic Producer-Consumer solution uses three semaphores: empty (counts free slots), full (counts filled slots), and mutex (guards buffer access).
The brilliance of this design is that semaphores elegantly encode both counting and waiting. Producers wait on empty (no slots) and signal full (added an item). Consumers wait on full (no items) and signal empty (freed a slot).
```typescript
/**
 * Classic Producer-Consumer with Semaphores
 *
 * This implementation is correct, deadlock-free, and handles
 * all the edge cases we discussed in the problem page.
 */
class BoundedBuffer<T> {
  private buffer: T[] = [];
  private readonly capacity: number;

  // Semaphores for coordination
  private empty: Semaphore; // Counts empty slots (producer waits)
  private full: Semaphore;  // Counts filled slots (consumer waits)
  private mutex: Semaphore; // Mutual exclusion for buffer access

  constructor(capacity: number) {
    this.capacity = capacity;
    // Initially: all slots are empty, no items
    this.empty = new Semaphore(capacity); // capacity empty slots
    this.full = new Semaphore(0);         // 0 items
    this.mutex = new Semaphore(1);        // Binary for mutual exclusion
  }

  /**
   * Producer: Add item to buffer
   *
   * Order of operations is CRITICAL:
   * 1. Wait for empty slot (blocks if buffer full)
   * 2. Acquire mutex (protect buffer modification)
   * 3. Add item to buffer
   * 4. Release mutex
   * 5. Signal that item is available
   */
  async produce(item: T): Promise<void> {
    // Step 1: Wait for an empty slot
    // This blocks if buffer is full (empty count is 0)
    await this.empty.acquire();

    // Step 2: Lock the buffer
    // This ensures only one producer/consumer modifies buffer
    await this.mutex.acquire();
    try {
      // Step 3: Add item (protected by mutex)
      this.buffer.push(item);
    } finally {
      // Step 4: Release mutex (always, even on error)
      this.mutex.release();
    }

    // Step 5: Signal that an item is available
    // This wakes a waiting consumer, if any
    this.full.release();
  }

  /**
   * Consumer: Remove item from buffer
   *
   * Mirror of producer, with opposite semaphores:
   * 1. Wait for item (blocks if buffer empty)
   * 2. Acquire mutex (protect buffer modification)
   * 3. Remove item from buffer
   * 4. Release mutex
   * 5. Signal that slot is available
   */
  async consume(): Promise<T> {
    // Step 1: Wait for an item
    // This blocks if buffer is empty (full count is 0)
    await this.full.acquire();

    // Step 2: Lock the buffer
    await this.mutex.acquire();
    let item: T;
    try {
      // Step 3: Remove item (protected by mutex)
      item = this.buffer.shift()!;
    } finally {
      // Step 4: Release mutex (always, even on error)
      this.mutex.release();
    }

    // Step 5: Signal that a slot is available
    // This wakes a waiting producer, if any
    this.empty.release();

    return item;
  }
}

// Usage Example:
const buffer = new BoundedBuffer<string>(10);

// Producer thread
async function producer(id: number) {
  for (let i = 0; i < 100; i++) {
    const item = `Item ${i} from Producer ${id}`;
    await buffer.produce(item);
    console.log(`Producer ${id} added: ${item}`);
  }
}

// Consumer thread
async function consumer(id: number) {
  while (true) {
    const item = await buffer.consume();
    console.log(`Consumer ${id} processed: ${item}`);
    // Process the item...
  }
}
```

The order of wait operations (empty before mutex for producers, full before mutex for consumers) is CRITICAL. Reversing them causes deadlock: if a producer acquires mutex first and then waits on empty, and a consumer needs mutex to release empty, neither can proceed.
Let's verify that the semaphore solution satisfies all the correctness requirements we established:
| Requirement | How Solution Satisfies It |
|---|---|
| Mutual Exclusion | The mutex semaphore ensures only one thread modifies the buffer at any time. All buffer operations are wrapped in mutex acquire/release. |
| Buffer Bounds Respected | Producers can only proceed after acquiring an 'empty' permit—impossible if buffer is full. Consumers can only proceed after acquiring a 'full' permit—impossible if buffer is empty. |
| No Deadlock | The order of semaphore operations prevents circular wait. Threads wait on counting semaphores BEFORE acquiring mutex, never holding mutex while waiting for capacity/items. |
| No Starvation | Semaphore implementations typically use FIFO ordering for waiters. Each release wakes the longest-waiting thread, ensuring bounded wait times. |
| Correct Signaling | Producers release 'full' after adding an item, waking waiting consumers. Consumers release 'empty' after removing, waking waiting producers. |
We can prove correctness by showing that the invariant empty + full = capacity is always maintained (measured at quiescent points, when no produce or consume is mid-flight):
Initially:

- empty = capacity, full = 0
- empty + full = capacity ✓

After produce():

- empty decremented by 1 (acquired)
- full incremented by 1 (released)
- empty + full unchanged ✓

After consume():

- full decremented by 1 (acquired)
- empty incremented by 1 (released)
- empty + full unchanged ✓

Since the invariant holds initially and every operation preserves it, it always holds. This proves:

- full ≤ capacity (since full = capacity - empty ≤ capacity)
- full ≥ 0 (a semaphore count can't go negative)

Reasoning about concurrent code is notoriously difficult. Invariant-based proofs provide confidence that code is correct. When writing concurrent code, always identify the invariants and verify that every operation preserves them.
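The invariant can be spot-checked with `java.util.concurrent.Semaphore` standing in for the conceptual semaphores. This is a sketch: the capacity of 5 is arbitrary, and reading `availablePermits` is only meaningful here because the check runs single-threaded:

```java
import java.util.concurrent.Semaphore;

public class InvariantCheck {
    public static void main(String[] args) throws InterruptedException {
        final int capacity = 5;
        Semaphore empty = new Semaphore(capacity); // capacity empty slots
        Semaphore full = new Semaphore(0);         // 0 items

        // produce(): acquire empty, release full
        empty.acquire();
        full.release();
        System.out.println(empty.availablePermits() + full.availablePermits()); // 5

        // consume(): acquire full, release empty
        full.acquire();
        empty.release();
        System.out.println(empty.availablePermits() + full.availablePermits()); // 5
    }
}
```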
While semaphores provide an elegant solution, many modern languages and frameworks use condition variables instead. Condition variables offer a more general mechanism for waiting on arbitrary conditions, not just counting.
A condition variable is always used with a lock/mutex. It provides three operations: wait(), which atomically releases the lock and blocks until signaled, reacquiring the lock on wakeup; signal() (notify), which wakes one waiting thread; and broadcast() (notifyAll), which wakes all waiting threads.
The key advantage: you can wait on any boolean condition, not just counter values.
```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

/**
 * Producer-Consumer with Condition Variables
 *
 * This is often preferred in Java because it allows
 * separate conditions for producers and consumers.
 */
public class BoundedBuffer<T> {
    private final Queue<T> buffer = new LinkedList<>();
    private final int capacity;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // For producers
    private final Condition notEmpty = lock.newCondition(); // For consumers

    public BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Producer: Add item, wait if buffer is full
     */
    public void produce(T item) throws InterruptedException {
        lock.lock();
        try {
            // WHILE loop, not IF - guard against spurious wakeups
            while (buffer.size() == capacity) {
                // Buffer is full, wait for consumer to make room
                notFull.await(); // Atomically release lock and wait
            }
            buffer.add(item);
            // Signal consumers that an item is available
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer: Remove item, wait if buffer is empty
     */
    public T consume() throws InterruptedException {
        lock.lock();
        try {
            // WHILE loop, not IF - guard against spurious wakeups
            while (buffer.isEmpty()) {
                // Buffer is empty, wait for producer to add item
                notEmpty.await(); // Atomically release lock and wait
            }
            T item = buffer.poll();
            // Signal producers that space is available
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Bounded-wait variant: blocks at most `timeout` per check
     */
    public boolean tryProduce(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        lock.lock();
        try {
            while (buffer.size() == capacity) {
                if (!notFull.await(timeout, unit)) {
                    return false; // Timed out
                }
            }
            buffer.add(item);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

When waiting on a condition variable, ALWAYS use a while loop, not an if statement. Spurious wakeups can occur (a thread wakes without being signaled), and multiple threads might wake when only one condition is satisfied. The while loop re-checks the condition after every wakeup.
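The same guarded-wait discipline works with Java's intrinsic monitors (`synchronized`/`wait`/`notifyAll`). This sketch is not from the page above; the one-slot buffer and the item count of 100 are illustrative. Each `wait` sits inside a while loop that re-checks its condition after every wakeup:

```java
public class GuardedWaitDemo {
    private Integer slot = null; // one-slot buffer
    private final Object lock = new Object();

    void put(int value) throws InterruptedException {
        synchronized (lock) {
            while (slot != null) lock.wait(); // re-check after every wakeup
            slot = value;
            lock.notifyAll();
        }
    }

    int take() throws InterruptedException {
        synchronized (lock) {
            while (slot == null) lock.wait(); // guards against spurious wakeups
            int value = slot;
            slot = null;
            lock.notifyAll();
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedWaitDemo buf = new GuardedWaitDemo();
        final int n = 100;

        Thread producer = new Thread(() -> {
            try { for (int i = 0; i < n; i++) buf.put(i); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        int[] sum = {0};
        Thread consumer = new Thread(() -> {
            try { for (int i = 0; i < n; i++) sum[0] += buf.take(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println(sum[0]); // 0 + 1 + ... + 99 = 4950
    }
}
```

If the `while` loops were `if` statements, a spurious wakeup could let a thread proceed with the condition still false, corrupting the slot.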
In practice, you rarely implement Producer-Consumer from scratch. Modern languages provide blocking queue data structures that encapsulate the entire pattern, thoroughly tested and optimized.
```java
import java.util.concurrent.*;

// Java provides several BlockingQueue implementations:

// 1. ArrayBlockingQueue - Bounded, array-backed
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100);

// 2. LinkedBlockingQueue - Optionally bounded, linked-node
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);
BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();

// 3. PriorityBlockingQueue - Priority ordering
BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();

// 4. SynchronousQueue - No capacity, direct handoff
BlockingQueue<String> syncQueue = new SynchronousQueue<>();

// Usage - All blocking operations built-in:

// Producer
arrayQueue.put("item");                              // Blocks if full
arrayQueue.offer("item");                            // Returns false if full
arrayQueue.offer("item", 5, TimeUnit.SECONDS);       // Blocks with timeout

// Consumer
String item = arrayQueue.take();                     // Blocks if empty
String item2 = arrayQueue.poll();                    // Returns null if empty
String item3 = arrayQueue.poll(5, TimeUnit.SECONDS); // Blocks with timeout

// Complete Producer-Consumer with BlockingQueue:
BlockingQueue<Order> orderQueue = new ArrayBlockingQueue<>(1000);

// Producer thread
new Thread(() -> {
    try {
        while (running) {
            Order order = receiveOrder();
            orderQueue.put(order); // Blocks if queue full
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

// Consumer thread
new Thread(() -> {
    try {
        while (running) {
            Order order = orderQueue.take(); // Blocks if queue empty
            processOrder(order);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();
```

In production code, ALWAYS use your language's built-in blocking queue implementations. They are battle-tested, optimized, and handle edge cases you might miss. Custom implementations are for learning or when you need specialized behavior not provided by standard libraries.
A production-ready bounded buffer must handle several edge cases beyond the basic implementation:
```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

/**
 * Production-ready bounded buffer with graceful shutdown
 */
public class ProductionBuffer<T> {
    private final BlockingQueue<T> buffer;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    // Metrics
    private final AtomicLong producedCount = new AtomicLong(0);
    private final AtomicLong consumedCount = new AtomicLong(0);
    private final AtomicLong producerWaitTimeNs = new AtomicLong(0);
    private final AtomicLong consumerWaitTimeNs = new AtomicLong(0);

    public ProductionBuffer(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Produce with timeout and shutdown awareness
     */
    public boolean produce(T item, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (shutdown.get()) {
            throw new IllegalStateException("Buffer is shutdown");
        }
        long startTime = System.nanoTime();
        try {
            boolean success = buffer.offer(item, timeout, unit);
            if (success) {
                producedCount.incrementAndGet();
            }
            return success;
        } finally {
            producerWaitTimeNs.addAndGet(System.nanoTime() - startTime);
        }
    }

    /**
     * Consume with timeout and shutdown awareness
     */
    public T consume(long timeout, TimeUnit unit) throws InterruptedException {
        long startTime = System.nanoTime();
        try {
            T item = buffer.poll(timeout, unit);
            if (item != null) {
                consumedCount.incrementAndGet();
            }
            return item; // null means timeout
        } finally {
            consumerWaitTimeNs.addAndGet(System.nanoTime() - startTime);
        }
    }

    /**
     * Graceful shutdown - wake all waiting threads
     */
    public void shutdown() {
        shutdown.set(true);
        // Insert poison pills to wake consumers,
        // or interrupt all producer/consumer threads
    }

    public boolean isShutdown() { return shutdown.get(); }

    // Metrics for monitoring
    public int getCurrentSize() { return buffer.size(); }
    public long getProducedCount() { return producedCount.get(); }
    public long getConsumedCount() { return consumedCount.get(); }

    public long getAverageProducerWaitNs() {
        long produced = producedCount.get();
        return produced > 0 ? producerWaitTimeNs.get() / produced : 0;
    }
}
```

We've now covered the complete solution to the Producer-Consumer problem: counting semaphores for coordination, condition variables with guarded waits, built-in blocking queues, and production concerns like timeouts, shutdown, and metrics.
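The poison-pill shutdown mentioned above can be sketched with a sentinel object compared by identity. This is illustrative, not a standard API: the `POISON` sentinel, the order names, and the single-consumer setup are all assumptions:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillDemo {
    // Sentinel object: compared by identity, so it can never
    // collide with a real item that happens to have the same text.
    private static final String POISON = new String("POISON");

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == POISON) break; // identity check: shutdown signal
                    System.out.println("processed " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("order-1");
        queue.put("order-2");
        queue.put(POISON); // one pill per consumer thread
        consumer.join();
        System.out.println("consumer exited cleanly");
    }
}
```

With multiple consumers, the shutdown path must enqueue one pill per consumer so every waiting thread is woken and exits.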
What's next:
With the core solution understood, the next page explores implementation strategies—different approaches to implementing Producer-Consumer in various contexts: circular buffers, lock-free implementations, and distributed queues.
You now understand the bounded buffer solution to the Producer-Consumer problem: semaphores for counting and waiting, condition variables for complex conditions, and built-in blocking queues for production use. This knowledge is foundational for any concurrent system design.