Loading content...
When processes communicate through message queues, raw byte streams aren't enough. Real-world IPC requires structure—the ability to distinguish commands from responses, urgent messages from routine ones, and messages destined for different receivers.
Message types provide this structure. In System V message queues, the mtype field serves as both an identifier and a routing mechanism. In POSIX queues, while there's no mtype field, message priority serves a similar role, and application-level type fields are commonly added.
Understanding message types is essential for designing robust IPC protocols. A well-designed type system enables:
By the end of this page, you will understand how to design message type systems, implement type-based routing, create request-response protocols, handle multiple message categories efficiently, and avoid common pitfalls in message type design.
In System V message queues, the mtype field is the cornerstone of message typing. Let's understand its mechanics deeply.
struct msgbuf {
long mtype; // MUST be > 0
char mtext[1]; // Variable-length data
};
Critical Rules:
mtype must be a positive value (> 0)mtype of 0 or negative causes msgsnd() to fail with EINVALThe msgtyp parameter of msgrcv() enables powerful selection:
| msgtyp Value | Behavior | Use Case |
|---|---|---|
0 | First message of any type (FIFO) | Generic message drain |
> 0 (e.g., 5) | First message with mtype = 5 | Specific message type |
< 0 (e.g., -5) | First message with mtype ≤ 5, lowest first | Priority reception |
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h> #define MSG_SIZE 64 struct message { long mtype; char text[MSG_SIZE];}; void demonstrate_selection(int msqid) { struct message msg; ssize_t len; // ======================================== // Populate queue with various message types // ======================================== printf("=== Populating Queue ===\n"); struct { long type; const char *text; } messages[] = { {3, "Message type 3 (first)"}, {1, "Message type 1"}, {5, "Message type 5"}, {3, "Message type 3 (second)"}, {2, "Message type 2"}, }; for (int i = 0; i < 5; i++) { msg.mtype = messages[i].type; strncpy(msg.text, messages[i].text, MSG_SIZE); msgsnd(msqid, &msg, MSG_SIZE, 0); printf("Sent: type=%ld, '%s'\n", msg.mtype, msg.text); } // Queue now: [3, 1, 5, 3, 2] (insertion order) // ======================================== // Selection 1: Receive ANY (msgtyp = 0) // ======================================== printf("\n=== msgtyp = 0 (any message) ===\n"); len = msgrcv(msqid, &msg, MSG_SIZE, 0, IPC_NOWAIT); if (len > 0) { printf("Received: type=%ld, '%s'\n", msg.mtype, msg.text); // Receives first message: type 3 } // ======================================== // Selection 2: Receive SPECIFIC type (msgtyp = 5) // ======================================== printf("\n=== msgtyp = 5 (exact match) ===\n"); len = msgrcv(msqid, &msg, MSG_SIZE, 5, IPC_NOWAIT); if (len > 0) { printf("Received: type=%ld, '%s'\n", msg.mtype, msg.text); // Skips types 1,3 and receives type 5 } // ======================================== // Selection 3: Priority (msgtyp = -3) // ======================================== printf("\n=== msgtyp = -3 (lowest type <= 3) ===\n"); len = msgrcv(msqid, &msg, MSG_SIZE, -3, IPC_NOWAIT); if (len > 0) { printf("Received: type=%ld, '%s'\n", msg.mtype, msg.text); // Receives type 1 (lowest available <= 3) } // Drain remaining printf("\n=== Drain remaining ===\n"); while ((len = msgrcv(msqid, &msg, MSG_SIZE, 0, IPC_NOWAIT)) > 0) { printf("Received: type=%ld, '%s'\n", msg.mtype, msg.text); }} int main() { int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666); if (msqid == -1) { perror("msgget"); return EXIT_FAILURE; } demonstrate_selection(msqid); msgctl(msqid, IPC_RMID, NULL); return EXIT_SUCCESS;}Think of mtype as a 'channel' within the queue. Each type value creates a logical sub-queue. Receivers requesting that specific type see only messages on their channel, while type=0 receivers see all channels in FIFO order.
Different applications require different type schemes. Here are the most common patterns used in production systems:
The simplest scheme distinguishes requests from responses:
#define MSG_REQUEST 1L
#define MSG_RESPONSE 2L
Clients send type 1, servers receive type 1, process, and reply with type 2.
When multiple clients use the same server queue, each client needs a unique response type:
#define MSG_REQUEST 1L
#define MSG_RESPONSE_BASE 1000L
// Client with PID 1234 uses response type 1234
long my_response_type = getpid();
This prevents clients from receiving each other's responses.
Group related operations under type ranges:
// Administrative operations: 1-99
#define MSG_ADMIN_START 1L
#define MSG_ADMIN_STOP 2L
#define MSG_ADMIN_STATUS 3L
// Data operations: 100-199
#define MSG_DATA_READ 100L
#define MSG_DATA_WRITE 101L
#define MSG_DATA_DELETE 102L
// Event notifications: 200-299
#define MSG_EVENT_CONNECT 200L
#define MSG_EVENT_ERROR 201L
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#include <sys/wait.h> // ================================================// Pattern: Per-Client Response Types// ================================================ #define MSG_REQUEST 1L // Message structure includes client's response typestruct client_request { long mtype; long client_response_type; // Where to send reply char command[64];}; struct server_response { long mtype; // Will be set to client_response_type int status; char result[128];}; void run_server(int msqid) { printf("[Server] Starting...\n"); struct client_request req; struct server_response resp; for (int i = 0; i < 3; i++) { // Handle 3 requests // Receive any request (type = 1) if (msgrcv(msqid, &req, sizeof(req) - sizeof(long), MSG_REQUEST, 0) == -1) { perror("server msgrcv"); return; } printf("[Server] Request from client %ld: '%s'\n", req.client_response_type, req.command); // Process and respond resp.mtype = req.client_response_type; // Route to specific client resp.status = 0; snprintf(resp.result, sizeof(resp.result), "Processed: %s", req.command); if (msgsnd(msqid, &resp, sizeof(resp) - sizeof(long), 0) == -1) { perror("server msgsnd"); } printf("[Server] Sent response to type %ld\n", resp.mtype); }} void run_client(int msqid, const char *command) { pid_t pid = getpid(); long my_type = pid; // Use PID as unique response type printf("[Client %d] Sending: '%s'\n", pid, command); struct client_request req; req.mtype = MSG_REQUEST; req.client_response_type = my_type; strncpy(req.command, command, sizeof(req.command)); if (msgsnd(msqid, &req, sizeof(req) - sizeof(long), 0) == -1) { perror("client msgsnd"); return; } // Wait for response on MY type only struct server_response resp; if (msgrcv(msqid, &resp, sizeof(resp) - sizeof(long), my_type, 0) == -1) { perror("client msgrcv"); return; } printf("[Client %d] Received: status=%d, '%s'\n", pid, resp.status, resp.result);} int main() { int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666); if (msqid == -1) { perror("msgget"); return EXIT_FAILURE; } // Fork multiple clients and one server for (int i = 0; i < 3; i++) { if (fork() == 0) { char cmd[32]; snprintf(cmd, sizeof(cmd), "command_%d", i); sleep(i); // Stagger requests run_client(msqid, cmd); exit(0); } } // Parent runs server run_server(msqid); // Wait for all children while (wait(NULL) > 0); msgctl(msqid, IPC_RMID, NULL); return EXIT_SUCCESS;}| Scheme | Best For | Limitations |
|---|---|---|
| Simple Command/Response | Single client-server pairs | Multiple clients receive wrong responses |
| Per-Client Types | Multiple clients, one server | Requires unique ID communication |
| Category Ranges | Complex multi-command systems | Range exhaustion, harder to extend |
| Priority Encoding | Urgency-based processing | Loses type semantics |
The most common IPC pattern is request-response: a client sends a request and waits for a reply. Message types are essential for routing responses correctly.
For robust request-response, consider using separate queues:
Client A → [Request Queue] → Server
← [Client A Response Queue] ←
Client B → [Request Queue] → Server
← [Client B Response Queue] ←
This avoids type-based routing complexity but requires more queue management.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#include <time.h>#include <errno.h> // ================================================// Complete Request-Response Protocol with// Correlation IDs and Timeout Support// ================================================ #define MSG_REQUEST 1L#define TIMEOUT_SEC 5 // Generate unique request IDstatic unsigned int next_request_id = 1; struct request { long mtype; long reply_type; // Client's response type unsigned int request_id; // Correlation ID int operation; // What to do char data[128]; // Request data}; struct response { long mtype; unsigned int request_id; // Matches request int status; // 0 = success int error_code; // If status != 0 char data[128]; // Response data}; // Client function with timeoutint send_request(int msqid, int operation, const char *data, struct response *resp, int timeout_sec) { pid_t pid = getpid(); struct request req; req.mtype = MSG_REQUEST; req.reply_type = pid; req.request_id = __sync_fetch_and_add(&next_request_id, 1); req.operation = operation; strncpy(req.data, data, sizeof(req.data)); printf("[Client] Sending request #%u: op=%d\n", req.request_id, operation); if (msgsnd(msqid, &req, sizeof(req) - sizeof(long), 0) == -1) { perror("msgsnd"); return -1; } // Calculate deadline time_t deadline = time(NULL) + timeout_sec; // Poll with timeout (simplified - production code should use select/poll) while (time(NULL) < deadline) { ssize_t ret = msgrcv(msqid, resp, sizeof(*resp) - sizeof(long), pid, IPC_NOWAIT); if (ret > 0) { // Verify correlation ID if (resp->request_id != req.request_id) { printf("[Client] Stale response #%u (expected #%u)\n", resp->request_id, req.request_id); continue; // Wrong response, keep waiting } printf("[Client] Received response #%u\n", resp->request_id); return 0; // Success } if (errno != ENOMSG) { perror("msgrcv"); return -1; } usleep(100000); // 100ms poll interval } printf("[Client] Request #%u timed out\n", req.request_id); return -1; // Timeout} // Server functionvoid server_loop(int msqid, int num_requests) { struct request req; struct response resp; for (int i = 0; i < num_requests; i++) { if (msgrcv(msqid, &req, sizeof(req) - sizeof(long), MSG_REQUEST, 0) == -1) { perror("server msgrcv"); return; } printf("[Server] Processing request #%u from client type %ld\n", req.request_id, req.reply_type); // Process request resp.mtype = req.reply_type; resp.request_id = req.request_id; resp.status = 0; resp.error_code = 0; snprintf(resp.data, sizeof(resp.data), "Processed op %d on '%s'", req.operation, req.data); // Simulate processing time usleep(100000); // 100ms if (msgsnd(msqid, &resp, sizeof(resp) - sizeof(long), 0) == -1) { perror("server msgsnd"); } }} int main() { int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666); if (msqid == -1) { perror("msgget"); return EXIT_FAILURE; } if (fork() == 0) { // Child: server server_loop(msqid, 2); exit(0); } // Parent: client sleep(1); // Let server start struct response resp; // Successful request if (send_request(msqid, 1, "test_data", &resp, TIMEOUT_SEC) == 0) { printf("[Client] Success: %s\n", resp.data); } // Another request if (send_request(msqid, 2, "more_data", &resp, TIMEOUT_SEC) == 0) { printf("[Client] Success: %s\n", resp.data); } wait(NULL); msgctl(msqid, IPC_RMID, NULL); return EXIT_SUCCESS;}Complex systems often have multiple categories of messages that need different handling. Here's how to design scalable type hierarchies.
Reserve ranges for different purposes:
// System-wide type ranges
#define TYPE_RANGE_SYSTEM 0x0001 // 1-255: System messages
#define TYPE_RANGE_ADMIN 0x0100 // 256-511: Admin commands
#define TYPE_RANGE_USER 0x0200 // 512-767: User operations
#define TYPE_RANGE_RESPONSE 0x1000 // 4096+: Responses (PID-indexed)
// Within each range
#define MSG_SYS_HEARTBEAT (TYPE_RANGE_SYSTEM | 0x01) // 1
#define MSG_SYS_SHUTDOWN (TYPE_RANGE_SYSTEM | 0x02) // 2
#define MSG_ADMIN_LIST (TYPE_RANGE_ADMIN | 0x01) // 257
#define MSG_ADMIN_KILL (TYPE_RANGE_ADMIN | 0x02) // 258
Using negative msgtyp in msgrcv(), a worker can handle an entire category:
// Receive any admin message (type 256-511)
// msgtyp = -511 receives lowest type <= 511
msgrcv(msqid, &msg, size, -511, 0);
// But this also receives types 1-255!
// Better: Use separate queues or explicit type checking
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#include <sys/wait.h> // ================================================// Multi-Category Message System// ================================================ // Type categoriesenum msg_category { CAT_CONTROL = 1, // 1-99: Control messages CAT_DATA = 100, // 100-199: Data operations CAT_EVENT = 200, // 200-299: Events}; // Control messages (1-99)#define MSG_CTRL_PING 1L#define MSG_CTRL_PONG 2L#define MSG_CTRL_STOP 99L // Data messages (100-199)#define MSG_DATA_PUT 100L#define MSG_DATA_GET 101L#define MSG_DATA_DELETE 102L // Event messages (200-299)#define MSG_EVENT_CONNECTED 200L#define MSG_EVENT_DISCONNECTED 201L#define MSG_EVENT_ERROR 202L struct generic_message { long mtype; pid_t sender; char payload[128];}; // Helper to categorize messagesconst char* get_category_name(long mtype) { if (mtype >= 200) return "EVENT"; if (mtype >= 100) return "DATA"; return "CONTROL";} // Dedicated handler for control messagesvoid control_handler(int msqid) { printf("[ControlHandler] Starting\n"); struct generic_message msg; while (1) { // Receive control messages one at a time by explicit type // Check each control type for (long type = 1; type <= 99; type++) { if (msgrcv(msqid, &msg, sizeof(msg) - sizeof(long), type, IPC_NOWAIT) > 0) { if (msg.mtype == MSG_CTRL_STOP) { printf("[ControlHandler] Received STOP\n"); return; } printf("[ControlHandler] Type=%ld from PID %d: %s\n", msg.mtype, msg.sender, msg.payload); // Handle PING with PONG if (msg.mtype == MSG_CTRL_PING) { struct generic_message pong; pong.mtype = MSG_CTRL_PONG; pong.sender = getpid(); strcpy(pong.payload, "PONG"); msgsnd(msqid, &pong, sizeof(pong) - sizeof(long), 0); } } } usleep(10000); // 10ms sleep between polls }} // Dedicated handler for data messagesvoid data_handler(int msqid) { printf("[DataHandler] Starting\n"); struct generic_message msg; int count = 0; while (count < 3) { // Handle 3 data messages for (long type = 100; type <= 199; type++) { if (msgrcv(msqid, &msg, sizeof(msg) - sizeof(long), type, IPC_NOWAIT) > 0) { printf("[DataHandler] Type=%ld from PID %d: %s\n", msg.mtype, msg.sender, msg.payload); count++; } } usleep(10000); } printf("[DataHandler] Completed\n");} int main() { int msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0666); if (msqid == -1) { perror("msgget"); return EXIT_FAILURE; } // Fork handlers pid_t ctrl_pid, data_pid; if ((ctrl_pid = fork()) == 0) { control_handler(msqid); exit(0); } if ((data_pid = fork()) == 0) { data_handler(msqid); exit(0); } // Main process sends various messages sleep(1); // Let handlers start struct generic_message msg; msg.sender = getpid(); // Send control messages msg.mtype = MSG_CTRL_PING; strcpy(msg.payload, "Hello?"); msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0); // Send data messages msg.mtype = MSG_DATA_PUT; strcpy(msg.payload, "key=value"); msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0); msg.mtype = MSG_DATA_GET; strcpy(msg.payload, "key"); msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0); msg.mtype = MSG_DATA_DELETE; strcpy(msg.payload, "key"); msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0); // Stop control handler sleep(1); msg.mtype = MSG_CTRL_STOP; msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0); // Wait for children waitpid(ctrl_pid, NULL, 0); waitpid(data_pid, NULL, 0); msgctl(msqid, IPC_RMID, NULL); return EXIT_SUCCESS;}POSIX message queues don't have an mtype field—priority is the only built-in selection mechanism. However, you can implement application-level typing:
Use priority values as pseudo-types:
#define TYPE_COMMAND 10 // Priority 10
#define TYPE_RESPONSE 5 // Priority 5
#define TYPE_EVENT 1 // Priority 1
mq_send(mq, msg, len, TYPE_COMMAND);
Limitation: You lose natural priority semantics, and you can't receive a specific "type"—you always get the highest priority message.
Include a type field in your message structure:
struct posix_typed_message {
uint32_t msg_type; // Application-defined type
uint32_t msg_flags; // Additional metadata
uint8_t data[]; // Variable-length payload
};
Limitation: You must receive messages and check types manually; can't selectively receive.
Create separate queues for each message category:
mq_open("/app_commands", ...);
mq_open("/app_responses", ...);
mq_open("/app_events", ...);
Trade-off: More resource usage, but clean separation.
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <mqueue.h>#include <fcntl.h>#include <errno.h> // ================================================// POSIX Message Queue with Application-Level Types// ================================================ #define QUEUE_NAME "/typed_demo"#define MAX_MSG_SIZE 256 // Application-level message typesenum app_msg_type { TYPE_COMMAND = 1, TYPE_RESPONSE = 2, TYPE_EVENT = 3, TYPE_HEARTBEAT = 4,}; // Typed message structurestruct typed_message { uint32_t type; // Application type uint32_t seq_num; // Sequence number uint16_t data_len; // Payload length char data[240]; // Payload}; const char* type_name(uint32_t type) { switch (type) { case TYPE_COMMAND: return "COMMAND"; case TYPE_RESPONSE: return "RESPONSE"; case TYPE_EVENT: return "EVENT"; case TYPE_HEARTBEAT: return "HEARTBEAT"; default: return "UNKNOWN"; }} // Send typed messageint send_typed(mqd_t mq, enum app_msg_type type, const char *data, unsigned int priority) { static uint32_t seq = 0; struct typed_message msg; msg.type = type; msg.seq_num = ++seq; msg.data_len = strlen(data); strncpy(msg.data, data, sizeof(msg.data)); return mq_send(mq, (char*)&msg, sizeof(msg), priority);} // Receive and filter by type// Returns 0 if matching message received, -1 if wrong type (re-queued)int receive_typed(mqd_t mq, enum app_msg_type wanted_type, struct typed_message *msg, int timeout_ms) { unsigned int priority; ssize_t len = mq_receive(mq, (char*)msg, MAX_MSG_SIZE, &priority); if (len == -1) { return -1; } if (msg->type != wanted_type) { // Wrong type - re-queue with same priority printf("[Filter] Got type %s, wanted %s - requeuing\n", type_name(msg->type), type_name(wanted_type)); mq_send(mq, (char*)msg, sizeof(*msg), priority); return -1; // Caller should retry } return 0; // Got the right type} int main() { struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = MAX_MSG_SIZE }; mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr); if (mq == (mqd_t)-1) { perror("mq_open"); return EXIT_FAILURE; } // Send various message types printf("=== Sending Messages ===\n"); send_typed(mq, TYPE_HEARTBEAT, "ping", 0); send_typed(mq, TYPE_COMMAND, "start", 5); send_typed(mq, TYPE_EVENT, "connected", 2); send_typed(mq, TYPE_COMMAND, "stop", 5); printf("\n=== Receiving Only COMMAND Messages ===\n"); struct typed_message msg; int received_commands = 0; // Try to receive only COMMAND types // Note: This is inefficient - just demonstrating the pattern for (int attempts = 0; attempts < 10 && received_commands < 2; attempts++) { if (receive_typed(mq, TYPE_COMMAND, &msg, 0) == 0) { printf("COMMAND #%u: %s\n", msg.seq_num, msg.data); received_commands++; } } mq_close(mq); mq_unlink(QUEUE_NAME); return EXIT_SUCCESS;}The 'receive and re-queue' pattern shown above is inefficient—it can lead to message reordering and wasted CPU cycles. If you need type-based selection, System V message queues or multiple POSIX queues are better architectural choices.
Based on production experience, here are essential guidelines for message type design:
#define MSG_TYPE_X 42L. Never use literal numbers in code.For large systems, implement a type registry—a central header file or configuration that defines all message types, their meanings, and which processes send/receive each type. This becomes the authoritative documentation for your IPC protocol.
We've explored how message types enable structured, routable, and maintainable IPC. Here are the key concepts:
You now understand message types as a protocol design tool—how to use them for routing, priority, and organization. Next, we'll explore priority ordering in depth, examining how to build priority-based message processing systems.