In the distributed systems we've explored so far (RPC, REST, gRPC), communication is fundamentally synchronous: the sender waits for the receiver to process the request and return a response. This tight coupling creates challenges: the caller blocks while the receiver works, both parties must be available at the same time, and load spikes hit the receiver directly.
Message queues break this coupling by introducing an intermediary that stores messages until consumers are ready to process them. Producers send messages to a queue and continue without waiting. Consumers retrieve messages at their own pace. The queue provides a buffer that smooths load, enables retry, and allows independent scaling.
By the end of this page, you will understand:

- Message queue fundamentals and their role in distributed architectures
- The difference between message queues and event streams
- Common messaging patterns (work queues, pub/sub, request-reply)
- Delivery guarantees and acknowledgment mechanisms
- Practical implementation with systems like RabbitMQ and Apache Kafka
A message queue is an intermediary service that accepts messages from producers, stores them, and delivers them to consumers. It's the distributed systems equivalent of a buffer—smoothing the flow between components that operate at different speeds or availability.
Core Concepts
Why Message Queues?
Message queues solve several distributed system challenges:
| Aspect | Synchronous (RPC) | Asynchronous (Queue) |
|---|---|---|
| Coupling | Tight (caller waits) | Loose (fire-and-forget) |
| Latency | Caller waits for processing | Enqueue returns immediately; processing is deferred |
| Availability | Both must be up | Queue buffers if consumer down |
| Load handling | Direct impact on receiver | Queue absorbs spikes |
| Response | Immediate response | Response via separate channel |
| Ordering | Natural (one request, one response) | Typically FIFO per queue or partition, not global |
| Complexity | Simpler mental model | More moving parts |
Traditional message queues (RabbitMQ, ActiveMQ) are designed for work distribution—once consumed, a message is removed. Event streams (Kafka, Pulsar) retain messages for a configured period, allowing multiple consumers to read independently and replay historical events. The choice depends on whether you need work distribution or event sourcing.
Message queues support various communication patterns, each suited to different use cases.
Point-to-Point (Work Queue)
The simplest pattern: one producer sends messages, one or more consumers compete to process them. Each message goes to exactly one consumer. Perfect for background jobs, task distribution, and load balancing across workers.
Publish/Subscribe (Fan-out)
One producer sends messages that are delivered to all subscribed consumers. Each consumer gets a copy. Perfect for event notifications and broadcasting state updates to multiple services.
Topic-Based Routing
Messages are published to topics/exchanges with routing keys. Consumers subscribe to patterns. Messages route based on matching rules. Perfect for selective subscription, such as routing order events by region or event type.
```python
# Messaging patterns with RabbitMQ using pika library
import pika
import json

# =================== POINT-TO-POINT (WORK QUEUE) ===================

def work_queue_producer():
    """
    Work Queue: Multiple workers compete for messages.
    Each message is delivered to exactly one worker.
    Used for: Background jobs, task distribution, load balancing.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare a durable queue (survives broker restart)
    channel.queue_declare(queue='task_queue', durable=True)

    # Send tasks
    for i in range(10):
        task = {'task_id': i, 'action': 'process_image', 'file': f'img_{i}.jpg'}
        channel.basic_publish(
            exchange='',  # Default exchange, direct to queue
            routing_key='task_queue',
            body=json.dumps(task),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent message
                content_type='application/json',
            )
        )
        print(f"Sent task {i}")

    connection.close()

def work_queue_consumer():
    """
    Worker: Consumes tasks from the work queue.
    Multiple workers can run - each task goes to one worker.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)

    # Fair dispatch: don't give more than 1 unacked message per worker
    channel.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
        task = json.loads(body)
        print(f"Processing task {task['task_id']}: {task['action']}")

        # Simulate work
        import time
        time.sleep(2)

        # Acknowledge after successful processing
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(f"Task {task['task_id']} completed")

    channel.basic_consume(queue='task_queue', on_message_callback=callback)
    print("Worker waiting for tasks...")
    channel.start_consuming()

# =================== PUBLISH/SUBSCRIBE (FAN-OUT) ===================

def pubsub_publisher():
    """
    Pub/Sub: Message goes to all subscribed consumers.
    Uses a fanout exchange that broadcasts to all bound queues.
    Used for: Event notifications, broadcasting updates.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare a fanout exchange
    channel.exchange_declare(exchange='events', exchange_type='fanout')

    # Publish an event (all subscribers receive it)
    event = {
        'type': 'user_registered',
        'user_id': '12345',
        'email': 'user@example.com',
        'timestamp': '2024-01-15T10:30:00Z',
    }
    channel.basic_publish(
        exchange='events',
        routing_key='',  # Ignored for fanout
        body=json.dumps(event),
    )
    print("Published event")
    connection.close()

def pubsub_subscriber(subscriber_name):
    """
    Subscriber: Each subscriber gets its own queue bound to the exchange.
    All subscribers receive every published message.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='events', exchange_type='fanout')

    # Create exclusive queue for this subscriber
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Bind queue to exchange
    channel.queue_bind(exchange='events', queue=queue_name)

    def callback(ch, method, properties, body):
        event = json.loads(body)
        print(f"[{subscriber_name}] Received event: {event['type']}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print(f"[{subscriber_name}] Waiting for events...")
    channel.start_consuming()

# =================== TOPIC-BASED ROUTING ===================

def topic_publisher():
    """
    Topic Routing: Route messages based on routing key patterns.
    Routing keys are dot-separated words (e.g., 'order.created.us').
    Bindings use patterns: * matches one word, # matches zero or more.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare a topic exchange
    channel.exchange_declare(exchange='orders', exchange_type='topic')

    # Publish with specific routing keys
    events = [
        ('order.created.us', {'order_id': '1', 'region': 'us', 'action': 'created'}),
        ('order.created.eu', {'order_id': '2', 'region': 'eu', 'action': 'created'}),
        ('order.completed.us', {'order_id': '3', 'region': 'us', 'action': 'completed'}),
        ('order.cancelled.eu', {'order_id': '4', 'region': 'eu', 'action': 'cancelled'}),
    ]

    for routing_key, event in events:
        channel.basic_publish(
            exchange='orders',
            routing_key=routing_key,
            body=json.dumps(event),
        )
        print(f"Published {routing_key}")

    connection.close()

def topic_subscriber(name, binding_key):
    """
    Subscribe to specific routing key patterns.
    Examples:
    - "order.created.*" matches order.created.us, order.created.eu
    - "order.*.us" matches order.created.us, order.completed.us
    - "order.#" matches all order events
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='orders', exchange_type='topic')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Bind with pattern
    channel.queue_bind(exchange='orders', queue=queue_name, routing_key=binding_key)

    def callback(ch, method, properties, body):
        event = json.loads(body)
        print(f"[{name}] Received {method.routing_key}: {event}")

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    print(f"[{name}] Listening for {binding_key}")
    channel.start_consuming()

# Usage:
# topic_subscriber("US Service", "order.*.us")            # All US orders
# topic_subscriber("Created Handler", "order.created.*")  # All created orders
# topic_subscriber("All Orders", "order.#")               # Everything
```

Queues can implement RPC: the client sends a request with a correlation ID and reply-to queue. The server processes the request and sends the response to the reply-to queue. The client correlates responses by ID. This provides asynchronous RPC with timeout handling and better resilience than synchronous calls.
Reliable message processing requires careful handling of acknowledgments and delivery guarantees. Without proper acknowledgment, messages can be lost or processed multiple times.
Message Lifecycle

A message moves through distinct states: published by the producer, stored by the broker, delivered to a consumer, acknowledged after processing, and finally removed from the queue (or, in an event stream, retained for replay).
What Can Go Wrong

A consumer can crash after receiving a message but before acknowledging it, the broker can fail before persisting a message, or a redelivered message can be processed twice. The delivery guarantee you choose determines which of these failures you tolerate:
| Guarantee | Implementation | Trade-offs |
|---|---|---|
| At-most-once | No acks, immediate delete after delivery | Fast but may lose messages |
| At-least-once | Wait for ack, redeliver on timeout/nack | Reliable but may duplicate |
| Exactly-once | Idempotent consumers + deduplication | Complex, requires app support |
```python
# Acknowledgment patterns in message queues

import pika
import json

# Placeholder processing logic and error types used by the examples below
class TransientError(Exception):
    pass

class PermanentError(Exception):
    pass

def process_message(body):
    """Stand-in for real processing logic."""
    return json.loads(body)

# =================== AUTOMATIC ACK (AT-MOST-ONCE) ===================

def auto_ack_consumer():
    """
    Automatic acknowledgment: Message is acked immediately on delivery.
    Risk: If consumer crashes during processing, message is lost.
    Use for: Non-critical messages, logging, metrics.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    def callback(ch, method, properties, body):
        # Message already acked - if we crash here, it's lost
        process_message(body)

    channel.basic_consume(
        queue='my_queue',
        on_message_callback=callback,
        auto_ack=True,  # Automatic acknowledgment
    )
    channel.start_consuming()

# =================== MANUAL ACK (AT-LEAST-ONCE) ===================

def manual_ack_consumer():
    """
    Manual acknowledgment: Consumer explicitly acks after processing.
    If consumer crashes before ack, message is redelivered.
    Consumer must handle potential duplicates!
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Prefetch: Only deliver N unacked messages at a time
    # Prevents fast producer from overwhelming slow consumer
    channel.basic_qos(prefetch_count=10)

    def callback(ch, method, properties, body):
        try:
            # Process message
            result = process_message(body)

            # Acknowledge successful processing
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except TransientError:
            # Temporary failure - requeue for retry
            ch.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True,  # Put back in queue
            )
        except PermanentError:
            # Permanent failure - reject without requeue
            ch.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False,  # Goes to dead letter queue if configured
            )

    channel.basic_consume(
        queue='my_queue',
        on_message_callback=callback,
        auto_ack=False,  # Manual acknowledgment
    )
    channel.start_consuming()

# =================== BATCH ACKNOWLEDGMENT ===================

def batch_ack_consumer():
    """
    Batch acknowledgment: Ack multiple messages at once.
    More efficient but if we crash, all unacked messages redeliver.
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.basic_qos(prefetch_count=100)

    messages_processed = 0
    last_delivery_tag = None

    def callback(ch, method, properties, body):
        nonlocal messages_processed, last_delivery_tag

        process_message(body)
        last_delivery_tag = method.delivery_tag
        messages_processed += 1

        # Ack every 100 messages
        if messages_processed >= 100:
            ch.basic_ack(
                delivery_tag=last_delivery_tag,
                multiple=True,  # Acks all messages up to this tag
            )
            messages_processed = 0

    channel.basic_consume(
        queue='my_queue',
        on_message_callback=callback,
        auto_ack=False,
    )
    channel.start_consuming()

# =================== IDEMPOTENT PROCESSING (EXACTLY-ONCE) ===================

class IdempotentProcessor:
    """
    Exactly-once semantics through idempotent processing.
    Maintain a record of processed message IDs.
    Before processing, check if already done.
    After processing, store the ID.
    """

    def __init__(self, redis_client):
        self.redis = redis_client
        self.processed_key = "processed_messages"
        self.ttl = 24 * 60 * 60  # Keep IDs for 24 hours

    def process_message(self, channel, method, properties, body):
        # Extract unique message ID
        message_id = properties.message_id
        if not message_id:
            # If no ID, hash the body
            import hashlib
            message_id = hashlib.sha256(body).hexdigest()

        # Check if already processed
        if self.redis.sismember(self.processed_key, message_id):
            print(f"Duplicate message {message_id}, skipping")
            channel.basic_ack(delivery_tag=method.delivery_tag)
            return

        try:
            # Process the message
            result = self.do_process(body)

            # Record successful processing
            self.redis.sadd(self.processed_key, message_id)
            # Coarse TTL: refreshed on every add, expires the whole set together
            self.redis.expire(self.processed_key, self.ttl)

            # Acknowledge
            channel.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            # Don't record - will retry on redelivery
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True,
            )

    def do_process(self, body):
        # Actual processing logic
        pass

# =================== DEAD LETTER QUEUES ===================

def setup_dead_letter_queue():
    """
    Dead Letter Queue (DLQ): Catch messages that can't be processed.
    Messages go to DLQ when:
    - Rejected with requeue=False
    - TTL expires
    - Queue length limit exceeded
    """
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the dead letter exchange and queue
    channel.exchange_declare(exchange='dlx', exchange_type='direct')
    channel.queue_declare(queue='dead_letters', durable=True)
    channel.queue_bind(queue='dead_letters', exchange='dlx', routing_key='failed')

    # Declare main queue with DLX configuration
    channel.queue_declare(
        queue='main_queue',
        durable=True,
        arguments={
            'x-dead-letter-exchange': 'dlx',
            'x-dead-letter-routing-key': 'failed',
            'x-message-ttl': 60000,  # 60 second TTL
        }
    )

    connection.close()
```

By default, when a producer publishes a message, it doesn't know if the broker received it. Enable publisher confirms to get acknowledgment from the broker when messages are persisted. Without this, messages can be lost if the broker crashes before persistence completes.
While traditional message queues delete messages after consumption, Apache Kafka takes a different approach: it's a distributed event streaming platform that retains messages in a persistent, ordered log.
Kafka vs. Traditional Queues

A traditional broker deletes a message once a consumer acknowledges it. Kafka instead appends every message to a partitioned, replicated log and retains it for a configured period; consumers track their own position (offset) in the log, so many independent readers can consume the same data and rewind to reprocess it.
Kafka Core Concepts

- Topic: a named stream of related messages
- Partition: an ordered, append-only log; each topic is split into partitions for parallelism
- Offset: a consumer's position within a partition
- Consumer group: a set of consumers that divide a topic's partitions among themselves
- Broker: a server that stores partitions and serves producers and consumers

These settings become concrete when creating a topic, as the sketch below shows.
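A minimal sketch using kafka-python's admin client, assuming a broker at localhost:9092; the topic name and counts are illustrative.

```python
# Creating a topic with explicit partitioning (topic name and counts
# are illustrative; assumes a broker at localhost:9092).
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

topic = NewTopic(
    name='page_views',
    num_partitions=6,      # upper bound on consumer parallelism per group
    replication_factor=1,  # single local broker here; use 3+ in production
)
admin.create_topics([topic])
admin.close()
```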
```python
# Kafka producer and consumer with kafka-python
from kafka import KafkaProducer, KafkaConsumer
import json

class ProcessingError(Exception):
    """Stand-in for an application-level processing failure."""
    pass

def process_page_view(event):
    """Stand-in for real processing logic."""
    print(event)

def reprocess_event(event):
    """Stand-in for replay processing logic."""
    print(event)

# =================== PRODUCER ===================

def create_producer():
    """
    Create a Kafka producer with common configurations.
    """
    return KafkaProducer(
        bootstrap_servers=['localhost:9092'],

        # Serialization
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),

        # Reliability
        acks='all',             # Wait for all replicas to acknowledge
        retries=3,              # Retry on failure
        retry_backoff_ms=100,

        # Performance
        linger_ms=5,            # Batch messages for 5ms
        batch_size=16384,       # 16KB batch size
        compression_type='gzip',
    )

def produce_events():
    """
    Produce events to Kafka topic.
    """
    producer = create_producer()

    for i in range(100):
        # Key determines partition (same key -> same partition -> ordering)
        key = f"user-{i % 10}"
        event = {
            'event_type': 'page_view',
            'user_id': key,
            'page': f'/product/{i}',
            'timestamp': '2024-01-15T10:30:00Z',
        }

        # Send (async by default)
        future = producer.send(
            topic='page_views',
            key=key,
            value=event,
        )

        # Optional: Wait for acknowledgment
        # record_metadata = future.get(timeout=10)
        # print(f"Sent to partition {record_metadata.partition}, offset {record_metadata.offset}")

    # Flush to ensure all messages are sent
    producer.flush()
    producer.close()

# =================== CONSUMER ===================

def consume_events():
    """
    Consume events from Kafka topic.

    Consumer groups enable parallel consumption:
    - Each partition is assigned to exactly one consumer in the group
    - Multiple groups can independently consume the same topic
    """
    consumer = KafkaConsumer(
        'page_views',
        bootstrap_servers=['localhost:9092'],

        # Consumer group
        group_id='analytics-group',

        # Deserialization
        key_deserializer=lambda k: k.decode('utf-8') if k else None,
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),

        # Start position for new group
        auto_offset_reset='earliest',  # or 'latest'

        # Commit behavior
        enable_auto_commit=True,
        auto_commit_interval_ms=5000,  # Commit every 5 seconds
    )

    print(f"Assigned partitions: {consumer.assignment()}")

    for message in consumer:
        print(f"Partition {message.partition}, Offset {message.offset}")
        print(f"Key: {message.key}")
        print(f"Value: {message.value}")

        # Process the event
        process_page_view(message.value)

# =================== MANUAL OFFSET COMMIT ===================

def consume_with_manual_commit():
    """
    Manual offset commit for exactly-once processing.
    Commit after processing to avoid data loss.
    Commit before processing to avoid duplicates.
    Most apps commit after for at-least-once semantics.
    """
    consumer = KafkaConsumer(
        'page_views',
        bootstrap_servers=['localhost:9092'],
        group_id='analytics-group',
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        enable_auto_commit=False,  # Disable auto-commit
    )

    try:
        for message in consumer:
            try:
                # Process message
                process_page_view(message.value)

                # Commit after successful processing
                consumer.commit()
            except ProcessingError:
                # Don't commit - message will be redelivered on restart
                print(f"Failed to process {message.offset}")
                # Could implement retry logic here
    finally:
        consumer.close()

# =================== SEEKING AND REPLAY ===================

def replay_from_beginning():
    """
    Kafka retains messages - you can replay history!

    Use cases:
    - Rebuilding derived data after bug fix
    - Onboarding new services
    - Reprocessing with new logic
    """
    consumer = KafkaConsumer(
        'page_views',
        bootstrap_servers=['localhost:9092'],
        group_id='replay-group',
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        enable_auto_commit=False,
    )

    # Seek to beginning of all assigned partitions
    consumer.poll(timeout_ms=1000)  # Trigger partition assignment
    consumer.seek_to_beginning()

    # Or seek to specific offset
    # from kafka import TopicPartition
    # tp = TopicPartition('page_views', 0)
    # consumer.seek(tp, 1000)  # Start from offset 1000

    for message in consumer:
        reprocess_event(message.value)
        consumer.commit()

# =================== PARTITIONING STRATEGY ===================

class OrderPartitioner:
    """
    Custom partitioner to ensure related events go to same partition.
    Default: hash(key) % num_partitions
    Custom: Whatever logic you need
    """

    def __call__(self, key, all_partitions, available_partitions):
        # Route all events for an order to the same partition.
        # Ensures ordering for that order.
        # Note: kafka-python passes the *serialized* key (bytes).
        if key:
            # 'order-123-created' -> 'order-123'
            order_id = '-'.join(key.decode('utf-8').split('-')[:2])
            return available_partitions[hash(order_id) % len(available_partitions)]
        return available_partitions[0]

# Usage
def produce_with_partitioner():
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        key_serializer=lambda k: k.encode('utf-8'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        partitioner=OrderPartitioner(),
    )

    # All events for order-123 go to same partition
    producer.send('orders', key='order-123-created', value={'status': 'created'})
    producer.send('orders', key='order-123-paid', value={'status': 'paid'})
    producer.send('orders', key='order-123-shipped', value={'status': 'shipped'})
    producer.flush()
```

The number of partitions limits consumer parallelism: you can't have more consumers than partitions in a group. Choose partition count based on expected throughput: each partition can handle ~10-100 MB/s. Too few limits parallelism; too many increases overhead. Common starting point: 3-12 partitions per topic, scaled on demand.
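Consumer lag (how far a group trails the head of the log) is the usual signal for whether partition and consumer counts are adequate. A minimal sketch of computing total lag with kafka-python, reusing the illustrative topic and group names from above:

```python
# Total consumer-group lag: end-of-log offset minus committed offset,
# summed over partitions (a sketch; names are illustrative).
from kafka import KafkaConsumer, TopicPartition

def total_lag(topic: str, group_id: str) -> int:
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        group_id=group_id,
        enable_auto_commit=False,
    )
    partitions = [TopicPartition(topic, p)
                  for p in consumer.partitions_for_topic(topic)]
    end_offsets = consumer.end_offsets(partitions)  # latest offset per partition

    lag = 0
    for tp in partitions:
        committed = consumer.committed(tp) or 0  # None if group never committed
        lag += end_offsets[tp] - committed

    consumer.close()
    return lag

# Usage: total_lag('page_views', 'analytics-group')
```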
Several message queue implementations exist, each optimized for different use cases.
| System | Type | Best For | Trade-offs |
|---|---|---|---|
| RabbitMQ | Traditional queue | Complex routing, work queues | Lower throughput than Kafka |
| Apache Kafka | Event stream | High-throughput streaming, event sourcing | More ops complexity |
| Amazon SQS | Managed queue | Simple queuing without ops burden | Limited features, AWS-only |
| Apache Pulsar | Unified | Both queuing and streaming patterns | Newer, smaller community |
| Redis Streams | In-memory stream | Low-latency streaming | Limited durability, memory-bound |
| NATS | Lightweight | Cloud-native, simple messaging | Less feature-rich |
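To make the Redis Streams row concrete, here is a minimal sketch with the redis-py client, assuming a local Redis server; the stream, group, and consumer names are illustrative.

```python
# Redis Streams as a lightweight queue with consumer groups
# (stream, group, and consumer names are illustrative).
import redis

r = redis.Redis(host='localhost', port=6379)

# Producer: append an entry to the stream (XADD)
r.xadd('orders', {'order_id': '123', 'action': 'create'})

# One-time setup: create a consumer group reading from the beginning
try:
    r.xgroup_create('orders', 'workers', id='0', mkstream=True)
except redis.ResponseError:
    pass  # group already exists

# Consumer: read new entries for this group member, process, then ack
messages = r.xreadgroup('workers', 'worker-1', {'orders': '>'},
                        count=10, block=5000)
for stream, entries in messages:
    for message_id, fields in entries:
        print(message_id, fields)
        r.xack('orders', 'workers', message_id)  # remove from pending list
```

Unacked entries stay in the group's pending list, so a crashed worker's messages can be claimed and retried, which is the at-least-once behavior the delivery-guarantees table describes.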
```python
# AWS SQS - Managed message queue
import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'

def process_order(body):
    """Stand-in for real processing logic."""
    print(body)

# =================== PRODUCER ===================

def send_message():
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({'order_id': '123', 'action': 'process'}),

        # Message attributes for filtering
        MessageAttributes={
            'Priority': {
                'DataType': 'Number',
                'StringValue': '1'
            }
        },

        # Delay delivery by 10 seconds
        DelaySeconds=10,
    )
    print(f"Message ID: {response['MessageId']}")

def send_batch():
    """Send up to 10 messages in one API call"""
    entries = [
        {
            'Id': str(i),
            'MessageBody': json.dumps({'order_id': str(i)}),
        }
        for i in range(10)
    ]

    response = sqs.send_message_batch(
        QueueUrl=queue_url,
        Entries=entries,
    )
    print(f"Successful: {len(response.get('Successful', []))}")
    print(f"Failed: {len(response.get('Failed', []))}")

# =================== CONSUMER ===================

def receive_messages():
    """
    Long polling: Wait up to 20s for messages.
    More efficient than short polling (immediate return).
    """
    while True:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # Up to 10 per call
            WaitTimeSeconds=20,      # Long polling
            VisibilityTimeout=30,    # Time to process before redelivery
            MessageAttributeNames=['All'],
        )

        messages = response.get('Messages', [])
        for message in messages:
            try:
                body = json.loads(message['Body'])
                process_order(body)

                # Delete after successful processing
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=message['ReceiptHandle'],
                )
            except Exception as e:
                print(f"Failed to process: {e}")
                # Message becomes visible again after VisibilityTimeout

# =================== FIFO QUEUE ===================

def fifo_example():
    """
    FIFO queues guarantee ordering and exactly-once processing.
    Queue name must end with .fifo
    """
    fifo_queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/orders.fifo'

    # Send with deduplication and grouping
    sqs.send_message(
        QueueUrl=fifo_queue_url,
        MessageBody=json.dumps({'order_id': '123', 'action': 'create'}),

        # Required for FIFO: prevents duplicate sends
        MessageDeduplicationId='order-123-create-attempt-1',

        # Group for ordering: messages in same group are ordered
        MessageGroupId='order-123',  # All events for order-123 delivered in order
    )

# =================== DEAD LETTER QUEUE ===================

def setup_dlq():
    """
    Configure dead letter queue to catch failed messages.
    """
    # Create DLQ
    dlq_response = sqs.create_queue(
        QueueName='my-queue-dlq',
        Attributes={
            'MessageRetentionPeriod': str(14 * 24 * 60 * 60),  # 14 days
        }
    )

    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_response['QueueUrl'],
        AttributeNames=['QueueArn']
    )['Attributes']['QueueArn']

    # Configure main queue to use DLQ
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            'RedrivePolicy': json.dumps({
                'deadLetterTargetArn': dlq_arn,
                'maxReceiveCount': 5,  # After 5 failed attempts, send to DLQ
            })
        }
    )
```

Managed services (SQS, Cloud Pub/Sub, Azure Service Bus) eliminate operational burden but limit customization and may increase costs at scale. Self-hosted (Kafka, RabbitMQ) requires expertise but provides full control. Many organizations start with managed services and migrate to self-hosted as expertise grows.
Effective use of message queues requires thoughtful design. Here are proven patterns and practices.
Message Design
```python
# Message design best practices

from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any
from datetime import datetime
import json
import time
import uuid

@dataclass
class Message:
    """
    Well-designed message structure.
    Include metadata for debugging, routing, and processing.
    """
    # Unique identifier for idempotency
    message_id: str
    # Trace through distributed system
    correlation_id: str
    # Message type/schema for routing and deserialization
    type: str
    # Schema version for backward compatibility
    version: str
    # When the event occurred (not when sent)
    timestamp: str
    # Source service identifier
    source: str
    # Actual payload
    payload: Dict[str, Any]
    # Optional: routing hints
    partition_key: Optional[str] = None

    @classmethod
    def create(cls, type: str, payload: Dict[str, Any],
               correlation_id: Optional[str] = None,
               partition_key: Optional[str] = None):
        return cls(
            message_id=str(uuid.uuid4()),
            correlation_id=correlation_id or str(uuid.uuid4()),
            type=type,
            version="1.0",
            timestamp=datetime.utcnow().isoformat() + "Z",
            source="order-service",
            payload=payload,
            partition_key=partition_key,
        )

class QueueError(Exception):
    """Stand-in for a publish failure."""
    pass

# =================== OUTBOX PATTERN ===================

class OutboxPattern:
    """
    Outbox Pattern: Ensure exactly-once publishing with database.

    Problem: What if database commits but queue publish fails?
    Or queue publishes but database rolls back?

    Solution: Write messages to database in same transaction as business data.
    Background process reads outbox and publishes to queue.
    """

    def __init__(self, db, queue):
        self.db = db
        self.queue = queue

    def process_order(self, order_data):
        with self.db.transaction() as tx:
            # Create order in database
            order = tx.execute(
                "INSERT INTO orders (...) VALUES (...) RETURNING id",
                order_data
            )

            # Write to outbox in SAME transaction
            message = Message.create(
                type="order.created",
                payload={'order_id': order.id, **order_data},
                partition_key=str(order.id),
            )
            tx.execute(
                "INSERT INTO outbox (id, message, created_at) VALUES (?, ?, ?)",
                (message.message_id, json.dumps(asdict(message)), datetime.utcnow())
            )
            # Both succeed or both fail

    def publish_outbox(self):
        """Background job: Publish pending outbox messages"""
        while True:
            with self.db.transaction() as tx:
                # Lock and get unpublished messages
                messages = tx.execute(
                    "SELECT id, message FROM outbox WHERE published = false "
                    "ORDER BY created_at LIMIT 100 FOR UPDATE SKIP LOCKED"
                )
                for row in messages:
                    try:
                        self.queue.publish(json.loads(row.message))
                        tx.execute(
                            "UPDATE outbox SET published = true WHERE id = ?",
                            (row.id,)
                        )
                    except QueueError:
                        # Will retry on next run
                        pass
            time.sleep(1)

# =================== SAGA PATTERN ===================

class SagaOrchestrator:
    """
    Saga Pattern: Distributed transactions across services.
    Each step has a corresponding compensation action.
    On failure, execute compensations in reverse order.
    """

    def __init__(self, queue):
        self.queue = queue

    def create_order_saga(self, order_data):
        saga_id = str(uuid.uuid4())
        steps = [
            {'action': 'reserve_inventory', 'compensate': 'release_inventory'},
            {'action': 'charge_payment', 'compensate': 'refund_payment'},
            {'action': 'create_shipment', 'compensate': 'cancel_shipment'},
        ]

        for i, step in enumerate(steps):
            # Send command
            self.queue.publish({
                'saga_id': saga_id,
                'step': i,
                'action': step['action'],
                'order': order_data,
            })

            # Wait for response (simplified - real impl uses callbacks)
            response = self.wait_for_response(saga_id, step['action'])

            if not response['success']:
                # Compensate previous steps in reverse
                for j in range(i - 1, -1, -1):
                    self.queue.publish({
                        'saga_id': saga_id,
                        'action': steps[j]['compensate'],
                        'order': order_data,
                    })
                return False

        return True

    def wait_for_response(self, saga_id, action):
        """Placeholder: block until the step's reply message arrives."""
        raise NotImplementedError

# =================== COMPETING CONSUMERS ===================

def scale_consumers(topic: str, target_lag: int):
    """
    Auto-scale consumers based on queue lag.

    Monitor: messages_in_queue - messages_processed
    If lag > threshold, add consumers
    If lag < threshold, remove consumers

    The helpers used here (get_consumer_lag, get_consumer_count,
    add_consumers, remove_consumers) are deployment-specific and
    left undefined in this sketch.
    """
    current_lag = get_consumer_lag(topic)
    current_consumers = get_consumer_count(topic)

    if current_lag > target_lag * 2:
        # Scale up
        add_consumers(topic, min(current_consumers, 10))
    elif current_lag < target_lag * 0.5 and current_consumers > 1:
        # Scale down
        remove_consumers(topic, 1)
```

Global ordering is expensive and limits throughput. Most systems provide per-partition ordering only. If order matters for related messages (all events for one order), use a consistent partition key (order_id). Messages with different keys may arrive out of order relative to each other.
Message queues are fundamental building blocks for modern distributed systems, enabling asynchronous, decoupled communication that improves reliability and scalability.
You now understand message queues, the asynchronous communication backbone of distributed systems. From delivery semantics and messaging patterns to Kafka event streaming and production best practices, you have the knowledge to design reliable, scalable distributed architectures.

Module Complete:

You've now explored the full spectrum of distributed system communication: from low-level message passing and sockets through RPC abstractions to high-level message queues. These foundations underpin every distributed system, from microservices to global-scale platforms.