When resource planning reveals that current infrastructure will be exhausted, the question becomes: how do we add capacity?
Database scaling is not straightforward. Unlike stateless application servers that can be cloned trivially, databases carry the complexity of data consistency, query routing, and transaction semantics. The choice of scaling strategy has profound implications for application architecture, operational procedures, and long-term system evolution.
This page explores the spectrum of database scaling strategies—from the simplest vertical upgrades to complex distributed architectures. Each approach offers distinct trade-offs between capability, complexity, and cost. Understanding these trade-offs enables informed decisions that balance immediate needs with long-term flexibility.
By the end of this page, you will understand vertical and horizontal scaling paradigms, read replica architectures, sharding strategies, and connection pooling techniques. You'll learn when each approach is appropriate, how to implement them, and the operational implications of each choice.
Vertical scaling means adding more resources to a single server—more CPU cores, more RAM, faster storage, higher network bandwidth. It's the simplest scaling approach because it requires no application changes and maintains the single-server operational model.
Advantages of vertical scaling:

- No application changes: the database remains a single logical and physical server
- Preserves strong consistency and full transaction semantics
- Simple operations: one server to back up, monitor, and secure
- Often the fastest way to buy headroom when growth is moderate
Limitations of vertical scaling:

- Hard ceiling: every platform has a largest available server
- Cost grows faster than capacity at the top of the hardware range
- Upgrades typically require a restart or failover window
- A single server remains a single point of failure
| Resource | Upgrade Options | Typical Impact | Implementation Complexity |
|---|---|---|---|
| CPU | More cores, higher frequency | Linear throughput for parallel workloads | Usually requires restart |
| Memory | Add RAM modules, larger instance | Cache hit improvement, reduced I/O | May be hot-addable in some systems |
| Storage | Faster drives (HDD→SSD→NVMe) | 10-100x IOPS improvement | May require data migration |
| Storage Capacity | Add drives, expand volumes | Extends growth runway | Often online, RAID-dependent |
| Network | Faster NICs, bonding | Higher client and replication throughput | May require reconfiguration |
```sql
-- Assess vertical scaling potential and constraints

-- Current resource utilization vs. server capacity
WITH current_resources AS (
    SELECT
        (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') AS configured_max_connections,
        (SELECT COUNT(*) FROM pg_stat_activity) AS current_connections,
        (SELECT setting FROM pg_settings WHERE name = 'shared_buffers') AS configured_shared_buffers,
        (SELECT pg_size_pretty(pg_database_size(current_database()))) AS database_size,
        (SELECT COUNT(*) FROM pg_stat_activity WHERE state = 'active') AS active_queries
),

-- Hardware capacity (would be gathered from OS, shown as example values)
hardware_specs AS (
    SELECT
        64    AS total_ram_gb,      -- Current server RAM
        16    AS total_cpu_cores,   -- Current CPU cores
        2000  AS total_storage_gb,  -- Current storage capacity
        50000 AS max_iops           -- Current storage IOPS capability
),

-- Upgrade options and costs (example based on cloud pricing patterns)
upgrade_tiers AS (
    SELECT
        tier_name,
        ram_gb,
        cpu_cores,
        monthly_cost,
        storage_iops,
        ram_gb * 1.0 / 64 AS memory_multiplier,   -- Relative to current
        cpu_cores * 1.0 / 16 AS cpu_multiplier
    FROM (VALUES
        ('Current',           64,  16, 800,  50000),
        ('Medium Upgrade',    128, 32, 1600, 80000),
        ('Large Upgrade',     256, 64, 3500, 100000),
        ('Maximum Available', 512, 96, 8000, 150000)
    ) AS tiers(tier_name, ram_gb, cpu_cores, monthly_cost, storage_iops)
)
SELECT
    u.tier_name,
    u.ram_gb || ' GB' AS memory,
    u.cpu_cores || ' cores' AS cpu,
    u.storage_iops AS iops,
    '$' || u.monthly_cost AS monthly_cost,
    ROUND(u.memory_multiplier, 1) || 'x' AS memory_increase,
    ROUND(u.cpu_multiplier, 1) || 'x' AS cpu_increase,
    ROUND(u.monthly_cost::numeric / 800, 1) || 'x' AS cost_increase,
    CASE
        WHEN u.tier_name = 'Maximum Available' THEN 'No further vertical scaling possible'
        ELSE ''
    END AS notes
FROM upgrade_tiers u
ORDER BY u.monthly_cost;

-- Estimate headroom at each tier
-- (upgrade_tiers is repeated because a CTE is scoped to a single statement)
WITH upgrade_tiers AS (
    SELECT
        tier_name,
        ram_gb,
        cpu_cores,
        ram_gb * 1.0 / 64 AS memory_multiplier
    FROM (VALUES
        ('Current',           64,  16, 800,  50000),
        ('Medium Upgrade',    128, 32, 1600, 80000),
        ('Large Upgrade',     256, 64, 3500, 100000),
        ('Maximum Available', 512, 96, 8000, 150000)
    ) AS tiers(tier_name, ram_gb, cpu_cores, monthly_cost, storage_iops)
)
SELECT
    u.tier_name,
    -- Memory headroom: how much larger could the working set be?
    ROUND(u.ram_gb * 0.75 / 50.0, 1) AS estimated_hot_data_capacity_gb,
    -- CPU headroom: estimated QPS capacity
    ROUND(u.cpu_cores * 500.0, 0) AS estimated_simple_qps_capacity,
    -- Growth months at current trajectory (assuming 10% monthly growth)
    CASE
        WHEN u.tier_name = 'Current' THEN 0
        ELSE ROUND(LN(u.memory_multiplier) / LN(1.10), 1)
    END AS months_of_growth_at_10pct_monthly
FROM upgrade_tiers u;
```

Vertical scaling is ideal when you're not near hardware limits, when simplicity is paramount, when workload doesn't exceed single-server capacity, and when you have budget for premium hardware. Start vertical—add horizontal complexity only when necessary.
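The headroom estimate above reduces to a simple relationship: if a tier gives you a capacity multiplier m and the working set grows by g per month, the runway is ln(m) / ln(1 + g) months. A minimal Python sketch of that arithmetic; the growth rate and tier multipliers are illustrative assumptions matching the example query:

```python
import math

def months_of_runway(capacity_multiplier: float, monthly_growth: float) -> float:
    """Months until a tier with `capacity_multiplier` times current capacity is exhausted,
    assuming the working set grows by `monthly_growth` (e.g. 0.10 = 10%) each month."""
    if capacity_multiplier <= 1:
        return 0.0
    return math.log(capacity_multiplier) / math.log(1 + monthly_growth)

# Illustrative tiers matching the example above (memory multiplier relative to current)
for tier, multiplier in [("Medium Upgrade", 2), ("Large Upgrade", 4), ("Maximum Available", 8)]:
    print(f"{tier}: ~{months_of_runway(multiplier, 0.10):.1f} months at 10% monthly growth")
```

Doubling memory buys roughly seven months at 10% monthly growth; even the largest tier buys under two years, which is why runway estimates matter more than raw capacity.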
Horizontal scaling means distributing workload across multiple database servers. This overcomes single-server hardware limits but introduces significant complexity around data distribution, consistency, and query routing.
Horizontal scaling paradigms:

- Read replicas: a single primary handles writes while copies serve reads
- Multi-primary replication: several writable nodes with conflict resolution
- Sharding: data partitioned across servers, each owning a subset
- Caching layers: in-memory stores absorb repeated reads
- Database federation: separate databases per functional area
Complexity implications:
Horizontal scaling trades operational simplicity for capacity. Each approach introduces new failure modes, consistency considerations, and operational procedures:
| Approach | Write Scaling | Read Scaling | Consistency | Operational Complexity |
|---|---|---|---|---|
| Read Replicas | None (single primary) | Linear with replicas | Eventual (lag) | Low-Medium |
| Multi-Primary | Limited (conflict resolution) | Good | Complex (conflicts) | High |
| Sharding | Linear with shards | Linear with shards | Strong per-shard | Very High |
| Caching Layers | None | Excellent (cache hits) | Eventual (TTL) | Medium |
| Database Federation | Distributed | Distributed | Per-database | Medium-High |
Every horizontal scaling approach pays a 'distributed systems tax'—network latency between nodes, partial failure handling, consistency protocol overhead, and operational complexity. Only scale horizontally when vertical limits are reached or when specific requirements (read scaling, geographic distribution) demand it.
Read replicas are the most common first step in horizontal scaling. A primary server handles all writes while replica servers maintain synchronized copies for read queries. This approach works well for read-heavy workloads, which most applications have.
Read replica topology:

- A single primary accepts all writes and streams changes to its replicas
- One or more replicas serve read-only queries, usually behind a load balancer
- Replicas can cascade (replicate from other replicas) to reduce load on the primary
- A replica can be promoted to primary during failover
Replication mechanisms:

- Physical (streaming) replication: ships WAL records, producing an exact byte-level copy
- Logical replication: publishes row-level changes, allowing a subset of tables or differing versions
- Asynchronous mode: low write latency on the primary, at the cost of replica lag
- Synchronous mode: the primary waits for replica acknowledgment, trading latency for durability
```sql
-- PostgreSQL Read Replica Setup and Monitoring

-- On Primary: Configure for replication
-- postgresql.conf settings:
/*
wal_level = replica
max_wal_senders = 10
max_replication_slots = 10
synchronous_standby_names = ''   -- Empty for async, 'replica1,replica2' for sync
*/

-- Create replication slot for each replica
SELECT pg_create_physical_replication_slot('replica_1_slot');
SELECT pg_create_physical_replication_slot('replica_2_slot');

-- Create replication user
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';

-- Monitor replication status
SELECT
    client_addr,
    state,
    sent_lsn,
    write_lsn,
    flush_lsn,
    replay_lsn,
    -- Lag in bytes
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS replication_lag_bytes,
    -- Connection age in seconds (not a direct lag measure)
    EXTRACT(EPOCH FROM (NOW() - backend_start)) AS connection_age_seconds,
    sync_state
FROM pg_stat_replication;

-- Check replication slots
SELECT
    slot_name,
    slot_type,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal_size
FROM pg_replication_slots;

-- On Replica: Check replay status
SELECT
    pg_is_in_recovery() AS is_replica,
    pg_last_wal_receive_lsn() AS last_received,
    pg_last_wal_replay_lsn() AS last_replayed,
    pg_last_xact_replay_timestamp() AS last_replay_time,
    NOW() - pg_last_xact_replay_timestamp() AS replay_lag;

-- Application-level read/write splitting logic
/*
Connection routing pseudocode:

function getConnection(queryType):
    if queryType == 'WRITE' or requiresStrongConsistency:
        return primaryConnection
    elif requiresReadAfterWrite:
        # Route to primary or wait for replica to catch up
        return primaryConnection  # Safest approach
    else:
        return loadBalancer.getReadReplica()
*/

-- Query to identify read vs write query ratio (for replica sizing)
SELECT
    CASE
        WHEN query ~* '^(SELECT|WITH .* SELECT)' THEN 'READ'
        WHEN query ~* '^(INSERT|UPDATE|DELETE|MERGE)' THEN 'WRITE'
        ELSE 'OTHER'
    END AS query_type,
    COUNT(*) AS query_count,
    ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 1) AS percentage
FROM pg_stat_statements
GROUP BY 1
ORDER BY 2 DESC;
```

Replication lag means replicas may not have the latest data. For read-after-write scenarios, either route reads to the primary, implement session-based replica affinity, or use a lag-aware connection pool that avoids replicas with excessive lag.
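The routing pseudocode in the block above can be made concrete. The sketch below is one hedged way to implement lag-aware read/write splitting in Python; the psycopg2 driver, the DSNs, the round-robin policy, and the 5-second lag threshold are illustrative assumptions rather than a prescribed design.

```python
"""Minimal lag-aware read/write router (sketch)."""
import itertools
import psycopg2

PRIMARY_DSN = "host=primary.db.internal dbname=app user=app"   # assumed DSN
REPLICA_DSNS = [
    "host=replica1.db.internal dbname=app user=app",           # assumed DSNs
    "host=replica2.db.internal dbname=app user=app",
]
MAX_LAG_SECONDS = 5.0  # replicas lagging more than this are skipped

def replica_lag_seconds(conn) -> float:
    """Replay lag as reported by the replica itself."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT COALESCE(EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())), 0)"
        )
        return float(cur.fetchone()[0])

class ReadWriteRouter:
    def __init__(self):
        self.primary = psycopg2.connect(PRIMARY_DSN)
        self.replicas = [psycopg2.connect(dsn) for dsn in REPLICA_DSNS]
        self._rr = itertools.cycle(range(len(self.replicas)))

    def connection_for(self, is_write: bool, needs_read_after_write: bool = False):
        # Writes and read-after-write reads always go to the primary.
        if is_write or needs_read_after_write:
            return self.primary
        # Round-robin over replicas, skipping any that lag too far behind.
        for _ in range(len(self.replicas)):
            candidate = self.replicas[next(self._rr)]
            if replica_lag_seconds(candidate) <= MAX_LAG_SECONDS:
                return candidate
        return self.primary  # all replicas lagging: fall back to the primary
```

In practice this logic usually lives in a driver, ORM plugin, or proxy rather than hand-rolled code, but the decision points are the same: write vs. read, consistency requirement, and current lag.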
Sharding partitions data across multiple database servers, with each shard owning a subset of the data. Unlike replicas (which hold copies), shards together hold the complete dataset. Sharding enables both read and write scaling but significantly increases architectural complexity.
Shard key selection:
The shard key determines how data is distributed. Choosing the right key is the most critical sharding decision—it affects query efficiency, data distribution, and future flexibility.
| Criterion | Description | Example |
|---|---|---|
| High Cardinality | Many distinct values to enable even distribution | user_id (millions of values) vs. country (hundreds) |
| Query Affinity | Key appears in most query WHERE clauses | tenant_id for multi-tenant apps; user_id for user-centric apps |
| Even Distribution | Values spread workload equally across shards | UUID distributes well; timestamps create hotspots |
| Immutability | Key value doesn't change over entity lifetime | user_id rarely changes; status changes frequently |
| Growth Stability | Distribution remains balanced as data grows | Sequential IDs create hotspots on newest shard |
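To make the "even distribution" criterion concrete, the short sketch below compares how a hashed high-cardinality key and a low-cardinality key spread across shards; the shard count, key names, and traffic numbers are illustrative assumptions.

```python
"""Compare shard-key distribution: hashed user_id vs. country code (sketch)."""
import hashlib
from collections import Counter

SHARDS = 8  # illustrative shard count

def shard_for(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % SHARDS

# High-cardinality key: 100,000 distinct user ids spread almost evenly
user_counts = Counter(shard_for(f"user_{i}") for i in range(100_000))

# Low-cardinality key: a handful of countries with heavily skewed traffic
traffic = {"US": 60_000, "DE": 20_000, "JP": 15_000, "BR": 5_000}
country_counts = Counter()
for country, requests in traffic.items():
    country_counts[shard_for(country)] += requests

print("user_id keys per shard:   ", dict(sorted(user_counts.items())))
print("country traffic per shard:", dict(sorted(country_counts.items())))
# user_id lands ~12,500 per shard; country traffic piles onto at most 4 shards.
```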
Sharding schemes:

- Hash-based: apply a hash function to the shard key; distribution is even, but range queries scatter
- Range-based: assign contiguous key ranges to shards; range scans are efficient, but new data can hotspot
- Directory-based: a lookup table maps each key or tenant to a shard; flexible, but the directory must be highly available
- Consistent hashing: a hash ring with virtual nodes minimizes data movement when shards are added or removed (demonstrated in the code below)
"""Database Sharding Implementation Patterns Demonstrates shard routing logic and consistent hashingfor distributed database architectures.""" from typing import Dict, List, Optional, Tupleimport hashlibfrom dataclasses import dataclassfrom bisect import bisect_right @dataclassclass ShardConfig: """Configuration for a single shard""" shard_id: str host: str port: int weight: int = 1 # For weighted distribution class HashShardRouter: """ Simple hash-based shard routing. Routes keys to shards using modulo hashing. """ def __init__(self, shards: List[ShardConfig]): self.shards = sorted(shards, key=lambda s: s.shard_id) self.shard_count = len(shards) def get_shard(self, shard_key: str) -> ShardConfig: """Get shard for a given key using hash distribution""" key_hash = int(hashlib.md5(str(shard_key).encode()).hexdigest(), 16) shard_index = key_hash % self.shard_count return self.shards[shard_index] def get_all_shards(self) -> List[ShardConfig]: """For queries that must fan out to all shards""" return self.shards class ConsistentHashRouter: """ Consistent hashing for shard routing. Minimizes data movement when adding/removing shards. """ def __init__(self, shards: List[ShardConfig], virtual_nodes: int = 150): self.shards = {s.shard_id: s for s in shards} self.virtual_nodes = virtual_nodes self.ring: List[Tuple[int, str]] = [] for shard in shards: for i in range(virtual_nodes * shard.weight): key = f"{shard.shard_id}:{i}" hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16) self.ring.append((hash_val, shard.shard_id)) self.ring.sort(key=lambda x: x[0]) self.ring_keys = [r[0] for r in self.ring] def get_shard(self, shard_key: str) -> ShardConfig: """Get shard using consistent hashing""" key_hash = int(hashlib.md5(str(shard_key).encode()).hexdigest(), 16) # Find first node with hash >= key_hash idx = bisect_right(self.ring_keys, key_hash) if idx >= len(self.ring): idx = 0 # Wrap around shard_id = self.ring[idx][1] return self.shards[shard_id] def add_shard(self, shard: ShardConfig): """Add a new shard with minimal disruption""" self.shards[shard.shard_id] = shard for i in range(self.virtual_nodes * shard.weight): key = f"{shard.shard_id}:{i}" hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16) self.ring.append((hash_val, shard.shard_id)) self.ring.sort(key=lambda x: x[0]) self.ring_keys = [r[0] for r in self.ring] class ShardedQueryExecutor: """ Executes queries across sharded databases. Handles both single-shard and scatter-gather patterns. """ def __init__(self, router: ConsistentHashRouter): self.router = router self.connections: Dict[str, 'Connection'] = {} def execute_single_shard(self, shard_key: str, query: str, params: tuple): """Execute query on the shard that owns the key""" shard = self.router.get_shard(shard_key) conn = self._get_connection(shard) return conn.execute(query, params) def execute_all_shards(self, query: str, params: tuple) -> List: """ Execute query on all shards and aggregate results. Used for queries without shard key in WHERE clause. WARNING: This is expensive! Avoid in production for frequent queries. """ results = [] for shard in self.router.shards.values(): conn = self._get_connection(shard) shard_results = conn.execute(query, params) results.extend(shard_results) return results def execute_scatter_gather(self, query: str, params: tuple, aggregation: str = 'UNION') -> List: """ Scatter query to all shards, gather and aggregate results. Supports UNION, SUM, COUNT, MAX, MIN aggregations. 
""" shard_results = self.execute_all_shards(query, params) if aggregation == 'UNION': return shard_results elif aggregation == 'COUNT': return [{'count': sum(r.get('count', 0) for r in shard_results)}] elif aggregation == 'SUM': return [{'sum': sum(r.get('sum', 0) for r in shard_results)}] elif aggregation == 'MAX': return [{'max': max(r.get('max', 0) for r in shard_results)}] elif aggregation == 'MIN': return [{'min': min(r.get('min', 0) for r in shard_results)}] return shard_results def _get_connection(self, shard: ShardConfig) -> 'Connection': """Get or create connection to shard""" if shard.shard_id not in self.connections: # In real implementation, create actual DB connection self.connections[shard.shard_id] = MockConnection(shard) return self.connections[shard.shard_id] # Example usagedef demonstrate_sharding(): shards = [ ShardConfig('shard-1', 'db1.example.com', 5432), ShardConfig('shard-2', 'db2.example.com', 5432), ShardConfig('shard-3', 'db3.example.com', 5432), ShardConfig('shard-4', 'db4.example.com', 5432), ] router = ConsistentHashRouter(shards) # Test key distribution distribution = {s.shard_id: 0 for s in shards} for i in range(10000): shard = router.get_shard(f"user_{i}") distribution[shard.shard_id] += 1 print("Key distribution across shards:") for shard_id, count in distribution.items(): print(f" {shard_id}: {count} keys ({count/100:.1f}%)") # Simulate adding a shard - consistent hashing moves minimal keys router.add_shard(ShardConfig('shard-5', 'db5.example.com', 5432)) new_distribution = {s: 0 for s in router.shards} for i in range(10000): shard = router.get_shard(f"user_{i}") new_distribution[shard.shard_id] += 1 print("\nAfter adding shard-5:") for shard_id, count in new_distribution.items(): print(f" {shard_id}: {count} keys ({count/100:.1f}%)")Queries without shard key in WHERE clause must scatter to all shards and gather results. This is O(n) in shard count and adds network latency. Design schemas and queries to minimize cross-shard operations. Consider denormalization or maintaining lookup tables.
Database connections are expensive resources. Each connection consumes memory on the server (5-50MB typically) and has establishment overhead (TCP handshake, authentication, session setup). Connection pooling and intelligent load balancing are essential for efficient scaling.
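A quick back-of-the-envelope calculation shows why this matters; the 10 MB per-connection figure below is an assumed midpoint of the range above, and the fleet size is illustrative.

```python
# Back-of-the-envelope: assumed ~10 MB of database server memory per connection
PER_CONNECTION_MB = 10
app_servers, conns_per_server = 50, 20   # illustrative application fleet

direct = app_servers * conns_per_server  # 1,000 direct connections
pooled = 100                             # shared connections behind a proxy pool

print(f"Without pooling: {direct} connections ~ {direct * PER_CONNECTION_MB / 1024:.1f} GB of database RAM")
print(f"With pooling:    {pooled} connections ~ {pooled * PER_CONNECTION_MB / 1024:.1f} GB of database RAM")
```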
Connection pooling benefits:

- Reuses established connections, amortizing TCP handshake and authentication cost
- Caps the number of server-side connections, bounding memory consumption
- Queues excess requests instead of overwhelming the database during spikes
- Lets thousands of application clients share a few dozen database connections
Pooling architectures:
| Approach | Description | Pros | Cons |
|---|---|---|---|
| Application-level Pool | Pool embedded in each app server | Simplest; no additional component | One pool per app server, so total connections multiply |
| Proxy Pool (PgBouncer, ProxySQL) | Centralized proxy handles pooling | Fewer total connections; transparent to app | Additional component; single point of failure |
| Sidecar Pool | Pool proxy co-located with each app pod | Kubernetes-native; no network hop | More complex deployment |
| Database-native Pool | Built into database (connection broker) | No external components | Database-specific; may not exist |
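As a concrete example of the first row, application-level pooling in Python might look like the sketch below; it assumes psycopg2's built-in pool and an illustrative DSN, and the pool sizes are starting points rather than recommendations.

```python
"""Application-level connection pool (sketch) using psycopg2's built-in pool."""
from contextlib import contextmanager
from psycopg2.pool import ThreadedConnectionPool

# Illustrative DSN; in practice this comes from configuration or a secrets store.
pool = ThreadedConnectionPool(
    minconn=5,    # keep a few connections warm
    maxconn=20,   # hard cap per application instance
    dsn="host=primary.db.internal dbname=app_production user=app password=secret",
)

@contextmanager
def pooled_connection():
    """Borrow a connection and always return it to the pool."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)

# Usage
with pooled_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM pg_stat_activity")
        print(cur.fetchone())
```

Note the drawback the table calls out: with 50 application instances, a per-instance cap of 20 still means up to 1,000 server-side connections, which is why proxy pools such as PgBouncer are often layered on top.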
```ini
; PgBouncer Configuration for Production
; Centralized connection pooling for PostgreSQL

[databases]
; Database connection definitions
; Format: dbname = host=hostname port=port dbname=actualdb auth_user=user
production = host=primary.db.internal port=5432 dbname=app_production
analytics = host=replica1.db.internal port=5432 dbname=app_production
readonly = host=replica-pool.db.internal port=5432 dbname=app_production pool_mode=statement

[pgbouncer]
; Pooling mode:
;   session     - Connection assigned per session (safest, least efficient)
;   transaction - Connection assigned per transaction (balanced)
;   statement   - Connection assigned per statement (most efficient, most restrictions)
pool_mode = transaction

; Pool size limits
default_pool_size = 20       ; Connections per database user pair
min_pool_size = 5            ; Minimum kept open
reserve_pool_size = 5        ; Emergency connections when pool exhausted
reserve_pool_timeout = 3     ; Seconds before using reserve pool

; Connection limits
max_client_conn = 1000       ; Total client connections allowed
max_db_connections = 100     ; Maximum connections to actual database

; Connection behavior
server_reset_query = DISCARD ALL
server_reset_query_always = 0
server_check_query = SELECT 1
server_check_delay = 30

; Query behavior
query_timeout = 120          ; Kill queries running longer than 120s
query_wait_timeout = 30      ; Max time to wait for available connection
client_idle_timeout = 600    ; Disconnect idle clients after 10 minutes

; Authentication
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt

; Logging
log_connections = 1
log_disconnections = 1
log_pooler_errors = 1
stats_period = 60

[users]
; User-specific settings
admin = pool_mode=session            ; Admin connections not pooled
readonly_user = pool_mode=statement  ; Aggressive pooling for readonly

; Monitoring query for pool statistics
; SELECT * FROM pgbouncer.pools;
; SELECT * FROM pgbouncer.stats;
; SELECT * FROM pgbouncer.clients;
; SELECT * FROM pgbouncer.servers;
```
```sql
-- PgBouncer pool monitoring queries (connect to pgbouncer admin)

-- Pool status overview
SELECT
    database,
    pool_mode,
    user,
    cl_active AS client_active,
    cl_waiting AS client_waiting,
    sv_active AS server_active,
    sv_idle AS server_idle,
    sv_used AS server_used,
    maxwait AS max_wait_seconds,
    pool_size
FROM pgbouncer.pools;

-- Pool efficiency metrics
SELECT
    database,
    total_xact_count AS transactions,
    total_query_count AS queries,
    total_received AS bytes_received,
    total_sent AS bytes_sent,
    total_xact_time / 1000000.0 AS total_transaction_time_sec,
    total_query_time / 1000000.0 AS total_query_time_sec,
    ROUND(total_query_time::numeric / NULLIF(total_query_count, 0) / 1000, 2) AS avg_query_ms,
    avg_xact_count AS xact_per_second,
    avg_query_count AS qps
FROM pgbouncer.stats;

-- Client connection distribution
SELECT
    addr AS client_address,
    COUNT(*) AS connection_count,
    STRING_AGG(DISTINCT database, ', ') AS databases_used,
    STRING_AGG(DISTINCT state, ', ') AS connection_states
FROM pgbouncer.clients
GROUP BY addr
ORDER BY connection_count DESC
LIMIT 20;

-- Pool sizing validation
WITH pool_data AS (
    SELECT
        database,
        user,
        cl_active + cl_waiting AS total_clients,
        sv_active + sv_idle + sv_used AS total_servers,
        pool_size AS max_pool_size
    FROM pgbouncer.pools
)
SELECT
    database,
    user,
    total_clients,
    total_servers,
    max_pool_size,
    ROUND(100.0 * total_servers / NULLIF(max_pool_size, 0), 1) AS pool_utilization_pct,
    CASE
        WHEN (100.0 * total_servers / NULLIF(max_pool_size, 0)) > 90 THEN 'CRITICAL - Near pool limit'
        WHEN (100.0 * total_servers / NULLIF(max_pool_size, 0)) > 70 THEN 'WARNING - High utilization'
        ELSE 'OK'
    END AS status
FROM pool_data;
```

Use 'transaction' pool mode for most applications—it balances efficiency with compatibility. Use 'statement' mode only for stateless read-only workloads. Avoid 'session' mode unless you need session-level state like LISTEN/NOTIFY or prepared statements that can't be recreated.
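The right pool size is workload-dependent, but a widely cited community rule of thumb is roughly cores × 2 plus the number of effective disk spindles. A minimal sketch of that arithmetic, with the heuristic and hardware figures treated as assumptions to be validated against the utilization query above:

```python
def starting_pool_size(cpu_cores: int, effective_spindles: int = 1) -> int:
    """Rule-of-thumb starting point for database-side pool size; tune from measurements."""
    return cpu_cores * 2 + effective_spindles

# Illustrative 16-core server with SSD storage (counted as one effective spindle)
print(starting_pool_size(16))  # 33 -> a default_pool_size in the 20-30 range is plausible
```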
Selecting the appropriate scaling strategy depends on workload characteristics, operational capacity, and future growth trajectory. A decision framework helps navigate these choices systematically.
| Characteristic | Vertical | Read Replicas | Sharding |
|---|---|---|---|
| Read/Write Ratio | Any | Read-heavy (>80% reads) | Any |
| Data Size | <10TB typically | Any | >1TB typically |
| Write Volume | Any | Limited by primary | High writes |
| Query Complexity | Any | Cross-replica aggregation possible | Simple per-shard preferred |
| Operational Maturity | Any | Medium+ | Advanced |
| Consistency Requirements | Strong | Eventual acceptable | Strong per-shard |
| Geographic Distribution | Single location | Multi-region reads | Multi-region all ops |
| Cost Sensitivity | Premium hardware budget | Medium | Higher total cost |
Decision flowchart:
Begin with vertical scaling. Add read replicas when read load exceeds primary capacity. Consider sharding only when write throughput is the bottleneck and replicas don't help. Each layer of complexity should be justified by clear capacity requirements.
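The same flow can be written down as a tiny decision helper; the thresholds below (70% CPU, 80% reads) are illustrative assumptions derived from the table above, not hard rules.

```python
def recommend_scaling_strategy(read_pct: float,
                               primary_cpu_pct: float,
                               write_bound: bool,
                               at_max_hardware: bool) -> str:
    """Encode the decision flow: vertical first, then replicas, then sharding."""
    if not at_max_hardware and primary_cpu_pct < 70:
        return "Stay vertical: the current (or an upgraded) single server has headroom"
    if read_pct >= 80 and not write_bound:
        return "Add read replicas: offload the read-heavy portion of the workload"
    if write_bound and at_max_hardware:
        return "Consider sharding: writes exceed what any single primary can absorb"
    return "Upgrade vertically and re-measure before adding distributed complexity"

print(recommend_scaling_strategy(read_pct=90, primary_cpu_pct=85,
                                 write_bound=False, at_max_hardware=False))
```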
Moving between scaling strategies requires careful planning. Migrations involve downtime risk, data consistency concerns, and application changes. Understanding migration paths and techniques minimizes disruption.
```markdown
# Production Sharding Migration Checklist

## Phase 1: Assessment (Weeks 1-2)
- [ ] Identify shard key candidates
- [ ] Analyze query patterns for cross-shard impact
- [ ] Inventory all tables - determine sharding strategy per table
- [ ] Identify lookup/reference tables (replicated to all shards)
- [ ] Document foreign key relationships (most will break)
- [ ] Estimate total data movement volume and timeline
- [ ] Capacity plan: how many shards needed at launch? In 2 years?

## Phase 2: Application Preparation (Weeks 3-6)
- [ ] Abstract database layer to support multi-shard
- [ ] Implement shard router in application
- [ ] Add shard key to all write operations
- [ ] Modify reads to include shard key where possible
- [ ] Identify and rewrite cross-shard queries
- [ ] Implement scatter-gather for unavoidable cross-shard ops
- [ ] Add monitoring for shard distribution and cross-shard queries
- [ ] Load test with sharded configuration in staging

## Phase 3: Infrastructure Preparation (Weeks 5-7)
- [ ] Provision shard database servers
- [ ] Configure networking between app and all shards
- [ ] Set up connection pooling for multi-shard
- [ ] Configure monitoring and alerting per shard
- [ ] Prepare backup and recovery procedures per shard
- [ ] Document runbooks for common shard operations

## Phase 4: Data Migration (Weeks 7-9)
- [ ] Create empty schema on all shards
- [ ] Migrate reference/lookup tables (full copy to each shard)
- [ ] Begin dual-write phase: writes go to old DB and new shards
- [ ] Backfill historical data to shards (can be slow)
- [ ] Verify row counts and checksums between old and new
- [ ] Run validation queries comparing old DB to shard aggregate

## Phase 5: Cutover (Week 10)
- [ ] Schedule maintenance window if needed
- [ ] Verify all shards are caught up
- [ ] Switch application reads to shards
- [ ] Monitor for errors and latency
- [ ] Gradual traffic shift (if possible) vs. full cutover
- [ ] Disable writes to old database
- [ ] Final validation
- [ ] Declare migration complete

## Phase 6: Cleanup (Week 11+)
- [ ] Keep old database in read-only mode (safety net)
- [ ] Remove dual-write code paths after stabilization
- [ ] Archive and eventually decommission old database
- [ ] Document new architecture
- [ ] Update disaster recovery procedures
```

Moving to a sharded architecture is extremely difficult to reverse. The application changes, operational expertise, and data distribution become deeply embedded. Consider sharding a major commitment—invest heavily in getting it right initially.
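Phase 4's dual-write step deserves a sketch of its own, since it is where consistency problems usually surface. The outline below is one hedged approach: the connection objects are injected, and the "log and reconcile later" failure policy is an assumption; some teams instead fail the whole write or enqueue a retry.

```python
"""Dual-write wrapper (sketch) for the migration's backfill and cutover window."""
import logging

logger = logging.getLogger("dual_write")

class DualWriter:
    """Writes to the legacy database (source of truth) and mirrors to the owning shard."""

    def __init__(self, legacy_db, get_shard_connection):
        self.legacy_db = legacy_db                        # existing monolithic database
        self.get_shard_connection = get_shard_connection  # callable: shard_key -> connection

    def write(self, shard_key: str, query: str, params: tuple):
        # 1. The old database stays authoritative until cutover.
        result = self.legacy_db.execute(query, params)

        # 2. Best-effort mirror to the new shard; failures are logged, not raised,
        #    and reconciled later by the row-count/checksum verification in Phase 4.
        try:
            self.get_shard_connection(shard_key).execute(query, params)
        except Exception:
            logger.exception("dual-write to shard failed for key %s", shard_key)

        return result
```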
Database scaling strategies exist on a spectrum from simple (vertical) to complex (sharding). The right choice depends on current constraints, growth trajectory, and operational capabilities. Starting simple and adding complexity only when necessary minimizes operational burden while maintaining flexibility.
What's next:
With scaling strategies understood, the next consideration is cost. The next page covers Cost Optimization—techniques for minimizing database infrastructure expense while maintaining performance and reliability.
You now understand the spectrum of database scaling strategies and their trade-offs. This knowledge enables informed decisions about when and how to add capacity, balancing capability against complexity. Next, we'll explore optimizing the cost of database infrastructure.