Not all messages are created equal. In real systems, some messages require immediate attention while others can wait. A system alert about imminent disk failure must take precedence over a routine log message. A user login request should be processed before a background cleanup task.
Priority ordering in message queues allows processes to assign urgency levels to messages, ensuring critical communications are processed first. This capability is essential for building responsive, real-time, and deadline-aware systems.
Both System V and POSIX message queues support priority, but with fundamentally different mechanisms:
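- System V: the message type (mtype) doubles as an implicit priority; a negative msgtyp in receive selects lowest types first
- POSIX: each message carries an explicit priority, and higher values are received first

Understanding these mechanisms and their trade-offs is crucial for designing systems that handle urgency correctly.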
By the end of this page, you will understand priority semantics in both System V and POSIX queues, how to implement priority-based task distribution, priority inversion problems and solutions, fair scheduling with priorities, and real-world priority queue patterns.
POSIX message queues provide native, per-message priority support. Each message sent with mq_send() has an associated priority, and mq_receive() always returns the highest-priority message first.
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio);
Key Properties:
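- A higher numeric value means higher priority
- Messages with equal priority are delivered in FIFO order
- Valid priorities range from 0 to MQ_PRIO_MAX - 1 (0-32767 on Linux; POSIX guarantees at least 0-31)

#include <stdio.h>
#include <unistd.h>

long max_prio = sysconf(_SC_MQ_PRIO_MAX);
printf("Maximum priority: %ld\n", max_prio - 1);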
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <mqueue.h>
#include <fcntl.h>
#include <unistd.h>

#define QUEUE_NAME "/priority_demo"
#define MAX_MSG_SIZE 128

int main(void) {
    // Check system priority limit
    long max_prio = sysconf(_SC_MQ_PRIO_MAX);
    printf("System MQ_PRIO_MAX: %ld\n", max_prio);
    printf("Usable priority range: 0 to %ld\n\n", max_prio - 1);

    struct mq_attr attr = {
        .mq_maxmsg = 10,
        .mq_msgsize = MAX_MSG_SIZE
    };

    // O_NONBLOCK makes mq_receive() fail with EAGAIN once the queue is
    // empty, so the receive loop below terminates instead of blocking.
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    // ========================================
    // Send messages in non-priority order
    // ========================================
    printf("=== Sending Messages (various priorities) ===\n");

    struct {
        const char *msg;
        unsigned int prio;
    } messages[] = {
        {"Low priority task", 1},
        {"Normal priority task", 5},
        {"HIGH PRIORITY ALERT", 20},
        {"Another low priority", 1},
        {"Medium priority job", 10},
        {"CRITICAL SYSTEM ERROR", 31},
        {"Normal work", 5},
    };

    for (size_t i = 0; i < sizeof(messages) / sizeof(messages[0]); i++) {
        if (mq_send(mq, messages[i].msg, strlen(messages[i].msg),
                    messages[i].prio) == -1) {
            perror("mq_send");
            continue;
        }
        printf("Sent (prio=%2u): %s\n", messages[i].prio, messages[i].msg);
    }

    // ========================================
    // Receive messages (automatic priority order)
    // ========================================
    printf("\n=== Receiving Messages (priority order) ===\n");

    char buffer[MAX_MSG_SIZE + 1];  // +1 leaves room for the terminator
    unsigned int priority;
    ssize_t len;
    int count = 1;

    while ((len = mq_receive(mq, buffer, MAX_MSG_SIZE, &priority)) >= 0) {
        buffer[len] = '\0';
        printf("%d. (prio=%2u) %s\n", count++, priority, buffer);
    }
    if (errno != EAGAIN) {
        perror("mq_receive");  // anything other than "queue empty"
    }

    printf("\nNotice: Messages received in priority order, FIFO within same priority\n");

    mq_close(mq);
    mq_unlink(QUEUE_NAME);
    return EXIT_SUCCESS;
}

POSIX guarantees that mq_receive() returns the oldest message with the highest priority. You cannot receive a lower-priority message while higher-priority messages exist, even if you want to. For selective reception, you need System V queues or multiple POSIX queues.
System V message queues don't have explicit priority. Instead, priority behavior is achieved through the mtype field and negative msgtyp in msgrcv().
Recall that msgrcv(msqid, &msg, size, msgtyp, flags) behaves as follows:
- msgtyp > 0: Receives first message with that exact type
- msgtyp < 0: Receives first message with type ≤ |msgtyp|, lowest type first

This "lowest type first" behavior creates priority ordering:
// Define priorities (lower number = higher priority)
#define PRIO_CRITICAL 1L // Highest priority
#define PRIO_HIGH 10L
#define PRIO_NORMAL 100L
#define PRIO_LOW 1000L
// Receive highest-priority message (lowest type)
msgrcv(msqid, &msg, size, -1000L, 0); // Gets type 1 first
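To make the selection rule concrete, here is a small in-memory model of how a negative msgtyp picks a message. It is only an illustration of the documented semantics, not the kernel's implementation; `select_negative_msgtyp` is a hypothetical name, and the array stands in for the queue in arrival order.

```c
#include <stddef.h>

/* In-memory model (not the kernel implementation) of how msgrcv() picks
 * a message when msgtyp is negative. `types` holds the mtype of each
 * queued message in arrival order. Returns the index of the message
 * that would be received, or -1 if no message qualifies. */
long select_negative_msgtyp(const long *types, size_t n, long msgtyp) {
    long limit = -msgtyp;   /* |msgtyp|, assuming msgtyp < 0 */
    long best = -1;

    for (size_t i = 0; i < n; i++) {
        if (types[i] > limit) {
            continue;       /* type above the threshold: not eligible */
        }
        /* Only a strictly lower type replaces the current choice, so
         * among equal types the earliest arrival wins (FIFO). */
        if (best < 0 || types[i] < types[best]) {
            best = (long)i;
        }
    }
    return best;
}
```

With queued types {100, 1000, 1, 100, 10, 1}, a msgtyp of -1000 selects index 2 (the first type-1 message). With {100, 1000} and a msgtyp of -10, nothing qualifies, which is how a receiver can restrict itself to messages at or above a chosen urgency.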
| Aspect | System V | POSIX |
|---|---|---|
| Priority direction | Lower mtype = higher priority | Higher number = higher priority |
| Selectivity | Can still select specific types | Always highest priority first |
| Dual use | mtype serves as type AND priority | Separate priority field |
| Range | Positive long (1 to LONG_MAX) | 0 to MQ_PRIO_MAX-1 |
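Since the two conventions run in opposite directions, code that bridges them needs an explicit mapping. The sketch below is one possible convention rather than a standard one: both function names are hypothetical, and `max_prio` is assumed to be the value reported for MQ_PRIO_MAX, so that valid POSIX priorities are 0 to max_prio - 1.

```c
/* Hypothetical mapping: the highest POSIX priority (max_prio - 1)
 * becomes mtype 1, and POSIX priority 0 becomes mtype max_prio.
 * The result is always a valid (positive) System V mtype. */
long posix_prio_to_mtype(unsigned int prio, long max_prio) {
    return max_prio - (long)prio;
}

/* Inverse of the mapping above, for mtype in 1..max_prio. */
unsigned int mtype_to_posix_prio(long mtype, long max_prio) {
    return (unsigned int)(max_prio - mtype);
}
```

This dense mapping gives up the gaps between levels, so it suits mechanical translation between queue types better than hand-assigned priority tiers.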
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

// Priority levels (lower = higher priority)
#define PRIO_CRITICAL   1L
#define PRIO_HIGH       10L
#define PRIO_NORMAL     100L
#define PRIO_LOW        1000L
#define PRIO_BACKGROUND 10000L

// Maximum priority level for reception
#define PRIO_MAX PRIO_BACKGROUND

struct priority_msg {
    long mtype;      // Used as priority (lower = more urgent)
    char text[128];
};

const char *priority_name(long prio) {
    if (prio <= PRIO_CRITICAL) return "CRITICAL";
    if (prio <= PRIO_HIGH) return "HIGH";
    if (prio <= PRIO_NORMAL) return "NORMAL";
    if (prio <= PRIO_LOW) return "LOW";
    return "BACKGROUND";
}

int main(void) {
    int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
    if (msqid == -1) {
        perror("msgget");
        return EXIT_FAILURE;
    }

    struct priority_msg msg;

    // ========================================
    // Send messages with various priorities
    // ========================================
    printf("=== Sending Messages ===\n");

    struct {
        long prio;
        const char *text;
    } messages[] = {
        {PRIO_NORMAL, "Normal task 1"},
        {PRIO_LOW, "Low priority cleanup"},
        {PRIO_CRITICAL, "SYSTEM CRITICAL ALERT!"},
        {PRIO_NORMAL, "Normal task 2"},
        {PRIO_HIGH, "High priority request"},
        {PRIO_BACKGROUND, "Background indexing"},
        {PRIO_CRITICAL, "Another critical issue"},
    };

    for (size_t i = 0; i < sizeof(messages) / sizeof(messages[0]); i++) {
        msg.mtype = messages[i].prio;
        strncpy(msg.text, messages[i].text, sizeof(msg.text) - 1);
        msg.text[sizeof(msg.text) - 1] = '\0';  // strncpy may not terminate
        if (msgsnd(msqid, &msg, sizeof(msg.text), 0) == -1) {
            perror("msgsnd");
            continue;
        }
        printf("Sent (%s): %s\n", priority_name(messages[i].prio), messages[i].text);
    }

    // ========================================
    // Receive in priority order using negative msgtyp
    // ========================================
    printf("\n=== Receiving in Priority Order ===\n");
    printf("Using msgtyp = -%ld (receives lowest type first)\n\n", PRIO_MAX);

    int count = 1;
    // IPC_NOWAIT makes msgrcv() return -1 (ENOMSG) once the queue is empty
    while (msgrcv(msqid, &msg, sizeof(msg.text), -PRIO_MAX, IPC_NOWAIT) != -1) {
        printf("%d. [%10s] %s\n", count++, priority_name(msg.mtype), msg.text);
    }

    printf("\n=== Analysis ===\n");
    printf("Notice: CRITICAL messages (mtype=1) received first,\n");
    printf("then HIGH (10), NORMAL (100), LOW (1000), BACKGROUND (10000).\n");
    printf("Within same priority level, FIFO order is preserved.\n");

    msgctl(msqid, IPC_RMID, NULL);
    return EXIT_SUCCESS;
}

Leave gaps between priority levels (1, 10, 100, 1000) to allow inserting new levels later. If you use contiguous values (1, 2, 3, 4), you can't easily add a priority between CRITICAL and HIGH without renumbering everything.
One of the most common uses for priority message queues is task distribution to worker processes. Tasks are submitted with priorities, and workers always pick up the most urgent work first.
[Producer] ─┬─ Priority 31 ─┐
[Producer] ─┼─ Priority 10 ─┼─► [Priority Queue] ─┬─► [Worker 1]
[Producer] ─┼─ Priority 5  ─┤                     ├─► [Worker 2]
[Producer] ─┴─ Priority 1  ─┘                     └─► [Worker 3]
                                                  ▲ Workers compete for
                                                    highest priority task
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <time.h>

#define QUEUE_NAME "/task_queue"
#define MAX_MSG_SIZE 256
#define NUM_WORKERS 3
#define TASKS_PER_PRODUCER 5

// Task message structure
struct task {
    int task_id;
    int complexity;         // 1-10, affects processing time
    char description[200];
};

// Worker process
void worker(int worker_id) {
    mqd_t mq = mq_open(QUEUE_NAME, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("worker mq_open");
        exit(1);
    }

    // mq_receive() needs a buffer of at least mq_msgsize bytes, which is
    // larger than struct task, so receive into a raw buffer and copy out.
    char buffer[MAX_MSG_SIZE];
    struct task task;
    unsigned int priority;

    printf("[Worker %d] Started\n", worker_id);

    while (1) {
        ssize_t len = mq_receive(mq, buffer, sizeof(buffer), &priority);
        if (len < (ssize_t)sizeof(task)) break;  // error or malformed message
        memcpy(&task, buffer, sizeof(task));

        // Exit condition: special shutdown task (checked before doing work)
        if (priority == 0 && task.task_id == -1) {
            break;
        }

        printf("[Worker %d] Processing task %d (priority=%u, complexity=%d): %s\n",
               worker_id, task.task_id, priority, task.complexity, task.description);

        // Simulate work based on complexity
        usleep(task.complexity * 50000);  // 50-500ms

        printf("[Worker %d] Completed task %d\n", worker_id, task.task_id);
    }

    mq_close(mq);
    printf("[Worker %d] Shutting down\n", worker_id);
}

// Producer process
void producer(mqd_t mq, int producer_id, int start_task_id) {
    srand(time(NULL) + producer_id);
    struct task task;

    for (int i = 0; i < TASKS_PER_PRODUCER; i++) {
        task.task_id = start_task_id + i;
        task.complexity = (rand() % 10) + 1;

        // Assign random priority 1-30 (higher = more urgent);
        // priority 0 is reserved for the shutdown message
        unsigned int priority = (rand() % 30) + 1;

        snprintf(task.description, sizeof(task.description),
                 "Task from producer %d", producer_id);

        if (mq_send(mq, (const char *)&task, sizeof(task), priority) == -1) {
            perror("mq_send");
            continue;
        }
        printf("[Producer %d] Submitted task %d with priority %u\n",
               producer_id, task.task_id, priority);

        usleep(10000);  // Small delay between submissions
    }
}

int main(void) {
    // Create queue. mq_maxmsg is kept at 10 because that is the default
    // per-queue limit for unprivileged processes on Linux; larger values
    // make mq_open() fail with EINVAL unless the limit is raised.
    struct mq_attr attr = {
        .mq_maxmsg = 10,
        .mq_msgsize = MAX_MSG_SIZE
    };

    mq_unlink(QUEUE_NAME);  // Clean up any existing queue
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    printf("=== Priority Task Distribution Demo ===\n\n");
    fflush(stdout);  // avoid duplicating buffered output in the children

    // Start workers
    pid_t worker_pids[NUM_WORKERS];
    for (int i = 0; i < NUM_WORKERS; i++) {
        worker_pids[i] = fork();
        if (worker_pids[i] == 0) {
            worker(i + 1);
            exit(0);
        }
    }

    sleep(1);  // Let workers start

    // Run producers (in main process for simplicity)
    printf("\n=== Submitting Tasks ===\n");
    producer(mq, 1, 1);
    producer(mq, 2, 100);

    // Wait for queue to drain
    sleep(3);

    // Send shutdown signals at priority 0, so they sort behind real tasks
    struct task shutdown = { .task_id = -1 };
    for (int i = 0; i < NUM_WORKERS; i++) {
        mq_send(mq, (const char *)&shutdown, sizeof(shutdown), 0);
    }

    // Wait for workers
    for (int i = 0; i < NUM_WORKERS; i++) {
        waitpid(worker_pids[i], NULL, 0);
    }

    printf("\n=== All workers completed ===\n");

    mq_close(mq);
    mq_unlink(QUEUE_NAME);
    return EXIT_SUCCESS;
}

Priority inversion occurs when a high-priority task is blocked waiting for a resource held by a low-priority task, while medium-priority tasks run freely. This violates the fundamental expectation that higher-priority work completes first.
Time →
Process L (Low priority):   [====HOLDS LOCK====]────waiting─────►
Process M (Med priority):             [=====RUNNING FREELY=====]
Process H (High priority):       [WAITING FOR LOCK...]──►
                                  ↑
                                  H should run, but M runs instead!
                                  H is blocked by L, L can't release
                                  lock because M is running.
NASA's Mars Pathfinder mission experienced priority inversion that caused system resets. A low-priority meteorological task held a shared mutex while a high-priority bus management task waited. Medium-priority tasks continued running, and the high-priority task missed deadlines, triggering watchdog resets.
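Message queues can experience analogous problems. For example, a worker that has already dequeued a long-running low-priority task cannot pick up a high-priority message that arrives afterwards until that task finishes.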
Priority inversion is a design problem, not a runtime problem. If your system uses priorities, audit every shared resource, every queue, and every synchronization point for inversion potential. The bugs are subtle and often only manifest under specific load patterns.
In strict priority systems, low-priority messages can starve indefinitely if high-priority messages keep arriving. This is sometimes acceptable (critical alerts should always preempt logging), but often problematic.
Time →
Arrivals: H H L H H L H H L H H L ...
Processed: H H H H H H H H ... (L never runs!)
1. Priority Aging. Increase message priority based on time waiting:
effective_priority = base_priority + (wait_time / aging_interval)
Caveat: Complicates priority semantics; all priorities eventually become "high."
2. Weighted Fair Queuing. Process N high-priority messages, then 1 low-priority:
// Process 5 high-priority, then 1 low-priority
// (receive_high_priority / receive_low_priority are placeholder helpers)
for (int i = 0; i < 5; i++) {
    receive_high_priority();
}
receive_low_priority();
3. Quota Systems. Limit high-priority message submission rate:
if (high_prio_this_second > QUOTA) {
reject_or_downgrade(message);
}
4. Multi-Queue with Weighted Consumption. Separate queues, weighted round-robin consumption.
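The strategies above come down to a few small decisions that can be sketched independently of any particular queue API. The helpers below are illustrative only: every name, struct, and parameter is an assumption, not part of POSIX or System V. Note that a POSIX queue does not re-prioritize a message once it is queued, so the aging helper assumes the consumer (or a re-enqueueing step) tracks waiting time itself, and the round-robin helper assumes one queue per tier with the caller supplying the pending counts.

```c
#include <stddef.h>
#include <time.h>

/* Strategy 1 (aging): base priority plus one level per aging_interval
 * seconds waited, capped at max_priority. */
unsigned int effective_priority(unsigned int base_priority,
                                unsigned int wait_seconds,
                                unsigned int aging_interval,
                                unsigned int max_priority) {
    if (base_priority >= max_priority) return max_priority;
    if (aging_interval == 0) return base_priority;  /* aging disabled */
    unsigned int boost = wait_seconds / aging_interval;
    unsigned int headroom = max_priority - base_priority;
    return base_priority + (boost < headroom ? boost : headroom);
}

/* Strategy 3 (quota): per-sender budget of high-priority sends per
 * one-second window. */
struct prio_quota {
    time_t window_start;   /* second in which the current window began */
    unsigned int used;     /* high-priority sends in this window */
    unsigned int limit;    /* allowed high-priority sends per window */
};

/* Returns the priority to send with: `requested` while within quota,
 * `fallback` (a downgrade) once the quota is exhausted. */
unsigned int apply_quota(struct prio_quota *q, time_t now,
                         unsigned int requested,
                         unsigned int high_threshold,
                         unsigned int fallback) {
    if (requested < high_threshold) return requested;  /* not limited */
    if (now != q->window_start) {                      /* new window */
        q->window_start = now;
        q->used = 0;
    }
    if (q->used >= q->limit) return fallback;
    q->used++;
    return requested;
}

/* Strategies 2 and 4 (weighted round-robin over per-tier queues). */
#define NUM_TIERS 3

struct wrr_state {
    unsigned int weights[NUM_TIERS];  /* messages served per visit */
    unsigned int credit;              /* visits left for current tier */
    size_t current;                   /* tier being served */
};

/* Returns the tier to receive from next, or -1 if every tier is empty.
 * pending[t] is the number of messages waiting in tier t's queue. */
int wrr_next(struct wrr_state *s, const unsigned int pending[NUM_TIERS]) {
    for (size_t tries = 0; tries <= NUM_TIERS; tries++) {
        if (s->credit > 0 && pending[s->current] > 0) {
            s->credit--;
            return (int)s->current;
        }
        /* Share used up or tier empty: advance and refill the credit. */
        s->current = (s->current + 1) % NUM_TIERS;
        s->credit = s->weights[s->current];
    }
    return -1;
}
```

With weights {5, 3, 1} and all tiers backlogged, `wrr_next` yields tier 0 five times, tier 1 three times, and tier 2 once per cycle, so low-priority work keeps making progress. Empty tiers are skipped, so their share is not wasted.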
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>

// ================================================
// Fair Priority Consumer with Weighted Processing
// ================================================

#define QUEUE_NAME "/fair_demo"
#define MAX_MSG_SIZE 128

// Priority tiers
#define TIER_HIGH   20  // Priority >= 20
#define TIER_NORMAL 10  // Priority 10-19
#define TIER_LOW    0   // Priority 0-9

// Weights: process WEIGHT_HIGH high-priority, then WEIGHT_NORMAL normal, etc.
#define WEIGHT_HIGH   5
#define WEIGHT_NORMAL 3
#define WEIGHT_LOW    1

struct message {
    int id;
    char content[120];
};

// Try to receive a message within a priority range
// Returns 1 if received, 0 if none available in range
int receive_in_range(mqd_t mq, struct message *msg, unsigned int *priority,
                     unsigned int min_prio, unsigned int max_prio) {
    // mq_receive() needs a buffer of at least mq_msgsize bytes, which is
    // larger than struct message, so receive into a raw buffer first.
    char buffer[MAX_MSG_SIZE];

    // Non-blocking receive (the queue is opened with O_NONBLOCK)
    ssize_t len = mq_receive(mq, buffer, sizeof(buffer), priority);
    if (len < (ssize_t)sizeof(*msg)) {
        return 0;  // Queue empty (or unexpected message size)
    }
    memcpy(msg, buffer, sizeof(*msg));

    // Check if priority is in our target range
    if (*priority >= min_prio && *priority <= max_prio) {
        return 1;  // Got one in range
    }

    // Wrong priority tier: put it back. This is a real limitation, not a
    // detail: a single POSIX queue always hands out its highest-priority
    // message, so lower tiers are only served once higher tiers are empty
    // and the weights do not take effect. True weighted consumption needs
    // separate queues per tier.
    mq_send(mq, (const char *)msg, sizeof(*msg), *priority);
    return 0;
}

void fair_consumer(mqd_t mq) {
    struct message msg;
    unsigned int priority;
    int total = 0;
    int high_count = 0, normal_count = 0, low_count = 0;

    printf("=== Fair Priority Consumer ===\n");
    printf("Weights: HIGH=%d, NORMAL=%d, LOW=%d\n\n",
           WEIGHT_HIGH, WEIGHT_NORMAL, WEIGHT_LOW);

    // Process in weighted rounds
    while (total < 20) {
        int processed_this_round = 0;

        // Process WEIGHT_HIGH high-priority messages
        for (int i = 0; i < WEIGHT_HIGH; i++) {
            if (receive_in_range(mq, &msg, &priority, TIER_HIGH, 31)) {
                printf("HIGH   (prio=%2u): msg %d\n", priority, msg.id);
                high_count++;
                total++;
                processed_this_round++;
            }
        }

        // Process WEIGHT_NORMAL normal-priority messages
        for (int i = 0; i < WEIGHT_NORMAL; i++) {
            if (receive_in_range(mq, &msg, &priority, TIER_NORMAL, TIER_HIGH - 1)) {
                printf("NORMAL (prio=%2u): msg %d\n", priority, msg.id);
                normal_count++;
                total++;
                processed_this_round++;
            }
        }

        // Process WEIGHT_LOW low-priority messages
        for (int i = 0; i < WEIGHT_LOW; i++) {
            if (receive_in_range(mq, &msg, &priority, TIER_LOW, TIER_NORMAL - 1)) {
                printf("LOW    (prio=%2u): msg %d\n", priority, msg.id);
                low_count++;
                total++;
                processed_this_round++;
            }
        }

        if (processed_this_round == 0) break;  // Queue drained
    }

    printf("\n=== Summary ===\n");
    printf("High: %d, Normal: %d, Low: %d\n", high_count, normal_count, low_count);
    printf("Configured weights: %d:%d:%d\n", WEIGHT_HIGH, WEIGHT_NORMAL, WEIGHT_LOW);
}

int main(void) {
    // Note: on Linux, unprivileged processes are limited to
    // /proc/sys/fs/mqueue/msg_max messages per queue (default 10);
    // mq_open() fails with EINVAL here unless that limit is raised.
    struct mq_attr attr = {
        .mq_maxmsg = 50,
        .mq_msgsize = MAX_MSG_SIZE
    };

    mq_unlink(QUEUE_NAME);
    // O_NONBLOCK so that receive_in_range() returns when the queue is empty
    mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    // Populate queue with mixed priorities
    printf("Populating queue with mixed-priority messages...\n\n");
    struct message msg;
    srand(time(NULL));

    for (int i = 1; i <= 30; i++) {
        msg.id = i;
        snprintf(msg.content, sizeof(msg.content), "Message %d", i);

        // Distribute priorities: more high than low (typical load)
        unsigned int prio;
        int r = rand() % 10;
        if (r < 5) prio = 20 + (rand() % 10);       // 50% high
        else if (r < 8) prio = 10 + (rand() % 10);  // 30% normal
        else prio = rand() % 10;                    // 20% low

        mq_send(mq, (const char *)&msg, sizeof(msg), prio);
    }

    fair_consumer(mq);

    mq_close(mq);
    mq_unlink(QUEUE_NAME);
    return EXIT_SUCCESS;
}

Production systems use various priority patterns tailored to their requirements. Here are proven designs from real-world systems:
| Pattern | Description | Use Case |
|---|---|---|
| Two-Queue | Separate queues: urgent + normal | Simple systems; clear urgent/non-urgent division |
| N-Tier Queues | One queue per priority tier (e.g., 3-5 queues) | Complex systems; prevents cross-tier interference |
| Deadline-Based | Priority derived from deadline proximity | Real-time systems; ETL pipelines with SLAs |
| Cost-Based | Priority based on message processing cost | Resource optimization; batch processing |
| Hybrid Type+Priority | System V types for routing, nested priority for ordering | Complex routing with per-route priorities |
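As an illustration of the deadline-based pattern, the sketch below maps the time remaining before a deadline onto a POSIX-style priority, where a closer deadline yields a higher number. The function name, the linear mapping, and the `horizon` parameter (the time-to-deadline at or beyond which a message gets priority 0) are all assumptions made for this example.

```c
/* Hypothetical mapping from time-to-deadline to priority.
 * Overdue messages get max_priority; messages whose deadline is at
 * least `horizon` seconds away get 0; in between, priority rises
 * linearly as the deadline approaches. */
unsigned int deadline_priority(long seconds_left, long horizon,
                               unsigned int max_priority) {
    if (seconds_left <= 0) {
        return max_priority;               /* already due or overdue */
    }
    if (horizon <= 0 || seconds_left >= horizon) {
        return 0;                          /* far from the deadline */
    }
    return (unsigned int)(((horizon - seconds_left) * (long)max_priority)
                          / horizon);
}
```

Because the priority is fixed at send time, a sender using this scheme would compute it just before calling mq_send(); messages that sit in the queue do not become more urgent on their own.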
Like credential inflation in organizations, priority inflation is inevitable. Users and systems learn that high-priority gets faster service, so everything becomes high-priority. Combat this with quotas, auditing, or automatic priority decay for chronic high-priority senders.
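We've explored priority ordering in message queues comprehensively. Here are the essential takeaways:
- POSIX queues carry an explicit per-message priority, and mq_receive() always returns the oldest message with the highest priority.
- System V queues emulate priority through mtype: a negative msgtyp in msgrcv() returns the lowest type first, while specific types can still be selected.
- Priority inversion and starvation are design problems; audit shared resources and consider aging, weighted consumption, quotas, or separate queues per tier.
- Priority inflation erodes any scheme over time; counter it with quotas, auditing, or priority decay.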
You now understand priority ordering in message queues—how to implement it, its pitfalls, and production patterns. Next, we'll explore the advantages of message queues and when they're the right choice for your IPC needs.