Every individual machine has finite capacity—there's only so much CPU, memory, network bandwidth, and disk I/O available. Vertical scaling (upgrading to more powerful hardware) eventually hits physical limits and rapidly escalating costs. A server with twice the CPU might cost four times as much.
Horizontal scaling takes a fundamentally different approach: instead of making one machine more powerful, distribute work across many machines. If one server handles 1,000 requests per second, ten servers can handle 10,000. A hundred servers can handle 100,000.
The transformation:
Vertical Scaling                     Horizontal Scaling

┌─────────────────┐          ┌─────────┐ ┌─────────┐ ┌─────────┐
│                 │          │ Server  │ │ Server  │ │ Server  │
│   BIG SERVER    │  versus  │    1    │ │    2    │ │   ...   │
│   (expensive)   │          │ (cheap) │ │ (cheap) │ │ (cheap) │
│                 │          └────┬────┘ └────┬────┘ └────┬────┘
└─────────────────┘               │           │           │
                                  └───────────┼───────────┘
                                    ┌─────────▼─────────┐
                                    │   Load Balancer   │
                                    └───────────────────┘
This page explores how to design systems that scale horizontally—the architectural patterns, challenges, and trade-offs that enable web-scale throughput: stateless service design, load distribution, data partitioning, auto-scaling, capacity planning, and the challenge of maintaining consistency across distributed nodes. You'll understand when horizontal scaling applies and how to architect systems that scale gracefully.
Both scaling strategies have their place. Understanding the trade-offs helps choose the right approach for each situation.
| Aspect | Vertical Scaling | Horizontal Scaling |
|---|---|---|
| Mechanism | Bigger/faster hardware | More machines |
| Limit | Physical hardware limits | Practically unlimited |
| Cost curve | Exponential (diminishing returns) | Linear (economies of scale) |
| Downtime | Often required for upgrades | Zero-downtime scaling possible |
| Complexity | Simple (no distribution) | Complex (distributed systems) |
| Failure domain | Single point of failure | Tolerates individual failures |
| Data consistency | Strong (single machine) | Requires coordination |
| Best for | Databases, stateful systems | Stateless services, reads |
The Cost Curve Reality:
Cost ($)
  ▲
  │                     ╱   Vertical (exponential)
  │                    ╱
  │                   ╱
  │                 ╱
  │               ╱
  │            ╱     ╱─────────── Horizontal (linear)
  │          ╱  ╱────
  │       ╱╱────
  │    ╱╱──
  │ ╱╱─
  └────────────────────────────────▶ Capacity
Example:
- 1x capacity: $100/month (small server)
- 2x capacity vertical: $400/month (larger server)
- 2x capacity horizontal: $200/month (two small servers)
- 10x capacity vertical: Often impossible
- 10x capacity horizontal: $1000/month (ten small servers)
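The arithmetic is easy to sanity-check in code. Here is a minimal TypeScript sketch using the illustrative prices above (the quadratic cost model for vertical scaling is an assumption chosen to match the "twice the CPU, four times the cost" example, not a real pricing formula):

```typescript
// Illustrative only: prices are the example figures above, not real quotes.
const UNIT_PRICE = 100; // $/month for one small server (1x capacity)

// Horizontal cost grows linearly: just buy more small servers.
function horizontalCost(capacityMultiple: number): number {
  return capacityMultiple * UNIT_PRICE;
}

// Vertical cost grows superlinearly; cost ~ capacity² is the assumed model
// that matches "twice the CPU might cost four times as much".
function verticalCost(capacityMultiple: number): number {
  return capacityMultiple ** 2 * UNIT_PRICE;
}

console.log(horizontalCost(2));  // 200   - two small servers
console.log(verticalCost(2));    // 400   - one larger server
console.log(horizontalCost(10)); // 1000  - ten small servers
console.log(verticalCost(10));   // 10000 - if such hardware exists at all
```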
The key to horizontal scaling is statelessness. A stateless service doesn't store any data locally that needs to persist between requests. Any server can handle any request, enabling perfect load distribution.
Stateful vs Stateless:
Stateful Service (hard to scale horizontally):

┌─────────────────────────────────┐
│            Server 1             │
│  ┌───────────────────────────┐  │   User sessions stored locally
│  │  User A session: {...}    │  │   Requests MUST go to Server 1
│  │  User B session: {...}    │  │   Server failure = data loss
│  └───────────────────────────┘  │
└─────────────────────────────────┘
Stateless Service (easy horizontal scaling):

┌───────────┐   ┌───────────┐   ┌───────────┐
│  Server 1 │   │  Server 2 │   │  Server 3 │
│ (no state)│   │ (no state)│   │ (no state)│
└─────┬─────┘   └─────┬─────┘   └─────┬─────┘
      │               │               │
      └───────────────┼───────────────┘
                      │
               ┌──────▼──────┐
               │ Shared State│
               │ (Redis, DB) │
               └─────────────┘

Any server can handle any request.
Load balancer distributes freely.
```typescript
// Patterns for stateless service design

// Pattern 1: Externalized session with Redis
import session from 'express-session';
import RedisStore from 'connect-redis';
import { createClient } from 'redis';

const redisClient = createClient({ url: 'redis://session-cluster:6379' });
await redisClient.connect();

app.use(session({
  store: new RedisStore({ client: redisClient }),
  secret: process.env.SESSION_SECRET!,
  resave: false,
  saveUninitialized: false,
  cookie: {
    secure: true,
    maxAge: 24 * 60 * 60 * 1000 // 24 hours
  },
}));

// Session works across ALL servers in the cluster
app.get('/dashboard', (req, res) => {
  const userId = req.session.userId; // Retrieved from Redis
  // ...
});

// Pattern 2: JWT for stateless authentication
import jwt from 'jsonwebtoken';

// Login: Create token with user state
app.post('/login', async (req, res) => {
  const user = await authenticateUser(req.body);

  const token = jwt.sign(
    {
      userId: user.id,
      roles: user.roles,
      // Include only necessary state - keep token small
    },
    process.env.JWT_SECRET!,
    { expiresIn: '24h' }
  );

  res.json({ token });
});

// Protected route: Validate token (no database lookup needed!)
app.get('/api/data', (req, res) => {
  const token = req.headers.authorization?.replace('Bearer ', '');

  try {
    const payload = jwt.verify(token!, process.env.JWT_SECRET!);
    // payload.userId, payload.roles available without DB query
    // Any server can validate - pure stateless
  } catch (err) {
    res.status(401).json({ error: 'Invalid token' });
  }
});

// Pattern 3: Externalized cache
import Redis from 'ioredis';

const cache = new Redis.Cluster([
  { host: 'cache-1', port: 6379 },
  { host: 'cache-2', port: 6379 },
  { host: 'cache-3', port: 6379 },
]);

async function getUserWithCache(userId: string): Promise<User> {
  const cacheKey = `user:${userId}`;

  // Check shared cache (accessible from any server)
  const cached = await cache.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);
  }

  // Cache miss: fetch from database
  const user = await db.users.findById(userId);

  // Store in shared cache
  await cache.setex(cacheKey, 3600, JSON.stringify(user));

  return user;
}

// Pattern 4: File uploads to S3 (not local disk)
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: 'us-east-1' });

app.post('/upload', upload.single('file'), async (req, res) => {
  const file = req.file!;
  const key = `uploads/${Date.now()}-${file.originalname}`;

  // Store in S3, not local filesystem
  await s3.send(new PutObjectCommand({
    Bucket: 'my-app-uploads',
    Key: key,
    Body: file.buffer,
    ContentType: file.mimetype,
  }));

  // Return S3 URL - any server can serve this
  res.json({ url: `https://my-app-uploads.s3.amazonaws.com/${key}` });
});
```

With multiple servers, you need a strategy to distribute incoming requests. The distribution mechanism significantly impacts throughput and reliability.
Load Balancer Architecture:
                Internet
                    │
                    ▼
            ┌────────────────┐
            │   DNS / CDN    │
            │  (Geographic)  │
            └───────┬────────┘
                    │
            ┌───────▼────────┐
            │ Load Balancer  │ ← L7: HTTP routing
            │    (L4/L7)     │ ← L4: TCP distribution
            └───────┬────────┘
                    │
      ┌─────────────┼─────────────┐
      │             │             │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ Server 1  │ │ Server 2  │ │ Server 3  │
│  (AZ-a)   │ │  (AZ-b)   │ │  (AZ-c)   │
└───────────┘ └───────────┘ └───────────┘
| Algorithm | Best For | Weakness |
|---|---|---|
| Round Robin | Homogeneous workloads | Ignores server health/capacity |
| Weighted Round Robin | Mixed server capacities | Weights may become stale |
| Least Connections | Variable request duration | Doesn't account for server capacity |
| Least Response Time | Latency-sensitive apps | Measurement overhead |
| IP Hash | Sticky sessions without state | Uneven distribution with few clients |
| Consistent Hashing | Distributed caches | More complex to implement |
Load balancers must detect unhealthy servers and stop routing to them. Configure active health checks (periodic probes) or use passive health checks (track response codes). A load balancer routing to dead servers is worse than no load balancer at all.
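To make the algorithms and health checks concrete, here is a minimal TypeScript sketch of least-connections selection with passive health checking (the `Backend` shape, the failure threshold, and the `forward` helper are illustrative assumptions, not a production load balancer):

```typescript
// Minimal sketch: least-connections selection with passive health checks.
// Shapes and thresholds here are illustrative assumptions.
interface Backend {
  url: string;
  activeConnections: number;
  consecutiveFailures: number;
  healthy: boolean;
}

const FAILURE_THRESHOLD = 3; // mark unhealthy after 3 consecutive errors

function pickBackend(backends: Backend[]): Backend {
  const healthy = backends.filter(b => b.healthy);
  if (healthy.length === 0) {
    throw new Error('No healthy backends available');
  }
  // Least connections: choose the backend with the fewest in-flight requests
  return healthy.reduce((best, b) =>
    b.activeConnections < best.activeConnections ? b : best
  );
}

async function forward(backends: Backend[], path: string): Promise<Response> {
  const backend = pickBackend(backends);
  backend.activeConnections++;
  try {
    const res = await fetch(`${backend.url}${path}`);
    // Passive health check: track response codes instead of probing
    if (res.status >= 500) {
      backend.consecutiveFailures++;
    } else {
      backend.consecutiveFailures = 0;
    }
    return res;
  } catch (err) {
    backend.consecutiveFailures++; // connection error counts as a failure
    throw err;
  } finally {
    backend.activeConnections--;
    backend.healthy = backend.consecutiveFailures < FAILURE_THRESHOLD;
  }
}
```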
While stateless services scale easily, stateful systems (databases, caches) require data partitioning (sharding) to scale horizontally. Sharding divides data across multiple nodes, each responsible for a subset.
Sharding Strategies:
Range-Based Sharding:

  Shard 1: user_id 1-1M
  Shard 2: user_id 1M-2M
  Shard 3: user_id 2M-3M

  Pros: Range scans efficient
  Cons: Hotspots if ranges uneven

Hash-Based Sharding:

  Shard = hash(user_id) % N

  User 12345 → hash(12345) % 3 = 1 → Shard 1
  User 67890 → hash(67890) % 3 = 0 → Shard 0

  Pros: Even distribution
  Cons: Range queries require all shards

Directory-Based Sharding:

  ┌──────────────┐
  │    Lookup    │
  │   Service    │
  │ user → shard │
  └──────────────┘

  Flexible, but adds lookup overhead

Geographic Sharding:

  ┌─────────────┐
  │   US Data   │──► US Shard
  │   EU Data   │──► EU Shard
  │   AP Data   │──► AP Shard
  └─────────────┘

  Data locality for lower latency
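Before the full hash-based implementation below, here is a minimal sketch of range-based routing for contrast (the shard boundaries are the illustrative user_id ranges above, treated as half-open intervals):

```typescript
// Minimal sketch of range-based shard routing, using the illustrative
// user_id ranges above. Boundaries are assumptions for the example.
interface RangeShard {
  id: number;
  min: number; // inclusive lower bound of user_id range
  max: number; // exclusive upper bound
}

const rangeShards: RangeShard[] = [
  { id: 0, min: 1,         max: 1_000_000 },
  { id: 1, min: 1_000_000, max: 2_000_000 },
  { id: 2, min: 2_000_000, max: 3_000_000 },
];

function getRangeShard(userId: number): RangeShard {
  const shard = rangeShards.find(s => userId >= s.min && userId < s.max);
  if (!shard) throw new Error(`No shard covers user_id ${userId}`);
  return shard;
}

// Range scans stay on one shard (or a few contiguous ones):
// user_ids 1,200,000-1,500,000 live entirely on shard 1.
console.log(getRangeShard(1_234_567).id); // 1
```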
```typescript
// Application-level sharding implementation
// (`Database` is an assumed driver wrapper, shown for illustration.)

interface ShardConfig {
  id: number;
  host: string;
  port: number;
  database: string;
}

class ShardedDatabase {
  private shards: Map<number, Database> = new Map();
  private readonly shardCount: number;

  constructor(shardConfigs: ShardConfig[]) {
    this.shardCount = shardConfigs.length;
    for (const config of shardConfigs) {
      this.shards.set(config.id, new Database(config));
    }
  }

  // Determine shard for a key
  private getShardId(shardKey: string): number {
    // Simple modulo hashing: note this reshuffles most keys if shardCount
    // changes - see the consistent hashing example below for that problem
    const hash = this.hashFunction(shardKey);
    return hash % this.shardCount;
  }

  private hashFunction(key: string): number {
    // Simple hash - use murmur3 or xxhash in production
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      hash = ((hash << 5) - hash) + key.charCodeAt(i);
      hash = hash & hash; // Convert to 32-bit integer
    }
    return Math.abs(hash);
  }

  // Get database connection for a shard key
  getShard(shardKey: string): Database {
    const shardId = this.getShardId(shardKey);
    const shard = this.shards.get(shardId);
    if (!shard) {
      throw new Error(`Shard ${shardId} not found`);
    }
    return shard;
  }

  // Insert to correct shard
  async insert(userId: string, data: Record<string, unknown>) {
    const shard = this.getShard(userId);
    return shard.insert('users', { userId, ...data });
  }

  // Query single shard
  async findById(userId: string) {
    const shard = this.getShard(userId);
    return shard.findOne('users', { userId });
  }

  // Scatter-gather for cross-shard queries
  async findAll(filter: Record<string, unknown>): Promise<any[]> {
    // Query all shards in parallel
    const results = await Promise.all(
      Array.from(this.shards.values()).map(shard =>
        shard.find('users', filter)
      )
    );
    // Merge results
    return results.flat();
  }
}

// Usage
const shardedDb = new ShardedDatabase([
  { id: 0, host: 'shard-0', port: 5432, database: 'users' },
  { id: 1, host: 'shard-1', port: 5432, database: 'users' },
  { id: 2, host: 'shard-2', port: 5432, database: 'users' },
]);

// Single-shard operation (fast)
const user = await shardedDb.findById('user-123');

// Cross-shard query (slower, hits all shards)
const premiumUsers = await shardedDb.findAll({ plan: 'premium' });

// Consistent hashing for cache sharding (handles node changes gracefully)
import ConsistentHash from 'consistent-hash';

const ring = new ConsistentHash();
ring.add('cache-1');
ring.add('cache-2');
ring.add('cache-3');

function getCacheNode(key: string): string {
  return ring.get(key); // Returns node for this key
}

// Adding a new node only reassigns ~1/N of keys
ring.add('cache-4'); // Minimal redistribution
```

Static horizontal scaling requires predicting peak load and provisioning accordingly. Auto-scaling adds and removes capacity dynamically based on actual demand, optimizing both cost and performance.
Auto-Scaling Loop:
          ┌───────────────────────────────────┐
          │                                   │
          ▼                                   │
 ┌─────────────────┐                          │
 │ Collect Metrics │                          │
 │  (CPU, Memory,  │                          │
 │   Queue Depth)  │                          │
 └────────┬────────┘                          │
          │                                   │
          ▼                                   │
 ┌─────────────────┐                          │
 │ Evaluate Rules  │                          │
 │  Scale up if    │                          │
 │   CPU > 70%     │                          │
 │  Scale down if  │                          │
 │   CPU < 30%     │                          │
 └────────┬────────┘                          │
          │                                   │
          ▼                                   │
 ┌─────────────────┐                          │
 │ Execute Scaling │                          │
 │  Add/remove     │                          │
 │  instances      │◄────── Cooldown ─────────┤
 └────────┬────────┘  (prevent thrashing)     │
          │                                   │
          └───────────────────────────────────┘
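The loop is simple enough to sketch directly. Here is a minimal TypeScript version of the evaluate-and-execute step with a cooldown guard (the thresholds and the `getCpuUtilization`/`setDesiredCapacity` callbacks are illustrative assumptions):

```typescript
// Minimal sketch of the auto-scaling control loop above.
// Metric source and scaling API are illustrative assumptions.
const SCALE_UP_THRESHOLD = 70;     // CPU %
const SCALE_DOWN_THRESHOLD = 30;   // CPU %
const COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes between scaling actions
const MIN_CAPACITY = 2;

let lastScalingAction = 0;
let desiredCapacity = 4;

async function evaluateScaling(
  getCpuUtilization: () => Promise<number>,        // collect metrics
  setDesiredCapacity: (n: number) => Promise<void> // execute scaling
): Promise<void> {
  // Cooldown: skip evaluation if we scaled recently (prevents thrashing)
  if (Date.now() - lastScalingAction < COOLDOWN_MS) return;

  const cpu = await getCpuUtilization();

  if (cpu > SCALE_UP_THRESHOLD) {
    desiredCapacity += 1;
  } else if (cpu < SCALE_DOWN_THRESHOLD && desiredCapacity > MIN_CAPACITY) {
    desiredCapacity -= 1; // never drop below the minimum
  } else {
    return; // within the healthy band: no action
  }

  await setDesiredCapacity(desiredCapacity);
  lastScalingAction = Date.now();
}
```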
```typescript
// AWS CDK Auto-Scaling configuration
import * as autoscaling from 'aws-cdk-lib/aws-autoscaling';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';

// EC2 Auto-Scaling Group
const asg = new autoscaling.AutoScalingGroup(this, 'WebServers', {
  instanceType: ec2.InstanceType.of(ec2.InstanceClass.M5, ec2.InstanceSize.LARGE),
  machineImage: ec2.MachineImage.latestAmazonLinux(),
  vpc,

  // Capacity bounds
  minCapacity: 2,
  maxCapacity: 100,
  desiredCapacity: 4,

  // Health check
  healthCheck: autoscaling.HealthCheck.elb({
    grace: Duration.minutes(5), // Time for new instances to warm up
  }),
});

// Scale based on CPU
asg.scaleOnCpuUtilization('CpuScaling', {
  targetUtilizationPercent: 70,
  cooldown: Duration.minutes(5), // Wait before another scaling action
  scaleInCooldown: Duration.minutes(10), // Longer cooldown for scale-in
});

// Scale based on request count
asg.scaleOnRequestCount('RequestScaling', {
  targetRequestsPerMinute: 10000, // Per instance
  cooldown: Duration.minutes(3),
});

// Scale based on custom metric (active game sessions)
asg.scaleOnMetric('SessionScaling', {
  metric: new cloudwatch.Metric({
    namespace: 'GameServer',
    metricName: 'ActiveSessions',
    dimensionsMap: {
      AutoScalingGroup: asg.autoScalingGroupName,
    },
    statistic: 'Sum',
  }),
  scalingSteps: [
    { upper: 100, change: -2 },  // Below 100 sessions: remove 2
    { lower: 500, change: +1 },  // Above 500: add 1
    { lower: 1000, change: +3 }, // Above 1000: add 3
  ],
  adjustmentType: autoscaling.AdjustmentType.CHANGE_IN_CAPACITY,
});

// Kubernetes Horizontal Pod Autoscaler (HPA)
// kubectl apply -f hpa.yaml
/*
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: web-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: web-server
  minReplicas: 2
  maxReplicas: 50
  metrics:
    # CPU-based scaling
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    # Memory-based scaling
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    # Custom metric: requests per second
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "1000" # 1000 rps per pod
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300 # Wait 5 min before scale down
      policies:
        - type: Percent
          value: 25 # Remove max 25% of pods
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0 # Scale up immediately
      policies:
        - type: Percent
          value: 100 # Can double capacity
          periodSeconds: 60
        - type: Pods
          value: 4 # Or add 4 pods
          periodSeconds: 60
      selectPolicy: Max # Use whichever adds more
*/
```

Horizontal scaling introduces complexity that doesn't exist in single-machine systems. Understanding these challenges is essential for building reliable distributed systems.
The CAP Theorem Trade-off:
              Consistency (C)
                    ╱╲
                   ╱  ╲
                  ╱    ╲
                 ╱  CP  ╲
                ╱ systems╲
               ╱   e.g.   ╲
              ╱  Zookeeper ╲
             ╱──────────────╲
            ╱ ╲            ╱ ╲
           ╱   ╲          ╱   ╲
          ╱ CA  ╲        ╱ AP  ╲
         ╱systems╲      ╱systems╲
        ╱  e.g.   ╲    ╱  e.g.   ╲
       ╱PostgreSQL ╲  ╱ DynamoDB  ╲
      ▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔▔
  Availability (A)        Partition
                          Tolerance (P)

In a distributed system, you can only guarantee two of the three. Since networks can always partition, you're really choosing between CP (consistent but may be unavailable) and AP (available but may be inconsistent).
Most web applications don't need strong consistency for every operation. User profile updates, feed posts, and product views can tolerate seconds of delay. Reserve strong consistency for critical operations: payments, inventory, and account balance. This hybrid approach enables horizontal scaling where it matters.
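In code, this hybrid often shows up as a per-operation consistency choice. A minimal sketch, assuming a primary/replica split where replicas may lag by seconds (the client shapes and operation categories are assumptions for illustration):

```typescript
// Minimal sketch: choose consistency per operation.
// The client shapes and operation categories are illustrative assumptions.
interface DbClient {
  query(sql: string, params: unknown[]): Promise<unknown[]>;
}

declare const primary: DbClient; // strongly consistent (single writer)
declare const replica: DbClient; // eventually consistent (may lag by seconds)

// Only critical operations pay the cost of reading from the primary.
const STRONGLY_CONSISTENT = new Set(['payment', 'inventory', 'account_balance']);

function clientFor(operation: string): DbClient {
  return STRONGLY_CONSISTENT.has(operation) ? primary : replica;
}

async function loadDashboard(userId: string) {
  // Feed reads scale out across replicas; stale-by-seconds is acceptable.
  const feed = await clientFor('user_feed')
    .query('SELECT * FROM posts WHERE user_id = $1 LIMIT 50', [userId]);
  // Balance reads must be correct, so they stay on the primary.
  const balance = await clientFor('account_balance')
    .query('SELECT balance FROM accounts WHERE user_id = $1', [userId]);
  return { feed, balance };
}
```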
Effective horizontal scaling requires understanding current capacity and projecting future needs.
Capacity Planning Process:
1. Measure Current Performance
   │
   ├── What's the maximum RPS per instance?
   ├── At what point does latency degrade?
   ├── What's the limiting resource (CPU, memory, network)?
   │
   ▼
2. Project Future Load
   │
   ├── Historical growth rate
   ├── Planned feature launches
   ├── Marketing campaigns / events
   │
   ▼
3. Calculate Required Capacity
   │
   ├── Peak instances = Peak RPS / (RPS per instance × 0.7)
   │                                                    ↑
   │                      Target 70% utilization for headroom
   ├── Add buffer for failures (N+1 or N+2)
   │
   ▼
4. Implement with Headroom
   │
   ├── Set auto-scaling min = projected average
   ├── Set auto-scaling max = projected peak × 1.5
   └── Review quarterly
| Metric | Current | 6-Month Projection | 12-Month Projection |
|---|---|---|---|
| Average RPS | 5,000 | 10,000 | 20,000 |
| Peak RPS | 15,000 | 30,000 | 60,000 |
| RPS per instance | 500 | 500 | 500 |
| Instances at average | 10 | 20 | 40 |
| Instances at peak | 30 | 60 | 120 |
| Instances with buffer (+50%) | 45 | 90 | 180 |
| Monthly cost (est.) | $4,500 | $9,000 | $18,000 |
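The table's instance counts follow directly from the planning arithmetic. A quick TypeScript check of the current-capacity row (reproducing the table's figures; note the table applies the +50% buffer to raw peak instances, while step 3's 0.7 utilization divisor is an alternative way to build in the same headroom):

```typescript
// Reproduce the projection table's arithmetic from the figures above.
const RPS_PER_INSTANCE = 500;

function instancesFor(rps: number): number {
  return Math.ceil(rps / RPS_PER_INSTANCE);
}

function withBuffer(instances: number): number {
  return Math.ceil(instances * 1.5); // +50% buffer for failures and spikes
}

const current = { avgRps: 5_000, peakRps: 15_000 };
console.log(instancesFor(current.avgRps));              // 10 instances at average
console.log(instancesFor(current.peakRps));             // 30 instances at peak
console.log(withBuffer(instancesFor(current.peakRps))); // 45 with +50% buffer
```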
We've explored horizontal scaling as the ultimate throughput multiplier.
Module Conclusion:
This module has covered the core techniques for throughput optimization. These techniques compound: a system that batches requests, reuses connections, processes asynchronously via queues, and scales horizontally can achieve throughput orders of magnitude beyond a naive implementation. The art of system design is knowing which techniques apply to your specific constraints.
You've completed the Throughput Optimization module! You now have a comprehensive understanding of techniques to maximize the work your systems can accomplish—from parallelization and batching at the code level, through connection reuse and queue-based architectures, to horizontal scaling across distributed systems. Apply these techniques systematically to build systems that handle massive scale.