Traditional REST APIs follow a strict request-response model: client sends one request, server sends one response. This works for many use cases, but falls short when you need to stream large datasets, push real-time updates, upload chunked data, or maintain persistent bidirectional communication.
gRPC, built on HTTP/2's streaming foundation, offers four distinct communication patterns that address these scenarios natively. These aren't workarounds or extensions—they're first-class citizens of the gRPC protocol, with full type safety, flow control, and error handling.
This flexibility is what makes gRPC the protocol of choice for microservices, real-time systems, IoT pipelines, and machine learning inference at companies like Google, Netflix, and Square.
By the end of this page, you will master all four gRPC communication patterns: unary RPC, server streaming, client streaming, and bidirectional streaming. For each pattern, you'll understand the wire format, implementation details, flow control mechanics, error handling, and real-world use cases. You'll know when to use each pattern and how to combine them effectively.
Before diving deep, let's understand the four patterns and their essential characteristics:
| Pattern | Proto Syntax | Request | Response | Primary Use Cases |
|---|---|---|---|---|
| Unary | rpc Method(Request) returns (Response) | Single message | Single message | CRUD operations, queries, RPC calls |
| Server Streaming | rpc Method(Request) returns (stream Response) | Single message | Multiple messages | Event feeds, large result sets, subscriptions |
| Client Streaming | rpc Method(stream Request) returns (Response) | Multiple messages | Single message | File upload, aggregation, batch processing |
| Bidirectional | rpc Method(stream Request) returns (stream Response) | Multiple messages | Multiple messages | Chat, gaming, collaborative editing |
syntax = "proto3"; package com.example.streaming.v1; // Complete service demonstrating all four patternsservice DataService { // Pattern 1: Unary RPC // Client sends one request, receives one response rpc GetItem(GetItemRequest) returns (GetItemResponse); // Pattern 2: Server Streaming RPC // Client sends one request, receives stream of responses rpc ListItems(ListItemsRequest) returns (stream Item); // Pattern 3: Client Streaming RPC // Client sends stream of requests, receives one response rpc UploadItems(stream Item) returns (UploadSummary); // Pattern 4: Bidirectional Streaming RPC // Both sides send streams independently rpc ProcessItems(stream ItemOperation) returns (stream OperationResult);} message GetItemRequest { string item_id = 1;} message GetItemResponse { Item item = 1;} message ListItemsRequest { string filter = 1; int32 page_size = 2;} message Item { string id = 1; string name = 2; bytes data = 3; int64 timestamp = 4;} message UploadSummary { int32 items_received = 1; int32 items_processed = 2; int32 items_failed = 3; repeated string error_ids = 4;} message ItemOperation { string item_id = 1; OperationType type = 2; bytes payload = 3;} message OperationResult { string item_id = 1; bool success = 2; string error_message = 3;} enum OperationType { OPERATION_TYPE_UNSPECIFIED = 0; OPERATION_TYPE_CREATE = 1; OPERATION_TYPE_UPDATE = 2; OPERATION_TYPE_DELETE = 3;}Key Insight: Streams Are HTTP/2 Features
The `stream` keyword in proto files maps directly to HTTP/2 capabilities: every gRPC message travels as a length-prefixed frame carried in HTTP/2 DATA frames, and each side signals completion by half-closing its end of the stream. All four patterns use a single HTTP/2 stream; the difference is how many messages flow in each direction.
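To make that mapping concrete, here is a minimal sketch of the length-prefixed message framing that rides inside HTTP/2 DATA frames. This is illustrative only; grpc-js handles framing internally:

```typescript
// Sketch: gRPC's on-the-wire message framing. Each message is a 1-byte
// compressed flag, a 4-byte big-endian length, then the serialized Protobuf
// payload. HTTP/2 DATA frames carry a byte stream of these framed messages.

function frameMessage(payload: Uint8Array, compressed = false): Uint8Array {
  const framed = new Uint8Array(5 + payload.length);
  framed[0] = compressed ? 1 : 0; // compressed flag
  new DataView(framed.buffer).setUint32(1, payload.length, false); // big-endian length
  framed.set(payload, 5); // Protobuf-encoded message body
  return framed;
}

function* parseMessages(buffer: Uint8Array): Generator<Uint8Array> {
  let offset = 0;
  while (offset + 5 <= buffer.length) {
    const view = new DataView(buffer.buffer, buffer.byteOffset + offset, 5);
    const length = view.getUint32(1, false);
    if (offset + 5 + length > buffer.length) break; // incomplete: wait for more bytes
    yield buffer.subarray(offset + 5, offset + 5 + length);
    offset += 5 + length;
  }
}
```

Because every message carries its own length prefix, message boundaries survive however HTTP/2 happens to split the byte stream into DATA frames.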
Unary RPC is the most familiar pattern—equivalent to a traditional HTTP request-response. Despite being "simple," unary RPCs in gRPC have important characteristics that distinguish them from REST:
Wire Format:
- Request: HEADERS frame (`:method: POST`, `:path: /service/method`) followed by a single DATA frame containing the message, then end-of-stream.
- Response: HEADERS frame (`:status: 200`), a single DATA frame with the message, then trailing HEADERS carrying `grpc-status` and `grpc-message`.
```typescript
// Unary RPC: Client Implementation

import { DataServiceClient, Item } from './generated/data_service';
import { credentials, Metadata, CallOptions } from '@grpc/grpc-js';

const client = new DataServiceClient(
  'localhost:50051',
  credentials.createInsecure()
);

// Simple unary call
async function getItem(itemId: string): Promise<Item> {
  return new Promise((resolve, reject) => {
    client.getItem(
      { itemId },
      (error, response) => {
        if (error) {
          reject(error);
        } else {
          resolve(response.item);
        }
      }
    );
  });
}

// Unary call with metadata and deadline
async function getItemWithOptions(itemId: string): Promise<Item> {
  const metadata = new Metadata();
  metadata.set('authorization', 'Bearer <token>');
  metadata.set('x-request-id', crypto.randomUUID());

  const deadline = new Date();
  deadline.setSeconds(deadline.getSeconds() + 5); // 5 second timeout

  const options: CallOptions = {
    deadline,
  };

  return new Promise((resolve, reject) => {
    const call = client.getItem(
      { itemId },
      metadata,
      options,
      (error, response) => {
        if (error) {
          console.error(`RPC failed: ${error.code} - ${error.message}`);
          reject(error);
        } else {
          resolve(response.item);
        }
      }
    );

    // Access response metadata
    call.on('metadata', (metadata: Metadata) => {
      console.log('Response headers:', metadata.getMap());
    });

    call.on('status', (status) => {
      console.log('Trailing metadata:', status.metadata.getMap());
    });
  });
}

// Unary call with cancellation
function getItemCancellable(itemId: string): { promise: Promise<Item>; cancel: () => void } {
  let call: any;

  const promise = new Promise<Item>((resolve, reject) => {
    call = client.getItem(
      { itemId },
      (error, response) => {
        if (error) {
          reject(error);
        } else {
          resolve(response.item);
        }
      }
    );
  });

  return {
    promise,
    cancel: () => call.cancel(),
  };
}
```
```typescript
// Unary RPC: Server Implementation

import { sendUnaryData, ServerUnaryCall, status, Metadata } from '@grpc/grpc-js';
import { GetItemRequest, GetItemResponse, Item } from './generated/data_service';

class DataServiceImpl {
  async getItem(
    call: ServerUnaryCall<GetItemRequest, GetItemResponse>,
    callback: sendUnaryData<GetItemResponse>
  ): Promise<void> {
    // Extract request
    const { itemId } = call.request;

    // Access client metadata
    const metadata = call.metadata;
    const authHeader = metadata.get('authorization')[0];
    const requestId = metadata.get('x-request-id')[0];

    console.log(`Request ${requestId} for item ${itemId}`);

    // Check if client already cancelled
    if (call.cancelled) {
      console.log(`Request ${requestId} was cancelled`);
      return;
    }

    try {
      // Business logic
      const item = await this.fetchItemFromDatabase(itemId);

      if (!item) {
        // Return gRPC NOT_FOUND error
        callback({
          code: status.NOT_FOUND,
          message: `Item ${itemId} not found`,
          details: 'Item does not exist or has been deleted',
        }, null);
        return;
      }

      // Set response metadata
      const responseMetadata = new Metadata();
      responseMetadata.set('x-cache-status', 'HIT');
      call.sendMetadata(responseMetadata);

      // Return successful response
      callback(null, { item });
    } catch (error) {
      // Return gRPC INTERNAL error
      callback({
        code: status.INTERNAL,
        message: 'Internal server error',
        details: error.message,
      }, null);
    }
  }

  private async fetchItemFromDatabase(itemId: string): Promise<Item | null> {
    // Database lookup...
    return null;
  }
}
```

Unary gRPC has advantages over REST: (1) Binary serialization is 5-10x faster, (2) Strong typing catches errors at compile time, (3) Trailing metadata (status codes) separate from headers, (4) Native deadline/cancellation propagation, (5) Bidirectional metadata exchange. Use unary for any operation that fits request-response semantics.
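Point (4) is worth a concrete sketch. Assuming a hypothetical `downstreamClient` stub with a `fetchDetails` method, a server handler can forward its remaining time budget to downstream RPCs:

```typescript
// Sketch: propagating the caller's deadline to a downstream call.
// downstreamClient and fetchDetails are hypothetical; getDeadline() is
// available on grpc-js server calls.
async function getItemWithPropagation(
  call: ServerUnaryCall<GetItemRequest, GetItemResponse>,
  callback: sendUnaryData<GetItemResponse>
): Promise<void> {
  const deadline = call.getDeadline(); // time budget set by the original caller

  downstreamClient.fetchDetails(
    { itemId: call.request.itemId },
    { deadline }, // downstream work inherits whatever budget remains
    (error, details) => {
      if (error) {
        callback(error, null); // DEADLINE_EXCEEDED surfaces to the caller
      } else {
        callback(null, { item: details.item });
      }
    }
  );
}
```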
Server streaming enables a pattern where the client sends a single request and the server responds with a stream of messages. This is ideal for:

- Event feeds, where updates are pushed as they occur
- Large result sets that would be unwieldy as a single response
- Subscriptions to long-lived data sources
Wire Format:
The server sends multiple DATA frames, each containing one gRPC message (5-byte header + Protobuf). Flow control applies—the client's receive window determines how fast the server can send.
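Because the receiver's window gates the sender, a slow client can throttle the server simply by reading slowly. A small sketch of making that explicit, assuming the `client` stub from earlier and a hypothetical `saveToDisk` helper:

```typescript
// Sketch: explicit pause/resume on a server stream. ClientReadableStream is
// a Node Readable, so pausing stops message delivery and lets HTTP/2 flow
// control push back on the server while slow work completes.
function consumeSlowly(filter: string): void {
  const stream = client.listItems({ filter, pageSize: 100 });

  stream.on('data', async (item: Item) => {
    stream.pause();          // stop delivery while we do slow work
    await saveToDisk(item);  // hypothetical slow consumer
    stream.resume();         // ready for the next message
  });

  stream.on('end', () => console.log('done'));
  stream.on('error', (error) => console.error(error.message));
}
```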
```typescript
// Server Streaming: Client Implementation

import { DataServiceClient, Item } from './generated/data_service';
import { Metadata, ClientReadableStream, status } from '@grpc/grpc-js';

// Basic server streaming consumption
async function listItems(filter: string): Promise<Item[]> {
  const items: Item[] = [];

  return new Promise((resolve, reject) => {
    const stream: ClientReadableStream<Item> = client.listItems({
      filter,
      pageSize: 100,
    });

    stream.on('data', (item: Item) => {
      items.push(item);
      console.log(`Received item ${item.id}`);
    });

    stream.on('end', () => {
      console.log(`Stream ended, received ${items.length} items`);
      resolve(items);
    });

    stream.on('error', (error) => {
      console.error(`Stream error: ${error.message}`);
      reject(error);
    });

    stream.on('status', (status) => {
      console.log(`gRPC status: ${status.code}`);
    });
  });
}

// Server streaming with async iteration (modern approach)
async function* listItemsGenerator(filter: string): AsyncGenerator<Item> {
  const stream = client.listItems({ filter, pageSize: 100 });

  for await (const item of stream) {
    yield item;
  }
}

// Usage with async iteration
async function processItems(filter: string): Promise<void> {
  for await (const item of listItemsGenerator(filter)) {
    await processItem(item); // Backpressure handled automatically
  }
}

// Server streaming with cancellation
function listItemsWithTimeout(filter: string, timeoutMs: number): {
  generator: AsyncGenerator<Item>;
  cancel: () => void;
} {
  const stream = client.listItems({ filter, pageSize: 100 });
  let cancelled = false;

  const timeoutId = setTimeout(() => {
    stream.cancel();
    cancelled = true;
  }, timeoutMs);

  async function* generator(): AsyncGenerator<Item> {
    try {
      for await (const item of stream) {
        clearTimeout(timeoutId); // Reset on each item
        yield item;
      }
    } catch (error) {
      if (cancelled && error.code === status.CANCELLED) {
        console.log('Stream cancelled due to timeout');
        return;
      }
      throw error;
    }
  }

  return {
    generator: generator(),
    cancel: () => {
      stream.cancel();
      cancelled = true;
    },
  };
}
```
```typescript
// Server Streaming: Server Implementation

import { ServerWritableStream, status } from '@grpc/grpc-js';
import { ListItemsRequest, Item } from './generated/data_service';

class DataServiceImpl {
  async listItems(
    call: ServerWritableStream<ListItemsRequest, Item>
  ): Promise<void> {
    const { filter, pageSize } = call.request;

    console.log(`Streaming items with filter: ${filter}`);

    try {
      // Database cursor for efficient large result sets
      const cursor = await this.createDatabaseCursor(filter);
      let count = 0;

      while (true) {
        // Check for client cancellation
        if (call.cancelled) {
          console.log('Client cancelled stream');
          cursor.close();
          return;
        }

        // Fetch next batch
        const batch = await cursor.next(pageSize);
        if (batch.length === 0) {
          break; // No more items
        }

        for (const item of batch) {
          // Write each item to stream
          // This respects flow control automatically
          const canContinue = call.write(item);

          if (!canContinue) {
            // Client's buffer is full, wait for drain
            await new Promise<void>(resolve => {
              call.once('drain', resolve);
            });
          }
          count++;
        }
      }

      console.log(`Streamed ${count} items`);

      // End stream successfully
      call.end();
    } catch (error) {
      // Send error and end stream
      call.destroy({
        code: status.INTERNAL,
        message: `Stream failed: ${error.message}`,
      });
    }
  }

  // Streaming with periodic heartbeats (for long-running streams)
  // SubscribeRequest and Event come from a separate subscription proto (not shown)
  async streamWithHeartbeat(
    call: ServerWritableStream<SubscribeRequest, Event>
  ): Promise<void> {
    const heartbeatInterval = setInterval(() => {
      if (!call.cancelled) {
        call.write({ type: 'HEARTBEAT', timestamp: Date.now() });
      }
    }, 30000); // Every 30 seconds

    try {
      // Subscribe to event source
      const subscription = await this.eventSource.subscribe(
        call.request.topic,
        (event) => {
          if (!call.cancelled) {
            call.write(event);
          }
        }
      );

      // Wait for cancellation
      await new Promise<void>(resolve => {
        call.on('cancelled', resolve);
        call.on('close', resolve);
      });

      subscription.unsubscribe();
    } finally {
      clearInterval(heartbeatInterval);
      call.end();
    }
  }
}
```

Common mistakes: (1) Not checking call.cancelled, leading to wasted work, (2) Ignoring backpressure from call.write() return value, (3) Not handling client disconnects gracefully, (4) Streams that never end (use heartbeats and explicit completion). Always implement proper cleanup.
Client streaming inverts the server streaming pattern: the client sends multiple messages, and the server responds with a single aggregated result. Use cases include:

- File uploads, sent as a stream of chunks
- Aggregation, where the server reduces many inputs to one summary
- Batch processing of records collected on the client
Wire Format:
Client sends multiple DATA frames (each with one message), then ends the stream. Server sends HEADERS + DATA + trailing HEADERS as response.
```typescript
// Client Streaming: Client Implementation

import { DataServiceClient } from './generated/data_service';
import { ClientWritableStream } from '@grpc/grpc-js';
import { Item, UploadSummary } from './generated/data_service';
import * as fs from 'fs';

// Basic client streaming: upload items
async function uploadItems(items: Item[]): Promise<UploadSummary> {
  return new Promise((resolve, reject) => {
    const call: ClientWritableStream<Item> = client.uploadItems(
      (error, response) => {
        if (error) {
          reject(error);
        } else {
          resolve(response);
        }
      }
    );

    // Write all items
    for (const item of items) {
      call.write(item);
    }

    // Signal end of stream
    call.end();
  });
}

// Client streaming with backpressure handling
async function uploadItemsWithBackpressure(items: Item[]): Promise<UploadSummary> {
  return new Promise((resolve, reject) => {
    const call: ClientWritableStream<Item> = client.uploadItems(
      (error, response) => {
        if (error) {
          reject(error);
        } else {
          resolve(response);
        }
      }
    );

    let index = 0;

    function writeNext(): void {
      let ok = true;
      while (ok && index < items.length) {
        const item = items[index++];
        if (index === items.length) {
          // Last item
          call.end(item);
        } else {
          // More items to come
          ok = call.write(item);
        }
      }
      if (index < items.length) {
        // Buffer was full, wait for drain
        call.once('drain', writeNext);
      }
    }

    writeNext();
  });
}

// File upload with client streaming
async function uploadFile(filePath: string): Promise<UploadSummary> {
  const CHUNK_SIZE = 64 * 1024; // 64 KB chunks

  return new Promise((resolve, reject) => {
    const call = client.uploadItems((error, response) => {
      if (error) reject(error);
      else resolve(response);
    });

    const readStream = fs.createReadStream(filePath, {
      highWaterMark: CHUNK_SIZE,
    });

    readStream.on('data', (chunk: Buffer) => {
      const item: Item = {
        id: filePath,
        name: 'file_chunk',
        data: chunk,
        timestamp: Date.now(),
      };

      if (!call.write(item)) {
        // Pause file reading until gRPC buffer drains
        readStream.pause();
        call.once('drain', () => readStream.resume());
      }
    });

    readStream.on('end', () => {
      call.end();
    });

    readStream.on('error', (error) => {
      call.destroy(error);
      reject(error);
    });
  });
}

// Async generator approach
async function uploadFromGenerator(
  items: AsyncGenerator<Item>
): Promise<UploadSummary> {
  return new Promise(async (resolve, reject) => {
    const call = client.uploadItems((error, response) => {
      if (error) reject(error);
      else resolve(response);
    });

    try {
      for await (const item of items) {
        // Wrap write in promise for backpressure
        const canContinue = call.write(item);
        if (!canContinue) {
          await new Promise<void>(r => call.once('drain', r));
        }
      }
      call.end();
    } catch (error) {
      call.destroy(error as Error);
      reject(error);
    }
  });
}
```
```typescript
// Client Streaming: Server Implementation

import { ServerReadableStream, sendUnaryData, status } from '@grpc/grpc-js';
import { Item, UploadSummary } from './generated/data_service';

class DataServiceImpl {
  async uploadItems(
    call: ServerReadableStream<Item, UploadSummary>,
    callback: sendUnaryData<UploadSummary>
  ): Promise<void> {
    const results = {
      itemsReceived: 0,
      itemsProcessed: 0,
      itemsFailed: 0,
      errorIds: [] as string[],
    };

    call.on('data', async (item: Item) => {
      // Caution: async handlers may still be in flight when 'end' fires,
      // so the summary can undercount; the iterator variant below avoids
      // that race by awaiting each item in order.
      results.itemsReceived++;

      try {
        await this.processItem(item);
        results.itemsProcessed++;
      } catch (error) {
        results.itemsFailed++;
        results.errorIds.push(item.id);
        console.error(`Failed to process ${item.id}: ${error.message}`);
      }
    });

    call.on('end', () => {
      console.log(`Upload complete: ${results.itemsReceived} received`);
      callback(null, {
        itemsReceived: results.itemsReceived,
        itemsProcessed: results.itemsProcessed,
        itemsFailed: results.itemsFailed,
        errorIds: results.errorIds,
      });
    });

    call.on('error', (error) => {
      console.error(`Stream error: ${error.message}`);
      callback({
        code: status.INTERNAL,
        message: error.message,
      }, null);
    });

    call.on('cancelled', () => {
      console.log('Client cancelled upload');
      // Clean up any partial state
    });
  }

  // Alternative: async iterator approach
  async uploadItemsIterator(
    call: ServerReadableStream<Item, UploadSummary>,
    callback: sendUnaryData<UploadSummary>
  ): Promise<void> {
    let received = 0;
    let processed = 0;
    let failed = 0;
    const errorIds: string[] = [];

    try {
      for await (const item of call) {
        received++;
        try {
          await this.processItem(item);
          processed++;
        } catch (error) {
          failed++;
          errorIds.push(item.id);
        }
      }

      callback(null, {
        itemsReceived: received,
        itemsProcessed: processed,
        itemsFailed: failed,
        errorIds,
      });
    } catch (error) {
      callback({
        code: status.INTERNAL,
        message: error.message,
      }, null);
    }
  }

  private async processItem(item: Item): Promise<void> {
    // Validate and store item
    await this.database.insert(item);
  }
}
```

For large uploads: (1) Use fixed-size chunks (32KB-256KB optimal), (2) Include sequence numbers for reassembly, (3) Implement resumable uploads with offset tracking, (4) Send checksums for integrity verification. Client streaming is ideal when the server needs all data before responding.
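A sketch of what such a chunk envelope could look like. `FileChunk` is a hypothetical message shape, not part of the `DataService` proto above:

```typescript
import { createHash } from 'crypto';

// Hypothetical chunk envelope for resumable, verifiable uploads.
interface FileChunk {
  uploadId: string;  // identifies the upload session for resumption
  sequence: number;  // position for reassembly
  offset: number;    // byte offset; lets the server detect gaps after a retry
  data: Buffer;      // 32KB-256KB of file content
  checksum: string;  // per-chunk integrity check
}

function makeChunk(uploadId: string, sequence: number, offset: number, data: Buffer): FileChunk {
  return {
    uploadId,
    sequence,
    offset,
    data,
    checksum: createHash('sha256').update(data).digest('hex'),
  };
}
```

On retry, the client asks the server for the last committed offset, seeks to it, and resumes sending chunks from there.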
Bidirectional streaming is gRPC's most powerful pattern: both client and server send independent streams of messages on the same connection. Unlike request-response, either side can send at any time—true full-duplex communication.
Key Characteristics:

- The two streams are independent: neither side waits for the other before sending
- Message ordering is preserved within each direction
- Either side can half-close while the other continues sending
- Both streams multiplex over a single HTTP/2 stream on one connection
Use Cases:

- Chat and messaging systems
- Multiplayer gaming and real-time state synchronization
- Collaborative editing
- Interactive pipelines like the ProcessItems RPC defined earlier
```typescript
// Bidirectional Streaming: Complete Implementation

// Proto definition:
// service ChatService {
//   rpc Chat(stream ChatMessage) returns (stream ChatMessage);
// }

// ============ CLIENT SIDE ============

import { ClientDuplexStream, Metadata } from '@grpc/grpc-js';
import { ChatMessage } from './generated/chat_service';

class ChatClient {
  private stream: ClientDuplexStream<ChatMessage, ChatMessage> | null = null;
  private messageHandlers: ((msg: ChatMessage) => void)[] = [];

  connect(userId: string): void {
    const metadata = new Metadata();
    metadata.set('user-id', userId);

    this.stream = client.chat(metadata);

    // Handle incoming messages (server → client)
    this.stream.on('data', (message: ChatMessage) => {
      this.messageHandlers.forEach(handler => handler(message));
    });

    this.stream.on('end', () => {
      console.log('Server closed the stream');
      this.stream = null;
    });

    this.stream.on('error', (error) => {
      console.error(`Stream error: ${error.message}`);
      this.stream = null;
    });
  }

  // Send message (client → server)
  send(content: string, recipientId?: string): void {
    if (!this.stream) {
      throw new Error('Not connected');
    }

    const message: ChatMessage = {
      id: crypto.randomUUID(),
      content,
      timestamp: Date.now(),
      recipientId: recipientId || '',
    };

    this.stream.write(message);
  }

  // Register message handler
  onMessage(handler: (msg: ChatMessage) => void): void {
    this.messageHandlers.push(handler);
  }

  // Close client's sending stream (can still receive)
  halfClose(): void {
    this.stream?.end();
  }

  // Fully disconnect
  disconnect(): void {
    this.stream?.cancel();
    this.stream = null;
  }
}

// Usage
const chatClient = new ChatClient();
chatClient.onMessage((msg) => {
  console.log(`[${msg.senderId}]: ${msg.content}`);
});
chatClient.connect('user-123');
chatClient.send('Hello, world!');

// ============ SERVER SIDE ============

import { ServerDuplexStream, status } from '@grpc/grpc-js';

class ChatServiceImpl {
  private connectedUsers = new Map<string, ServerDuplexStream<ChatMessage, ChatMessage>>();

  chat(call: ServerDuplexStream<ChatMessage, ChatMessage>): void {
    const userId = call.metadata.get('user-id')[0] as string;

    if (!userId) {
      call.destroy({
        code: status.UNAUTHENTICATED,
        message: 'user-id metadata required',
      });
      return;
    }

    console.log(`User ${userId} connected`);
    this.connectedUsers.set(userId, call);

    // Notify presence
    this.broadcast({
      type: 'PRESENCE',
      content: `${userId} joined`,
      senderId: 'system',
      timestamp: Date.now(),
    }, userId);

    // Handle incoming messages from this client
    call.on('data', (message: ChatMessage) => {
      console.log(`Message from ${userId}: ${message.content}`);

      if (message.recipientId) {
        // Direct message
        this.sendTo(message.recipientId, {
          ...message,
          senderId: userId,
        });
      } else {
        // Broadcast
        this.broadcast({
          ...message,
          senderId: userId,
        }, userId);
      }
    });

    call.on('end', () => {
      console.log(`User ${userId} stream ended`);
      this.cleanupUser(userId);
    });

    call.on('cancelled', () => {
      console.log(`User ${userId} cancelled`);
      this.cleanupUser(userId);
    });

    call.on('error', (error) => {
      console.error(`User ${userId} error: ${error.message}`);
      this.cleanupUser(userId);
    });
  }

  private sendTo(userId: string, message: ChatMessage): void {
    const stream = this.connectedUsers.get(userId);
    if (stream && !stream.cancelled) {
      stream.write(message);
    }
  }

  private broadcast(message: ChatMessage, excludeUserId?: string): void {
    for (const [userId, stream] of this.connectedUsers) {
      if (userId !== excludeUserId && !stream.cancelled) {
        stream.write(message);
      }
    }
  }

  private cleanupUser(userId: string): void {
    this.connectedUsers.delete(userId);

    this.broadcast({
      type: 'PRESENCE',
      content: `${userId} left`,
      senderId: 'system',
      timestamp: Date.now(),
    });
  }
}
```

Bidirectional streams can follow different patterns: (1) Ping-pong: strict request-response within the stream, (2) One-way heavy: most traffic flows one direction, (3) Independent: both sides send freely. gRPC supports all patterns; the stream relationship is defined by your application protocol, not gRPC itself.
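As a sketch of the ping-pong style, here is the `ProcessItems` RPC from the earlier proto driven in strict request/response order. It assumes the grpc-js duplex stream (a Node Duplex, so async-iterable) and a server that replies once per request:

```typescript
// Sketch: ping-pong over a bidirectional stream. The client imposes strict
// pairing by waiting for each reply before sending the next operation;
// gRPC itself does not require this relationship.
async function pingPong(operations: ItemOperation[]): Promise<OperationResult[]> {
  const call = client.processItems();
  const replies = call[Symbol.asyncIterator]();
  const results: OperationResult[] = [];

  for (const op of operations) {
    call.write(op);                               // send one request...
    const { value, done } = await replies.next(); // ...and wait for its reply
    if (done) break;                              // server ended early
    results.push(value);
  }

  call.end();
  return results;
}
```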
When dealing with streams, flow control becomes critical. A fast producer can overwhelm a slow consumer, leading to memory exhaustion, increased latency, or dropped messages. gRPC provides automatic flow control through HTTP/2, but applications must cooperate.
Flow Control Hierarchy:

- HTTP/2 connection-level window: caps total unacknowledged bytes across all streams on the connection
- HTTP/2 stream-level window: caps unacknowledged bytes on each individual RPC
- Application-level buffering: the gRPC library buffers writes; `call.write()` returns `false` when that buffer is full, and a `'drain'` event fires when it empties
```typescript
// Proper Backpressure Handling

// WRONG: Ignoring backpressure
async function badProducer(call: ServerWritableStream<Data>): Promise<void> {
  for (const item of hugeDataset) {
    call.write(item); // Buffers unbounded, memory explodes
  }
  call.end();
}

// CORRECT: Respecting backpressure
async function goodProducer(call: ServerWritableStream<Data>): Promise<void> {
  for (const item of hugeDataset) {
    if (call.cancelled) {
      return; // Don't produce for cancelled consumers
    }

    const canContinue = call.write(item);
    if (!canContinue) {
      // Buffer is full, wait for consumer to catch up
      await new Promise<void>(resolve => {
        call.once('drain', resolve);
      });
    }
  }
  call.end();
}

// ADVANCED: Rate limiting with token bucket
class RateLimitedProducer {
  private tokens: number;
  private readonly maxTokens: number;
  private readonly refillRate: number; // tokens per second
  private lastRefill: number;

  constructor(maxTokens: number, refillRate: number) {
    this.tokens = maxTokens;
    this.maxTokens = maxTokens;
    this.refillRate = refillRate;
    this.lastRefill = Date.now();
  }

  async acquire(): Promise<void> {
    this.refill();
    while (this.tokens < 1) {
      await new Promise(r => setTimeout(r, 10));
      this.refill();
    }
    this.tokens--;
  }

  private refill(): void {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.maxTokens, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}

async function rateLimitedStream(
  call: ServerWritableStream<Data>,
  messagesPerSecond: number
): Promise<void> {
  const limiter = new RateLimitedProducer(messagesPerSecond, messagesPerSecond);

  for (const item of hugeDataset) {
    await limiter.acquire(); // Wait for rate limit token

    if (call.cancelled) return;

    const canContinue = call.write(item);
    if (!canContinue) {
      await new Promise<void>(r => call.once('drain', r));
    }
  }
  call.end();
}

// CONSUMER SIDE: Processing with controlled concurrency
async function controlledConsumer(
  stream: ClientReadableStream<WorkItem>,
  concurrency: number
): Promise<void> {
  const semaphore = new Semaphore(concurrency);
  const pending: Promise<void>[] = [];

  for await (const item of stream) {
    await semaphore.acquire();
    const task = processItem(item)
      .finally(() => semaphore.release());
    pending.push(task);
  }

  await Promise.all(pending);
}

class Semaphore {
  private permits: number;
  private readonly waiters: (() => void)[] = [];

  constructor(permits: number) {
    this.permits = permits;
  }

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    await new Promise<void>(resolve => {
      this.waiters.push(resolve);
    });
  }

  release(): void {
    const waiter = this.waiters.shift();
    if (waiter) {
      waiter();
    } else {
      this.permits++;
    }
  }
}
```

Unbounded buffering is a common source of production outages. Always: (1) Check call.write() return value, (2) Implement drain handlers, (3) Set maximum buffer sizes, (4) Consider rate limiting for predictable load, (5) Monitor memory usage in streaming handlers.
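Since the write-then-wait-for-drain dance recurs in every producer above, it may be worth factoring into a helper. A sketch that works with any Writable-style gRPC stream (server writable, client writable, or duplex):

```typescript
// Sketch: reusable backpressure-aware write.
async function writeWithBackpressure<T>(
  stream: { write(msg: T): boolean; once(event: 'drain', cb: () => void): unknown },
  message: T
): Promise<void> {
  if (!stream.write(message)) {
    // Buffer full: suspend until the stream signals it has drained
    await new Promise<void>(resolve => stream.once('drain', resolve));
  }
}

// Usage inside a producer loop:
// for (const item of hugeDataset) {
//   if (call.cancelled) return;
//   await writeWithBackpressure(call, item);
// }
// call.end();
```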
Error handling in streaming is more complex than unary RPCs. Errors can occur at various points in the stream lifecycle, and different errors require different responses.
Error Sources:

- Network failures: connection drops, resets, and transient unavailability
- Timeouts: the deadline expires mid-stream
- Client cancellation: the consumer abandons the stream
- Server application errors: business logic fails while producing or consuming messages
- Resource exhaustion: rate limits or overload on either side
```typescript
// Comprehensive Stream Error Handling

import { status, Metadata, ClientReadableStream, ClientWritableStream } from '@grpc/grpc-js';

// Server-side: Graceful error handling
async function serverStreamWithErrorHandling(
  call: ServerWritableStream<Request, Response>
): Promise<void> {
  let itemsSent = 0;

  try {
    for (const item of generateItems()) {
      // Check for client cancellation before expensive work
      if (call.cancelled) {
        console.log(`Client cancelled after ${itemsSent} items`);
        return; // No need to do anything, client is gone
      }

      try {
        const response = await processItem(item);
        const canContinue = call.write(response);
        if (!canContinue) {
          await new Promise<void>(r => call.once('drain', r));
        }
        itemsSent++;
      } catch (itemError) {
        // Option 1: Skip failed item, continue stream
        console.error(`Skipping item: ${itemError.message}`);
        continue;

        // Option 2: End stream with error
        // call.destroy({
        //   code: status.INTERNAL,
        //   message: `Failed processing item: ${itemError.message}`,
        // });
        // return;
      }
    }

    // Successful completion
    call.end();
  } catch (fatalError) {
    // Unrecoverable error
    console.error(`Fatal stream error: ${fatalError.message}`);
    call.destroy({
      code: status.INTERNAL,
      message: 'Stream processing failed',
      details: fatalError.message,
    });
  }
}

// Client-side: Resilient stream consumption
async function resilientClientStream(
  request: Request,
  options: {
    maxRetries: number;
    retryDelay: number;
    onProgress?: (count: number) => void;
  }
): Promise<Response[]> {
  const { maxRetries, retryDelay, onProgress } = options;
  const results: Response[] = [];
  let retries = 0;
  let lastItemId: string | undefined;

  while (retries <= maxRetries) {
    try {
      const stream = client.listItems({
        ...request,
        resumeFromId: lastItemId, // Resume from last successful item
      });

      for await (const item of stream) {
        results.push(item);
        lastItemId = item.id;
        onProgress?.(results.length);
      }

      // Stream completed successfully
      return results;
    } catch (error) {
      if (isRetryableError(error)) {
        retries++;
        console.log(`Stream failed (attempt ${retries}): ${error.message}`);

        if (retries <= maxRetries) {
          await sleep(retryDelay * retries); // Backoff grows with each attempt
          continue;
        }
      }

      // Non-retryable or exhausted retries
      throw new Error(
        `Stream failed after ${retries} retries with ${results.length} items: ${error.message}`
      );
    }
  }

  return results;
}

function isRetryableError(error: any): boolean {
  const retryableCodes = [
    status.UNAVAILABLE,        // Transient failure
    status.RESOURCE_EXHAUSTED, // Rate limited
    status.ABORTED,            // Conflict, retry
    status.INTERNAL,           // Server error (maybe)
  ];
  return retryableCodes.includes(error.code);
}

// Error classification for logging and metrics
function classifyStreamError(error: any): 'client' | 'server' | 'network' | 'timeout' {
  switch (error.code) {
    case status.CANCELLED:
      return 'client';
    case status.DEADLINE_EXCEEDED:
      return 'timeout';
    case status.UNAVAILABLE:
    case status.UNKNOWN:
      return 'network';
    case status.INTERNAL:
    case status.UNIMPLEMENTED:
    case status.RESOURCE_EXHAUSTED:
      return 'server';
    default:
      return 'client'; // Assume client error
  }
}
```

For retryable streams, ensure item processing is idempotent. Use unique message IDs and track processed items. On retry, skip already-processed items. This prevents duplicate processing when resuming after mid-stream failures.
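A sketch of that dedupe step. In production the in-memory Set would be a durable store, and `applyItem` stands in for hypothetical business logic:

```typescript
// Sketch: idempotent item processing keyed by message ID, so a retried
// client stream never double-applies an item.
const processedIds = new Set<string>();

async function processItemIdempotent(item: Item): Promise<void> {
  if (processedIds.has(item.id)) {
    return; // already applied during a previous attempt; skip
  }
  await applyItem(item);     // hypothetical side-effecting business logic
  processedIds.add(item.id); // record only after success
}
```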
Selecting the appropriate gRPC pattern depends on your specific requirements. Here's a decision framework:
| Requirement | Unary | Server Stream | Client Stream | Bidirectional |
|---|---|---|---|---|
| Simple query/command | ✅ Best | ❌ | ❌ | ❌ |
| Large response (10K+ items) | ❌ | ✅ Best | ❌ | ❌ |
| Real-time subscriptions | ❌ | ✅ Use this | ❌ | ⚡ Also works |
| File/batch upload | ❌ | ❌ | ✅ Best | ❌ |
| Aggregation/reduce | ❌ | ❌ | ✅ Best | ❌ |
| Chat/messaging | ❌ | ❌ | ❌ | ✅ Best |
| Gaming/real-time sync | ❌ | ❌ | ❌ | ✅ Best |
| Simple operation with timeout | ✅ Best | ✅ OK | ✅ OK | ⚠️ Complex |
Streaming patterns require more implementation effort: connection management, error recovery, state synchronization, and testing. Start with unary RPCs and upgrade to streaming only when the benefits clearly outweigh the complexity. Many systems use unary for 90% of RPCs and streaming for specific high-value use cases.
gRPC's streaming capabilities unlock communication patterns impossible with traditional REST APIs. Understanding when and how to use each pattern is essential for building high-performance distributed systems.
What's Next:
We've covered gRPC's foundational technologies (Protocol Buffers, HTTP/2) and its communication patterns (streaming). Now we'll compare gRPC directly with REST to understand trade-offs and help you decide when each approach is appropriate for your specific use cases.
You now understand gRPC's streaming capabilities in depth: implementation patterns, flow control, error handling, and selection criteria. You can design streaming APIs that are efficient, resilient, and appropriate for your requirements.