In a typical synchronous system, a request enters, processing begins, and the client waits until processing completes. This model has a fundamental throughput limitation: the system can only handle as many concurrent requests as it can actively process.
Consider a video processing service that can actively process only a handful of uploads at a time: if 100 users upload simultaneously, 95 of them experience timeouts or failures. The synchronous model breaks under load.
The queue-based alternative accepts each upload immediately, enqueues the work, and processes it asynchronously, so the system absorbs the spike instead of failing.
This page explores queue-based processing architectures for throughput optimization. You'll understand when to use queues, queue types (message queues, task queues, event streams), implementation patterns, backpressure handling, and how queues enable massive throughput while maintaining system stability.
Queues fundamentally transform system architecture by decoupling producers from consumers. This decoupling enables several critical throughput and reliability benefits.
Synchronous vs Queue-Based Architecture:
Synchronous: Queue-Based:
Client Client
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Server │──────────────────────│ Server │──────┐
│ (process │ Process (slow) │ (enqueue)│ │
│ inline) │ │ fast! │ ▼
└────┬─────┘ └──────────┘ ┌───────┐
│ │ Queue │
│ Waits for └───┬───┘
│ completion │
│ (timeout risk) ▼
│ ┌──────────────────────┐
▼ │ Workers │
Response │ (process from queue) │
(slow) │ (scale independently)│
└──────────────────────┘
Key insight: In the queue model, the user doesn't wait for processing to complete. The server's job is just to accept and enqueue—a fast operation. Processing happens asynchronously.
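To make that concrete, here is a minimal sketch of the accept-and-enqueue pattern using Express; the `videoQueue` client and the endpoint names are illustrative assumptions, not part of any specific system described here.

```typescript
import express from 'express';
import { randomUUID } from 'crypto';
// Hypothetical queue client - could be backed by SQS, RabbitMQ, BullMQ, etc.
import { videoQueue } from './queues';

const app = express();
app.use(express.json());

app.post('/videos', async (req, res) => {
  const jobId = randomUUID();

  // Fast path: validate and enqueue only - no processing happens here
  await videoQueue.publish({
    jobId,
    userId: req.body.userId,
    videoUrl: req.body.videoUrl,
  });

  // 202 Accepted: work is queued; the client can poll /videos/:jobId for status
  res.status(202).json({ jobId, status: 'queued' });
});
```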
Use queues when: (1) processing takes significant time, (2) processing can be deferred, (3) you need to handle traffic spikes, (4) reliability/retry is important, (5) you need to scale producers and consumers independently. Don't use queues when: (1) immediate response is required, (2) processing is trivially fast, (3) added complexity isn't warranted.
Not all queues are created equal. Different queue systems offer different guarantees, performance characteristics, and use cases.
| System | Type | Delivery | Ordering | Best For |
|---|---|---|---|---|
| RabbitMQ | Message broker | At-least-once | FIFO per queue | Task queues, RPC, routing |
| Apache Kafka | Event log | At-least-once (exactly-once with idempotent producers and transactions) | Per partition | Event streaming, replay, analytics |
| AWS SQS | Managed queue | At-least-once | Best-effort (FIFO available) | Cloud-native, serverless integration |
| Redis (Lists/Streams) | In-memory | At-most-once | FIFO | Simple queues, low latency |
| Apache Pulsar | Hybrid | Exactly-once capable | Per subscription | Multi-tenant, geo-replication |
| BullMQ | Redis-backed | At-least-once | Priority + FIFO | Node.js job queues |
| Celery | Python-native | At-least-once | Priority + FIFO | Python async tasks |
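To make the delivery-guarantee column concrete, here is a minimal Redis list queue sketch using ioredis (assuming a local Redis instance and a hypothetical `processJob` handler). It shows why Redis lists are at-most-once: once BRPOP returns, the message is gone, so a worker crash mid-processing loses it.

```typescript
import Redis from 'ioredis';

const redis = new Redis(); // assumes redis://localhost:6379

async function processJob(job: unknown): Promise<void> {
  // placeholder for real work
}

// Producer: push a job onto the list
async function enqueue(job: object): Promise<void> {
  await redis.lpush('jobs', JSON.stringify(job));
}

// Consumer: block until a job is available
async function worker(): Promise<void> {
  while (true) {
    // brpop returns [key, value]; timeout 0 = block indefinitely
    const result = await redis.brpop('jobs', 0);
    if (!result) continue;

    const job = JSON.parse(result[1]);
    // At-most-once: if processJob crashes here, the job is lost.
    // Redis Streams (XADD / XREADGROUP / XACK) give at-least-once instead.
    await processJob(job);
  }
}
```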
Producers (the components that enqueue messages) need careful design to maximize throughput without losing reliability.
Pattern 1: Fire-and-Forget
Client Request
│
▼
┌─────────────┐
│ Server │
│ publish() │───────▶ Queue
│ no wait │
└──────┬──────┘
│
▼
Response
(immediate)
Publish without waiting for acknowledgment. Fastest but risks message loss if queue is unavailable.
Pattern 2: Confirmed Publishing
Client Request
│
▼
┌─────────────┐
│ Server │
│ publish() │───────▶ Queue ───────▶ ACK
│ wait for │◀────────────────────────┘
│ ACK │
└──────┬──────┘
│
▼
Response
(after ACK)
Wait for broker acknowledgment before returning success. Reliable but adds latency (network round-trip to broker).
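As one possible concrete rendering of these two patterns, here is a sketch using amqplib against a RabbitMQ broker (the queue name and connection URL are assumptions): a plain channel gives fire-and-forget, while a confirm channel lets the producer await the broker's acknowledgment.

```typescript
import amqplib from 'amqplib';

const conn = await amqplib.connect('amqp://localhost');

// Pattern 1: Fire-and-forget - publish on a regular channel and return immediately
const channel = await conn.createChannel();
await channel.assertQueue('video-jobs', { durable: true });

function publishFireAndForget(job: object): void {
  // sendToQueue buffers locally and returns; no broker acknowledgment
  channel.sendToQueue('video-jobs', Buffer.from(JSON.stringify(job)), { persistent: true });
}

// Pattern 2: Confirmed publishing - use a confirm channel and wait for the broker ACK
const confirmChannel = await conn.createConfirmChannel();
await confirmChannel.assertQueue('video-jobs', { durable: true });

async function publishConfirmed(job: object): Promise<void> {
  confirmChannel.sendToQueue('video-jobs', Buffer.from(JSON.stringify(job)), { persistent: true });
  // Resolves once the broker has confirmed delivery (rejects on nack)
  await confirmChannel.waitForConfirms();
}
```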
```typescript
// High-throughput producer patterns

// Pattern 1: Batched publishing for throughput
class BatchedProducer {
  private buffer: Message[] = [];
  private readonly batchSize = 100;
  private readonly flushIntervalMs = 50;
  private flushTimer: NodeJS.Timer | null = null;

  async publish(message: Message): Promise<void> {
    this.buffer.push(message);

    if (this.buffer.length >= this.batchSize) {
      await this.flush();
    } else if (!this.flushTimer) {
      this.flushTimer = setTimeout(() => this.flush(), this.flushIntervalMs);
    }
  }

  private async flush(): Promise<void> {
    if (this.buffer.length === 0) return;

    const batch = this.buffer;
    this.buffer = [];

    if (this.flushTimer) {
      clearTimeout(this.flushTimer);
      this.flushTimer = null;
    }

    // Single network call for entire batch
    await this.broker.publishBatch(batch);
  }
}

// Pattern 2: Transactional outbox for reliability
// Guarantees message publishing even if producer crashes
async function handleOrder(order: Order, db: Database, queue: Queue) {
  await db.transaction(async (tx) => {
    // 1. Write business data
    await tx.insert('orders', order);

    // 2. Write message to outbox table (same transaction!)
    await tx.insert('outbox', {
      id: uuid(),
      topic: 'order-created',
      payload: JSON.stringify(order),
      created_at: new Date(),
      published: false,
    });

    // Both succeed or both fail - no message loss!
  });

  // Return immediately - outbox worker will publish
}

// Separate outbox worker (runs continuously)
async function outboxWorker(db: Database, queue: Queue) {
  while (true) {
    await db.transaction(async (tx) => {
      // Lock and fetch unpublished messages
      const messages = await tx.query(`
        SELECT * FROM outbox
        WHERE published = false
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
      `);

      if (messages.length > 0) {
        // Publish to queue
        await queue.publishBatch(messages.map(m => ({
          topic: m.topic,
          payload: m.payload,
        })));

        // Mark as published (or delete)
        await tx.update('outbox',
          { published: true },
          { id: messages.map(m => m.id) }
        );
      }
    });

    await sleep(100); // Poll interval
  }
}

// Pattern 3: Async publishing with local buffer
class AsyncBufferedProducer {
  private localQueue: Message[] = [];
  private publishing = false;
  private readonly maxLocalQueue = 10000;

  publish(message: Message): boolean {
    if (this.localQueue.length >= this.maxLocalQueue) {
      // Backpressure: reject new messages
      return false;
    }

    this.localQueue.push(message);
    this.scheduleFlush();
    return true;
  }

  private async scheduleFlush() {
    if (this.publishing) return;
    this.publishing = true;

    while (this.localQueue.length > 0) {
      const batch = this.localQueue.splice(0, 100);

      try {
        await this.broker.publishBatch(batch);
      } catch (error) {
        // Push failed messages back to front
        this.localQueue.unshift(...batch);
        await sleep(1000); // Backoff before retry
      }
    }

    this.publishing = false;
  }
}
```

Consumer design significantly impacts throughput. The goal is to maximize parallel processing while maintaining reliability.
Consumer Scaling Strategies:
Single Consumer Multiple Consumers Partitioned Consumers
(Low throughput) (Duplicate processing) (Parallel, no duplicates)
Queue Queue Topic (Partitioned)
│ │ ┌─P0─┬─P1─┬─P2─┐
▼ ┌────┴────┐ │ │ │ │
┌─────────┐ ▼ ▼ ▼ ▼ ▼ ▼
│Consumer1│ ┌────────┐ ┌────────┐ ┌───┐ ┌───┐ ┌───┐
└─────────┘ │Consumer│ │Consumer│ │C1 │ │C2 │ │C3 │
│ 1 │ │ 2 │ │P0 │ │P1 │ │P2 │
└────────┘ └────────┘ └───┘ └───┘ └───┘
1 message/time Race for same messages 1 partition per consumer
(need visibility timeout) (No coordination needed)
Kafka Consumer Groups (Optimal for Throughput):
Topic: orders (6 partitions)
│
├── P0 ────▶ Consumer 1
├── P1 ────▶ Consumer 1
├── P2 ────▶ Consumer 2
├── P3 ────▶ Consumer 2
├── P4 ────▶ Consumer 3
└── P5 ────▶ Consumer 3
Group: "order-processing"
3 consumers, 6 partitions → 2 partitions each
Add consumer 4 → rebalance: 1-2 partitions each
Max consumers = partition count (adding more is useless)
```typescript
// High-throughput consumer patterns

// Pattern 1: Batch consumption
async function batchConsumer(queue: Queue) {
  while (true) {
    // Fetch multiple messages at once
    const messages = await queue.receive({
      maxMessages: 10,        // Up to 10 messages per call
      visibilityTimeout: 30,  // 30 seconds to process
      waitTimeSeconds: 20,    // Long polling (efficient)
    });

    if (messages.length === 0) continue;

    // Process batch in parallel
    const results = await Promise.allSettled(
      messages.map(msg => processMessage(msg))
    );

    // Acknowledge only successful messages
    const toAck = messages.filter((_, i) =>
      results[i].status === 'fulfilled'
    );
    await queue.deleteBatch(toAck);

    // Failed messages will reappear after visibility timeout
  }
}

// Pattern 2: Prefetch with worker pool
class PrefetchConsumer {
  private readonly prefetchCount = 100;  // Buffer 100 messages
  private readonly concurrency = 10;     // Process 10 at a time
  private buffer: Message[] = [];
  private processing = 0;

  async start() {
    // Start prefetch loop
    this.prefetchLoop();

    // Start workers
    for (let i = 0; i < this.concurrency; i++) {
      this.workerLoop();
    }
  }

  private async prefetchLoop() {
    while (true) {
      // Keep buffer full
      while (this.buffer.length < this.prefetchCount) {
        const messages = await this.queue.receive({ maxMessages: 10 });
        this.buffer.push(...messages);
      }
      await sleep(100);
    }
  }

  private async workerLoop() {
    while (true) {
      const message = this.buffer.shift();
      if (!message) {
        await sleep(10);
        continue;
      }

      this.processing++;
      try {
        await processMessage(message);
        await this.queue.delete(message);
      } catch (error) {
        // Message will reappear for retry
        console.error('Processing failed', error);
      }
      this.processing--;
    }
  }
}

// Pattern 3: Kafka consumer with parallel partition processing
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'order-processing' });

await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });

await consumer.run({
  // Parallel processing within partition batches
  partitionsConsumedConcurrently: 5,  // Process 5 partitions in parallel

  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
    // Process messages concurrently within batch
    const chunkSize = 10;
    for (let i = 0; i < batch.messages.length; i += chunkSize) {
      if (!isRunning()) break;

      const chunk = batch.messages.slice(i, i + chunkSize);
      await Promise.all(chunk.map(async (message) => {
        await processMessage(JSON.parse(message.value!.toString()));
        resolveOffset(message.offset);
      }));

      await heartbeat();  // Keep consumer alive during long batches
    }
  },
});

// Pattern 4: Rate-limited consumer (protect downstream services)
class RateLimitedConsumer {
  private readonly rateLimit = 100;                    // 100 per second
  private readonly interval = 1000 / this.rateLimit;   // 10ms between messages
  private lastProcessed = 0;

  async consume(message: Message) {
    const now = Date.now();
    const elapsed = now - this.lastProcessed;

    if (elapsed < this.interval) {
      await sleep(this.interval - elapsed);
    }

    await processMessage(message);
    this.lastProcessed = Date.now();
  }
}
```

Without proper flow control, fast producers can overwhelm slow consumers, leading to unbounded queue growth, memory exhaustion, and system failure. Backpressure is the mechanism by which consumers signal producers to slow down.
Backpressure Manifestations:
No Backpressure: With Backpressure:
Producer (1000/s) Producer (1000/s)
│ │
▼ ▼
┌─────────┐ ┌─────────┐
│ Queue │ ─── grows ───▶ │ Queue │─── signals "full" ───▶ Producer slows
│ (grows) │ unbounded │ (cap) │
└────┬────┘ └────┬────┘
│ │
▼ ▼
Consumer (100/s) Consumer (100/s)
(overwhelmed) (keeps up with throttled input)
Backpressure Strategies:
```typescript
// Backpressure implementation patterns

// Pattern 1: Queue depth-based throttling
class ThrottledProducer {
  private readonly maxQueueDepth = 10000;
  private readonly baseRateLimit = 1000;  // messages/sec

  async publish(message: Message): Promise<boolean> {
    const depth = await this.queue.getApproximateDepth();

    // Calculate dynamic rate based on queue depth
    const utilizationRatio = depth / this.maxQueueDepth;

    if (utilizationRatio >= 1.0) {
      // Queue full - reject message
      return false;
    }

    if (utilizationRatio > 0.8) {
      // High pressure - throttle aggressively
      const throttle = this.baseRateLimit * (1 - utilizationRatio);
      await this.rateLimiter.acquire(throttle);
    }

    await this.queue.publish(message);
    return true;
  }
}

// Pattern 2: Consumer lag-based autoscaling
class AutoScalingConsumerPool {
  private consumers: Consumer[] = [];
  private readonly minConsumers = 2;
  private readonly maxConsumers = 20;
  private readonly scaleUpThreshold = 1000;  // messages lag
  private readonly scaleDownThreshold = 100;

  async monitorAndScale() {
    setInterval(async () => {
      const lag = await this.getTotalConsumerLag();

      if (lag > this.scaleUpThreshold && this.consumers.length < this.maxConsumers) {
        // Scale up
        await this.addConsumer();
        console.log(`Scaled up to ${this.consumers.length} consumers (lag: ${lag})`);
      }

      if (lag < this.scaleDownThreshold && this.consumers.length > this.minConsumers) {
        // Scale down
        await this.removeConsumer();
        console.log(`Scaled down to ${this.consumers.length} consumers (lag: ${lag})`);
      }
    }, 10000);  // Check every 10 seconds
  }

  private async getTotalConsumerLag(): Promise<number> {
    // Implementation depends on queue system
    // Kafka: sum of (latest offset - committed offset) per partition
    // SQS: ApproximateNumberOfMessages metric
    const offsets = await this.kafka.admin().fetchTopicOffsets('my-topic');
    const committed = await this.kafka.admin().fetchOffsets({ groupId: 'my-group' });

    return offsets.reduce((total, partition) => {
      const commit = committed.find(c => c.partition === partition.partition);
      return total + (parseInt(partition.offset) - parseInt(commit?.offset || '0'));
    }, 0);
  }
}

// Pattern 3: Circuit breaker with queue buffer
class CircuitBreakerQueue {
  private state: 'closed' | 'open' | 'half-open' = 'closed';
  private failureCount = 0;
  private readonly failureThreshold = 5;
  private readonly recoveryTimeout = 30000;
  private lastFailure = 0;

  async process(message: Message): Promise<void> {
    if (this.state === 'open') {
      if (Date.now() - this.lastFailure > this.recoveryTimeout) {
        this.state = 'half-open';
      } else {
        // Circuit open - don't process, let message stay in queue
        throw new Error('Circuit breaker open');
      }
    }

    try {
      await this.downstream.send(message);

      if (this.state === 'half-open') {
        this.state = 'closed';
        this.failureCount = 0;
      }
    } catch (error) {
      this.failureCount++;
      this.lastFailure = Date.now();

      if (this.failureCount >= this.failureThreshold) {
        this.state = 'open';
        console.log('Circuit breaker opened due to downstream failures');
      }

      throw error;  // Message returns to queue for retry
    }
  }
}
```

In high-throughput systems, some messages will inevitably fail processing. Without proper handling, these "poison pills" can block queues, trigger infinite retry loops, or cause cascading failures. Dead Letter Queues (DLQs) provide a safety valve.
Message Flow with DLQ:
Main Queue
│
▼
┌────────────┐
│ Consumer │
└─────┬──────┘
│
┌──────────────┼──────────────┐
│ │ │
Success Retry (1-3) Max Retries
│ │ │
▼ │ ▼
Complete │ ┌─────────────┐
└───────▶│ DLQ │
│ (for later) │
│ investigation│
└─────────────┘
DLQ Use Cases:
```typescript
// Dead Letter Queue implementation with retry logic

interface ProcessedMessage {
  originalMessage: Message;
  attempts: number;
  lastError: string;
  firstAttempt: Date;
  lastAttempt: Date;
}

class RobustConsumer {
  private readonly maxRetries = 3;
  private readonly retryDelays = [1000, 5000, 30000];  // Exponential backoff

  async processWithRetry(message: Message): Promise<void> {
    const attempts = this.getAttemptCount(message);

    try {
      await this.process(message);
      await this.acknowledge(message);
    } catch (error) {
      if (attempts >= this.maxRetries) {
        // Max retries exceeded - send to DLQ
        await this.sendToDLQ(message, error as Error, attempts);
        await this.acknowledge(message);  // Remove from main queue
        return;
      }

      // Schedule retry with backoff
      const delay = this.retryDelays[attempts] || this.retryDelays[this.retryDelays.length - 1];
      await this.scheduleRetry(message, delay);
    }
  }

  private async sendToDLQ(message: Message, error: Error, attempts: number) {
    const dlqMessage: ProcessedMessage = {
      originalMessage: message,
      attempts,
      lastError: error.message,
      firstAttempt: new Date(message.attributes?.firstAttempt || Date.now()),
      lastAttempt: new Date(),
    };

    await this.dlq.publish({
      ...dlqMessage,
      // Add diagnostics
      stackTrace: error.stack,
      consumerVersion: process.env.APP_VERSION,
      consumerId: this.consumerId,
    });

    // Alert ops team
    await this.alerting.warn('Message sent to DLQ', {
      messageId: message.id,
      error: error.message,
      attempts,
    });
  }

  private getAttemptCount(message: Message): number {
    // SQS: ApproximateReceiveCount attribute
    // Kafka: Custom header
    // RabbitMQ: x-death header
    return parseInt(message.attributes?.receiveCount || '1');
  }
}

// DLQ Processor - for manual/automated reprocessing
class DLQProcessor {
  async reprocessAll(): Promise<{ success: number; failed: number }> {
    let success = 0, failed = 0;

    while (true) {
      const messages = await this.dlq.receive({ maxMessages: 10 });
      if (messages.length === 0) break;

      for (const message of messages) {
        const original = message.body as ProcessedMessage;

        try {
          // Attempt reprocessing
          await this.mainQueue.publish(original.originalMessage);
          await this.dlq.delete(message);
          success++;
        } catch (error) {
          console.error('Reprocess failed', error);
          failed++;
          // Leave in DLQ for investigation
        }
      }
    }

    return { success, failed };
  }

  // Selective reprocessing (e.g., after bug fix)
  async reprocessByFilter(filter: (msg: ProcessedMessage) => boolean): Promise<number> {
    let reprocessed = 0;

    // Scan DLQ and selectively requeue
    await this.dlq.forEach(async (message) => {
      const original = message.body as ProcessedMessage;
      if (filter(original)) {
        await this.mainQueue.publish(original.originalMessage);
        await this.dlq.delete(message);
        reprocessed++;
      }
    });

    return reprocessed;
  }
}

// Example: Reprocess all messages that failed due to a specific error
await dlqProcessor.reprocessByFilter(msg =>
  msg.lastError.includes('Database connection timeout')
);
```

A growing DLQ is a symptom of upstream problems: bad data, broken consumers, or failing dependencies. Set alerts on DLQ depth and age. A single message sitting for days indicates a forgotten problem. Regular DLQ review should be part of operational practice.
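If the DLQ lives in SQS, depth and age alerts map directly onto the queue's built-in CloudWatch metrics. Here is a sketch using AWS SDK v3; the queue name, thresholds, and SNS topic ARN are placeholders.

```typescript
import { CloudWatchClient, PutMetricAlarmCommand } from '@aws-sdk/client-cloudwatch';

const cloudwatch = new CloudWatchClient({ region: 'us-east-1' });
const alarmActions = ['arn:aws:sns:us-east-1:123456789012:ops-alerts']; // placeholder SNS topic

// Alarm 1: DLQ depth - any visible message in the DLQ deserves attention
await cloudwatch.send(new PutMetricAlarmCommand({
  AlarmName: 'orders-dlq-depth',
  Namespace: 'AWS/SQS',
  MetricName: 'ApproximateNumberOfMessagesVisible',
  Dimensions: [{ Name: 'QueueName', Value: 'orders-dlq' }],
  Statistic: 'Maximum',
  Period: 300,                 // 5-minute windows
  EvaluationPeriods: 1,
  Threshold: 0,
  ComparisonOperator: 'GreaterThanThreshold',
  AlarmActions: alarmActions,
}));

// Alarm 2: DLQ age - a message older than a day is a forgotten problem
await cloudwatch.send(new PutMetricAlarmCommand({
  AlarmName: 'orders-dlq-age',
  Namespace: 'AWS/SQS',
  MetricName: 'ApproximateAgeOfOldestMessage',
  Dimensions: [{ Name: 'QueueName', Value: 'orders-dlq' }],
  Statistic: 'Maximum',
  Period: 300,
  EvaluationPeriods: 1,
  Threshold: 86400,            // seconds (24 hours)
  ComparisonOperator: 'GreaterThanThreshold',
  AlarmActions: alarmActions,
}));
```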
Queue throughput depends on proper configuration. Default settings are often conservative; production systems require tuning.
Producer Tuning:
# Batching
batch.size=65536 # 64KB batches (default 16KB)
linger.ms=10 # Wait 10ms for batch to fill
# Compression
compression.type=lz4 # LZ4 for speed, zstd for ratio
# Throughput
acks=1 # Only leader ack (faster, less durable)
# Use acks=all for critical data
# Memory
buffer.memory=67108864 # 64MB buffer
max.block.ms=60000 # Wait up to 60s when buffer full
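If the producer is kafkajs rather than the Java client, only some of these knobs are exposed directly; acks and compression, for instance, are per-send options. A minimal sketch (topic name assumed):

```typescript
import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();

await producer.send({
  topic: 'orders',
  // acks: 1 = leader only (faster); use acks: -1 (all in-sync replicas) for critical data
  acks: 1,
  // GZIP ships with kafkajs; LZ4/ZSTD/Snappy require an extra codec package
  compression: CompressionTypes.GZIP,
  messages: [
    { key: 'order-123', value: JSON.stringify({ orderId: 'order-123', total: 42 }) },
  ],
});
```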
Consumer Tuning:
# Batching
fetch.min.bytes=1048576 # Wait for 1MB before returning
fetch.max.wait.ms=500 # Or 500ms, whichever first
max.poll.records=500 # Process 500 records per poll
# Parallelism
partition.assignment.strategy=CooperativeStickyAssignor
# Smooth rebalancing
# Session management
max.poll.interval.ms=300000 # 5 min to process batch
session.timeout.ms=30000 # 30s heartbeat timeout
heartbeat.interval.ms=3000 # Heartbeat every 3s
Broker Tuning:
num.io.threads=8 # I/O operations
num.network.threads=3 # Network handling
num.partitions=12 # Default partitions (>= consumers)
log.segment.bytes=1073741824 # 1GB segments
log.retention.hours=168 # 7 days retention
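Because partition count caps consumer parallelism, it is worth setting it explicitly at topic creation rather than relying on the broker default. A sketch with the kafkajs admin client; the topic name and counts are illustrative:

```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const admin = kafka.admin();
await admin.connect();

// 12 partitions allows up to 12 consumers in one consumer group
await admin.createTopics({
  topics: [{
    topic: 'orders',
    numPartitions: 12,
    replicationFactor: 3,
  }],
});

await admin.disconnect();
```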
We've explored queue-based processing as a powerful throughput-optimization architecture: decoupling producers from consumers, choosing a queue system whose delivery and ordering guarantees fit the workload, designing producers and consumers for high throughput, applying backpressure so fast producers can't overwhelm slow consumers, and isolating failures with dead letter queues.
What's next:
Queue-based processing handles high-volume workloads on a single system, but eventually, you need to distribute work across multiple machines. The next page explores horizontal scaling—spreading load across many servers to achieve throughput levels impossible for any single machine.
You now understand queue-based processing as a throughput optimization architecture—from fundamental benefits, through queue types and producer/consumer patterns, to backpressure handling and performance tuning. Next, we'll examine horizontal scaling strategies.