Message passing is powerful but primitive. Writing correct distributed programs using raw send() and receive() calls is tedious and error-prone. Every communication requires manually constructing messages, parsing responses, and handling failures. Developers spend more time on plumbing than on business logic.
What if calling a function on a remote machine looked exactly like calling a local function?
This is the promise of Remote Procedure Calls (RPC)—an abstraction that hides the complexity of distributed communication behind a familiar function call interface. Instead of:
send(server, serialize({"op": "add", "a": 5, "b": 3}))
response = receive(server)
result = deserialize(response)["result"]
You simply write:
result = remote_server.add(5, 3)
The RPC system handles the rest—marshaling arguments into a message, transmitting over the network, unmarshaling on the server, executing the function, marshaling the result, and returning it to the caller. Distribution becomes invisible (almost).
By the end of this page, you will understand how RPC bridges local and distributed programming. You'll learn about the client and server stubs that enable transparent communication, the marshaling/unmarshaling process, binding and service discovery, call semantics, and how modern RPC frameworks like gRPC build on these foundations.
Remote Procedure Call was introduced by Birrell and Nelson in 1984 with a simple but powerful goal: make remote calls syntactically and semantically identical to local procedure calls.
The idea represents the principle of transparency—hiding the distributed nature of the system from the programmer. If the RPC system is perfectly transparent, code doesn't change whether the callee is in the same address space, a different process, or across the planet.
The Basic Mechanism
An RPC invocation follows a defined sequence:
1. The client calls the client stub with ordinary arguments.
2. The stub marshals the arguments into a request message.
3. The message is transmitted to the server.
4. The server stub receives the message and unmarshals the arguments.
5. The server stub invokes the actual procedure.
6. The procedure executes and returns its result to the server stub.
7. The server stub marshals the result and transmits it back.
8. The client stub unmarshals the result and returns it to the caller.

result = server.compute(data)

To the programmer, steps 2-8 are invisible. The call looks local; the mechanics are hidden.
Stubs: The Transparency Enablers
The key to RPC's transparency is the stub concept:
Client stub (proxy): A local function that has the same signature as the remote procedure. When called, it doesn't execute business logic—it packages arguments, sends them to the server, waits for a response, and returns the result.
Server stub (skeleton): Receives incoming requests, extracts arguments, calls the actual implementation, and packages the result for transmission.
Stubs are typically generated automatically from an interface definition. Developers write:
service Calculator {
  int add(int a, int b);
  int multiply(int a, int b);
}
A code generator produces client and server stubs for the target language. This ensures type consistency and eliminates manual marshaling code.
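To see what generated stubs actually do, here is a minimal hand-written client stub sketch. The length-prefixed JSON wire format and this CalculatorStub class are illustrative assumptions, not any particular framework's output:

```python
# Hypothetical sketch of what a generated client stub does under the hood.
# The wire format (length-prefixed JSON) is an assumption for illustration.
import json
import socket

class CalculatorStub:
    """Client-side proxy: same signature as the remote service, no business logic."""

    def __init__(self, host: str, port: int):
        self.sock = socket.create_connection((host, port), timeout=5.0)

    def add(self, a: int, b: int) -> int:
        # 1. Marshal arguments into a request message
        request = json.dumps({"method": "add", "args": [a, b]}).encode()
        # 2. Transmit (length-prefixed so the server knows where the message ends)
        self.sock.sendall(len(request).to_bytes(4, "big") + request)
        # 3. Wait for the reply and unmarshal it
        size = int.from_bytes(self._recv_exact(4), "big")
        response = json.loads(self._recv_exact(size))
        if "error" in response:
            raise RuntimeError(response["error"])  # surface remote failure locally
        return response["result"]

    def _recv_exact(self, n: int) -> bytes:
        buf = b""
        while len(buf) < n:
            chunk = self.sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("server closed connection")
            buf += chunk
        return buf

# Usage: looks exactly like a local call
# stub = CalculatorStub("192.168.1.100", 8080)
# print(stub.add(5, 3))
```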
Despite the goal of transparency, RPC cannot perfectly hide distribution. Network latency, partial failures, and the difficulty of passing complex object graphs all 'leak' through the abstraction. As Joel Spolsky famously noted, 'All non-trivial abstractions, to some degree, are leaky.' Understanding RPC internals helps you handle these leaks gracefully.
A central challenge in RPC is converting in-memory data structures into a byte sequence suitable for network transmission, and reconstructing them on the receiving end. This process is called marshaling (or serialization).
The Marshaling Challenge
Consider calling process_user(user) where user is a complex object with nested structures, references, and perhaps cycles. The marshaler must:

- Traverse the entire object graph, including nested structures
- Convert machine-specific representations (endianness, word size, floating-point layout) into a portable encoding
- Handle pointers and references, which are meaningless in another address space
- Detect cycles so serialization does not loop forever
Serialization Formats
The choice of serialization format significantly impacts RPC performance and usability:
| Format | Type | Human Readable | Size | Speed | Schema |
|---|---|---|---|---|---|
| JSON | Text | Yes | Large | Slow | Optional (JSON Schema) |
| XML | Text | Yes | Very Large | Slow | Yes (XSD) |
| Protocol Buffers | Binary | No | Small | Fast | Required (.proto) |
| Thrift | Binary | No | Small | Fast | Required (.thrift) |
| MessagePack | Binary | No | Small | Fast | Optional |
| FlatBuffers | Binary | No | Minimal* | Very Fast* | Required |
*FlatBuffers supports zero-copy deserialization—data can be accessed directly without parsing.
Protocol Buffers: A Deep Dive
Google's Protocol Buffers (protobuf) is the industry standard for high-performance RPC. Let's examine how it works:
```proto
// Protocol Buffer definition for a Calculator service
syntax = "proto3";

package calculator;

// Message definitions
message AddRequest {
  int32 a = 1;           // Field number 1
  int32 b = 2;           // Field number 2
  string request_id = 3; // Optional, for tracing
}

message AddResponse {
  int32 result = 1;
  int64 compute_time_ns = 2; // Processing metadata
}

message UserContext {
  string user_id = 1;
  repeated string roles = 2;        // Array of roles
  map<string, string> metadata = 3; // Key-value pairs
}

// Complex nested message
message BatchRequest {
  UserContext context = 1;
  repeated AddRequest requests = 2;
  enum Priority {
    LOW = 0;
    NORMAL = 1;
    HIGH = 2;
  }
  Priority priority = 3;
}

message BatchResponse {
  repeated AddResponse results = 1;
  int32 total_compute_time_ms = 2;
}

// Service definition (for gRPC)
service Calculator {
  rpc Add(AddRequest) returns (AddResponse);
  rpc BatchAdd(BatchRequest) returns (BatchResponse);
}
```

The Wire Format
Protobuf uses a compact binary encoding: each field is written as a tag byte, computed as (field_number << 3) | wire_type, followed by the value (varints for integers, length-prefixed bytes for strings).
For example, encoding AddRequest{a: 5, b: 3}:
Tag for field 1 (a) + varint type: 0x08
Value 5 as varint: 0x05
Tag for field 2 (b) + varint type: 0x10
Value 3 as varint: 0x03
Total: 4 bytes: 08 05 10 03
Compare to JSON: {"a":5,"b":3} = 13 bytes—more than 3x larger for this simple case. The difference compounds with complex data.
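The encoding is simple enough to reproduce by hand. This short sketch (a from-scratch illustration, not the protobuf library) derives the same four bytes from the tag formula above:

```python
# Hand-rolled sketch of protobuf's varint wire format for AddRequest{a: 5, b: 3}.
def encode_tag(field_number: int, wire_type: int) -> bytes:
    # A tag packs field number and wire type into one varint:
    # (field_number << 3) | wire_type. Wire type 0 = varint.
    return bytes([(field_number << 3) | wire_type])

def encode_varint(value: int) -> bytes:
    # Little-endian base-128: 7 payload bits per byte, MSB set on all but the last.
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

message = encode_tag(1, 0) + encode_varint(5) + encode_tag(2, 0) + encode_varint(3)
print(message.hex())  # 08051003 — the same 4 bytes as above
```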
```python
# Protocol Buffers marshaling/unmarshaling in Python
# Generated from calculator.proto using: protoc --python_out=. calculator.proto

from calculator_pb2 import AddRequest, AddResponse


def demonstrate_marshaling():
    # Create a message
    request = AddRequest()
    request.a = 5
    request.b = 3
    request.request_id = "req-12345"

    # Marshal to bytes
    wire_bytes = request.SerializeToString()
    print(f"Serialized size: {len(wire_bytes)} bytes")
    print(f"Wire format: {wire_bytes.hex()}")
    # Output:
    #   Serialized size: 15 bytes
    #   Wire format: 080510031a097265712d3132333435
    # (0x1a is the tag for field 3 with the length-delimited wire type)

    # Unmarshal from bytes
    received = AddRequest()
    received.ParseFromString(wire_bytes)
    print(f"Received: a={received.a}, b={received.b}, id={received.request_id}")


def demonstrate_forward_compatibility():
    """
    Protobuf handles version differences gracefully.
    Old code can read messages with new fields (ignores them).
    New code can read messages missing optional fields (uses defaults).
    """
    # Imagine we're an old client that doesn't know about request_id
    old_format_bytes = bytes.fromhex("08051003")  # Just a and b

    # New code can still parse it (request_id will be empty string)
    request = AddRequest()
    request.ParseFromString(old_format_bytes)
    print(f"a={request.a}, b={request.b}")        # Works fine
    print(f"request_id='{request.request_id}'")   # Empty string (default)

    # Conversely, new messages work with old code:
    # old code will ignore unknown field 3 (request_id)


def benchmark_serialization():
    """Compare serialization formats"""
    import json
    import time

    # Create test data
    request = AddRequest(a=5, b=3, request_id="req-12345")
    json_data = {"a": 5, "b": 3, "request_id": "req-12345"}

    iterations = 100000

    # Benchmark protobuf
    start = time.time()
    for _ in range(iterations):
        wire = request.SerializeToString()
        parsed = AddRequest()
        parsed.ParseFromString(wire)
    proto_time = time.time() - start

    # Benchmark JSON
    start = time.time()
    for _ in range(iterations):
        wire = json.dumps(json_data).encode('utf-8')
        parsed = json.loads(wire.decode('utf-8'))
    json_time = time.time() - start

    print(f"Protobuf: {proto_time:.2f}s ({len(request.SerializeToString())} bytes)")
    print(f"JSON:     {json_time:.2f}s ({len(json.dumps(json_data))} bytes)")
    print(f"Protobuf is {json_time/proto_time:.1f}x faster, "
          f"{len(json.dumps(json_data))/len(request.SerializeToString()):.1f}x smaller")
```

Pointers and references cannot cross address spaces—a memory address on one machine is meaningless on another. RPC systems handle this by either (1) prohibiting pointer types, (2) deep-copying referenced data (following pointers and serializing content), or (3) replacing references with IDs and maintaining an object table. Cyclic references require special handling to avoid infinite loops during serialization.
Before a client can call a remote procedure, it must know where the server is. This is the binding problem—how does a client locate and connect to a service?
Static Binding
The simplest approach: hardcode the server address:
server = connect("192.168.1.100:8080")
result = server.add(5, 3)
This is brittle. If the server moves, restarts on a different port, or if we want multiple servers for load balancing, the client code must change.
Name Services and Registries
Production systems use service registries that map logical service names to physical locations:
"calculator-service" → "192.168.1.100:8080"lookup("calculator-service") → returns connection infoThis is location transparency—clients reference services by name, not physical address.
| Approach | Mechanism | Pros | Cons |
|---|---|---|---|
| Static configuration | Hardcoded addresses | Simple, no dependencies | Inflexible, manual updates |
| DNS-based | DNS A/SRV records | Standard, widely supported | TTL delays, limited metadata |
| Service registry | Consul, etcd, ZooKeeper | Dynamic, rich metadata, health checks | Additional infrastructure |
| Service mesh | Istio, Linkerd sidecars | Zero-config, observability | Complexity, latency overhead |
| Cloud-native | AWS ELB, K8s Services | Managed, auto-scaling | Cloud vendor lock-in |
```python
# Service registration and discovery with Consul
import random
import socket
from threading import Thread

import consul  # python-consul client library


class ServiceNotFoundError(Exception):
    """Raised when no healthy instance of a service can be found."""


class ServiceRegistry:
    """
    Wrapper around Consul for service registration and discovery.
    Enables location-transparent RPC.
    """

    def __init__(self, consul_host='localhost', consul_port=8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)
        self.registered_services = []

    def register(self, service_name, port, tags=None, health_check_interval='10s'):
        """
        Register this service instance with Consul.
        Includes a health check so Consul knows when we're down.
        """
        service_id = f"{service_name}-{socket.gethostname()}-{port}"

        # Register service with health check
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            port=port,
            tags=tags or [],
            check=consul.Check.tcp(
                host='localhost',
                port=port,
                interval=health_check_interval,
                timeout='5s',
                deregister='1m',  # Remove if unhealthy for 1 minute
            )
        )
        self.registered_services.append(service_id)
        print(f"Registered {service_id}")

    def discover(self, service_name, healthy_only=True):
        """
        Discover all instances of a service.
        Returns list of (host, port) tuples.
        """
        _, services = self.consul.health.service(service_name, passing=healthy_only)

        instances = []
        for service in services:
            host = service['Service']['Address'] or service['Node']['Address']
            port = service['Service']['Port']
            instances.append((host, port))
        return instances

    def discover_one(self, service_name, strategy='random'):
        """Discover a single instance (for load balancing)."""
        instances = self.discover(service_name)
        if not instances:
            raise ServiceNotFoundError(f"No healthy instances of {service_name}")

        if strategy == 'random':
            return random.choice(instances)
        elif strategy == 'first':
            return instances[0]
        # Could implement round-robin, least-connections, etc.
        raise ValueError(f"Unknown strategy: {strategy}")

    def deregister_all(self):
        """Deregister all services on shutdown"""
        for service_id in self.registered_services:
            self.consul.agent.service.deregister(service_id)


class RpcClient:
    """RPC client with automatic service discovery."""

    def __init__(self, registry: ServiceRegistry):
        self.registry = registry
        self.connections = {}  # Cache connections

    def call(self, service_name, method, *args, **kwargs):
        """
        Call a method on a named service.
        Handles discovery and connection management.
        """
        # Discover a healthy instance
        host, port = self.registry.discover_one(service_name)

        # Get or create connection
        conn = self._get_connection(host, port)

        try:
            return conn.call(method, *args, **kwargs)
        except ConnectionError:
            # Connection failed - drop it from the cache and retry,
            # letting discovery pick a (possibly different) instance
            self._remove_connection(host, port)
            return self.call(service_name, method, *args, **kwargs)

    def _get_connection(self, host, port):
        key = (host, port)
        if key not in self.connections:
            # create_rpc_connection is an assumed transport factory
            self.connections[key] = create_rpc_connection(host, port)
        return self.connections[key]

    def _remove_connection(self, host, port):
        self.connections.pop((host, port), None)


# Server-side usage
def run_server():
    registry = ServiceRegistry()

    # Start server on a port (CalculatorServer is an assumed RPC server class)
    server = CalculatorServer(port=50051)
    server_thread = Thread(target=server.serve)
    server_thread.start()

    # Register with Consul
    registry.register(
        'calculator-service',
        port=50051,
        tags=['v1', 'production'],
    )

    # On shutdown
    try:
        server_thread.join()
    finally:
        registry.deregister_all()


# Client-side usage
def client_example():
    registry = ServiceRegistry()
    client = RpcClient(registry)

    # Call by service name - no hardcoded addresses!
    result = client.call('calculator-service', 'add', a=5, b=3)
    print(f"Result: {result}")
```

Service registries must know when services are unhealthy. Without health checks, clients may be directed to crashed or overloaded servers. Implement both liveness checks (is the process running?) and readiness checks (is it ready to handle requests?). Kubernetes and modern service meshes distinguish these explicitly.
A local procedure call has simple semantics: call the function, it executes exactly once, and returns. Remote calls introduce complexity—what happens when the network fails or the server crashes?
The Failure Modes
- The request message is lost in the network
- The server crashes before executing the procedure
- The server crashes after executing but before replying
- The reply message is lost in the network
- The server is slow, and the reply arrives after the client gave up

The client cannot distinguish many of these scenarios. If no response arrives, did the server crash before or after execution? This ambiguity drives different call semantics.
| Semantics | Behavior on Timeout | Execution Count | Use Case |
|---|---|---|---|
| Maybe | Give up, report error | 0 or 1 times | Unimportant calls, metrics |
| At-least-once | Retry until acknowledged | 1 or more times | Idempotent operations |
| At-most-once | Timeout with no retry | 0 or 1 times | Non-idempotent operations |
| Exactly-once | Retry with deduplication | Exactly 1 time | Transactions (requires persistent state) |
At-Least-Once Semantics (Most Common)
The client retries until it gets a response: send the request, wait for a timeout, resend, and repeat until a reply arrives or a retry limit is reached.
This guarantees the procedure is executed if the server is ever reachable—but it may execute multiple times. The server must tolerate duplicates.
Idempotent Operations
An operation is idempotent if executing it multiple times has the same effect as executing it once:
Idempotent: set_balance(100), delete_file(x), GET /users/123
Not idempotent: increment_counter(), transfer_money(100), POST /orders

For non-idempotent operations, at-least-once is dangerous. Retrying "transfer $100" might transfer $200 or $300.
```python
# RPC call semantics implementations

import time
import uuid
from abc import ABC, abstractmethod


class RpcError(Exception):
    """Raised when an RPC ultimately fails."""


class RpcSemantics(ABC):
    """Base class for different RPC call semantics"""

    @abstractmethod
    def call(self, stub, method, *args, **kwargs):
        pass


class MaybeSemantics(RpcSemantics):
    """
    Maybe semantics: Try once, return error on failure.
    Execution: 0 or 1 times
    """

    def call(self, stub, method, *args, timeout=5.0, **kwargs):
        try:
            # stub.call_with_timeout is the assumed low-level transport interface
            return stub.call_with_timeout(method, args, kwargs, timeout)
        except TimeoutError:
            raise RpcError("Call failed - no retry attempted")


class AtLeastOnceSemantics(RpcSemantics):
    """
    At-least-once semantics: Retry until success.
    Execution: 1 or more times
    Warning: Only safe for idempotent operations!
    """

    def __init__(self, max_retries=5, base_delay=1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def call(self, stub, method, *args, timeout=5.0, **kwargs):
        last_error = None
        for attempt in range(self.max_retries):
            try:
                return stub.call_with_timeout(method, args, kwargs, timeout)
            except TimeoutError as e:
                last_error = e
                delay = self.base_delay * (2 ** attempt)  # Exponential backoff
                time.sleep(delay)
        raise RpcError(f"Call failed after {self.max_retries} retries") from last_error


class AtMostOnceSemantics(RpcSemantics):
    """
    At-most-once semantics: Try once, no automatic retry.
    Execution: 0 or 1 times
    Client can manually retry but must handle duplicates.
    """

    def call(self, stub, method, *args, timeout=5.0, **kwargs):
        return MaybeSemantics().call(stub, method, *args, timeout=timeout, **kwargs)


class ExactlyOnceSemantics(RpcSemantics):
    """
    Exactly-once semantics: Request is executed exactly once.
    Requires:
    1. Unique request ID per logical request
    2. Server-side request deduplication
    3. Persistent storage of processed IDs
    4. Client retries with same request ID
    """

    def __init__(self, max_retries=5, base_delay=1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay

    def call(self, stub, method, *args, timeout=5.0, **kwargs):
        # Generate unique ID for this logical request.
        # The same ID must be used across all retries.
        request_id = kwargs.pop('_request_id', str(uuid.uuid4()))

        for attempt in range(self.max_retries):
            try:
                return stub.call_with_request_id(
                    method, args, kwargs,
                    request_id=request_id, timeout=timeout
                )
            except TimeoutError:
                delay = self.base_delay * (2 ** attempt)
                time.sleep(delay)
        raise RpcError(f"Call failed after {self.max_retries} retries")


class ExactlyOnceServer:
    """
    Server-side support for exactly-once semantics.
    Tracks processed request IDs to detect and handle duplicates.
    """

    def __init__(self, persistence):
        self.persistence = persistence  # Must survive crashes!

    def handle_request(self, request_id, procedure, args, kwargs):
        # Check if we've already processed this request
        cached_result = self.persistence.get_result(request_id)
        if cached_result is not None:
            # Duplicate! Return cached result without re-executing
            return cached_result

        # First time seeing this request - execute
        result = self.execute(procedure, args, kwargs)

        # Persist result before responding (crash tolerance)
        self.persistence.store_result(request_id, result)
        return result

    def execute(self, procedure, args, kwargs):
        """Execute the actual procedure"""
        return procedure(*args, **kwargs)


# Example usage (RpcClient and bank_service are assumed)
def example_client():
    client = RpcClient(AtLeastOnceSemantics(max_retries=3))

    # Safe: idempotent operation
    balance = client.call(bank_service, 'get_balance', account_id='12345')

    # Dangerous with at-least-once!
    # client.call(bank_service, 'transfer', amount=100, from_='A', to='B')

    # Safe: use exactly-once for non-idempotent operations
    tx_client = RpcClient(ExactlyOnceSemantics())
    tx_client.call(bank_service, 'transfer', amount=100, from_='A', to='B',
                   _request_id='user-generated-tx-id-12345')
```

True exactly-once execution requires persistent storage of request IDs and results that survives server crashes. This adds significant complexity and storage overhead. Many systems opt for at-least-once with application-level idempotency instead—it's simpler and often sufficient.
Despite RPC's goal of transparency, remote calls are fundamentally different from local calls. Ignoring these differences leads to fragile systems. Jim Waldo and colleagues documented this in their classic paper "A Note on Distributed Computing" (1994).
The Fundamental Differences
| Aspect | Local Call | Remote Call |
|---|---|---|
| Latency | Nanoseconds | Milliseconds (10⁶× slower) |
| Failure modes | Crash or exception | Timeout, partial failure, network partition |
| Pass by reference | Supported | Impossible (different address spaces) |
| Memory sharing | Possible | Impossible |
| Progress guarantee | If thread runs, call completes | May never complete, may never know |
| Security | Trust same process | Must authenticate, encrypt |
| Resources | Stack space only | Sockets, buffers, network bandwidth |
Latency: The Silent Killer
A local function call takes tens of nanoseconds. An RPC over a local network takes hundreds of microseconds to milliseconds. Across the internet, it can take 50-200ms. This 10⁶× difference changes how you write code.
Consider a loop that makes 1000 calls:
# Local: 1000 × 50ns = 50 microseconds
# RPC (local network): 1000 × 1ms = 1 second
# RPC (cross-region): 1000 × 50ms = 50 seconds!
Code that's perfectly reasonable locally becomes catastrophically slow when distributed. This is why batch operations, streaming, and caching are essential in RPC systems.
Partial Failure
Local calls fail in simple ways: the program crashes, an exception is thrown. The caller always knows the outcome.
Remote calls introduce partial failure: the caller may not know if the call succeeded, failed, or is still running. The server might have crashed after executing but before responding. The response might be delayed in the network. This uncertainty requires fundamentally different error handling.
```python
# Patterns for handling RPC-specific challenges

import asyncio
import time

# 1. BATCH OPERATIONS
# Instead of N round-trips, use one batch call

# Bad: N remote calls
def process_users_bad(user_ids):
    results = []
    for user_id in user_ids:
        user = user_service.get_user(user_id)  # Remote call each time!
        results.append(user)
    return results

# Good: One batch call
def process_users_good(user_ids):
    return user_service.get_users_batch(user_ids)  # Single remote call


# 2. ASYNCHRONOUS/PARALLEL CALLS
# When calls are independent, run them concurrently

async def fetch_dashboard_data(user_id):
    # Three independent RPC calls
    # Sequential: 50ms + 50ms + 50ms = 150ms
    # Parallel:   max(50ms, 50ms, 50ms) = 50ms
    user_task = asyncio.create_task(user_service.get_profile(user_id))
    orders_task = asyncio.create_task(order_service.get_recent(user_id))
    recs_task = asyncio.create_task(rec_service.get_recommendations(user_id))

    user, orders, recommendations = await asyncio.gather(
        user_task, orders_task, recs_task
    )
    return DashboardData(user, orders, recommendations)


# 3. HANDLING PARTIAL FAILURE
# Use defensive programming and explicit failure handling
# (LocalCache, CachedUser, and the error types are assumed helpers)

class UserServiceClient:
    def __init__(self, timeout=5.0, fallback_mode=True):
        self.timeout = timeout
        self.fallback_mode = fallback_mode
        self.cache = LocalCache()

    async def get_user(self, user_id):
        try:
            user = await asyncio.wait_for(
                self.stub.get_user(user_id),
                timeout=self.timeout
            )
            self.cache.set(user_id, user)
            return user

        except asyncio.TimeoutError:
            # Timeout - RPC didn't complete in time
            # Could be: network issue, server slow, server dead
            if self.fallback_mode:
                cached = self.cache.get(user_id)
                if cached:
                    return CachedUser(cached, stale=True)
            raise ServiceUnavailableError("User service timeout")

        except ConnectionError:
            # Connection failed - server definitely unreachable
            raise ServiceUnavailableError("User service unreachable")

        except RpcError as e:
            # Server returned an error
            if e.code == ErrorCode.NOT_FOUND:
                raise UserNotFoundError(user_id)
            raise  # Re-raise unexpected errors


# 4. TIMEOUTS AND DEADLINES
# Always set timeouts! Propagate deadlines through call chains.

class DeadlineContext:
    """Propagate deadline through RPC call chain"""

    def __init__(self, deadline_seconds):
        self.deadline = time.time() + deadline_seconds

    def remaining(self):
        return max(0, self.deadline - time.time())

    def is_expired(self):
        return time.time() > self.deadline

def api_handler(request):
    # API has 30s timeout
    ctx = DeadlineContext(30)

    # Each service call gets remaining time, minus buffer
    user = user_service.get(request.user_id, timeout=ctx.remaining() - 1)

    if ctx.is_expired():
        raise TimeoutError("Deadline exceeded")

    orders = order_service.get_for_user(request.user_id, timeout=ctx.remaining() - 1)
    return Response(user, orders)
```

Peter Deutsch's famous fallacies enumerate false assumptions developers make: (1) The network is reliable, (2) Latency is zero, (3) Bandwidth is infinite, (4) The network is secure, (5) Topology doesn't change, (6) There is one administrator, (7) Transport cost is zero, (8) The network is homogeneous. RPC abstractions can mask these realities, but they don't eliminate them.
Contemporary RPC frameworks build on decades of lessons learned. They provide type-safe interfaces, efficient binary serialization, multiplexed connections, and support for streaming—all while trying to make distribution less painful.
gRPC: The Industry Standard
Developed by Google, gRPC has become the de facto standard for internal service communication. Services are defined in Protocol Buffers and support four calling patterns (unary, server streaming, client streaming, and bidirectional streaming):
```proto
// gRPC service definition
syntax = "proto3";

package calculator;

option go_package = "github.com/example/calculator";

// Request/Response messages
message Number {
  double value = 1;
}

message BinaryOpRequest {
  double a = 1;
  double b = 2;
}

message NumberStream {
  repeated double values = 1;
}

message Statistics {
  double sum = 1;
  double average = 2;
  double min = 3;
  double max = 4;
  int64 count = 5;
}

// The Calculator service with all RPC patterns
service Calculator {
  // Unary: one request, one response
  rpc Add(BinaryOpRequest) returns (Number);
  rpc Multiply(BinaryOpRequest) returns (Number);

  // Server streaming: one request, stream of responses
  rpc GenerateSequence(Number) returns (stream Number);

  // Client streaming: stream of requests, one response
  rpc ComputeStatistics(stream Number) returns (Statistics);

  // Bidirectional streaming: both sides stream
  rpc StreamingCalculator(stream BinaryOpRequest) returns (stream Number);
}
```

```python
# gRPC server and client implementation
# Generated from: python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. calculator.proto

from concurrent import futures

import grpc

import calculator_pb2
import calculator_pb2_grpc

# =================== SERVER IMPLEMENTATION ===================

class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
    """gRPC server implementation"""

    def Add(self, request, context):
        """Unary RPC: single request, single response"""
        result = request.a + request.b
        return calculator_pb2.Number(value=result)

    def GenerateSequence(self, request, context):
        """Server streaming: client sends one request, server sends multiple responses"""
        n = int(request.value)
        for i in range(n):
            yield calculator_pb2.Number(value=float(i))

    def ComputeStatistics(self, request_iterator, context):
        """Client streaming: client streams values, server returns single summary"""
        values = []
        for request in request_iterator:
            values.append(request.value)

        if not values:
            return calculator_pb2.Statistics()

        return calculator_pb2.Statistics(
            sum=sum(values),
            average=sum(values) / len(values),
            min=min(values),
            max=max(values),
            count=len(values),
        )

    def StreamingCalculator(self, request_iterator, context):
        """Bidirectional streaming: both sides stream simultaneously"""
        for request in request_iterator:
            result = request.a + request.b
            yield calculator_pb2.Number(value=result)

def serve():
    """Start gRPC server"""
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ('grpc.max_receive_message_length', 10 * 1024 * 1024),
            ('grpc.keepalive_time_ms', 30000),
        ]
    )
    calculator_pb2_grpc.add_CalculatorServicer_to_server(
        CalculatorServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

# =================== CLIENT IMPLEMENTATION ===================

def create_client():
    """Create gRPC client channel and stub"""
    channel = grpc.insecure_channel(
        'localhost:50051',
        options=[
            ('grpc.keepalive_time_ms', 30000),
            ('grpc.keepalive_timeout_ms', 5000),
        ]
    )
    return calculator_pb2_grpc.CalculatorStub(channel)

def unary_example(stub):
    """Simple request-response call"""
    request = calculator_pb2.BinaryOpRequest(a=5.0, b=3.0)
    # With timeout/deadline
    response = stub.Add(request, timeout=5.0)
    print(f"5 + 3 = {response.value}")

def server_streaming_example(stub):
    """Server streams multiple responses"""
    request = calculator_pb2.Number(value=10.0)
    # Returns an iterator of responses
    for response in stub.GenerateSequence(request):
        print(f"Received: {response.value}")

def client_streaming_example(stub):
    """Client streams multiple requests"""
    def generate_numbers():
        for i in range(100):
            yield calculator_pb2.Number(value=float(i))

    response = stub.ComputeStatistics(generate_numbers())
    print(f"Statistics: sum={response.sum}, avg={response.average}")

def bidirectional_example(stub):
    """Both sides stream simultaneously"""
    def generate_requests():
        pairs = [(1, 2), (3, 4), (5, 6), (7, 8)]
        for a, b in pairs:
            yield calculator_pb2.BinaryOpRequest(a=float(a), b=float(b))

    # The call returns an iterator; we stream requests while getting responses
    responses = stub.StreamingCalculator(generate_requests())
    for response in responses:
        print(f"Result: {response.value}")

# =================== METADATA AND DEADLINES ===================

def call_with_metadata(stub):
    """Pass metadata (like HTTP headers)"""
    metadata = [
        ('authorization', 'Bearer token123'),
        ('x-request-id', 'req-abc-123'),
        ('x-correlation-id', 'corr-xyz-789'),
    ]
    request = calculator_pb2.BinaryOpRequest(a=5.0, b=3.0)
    response = stub.Add(request, metadata=metadata)

def call_with_deadline(stub):
    """Set deadline that propagates through call chain"""
    request = calculator_pb2.BinaryOpRequest(a=5.0, b=3.0)
    try:
        # Deadline: call must complete within 100ms
        response = stub.Add(request, timeout=0.1)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            print("Call timed out!")
        raise
```

Use gRPC for internal service-to-service communication where performance matters and you control both ends. Use REST/HTTP for public APIs where browser compatibility, human readability, and tool ecosystem (curl, Postman) are important. Many systems use REST at the edge and gRPC internally.
Real-world RPC systems require more than basic request-response. Here are essential patterns for production systems.
Interceptors/Middleware
Interceptors wrap RPC calls on both client and server sides, enabling cross-cutting concerns without modifying business logic:
```python
# gRPC interceptors for cross-cutting concerns

import logging
import time
from concurrent import futures

import grpc

import calculator_pb2_grpc

# =================== CLIENT INTERCEPTORS ===================

class LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Log all outgoing RPC calls"""

    def intercept_unary_unary(self, continuation, client_call_details, request):
        method = client_call_details.method
        logging.info(f"Calling {method}")

        start_time = time.time()
        response = continuation(client_call_details, request)
        latency_ms = (time.time() - start_time) * 1000

        logging.info(f"{method} completed in {latency_ms:.1f}ms")
        return response

class RetryClientInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Automatically retry failed calls"""

    def __init__(self, max_retries=3, retryable_codes=None):
        self.max_retries = max_retries
        self.retryable_codes = retryable_codes or {
            grpc.StatusCode.UNAVAILABLE,
            grpc.StatusCode.DEADLINE_EXCEEDED,
        }

    def intercept_unary_unary(self, continuation, client_call_details, request):
        for attempt in range(self.max_retries + 1):
            try:
                return continuation(client_call_details, request)
            except grpc.RpcError as e:
                if e.code() not in self.retryable_codes:
                    raise  # Non-retryable error
                if attempt == self.max_retries:
                    raise  # Max retries exceeded
                delay = (2 ** attempt) * 0.1  # Exponential backoff
                time.sleep(delay)

class TracingClientInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Inject distributed trace context into calls"""

    def intercept_unary_unary(self, continuation, client_call_details, request):
        # Get current trace context (get_current_trace_id and friends are
        # assumed helpers, e.g. backed by OpenTelemetry)
        trace_id = get_current_trace_id()
        span_id = create_new_span_id()

        # Add trace headers to metadata
        new_metadata = list(client_call_details.metadata or [])
        new_metadata.extend([
            ('x-trace-id', trace_id),
            ('x-span-id', span_id),
            ('x-parent-span-id', get_current_span_id()),
        ])

        new_details = client_call_details._replace(metadata=new_metadata)
        return continuation(new_details, request)

# =================== SERVER INTERCEPTORS ===================

class AuthenticationServerInterceptor(grpc.ServerInterceptor):
    """Validate authentication before handling requests"""

    def __init__(self, validator):
        self.validator = validator

    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method

        # Skip auth for health checks
        if method == '/grpc.health.v1.Health/Check':
            return continuation(handler_call_details)

        # Extract token from metadata
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization')

        if not token:
            return self._abort(grpc.StatusCode.UNAUTHENTICATED, "Missing token")

        try:
            user = self.validator.validate(token)
            # Attach user for handler use (simplified: real
            # handler_call_details objects may not allow attribute assignment)
            handler_call_details.user = user
            return continuation(handler_call_details)
        except InvalidTokenError:
            return self._abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")

    def _abort(self, code, message):
        def abort_handler(request, context):
            context.abort(code, message)
        return grpc.unary_unary_rpc_method_handler(abort_handler)

class MetricsServerInterceptor(grpc.ServerInterceptor):
    """Collect request metrics"""

    def __init__(self, metrics_client):
        self.metrics = metrics_client

    def intercept_service(self, continuation, handler_call_details):
        # Note: in grpc-python, continuation() returns the method handler
        # rather than executing the RPC, so this timing is a simplification
        method = handler_call_details.method
        start_time = time.time()
        try:
            response = continuation(handler_call_details)
            self.metrics.increment(f'rpc.{method}.success')
            return response
        except grpc.RpcError as e:
            self.metrics.increment(f'rpc.{method}.error.{e.code().name}')
            raise
        finally:
            latency_ms = (time.time() - start_time) * 1000
            self.metrics.histogram(f'rpc.{method}.latency_ms', latency_ms)

# =================== WIRING INTERCEPTORS ===================

def create_client_with_interceptors():
    """Create client channel with interceptor chain"""
    channel = grpc.insecure_channel('localhost:50051')

    # Interceptors execute in order: logging -> retry -> tracing
    channel = grpc.intercept_channel(
        channel,
        LoggingClientInterceptor(),
        RetryClientInterceptor(max_retries=3),
        TracingClientInterceptor(),
    )
    return calculator_pb2_grpc.CalculatorStub(channel)

def create_server_with_interceptors():
    """Create server with interceptor chain
    (TokenValidator and PrometheusClient are assumed helpers)"""
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[
            AuthenticationServerInterceptor(TokenValidator()),
            MetricsServerInterceptor(PrometheusClient()),
        ]
    )
    return server
```

Connection Pooling and Multiplexing
gRPC uses HTTP/2, which supports multiple logical streams over a single TCP connection. This means:

- Many concurrent RPCs share one connection instead of each opening its own
- Connection setup costs (TCP and TLS handshakes) are paid once, not per call
- Keepalives on the shared connection detect dead peers quickly
Clients maintain connection pools to servers. The pool size affects both performance and server load.
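As a rough sketch of what multiplexing means in client code (reusing the calculator_pb2 modules generated earlier): create the channel once, then issue many concurrent calls over it:

```python
# Sketch: one gRPC channel = one multiplexed HTTP/2 connection.
# Assumes the calculator_pb2 / calculator_pb2_grpc modules from the earlier proto.
from concurrent.futures import ThreadPoolExecutor

import grpc

import calculator_pb2
import calculator_pb2_grpc

channel = grpc.insecure_channel('localhost:50051')  # expensive: TCP (+TLS) setup
stub = calculator_pb2_grpc.CalculatorStub(channel)  # cheap: just wraps the channel

def add(a: float, b: float) -> float:
    return stub.Add(calculator_pb2.BinaryOpRequest(a=a, b=b), timeout=5.0).value

# 100 concurrent RPCs ride the single connection as separate HTTP/2 streams;
# no per-call handshake is needed
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(lambda i: add(float(i), 1.0), range(100)))
print(f"completed {len(results)} calls over one connection")
```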
RPC load balancing can happen at multiple layers: (1) DNS-based: Return different IP addresses for the same hostname. (2) L4 (connection-level): Load balancers distribute TCP connections. (3) L7 (request-level): gRPC-aware proxies/clients distribute individual RPCs. L7 is most flexible—it can balance based on request content and server health.
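For request-level (L7) balancing, the client itself can pick a backend per call rather than per connection. A minimal sketch of the idea, with a hypothetical RoundRobinPicker rather than any framework's API:

```python
# Minimal sketch of request-level (L7) round-robin balancing.
import itertools

class RoundRobinPicker:
    def __init__(self, instances):
        self._cycle = itertools.cycle(instances)

    def pick(self):
        # Each call may land on a different backend, unlike L4 balancing,
        # where every request on a connection goes to the same server
        return next(self._cycle)

picker = RoundRobinPicker([("10.0.0.1", 50051), ("10.0.0.2", 50051)])
for _ in range(4):
    host, port = picker.pick()
    print(f"routing call to {host}:{port}")
```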
RPC transforms the primitive send/receive model of message passing into a familiar function call interface. While it cannot fully hide distribution's challenges, it dramatically simplifies distributed programming.
What's Next:
We'll explore REST and gRPC in more detail—comparing the two dominant approaches to service APIs. You'll see when to choose each, how they handle common challenges differently, and how to design APIs that serve both internal efficiency and external usability.
You now understand how RPC bridges the gap between message passing primitives and application-level programming. From stubs and marshaling to service discovery and failure semantics, you have the foundation to work with any RPC framework.