When your data outgrows a single database—when vertical scaling is exhausted, read replicas can't keep up with writes, and functional partitioning doesn't help because a single table has billions of rows—you arrive at the final frontier of SQL database scaling: sharding.
Sharding horizontally partitions data across multiple database instances. Instead of keeping all users in one database, you distribute them: users 1-1,000,000 on shard 1, users 1,000,001-2,000,000 on shard 2, and so on. Each shard is a complete, independent database containing a subset of the total data.
Application-level sharding means the application—not the database—is responsible for routing queries to the correct shard. This gives you maximum control but requires significant engineering investment.
Sharding is the most powerful scaling strategy but also the most complex. Before implementing sharding, exhaust every other option: query optimization, indexing, read replicas, caching, and functional partitioning. Many teams shard prematurely and suffer years of unnecessary complexity.
By the end of this page, you will understand the mechanics of application-level sharding, how to choose and implement shard keys, routing strategies (hash-based, range-based, directory-based), handling cross-shard queries and transactions, and the operational challenges of resharding.
A sharded database system consists of several components working together:
Shards: Independent database instances, each holding a partition of the data. Shards may have their own replicas for read scaling and high availability.
Shard Key: The column (or columns) used to determine which shard a row belongs to. For a users table, this might be user_id. All rows with the same shard key value reside on the same shard.
Shard Map/Directory: A mapping from shard key values (or ranges) to shard locations. This might be a configuration file, a separate metadata database, or an in-memory structure.
Router/Proxy: Logic that intercepts queries, extracts the shard key, consults the shard map, and routes the query to the appropriate shard. In application-level sharding, this logic lives in your application code.
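To make these components concrete, here is a minimal sketch of a shard map and router in application code. The shard hostnames and the simple string hash are illustrative assumptions, not any particular library's API; a fuller implementation appears later in this page.

```typescript
// Minimal sketch: shard map + hash-mod router (illustrative names and URLs).
// A real implementation adds connection pooling, health checks, and caching.

interface ShardEntry {
  id: number;                // shard identifier
  connectionString: string;  // where this shard lives
}

// Shard map / directory: shard ID → location
const shardMap: ShardEntry[] = [
  { id: 0, connectionString: 'postgresql://shard0.db:5432/app' },
  { id: 1, connectionString: 'postgresql://shard1.db:5432/app' },
  { id: 2, connectionString: 'postgresql://shard2.db:5432/app' },
  { id: 3, connectionString: 'postgresql://shard3.db:5432/app' },
];

// Router: shard key → shard entry (hash-mod routing for illustration)
function routeToShard(shardKey: string): ShardEntry {
  let hash = 0;
  for (const ch of shardKey) {
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // simple string hash
  }
  return shardMap[hash % shardMap.length];
}

console.log(routeToShard('user-12345')); // e.g. { id: 2, connectionString: '...' }
```

The diagram below shows how the same pieces fit together inside the application.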
Application-Level Sharding Architecture
═══════════════════════════════════════════════════════════════

┌─────────────────────────────────────────────────────────────┐
│ APPLICATION                                                  │
│                                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ SHARD ROUTER                                        │    │
│  │                                                     │    │
│  │ 1. Extract shard key from query/context             │    │
│  │ 2. Compute shard ID: hash(user_id) % num_shards     │    │
│  │ 3. Look up shard connection from shard map          │    │
│  │ 4. Execute query on target shard                    │    │
│  │                                                     │    │
│  │  ┌─────────────────────────────────────────────┐    │    │
│  │  │ SHARD MAP / DIRECTORY                       │    │    │
│  │  │                                             │    │    │
│  │  │ Shard 0: postgresql://shard0.db:5432        │    │    │
│  │  │ Shard 1: postgresql://shard1.db:5432        │    │    │
│  │  │ Shard 2: postgresql://shard2.db:5432        │    │    │
│  │  │ Shard 3: postgresql://shard3.db:5432        │    │    │
│  │  └─────────────────────────────────────────────┘    │    │
│  └─────────────────────────────────────────────────────┘    │
└───────────────────────┬───────────────┬───────────────┬─────┘
                        │               │               │
              ┌─────────▼─────────┐     │     ┌─────────▼─────────┐
              │ SHARD 0           │     │     │ SHARD 3           │
              │ (users 0-999K)    │     │     │ (users 3M+)       │
              │ ┌──────────────┐  │     │     │ ┌──────────────┐  │
              │ │ users        │  │     │     │ │ users        │  │
              │ │ orders       │  │     │     │ │ orders       │  │
              │ │ profiles     │  │     │     │ │ profiles     │  │
              │ └──────────────┘  │     │     │ └──────────────┘  │
              │ + replicas        │     │     │ + replicas        │
              └───────────────────┘     │     └───────────────────┘
                                        │
                              ┌─────────▼─────────┐
                              │  SHARDS 1 & 2     │
                              │  (similar setup)  │
                              └───────────────────┘

Not every table needs sharding. In a typical e-commerce system:
Sharded tables (large, tied to shard key):

- users — Sharded by user_id
- orders — Sharded by user_id (colocated with users)
- user_preferences — Sharded by user_id

Global tables (small, read-mostly, needed everywhere):

- countries — Replicated to every shard
- product_categories — Replicated to every shard
- config_settings — Replicated or stored centrally

Centralized tables (not sharded, stored separately):

- products — In a separate product database, not sharded by user
- audit_logs — Centralized for compliance

The decision of what to shard depends on your access patterns and data relationships.
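One way to make this decision explicit is a small per-table policy map in the application, so routing code can tell at a glance how each table is handled. This is a sketch of one possible convention; the table names mirror the example above.

```typescript
// Sketch: explicit per-table sharding policy (names mirror the example above).
type TablePolicy =
  | { kind: 'sharded'; shardKey: string }       // partitioned by shard key
  | { kind: 'global' }                          // replicated to every shard
  | { kind: 'centralized'; database: string };  // lives in one separate database

const tablePolicies: Record<string, TablePolicy> = {
  users:              { kind: 'sharded', shardKey: 'user_id' },
  orders:             { kind: 'sharded', shardKey: 'user_id' },
  user_preferences:   { kind: 'sharded', shardKey: 'user_id' },
  countries:          { kind: 'global' },
  product_categories: { kind: 'global' },
  config_settings:    { kind: 'global' },
  products:           { kind: 'centralized', database: 'product_db' },
  audit_logs:         { kind: 'centralized', database: 'audit_db' },
};

// Routing code can then refuse to scatter-gather a table that should never be sharded.
console.log(tablePolicies['orders']); // { kind: 'sharded', shardKey: 'user_id' }
```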
The choice of shard key is the most critical decision in sharding. A poor shard key creates hot spots, makes common queries inefficient, and is extremely expensive to change later.
High Cardinality: The key should have many distinct values. Sharding by country (200 values) creates large, uneven shards. Sharding by user_id (millions of values) distributes data evenly.
Even Distribution: Values should distribute evenly across the key space. If 50% of users are in shard 0, that shard becomes a bottleneck.
Query Alignment: Most queries should include the shard key. If you shard by user_id but frequently query by email, every email lookup is a scatter-gather across all shards.
Colocation Support: Related data should share a shard key. User orders should be on the same shard as the user, so queries joining users and orders don't cross shards.
| Candidate Key | Cardinality | Distribution | Query Alignment | Verdict |
|---|---|---|---|---|
| user_id | High (millions) | Even (if using hash) | Good for user-centric apps | ✅ Excellent choice |
| tenant_id | Medium | May be uneven (large tenants) | Good for multi-tenant SaaS | ⚠️ Works if tenants balanced |
| created_date | High | Even for stored data, but new writes concentrate on the latest range | Poor (queries rarely filter by date alone) | ❌ Bad—causes hot spots on current date |
| country_code | Low (<200) | Very uneven | Moderate | ❌ Bad—shards too uneven |
| order_id | High | Even | Poor (usually query by user, not order) | ❌ Bad—separates user's orders |
| (user_id, tenant_id) | Very high | Even | Good for multi-tenant | ✅ Composite key can work |
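For the composite-key row, the usual technique is to derive a single routing string from both columns and hash that. A minimal sketch, assuming an MD5-based hash like the routers shown later; the delimiter and field order are arbitrary conventions that must be fixed once and never changed, because changing them re-routes every existing row.

```typescript
import { createHash } from 'crypto';

// Sketch: hashing a composite shard key (tenant_id + user_id).
function compositeShardId(tenantId: string, userId: string, numShards: number): number {
  const routingKey = `${tenantId}:${userId}`;              // fixed delimiter and order
  const hash = createHash('md5').update(routingKey).digest();
  return hash.readUInt32BE(0) % numShards;
}

console.log(compositeShardId('tenant-42', 'user-12345', 8)); // 0..7
```

Hashing the full composite spreads a large tenant across shards; hashing only tenant_id would instead colocate each tenant on one shard, trading even distribution for colocation. Before committing to any candidate key, analyze its real distribution and query patterns, as in the SQL below.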
-- Analyze potential shard key distribution

-- 1. Cardinality analysis
SELECT
  COUNT(DISTINCT user_id) AS distinct_users,
  COUNT(DISTINCT tenant_id) AS distinct_tenants,
  COUNT(DISTINCT DATE(created_at)) AS distinct_dates,
  COUNT(*) AS total_rows
FROM orders;

-- 2. Distribution analysis for user_id (simulating hash distribution)
-- Check if hash-based sharding would be even
WITH shard_distribution AS (
  SELECT
    -- Simulate 8-shard distribution using modulo of hash
    abs(hashtext(user_id::text)) % 8 AS simulated_shard,
    COUNT(*) AS row_count
  FROM orders
  GROUP BY abs(hashtext(user_id::text)) % 8
)
SELECT
  simulated_shard,
  row_count,
  round(100.0 * row_count / SUM(row_count) OVER (), 2) AS percentage
FROM shard_distribution
ORDER BY simulated_shard;

-- 3. Hot spot detection: Top users by row count
-- If few users dominate, sharding by user_id still creates hot shards
SELECT
  user_id,
  COUNT(*) AS order_count,
  round(100.0 * COUNT(*) / (SELECT COUNT(*) FROM orders), 2) AS pct_of_total
FROM orders
GROUP BY user_id
ORDER BY order_count DESC
LIMIT 20;

-- 4. Query pattern analysis: What columns appear in WHERE clauses?
-- This requires pg_stat_statements or application-level query logging
SELECT
  query,
  calls,
  total_time,
  -- Look for patterns in WHERE clauses
  CASE
    WHEN query ILIKE '%WHERE%user_id%' THEN 'user_id'
    WHEN query ILIKE '%WHERE%tenant_id%' THEN 'tenant_id'
    WHEN query ILIKE '%WHERE%order_id%' THEN 'order_id'
    ELSE 'other/none'
  END AS likely_shard_key
FROM pg_stat_statements
WHERE query ILIKE '%orders%'
ORDER BY calls DESC
LIMIT 50;

Choose a shard key that appears in 90%+ of your queries. If most queries include user_id, shard by user_id. If most include tenant_id, shard by tenant_id. The goal is single-shard queries wherever possible.
Given a shard key, how do you map key values to shards? Three primary strategies exist, each with distinct trade-offs.
Compute a hash of the shard key and use modulo to select a shard:
shard_id = hash(shard_key) % number_of_shards
Pros:

- Even data distribution: a good hash spreads keys uniformly across shards
- Simple to implement and reason about; no per-key metadata to maintain
- Sequential keys (auto-increment IDs, timestamps) do not create write hot spots

Cons:

- Range queries on the shard key become scatter-gather, since adjacent keys land on different shards
- Changing the number of shards invalidates the mapping; nearly every key must be rehashed and moved
Divide the key space into ranges:
Shards:
Shard 0: user_id 1 to 1,000,000
Shard 1: user_id 1,000,001 to 2,000,000
Shard 2: user_id 2,000,001 to 3,000,000
Pros:

- Range queries on the shard key hit a single shard (or a few adjacent ones)
- Resharding can split one busy range without touching the others
- New shards can simply take over new key ranges

Cons:

- Distribution depends on the data: monotonically increasing keys send all new writes to the newest range, creating a hot spot
- Ranges grow unevenly over time and need rebalancing
- The application must maintain and consult range boundaries
Maintain an explicit mapping from shard key to shard:
Directory Table:
user_id 12345 → Shard 2
user_id 67890 → Shard 5
Pros:

- Complete control over placement; individual keys or tenants can be moved between shards
- Resharding is a metadata update plus a targeted data copy, not a global rehash
- Supports special cases, such as pinning an unusually large tenant to a dedicated shard

Cons:

- The directory is extra infrastructure that must be highly available, cached, and kept consistent with the data
- Every routing decision requires a lookup (or a cache hit) before the real query can run
- Highest operational complexity of the three strategies
| Strategy | Distribution | Resharding | Range Queries | Complexity |
|---|---|---|---|---|
| Hash-Based | Excellent | Hard (full rehash) | Poor (scatter-gather) | Low |
| Range-Based | Variable | Moderate (split ranges) | Good (single shard) | Medium |
| Directory-Based | Complete control | Easy (update mapping) | Depends on mapping | High |
| Consistent Hashing | Good | Minimized movement | Poor | Medium |
/**
 * Shard routing implementations
 */

import { createHash } from 'crypto';
import { Pool } from 'pg';

interface ShardConfig {
  id: number;
  connectionString: string;
  pool: Pool;
}

// Minimal row types used by the repository below
interface User {
  id: string;
  email: string;
  name: string;
  created_at: string;
}

interface CreateUserInput {
  id: string;
  email: string;
  name: string;
}

// Abstract router interface
// (directory-based routing requires a database lookup, so resolution may be async)
interface ShardRouter {
  getShardForKey(key: string): ShardConfig | Promise<ShardConfig>;
  getAllShards(): ShardConfig[];
}

// Hash-based router
class HashBasedRouter implements ShardRouter {
  constructor(private shards: ShardConfig[]) {}

  getShardForKey(key: string): ShardConfig {
    // Hash using MD5 (fast, good distribution)
    const hash = createHash('md5')
      .update(key)
      .digest();

    // Use first 4 bytes as unsigned integer
    const hashValue = hash.readUInt32BE(0);
    const shardIndex = hashValue % this.shards.length;

    return this.shards[shardIndex];
  }

  getAllShards(): ShardConfig[] {
    return this.shards;
  }
}

// Range-based router
class RangeBasedRouter implements ShardRouter {
  private ranges: Array<{ maxKey: number; shard: ShardConfig }>;

  constructor(ranges: Array<{ maxKey: number; shard: ShardConfig }>) {
    // Ranges must be sorted by maxKey
    this.ranges = ranges.sort((a, b) => a.maxKey - b.maxKey);
  }

  getShardForKey(key: string): ShardConfig {
    const keyValue = parseInt(key, 10);

    for (const range of this.ranges) {
      if (keyValue <= range.maxKey) {
        return range.shard;
      }
    }

    // Key exceeds all ranges—use last shard
    return this.ranges[this.ranges.length - 1].shard;
  }

  getAllShards(): ShardConfig[] {
    return this.ranges.map(r => r.shard);
  }
}

// Directory-based router
class DirectoryBasedRouter implements ShardRouter {
  private defaultShard: ShardConfig;

  constructor(
    private shards: ShardConfig[],
    private directoryDb: Pool,
    defaultShardId: number
  ) {
    this.defaultShard = shards.find(s => s.id === defaultShardId)!;
  }

  async getShardForKey(key: string): Promise<ShardConfig> {
    // Look up in directory
    const result = await this.directoryDb.query(
      'SELECT shard_id FROM shard_directory WHERE shard_key = $1',
      [key]
    );

    if (result.rows.length > 0) {
      const shardId = result.rows[0].shard_id;
      return this.shards.find(s => s.id === shardId)!;
    }

    // Key not in directory—assign to default shard
    // (or compute based on hash and insert into directory)
    return this.defaultShard;
  }

  getAllShards(): ShardConfig[] {
    return this.shards;
  }
}

// Usage example: ShardedUserRepository
class ShardedUserRepository {
  constructor(private router: ShardRouter) {}

  async getUser(userId: string): Promise<User | null> {
    const shard = await this.router.getShardForKey(userId);

    const result = await shard.pool.query(
      'SELECT * FROM users WHERE id = $1',
      [userId]
    );

    return result.rows[0] || null;
  }

  async createUser(user: CreateUserInput): Promise<User> {
    const shard = await this.router.getShardForKey(user.id);

    const result = await shard.pool.query(
      'INSERT INTO users (id, email, name) VALUES ($1, $2, $3) RETURNING *',
      [user.id, user.email, user.name]
    );

    return result.rows[0];
  }

  // Scatter-gather: Query all shards (expensive!)
  async searchUsers(query: string): Promise<User[]> {
    const allShards = this.router.getAllShards();

    const results = await Promise.all(
      allShards.map(shard =>
        shard.pool.query(
          'SELECT * FROM users WHERE name ILIKE $1 LIMIT 100',
          [`%${query}%`]
        )
      )
    );

    // Merge results from all shards
    return results.flatMap(r => r.rows);
  }
}

Simple hash-based sharding has a critical flaw: adding or removing a shard requires rehashing nearly all keys. Consistent hashing minimizes this data movement.
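You can see the scale of that flaw with a quick simulation: count how many keys change shards when hash-mod routing goes from N to N+1 shards. A minimal sketch, assuming the same MD5-based hash as the router above; the key count is arbitrary.

```typescript
import { createHash } from 'crypto';

// Sketch: fraction of keys that move when the shard count changes under hash % N.
const hashOf = (key: string): number =>
  createHash('md5').update(key).digest().readUInt32BE(0);

function movedFraction(numKeys: number, from: number, to: number): number {
  let moved = 0;
  for (let i = 0; i < numKeys; i++) {
    const h = hashOf(`user-${i}`);
    if (h % from !== h % to) moved++; // key lands on a different shard after resize
  }
  return moved / numKeys;
}

// Going from 4 to 5 shards with hash-mod remaps roughly 80% of keys,
// even though only ~20% of the data logically belongs on the new shard.
console.log(movedFraction(100_000, 4, 5).toFixed(2));
```

Consistent hashing, described next, reduces this to roughly the 1/N share of keys that actually belongs on the new shard.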
Imagine the hash space as a ring (0 to 2³² − 1, wrapping back around to 0). Both shard identifiers and data keys are hashed onto this ring: each key belongs to the first shard found moving clockwise from the key's position.
Minimal Redistribution: Adding a new shard only affects keys between the new shard and its clockwise neighbor. On average, only 1/N of keys move (where N is the number of shards after the addition).
Virtual Nodes: To improve distribution, each physical shard is represented by multiple "virtual nodes" on the ring. A shard with 100 virtual nodes appears at 100 ring positions, smoothing out distribution.
/**
 * Consistent hashing implementation with virtual nodes
 */

import { createHash } from 'crypto';

interface ConsistentHashNode {
  nodeId: string;
  data: any;
}

class ConsistentHashRing<T> {
  private ring: Map<number, { nodeId: string; data: T }> = new Map();
  private sortedHashes: number[] = [];
  private virtualNodeCount: number;

  constructor(virtualNodeCount = 150) {
    this.virtualNodeCount = virtualNodeCount;
  }

  private hash(key: string): number {
    const hash = createHash('md5').update(key).digest();
    return hash.readUInt32BE(0);
  }

  addNode(nodeId: string, data: T): void {
    // Add virtual nodes for this physical node
    for (let i = 0; i < this.virtualNodeCount; i++) {
      const virtualKey = `${nodeId}:${i}`;
      const hashValue = this.hash(virtualKey);
      this.ring.set(hashValue, { nodeId, data });
    }

    // Rebuild sorted hash list
    this.sortedHashes = Array.from(this.ring.keys()).sort((a, b) => a - b);
  }

  removeNode(nodeId: string): void {
    // Remove all virtual nodes for this physical node
    for (let i = 0; i < this.virtualNodeCount; i++) {
      const virtualKey = `${nodeId}:${i}`;
      const hashValue = this.hash(virtualKey);
      this.ring.delete(hashValue);
    }

    this.sortedHashes = Array.from(this.ring.keys()).sort((a, b) => a - b);
  }

  getNode(key: string): { nodeId: string; data: T } | null {
    if (this.sortedHashes.length === 0) {
      return null;
    }

    const keyHash = this.hash(key);

    // Binary search for first hash >= keyHash
    let low = 0;
    let high = this.sortedHashes.length;

    while (low < high) {
      const mid = Math.floor((low + high) / 2);
      if (this.sortedHashes[mid] < keyHash) {
        low = mid + 1;
      } else {
        high = mid;
      }
    }

    // Wrap around if necessary
    const selectedHash = low < this.sortedHashes.length
      ? this.sortedHashes[low]
      : this.sortedHashes[0];

    return this.ring.get(selectedHash)!;
  }

  // Get nodes affected by adding a new node
  getAffectedRange(newNodeId: string): { from: number; to: number }[] {
    const ranges: { from: number; to: number }[] = [];

    for (let i = 0; i < this.virtualNodeCount; i++) {
      const virtualKey = `${newNodeId}:${i}`;
      const newHash = this.hash(virtualKey);

      // Find the next node clockwise (which will lose keys to new node)
      const nextNodeHash = this.sortedHashes.find(h => h > newHash)
        || this.sortedHashes[0];

      // Find the previous node counter-clockwise (range start)
      const prevIndex = this.sortedHashes.findIndex(h => h >= newHash) - 1;
      const prevNodeHash = prevIndex >= 0
        ? this.sortedHashes[prevIndex]
        : this.sortedHashes[this.sortedHashes.length - 1];

      ranges.push({ from: prevNodeHash, to: newHash });
    }

    return ranges;
  }
}

// Usage: Shard routing with consistent hashing
interface ShardInfo {
  host: string;
  port: number;
}

const shardRing = new ConsistentHashRing<ShardInfo>(200);

// Add shards
shardRing.addNode('shard-1', { host: 'shard1.db', port: 5432 });
shardRing.addNode('shard-2', { host: 'shard2.db', port: 5432 });
shardRing.addNode('shard-3', { host: 'shard3.db', port: 5432 });

// Route a key
const userId = 'user-12345';
const shard = shardRing.getNode(userId);
console.log(`User ${userId} → ${shard?.nodeId}`);

// Adding a new shard moves only ~25% of keys (1/4 with 4 shards)
shardRing.addNode('shard-4', { host: 'shard4.db', port: 5432 });

More virtual nodes = better distribution but more memory for the ring structure and slower lookups. 100-200 virtual nodes per physical node is a common balance. Production systems like Cassandra use similar numbers.
Some operations inherently span multiple shards. These are the most challenging aspects of sharded systems.
When a query doesn't include the shard key, you must query all shards and merge results:
-- Find all users with email ending in '@company.com'
-- Shard key is user_id, not email
-- Must query all shards!
SELECT * FROM users WHERE email LIKE '%@company.com';
Performance implications:

- Latency is gated by the slowest shard; the query finishes only when the last shard responds
- Every scatter-gather puts load on all shards, so these queries get more expensive as the shard count grows
- Results must be merged, re-sorted, and re-limited in application code
- A single slow or unavailable shard degrades or fails the whole query
Some data is needed on every shard but doesn't fit the shard key model:
Strategies:

- Replicate small reference tables (countries, categories, configuration) to every shard and propagate changes with a background job (sketched below)
- Keep them in a central database and cache them aggressively in the application
- Denormalize the few fields you actually need into the sharded rows themselves
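As a concrete example of the replicate-everywhere strategy, here is a minimal sketch that fans a reference-table update out to every shard using the router from the earlier examples. The table and column names are assumptions, and a production version would be idempotent and run from a background job rather than inline.

```typescript
// Sketch: keeping a small reference table (countries) identical on every shard.
// Assumes the ShardRouter type from the routing example above and a unique
// constraint on countries(code).
async function upsertCountryEverywhere(
  router: ShardRouter,
  code: string,
  name: string
): Promise<void> {
  await Promise.all(
    router.getAllShards().map(shard =>
      shard.pool.query(
        `INSERT INTO countries (code, name)
         VALUES ($1, $2)
         ON CONFLICT (code) DO UPDATE SET name = EXCLUDED.name`,
        [code, name]
      )
    )
  );
}
```

Reference data changes rarely, so the write amplification is acceptable; reads stay local to whichever shard the query is already running on.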
/**
 * Cross-shard query patterns
 */

interface ShardedQuery<T> {
  // Execute on specific shard
  executeSingle(shardKey: string): Promise<T>;

  // Execute on all shards and merge
  executeScatterGather(): Promise<T[]>;
}

class ShardedQueryExecutor {
  constructor(
    private router: ShardRouter,
    private timeout: number = 5000
  ) {}

  // Simple scatter-gather
  async scatterGather<T>(
    buildQuery: (shard: ShardConfig) => Promise<T[]>
  ): Promise<T[]> {
    const shards = this.router.getAllShards();

    const results = await Promise.all(
      shards.map(shard =>
        Promise.race([
          buildQuery(shard),
          this.timeoutPromise<T[]>(this.timeout, [])
        ])
      )
    );

    return results.flat();
  }

  // Scatter-gather with ORDER BY and LIMIT
  async scatterGatherTopN<T>(
    buildQuery: (shard: ShardConfig, limit: number) => Promise<T[]>,
    compareFn: (a: T, b: T) => number,
    limit: number
  ): Promise<T[]> {
    const shards = this.router.getAllShards();

    // Request 'limit' from each shard
    const results = await Promise.all(
      shards.map(shard => buildQuery(shard, limit))
    );

    // Merge and re-sort
    const merged = results.flat();
    merged.sort(compareFn);

    return merged.slice(0, limit);
  }

  // Scatter-gather aggregation
  async scatterGatherAggregate<TAgg>(
    buildAggQuery: (shard: ShardConfig) => Promise<TAgg>,
    combineAggregates: (aggs: TAgg[]) => TAgg
  ): Promise<TAgg> {
    const shards = this.router.getAllShards();

    const partialResults = await Promise.all(
      shards.map(shard => buildAggQuery(shard))
    );

    return combineAggregates(partialResults);
  }

  private timeoutPromise<T>(ms: number, fallback: T): Promise<T> {
    return new Promise(resolve =>
      setTimeout(() => resolve(fallback), ms)
    );
  }
}

// Example: Search users across all shards
async function searchUsers(
  executor: ShardedQueryExecutor,
  query: string,
  limit: number
): Promise<User[]> {
  return executor.scatterGatherTopN<User>(
    async (shard, shardLimit) => {
      const result = await shard.pool.query(
        `SELECT * FROM users
         WHERE name ILIKE $1
         ORDER BY created_at DESC
         LIMIT $2`,
        [`%${query}%`, shardLimit]
      );
      return result.rows;
    },
    // Sort by created_at descending
    (a, b) => new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
    limit
  );
}

// Example: Count all users
async function countAllUsers(
  executor: ShardedQueryExecutor
): Promise<number> {
  return executor.scatterGatherAggregate<number>(
    async (shard) => {
      const result = await shard.pool.query(
        'SELECT COUNT(*) as count FROM users'
      );
      return parseInt(result.rows[0].count, 10);
    },
    (counts) => counts.reduce((sum, c) => sum + c, 0)
  );
}

You cannot efficiently JOIN tables across shards. If you need to join users with orders and they're on different shards, you must fetch from each shard separately and join in application code. Design your sharding so related data is colocated on the same shard.
ACID transactions don't naturally span shards. When business logic requires atomic operations across shards, you have limited options:
The classic distributed transaction protocol:

1. Prepare phase: a coordinator asks every participating shard to prepare; each shard durably stages its changes, holds its locks, and votes yes or no.
2. Commit phase: if every shard voted yes, the coordinator instructs them all to commit; if any voted no or timed out, it instructs them all to abort.

Problems with 2PC:

- Blocking: if the coordinator fails between phases, participants hold locks until it recovers
- Latency: every transaction pays multiple network round trips to every involved shard
- Availability: the transaction is only as available as the least available participant plus the coordinator
- Practicality: coordinating 2PC across independently operated database instances is complex, and many application stacks don't support it well
As discussed in functional partitioning, sagas are the preferred approach for cross-shard operations: break the operation into a sequence of local, single-shard transactions, and if a later step fails, undo the completed steps with compensating transactions. There is no distributed commit and no global lock; consistency is eventual rather than atomic.
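Here is a hedged sketch of a two-step saga across shards, reusing the ShardRouter/ShardConfig types from the earlier routing example. The accounts table and the debit/credit/refund helpers are hypothetical placeholders; a production saga would also persist its progress so it can resume or compensate after a crash.

```typescript
// Sketch: a saga across two shards with a compensating action.
// debit/credit/refund are illustrative single-shard steps, not a real API.

async function debit(shard: ShardConfig, userId: string, amount: number): Promise<void> {
  await shard.pool.query(
    'UPDATE accounts SET balance = balance - $2 WHERE user_id = $1',
    [userId, amount]
  );
}

async function credit(shard: ShardConfig, userId: string, amount: number): Promise<void> {
  await shard.pool.query(
    'UPDATE accounts SET balance = balance + $2 WHERE user_id = $1',
    [userId, amount]
  );
}

const refund = credit; // the compensating action simply reverses the debit

async function transferCredits(
  router: ShardRouter,
  fromUserId: string,
  toUserId: string,
  amount: number
): Promise<void> {
  const fromShard = await router.getShardForKey(fromUserId);
  const toShard = await router.getShardForKey(toUserId);

  await debit(fromShard, fromUserId, amount);     // Step 1: local txn on source shard
  try {
    await credit(toShard, toUserId, amount);      // Step 2: local txn on destination shard
  } catch (err) {
    await refund(fromShard, fromUserId, amount);  // Compensate step 1; no distributed rollback
    throw err;
  }
}
```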
The best strategy is avoiding cross-shard transactions entirely:
Colocate related data: If user and orders share a shard key (user_id), an order placement transaction is single-shard.
Denormalize: Store necessary data locally instead of referencing another shard.
Accept eventual consistency: For some operations, strict atomicity isn't required. Transfer money between users? Process as two events with reconciliation.
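To make the colocation strategy concrete: because orders share the user's shard key, placing an order touches exactly one shard and can use an ordinary local transaction. A minimal sketch; the order_count column and table layout are assumptions carried over from the earlier examples.

```typescript
// Sketch: colocated data keeps the common path single-shard and fully ACID.
// Orders are sharded by user_id, so the user row and the order row live together.
async function placeOrder(
  router: ShardRouter,
  userId: string,
  orderId: string,
  totalCents: number
): Promise<void> {
  const shard = await router.getShardForKey(userId);
  const client = await shard.pool.connect();
  try {
    await client.query('BEGIN');
    await client.query(
      'INSERT INTO orders (id, user_id, total_cents) VALUES ($1, $2, $3)',
      [orderId, userId, totalCents]
    );
    await client.query(
      'UPDATE users SET order_count = order_count + 1 WHERE id = $1',
      [userId]
    );
    await client.query('COMMIT'); // ordinary local transaction; no 2PC, no saga needed
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}
```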
Resharding—adding, removing, or rebalancing shards—is one of the most operationally complex procedures in database management. Poor resharding can cause extended downtime or data loss.
Online resharding (preferred when possible): migrate data while continuing to serve traffic, typically by replicating to the new shard, dual-writing, and shifting reads and then writes over gradually. The full procedure appears below.

Offline resharding (simpler but causes downtime): stop writes, copy the affected data to its new shard(s), update the shard map, and resume traffic. Viable only when the data can be copied within your downtime budget.
With consistent hashing, adding a shard moves only a fraction of keys. With 4 shards becoming 5, only ~20% of keys move. With simple hash mod, all keys need reconsideration.
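You can verify that claim empirically with the ConsistentHashRing class from earlier: route a sample of keys, add a fifth shard, and count how many keys change owners. The key count and shard names here are arbitrary.

```typescript
// Sketch: measuring key movement when a 5th shard joins a consistent-hash ring.
// Uses the ConsistentHashRing class from the consistent hashing example above.
function measureMovement(numKeys: number): number {
  const ring = new ConsistentHashRing<{ host: string }>(200);
  ['shard-1', 'shard-2', 'shard-3', 'shard-4'].forEach((id, i) =>
    ring.addNode(id, { host: `shard${i + 1}.db` })
  );

  // Record the current owner of each key
  const before = new Map<string, string>();
  for (let i = 0; i < numKeys; i++) {
    before.set(`user-${i}`, ring.getNode(`user-${i}`)!.nodeId);
  }

  // Add a fifth shard and count keys whose owner changed
  ring.addNode('shard-5', { host: 'shard5.db' });
  let moved = 0;
  for (let i = 0; i < numKeys; i++) {
    if (ring.getNode(`user-${i}`)!.nodeId !== before.get(`user-${i}`)) moved++;
  }
  return moved / numKeys; // expect roughly 0.2 (about 1/5 of keys)
}

console.log(measureMovement(50_000).toFixed(2));
```

The checklist below covers the operational side of actually moving that data.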
# Online Resharding Procedure

## Prerequisites
- [ ] New shard database provisioned
- [ ] Replication configured from source shard(s)
- [ ] Monitoring for new shard in place
- [ ] Rollback procedure documented

## Phase 1: Replication Setup (No Impact)
1. Create tables on new shard with same schema
2. Set up logical replication from source shard(s)
   - PostgreSQL: Use pglogical or Debezium
   - MySQL: Set up row-based replication
3. Wait for replication to catch up (lag < 1s)
4. Monitor for errors

## Phase 2: Dual-Write (Write Amplification)
1. Update application to write to BOTH source and destination
2. Validate data consistency between shards
3. Monitor write latency (should not double)

## Phase 3: Read Migration (Gradual)
1. Update routing config: 5% reads to new shard
2. Monitor error rates and latency
3. Increase to 25%, 50%, 75%, 100%
4. If issues, roll back to 0% and investigate

## Phase 4: Write Migration (Critical)
1. Enable write routing to new shard
2. Disable writes to source shard for migrated keys
3. Verify no writes going to old location

## Phase 5: Cleanup
1. Stop replication from source
2. Remove migrated data from source shard
3. Update shard map to remove old references
4. Run VACUUM on source shard to reclaim space

## Rollback Procedure
At any phase:
1. Revert routing config to 100% source
2. Stop dual-writes (keep source as truth)
3. Discard new shard data
4. Investigate failure

Plan for resharding to take days or weeks for large datasets. Rushing causes data loss or extended outages. Test the procedure on staging with production-like data volumes before attempting in production.
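Phase 2 (dual-write) is the part that usually lives in application code. Here is a hedged sketch of a write wrapper that mirrors writes to the destination shard while the source remains the source of truth; isDualWriteEnabled stands in for whatever feature flag or migration-state lookup you use, and destination failures are logged for reconciliation rather than surfaced to users.

```typescript
// Sketch: dual-write wrapper for Phase 2 of the resharding procedure.
// The source shard stays authoritative; destination failures are recorded for
// the consistency-validation step instead of failing the user's request.
async function dualWrite(
  source: ShardConfig,
  destination: ShardConfig,
  sql: string,
  params: unknown[],
  isDualWriteEnabled: (shardKey: string) => boolean, // hypothetical feature flag
  shardKey: string
): Promise<void> {
  // 1. Write to the source shard first; this is the write that must succeed.
  await source.pool.query(sql, params);

  // 2. Mirror to the destination shard if this key is in the migration window.
  if (isDualWriteEnabled(shardKey)) {
    try {
      await destination.pool.query(sql, params);
    } catch (err) {
      // Do not fail the request; log for later reconciliation.
      console.error(`dual-write mirror failed for key ${shardKey}`, err);
    }
  }
}
```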
Let's consolidate the key insights from our exploration of application-level sharding:

- Shard only after exhausting simpler options: query optimization, indexing, caching, read replicas, and functional partitioning
- The shard key is the most important decision: it should have high cardinality, distribute evenly, appear in the vast majority of queries, and be shared by data that must be colocated
- Routing strategies trade off differently: hash-based gives even distribution but painful resharding, range-based supports range queries but risks hot spots, directory-based gives full control at the cost of extra infrastructure
- Consistent hashing with virtual nodes limits data movement when shards are added or removed
- Cross-shard queries and transactions are expensive; design so the common path is single-shard, and use sagas where multi-shard operations are unavoidable
- Resharding is a multi-phase, high-risk operational procedure: plan it, rehearse it on staging, and keep a rollback path at every step
What's Next:
For organizations that find application-level sharding too complex to build and maintain, there's an alternative: NewSQL databases. These systems provide the horizontal scalability of sharding with the familiar interface and ACID guarantees of traditional SQL databases. We'll explore when and how to consider these alternatives.
You now understand the mechanics, strategies, and trade-offs of application-level sharding. This is the most powerful—and most complex—SQL database scaling strategy. Use it when other options are exhausted and the benefits justify the significant engineering investment.