A perfectly tuned PREEMPT_RT kernel with optimally configured hardware still produces unpredictable behavior if the application itself introduces non-determinism. The application is the final link in the real-time chain—and often the weakest.
Real-time application development requires a disciplined approach that differs fundamentally from general-purpose programming. Operations that are perfectly acceptable in a web server—dynamic memory allocation, blocking I/O, unbounded loops—become sources of catastrophic latency in a motor controller or medical device.
This page consolidates the essential guidelines for writing deterministic, deadline-meeting applications on real-time Linux. These aren't suggestions; in critical systems, they're requirements.
By the end of this page, you will understand: (1) The RT application lifecycle and initialization patterns; (2) Memory management for determinism; (3) Safe inter-thread communication; (4) Proper timing and synchronization; (5) Common anti-patterns and how to avoid them; (6) Debugging and validation techniques; and (7) Production deployment considerations.
Well-designed RT applications follow a consistent structural pattern that separates initialization from real-time execution:
```text
Real-Time Application Phases:

  PHASE 1: NON-RT INITIALIZATION
    • Parse configuration
    • Allocate ALL memory
    • Open files and devices
    • Initialize data structures
    • Create threads (but don't start RT work)
    • Set up communication channels
    • Lock memory (mlockall)
    • Pre-fault all pages
          │
          ▼
  PHASE 2: RT CONFIGURATION
    • Set thread priorities (SCHED_FIFO/RR/DEADLINE)
    • Pin threads to CPUs
    • Set up periodic timers
    • Enter RT scheduling
          │
          ▼
  PHASE 3: RT EXECUTION LOOP (repeats every period)
    • Wake at precise period
    • Sample inputs
    • Execute control algorithm
    • Output results
    • NO: malloc, file I/O, console output, syscalls
          │
          ▼
  PHASE 4: SHUTDOWN
    • Signal threads to stop
    • Wait for clean termination
    • Release resources
    • Exit
```
```c
/**
 * Real-Time Application Template
 *
 * This template demonstrates the proper structure for
 * a deterministic real-time application on Linux.
 */
#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>
#include <signal.h>

/* Configuration */
#define RT_PRIORITY 80
#define RT_CPU      2
#define PERIOD_NS   1000000 /* 1ms */
#define STACK_SIZE  (512 * 1024)
#define NUM_BUFFERS 100
#define BUFFER_SIZE 4096

/* Application state - all pre-allocated */
typedef struct {
    /* Pre-allocated buffers */
    void* buffers[NUM_BUFFERS];
    int   buffer_free[NUM_BUFFERS];

    /* Control data */
    double control_state[64];

    /* Communication */
    pthread_mutex_t data_lock;

    /* Runtime state */
    volatile sig_atomic_t running;
} app_state_t;

static app_state_t* g_state = NULL;

/* Signal handler so SIGINT/SIGTERM trigger a clean shutdown */
static void handle_shutdown(int sig) {
    (void)sig;
    if (g_state) g_state->running = 0;
}

/* ============================================
 * PHASE 1: NON-RT INITIALIZATION
 * ============================================ */

/**
 * Allocate all memory needed for RT operation
 * Must be called BEFORE entering RT mode
 */
static int init_memory(app_state_t* state) {
    /* Allocate buffer pool */
    for (int i = 0; i < NUM_BUFFERS; i++) {
        state->buffers[i] = aligned_alloc(64, BUFFER_SIZE);
        if (!state->buffers[i]) {
            perror("Failed to allocate buffer");
            return -1;
        }
        /* Touch pages to ensure they're faulted in */
        memset(state->buffers[i], 0, BUFFER_SIZE);
        state->buffer_free[i] = 1;
    }

    /* Initialize control state */
    memset(state->control_state, 0, sizeof(state->control_state));

    return 0;
}

/**
 * Lock all memory and pre-fault stack
 */
static int lock_memory(void) {
    /* Lock current and future memory */
    if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
        perror("mlockall failed");
        return -1;
    }

    /* Pre-fault stack by touching pages */
    volatile char stack_preallocate[STACK_SIZE];
    memset((void*)stack_preallocate, 0, sizeof(stack_preallocate));

    return 0;
}

/* ============================================
 * PHASE 2: RT CONFIGURATION
 * ============================================ */

/**
 * Configure thread for real-time execution
 */
static int configure_rt_thread(int priority, int cpu) {
    struct sched_param param;
    cpu_set_t cpuset;

    /* Set SCHED_FIFO with specified priority */
    memset(&param, 0, sizeof(param));
    param.sched_priority = priority;
    if (sched_setscheduler(0, SCHED_FIFO, &param) != 0) {
        perror("sched_setscheduler failed");
        return -1;
    }

    /* Pin to specific CPU */
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (sched_setaffinity(0, sizeof(cpuset), &cpuset) != 0) {
        perror("sched_setaffinity failed");
        return -1;
    }

    return 0;
}

/* ============================================
 * PHASE 3: RT EXECUTION
 * ============================================ */

/**
 * The actual RT work - MUST be deterministic
 * NO allocations, NO blocking I/O, NO syscalls
 */
static inline void do_rt_work(app_state_t* state) {
    (void)state;
    /* Read sensors (via pre-mapped memory or similar) */
    /* ... */

    /* Execute control algorithm */
    /* ... deterministic computation only ... */

    /* Write outputs (via pre-mapped memory or similar) */
    /* ... */
}

/**
 * Real-time thread main function
 */
static void* rt_thread_main(void* arg) {
    app_state_t* state = (app_state_t*)arg;
    struct timespec next_wake;

    /* Configure RT scheduling */
    if (configure_rt_thread(RT_PRIORITY, RT_CPU) != 0) {
        fprintf(stderr, "Failed to configure RT\n");
        return NULL;
    }

    /* Initialize wake time */
    clock_gettime(CLOCK_MONOTONIC, &next_wake);

    /* ========== RT LOOP ========== */
    while (state->running) {
        /* Calculate next wake time (BEFORE work) */
        next_wake.tv_nsec += PERIOD_NS;
        if (next_wake.tv_nsec >= 1000000000L) {
            next_wake.tv_nsec -= 1000000000L;
            next_wake.tv_sec++;
        }

        /* Do real-time work */
        do_rt_work(state);

        /* Sleep until next period (absolute time = no drift) */
        clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_wake, NULL);
    }
    /* ========== END RT LOOP ========== */

    return NULL;
}

/* ============================================
 * MAIN
 * ============================================ */

int main(int argc, char* argv[]) {
    (void)argc; (void)argv;
    pthread_t rt_thread;

    /* Allocate application state */
    g_state = calloc(1, sizeof(app_state_t));
    if (!g_state) {
        perror("Failed to allocate state");
        return 1;
    }
    g_state->running = 1;

    /* Install shutdown handler so the loops below can exit */
    signal(SIGINT, handle_shutdown);
    signal(SIGTERM, handle_shutdown);

    /* PHASE 1: Initialize everything BEFORE RT */
    printf("Initializing...\n");
    if (init_memory(g_state) != 0) {
        return 1;
    }
    if (pthread_mutex_init(&g_state->data_lock, NULL) != 0) {
        return 1;
    }
    if (lock_memory() != 0) {
        return 1;
    }

    printf("Starting RT thread...\n");

    /* Create RT thread */
    if (pthread_create(&rt_thread, NULL, rt_thread_main, g_state) != 0) {
        perror("pthread_create failed");
        return 1;
    }

    /* Main thread can do non-RT work (logging, UI, etc.) */
    while (g_state->running) {
        sleep(1);
        /* Monitor, log, handle commands, etc. */
    }

    /* PHASE 4: Shutdown */
    g_state->running = 0;
    pthread_join(rt_thread, NULL);

    /* Cleanup */
    for (int i = 0; i < NUM_BUFFERS; i++) {
        free(g_state->buffers[i]);
    }
    pthread_mutex_destroy(&g_state->data_lock);
    free(g_state);

    return 0;
}
```

Everything that could possibly cause non-determinism must happen BEFORE the RT loop starts or AFTER it ends. The RT loop itself should be a pure, predictable computation from pre-allocated inputs to pre-allocated outputs.
Memory operations are among the most significant sources of non-determinism. The standard C library's malloc/free, page faults, and memory compaction can all cause unbounded delays.
Memory Locking:
The first defense against memory-related latency is locking all memory:
```c
#include <sys/mman.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define STACK_PREFAULT_SIZE (512 * 1024)       /* 512 KB */
#define HEAP_PREFAULT_SIZE  (10 * 1024 * 1024) /* 10 MB */

/**
 * Comprehensive memory preparation for RT
 */
int prepare_memory_for_rt(void) {
    void* heap_region;
    volatile char stack_prefault[STACK_PREFAULT_SIZE];

    /*
     * Step 1: Lock all current and future memory
     *
     * MCL_CURRENT: Lock pages currently in address space
     * MCL_FUTURE:  Lock pages added later (by mmap, brk)
     */
    if (mlockall(MCL_CURRENT | MCL_FUTURE) != 0) {
        perror("mlockall failed");
        fprintf(stderr,
                "Note: requires CAP_IPC_LOCK or sufficient RLIMIT_MEMLOCK\n");
        return -1;
    }

    /*
     * Step 2: Pre-fault stack
     *
     * Stack grows downward; touch pages to prevent
     * page faults during RT execution when stack is used.
     */
    memset((void*)stack_prefault, 0, sizeof(stack_prefault));

    /*
     * Step 3: Pre-allocate and pre-fault heap region
     *
     * If you'll use any heap during RT (even from pools),
     * pre-allocate the region now.
     */
    heap_region = malloc(HEAP_PREFAULT_SIZE);
    if (heap_region) {
        memset(heap_region, 0, HEAP_PREFAULT_SIZE);
        /* Now all pages are faulted in and locked */
        /* Don't free - we want the memory to stay mapped! */
        /* Or use it as your RT memory pool base. */
    }

    /*
     * Step 4: Pre-touch any other memory regions
     * - Shared memory segments
     * - Memory-mapped device regions
     * - Thread-local storage
     */

    return 0;
}
```

RT-Safe Memory Allocation:
For dynamic memory needs in RT code, use pre-allocated pools:
```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/**
 * Simple Fixed-Size Block Pool Allocator
 *
 * Deterministic O(1) allocation and free from pre-allocated pool.
 * No system calls, no locks (single-threaded or use atomic ops).
 */
typedef struct {
    void*  base;       /* Pool base address */
    size_t block_size; /* Size of each block */
    size_t num_blocks; /* Total number of blocks */
    void*  free_list;  /* Head of free list */
} rt_pool_t;

/* Initialize pool - call BEFORE entering RT */
rt_pool_t* rt_pool_create(size_t block_size, size_t num_blocks) {
    rt_pool_t* pool = malloc(sizeof(rt_pool_t));
    if (!pool) return NULL;

    /* Ensure block size can hold a pointer for free list */
    if (block_size < sizeof(void*)) block_size = sizeof(void*);

    /* Align block size */
    block_size = (block_size + 7) & ~(size_t)7;

    /* aligned_alloc requires the total size to be a multiple of the
     * alignment, so round it up */
    size_t total = (block_size * num_blocks + 63) & ~(size_t)63;
    pool->base = aligned_alloc(64, total);
    if (!pool->base) {
        free(pool);
        return NULL;
    }

    /* Pre-fault all pages */
    memset(pool->base, 0, total);

    pool->block_size = block_size;
    pool->num_blocks = num_blocks;

    /* Build free list (each block points to next) */
    pool->free_list = pool->base;
    char* p = (char*)pool->base;
    for (size_t i = 0; i < num_blocks - 1; i++) {
        *(void**)(p) = p + block_size;
        p += block_size;
    }
    *(void**)(p) = NULL; /* Last block */

    return pool;
}

/* RT-safe allocation - O(1), no syscalls */
void* rt_pool_alloc(rt_pool_t* pool) {
    if (!pool->free_list) return NULL; /* Pool exhausted */

    void* block = pool->free_list;
    pool->free_list = *(void**)block;
    return block;
}

/* RT-safe free - O(1), no syscalls */
void rt_pool_free(rt_pool_t* pool, void* block) {
    if (!block) return;

    /* Add to front of free list */
    *(void**)block = pool->free_list;
    pool->free_list = block;
}

/* Destroy pool - call AFTER exiting RT */
void rt_pool_destroy(rt_pool_t* pool) {
    if (pool) {
        free(pool->base);
        free(pool);
    }
}
```

In C++, use custom allocators for STL containers, placement new for objects, and avoid implicit allocations (string concatenation, exceptions, lambdas that capture by value). Consider RT-safe containers or pre-sized containers that won't reallocate.
Real-time applications often need to communicate between RT and non-RT threads: the RT thread produces sensor data while a non-RT thread logs it, or the RT thread receives setpoints from a non-RT UI thread. This communication must be carefully designed to avoid blocking the RT thread.
```c
#include <stdatomic.h>
#include <stdbool.h>
#include <string.h>

/**
 * Single-Producer Single-Consumer Lock-Free Queue
 *
 * RT thread can be producer OR consumer (but not both)
 * Non-RT thread takes the other role
 *
 * Uses power-of-2 buffer size for mask-based wrapping.
 */
#define QUEUE_SIZE 1024 /* Must be power of 2 */
#define QUEUE_MASK (QUEUE_SIZE - 1)

typedef struct {
    void* buffer[QUEUE_SIZE];
    atomic_size_t head; /* Write position (producer) */
    atomic_size_t tail; /* Read position (consumer) */
} spsc_queue_t;

/* Initialize - call before RT */
void spsc_queue_init(spsc_queue_t* q) {
    memset(q->buffer, 0, sizeof(q->buffer));
    atomic_store(&q->head, 0);
    atomic_store(&q->tail, 0);
}

/* Check if queue is empty */
bool spsc_queue_empty(spsc_queue_t* q) {
    return atomic_load(&q->head) == atomic_load(&q->tail);
}

/* Check if queue is full */
bool spsc_queue_full(spsc_queue_t* q) {
    size_t next_head = (atomic_load(&q->head) + 1) & QUEUE_MASK;
    return next_head == atomic_load(&q->tail);
}

/**
 * Enqueue - Producer only (RT-safe)
 *
 * @return true if successful, false if queue full
 */
bool spsc_queue_push(spsc_queue_t* q, void* item) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t next_head = (head + 1) & QUEUE_MASK;

    /* Check if full */
    if (next_head == atomic_load_explicit(&q->tail, memory_order_acquire)) {
        return false; /* Queue full */
    }

    q->buffer[head] = item;
    /* Release ensures buffer write visible before head update */
    atomic_store_explicit(&q->head, next_head, memory_order_release);
    return true;
}

/**
 * Dequeue - Consumer only (RT-safe)
 *
 * @return item if successful, NULL if queue empty
 */
void* spsc_queue_pop(spsc_queue_t* q) {
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);

    /* Check if empty */
    if (tail == atomic_load_explicit(&q->head, memory_order_acquire)) {
        return NULL; /* Queue empty */
    }

    void* item = q->buffer[tail];
    /* Release ensures we've read item before advancing tail */
    atomic_store_explicit(&q->tail, (tail + 1) & QUEUE_MASK,
                          memory_order_release);
    return item;
}
```

Using Priority Inheritance Mutexes:
When lock-free isn't feasible, use mutexes with priority inheritance to prevent priority inversion:
```c
#include <pthread.h>

/**
 * Create a mutex with priority inheritance
 *
 * When an RT thread blocks waiting for this mutex,
 * the holder's priority is boosted to the waiter's priority.
 */
int create_pi_mutex(pthread_mutex_t* mutex) {
    pthread_mutexattr_t attr;
    int ret;

    ret = pthread_mutexattr_init(&attr);
    if (ret != 0) return ret;

    /* Set priority inheritance protocol */
    ret = pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);
    if (ret != 0) {
        pthread_mutexattr_destroy(&attr);
        return ret;
    }

    ret = pthread_mutex_init(mutex, &attr);
    pthread_mutexattr_destroy(&attr);
    return ret;
}

/*
 * Usage rules for PI mutexes in RT code:
 *
 * 1. Keep critical sections SHORT
 * 2. NEVER call functions that might block while holding mutex
 * 3. Avoid nested locking (potential deadlock)
 * 4. RT thread should hold lock for bounded time only
 * 5. Test thoroughly under load for priority inversion
 */
```

Before using mutexes, consider whether a lock-free design is possible. SPSC queues and double-buffering handle most RT-to-nonRT communication patterns without any locks. Locks should be the exception, not the rule, in RT code.
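The double-buffering pattern mentioned above can be sketched as follows. This is a minimal illustration, not code from the original text: the names (`snapshot_t`, `db_publish`, `db_read`) are hypothetical, and the single index flip assumes the non-RT writer publishes slowly relative to the RT reader's copy; if the writer can publish twice while the reader is mid-copy, the copy can tear, and a seqlock or triple buffer closes that window.

```c
#include <stdatomic.h>

/* A complete snapshot of the data the non-RT side publishes
 * (e.g. setpoints from a UI thread). */
typedef struct {
    double setpoints[8];
} snapshot_t;

typedef struct {
    snapshot_t bufs[2];
    atomic_int active; /* index of the buffer readers should use */
} double_buffer_t;

/* Non-RT writer: fill the inactive buffer, then flip the index. */
void db_publish(double_buffer_t* db, const snapshot_t* s) {
    int inactive = 1 - atomic_load_explicit(&db->active, memory_order_relaxed);
    db->bufs[inactive] = *s;
    /* Release: snapshot contents become visible before the index flip */
    atomic_store_explicit(&db->active, inactive, memory_order_release);
}

/* RT reader: copy the active buffer - bounded time, never blocks. */
void db_read(double_buffer_t* db, snapshot_t* out) {
    int idx = atomic_load_explicit(&db->active, memory_order_acquire);
    *out = db->bufs[idx];
}
```

The RT reader pays a fixed-size copy per period and never waits, which is exactly the property the SPSC queue above provides for streams of items; double-buffering is the equivalent for "latest value wins" data.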
Correct timing is fundamental to real-time systems. Using the wrong clock, sleep mechanism, or timing approach causes drift, jitter, or outright failure.
| Clock | Behavior | Use Case |
|---|---|---|
| CLOCK_MONOTONIC | Steady, never jumps, advances at wall time rate | Primary choice for RT timing |
| CLOCK_MONOTONIC_RAW | Like MONOTONIC but not slewed by NTP | Interval measurement where NTP frequency adjustment is unwanted |
| CLOCK_REALTIME | Wall clock time, can jump (NTP, admin) | Timestamps for logs, NEVER for scheduling |
| CLOCK_TAI | Atomic time without leap seconds | Systems crossing leap second boundaries |
```c
#include <stdint.h>
#include <time.h>

/* Provided by the application */
extern volatile int running;
void do_periodic_work(void);

/**
 * CORRECT: Absolute time periodic loop
 *
 * Uses absolute wake times to prevent drift.
 * Each iteration targets a precise wall-clock instant.
 */
void periodic_loop_correct(uint64_t period_ns) {
    struct timespec next_wake;
    clock_gettime(CLOCK_MONOTONIC, &next_wake);

    while (running) {
        /* Calculate NEXT absolute wake time first */
        next_wake.tv_nsec += period_ns;
        while (next_wake.tv_nsec >= 1000000000L) {
            next_wake.tv_nsec -= 1000000000L;
            next_wake.tv_sec++;
        }

        /* Do work AFTER calculating next wake time */
        do_periodic_work();

        /* Absolute sleep - sleep UNTIL next_wake time */
        clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_wake, NULL);

        /*
         * Even if work takes varying time, we always wake
         * at the same absolute instants: T, T+P, T+2P, T+3P...
         * No drift accumulation!
         */
    }
}

/**
 * WRONG: Relative time periodic loop
 *
 * Drift accumulates because sleep starts AFTER work completes.
 * Period becomes: work_time + sleep_time, not just period.
 */
void periodic_loop_wrong(uint64_t period_ns) {
    struct timespec sleep_time = {
        .tv_sec  = period_ns / 1000000000L,
        .tv_nsec = period_ns % 1000000000L
    };

    while (running) {
        do_periodic_work(); /* Takes variable time */

        /* Relative sleep - sleep FOR duration starting NOW */
        nanosleep(&sleep_time, NULL);

        /*
         * PROBLEM: If work takes 100μs, actual period is:
         *   100μs + 1000μs = 1100μs instead of 1000μs
         *
         * Over 1000 iterations: 100ms drift!
         */
    }
}
```

Handling Overruns:
What happens when work takes longer than the period? The design must handle this gracefully:
```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Provided by the application */
extern volatile int running;
void do_periodic_work(void);

/**
 * Periodic loop with overrun detection and handling
 */
void periodic_loop_with_overrun_handling(uint64_t period_ns) {
    struct timespec next_wake, now;
    uint64_t overrun_count = 0;

    clock_gettime(CLOCK_MONOTONIC, &next_wake);

    while (running) {
        /* Calculate next wake time */
        next_wake.tv_nsec += period_ns;
        while (next_wake.tv_nsec >= 1000000000L) {
            next_wake.tv_nsec -= 1000000000L;
            next_wake.tv_sec++;
        }

        /* Do work */
        do_periodic_work();

        /* Check for overrun BEFORE sleeping */
        clock_gettime(CLOCK_MONOTONIC, &now);
        int64_t time_to_wake =
            (next_wake.tv_sec - now.tv_sec) * 1000000000LL +
            (next_wake.tv_nsec - now.tv_nsec);

        if (time_to_wake < 0) {
            /*
             * OVERRUN: We've missed our deadline!
             * The work took longer than the period.
             */
            overrun_count++;

            /* Option 1: Log and skip to next valid period */
            while (time_to_wake < 0) {
                next_wake.tv_nsec += period_ns;
                while (next_wake.tv_nsec >= 1000000000L) {
                    next_wake.tv_nsec -= 1000000000L;
                    next_wake.tv_sec++;
                }
                time_to_wake += period_ns;
            }

            /* Log outside RT loop later, or to RT-safe trace */
            /* rt_trace("Overrun detected, skipped to next period"); */

            /* Option 2: For critical systems, signal fault */
            /* signal_overrun_fault(); */
        }

        /* If we haven't overrun, sleep; otherwise loop immediately */
        if (time_to_wake > 0) {
            clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_wake, NULL);
        }
    }

    /* Report statistics after loop */
    if (overrun_count > 0) {
        fprintf(stderr, "Warning: %" PRIu64 " overruns detected\n",
                overrun_count);
    }
}
```

Every RT system must define an overrun policy: Skip period? Accumulate debt? Signal failure? The correct choice depends on the application. A video player might skip frames; a safety system might trigger an emergency stop.
Learn from the mistakes of others. These anti-patterns appear frequently in RT code and cause subtle or catastrophic problems:
```c
/**
 * RT-Safe Tracing: Instead of printf()
 *
 * Uses lock-free ring buffer that non-RT thread drains.
 */
#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

#define TRACE_ENTRIES 4096
#define TRACE_MSG_LEN 64

typedef struct {
    uint64_t timestamp_ns;
    int      event_id;
    int64_t  value;
    char     msg[TRACE_MSG_LEN];
} trace_entry_t;

typedef struct {
    trace_entry_t entries[TRACE_ENTRIES];
    atomic_size_t head; /* Next write position */
    atomic_size_t tail; /* Next read position */
} rt_trace_buffer_t;

static rt_trace_buffer_t g_trace = {0};

/* RT-safe trace - O(1), no blocking */
void rt_trace(int event_id, int64_t value, const char* msg) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);

    size_t idx = atomic_fetch_add(&g_trace.head, 1) % TRACE_ENTRIES;
    trace_entry_t* entry = &g_trace.entries[idx];

    entry->timestamp_ns = (uint64_t)ts.tv_sec * 1000000000ULL + ts.tv_nsec;
    entry->event_id = event_id;
    entry->value = value;
    if (msg) {
        strncpy(entry->msg, msg, TRACE_MSG_LEN - 1);
        entry->msg[TRACE_MSG_LEN - 1] = '\0';
    } else {
        entry->msg[0] = '\0';
    }

    /* Note: This is a simplified single-producer implementation.
     * Full implementation would handle wraparound stealing from tail. */
}

/* Non-RT thread calls this to drain and print */
void drain_trace_buffer(FILE* out) {
    size_t tail = atomic_load(&g_trace.tail);
    size_t head = atomic_load(&g_trace.head);

    while (tail != head) {
        trace_entry_t* e = &g_trace.entries[tail % TRACE_ENTRIES];
        fprintf(out, "[%" PRIu64 ".%09" PRIu64 "] Event %d: %" PRId64 " %s\n",
                e->timestamp_ns / 1000000000ULL,
                e->timestamp_ns % 1000000000ULL,
                e->event_id, e->value, e->msg);
        tail++;
    }
    atomic_store(&g_trace.tail, tail);
}
```

For every function called in your RT path, ask: (1) Can it allocate memory? (2) Can it do I/O? (3) Can it block on a lock? (4) Can it call into the kernel? (5) Is its execution time bounded? If any answer is 'yes' or 'maybe', investigate or refactor.
Debugging real-time systems requires specialized techniques. Traditional debuggers that stop execution break the real-time behavior you're trying to observe.
```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* Provided by the application */
extern volatile int running;
void do_work(void);
#define PERIOD_NS 1000000 /* 1 ms */

/**
 * Simple Latency Histogram for RT Analysis
 */
#define HIST_BUCKETS      100
#define HIST_BUCKET_NS    1000 /* 1μs per bucket */
#define HIST_OVERFLOW_IDX (HIST_BUCKETS - 1)

typedef struct {
    uint64_t buckets[HIST_BUCKETS];
    uint64_t min_ns;
    uint64_t max_ns;
    uint64_t total_ns;
    uint64_t count;
} latency_histogram_t;

void histogram_init(latency_histogram_t* h) {
    memset(h, 0, sizeof(*h));
    h->min_ns = UINT64_MAX;
}

/* Record a latency sample (RT-safe) */
void histogram_record(latency_histogram_t* h, uint64_t latency_ns) {
    /* Update min/max */
    if (latency_ns < h->min_ns) h->min_ns = latency_ns;
    if (latency_ns > h->max_ns) h->max_ns = latency_ns;

    /* Update total and count */
    h->total_ns += latency_ns;
    h->count++;

    /* Update bucket */
    size_t bucket = latency_ns / HIST_BUCKET_NS;
    if (bucket >= HIST_BUCKETS) bucket = HIST_OVERFLOW_IDX;
    h->buckets[bucket]++;
}

/* Print histogram (NOT RT-safe - use after RT loop) */
void histogram_print(latency_histogram_t* h, FILE* out) {
    fprintf(out, "Latency Histogram (n=%" PRIu64 ")\n", h->count);
    fprintf(out, "Min: %" PRIu64 " ns, Max: %" PRIu64 " ns, Avg: %" PRIu64 " ns\n",
            h->min_ns, h->max_ns,
            h->count ? h->total_ns / h->count : 0);
    fprintf(out, "\nDistribution:\n");
    for (int i = 0; i < HIST_BUCKETS; i++) {
        if (h->buckets[i] > 0) {
            if (i == HIST_OVERFLOW_IDX) {
                fprintf(out, ">=%d us: %" PRIu64 "\n",
                        i * HIST_BUCKET_NS / 1000, h->buckets[i]);
            } else {
                fprintf(out, "%d-%d us: %" PRIu64 "\n",
                        i * HIST_BUCKET_NS / 1000,
                        (i + 1) * HIST_BUCKET_NS / 1000,
                        h->buckets[i]);
            }
        }
    }
}

/* Example usage in RT loop */
void rt_loop_with_histogram(void) {
    latency_histogram_t hist;
    struct timespec target_wake, actual_wake;

    histogram_init(&hist);
    clock_gettime(CLOCK_MONOTONIC, &target_wake);

    while (running && hist.count < 1000000) {
        target_wake.tv_nsec += PERIOD_NS;
        while (target_wake.tv_nsec >= 1000000000L) {
            target_wake.tv_nsec -= 1000000000L;
            target_wake.tv_sec++;
        }

        clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &target_wake, NULL);
        clock_gettime(CLOCK_MONOTONIC, &actual_wake);

        int64_t latency =
            (actual_wake.tv_sec - target_wake.tv_sec) * 1000000000LL +
            (actual_wake.tv_nsec - target_wake.tv_nsec);
        if (latency > 0) {
            /* Record positive jitter */
            histogram_record(&hist, (uint64_t)latency);
        }

        do_work();
    }

    /* Print after RT loop */
    histogram_print(&hist, stdout);
}
```

When analyzing histograms, focus on the tail—the rare high-latency events. For RT guarantees, you need to know not just the 99th percentile but the absolute maximum observed. Run millions of iterations under load to exercise rare code paths.
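Percentiles can be read straight off the histogram buckets. A sketch, assuming the bucket layout of `latency_histogram_t` above (bucket `i` covers `[i*bucket_ns, (i+1)*bucket_ns)`); the function name and signature are mine:

```c
#include <stddef.h>
#include <stdint.h>

/* Return the latency at the given percentile, computed from bucket
 * counts. The result is the upper edge of the bucket containing the
 * target rank - conservative, which is the right direction for RT. */
uint64_t histogram_percentile_ns(const uint64_t* buckets, size_t n_buckets,
                                 uint64_t bucket_ns, uint64_t total_count,
                                 double percentile) {
    if (total_count == 0) return 0;

    /* 1-based rank of the sample sitting at this percentile */
    uint64_t target = (uint64_t)(percentile * (double)total_count / 100.0);
    if (target == 0) target = 1;

    uint64_t seen = 0;
    for (size_t i = 0; i < n_buckets; i++) {
        seen += buckets[i];
        if (seen >= target)
            return (i + 1) * bucket_ns; /* bucket's upper edge */
    }
    return n_buckets * bucket_ns; /* rank fell in overflow bucket */
}
```

Remember that for hard guarantees the percentile is only a progress indicator; the number that matters is `max_ns`, the absolute worst case observed.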
Moving RT applications from development to production requires additional considerations for reliability, monitoring, and maintenance.
```c
#include <fcntl.h>
#include <linux/watchdog.h>
#include <semaphore.h>
#include <signal.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <unistd.h>

static int g_watchdog_fd = -1;
static volatile sig_atomic_t g_rt_timeout_count = 0;
static sem_t g_check_health_sem; /* initialized elsewhere with sem_init */

void handle_rt_timeout(int sig);

/**
 * Production safety setup
 */
int setup_production_safety(void) {
    int wd_fd;

    /*
     * 1. Hardware Watchdog
     *
     * Open watchdog device and configure timeout.
     * Must write periodically or system reboots.
     */
    wd_fd = open("/dev/watchdog", O_WRONLY);
    if (wd_fd >= 0) {
        int timeout = 5; /* 5 second timeout */
        ioctl(wd_fd, WDIOC_SETTIMEOUT, &timeout);
        /* Store wd_fd; RT thread must write to it periodically */
        g_watchdog_fd = wd_fd;
    } else {
        fprintf(stderr, "Warning: No hardware watchdog\n");
    }

    /*
     * 2. Set RLIMIT_RTTIME (values in microseconds)
     *
     * Limits continuous RT CPU time without blocking.
     * Process receives SIGXCPU if exceeded, then SIGKILL.
     * Prevents runaway RT task from hanging system.
     */
    struct rlimit limit;
    limit.rlim_cur = 100000; /* 100ms soft limit */
    limit.rlim_max = 200000; /* 200ms hard limit */
    if (setrlimit(RLIMIT_RTTIME, &limit) != 0) {
        perror("setrlimit RLIMIT_RTTIME");
        /* Non-fatal but warn */
    }

    /*
     * 3. Install SIGXCPU handler
     *
     * Handle RT time limit exceeded gracefully.
     */
    struct sigaction sa;
    sa.sa_handler = handle_rt_timeout;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGXCPU, &sa, NULL);

    return 0;
}

/* Feed watchdog from RT loop */
void feed_watchdog(void) {
    if (g_watchdog_fd >= 0) {
        write(g_watchdog_fd, "\0", 1);
    }
}

/* Handle RT time limit exceeded */
void handle_rt_timeout(int sig) {
    (void)sig;
    /*
     * RT task ran too long without blocking.
     * Options:
     *   1. Log and continue (soft limit only)
     *   2. Trigger graceful shutdown
     *   3. Switch to degraded mode
     */
    g_rt_timeout_count++;

    /* Signal main thread to investigate (sem_post is async-signal-safe) */
    sem_post(&g_check_health_sem);
}
```

Before production deployment, run the complete system under worst-case load for extended periods (24+ hours minimum, ideally weeks). RT problems that appear once per day will eventually appear—better in testing than production.
Building deterministic real-time applications requires discipline at every level of the software stack: separate initialization from execution, lock and pre-fault all memory, communicate between RT and non-RT threads without blocking, schedule against absolute time, define an overrun policy, and validate with latency histograms under worst-case load.
Module Complete:
You have now completed the comprehensive coverage of Real-Time Linux. From the PREEMPT_RT architecture through scheduling policies, latency reduction, and the RT-Linux ecosystem to these application development guidelines, you have the toolkit to design, implement, and deploy deterministic, deadline-meeting real-time systems on Linux platforms.