The classic bounded buffer (producer-consumer) problem—with one producer, one consumer, and a fixed-size buffer—is deliberately simplified for pedagogy. Real systems face richer challenges: many producers and consumers sharing a single buffer, items with differing priorities, callers that cannot afford to block, pipelines of multiple coordinated buffers, and limits on memory or item age.
This page systematically explores bounded buffer variations that model these real-world requirements. Each variation builds on the basic pattern while introducing new synchronization challenges.
By the end of this page, you will:
• Understand multi-producer/multi-consumer synchronization
• Implement priority-based bounded buffers
• Handle non-blocking (bounded-wait) buffer operations
• Design solutions for multiple coordinated buffers
• Recognize which variation applies to which real-world scenarios
The MPMC buffer is the workhorse of concurrent systems. Unlike the single-producer single-consumer (SPSC) case, MPMC requires careful coordination to prevent multiple producers from corrupting the buffer simultaneously, and multiple consumers from consuming the same item.
Key Challenges:

• Multiple producers racing on the write index (head) can overwrite each other's slots
• Multiple consumers racing on the read index (tail) can consume the same item twice
• Producers and consumers must still agree on how many slots and items remain
The Classic Solution:
We use two counting semaphores, one tracking empty slots and one tracking available items, plus a mutex to protect the buffer and its indices:
```c
/*
 * Multi-Producer Multi-Consumer Bounded Buffer
 *
 * Uses semaphores for counting available slots/items
 * and a mutex for protecting the buffer indices.
 */

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define BUFFER_SIZE 10

typedef struct {
    int data[BUFFER_SIZE];     // Circular buffer
    int head;                  // Next slot to write
    int tail;                  // Next slot to read
    sem_t empty_slots;         // Available slots for producers
    sem_t full_slots;          // Available items for consumers
    pthread_mutex_t mutex;     // Protects head, tail, data access
} MPMCBuffer;

void mpmc_init(MPMCBuffer* buf) {
    buf->head = 0;
    buf->tail = 0;

    // Initialize semaphores
    sem_init(&buf->empty_slots, 0, BUFFER_SIZE);  // All slots empty
    sem_init(&buf->full_slots, 0, 0);             // No items yet

    pthread_mutex_init(&buf->mutex, NULL);
}

/* ============================================
 * PRODUCER OPERATION
 * ============================================ */
void mpmc_produce(MPMCBuffer* buf, int item) {
    /*
     * Step 1: Acquire an empty slot (blocks if full)
     *
     * This is done OUTSIDE the mutex to allow parallelism.
     * Multiple producers can pass this concurrently if slots exist.
     */
    sem_wait(&buf->empty_slots);

    /*
     * Step 2: Lock and insert item
     *
     * Short critical section: only actual buffer manipulation.
     */
    pthread_mutex_lock(&buf->mutex);
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);

    /*
     * Step 3: Signal that an item is available
     */
    sem_post(&buf->full_slots);
}

/* ============================================
 * CONSUMER OPERATION
 * ============================================ */
int mpmc_consume(MPMCBuffer* buf) {
    /*
     * Step 1: Wait for an available item (blocks if empty)
     */
    sem_wait(&buf->full_slots);

    /*
     * Step 2: Lock and extract item
     */
    pthread_mutex_lock(&buf->mutex);
    int item = buf->data[buf->tail];
    buf->tail = (buf->tail + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);

    /*
     * Step 3: Signal that a slot is available
     */
    sem_post(&buf->empty_slots);

    return item;
}

/* ============================================
 * EXAMPLE: Multiple producers and consumers
 * ============================================ */
MPMCBuffer shared_buffer;

void* producer_thread(void* arg) {
    int producer_id = *(int*)arg;
    for (int i = 0; i < 100; i++) {
        int item = producer_id * 1000 + i;   // Unique item ID
        mpmc_produce(&shared_buffer, item);
        printf("Producer %d: produced %d\n", producer_id, item);
    }
    return NULL;
}

void* consumer_thread(void* arg) {
    int consumer_id = *(int*)arg;
    for (int i = 0; i < 100; i++) {
        int item = mpmc_consume(&shared_buffer);
        printf("Consumer %d: consumed %d\n", consumer_id, item);
    }
    return NULL;
}

int main() {
    mpmc_init(&shared_buffer);

    pthread_t producers[3], consumers[3];
    int producer_ids[3] = {0, 1, 2};
    int consumer_ids[3] = {0, 1, 2};

    // Create 3 producers and 3 consumers
    for (int i = 0; i < 3; i++) {
        pthread_create(&producers[i], NULL, producer_thread, &producer_ids[i]);
        pthread_create(&consumers[i], NULL, consumer_thread, &consumer_ids[i]);
    }

    // Wait for completion
    for (int i = 0; i < 3; i++) {
        pthread_join(producers[i], NULL);
        pthread_join(consumers[i], NULL);
    }

    return 0;
}
```

Semaphore ordering prevents race conditions:
Key insight: The mutex is held only during the actual buffer access—the semaphore waits happen outside, allowing maximum concurrency.
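To see why the ordering matters, consider the reverse order, with the semaphore wait moved inside the critical section. A sketch of this broken variant (illustration only, reusing the MPMCBuffer above, not part of the original code):

```c
/*
 * BROKEN variant (illustration only): waiting on the semaphore while
 * holding the mutex. If the buffer is full, this producer sleeps inside
 * sem_wait() while still owning the mutex, so no consumer can lock the
 * mutex to remove an item and post empty_slots. The full buffer never
 * drains: deadlock.
 */
void broken_produce(MPMCBuffer* buf, int item) {
    pthread_mutex_lock(&buf->mutex);   // mutex taken first (wrong order)
    sem_wait(&buf->empty_slots);       // may block forever while holding the mutex
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    sem_post(&buf->full_slots);
    pthread_mutex_unlock(&buf->mutex);
}
```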
Performance Optimization: Split Mutexes
The single mutex can become a bottleneck under high contention. An optimization uses separate mutexes for producers and consumers when the buffer is large enough that head and tail don't interfere:
```c
/*
 * Split Mutex Optimization
 *
 * When buffer is large, head (producer) and tail (consumer)
 * indices are often far apart. Separate mutexes reduce contention.
 */

typedef struct {
    int data[BUFFER_SIZE];
    volatile int head;                // Producers use this
    volatile int tail;                // Consumers use this
    sem_t empty_slots;
    sem_t full_slots;
    pthread_mutex_t producer_mutex;   // Protects head
    pthread_mutex_t consumer_mutex;   // Protects tail
} MPMCSplitMutex;

void produce_split(MPMCSplitMutex* buf, int item) {
    sem_wait(&buf->empty_slots);

    pthread_mutex_lock(&buf->producer_mutex);   // Only producers compete
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->producer_mutex);

    sem_post(&buf->full_slots);
}

int consume_split(MPMCSplitMutex* buf) {
    sem_wait(&buf->full_slots);

    pthread_mutex_lock(&buf->consumer_mutex);   // Only consumers compete
    int item = buf->data[buf->tail];
    buf->tail = (buf->tail + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->consumer_mutex);

    sem_post(&buf->empty_slots);
    return item;
}

/*
 * This works correctly because:
 * - Semaphores ensure producers never catch up to consumers (overflow)
 * - Semaphores ensure consumers never pass producers (underflow)
 * - Even though head and tail access isn't synchronized,
 *   they operate on guaranteed-disjoint regions of the buffer
 */
```

Many systems require priority-based consumption: high-priority items should be consumed before lower-priority ones, even if the lower-priority item arrived first.
Applications:

• Task schedulers where urgent jobs must run before routine work
• Network devices applying quality-of-service policies to packet queues
• Event and alerting systems where critical messages must not wait behind bulk traffic
Key Challenges:

• Consumers must efficiently find the highest-priority level that currently holds items
• All priority levels share a single capacity limit
• Low-priority items can starve if high-priority items keep arriving
```c
/*
 * Priority Bounded Buffer
 *
 * Items have associated priorities. Consumers receive
 * the highest-priority available item.
 *
 * Implementation: Separate FIFO per priority level,
 * shared capacity limit across all levels.
 */

#include <semaphore.h>
#include <pthread.h>
#include <stdbool.h>

#define BUFFER_SIZE 20
#define NUM_PRIORITIES 3   // HIGH=0, MEDIUM=1, LOW=2

typedef struct PriorityItem {
    int data;
    int priority;
} PriorityItem;

typedef struct {
    // Separate circular buffer per priority level
    PriorityItem buffers[NUM_PRIORITIES][BUFFER_SIZE];
    int heads[NUM_PRIORITIES];
    int tails[NUM_PRIORITIES];
    int counts[NUM_PRIORITIES];   // Items at each priority level
    int total_count;              // Total items across all levels

    sem_t empty_slots;            // Total available slots
    sem_t full_slots;             // Total available items
    pthread_mutex_t mutex;
} PriorityBuffer;

void priority_init(PriorityBuffer* buf) {
    for (int p = 0; p < NUM_PRIORITIES; p++) {
        buf->heads[p] = 0;
        buf->tails[p] = 0;
        buf->counts[p] = 0;
    }
    buf->total_count = 0;
    sem_init(&buf->empty_slots, 0, BUFFER_SIZE);
    sem_init(&buf->full_slots, 0, 0);
    pthread_mutex_init(&buf->mutex, NULL);
}

/* ============================================
 * PRODUCER: Insert with Priority
 * ============================================ */
void priority_produce(PriorityBuffer* buf, int item, int priority) {
    // Wait for any slot to be available
    sem_wait(&buf->empty_slots);

    pthread_mutex_lock(&buf->mutex);

    // Insert into the appropriate priority queue
    int p = priority;
    buf->buffers[p][buf->heads[p]].data = item;
    buf->buffers[p][buf->heads[p]].priority = priority;
    buf->heads[p] = (buf->heads[p] + 1) % BUFFER_SIZE;
    buf->counts[p]++;
    buf->total_count++;

    pthread_mutex_unlock(&buf->mutex);

    sem_post(&buf->full_slots);
}

/* ============================================
 * CONSUMER: Get Highest Priority Item
 * ============================================ */
int priority_consume(PriorityBuffer* buf) {
    // Wait for any item to be available
    sem_wait(&buf->full_slots);

    pthread_mutex_lock(&buf->mutex);

    // Find highest priority level with items
    int p;
    for (p = 0; p < NUM_PRIORITIES; p++) {
        if (buf->counts[p] > 0) {
            break;   // Found highest priority with items
        }
    }

    // Extract from selected priority queue
    int item = buf->buffers[p][buf->tails[p]].data;
    buf->tails[p] = (buf->tails[p] + 1) % BUFFER_SIZE;
    buf->counts[p]--;
    buf->total_count--;

    pthread_mutex_unlock(&buf->mutex);

    sem_post(&buf->empty_slots);
    return item;
}

/* ============================================
 * ANTI-STARVATION: Aging Mechanism
 * ============================================ */
/*
 * Problem: Low-priority items may starve indefinitely
 * if high-priority items keep arriving.
 *
 * Solution: Priority aging. Periodically increase the
 * priority of waiting items.
 */
void age_priorities(PriorityBuffer* buf) {
    pthread_mutex_lock(&buf->mutex);

    // Move items from low to high priority after aging
    for (int p = NUM_PRIORITIES - 1; p > 0; p--) {
        // If items at priority p have been waiting "too long",
        // promote them to priority p-1.
        // In practice, items would have timestamps
        // and we'd check: if (now - item.enqueue_time > AGE_THRESHOLD)
        // For simplicity, we promote all items from level p to p-1
        while (buf->counts[p] > 0 && buf->counts[p-1] < BUFFER_SIZE) {
            // Move one item
            int item = buf->buffers[p][buf->tails[p]].data;
            buf->tails[p] = (buf->tails[p] + 1) % BUFFER_SIZE;
            buf->counts[p]--;

            buf->buffers[p-1][buf->heads[p-1]].data = item;
            buf->buffers[p-1][buf->heads[p-1]].priority = p-1;
            buf->heads[p-1] = (buf->heads[p-1] + 1) % BUFFER_SIZE;
            buf->counts[p-1]++;
        }
    }

    pthread_mutex_unlock(&buf->mutex);
}
```

Without aging, low-priority starvation is guaranteed under sufficient high-priority load. Real systems must implement one of:
• Priority aging: Increase priority of waiting items over time
• Weighted selection: Probabilistically favor high priority, not absolutely
• Priority quotas: Reserve some capacity for each priority level
| Approach | Complexity | Starvation | Fairness |
|---|---|---|---|
| Separate queues per priority | O(P) search per consume | Possible | Strict priority order |
| Single priority heap | O(log N) per operation | Possible | Strict priority order |
| Multi-level feedback | O(P) with aging | Prevented | Priority with eventual guarantee |
| Weighted random selection | O(1) with precomputation | Probabilistic | Soft priorities |
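As an illustration of the weighted-selection row, here is a hedged sketch built on the PriorityBuffer above. The level_weights values and the use of rand() (from <stdlib.h>) are illustrative assumptions, not part of the original design:

```c
/*
 * Hedged sketch: weighted (probabilistic) priority selection.
 * Instead of always draining the highest non-empty level, the consumer
 * picks a non-empty level with probability proportional to its weight,
 * so low-priority items keep making progress under high-priority load.
 * level_weights and rand() (<stdlib.h>) are illustrative assumptions.
 */
static const int level_weights[NUM_PRIORITIES] = {6, 3, 1};   // HIGH, MEDIUM, LOW

int priority_consume_weighted(PriorityBuffer* buf) {
    sem_wait(&buf->full_slots);        // wait for any item, as before
    pthread_mutex_lock(&buf->mutex);

    // Total weight over the levels that currently hold items
    int total = 0;
    for (int p = 0; p < NUM_PRIORITIES; p++)
        if (buf->counts[p] > 0) total += level_weights[p];

    // Start from the first non-empty level as a safe fallback,
    // then pick by weighted "roulette wheel" among non-empty levels
    int chosen = 0;
    while (buf->counts[chosen] == 0) chosen++;
    int pick = rand() % total;         // total > 0: full_slots guaranteed an item
    for (int p = 0; p < NUM_PRIORITIES; p++) {
        if (buf->counts[p] == 0) continue;
        if (pick < level_weights[p]) { chosen = p; break; }
        pick -= level_weights[p];
    }

    // Same extraction as priority_consume, just from the chosen level
    int item = buf->buffers[chosen][buf->tails[chosen]].data;
    buf->tails[chosen] = (buf->tails[chosen] + 1) % BUFFER_SIZE;
    buf->counts[chosen]--;
    buf->total_count--;

    pthread_mutex_unlock(&buf->mutex);
    sem_post(&buf->empty_slots);
    return item;
}
```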
Standard bounded buffers block when full (for producers) or empty (for consumers). Many applications need non-blocking or bounded-wait alternatives.

Applications:

• Real-time threads that must never block past a deadline
• Interactive or UI threads that need an immediate answer or a quick failure
• Overload-prone services that prefer to shed or defer work rather than queue it indefinitely
```c
/*
 * Bounded Buffer with Non-Blocking and Timed Operations
 */

#include <semaphore.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <stdbool.h>
#include <unistd.h>      // usleep

#define BUFFER_SIZE 10   // same capacity as the earlier examples

typedef struct {
    int data[BUFFER_SIZE];
    int head, tail;
    sem_t empty_slots;
    sem_t full_slots;
    pthread_mutex_t mutex;
} FlexibleBuffer;

/* ============================================
 * NON-BLOCKING PRODUCE (TRY)
 * ============================================ */
bool try_produce(FlexibleBuffer* buf, int item) {
    /*
     * sem_trywait returns immediately with:
     * - 0 if semaphore was successfully decremented
     * - -1 with errno=EAGAIN if semaphore was zero
     */
    if (sem_trywait(&buf->empty_slots) != 0) {
        // No empty slots available
        return false;
    }

    pthread_mutex_lock(&buf->mutex);
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);

    sem_post(&buf->full_slots);
    return true;
}

/* ============================================
 * NON-BLOCKING CONSUME (TRY)
 * ============================================ */
bool try_consume(FlexibleBuffer* buf, int* item) {
    if (sem_trywait(&buf->full_slots) != 0) {
        // No items available
        return false;
    }

    pthread_mutex_lock(&buf->mutex);
    *item = buf->data[buf->tail];
    buf->tail = (buf->tail + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);

    sem_post(&buf->empty_slots);
    return true;
}

/* ============================================
 * TIMED PRODUCE (WAIT UP TO TIMEOUT)
 * ============================================ */
bool timed_produce(FlexibleBuffer* buf, int item, int timeout_ms) {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);

    // Add timeout
    ts.tv_sec += timeout_ms / 1000;
    ts.tv_nsec += (timeout_ms % 1000) * 1000000;
    if (ts.tv_nsec >= 1000000000) {
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000;
    }

    /*
     * sem_timedwait blocks until:
     * - Semaphore becomes available (returns 0)
     * - Timeout expires (returns -1, errno=ETIMEDOUT)
     * - Interrupted by signal (returns -1, errno=EINTR)
     */
    if (sem_timedwait(&buf->empty_slots, &ts) != 0) {
        if (errno == ETIMEDOUT) {
            return false;   // Timed out
        }
        // Handle other errors as needed
        return false;
    }

    pthread_mutex_lock(&buf->mutex);
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);

    sem_post(&buf->full_slots);
    return true;
}

/* ============================================
 * BOUNDED RETRY PRODUCE
 * ============================================ */
bool retry_produce(FlexibleBuffer* buf, int item, int max_retries, int backoff_ms) {
    for (int attempt = 0; attempt <= max_retries; attempt++) {
        if (try_produce(buf, item)) {
            return true;   // Success
        }

        // Exponential backoff on failure
        int sleep_time = backoff_ms * (1 << attempt);
        usleep(sleep_time * 1000);
    }
    return false;   // All retries exhausted
}
```

Choosing the Right Semantic:
| Pattern | Use When | Behavior on Full/Empty |
|---|---|---|
| Blocking (standard) | Throughput is key, delays acceptable | Wait indefinitely |
| Try (non-blocking) | Latency critical, have alternative action | Return failure immediately |
| Timed | Bounded delay acceptable, need eventual result | Wait up to timeout, then fail |
| Bounded retry | Want to give multiple chances with backoff | Retry with increasing delays |
Non-blocking operations enable backpressure: feedback from consumers to producers.
• Load shedding: Discard excess work when buffer full
• Rate limiting: Slow producer when try_produce fails frequently
• Adaptive scaling: Spin up more consumers when buffer grows
These patterns are fundamental to building resilient, overload-resistant systems.
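For instance, a load-shedding producer can be sketched on top of the FlexibleBuffer and try_produce above. The dropped_items counter and the use of <stdatomic.h> are illustrative assumptions, standing in for whatever backpressure signal a real system would export:

```c
/*
 * Hedged sketch: load shedding with try_produce.
 * When the buffer is full, the item is dropped and counted instead of
 * blocking the producer; a monitoring thread could read dropped_items
 * as a backpressure signal, e.g. to slow the source or add consumers.
 */
#include <stdatomic.h>

_Atomic long dropped_items = 0;   // illustrative counter, not from the original design

void produce_or_shed(FlexibleBuffer* buf, int item) {
    if (!try_produce(buf, item)) {
        // Buffer full: shed this item rather than wait
        atomic_fetch_add_explicit(&dropped_items, 1, memory_order_relaxed);
    }
}
```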
Complex systems often have multiple buffers that must be coordinated. For example: multi-stage pipelines where one stage's output buffer feeds the next stage's input, and fan-out/fan-in topologies where one buffer feeds several consumers or several producers feed one buffer.

Key Challenges:

• A middle stage acts as both a consumer (of the upstream buffer) and a producer (to the downstream buffer)
• Buffers must be sized so one slow stage does not stall the entire pipeline
• Holding resources from one buffer while blocking on another can deadlock the pipeline
```c
/*
 * Multi-Stage Pipeline with Coordinated Buffers
 *
 * A three-stage pipeline:
 *   [Producer] -> [Buffer A] -> [Processor] -> [Buffer B] -> [Consumer]
 *
 * The processor is both a consumer of Buffer A
 * and a producer to Buffer B.
 */

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>

#define BUFFER_SIZE 5

typedef struct {
    int data[BUFFER_SIZE];
    int head, tail;
    sem_t empty, full;
    pthread_mutex_t mutex;
} Buffer;

void buffer_init(Buffer* buf) {
    buf->head = buf->tail = 0;
    sem_init(&buf->empty, 0, BUFFER_SIZE);
    sem_init(&buf->full, 0, 0);
    pthread_mutex_init(&buf->mutex, NULL);
}

void buffer_put(Buffer* buf, int item) {
    sem_wait(&buf->empty);
    pthread_mutex_lock(&buf->mutex);
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);
    sem_post(&buf->full);
}

int buffer_get(Buffer* buf) {
    sem_wait(&buf->full);
    pthread_mutex_lock(&buf->mutex);
    int item = buf->data[buf->tail];
    buf->tail = (buf->tail + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);
    sem_post(&buf->empty);
    return item;
}

Buffer buffer_a, buffer_b;

/* ============================================
 * PRODUCER: Feeds Buffer A
 * ============================================ */
void* producer_thread(void* arg) {
    for (int i = 0; i < 100; i++) {
        int raw_data = i;
        printf("Producer: generating item %d\n", raw_data);
        buffer_put(&buffer_a, raw_data);
    }
    return NULL;
}

/* ============================================
 * PROCESSOR: Consumes from A, Produces to B
 * ============================================ */
void* processor_thread(void* arg) {
    for (int i = 0; i < 100; i++) {
        // Consume from buffer A
        int raw_data = buffer_get(&buffer_a);

        // Process (transform the item)
        int processed_data = raw_data * 10;   // Example transformation
        printf("Processor: %d -> %d\n", raw_data, processed_data);

        // Produce to buffer B
        buffer_put(&buffer_b, processed_data);
    }
    return NULL;
}

/* ============================================
 * CONSUMER: Consumes from Buffer B
 * ============================================ */
void* consumer_thread(void* arg) {
    for (int i = 0; i < 100; i++) {
        int processed_data = buffer_get(&buffer_b);
        printf("Consumer: received item %d\n", processed_data);
    }
    return NULL;
}

int main() {
    buffer_init(&buffer_a);
    buffer_init(&buffer_b);

    pthread_t producer, processor, consumer;
    pthread_create(&producer, NULL, producer_thread, NULL);
    pthread_create(&processor, NULL, processor_thread, NULL);
    pthread_create(&consumer, NULL, consumer_thread, NULL);

    pthread_join(producer, NULL);
    pthread_join(processor, NULL);
    pthread_join(consumer, NULL);
    return 0;
}
```

Deadlock scenario with multiple processors:
If we have multiple processor threads, each trying to atomically "consume from A and produce to B" (for example, by holding a lock or a reserved slot from one buffer while blocking on the other), each processor can end up waiting for a resource that only another blocked thread could release. The pipeline then stalls even though each individual buffer operation is correct.
Solution: Ensure processors can always complete. Either don't hold resources across blocking waits, or size buffers to guarantee progress.
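One hedged sketch of the first option, built on the pipeline code above. Here buffer_try_put is a hypothetical non-blocking variant of buffer_put (the same idea as try_produce earlier); it is not part of the original pipeline code:

```c
/*
 * Hedged sketch: avoid holding work while blocked.
 * The processor keeps an item it cannot yet place in a local variable
 * and retries, instead of sleeping on Buffer B while holding anything
 * another thread might need.
 */
#include <sched.h>
#include <stdbool.h>

bool buffer_try_put(Buffer* buf, int item) {
    if (sem_trywait(&buf->empty) != 0) return false;   // no free slot right now
    pthread_mutex_lock(&buf->mutex);
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);
    sem_post(&buf->full);
    return true;
}

void* processor_nonblocking(void* arg) {
    for (int i = 0; i < 100; i++) {
        int raw = buffer_get(&buffer_a);   // may block, but holds no other resource
        int processed = raw * 10;
        while (!buffer_try_put(&buffer_b, processed)) {
            sched_yield();                 // back off; the item stays local
        }
    }
    return NULL;
}
```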
Fork-Join Buffers:
Some pipelines fan out (one buffer feeding multiple consumers) or fan in (multiple producers feeding one buffer):
```c
/*
 * Fork: One producer, multiple consumers (each gets items)
 *
 * Two models:
 * 1. Work-stealing: Each item goes to ONE consumer (load balancing)
 * 2. Broadcast: Each item goes to ALL consumers (replication)
 */

/* Work-Stealing (standard MPMC with multiple consumers) */
// This is just MPMC as designed above - consumers compete for items

/* Broadcast (each consumer gets a copy) */

#include <stdbool.h>

#define MAX_CONSUMERS 4   /* assumed limit for this example */

typedef struct {
    int data[BUFFER_SIZE];
    int head;
    int tails[MAX_CONSUMERS];          // Each consumer has own tail
    int consumer_count;

    sem_t full_slots[MAX_CONSUMERS];   // One per consumer
    sem_t empty_slots;                 // Shared: based on slowest consumer
    pthread_mutex_t mutex;
} BroadcastBuffer;

void broadcast_put(BroadcastBuffer* buf, int item) {
    sem_wait(&buf->empty_slots);

    pthread_mutex_lock(&buf->mutex);
    buf->data[buf->head] = item;
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    pthread_mutex_unlock(&buf->mutex);

    // Signal ALL consumers
    for (int i = 0; i < buf->consumer_count; i++) {
        sem_post(&buf->full_slots[i]);
    }
}

int broadcast_get(BroadcastBuffer* buf, int consumer_id) {
    sem_wait(&buf->full_slots[consumer_id]);

    pthread_mutex_lock(&buf->mutex);
    int item = buf->data[buf->tails[consumer_id]];
    int old_tail = buf->tails[consumer_id];
    buf->tails[consumer_id] = (old_tail + 1) % BUFFER_SIZE;

    // Check if ALL consumers have advanced past old_tail
    bool all_advanced = true;
    for (int i = 0; i < buf->consumer_count; i++) {
        // Complex logic to check circular distance
        // Simplified: track min_tail separately
    }

    if (all_advanced) {
        sem_post(&buf->empty_slots);   // Slot now truly free
    }

    pthread_mutex_unlock(&buf->mutex);
    return item;
}

// Note: Broadcast buffers are complex. The slowest consumer
// determines overall buffer utilization. If one consumer stalls,
// the producer eventually blocks even if other consumers are fast.
```

Real systems often have additional constraints beyond basic capacity limits:
1. Memory Constraint Buffers
Instead of (or in addition to) limiting item count, limit total memory usage. Items may have variable sizes:
```c
/*
 * Memory-Bounded Buffer
 *
 * Limit is on total bytes, not item count.
 * Each item has a variable size.
 */

#define MAX_ITEMS 64   /* assumed slot count for this example */

typedef struct {
    void* data[MAX_ITEMS];     // Pointers to variable-size items
    size_t sizes[MAX_ITEMS];   // Size of each item
    int head, tail, count;

    size_t max_memory;         // Total memory limit
    size_t current_memory;     // Currently used memory

    pthread_mutex_t mutex;
    pthread_cond_t space_available;   // Signaled when memory freed
    pthread_cond_t items_available;   // Signaled when items added
} MemoryBuffer;

bool memory_put(MemoryBuffer* buf, void* item, size_t size) {
    pthread_mutex_lock(&buf->mutex);

    // Wait until enough memory (and a free slot) is available
    while (buf->current_memory + size > buf->max_memory || buf->count == MAX_ITEMS) {
        pthread_cond_wait(&buf->space_available, &buf->mutex);
    }

    buf->data[buf->head] = item;
    buf->sizes[buf->head] = size;
    buf->head = (buf->head + 1) % MAX_ITEMS;
    buf->count++;
    buf->current_memory += size;

    pthread_cond_signal(&buf->items_available);
    pthread_mutex_unlock(&buf->mutex);
    return true;
}

void* memory_get(MemoryBuffer* buf, size_t* size) {
    pthread_mutex_lock(&buf->mutex);

    while (buf->count == 0) {
        pthread_cond_wait(&buf->items_available, &buf->mutex);
    }

    void* item = buf->data[buf->tail];
    *size = buf->sizes[buf->tail];
    buf->tail = (buf->tail + 1) % MAX_ITEMS;
    buf->count--;
    buf->current_memory -= *size;

    pthread_cond_broadcast(&buf->space_available);   // Wake all waiting producers
    pthread_mutex_unlock(&buf->mutex);
    return item;
}
```

2. Time-Windowed Buffers
Items expire after a time window. This is useful when only recent events matter, for example live sensor readings or monitoring metrics where stale values are useless:
```c
/*
 * Time-Windowed Buffer
 *
 * Only items younger than window_ms are valid.
 * Old items are automatically discarded.
 */

typedef struct {
    int data[BUFFER_SIZE];
    long timestamps[BUFFER_SIZE];   // Epoch ms when item was added
    int head, tail, count;
    long window_ms;                 // Items older than this are expired

    pthread_mutex_t mutex;
    pthread_cond_t items_available;
} WindowedBuffer;

void windowed_purge_expired(WindowedBuffer* buf);   // defined below

long current_time_ms() {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

void windowed_put(WindowedBuffer* buf, int item) {
    pthread_mutex_lock(&buf->mutex);

    // Purge expired items from tail before adding
    windowed_purge_expired(buf);

    // Add new item with timestamp
    buf->data[buf->head] = item;
    buf->timestamps[buf->head] = current_time_ms();
    buf->head = (buf->head + 1) % BUFFER_SIZE;
    buf->count++;

    pthread_cond_signal(&buf->items_available);
    pthread_mutex_unlock(&buf->mutex);
}

bool windowed_get(WindowedBuffer* buf, int* item, long timeout_ms) {
    pthread_mutex_lock(&buf->mutex);
    long deadline = current_time_ms() + timeout_ms;

    while (true) {
        // Purge expired items
        windowed_purge_expired(buf);

        if (buf->count > 0) {
            // Check if oldest item is still valid
            if (current_time_ms() - buf->timestamps[buf->tail] <= buf->window_ms) {
                *item = buf->data[buf->tail];
                buf->tail = (buf->tail + 1) % BUFFER_SIZE;
                buf->count--;
                pthread_mutex_unlock(&buf->mutex);
                return true;
            }
        }

        // No valid items, wait (with timeout)
        long remaining = deadline - current_time_ms();
        if (remaining <= 0) {
            pthread_mutex_unlock(&buf->mutex);
            return false;   // Timed out
        }

        // ... set up timed wait ...
    }
}

void windowed_purge_expired(WindowedBuffer* buf) {
    long now = current_time_ms();
    while (buf->count > 0) {
        if (now - buf->timestamps[buf->tail] > buf->window_ms) {
            // Item expired, discard
            buf->tail = (buf->tail + 1) % BUFFER_SIZE;
            buf->count--;
        } else {
            break;   // Found non-expired item
        }
    }
}
```

For extremely high-performance scenarios, lock-free bounded buffers eliminate mutex contention using atomic operations. This is an advanced topic but worth understanding for high-throughput systems.
Key Techniques:

• Atomic loads and stores with explicit memory ordering instead of mutexes
• Compare-and-swap (CAS) loops where several threads must update the same index
• Power-of-two buffer sizes so index wrap-around becomes a cheap bitmask
SPSC Lock-Free Buffer:
The single-producer single-consumer case is simplest because there's no contention on indices:
```c
/*
 * Lock-Free SPSC Bounded Buffer
 *
 * Producer only writes head; Consumer only writes tail.
 * No locks or CAS loops needed for single-producer single-consumer;
 * just atomic loads/stores with careful memory ordering.
 */

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define BUFFER_SIZE 1024   // Must be power of 2

typedef struct {
    int data[BUFFER_SIZE];
    // head is written only by the producer, tail only by the consumer;
    // acquire/release ordering makes the data writes visible across threads
    _Atomic size_t head;   // Next write position (producer)
    _Atomic size_t tail;   // Next read position (consumer)
} SPSCBuffer;

void spsc_init(SPSCBuffer* buf) {
    atomic_store(&buf->head, 0);
    atomic_store(&buf->tail, 0);
}

bool spsc_produce(SPSCBuffer* buf, int item) {
    size_t head = atomic_load_explicit(&buf->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&buf->tail, memory_order_acquire);

    // Full if head is one behind tail (circular)
    if (((head + 1) & (BUFFER_SIZE - 1)) == tail) {
        return false;   // Buffer full
    }

    buf->data[head] = item;

    // Release ensures data write is visible before head update
    atomic_store_explicit(&buf->head, (head + 1) & (BUFFER_SIZE - 1),
                          memory_order_release);
    return true;
}

bool spsc_consume(SPSCBuffer* buf, int* item) {
    size_t tail = atomic_load_explicit(&buf->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&buf->head, memory_order_acquire);

    // Empty if head == tail
    if (head == tail) {
        return false;   // Buffer empty
    }

    *item = buf->data[tail];

    // Release ensures data read is complete before tail update
    atomic_store_explicit(&buf->tail, (tail + 1) & (BUFFER_SIZE - 1),
                          memory_order_release);
    return true;
}

/*
 * Why lock-free SPSC is simple:
 * - Only one writer to head (producer)
 * - Only one writer to tail (consumer)
 * - No contention on indices
 * - Memory ordering ensures data visibility
 *
 * MPMC is much harder - requires CAS loops or other techniques.
 */
```
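A hedged usage sketch for the SPSC buffer above: since spsc_produce and spsc_consume return false rather than blocking, callers typically spin, yield (via <sched.h>), or back off briefly and retry.

```c
/* Hedged usage sketch: turn the non-blocking SPSC calls into blocking
 * wrappers by yielding and retrying. */
#include <sched.h>

void spsc_produce_blocking(SPSCBuffer* buf, int item) {
    while (!spsc_produce(buf, item)) {
        sched_yield();   // buffer full: let the consumer drain a slot
    }
}

int spsc_consume_blocking(SPSCBuffer* buf) {
    int item;
    while (!spsc_consume(buf, &item)) {
        sched_yield();   // buffer empty: let the producer publish an item
    }
    return item;
}
```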
Lock-free is not always faster:

• Under low contention, mutex-based solutions have simpler code and comparable performance
• Lock-free helps when contention is high and threads frequently collide
• Lock-free structures are harder to implement correctly—bugs are subtle
Rule of thumb: Profile first. Use lock-free only when mutex contention is a demonstrated bottleneck.
With many variations available, choosing the right one depends on your system's requirements. Here's a decision framework:
| Scenario | Recommended Variation | Key Considerations |
|---|---|---|
| Single producer, single consumer | SPSC lock-free | Simplest, fastest for this case |
| Multiple producers/consumers, moderate throughput | MPMC with mutex | Simple, correct, good enough for most cases |
| Very high throughput, many cores | MPMC lock-free or split-mutex | Profile first; complexity cost is real |
| Items have different priorities | Priority buffer | Consider starvation prevention |
| Real-time deadlines | Non-blocking with timeout | Never block indefinitely |
| Overload expected | Try operations + backpressure | Shed load gracefully |
| Pipeline processing | Multiple coordinated buffers | Watch for pipeline deadlock |
| Variable-size items | Memory-bounded buffer | Track bytes, not items |
| Recent events only matter | Time-windowed buffer | Auto-expire old items |
Default to MPMC with mutex. It's correct, well-understood, and performs adequately for most workloads. Optimize to more complex variations only when profiling identifies the buffer as a bottleneck. Premature optimization here creates maintenance burden without measurable benefit.
Bounded buffers are perhaps the most widely used synchronization pattern. Understanding their variations prepares you for diverse real-world requirements.
You now have a comprehensive toolkit of bounded buffer variations. Each addresses specific requirements that go beyond the textbook single-producer single-consumer case. Next, we'll examine Common Patterns that recur across these and other synchronization problems.