We've studied the mechanics, understood the tradeoffs, and analyzed the performance characteristics of broadcast. Now it's time to see broadcast in action—in real, production-quality implementations you can adapt to your own systems.
Each use case on this page is a complete, working pattern that demonstrates an appropriate use of broadcast, along with the design rationale and best practices drawn from real-world deployments.
By the end of this page, you will have a library of production-ready patterns: barrier synchronization, thread pool lifecycle management, reader-writer locks, condition variable-based semaphores, and event notification systems. Each pattern demonstrates when and how to use broadcast correctly.
A barrier is a synchronization point where threads wait until all participants arrive. This is perhaps the most classic use case for broadcast—when the barrier is satisfied, ALL waiting threads must proceed.
If N threads wait at a barrier and only one is woken (via signal), then N-1 threads remain stuck forever. Broadcast is the only correct choice for barrier release.
```c
#include <pthread.h>
#include <errno.h>

/**
 * Reusable Barrier Implementation
 *
 * Features:
 * - Supports any number of participants
 * - Reusable (can be used in loops)
 * - Returns special value for one "serial" thread
 * - Thread-safe initialization and destruction
 */

#define BARRIER_SERIAL_THREAD (-1)

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    unsigned int threshold;   // Number of threads needed
    unsigned int count;       // Threads arrived so far
    unsigned int generation;  // Barrier reuse generation
    int valid;                // For safe destruction
} barrier_t;

int barrier_init(barrier_t *b, unsigned int count) {
    if (count == 0) return EINVAL;

    if (pthread_mutex_init(&b->mutex, NULL) != 0) {
        return -1;
    }
    if (pthread_cond_init(&b->cond, NULL) != 0) {
        pthread_mutex_destroy(&b->mutex);
        return -1;
    }

    b->threshold = count;
    b->count = 0;
    b->generation = 0;
    b->valid = 1;
    return 0;
}

int barrier_destroy(barrier_t *b) {
    pthread_mutex_lock(&b->mutex);
    if (b->count > 0) {
        // Threads are waiting - cannot destroy
        pthread_mutex_unlock(&b->mutex);
        return EBUSY;
    }
    b->valid = 0;
    pthread_mutex_unlock(&b->mutex);

    pthread_cond_destroy(&b->cond);
    pthread_mutex_destroy(&b->mutex);
    return 0;
}

int barrier_wait(barrier_t *b) {
    pthread_mutex_lock(&b->mutex);

    if (!b->valid) {
        pthread_mutex_unlock(&b->mutex);
        return EINVAL;
    }

    // Record current generation before modifying count
    unsigned int my_generation = b->generation;

    b->count++;

    if (b->count == b->threshold) {
        // We are the LAST thread to arrive
        // Reset for next use
        b->count = 0;
        b->generation++;  // Increment generation BEFORE broadcast

        // BROADCAST: Wake ALL waiting threads
        // This is the canonical broadcast use case
        pthread_cond_broadcast(&b->cond);

        pthread_mutex_unlock(&b->mutex);
        return BARRIER_SERIAL_THREAD;  // Indicate we were the release thread
    }

    // Not the last thread - wait for others
    // Use generation to handle spurious wakeups AND barrier reuse
    while (my_generation == b->generation) {
        pthread_cond_wait(&b->cond, &b->mutex);
    }

    pthread_mutex_unlock(&b->mutex);
    return 0;
}

// Example usage: Parallel computation with barrier sync
#define NUM_THREADS 8
barrier_t phase_barrier;

void *parallel_worker(void *arg) {
    int id = *(int *)arg;

    for (int iteration = 0; iteration < 100; iteration++) {
        // Phase 1: Each thread computes independently
        compute_local_part(id, iteration);

        // Barrier: Wait for all threads to complete Phase 1
        int ret = barrier_wait(&phase_barrier);

        if (ret == BARRIER_SERIAL_THREAD) {
            // We're the "serial" thread - do any reduction work
            aggregate_results();
        }

        // Phase 2: All threads proceed together
        // (guaranteed that Phase 1 is complete for everyone)
        finalize_iteration(id, iteration);

        // Second barrier before next iteration
        barrier_wait(&phase_barrier);
    }
    return NULL;
}

int main() {
    barrier_init(&phase_barrier, NUM_THREADS);

    pthread_t threads[NUM_THREADS];
    int ids[NUM_THREADS];

    for (int i = 0; i < NUM_THREADS; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, parallel_worker, &ids[i]);
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }

    barrier_destroy(&phase_barrier);
    return 0;
}
```

Design Notes:
Generation counter: The generation field is crucial for barrier reuse. Without it, threads from different barrier uses could interfere.
Increment generation before broadcast: This ensures that by the time woken threads check, the generation has already changed.
Return value: The -1 return for the 'serial thread' allows applications to designate one thread for special work (like aggregation) without additional synchronization.
Valid flag: Prevents use after destruction and allows safe destruction checks.
A thread pool must manage several states: normal operation (signal for work), graceful shutdown (broadcast for all), pause/resume (broadcast for state changes), and dynamic resizing. This use case showcases when signal vs broadcast is appropriate within a single component.
```c
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>

/**
 * Thread Pool with Lifecycle Management
 *
 * Demonstrates:
 * - signal() for individual work items
 * - broadcast() for shutdown
 * - broadcast() for pause/resume
 * - Proper graceful shutdown handling
 */

typedef void (*task_func_t)(void *arg);

typedef struct task {
    task_func_t func;
    void *arg;
    struct task *next;
} task_t;

typedef enum {
    POOL_RUNNING,
    POOL_PAUSED,
    POOL_SHUTDOWN
} pool_state_t;

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t work_available;  // Signal: new work item
    pthread_cond_t state_changed;   // Broadcast: state transitions
    pthread_cond_t all_idle;        // Broadcast: all workers idle

    task_t *queue_head;
    task_t *queue_tail;
    int queue_size;

    int num_workers;
    int active_workers;  // Currently executing tasks
    int idle_workers;    // Waiting for work

    pool_state_t state;
    pthread_t *worker_threads;
} thread_pool_t;

static void *worker_thread(void *arg) {
    thread_pool_t *pool = (thread_pool_t *)arg;

    pthread_mutex_lock(&pool->mutex);
    while (1) {
        // Wait for work or state change
        while (pool->queue_size == 0 && pool->state == POOL_RUNNING) {
            pool->idle_workers++;
            // Check if we're all idle (for graceful pause)
            if (pool->idle_workers == pool->num_workers) {
                pthread_cond_broadcast(&pool->all_idle);
            }
            pthread_cond_wait(&pool->work_available, &pool->mutex);
            pool->idle_workers--;
        }

        // Handle paused state
        while (pool->state == POOL_PAUSED) {
            pool->idle_workers++;
            if (pool->idle_workers == pool->num_workers) {
                pthread_cond_broadcast(&pool->all_idle);
            }
            pthread_cond_wait(&pool->state_changed, &pool->mutex);
            pool->idle_workers--;
        }

        // Handle shutdown
        if (pool->state == POOL_SHUTDOWN) {
            pthread_mutex_unlock(&pool->mutex);
            return NULL;
        }

        // Dequeue task
        if (pool->queue_size > 0) {
            task_t *task = pool->queue_head;
            pool->queue_head = task->next;
            if (pool->queue_head == NULL) {
                pool->queue_tail = NULL;
            }
            pool->queue_size--;
            pool->active_workers++;

            pthread_mutex_unlock(&pool->mutex);

            // Execute task outside of lock
            task->func(task->arg);
            free(task);

            pthread_mutex_lock(&pool->mutex);
            pool->active_workers--;
        }
    }
}

thread_pool_t *thread_pool_create(int num_workers) {
    thread_pool_t *pool = calloc(1, sizeof(thread_pool_t));
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->work_available, NULL);
    pthread_cond_init(&pool->state_changed, NULL);
    pthread_cond_init(&pool->all_idle, NULL);

    pool->num_workers = num_workers;
    pool->state = POOL_RUNNING;
    pool->worker_threads = calloc(num_workers, sizeof(pthread_t));

    for (int i = 0; i < num_workers; i++) {
        pthread_create(&pool->worker_threads[i], NULL, worker_thread, pool);
    }
    return pool;
}

// Submit work - uses SIGNAL (one work item, one worker)
void thread_pool_submit(thread_pool_t *pool, task_func_t func, void *arg) {
    task_t *task = malloc(sizeof(task_t));
    task->func = func;
    task->arg = arg;
    task->next = NULL;

    pthread_mutex_lock(&pool->mutex);
    if (pool->state == POOL_SHUTDOWN) {
        pthread_mutex_unlock(&pool->mutex);
        free(task);
        return;
    }

    // Enqueue
    if (pool->queue_tail) {
        pool->queue_tail->next = task;
    } else {
        pool->queue_head = task;
    }
    pool->queue_tail = task;
    pool->queue_size++;

    // SIGNAL: Wake ONE worker for ONE task
    pthread_cond_signal(&pool->work_available);
    pthread_mutex_unlock(&pool->mutex);
}

// Pause all workers - here the pauser WAITS on all_idle
// (the workers are the ones who broadcast it)
void thread_pool_pause(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->state = POOL_PAUSED;
    // Wait for all workers to become idle
    while (pool->idle_workers < pool->num_workers) {
        pthread_cond_wait(&pool->all_idle, &pool->mutex);
    }
    pthread_mutex_unlock(&pool->mutex);
}

// Resume workers - uses BROADCAST (all workers must wake)
void thread_pool_resume(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->mutex);
    pool->state = POOL_RUNNING;
    // BROADCAST: Wake ALL workers, they should all resume
    pthread_cond_broadcast(&pool->state_changed);
    // Also wake any workers parked on the work queue
    if (pool->queue_size > 0) {
        pthread_cond_broadcast(&pool->work_available);
    }
    pthread_mutex_unlock(&pool->mutex);
}

// Graceful shutdown - uses BROADCAST (all workers must exit)
void thread_pool_shutdown(thread_pool_t *pool, bool wait_for_completion) {
    pthread_mutex_lock(&pool->mutex);

    if (wait_for_completion) {
        // Wait for queue to drain
        while (pool->queue_size > 0 || pool->active_workers > 0) {
            pthread_cond_wait(&pool->all_idle, &pool->mutex);
        }
    }

    pool->state = POOL_SHUTDOWN;
    // BROADCAST: Wake ALL workers to observe shutdown
    pthread_cond_broadcast(&pool->work_available);
    pthread_cond_broadcast(&pool->state_changed);
    pthread_mutex_unlock(&pool->mutex);

    // Wait for all workers to exit
    for (int i = 0; i < pool->num_workers; i++) {
        pthread_join(pool->worker_threads[i], NULL);
    }

    // Cleanup
    free(pool->worker_threads);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->work_available);
    pthread_cond_destroy(&pool->state_changed);
    pthread_cond_destroy(&pool->all_idle);
    free(pool);
}
```

| Operation | Notification Type | Reason |
|---|---|---|
| Submit work | signal(work_available) | One work item → one worker |
| Pause | wait on all_idle | Pauser is the waiter; workers broadcast all_idle when the last one parks |
| Resume | broadcast(state_changed) | ALL workers must wake and resume |
| Shutdown | broadcast(work_available + state_changed) | ALL workers must wake and exit |
A reader-writer lock allows multiple concurrent readers but exclusive writer access. This use case demonstrates sophisticated use of both signal and broadcast based on the type of transition.
```c
#include <pthread.h>

/**
 * Reader-Writer Lock
 *
 * Demonstrates:
 * - signal() for writer-to-writer transitions (one at a time)
 * - broadcast() for writer-to-readers transitions (many can enter)
 * - Writer preference to prevent writer starvation
 */

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t readers_ok;  // Readers wait here
    pthread_cond_t writers_ok;  // Writers wait here
    int active_readers;   // Currently reading
    int active_writers;   // 0 or 1
    int waiting_readers;  // Readers in queue
    int waiting_writers;  // Writers in queue
} rwlock_t;

void rwlock_init(rwlock_t *rw) {
    pthread_mutex_init(&rw->mutex, NULL);
    pthread_cond_init(&rw->readers_ok, NULL);
    pthread_cond_init(&rw->writers_ok, NULL);
    rw->active_readers = 0;
    rw->active_writers = 0;
    rw->waiting_readers = 0;
    rw->waiting_writers = 0;
}

void rwlock_destroy(rwlock_t *rw) {
    pthread_mutex_destroy(&rw->mutex);
    pthread_cond_destroy(&rw->readers_ok);
    pthread_cond_destroy(&rw->writers_ok);
}

void rwlock_read_lock(rwlock_t *rw) {
    pthread_mutex_lock(&rw->mutex);
    rw->waiting_readers++;
    // Wait if there's an active or waiting writer (writer preference)
    while (rw->active_writers > 0 || rw->waiting_writers > 0) {
        pthread_cond_wait(&rw->readers_ok, &rw->mutex);
    }
    rw->waiting_readers--;
    rw->active_readers++;
    pthread_mutex_unlock(&rw->mutex);
}

void rwlock_read_unlock(rwlock_t *rw) {
    pthread_mutex_lock(&rw->mutex);
    rw->active_readers--;
    // If we're the last reader and writers are waiting
    if (rw->active_readers == 0 && rw->waiting_writers > 0) {
        // SIGNAL: Wake ONE writer (only one can write at a time)
        pthread_cond_signal(&rw->writers_ok);
    }
    pthread_mutex_unlock(&rw->mutex);
}

void rwlock_write_lock(rwlock_t *rw) {
    pthread_mutex_lock(&rw->mutex);
    rw->waiting_writers++;
    // Wait for no active readers AND no active writer
    while (rw->active_readers > 0 || rw->active_writers > 0) {
        pthread_cond_wait(&rw->writers_ok, &rw->mutex);
    }
    rw->waiting_writers--;
    rw->active_writers = 1;
    pthread_mutex_unlock(&rw->mutex);
}

void rwlock_write_unlock(rwlock_t *rw) {
    pthread_mutex_lock(&rw->mutex);
    rw->active_writers = 0;

    if (rw->waiting_writers > 0) {
        // Another writer waiting - give them priority
        // SIGNAL: Wake ONE writer (only one at a time)
        pthread_cond_signal(&rw->writers_ok);
    } else if (rw->waiting_readers > 0) {
        // No writers waiting, let all readers in
        // BROADCAST: Wake ALL waiting readers (they can all run concurrently)
        pthread_cond_broadcast(&rw->readers_ok);
    }
    pthread_mutex_unlock(&rw->mutex);
}

// Upgrade: Atomically convert read lock to write lock
// Returns 0 on success, -1 if upgrade not possible (other readers exist)
int rwlock_upgrade(rwlock_t *rw) {
    pthread_mutex_lock(&rw->mutex);
    // We must be the only reader to upgrade
    if (rw->active_readers != 1) {
        pthread_mutex_unlock(&rw->mutex);
        return -1;  // Cannot upgrade, other readers exist
    }
    // Convert our read lock to write lock
    rw->active_readers = 0;
    rw->active_writers = 1;
    pthread_mutex_unlock(&rw->mutex);
    return 0;
}

// Downgrade: Atomically convert write lock to read lock
void rwlock_downgrade(rwlock_t *rw) {
    pthread_mutex_lock(&rw->mutex);
    rw->active_writers = 0;
    rw->active_readers = 1;
    // We're now a reader - let other readers in if no writers waiting
    if (rw->waiting_writers == 0 && rw->waiting_readers > 0) {
        // BROADCAST: Let all waiting readers in
        pthread_cond_broadcast(&rw->readers_ok);
    }
    pthread_mutex_unlock(&rw->mutex);
}
```

Key Design Decisions:
Writer preference: Waiting writers block new readers. This prevents writer starvation.
signal() for writers: Only one writer can be active, so signal is always correct.
broadcast() for readers: Multiple readers can be active concurrently, so when transitioning from writer to readers, all waiting readers should wake.
Upgrade/Downgrade: Atomic lock conversions without releasing, useful for read-then-modify patterns.
An event notification system allows threads to wait for specific events and be notified when they occur. This pattern is common in GUI systems, network servers, and async frameworks.
```c
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <errno.h>
#include <time.h>

/**
 * Event Notification System
 *
 * Features:
 * - Named events
 * - Multiple waiters per event
 * - Auto-reset and manual-reset events
 * - Broadcast to all waiters on signal
 */

#define MAX_EVENTS 64
#define EVENT_NAME_MAX 32

typedef enum {
    EVENT_AUTO_RESET,   // Automatically resets after one waiter proceeds
    EVENT_MANUAL_RESET  // Stays set until explicitly reset
} event_type_t;

typedef struct {
    char name[EVENT_NAME_MAX];
    bool signaled;
    event_type_t type;
    int waiters;
    pthread_cond_t cond;
} event_t;

typedef struct {
    pthread_mutex_t mutex;
    event_t events[MAX_EVENTS];
    int num_events;
} event_system_t;

void event_system_init(event_system_t *es) {
    pthread_mutex_init(&es->mutex, NULL);
    es->num_events = 0;
}

// Create a new event
int event_create(event_system_t *es, const char *name, event_type_t type) {
    pthread_mutex_lock(&es->mutex);

    // Check if event already exists
    for (int i = 0; i < es->num_events; i++) {
        if (strcmp(es->events[i].name, name) == 0) {
            pthread_mutex_unlock(&es->mutex);
            return i;  // Already exists
        }
    }

    if (es->num_events >= MAX_EVENTS) {
        pthread_mutex_unlock(&es->mutex);
        return -1;  // Full
    }

    int idx = es->num_events++;
    strncpy(es->events[idx].name, name, EVENT_NAME_MAX - 1);
    es->events[idx].name[EVENT_NAME_MAX - 1] = '\0';
    es->events[idx].signaled = false;
    es->events[idx].type = type;
    es->events[idx].waiters = 0;
    pthread_cond_init(&es->events[idx].cond, NULL);

    pthread_mutex_unlock(&es->mutex);
    return idx;
}

static int find_event(event_system_t *es, const char *name) {
    for (int i = 0; i < es->num_events; i++) {
        if (strcmp(es->events[i].name, name) == 0) {
            return i;
        }
    }
    return -1;
}

// Wait for an event (timeout_ms <= 0 means wait forever)
int event_wait(event_system_t *es, const char *name, int timeout_ms) {
    pthread_mutex_lock(&es->mutex);

    int idx = find_event(es, name);
    if (idx < 0) {
        pthread_mutex_unlock(&es->mutex);
        return -1;
    }

    event_t *ev = &es->events[idx];
    ev->waiters++;

    // Compute the absolute deadline ONCE, before the wait loop;
    // recomputing it on each iteration would silently extend the
    // timeout after every spurious wakeup
    struct timespec ts;
    if (timeout_ms > 0) {
        clock_gettime(CLOCK_REALTIME, &ts);
        ts.tv_sec += timeout_ms / 1000;
        ts.tv_nsec += (timeout_ms % 1000) * 1000000;
        if (ts.tv_nsec >= 1000000000) {
            ts.tv_sec++;
            ts.tv_nsec -= 1000000000;
        }
    }

    // Wait for signal
    while (!ev->signaled) {
        if (timeout_ms > 0) {
            int rc = pthread_cond_timedwait(&ev->cond, &es->mutex, &ts);
            if (rc == ETIMEDOUT) {
                ev->waiters--;
                pthread_mutex_unlock(&es->mutex);
                return -2;  // Timeout
            }
        } else {
            pthread_cond_wait(&ev->cond, &es->mutex);
        }
    }

    // For auto-reset events, reset after ONE waiter proceeds
    if (ev->type == EVENT_AUTO_RESET) {
        ev->signaled = false;
    }

    ev->waiters--;
    pthread_mutex_unlock(&es->mutex);
    return 0;
}

// Signal an event - wake all waiters
int event_signal(event_system_t *es, const char *name) {
    pthread_mutex_lock(&es->mutex);

    int idx = find_event(es, name);
    if (idx < 0) {
        pthread_mutex_unlock(&es->mutex);
        return -1;
    }

    event_t *ev = &es->events[idx];
    ev->signaled = true;

    // BROADCAST: An event may have multiple waiters
    // All waiters should wake up and check the event
    // For MANUAL_RESET, multiple can proceed
    // For AUTO_RESET, only one will proceed (others re-wait)
    pthread_cond_broadcast(&ev->cond);

    pthread_mutex_unlock(&es->mutex);
    return 0;
}

// Reset a manual-reset event
int event_reset(event_system_t *es, const char *name) {
    pthread_mutex_lock(&es->mutex);
    int idx = find_event(es, name);
    if (idx < 0) {
        pthread_mutex_unlock(&es->mutex);
        return -1;
    }
    es->events[idx].signaled = false;
    pthread_mutex_unlock(&es->mutex);
    return 0;
}

// Example usage: Multi-stage initialization
void initialize_system(event_system_t *es) {
    // Create initialization events
    event_create(es, "db_ready", EVENT_MANUAL_RESET);
    event_create(es, "cache_ready", EVENT_MANUAL_RESET);
    event_create(es, "server_ready", EVENT_MANUAL_RESET);
    // Start initialization threads
    // Each signals its event when done
}

void *database_thread(void *arg) {
    event_system_t *es = (event_system_t *)arg;
    initialize_database();
    // Signal all threads waiting for DB
    event_signal(es, "db_ready");
    // Continue with normal operation
    return NULL;
}

void *application_thread(void *arg) {
    event_system_t *es = (event_system_t *)arg;
    // Wait for all prerequisites
    event_wait(es, "db_ready", 0);
    event_wait(es, "cache_ready", 0);
    // All dependencies ready, start application
    run_application();
    return NULL;
}
```

Events are inherently broadcast-based because multiple components may need to observe the same system state change. AUTO_RESET events add fairness by ensuring only one waiter proceeds per signal, while MANUAL_RESET events allow all waiters to observe persistent state.
Understanding how to build a semaphore from mutex and condition variable demonstrates the power of these primitives and clarifies when broadcast might be used in semaphore-like structures.
```c
#include <pthread.h>
#include <errno.h>
#include <time.h>

/**
 * Counting Semaphore using Mutex + Condition Variable
 *
 * Design choices:
 * - Uses signal() for single resource release (common case)
 * - Uses broadcast() for bulk release (count increases by >1)
 */

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int value;    // Current semaphore value
    int waiters;  // Number of waiting threads
} semaphore_t;

void semaphore_init(semaphore_t *sem, int initial_value) {
    pthread_mutex_init(&sem->mutex, NULL);
    pthread_cond_init(&sem->cond, NULL);
    sem->value = initial_value;
    sem->waiters = 0;
}

void semaphore_destroy(semaphore_t *sem) {
    pthread_mutex_destroy(&sem->mutex);
    pthread_cond_destroy(&sem->cond);
}

// P / wait / acquire
void semaphore_wait(semaphore_t *sem) {
    pthread_mutex_lock(&sem->mutex);
    sem->waiters++;
    while (sem->value <= 0) {
        pthread_cond_wait(&sem->cond, &sem->mutex);
    }
    sem->waiters--;
    sem->value--;
    pthread_mutex_unlock(&sem->mutex);
}

// P with timeout
int semaphore_timedwait(semaphore_t *sem, int timeout_ms) {
    pthread_mutex_lock(&sem->mutex);

    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += timeout_ms / 1000;
    ts.tv_nsec += (timeout_ms % 1000) * 1000000;
    if (ts.tv_nsec >= 1000000000) {
        ts.tv_sec++;
        ts.tv_nsec -= 1000000000;
    }

    sem->waiters++;
    int rc = 0;
    while (sem->value <= 0 && rc == 0) {
        rc = pthread_cond_timedwait(&sem->cond, &sem->mutex, &ts);
    }
    sem->waiters--;

    if (rc == ETIMEDOUT) {
        pthread_mutex_unlock(&sem->mutex);
        return -1;
    }
    sem->value--;
    pthread_mutex_unlock(&sem->mutex);
    return 0;
}

// V / signal / release - single unit
void semaphore_post(semaphore_t *sem) {
    pthread_mutex_lock(&sem->mutex);
    sem->value++;
    // SIGNAL: One resource available, one waiter can proceed
    if (sem->waiters > 0) {
        pthread_cond_signal(&sem->cond);
    }
    pthread_mutex_unlock(&sem->mutex);
}

// Bulk release - release multiple units at once
void semaphore_post_multiple(semaphore_t *sem, int count) {
    pthread_mutex_lock(&sem->mutex);
    sem->value += count;
    // BROADCAST: With more than one unit released, multiple waiters can
    // proceed. Signaling 'count' times would also work, but broadcast is
    // simpler and always correct: excess wakers re-check the predicate
    // and go back to sleep.
    pthread_cond_broadcast(&sem->cond);
    pthread_mutex_unlock(&sem->mutex);
}

// Try to acquire without blocking
int semaphore_trywait(semaphore_t *sem) {
    pthread_mutex_lock(&sem->mutex);
    if (sem->value > 0) {
        sem->value--;
        pthread_mutex_unlock(&sem->mutex);
        return 0;
    }
    pthread_mutex_unlock(&sem->mutex);
    return -1;  // Would block
}

// Get current value (for debugging/monitoring)
int semaphore_getvalue(semaphore_t *sem, int *value) {
    pthread_mutex_lock(&sem->mutex);
    *value = sem->value;
    pthread_mutex_unlock(&sem->mutex);
    return 0;
}

// Example: Resource pool using semaphore
#define POOL_SIZE 10
semaphore_t pool_sem;
resource_t *resources[POOL_SIZE];

void init_pool() {
    semaphore_init(&pool_sem, POOL_SIZE);  // 10 resources available
    for (int i = 0; i < POOL_SIZE; i++) {
        resources[i] = create_resource(i);
    }
}

resource_t *acquire_resource() {
    semaphore_wait(&pool_sem);
    // Get a resource (protected by another lock or lock-free)
    return dequeue_resource();
}

void release_resource(resource_t *r) {
    enqueue_resource(r);
    semaphore_post(&pool_sem);
}

// Bulk return
void release_resources(resource_t **r, int count) {
    for (int i = 0; i < count; i++) {
        enqueue_resource(r[i]);
    }
    semaphore_post_multiple(&pool_sem, count);  // Uses broadcast
}
```

This semaphore uses signal() for standard post (one resource) and broadcast() for bulk post (multiple resources). The broadcast for bulk post ensures that up to 'count' waiters can proceed immediately, maximizing parallelism.
In systems that support dynamic reconfiguration, all components may need to observe configuration changes. This requires broadcast to notify all interested parties.
```c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

/**
 * Dynamic Configuration System
 *
 * Pattern: Config changes trigger broadcast to all observers
 * Uses generation counting for efficient change detection
 */

typedef struct {
    int log_level;
    int max_connections;
    int timeout_ms;
    bool debug_mode;
    char hostname[256];
} config_t;

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t changed;
    config_t current;
    unsigned long generation;  // Incremented on each change
    bool shutdown;
} config_manager_t;

void config_manager_init(config_manager_t *cm, const config_t *initial) {
    pthread_mutex_init(&cm->mutex, NULL);
    pthread_cond_init(&cm->changed, NULL);
    cm->current = *initial;
    cm->generation = 1;
    cm->shutdown = false;
}

// Get current config (lock-protected copy)
void config_get(config_manager_t *cm, config_t *out) {
    pthread_mutex_lock(&cm->mutex);
    *out = cm->current;
    pthread_mutex_unlock(&cm->mutex);
}

// Update configuration - notifies ALL observers
void config_update(config_manager_t *cm, const config_t *new_config) {
    pthread_mutex_lock(&cm->mutex);
    cm->current = *new_config;
    cm->generation++;

    // BROADCAST: ALL components need to observe the change
    // - Workers need new timeout settings
    // - Logger needs new log level
    // - Network needs new connection limits
    // Signal would only wake one, leaving others with stale config
    pthread_cond_broadcast(&cm->changed);

    pthread_mutex_unlock(&cm->mutex);
}

// Wait for config change (returns when generation changes)
unsigned long config_wait_for_change(config_manager_t *cm,
                                     unsigned long last_seen_gen,
                                     config_t *out) {
    pthread_mutex_lock(&cm->mutex);
    while (cm->generation == last_seen_gen && !cm->shutdown) {
        pthread_cond_wait(&cm->changed, &cm->mutex);
    }
    if (cm->shutdown) {
        pthread_mutex_unlock(&cm->mutex);
        return 0;  // Shutdown signal
    }
    *out = cm->current;
    unsigned long new_gen = cm->generation;
    pthread_mutex_unlock(&cm->mutex);
    return new_gen;
}

// Shutdown - wake all observers
void config_manager_shutdown(config_manager_t *cm) {
    pthread_mutex_lock(&cm->mutex);
    cm->shutdown = true;
    // BROADCAST: All observers must wake and see shutdown
    pthread_cond_broadcast(&cm->changed);
    pthread_mutex_unlock(&cm->mutex);
}

// Example: Worker that responds to config changes
void *worker_with_config(void *arg) {
    config_manager_t *cm = (config_manager_t *)arg;
    config_t local_config;
    unsigned long my_generation = 0;

    while (1) {
        // Wait for config change
        my_generation = config_wait_for_change(cm, my_generation, &local_config);
        if (my_generation == 0) {
            // Shutdown
            break;
        }
        // Apply new configuration
        printf("Worker: Updating timeout to %d ms\n", local_config.timeout_ms);
        apply_timeout(local_config.timeout_ms);
        // Continue working with new config...
    }
    return NULL;
}

// Hot reload from file
void config_reload_from_file(config_manager_t *cm, const char *path) {
    config_t new_config;
    // Parse file (details omitted)
    if (parse_config_file(path, &new_config) == 0) {
        config_update(cm, &new_config);
        printf("Configuration reloaded, notifying all observers\n");
    }
}
```

Pattern Benefits:
Generation counting: Allows observers to efficiently detect changes without full comparison
Broadcast for config changes: ALL observers must see updates, not just one
Local copies: Observers take a local copy, reducing lock contention during config reads
Shutdown integration: Same mechanism handles graceful shutdown notification
This reference summarizes the key patterns where broadcast is appropriate.
| Pattern | When to Use | Why Broadcast |
|---|---|---|
| Barrier | All threads must reach a point before any proceed | ALL waiters must proceed simultaneously |
| Shutdown/Cleanup | Terminating all workers gracefully | ALL workers must observe shutdown flag |
| State Transition | System-wide state change (pause, resume) | ALL threads must respond to new state |
| Configuration Change | Dynamic reconfiguration | ALL components need new config |
| Reader Release | Writer finishing, readers waiting | ALL readers can proceed concurrently |
| Event Notification | Named event signaled | Multiple observers may be waiting |
| Resource Pool Bulk Return | Returning multiple resources | Multiple acquirers may proceed |
| Epoch/Generation Change | Moving to new processing phase | ALL threads must see new epoch |
| Pattern | When to Use | Why Signal |
|---|---|---|
| Work Queue | Adding single work item | One item → one worker |
| Resource Release | Returning single resource | One resource → one acquirer |
| Writer Queue | Reader finished, writer waiting | Only one writer can enter |
| Producer/Consumer (separate CVs) | Producer adds, consumer waits | One item → one consumer |
| Leader Promotion | Promoting next leader | Only one leader at a time |
Congratulations! You have completed the Broadcast module. You now understand when and how to use broadcast effectively—the mechanics, the scenarios that require it, the performance implications, and the real-world patterns where it shines. You're equipped to make informed decisions about condition variable notification in any concurrent system you build.
What You've Learned:
With this knowledge, you can confidently design concurrent systems that correctly and efficiently synchronize multiple threads through condition variables.