In the scaling journey of almost every distributed system, the database becomes the first and most persistent bottleneck. Application servers can be multiplied effortlessly; databases cannot. While you can spin up 100 stateless API servers in minutes, scaling the database layer requires careful architecture and often fundamental trade-offs.
The database scaling paradox: The very properties that make databases valuable—ACID transactions, referential integrity, complex queries, consistent views—are exactly what makes them hard to distribute.
This page explores the patterns that experienced engineers use to scale databases, from simple replication to sophisticated sharding strategies, and the emerging technologies that promise to ease this fundamental challenge.
By the end of this page, you will understand the complete database scaling toolkit: read replicas, connection pooling, partitioning strategies, sharding approaches, and the trade-offs of each. You'll be equipped to design database architectures that scale to millions of users.
Database scaling typically follows a predictable progression. Understanding this journey helps you make the right investments at the right time.
The Typical Progression:
```
STAGE 1: Single Database (Day 0 - ~100K users)
┌─────────────────────────────────────┐
│ Single Database Server              │
│ - Simple, no complexity             │
│ - Limited by single machine         │
│ - SPOF (single point of failure)    │
└─────────────────────────────────────┘
Action: Optimize queries, add indexes, tune config

STAGE 2: Primary + Read Replicas (~100K - 1M users)
┌───────────────────┐      ┌──────────────────┐
│ Primary (writes)  │─────►│ Read Replicas    │
│ (single server)   │      │ (multiple)       │
└───────────────────┘      └──────────────────┘
Action: Route reads to replicas, handle replication lag

STAGE 3: Vertical Scaling + Optimization (~1-5M users)
┌──────────────────────────────────────┐
│ Bigger Primary Server                │
│ - More CPU, RAM, faster SSD          │
│ - Connection pooler (PgBouncer)      │
│ - Query optimization, denormalization│
└──────────────────────────────────────┘
Action: Squeeze more from single primary before sharding

STAGE 4: Functional Partitioning (~5-20M users)
┌────────────┐ ┌────────────┐ ┌────────────┐
│  Users DB  │ │ Orders DB  │ │  Products  │
│ (primary)  │ │ (primary)  │ │     DB     │
└────────────┘ └────────────┘ └────────────┘
Action: Separate databases by domain (microservices)

STAGE 5: Horizontal Sharding (20M+ users)
┌───────────┐ ┌───────────┐ ┌───────────┐
│  Shard 1  │ │  Shard 2  │ │  Shard N  │
│ users 1-M │ │ users M+1 │ │ users ... │
└───────────┘ └───────────┘ └───────────┘
Action: Distribute single table across multiple servers
```

Sharding is the most complex scaling pattern with the highest operational overhead. Most systems should exhaust simpler options—vertical scaling, read replicas, caching, query optimization—before sharding. A well-tuned single PostgreSQL instance can handle millions of transactions per day.
Read replication is the most common database scaling pattern because it's relatively simple and dramatically effective for read-heavy workloads.
How Replication Works:
```
WRITES                                 READS
  │                                      │
  ▼                                      │
┌───────────────┐                        │
│   Primary     │                        │
│   (Leader)    │                        │
│               │                        │
│  WAL Stream   │                        │
└───────┬───────┘                        │
        │                                │
        ├────────────────┬───────────────┤
        ▼                ▼               ▼
  ┌──────────┐     ┌──────────┐    ┌──────────┐
  │ Replica  │     │ Replica  │    │ Replica  │
  │    #1    │     │    #2    │    │    #3    │
  │  (read)  │     │  (read)  │    │  (read)  │
  └──────────┘     └──────────┘    └──────────┘

Replication Types:
┌─────────────────────────────────────────────────────────────┐
│ SYNCHRONOUS                   │ ASYNCHRONOUS                │
│ ───────────────────────────── │ ─────────────────────────── │
│ ✓ Zero data loss              │ ✗ Potential data loss       │
│ ✗ Higher write latency        │ ✓ Fast writes               │
│ ✗ Replica failure blocks      │ ✓ Replica failure OK        │
│   primary                     │                             │
│ Use: Financial transactions   │ Use: Most applications      │
└─────────────────────────────────────────────────────────────┘
```

Replication Lag: The Critical Challenge
Asynchronous replicas lag behind the primary. This lag—typically milliseconds but potentially seconds under load—creates consistency issues:
```typescript
// THE PROBLEM: "Read-your-writes" inconsistency
async function updateProfile(userId: string, data: ProfileData) {
  // Write goes to PRIMARY
  await primary.update('users', userId, data);
  // Redirect to profile page
  return redirect(`/users/${userId}`);
}

async function getProfile(userId: string) {
  // Read goes to REPLICA (might be stale!)
  const user = await replica.query('SELECT * FROM users WHERE id = ?', [userId]);
  // User sees OLD data after just updating!
  // "Why didn't my changes save??"
  return user;
}

// SOLUTION 1: Read from primary after write
async function getProfile(userId: string, { afterWrite = false } = {}) {
  const source = afterWrite ? primary : replica;
  return source.query('SELECT * FROM users WHERE id = ?', [userId]);
}

// SOLUTION 2: Causal consistency tokens
async function updateProfile(userId: string, data: ProfileData) {
  await primary.update('users', userId, data);
  const lsn = await primary.getCurrentLSN(); // Log Sequence Number
  // Pass LSN to next read
  return redirect(`/users/${userId}?lsn=${lsn}`);
}

async function getProfile(userId: string, minLsn?: string) {
  // Wait for replica to catch up to LSN
  if (minLsn) {
    await replica.waitForLSN(minLsn, { timeout: 5000 });
  }
  return replica.query('SELECT * FROM users WHERE id = ?', [userId]);
}

// SOLUTION 3: Session-based routing
class SmartRouter {
  private recentWrites = new Map<string, number>();

  async getConnection(userId: string, isWrite: boolean) {
    if (isWrite) {
      this.recentWrites.set(userId, Date.now());
      return primary;
    }
    // Route to primary for 5 seconds after any write
    const lastWrite = this.recentWrites.get(userId);
    if (lastWrite && Date.now() - lastWrite < 5000) {
      return primary;
    }
    return this.selectReplica();
  }
}
```

Always monitor replication lag in production. Alert when lag exceeds acceptable thresholds (e.g., 1 second). Under unexpected high write loads, replicas can fall behind by minutes—at which point you're effectively serving stale reads. Consider automatically failing over to reading from the primary if lag exceeds your threshold.
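That failover advice can be encoded as a small routing guard that falls back to the primary whenever measured lag crosses the threshold. A minimal sketch, assuming the lag probe itself (e.g., `now() - pg_last_xact_replay_timestamp()` on a PostgreSQL replica) runs elsewhere and periodically calls `reportLag`; `LagAwareRouter` and its parameters are hypothetical names:

```typescript
// Routes reads to a replica only while its measured lag is acceptable.
// Lag values are assumed to be reported periodically by a monitoring probe.
type Source = 'primary' | 'replica';

class LagAwareRouter {
  private lastReportedLagMs = 0;
  private lastReportAt = 0;

  constructor(
    private maxLagMs: number,      // e.g. 1000, matching the alert threshold above
    private staleReportMs: number  // distrust lag reports older than this
  ) {}

  // Called by the monitoring probe, e.g. once per second
  reportLag(lagMs: number, nowMs: number = Date.now()): void {
    this.lastReportedLagMs = lagMs;
    this.lastReportAt = nowMs;
  }

  // Pick a source for a read query
  routeRead(nowMs: number = Date.now()): Source {
    const reportAge = nowMs - this.lastReportAt;
    // No fresh lag data? Fail safe: read from the primary.
    if (reportAge > this.staleReportMs) return 'primary';
    return this.lastReportedLagMs > this.maxLagMs ? 'primary' : 'replica';
  }
}
```

Note the fail-safe default: a missing or stale lag report routes to the primary, because "we don't know the lag" is exactly when stale reads are most likely.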
Before scaling hardware, scale efficiency. Connection pooling is one of the most impactful—and underappreciated—database scaling techniques.
The Connection Problem:
Database connections are expensive resources. Each PostgreSQL connection consumes ~10MB of memory. Each MySQL connection uses dedicated threads. At scale, applications opening thousands of connections can overwhelm databases.
| Scenario | Connections Required | Database Impact |
|---|---|---|
| 10 app servers × 20 connections each | 200 connections | ~2GB RAM just for connections |
| 100 app servers × 20 connections each | 2,000 connections | ~20GB RAM, severe performance hit |
| K8s autoscaling: 500 pods × 20 each | 10,000 connections | Database refuses connections, crashes |
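The arithmetic behind this table can be captured in a small sizing helper. The `cores * 2 + spindles` formula is widely circulated PostgreSQL community guidance for a starting pool size, not a guarantee, and the function names here are hypothetical; treat the output as a number to benchmark against, not a law:

```typescript
// Rough pool sizing: total client connections vs. what the database can absorb.
interface Fleet {
  appServers: number;
  connectionsPerServer: number;
}

function totalClientConnections(fleet: Fleet): number {
  return fleet.appServers * fleet.connectionsPerServer;
}

// Common community starting point for PostgreSQL: (cores * 2) + spindles.
// On SSD/NVMe, effectiveSpindles is often taken as 1.
function suggestedDbPoolSize(cpuCores: number, effectiveSpindles: number): number {
  return cpuCores * 2 + effectiveSpindles;
}

function poolerNeeded(fleet: Fleet, dbPoolSize: number): boolean {
  // If clients want far more connections than the database should hold open,
  // a pooler must multiplex them.
  return totalClientConnections(fleet) > dbPoolSize;
}
```

For the 100-server row above: 2,000 client connections against a suggested pool of a few dozen actual database connections, which is precisely the gap a pooler closes.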
Solution: External Connection Poolers
A dedicated connection pooler sits between applications and the database, multiplexing many client connections onto fewer database connections.
```
WITHOUT POOLER:
┌──────────────────────────────────────────────────────────┐
│ 100 App Servers                                          │
│ Each opens 20 connections directly to database           │
│ = 2,000 database connections                             │
└────────────────────────────┬─────────────────────────────┘
                             │ 2,000 connections
                             ▼
                    ┌────────────────┐
                    │    Database    │
                    │ (overwhelmed)  │
                    └────────────────┘

WITH POOLER (e.g., PgBouncer):
┌──────────────────────────────────────────────────────────┐
│ 100 App Servers                                          │
│ Each opens 20 connections to pooler                      │
│ = 2,000 client connections to pooler                     │
└────────────────────────────┬─────────────────────────────┘
                             │ 2,000 client connections
                             ▼
                    ┌────────────────┐
                    │   PgBouncer    │
                    │    (pooler)    │
                    │                │
                    │ Multiplexes to │
                    │ 50-100 actual  │
                    │  connections   │
                    └───────┬────────┘
                            │ 50-100 database connections
                            ▼
                    ┌────────────────┐
                    │    Database    │
                    │    (happy!)    │
                    └────────────────┘
```

Pooling Modes:
| Mode | Description | Use Case |
|---|---|---|
| Session pooling | Connection held for full session | Apps using prepared statements |
| Transaction pooling | Connection returned after each transaction | Most web applications (best efficiency) |
| Statement pooling | Connection returned after each statement | Extreme efficiency, limited feature support |
```ini
; PgBouncer Configuration for Production
[databases]
; Route all connections to primary or replicas
primary = host=primary.internal dbname=myapp
replica = host=replica.internal dbname=myapp

[pgbouncer]
; Pooler settings
pool_mode = transaction          ; Best for web apps
max_client_conn = 10000          ; Handle many app connections
default_pool_size = 50           ; Connections per user/database pair
min_pool_size = 10               ; Always keep some warm
reserve_pool_size = 25           ; Emergency overflow
reserve_pool_timeout = 5         ; Wait 5s before using reserve

; Performance
tcp_keepalive = 1
tcp_keepcnt = 5
tcp_keepidle = 30
tcp_keepintvl = 10

; Timeouts
query_timeout = 30               ; Kill long queries
client_idle_timeout = 0          ; Don't timeout idle clients
server_connect_timeout = 3       ; Connect to DB timeout
server_idle_timeout = 60         ; Close idle DB connections

; Logging
log_connections = 0              ; Reduce log spam in production
log_disconnections = 0
log_pooler_errors = 1
stats_period = 60
```

Serverless functions (AWS Lambda, Vercel) exacerbate connection problems because each function instance opens its own connection. AWS RDS Proxy, Neon's connection pooler, and PlanetScale's serverless driver are purpose-built for this challenge.
Before horizontal sharding, consider vertical partitioning—splitting tables across databases based on function or access patterns.
Types of Vertical Partitioning:
1. Functional Partitioning (Database per Service)
Separate logical domains into different databases. This often aligns with microservices boundaries.
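Splitting by domain means a query that used to be a single JOIN becomes two lookups merged in application code. A minimal in-memory sketch of that pattern, where the `Map` and array are stand-ins for hypothetical Users and Orders databases:

```typescript
// After functional partitioning, "JOIN users ON orders.user_id = users.id"
// becomes two fetches plus an in-application merge.
interface User { id: number; name: string }
interface Order { id: number; userId: number; total: number }

// Stand-ins for the separate Users DB and Orders DB
const usersDb = new Map<number, User>([
  [1001, { id: 1001, name: 'Ada' }],
  [1002, { id: 1002, name: 'Grace' }],
]);
const ordersDb: Order[] = [
  { id: 1, userId: 1001, total: 99 },
  { id: 2, userId: 1002, total: 150 },
  { id: 3, userId: 1001, total: 200 },
];

// Application-side "join": fetch orders, then batch-fetch the users they reference
function ordersWithUserNames(): Array<Order & { userName: string }> {
  const userIds = new Set(ordersDb.map(o => o.userId));
  const users = new Map(
    [...userIds].map((id): [number, User] => [id, usersDb.get(id)!])
  );
  return ordersDb.map(o => ({ ...o, userName: users.get(o.userId)!.name }));
}
```

The important detail is the batch fetch: collect the referenced user IDs first and fetch them once, rather than issuing one Users-DB query per order (the classic N+1 problem).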
```
MONOLITH DATABASE:
┌─────────────────────────────────────────────────────────┐
│ Single Database                                         │
│ ┌─────────┐ ┌─────────┐ ┌──────────┐ ┌─────────┐        │
│ │ users   │ │ orders  │ │ products │ │ payments│        │
│ │ profiles│ │ items   │ │ reviews  │ │ invoices│        │
│ │ sessions│ │ carts   │ │ inventory│ │ refunds │        │
│ └─────────┘ └─────────┘ └──────────┘ └─────────┘        │
│                                                         │
│ All tables compete for same resources                   │
│ One slow query affects everything                       │
│ Scaling requires scaling everything                     │
└─────────────────────────────────────────────────────────┘

FUNCTIONALLY PARTITIONED:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│  Users DB   │ │  Orders DB  │ │ Products DB │ │ Payments DB │
│             │ │             │ │             │ │             │
│ users       │ │ orders      │ │ products    │ │ payments    │
│ profiles    │ │ items       │ │ reviews     │ │ invoices    │
│ sessions    │ │ carts       │ │ inventory   │ │ refunds     │
│             │ │             │ │             │ │             │
│ Scale:      │ │ Scale:      │ │ Scale:      │ │ Scale:      │
│ Read heavy  │ │ Write heavy │ │ Read heavy  │ │ Consistent  │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
```

Benefits:
- Independent scaling per domain
- Isolation: orders spike doesn't affect user authentication
- Different databases for different needs (SQL for orders, NoSQL for products)
- Clear ownership boundaries

Challenges:
- Cross-database JOINs are impossible
- Distributed transactions are complex
- Data duplication may be necessary

2. Column Partitioning (Table Splitting)
Split large tables into multiple tables based on access patterns—frequently accessed columns in one table, less accessed in another.
```sql
-- BEFORE: Wide table with mixed access patterns
CREATE TABLE users (
  id BIGINT PRIMARY KEY,
  email VARCHAR(255),          -- High read
  password_hash VARCHAR(255),  -- High read (auth)
  name VARCHAR(255),           -- High read
  created_at TIMESTAMP,        -- Low read
  bio TEXT,                    -- Low read
  avatar_url VARCHAR(512),     -- Low read
  preferences JSONB,           -- Low read
  last_login TIMESTAMP,        -- Medium read/write
  login_count INT              -- Medium read/write
  -- ... 20 more columns
);

-- Problem: Every query loads all columns into memory
-- Large bio/preferences columns slow down hot authentication queries

-- AFTER: Split by access pattern
CREATE TABLE users_core (
  id BIGINT PRIMARY KEY,
  email VARCHAR(255),
  password_hash VARCHAR(255),
  name VARCHAR(255),
  last_login TIMESTAMP,
  login_count INT
);
-- Hot table: fits in memory, fast auth queries

CREATE TABLE users_profile (
  user_id BIGINT PRIMARY KEY REFERENCES users_core(id),
  bio TEXT,
  avatar_url VARCHAR(512),
  preferences JSONB,
  created_at TIMESTAMP
);
-- Cold table: loaded only when viewing profile

-- Auth query only touches hot table
SELECT id, email, password_hash
FROM users_core
WHERE email = 'user@example.com';

-- Profile query joins when needed
SELECT c.*, p.*
FROM users_core c
JOIN users_profile p ON c.id = p.user_id
WHERE c.id = 12345;
```

A common pattern is separating 'hot' (frequently accessed) and 'cold' (rarely accessed) data. Keep hot data in high-performance storage; archive cold data to cheaper storage. This often reduces storage costs by 60-80% while improving hot path performance.
When vertical scaling and replication are exhausted, horizontal sharding—distributing rows of a single table across multiple databases—becomes necessary. This is the most powerful scaling technique but also the most operationally complex.
Sharding Fundamentals:
```
SHARDING: Distribute rows across multiple databases

SINGLE DATABASE:
┌─────────────────────────────────────────────────────────┐
│ orders table                                            │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ order_id │ user_id │ total │ created_at │ ...       │ │
│ │ 1        │ 1001    │ $99   │ 2024-01-01 │           │ │
│ │ 2        │ 1002    │ $150  │ 2024-01-01 │           │ │
│ │ 3        │ 1001    │ $200  │ 2024-01-02 │           │ │
│ │ ...      │ ...     │ ...   │ ...        │           │ │
│ │ 10M      │ 5000    │ $42   │ 2024-12-31 │           │ │
│ └─────────────────────────────────────────────────────┘ │
│ Problem: 10M rows, indexes don't fit in memory          │
└─────────────────────────────────────────────────────────┘

SHARDED BY user_id:
┌────────────────────┐ ┌────────────────────┐ ┌────────────────────┐
│ Shard 1            │ │ Shard 2            │ │ Shard 3            │
│ user_id % 3 = 0    │ │ user_id % 3 = 1    │ │ user_id % 3 = 2    │
├────────────────────┤ ├────────────────────┤ ├────────────────────┤
│ order_id │ user_id │ │ order_id │ user_id │ │ order_id │ user_id │
│ 2        │ 1002    │ │ ...      │ ...     │ │ 1        │ 1001    │
│ ...      │ ...     │ │ ...      │ ...     │ │ 3        │ 1001    │
│ ~3.3M rows         │ │ ~3.3M rows         │ │ ~3.3M rows         │
└────────────────────┘ └────────────────────┘ └────────────────────┘

Each shard is a regular database with full functionality
All of user 1001's orders are on Shard 3 (1001 % 3 = 2)
All of user 1002's orders are on Shard 1 (1002 % 3 = 0)
```

Shard Key Selection: The Most Critical Decision
The shard key determines how data is distributed. Poor choices lead to hot spots, expensive cross-shard queries, and eventual re-sharding pain.
| Shard Key | Pros | Cons | Good For |
|---|---|---|---|
| User ID | All user data colocated; user queries are single-shard | Power users create hot shards; cross-user queries expensive | User-centric apps (social, e-commerce) |
| Tenant ID | Perfect tenant isolation; natural for multi-tenant | Large tenants may need their own shard | SaaS applications |
| Timestamp | Range queries efficient; natural for time-series | Recent shard becomes hot spot; old shards underutilized | Time-series data (with range sharding) |
| Geographic region | Data locality; compliance requirements | Global users require cross-shard; uneven traffic by region | Geo-specific applications |
| Random/UUID | Perfect distribution; no hot spots | All queries are cross-shard; expensive | Rarely a good choice |
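The trade-offs in this table can be made concrete by measuring skew: distribute a sample of keys with a candidate shard function and compare the busiest shard's load to the average. A small self-contained sketch (the modulo stands in for a real hash function, and the monotonically increasing IDs model timestamp-like keys):

```typescript
// Measure hot-spotting: distribute keys with a candidate shard function
// and report the ratio of the busiest shard's load to the average.
function skew(keys: number[], numShards: number, shardOf: (k: number) => number): number {
  const counts = new Array<number>(numShards).fill(0);
  for (const k of keys) counts[shardOf(k) % numShards]++;
  const avg = keys.length / numShards;
  return Math.max(...counts) / avg;
}

const allIds = Array.from({ length: 9000 }, (_, i) => i); // sequential ids 0..8999
const recentIds = allIds.slice(8000);                     // the newest 1000 (write traffic)

const hashShard = (k: number) => k % 3;            // hash-style sharding (modulo as stand-in)
const rangeShard = (k: number) => Math.floor(k / 3000); // range sharding, 3000 ids per shard

// Recent write traffic: hash sharding spreads it almost perfectly evenly,
// while range sharding funnels every new write into the newest shard.
const hashSkew = skew(recentIds, 3, hashShard);   // close to 1.0 (balanced)
const rangeSkew = skew(recentIds, 3, rangeShard); // 3.0 (one shard takes 100%)
```

A skew of 1.0 means perfect balance; a skew equal to the shard count means a single shard absorbs all the traffic, which is the "recent shard becomes hot spot" row above in miniature.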
Sharding Strategies:
1. Hash Sharding
```typescript
// Simple modulo hash sharding
function getShardId(userId: number, numShards: number): number {
  return userId % numShards;
}

// Problem: Adding shards requires reshuffling ALL data
// 3 shards: user 7 → shard 1 (7 % 3 = 1)
// 4 shards: user 7 → shard 3 (7 % 4 = 3)  // Different shard!

// Solution: Consistent hashing (or virtual shards)
const VIRTUAL_SHARDS = 1024; // Create many virtual shards initially

function getVirtualShardId(userId: number): number {
  return hash(userId) % VIRTUAL_SHARDS;
}

const shardMapping = new Map<number, string>();
// Map virtual shards to physical shards
// Virtual 0-255    → Physical Shard 1
// Virtual 256-511  → Physical Shard 2
// Virtual 512-767  → Physical Shard 3
// Virtual 768-1023 → Physical Shard 4

// Adding Shard 5: reassign ~1/5 of the virtual shards (≈205 of 1024),
// taking roughly 51 from each existing shard. Each physical shard gives up
// ~20% of its data; nothing else moves, versus a near-total reshuffle with modulo.
```

2. Range Sharding
```
Range sharding: Data distributed by key ranges

User ID ranges:
Shard 1: users 1 - 1,000,000
Shard 2: users 1,000,001 - 2,000,000
Shard 3: users 2,000,001 - 3,000,000

Pros:
- Range queries span few shards
- Natural for sequential IDs
- Easy to understand and debug

Cons:
- New users always hit latest shard (hot spot)
- Uneven distribution as shards age
- Manual rebalancing required
```

The biggest sharding challenge is operations that span shards. JOINs across shards require fetching data from multiple databases and merging in application code. Transactions across shards require distributed transaction protocols (2PC) which are slow and complex. Design your shard key to minimize cross-shard operations.
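The range table above maps naturally onto a sorted list of boundaries plus a binary search. A minimal sketch using the three shards from the diagram (`RangeShard` and `shardForUser` are illustrative names, not a library API):

```typescript
// Range sharding lookup: each shard owns a contiguous, inclusive user-ID range.
interface RangeShard { name: string; minId: number; maxId: number }

const shards: RangeShard[] = [
  { name: 'shard-1', minId: 1,         maxId: 1_000_000 },
  { name: 'shard-2', minId: 1_000_001, maxId: 2_000_000 },
  { name: 'shard-3', minId: 2_000_001, maxId: 3_000_000 },
];

// Binary search over the sorted ranges: O(log n) even with many shards
function shardForUser(userId: number, ranges: RangeShard[]): string {
  let lo = 0, hi = ranges.length - 1;
  while (lo <= hi) {
    const mid = (lo + hi) >> 1;
    const r = ranges[mid];
    if (userId < r.minId) hi = mid - 1;
    else if (userId > r.maxId) lo = mid + 1;
    else return r.name;
  }
  throw new Error(`no shard owns user ${userId}`);
}
```

Because the table is just data, rebalancing means rewriting the boundary list; that is both the appeal (easy to understand) and the burden (manual) called out in the cons above.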
There are several approaches to implementing sharding in your application:
Pattern 1: Application-Level Sharding
The application determines which shard to query based on the shard key.
```typescript
// Application-level sharding logic
class ShardRouter {
  private shardConnections: Map<number, DatabaseConnection>;
  private shardCount: number;

  constructor(shardConfigs: ShardConfig[]) {
    this.shardConnections = new Map();
    shardConfigs.forEach((config, index) => {
      this.shardConnections.set(index, new DatabaseConnection(config));
    });
    this.shardCount = shardConfigs.length;
  }

  // Determine shard from user ID
  private getShardId(userId: number): number {
    return Math.abs(hash(userId)) % this.shardCount;
  }

  // Get connection for a specific user
  getConnectionForUser(userId: number): DatabaseConnection {
    const shardId = this.getShardId(userId);
    return this.shardConnections.get(shardId)!;
  }

  // Execute query on correct shard
  async getUserOrders(userId: number): Promise<Order[]> {
    const connection = this.getConnectionForUser(userId);
    return connection.query(
      'SELECT * FROM orders WHERE user_id = ?',
      [userId]
    );
  }

  // Cross-shard query: get orders for multiple users
  async getOrdersForUsers(userIds: number[]): Promise<Order[]> {
    // Group users by shard
    const usersByShard = new Map<number, number[]>();
    for (const userId of userIds) {
      const shardId = this.getShardId(userId);
      if (!usersByShard.has(shardId)) {
        usersByShard.set(shardId, []);
      }
      usersByShard.get(shardId)!.push(userId);
    }

    // Query each shard in parallel
    const promises = Array.from(usersByShard.entries()).map(
      ([shardId, shardUserIds]) => {
        const connection = this.shardConnections.get(shardId)!;
        return connection.query(
          'SELECT * FROM orders WHERE user_id IN (?)',
          [shardUserIds]
        );
      }
    );

    const results = await Promise.all(promises);
    return results.flat(); // Merge results from all shards
  }
}
```

Pattern 2: Proxy-Level Sharding
A sharding proxy sits between the application and databases, routing queries transparently.
```
┌─────────────────────────────────────────────────────────┐
│ Application                                             │
│ Writes normal SQL queries                               │
│ Unaware of sharding                                     │
└────────────────────────────┬────────────────────────────┘
                             │ Standard SQL
                             ▼
                    ┌────────────────┐
                    │ Sharding Proxy │
                    │                │
                    │ - Parses SQL   │
                    │ - Extracts     │
                    │   shard key    │
                    │ - Routes query │
                    │ - Merges       │
                    │   results      │
                    └───────┬────────┘
          ┌─────────────────┼─────────────────┐
          ▼                 ▼                 ▼
     ┌─────────┐       ┌─────────┐       ┌─────────┐
     │ Shard 1 │       │ Shard 2 │       │ Shard N │
     └─────────┘       └─────────┘       └─────────┘
```

Examples:
- Vitess (MySQL): Used by YouTube, Slack, HubSpot
- Citus (PostgreSQL): Distributed PostgreSQL
- ProxySQL: MySQL proxy with sharding support

Pattern 3: Database-Native Sharding
Some databases have built-in sharding capabilities.
| Database | Solution | Approach |
|---|---|---|
| MongoDB | Native sharding | Auto-sharding with chunk balancing |
| PostgreSQL | Citus extension | Distributed tables with automatic routing |
| MySQL | Vitess | Sharding middleware (proxy) |
| Cassandra | Native | Partitioning by primary key, automatic |
| CockroachDB | Native | Transparent distribution with SQL interface |
| TiDB | Native | MySQL-compatible distributed SQL |
NewSQL databases (CockroachDB, TiDB, Spanner) promise SQL interfaces with automatic horizontal scaling. They handle sharding, replication, and distributed transactions internally. For new projects, these are increasingly attractive alternatives to manual sharding.
Sharding introduces significant operational and application complexity. Understanding these challenges upfront is essential for successful implementation:
Challenge Deep Dive: Distributed IDs
```typescript
// Problem: Auto-increment IDs conflict across shards
// Shard 1: order_id 1, 2, 3...
// Shard 2: order_id 1, 2, 3...  // Collision!

// Solution 1: UUID
function generateUUID(): string {
  return crypto.randomUUID(); // 550e8400-e29b-41d4-a716-446655440000
}
// Pros: No coordination, globally unique
// Cons: 128 bits (vs 64 for a long), not sortable

// Solution 2: Snowflake ID (Twitter's approach)
// 64-bit ID: timestamp + datacenter + worker + sequence
class SnowflakeIdGenerator {
  private static EPOCH = 1288834974657n; // Custom epoch

  private sequence = 0n;
  private lastTimestamp = -1n;

  constructor(
    private datacenterId: bigint, // 5 bits
    private workerId: bigint      // 5 bits
  ) {}

  nextId(): bigint {
    let timestamp = BigInt(Date.now()) - SnowflakeIdGenerator.EPOCH;

    if (timestamp === this.lastTimestamp) {
      this.sequence = (this.sequence + 1n) & 4095n; // 12 bits
      if (this.sequence === 0n) {
        // Sequence exhausted for this millisecond: wait for the next one
        while (timestamp <= this.lastTimestamp) {
          timestamp = BigInt(Date.now()) - SnowflakeIdGenerator.EPOCH;
        }
      }
    } else {
      this.sequence = 0n;
    }

    this.lastTimestamp = timestamp;

    // Combine: timestamp (41 bits) | datacenter (5) | worker (5) | sequence (12)
    return (timestamp << 22n) |
      (this.datacenterId << 17n) |
      (this.workerId << 12n) |
      this.sequence;
  }
}
// Pros: Sortable by time, 64-bit, no coordination
// Cons: Requires unique worker IDs

// Solution 3: Central ID Service (with batching)
class IdService {
  private cache: Map<string, number[]> = new Map();
  private batchSize = 1000; // Fetch 1000 IDs at once

  async getNextId(table: string): Promise<number> {
    let batch = this.cache.get(table);
    if (!batch || batch.length === 0) {
      // Fetch new batch from central service
      batch = await this.fetchBatch(table, this.batchSize);
      this.cache.set(table, batch);
    }
    return batch.shift()!;
  }

  private async fetchBatch(table: string, count: number): Promise<number[]> {
    // Central database maintains counters:
    //   UPDATE id_counters SET value = value + 1000 WHERE table_name = ?
    // Returns the range [old_value, old_value + 1000)
    throw new Error('elided: call the central counter service here');
  }
}
// Pros: Sequential IDs, minimal coordination
// Cons: Central service is SPOF, ID gaps on service restart
```

Snowflake IDs are the most common choice for sharded systems—they provide time-sortability (useful for range queries), are compact (64 bits), and require no coordination. Twitter, Discord, and Instagram variants are battle-tested at massive scale.
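Because the bit layout is fixed, a Snowflake ID can also be decoded back into its components, which is handy for debugging and for reading an entity's creation time without any table lookup. A sketch mirroring the generator's bit packing (same 41/5/5/12 split and custom-epoch assumption; the function names are illustrative):

```typescript
// Decompose a 64-bit Snowflake ID into its fields.
// Layout (matching the generator above): 41-bit timestamp | 5-bit datacenter
// | 5-bit worker | 12-bit sequence, with the timestamp relative to a custom epoch.
const EPOCH = 1288834974657n; // must match the generator's epoch

interface SnowflakeParts {
  timestampMs: bigint; // milliseconds since the Unix epoch
  datacenterId: bigint;
  workerId: bigint;
  sequence: bigint;
}

function decodeSnowflake(id: bigint): SnowflakeParts {
  return {
    timestampMs: (id >> 22n) + EPOCH,
    datacenterId: (id >> 17n) & 31n, // 5 bits
    workerId: (id >> 12n) & 31n,     // 5 bits
    sequence: id & 4095n,            // 12 bits
  };
}

// Inverse, for round-trip checking (mirrors the generator's bit packing)
function encodeSnowflake(p: SnowflakeParts): bigint {
  return ((p.timestampMs - EPOCH) << 22n) |
    (p.datacenterId << 17n) |
    (p.workerId << 12n) |
    p.sequence;
}
```

Decoding is how you recover the "sortable by time" property in practice: extracting `timestampMs` from an order ID tells you when the order was created, on any shard, with no query.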
Database scaling is among the most complex aspects of distributed systems. The patterns above form a progression: optimize and index first, add read replicas for read-heavy load, pool connections before buying hardware, partition by function as domains diverge, and reach for horizontal sharding only when nothing simpler suffices.
What's Next:
With database scaling patterns understood, we'll explore cache scaling patterns—how to build scalable caching layers that reduce database load, improve latency, and handle massive read volumes. Caching and databases work together as complementary scaling strategies.
You now understand the complete database scaling toolkit, from read replicas to horizontal sharding. You can evaluate when each pattern is appropriate and understand the trade-offs involved. This knowledge is essential for designing systems that can grow to millions of users.