In the physical world, organizing things by ranges feels intuitive—books on a library shelf arranged alphabetically, files in a cabinet sorted by date, customer records divided A-H, I-P, Q-Z. This natural ordering principle extends to database sharding through range sharding, a partitioning strategy that assigns data to shards based on contiguous ranges of the shard key.
Range sharding is one of the oldest and most widely understood sharding strategies. It offers powerful benefits for range-based queries while presenting unique challenges around hotspots and rebalancing. Understanding when to use range sharding—and when to avoid it—is essential knowledge for any database architect.
This page provides a comprehensive exploration of range sharding: its mechanics, advantages, pitfalls, and the operational realities of maintaining range-sharded systems in production.
By the end of this page, you will understand how range sharding works, when it excels, its inherent vulnerabilities, how to design range boundaries effectively, and the operational procedures for maintaining range-sharded systems. You'll develop the judgment to know when range sharding is the right choice for your workload.
In range sharding, the shard key space is divided into non-overlapping, contiguous ranges, with each range assigned to a specific shard. A routing table (shard map) maintains the mapping between ranges and shards.
Formal Definition:
Range sharding partitions a dataset such that records with shard key values in the range [Ri, Ri+1) are stored on shard Si. The complete set of ranges {[R0, R1), [R1, R2), ..., [Rn-1, ∞)} covers the entire key space with no gaps or overlaps.
Example Range Partition:
| Shard | Range | Data Example |
|---|---|---|
| Shard 1 | user_id: [1, 1000000) | Users 1 through 999,999 |
| Shard 2 | user_id: [1000000, 2000000) | Users 1,000,000 through 1,999,999 |
| Shard 3 | user_id: [2000000, 3000000) | Users 2,000,000 through 2,999,999 |
| Shard 4 | user_id: [3000000, ∞) | Users 3,000,000 and above |
Routing Logic:
The routing algorithm for range sharding performs a range lookup:
def route_to_shard(key, range_map):
    """
    Find the shard for a given key.
    range_map: entries with .lower_bound and .shard_id, sorted ascending by lower_bound.
    """
    # Linear scan from the highest lower_bound downward to find the largest
    # lower_bound <= key (production routers use binary search instead;
    # see the RangeShardRouter pattern later on this page)
    for i in range(len(range_map) - 1, -1, -1):
        if key >= range_map[i].lower_bound:
            return range_map[i].shard_id
    raise ValueError(f"No range found for key: {key}")
Range Map Data Structure:
The range map is typically stored in a metadata service or configuration store, replicated to all routing components:
{
"ranges": [
{ "lower": 1, "upper": 1000000, "shard": "shard-1" },
{ "lower": 1000000, "upper": 2000000, "shard": "shard-2" },
{ "lower": 2000000, "upper": 3000000, "shard": "shard-3" },
{ "lower": 3000000, "upper": null, "shard": "shard-4" }
]
}
Range boundaries are typically defined as half-open intervals [lower, upper) where the lower bound is inclusive and the upper bound is exclusive. This ensures every possible key falls into exactly one range with no ambiguity. The final range often has an unbounded upper limit to capture all keys above the last explicit boundary.
Range sharding has specific characteristics that make it ideal for certain workloads. Understanding these sweet spots helps identify when range sharding is the right choice.
Use Case 1: Range Queries Are Primary Access Pattern
When most queries request data within a contiguous range of the shard key, range sharding enables single-shard or minimal-shard queries:
-- With range sharding by created_at (time buckets):
SELECT * FROM events
WHERE created_at BETWEEN '2024-01-01' AND '2024-01-31';
-- If January 2024 is entirely on Shard 5:
-- → Query hits ONLY Shard 5, not all shards!
Contrast with hash sharding where every range query becomes a scatter-gather across all shards.
Use Case 2: Ordered Data Processing
When data needs to be processed or scanned in order, range sharding preserves sort order within and across shards:
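A minimal sketch of that property, assuming a shard_ranges list sorted by lower bound and a hypothetical scan_shard() helper that returns each shard's rows already sorted by the shard key:

def ordered_scan(shard_ranges, scan_shard):
    """
    Yield rows in global key order by visiting shards in range order.
    shard_ranges: list of (lower_bound, shard_id), sorted by lower_bound.
    scan_shard:   function returning a shard's rows sorted by shard key.
    """
    for _lower_bound, shard_id in shard_ranges:
        # Ranges are contiguous and visited in order, so simple
        # concatenation preserves the global sort -- no merge step needed.
        yield from scan_shard(shard_id)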
Use Case 3: Data Lifecycle Management
Range sharding enables efficient bulk operations on data segments:
-- Drop an entire shard when data expires
-- Instead of: DELETE FROM events WHERE created_at < '2023-01-01'
-- Simply: DROP SHARD shard_2023 (conceptually)
-- Or move historical shards to cheaper storage
ALTER SHARD shard_2023 SET STORAGE = 'cold_tier';
Use Case 4: Predictable Data Locality
When the application knows where data will land based on the key value, it can optimize accordingly:
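For example, a client can pre-group a batch of writes by destination shard so each shard receives one bulk request instead of many single-row writes (a sketch reusing route_to_shard and the range map from earlier, and assuming records are dicts keyed by user_id):

from collections import defaultdict

def group_writes_by_shard(records, range_map):
    """Group records by destination shard so each shard gets one bulk write."""
    batches = defaultdict(list)
    for record in records:
        batches[route_to_shard(record['user_id'], range_map)].append(record)
    return batches  # {shard_id: [records destined for that shard]}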
| Workload Characteristic | Why Range Sharding Helps | Example |
|---|---|---|
| Time-series queries | Time ranges map to shard ranges efficiently | IoT sensor data, log analytics |
| Alphabetical access | Name ranges enable prefix queries | Directory services, contact lists |
| Geographic proximity | Nearby locations in same range | Mapping, location services |
| Data archival | Old ranges can be dropped/archived as units | Compliance, data retention |
| Batch processing | Processing order matches shard order | ETL pipelines, aggregation jobs |
Range sharding's killer feature is efficient range queries. If your workload frequently queries contiguous subsets of data (time windows, alphabetical ranges, numeric spans), range sharding can provide dramatic performance benefits over hash sharding.
Range sharding's greatest vulnerability is hotspots—situations where one or more shards receive disproportionate traffic while others sit idle. Understanding hotspot causes and mitigations is essential for successful range sharding deployments.
Hotspot Cause 1: Monotonically Increasing Keys
The most infamous hotspot scenario occurs when the shard key increases monotonically (auto-increment IDs, timestamps):
Timeline:
- Day 1: Shard 1 receives ALL new writes (IDs 1-1M)
- Day 2: Shard 1 full, Shard 2 receives ALL new writes
- Day 3: Shard 2 receives ALL new writes
- ...
- Current: Whatever shard holds 'now' receives 100% of writes!
This creates a perpetual single-shard bottleneck for writes, completely defeating the purpose of sharding.
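A quick simulation makes this concrete (reusing route_to_shard and the example range map from above): every newly generated ID falls into the last range, so a single shard absorbs all writes.

from collections import Counter

# Simulate 10,000 new rows with auto-increment IDs starting at 3,200,000
new_ids = range(3_200_000, 3_210_000)
writes_per_shard = Counter(route_to_shard(i, range_map) for i in new_ids)

print(writes_per_shard)  # Counter({'shard-4': 10000}) -- a single-shard write hotspot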
Hotspot Cause 2: Skewed Access Patterns
Even with uniform data distribution, access patterns may be skewed:
-- Data is evenly distributed across shards A-Z
-- But 90% of searches are for names A-C!
-- → Shards A, B, C are overwhelmed
Hotspot Cause 3: Poor Range Boundary Design
If ranges don't align with data density, some shards become overloaded:
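For instance, splitting a surname keyspace into equal-width alphabet ranges looks balanced on paper but not against actual name frequencies (illustrative numbers only):

# Equal-width letter ranges vs. a skewed (hypothetical) surname distribution
share_of_rows = {'A-F': 0.38, 'G-M': 0.31, 'N-S': 0.22, 'T-Z': 0.09}

for letter_range, share in share_of_rows.items():
    print(f"Shard {letter_range}: {share:.0%} of rows")
# The A-F shard ends up holding roughly 4x the data of the T-Z shard.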
| Hotspot Cause | Symptom | Mitigation |
|---|---|---|
| Monotonic keys | Latest shard always overloaded | Use hash sharding or composite keys with hash prefix |
| Temporal skew | Current time range overloaded | Time-based micro-sharding, read replicas for hot ranges |
| Popular data clustering | Specific range gets most queries | Split popular ranges, add caching layer |
| Static boundaries | Some shards fill faster | Automatic range splitting based on size/traffic |
| Write-heavy periods | All writes to current range | Pre-split ranges for anticipated growth |
NEVER use raw timestamps as the sole shard key with range sharding. This is the single most common mistake and guarantees a write hotspot. If time-based sharding is required, use composite keys (e.g., hash(entity_id) || timestamp) or hash sharding with secondary time-based indexing.
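One way to build such a composite key (a sketch assuming a small, fixed number of hash buckets prefixed onto the timestamp): writes at any moment spread across the buckets, while a time-range query only fans out to the bucket count rather than to every shard.

import hashlib
import time

NUM_BUCKETS = 16  # assumed bucket count; a tunable locality-vs-spread trade-off

def composite_shard_key(entity_id: str, ts: float) -> tuple:
    """Prefix a small hash bucket to the timestamp to avoid a single hot range."""
    bucket = int(hashlib.md5(entity_id.encode()).hexdigest(), 16) % NUM_BUCKETS
    return (bucket, ts)  # range-shard on (bucket, timestamp)

key = composite_shard_key("sensor-42", time.time())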
Effective range sharding requires thoughtful design of range boundaries. Poorly designed boundaries lead to imbalanced shards, hotspots, and frequent resharding. Well-designed boundaries enable smooth scaling and predictable performance.
Boundary Design Principles:
1. Size-Based Boundaries
Set boundaries so each shard holds approximately equal amounts of data:
def calculate_range_boundaries(total_keys, num_shards, key_distribution):
    """
    Calculate boundaries that give equal data per shard.
    Accounts for non-uniform key distribution.
    key_distribution: (key_value, count) pairs sorted ascending by key_value.
    Returns the inclusive upper key of each shard's range (last entry is the max key).
    """
    target_per_shard = total_keys / num_shards
    boundaries = []
    cumulative = 0
    for key_value, count in key_distribution:
        cumulative += count
        if cumulative >= target_per_shard * (len(boundaries) + 1):
            boundaries.append(key_value)
    return boundaries
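For example, with an assumed uniform toy histogram the function yields one cut point per shard:

# Keys 1..1000, one row per key, split across 4 shards
histogram = [(k, 1) for k in range(1, 1001)]
print(calculate_range_boundaries(1000, 4, histogram))
# -> [250, 500, 750, 1000]: inclusive upper keys, i.e. half-open ranges
#    [1, 251), [251, 501), [501, 751), [751, 1001)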
2. Traffic-Based Boundaries
In read-heavy systems, balance by query load rather than data volume:
-- Analyze query patterns to find traffic-balanced splits:
-- bucket users by cumulative query load, then read off the boundaries
WITH ranked AS (
    SELECT user_id,
           query_count,
           SUM(query_count) OVER (ORDER BY user_id) AS running_load,
           SUM(query_count) OVER ()                 AS total_load
    FROM user_query_stats
)
SELECT CEIL(4.0 * running_load / total_load) AS proposed_shard,
       MIN(user_id)                          AS range_start,
       MAX(user_id)                          AS range_end,
       SUM(query_count)                      AS expected_traffic
FROM ranked
GROUP BY CEIL(4.0 * running_load / total_load)
ORDER BY proposed_shard;
3. Pre-Splitting for Anticipated Growth
Before bulk loads or known growth events, pre-split ranges:
# Before importing 10M new users (IDs 5M - 15M)
# Pre-split the receiving range into smaller chunks
original_ranges = [
(1, 5_000_000, 'shard-1'),
(5_000_000, float('inf'), 'shard-2'), # Will receive 10M users!
]
pre_split_ranges = [
(1, 5_000_000, 'shard-1'),
(5_000_000, 7_500_000, 'shard-2'),
(7_500_000, 10_000_000, 'shard-3'),
(10_000_000, 12_500_000, 'shard-4'),
(12_500_000, float('inf'), 'shard-5'),
]
# Load now spreads across 4 shards (shard-2 through shard-5) instead of 1
4. Automatic Split Detection
Monitor shard metrics and trigger splits when thresholds are exceeded:
# Automatic Range Split Detection and Execution

class ShardMonitor:
    def __init__(self, config):
        self.max_shard_size_gb = config.get('max_shard_size_gb', 100)
        self.max_qps_per_shard = config.get('max_qps_per_shard', 10000)
        self.split_threshold = config.get('split_threshold', 0.8)  # 80%

    def check_shard_health(self, shard_id: str) -> dict:
        """
        Evaluate if a shard needs splitting.
        """
        metrics = self.get_shard_metrics(shard_id)
        size_ratio = metrics['size_gb'] / self.max_shard_size_gb
        qps_ratio = metrics['current_qps'] / self.max_qps_per_shard
        needs_split = (
            size_ratio > self.split_threshold or
            qps_ratio > self.split_threshold
        )
        return {
            'shard_id': shard_id,
            'size_ratio': size_ratio,
            'qps_ratio': qps_ratio,
            'needs_split': needs_split,
            'recommended_action': 'SPLIT' if needs_split else 'OK'
        }

    def calculate_split_point(self, shard_id: str) -> int:
        """
        Find optimal split point for a range.
        Goal: Equal data distribution between resulting shards.
        """
        range_info = self.get_range_for_shard(shard_id)
        # Query to find median key value in this range
        median_key = self.db.query(f'''
            SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY shard_key) AS median
            FROM data_table
            WHERE shard_key >= {range_info['lower']}
              AND shard_key < {range_info['upper']}
        ''')
        return median_key

    def execute_split(self, shard_id: str, split_point: int):
        """
        Perform the range split.
        1. Create new shard
        2. Update routing table
        3. Migrate data
        4. Verify and cutover
        """
        # This is a complex operation typically handled by
        # the database system's built-in resharding capabilities
        pass

Modern distributed databases like CockroachDB, TiDB, and Google Spanner handle range splitting automatically. They monitor range sizes and split them transparently. If using a manual sharding approach, implementing automatic splitting is a significant engineering investment.
Implementing range sharding requires careful attention to routing logic, metadata management, and handling edge cases. Here are production-tested patterns.
Pattern 1: Centralized Metadata with Cached Routing
Store authoritative range map in a metadata store (etcd, ZooKeeper, database). Routers cache the map locally and refresh periodically or on miss.
import bisect
from dataclasses import dataclass
from typing import Optional, List
import threading
import time

@dataclass
class RangeEntry:
    lower_bound: int
    upper_bound: Optional[int]  # None for unbounded
    shard_id: str

class RangeShardRouter:
    """
    Production range sharding router with caching.
    """
    def __init__(self, metadata_store, cache_ttl_seconds: int = 60):
        self.metadata_store = metadata_store
        self.cache_ttl = cache_ttl_seconds
        self.ranges: List[RangeEntry] = []
        self.lower_bounds: List[int] = []  # For binary search
        self.cache_timestamp = 0
        self.lock = threading.RLock()
        self._refresh_cache()

    def _refresh_cache(self):
        """
        Fetch latest range map from metadata store.
        """
        with self.lock:
            raw_ranges = self.metadata_store.get_ranges()
            self.ranges = [
                RangeEntry(r['lower'], r.get('upper'), r['shard'])
                for r in sorted(raw_ranges, key=lambda x: x['lower'])
            ]
            self.lower_bounds = [r.lower_bound for r in self.ranges]
            self.cache_timestamp = time.time()

    def _ensure_cache_fresh(self):
        """
        Refresh cache if TTL exceeded.
        """
        if time.time() - self.cache_timestamp > self.cache_ttl:
            self._refresh_cache()

    def route(self, key: int) -> str:
        """
        Find shard for a given key using binary search.
        O(log n) lookup where n = number of ranges.
        """
        self._ensure_cache_fresh()
        with self.lock:
            # Find rightmost lower_bound <= key
            idx = bisect.bisect_right(self.lower_bounds, key) - 1
            if idx < 0:
                raise ValueError(f"Key {key} below all ranges")
            range_entry = self.ranges[idx]
            # Validate key is within range
            if range_entry.upper_bound and key >= range_entry.upper_bound:
                raise ValueError(f"Key {key} not in any range (gap detected)")
            return range_entry.shard_id

    def route_range_query(self, lower_key: int, upper_key: int) -> List[str]:
        """
        Find all shards that overlap with the query range.
        """
        self._ensure_cache_fresh()
        with self.lock:
            result = []
            for range_entry in self.ranges:
                range_lower = range_entry.lower_bound
                range_upper = range_entry.upper_bound or float('inf')
                # Check for overlap
                if range_lower < upper_key and range_upper > lower_key:
                    result.append(range_entry.shard_id)
            return result

    def invalidate_cache(self):
        """
        Force cache refresh on next query.
        Called when metadata changes are detected.
        """
        self.cache_timestamp = 0

Pattern 2: Directory-Based Lookup Table
For very large number of ranges or complex routing logic, use a dedicated lookup table:
-- Range directory table
CREATE TABLE range_directory (
range_id SERIAL PRIMARY KEY,
lower_bound BIGINT NOT NULL,
upper_bound BIGINT, -- NULL for unbounded
shard_id VARCHAR(50) NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
CONSTRAINT no_overlap CHECK (
-- Enforced by application; complex to do in constraints
TRUE
)
);
CREATE INDEX idx_range_lookup ON range_directory (lower_bound);
-- Route a key
SELECT shard_id FROM range_directory
WHERE lower_bound <= 1500000
AND (upper_bound IS NULL OR upper_bound > 1500000)
LIMIT 1;
Pattern 3: Consistent Hashing with Virtual Nodes (Hybrid)
For systems needing both range efficiency and automatic rebalancing, use consistent hashing with ordered virtual nodes:
# Hybrid: Range-like behavior with consistent hashing properties
import bisect
import hashlib

class ConsistentRangeRouter:
    """
    Uses consistent hashing ring but maintains range semantics.
    Each physical shard owns contiguous arcs of the ring.
    """
    def __init__(self, shards: list, virtual_nodes_per_shard: int = 100):
        self.ring = {}  # position -> shard_id
        self.sorted_positions = []
        for shard_id in shards:
            for v in range(virtual_nodes_per_shard):
                # Deterministic position on ring
                pos = self.hash_to_ring(f"{shard_id}:{v}")
                self.ring[pos] = shard_id
        self.sorted_positions = sorted(self.ring.keys())

    def hash_to_ring(self, key: str) -> int:
        """Map key to position on ring (0 to 2^32 - 1)"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16) % (2**32)

    def route(self, key: int) -> str:
        """Find shard owning this key's position on ring."""
        pos = self.hash_to_ring(str(key))
        # Binary search for first position >= pos
        idx = bisect.bisect_left(self.sorted_positions, pos)
        if idx >= len(self.sorted_positions):
            idx = 0  # Wrap around
        return self.ring[self.sorted_positions[idx]]

    def add_shard(self, shard_id: str, virtual_nodes: int = 100):
        """Add shard with minimal data movement."""
        # Only keys in the new shard's arcs need to move
        for v in range(virtual_nodes):
            pos = self.hash_to_ring(f"{shard_id}:{v}")
            self.ring[pos] = shard_id
        self.sorted_positions = sorted(self.ring.keys())

When ranges are modified (splits, merges, migrations), all routing caches must be invalidated or updated. Use pub/sub mechanisms, cache TTLs, or explicit invalidation protocols. Stale routing information causes queries to hit wrong shards, leading to missing data or errors.
Operating range-sharded systems requires ongoing attention to balance, performance, and evolution. Here are the key operational concerns and practices.
Monitoring and Alerting:
Range-sharded systems require specific metrics:
| Metric | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|
| Shard size ratio (max/min) | 2x | 5x | Rebalance or split large shards |
| Shard QPS ratio (max/min) | 3x | 10x | Investigate hotspot, consider split |
| Range map staleness | 5 minutes | 30 minutes | Investigate metadata sync issues |
| Cross-shard query percentage | 30% | 60% | Review shard key choice |
| Shard disk utilization | 70% | 85% | Add capacity, split, or archive |
Rebalancing Strategies:
When shards become imbalanced, several strategies can restore equilibrium:
1. Range Splitting: Divide an oversized range into smaller ranges, each assigned to a different shard.
Before: Shard A owns [1, 1000000) — 80GB
After: Shard A owns [1, 500000) — 40GB
Shard B owns [500000, 1000000) — 40GB
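Against the range_directory table from Pattern 2, the metadata side of such a split can be a single small transaction (a psycopg2-style sketch; data movement and routing-cache invalidation are separate steps):

def split_range(conn, old_shard, lower, upper, split_point, new_shard):
    """Shrink the existing range row and insert the new one atomically."""
    with conn:                      # commit on success, roll back on error
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE range_directory SET upper_bound = %s "
                "WHERE shard_id = %s AND lower_bound = %s",
                (split_point, old_shard, lower),
            )
            cur.execute(
                "INSERT INTO range_directory (lower_bound, upper_bound, shard_id) "
                "VALUES (%s, %s, %s)",
                (split_point, upper, new_shard),
            )

# split_range(conn, 'shard-A', 1, 1_000_000, 500_000, 'shard-B')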
2. Range Merging: Combine adjacent underutilized ranges.
Before: Shard X owns [1000, 2000) — 1GB
Shard Y owns [2000, 3000) — 1GB
After: Shard X owns [1000, 3000) — 2GB
3. Range Migration: Move a range from one physical shard to another without changing boundaries.
# Move range [5000, 6000) from shard-3 to shard-7
1. Create replica of range on shard-7
2. Catch up to current state
3. Update routing to point to shard-7
4. Decommission range from shard-3
Schema Changes Across Shards:
DDL operations must be coordinated across all shards:
-- Adding a column requires:
-- 1. Apply to all shards (sequentially or in parallel)
-- 2. Handle partial failure scenarios
-- 3. Ensure application compatibility during rollout
-- Options:
-- a) Online DDL with compatibility window
-- b) Ghost tables / shadow table approach
-- c) Database-specific online schema change tools (pt-osc, gh-ost)
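A minimal sketch of coordinating such a rollout (assuming a dict of psycopg2-style per-shard connections), recording per-shard failures so the change can be resumed rather than left half-applied:

def apply_ddl_to_all_shards(shard_connections, ddl_statement):
    """
    Apply one DDL statement to every shard, tracking per-shard outcome.
    shard_connections: {shard_id: DB connection}
    Returns a dict of shards that failed, for retry/resume.
    """
    failed = {}
    for shard_id, conn in shard_connections.items():
        try:
            with conn:                      # per-shard transaction
                with conn.cursor() as cur:
                    cur.execute(ddl_statement)
        except Exception as exc:            # partial failure: record and continue
            failed[shard_id] = str(exc)
    return failed

# failed = apply_ddl_to_all_shards(conns, "ALTER TABLE events ADD COLUMN region TEXT")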
Backup and Recovery:
The range map (shard metadata) is the most critical component. If it's lost or corrupted, the system cannot route queries. Ensure the metadata store has its own replication, backup, and recovery procedures. Many organizations use battle-tested systems like etcd or ZooKeeper specifically for this purpose.
Range sharding and hash sharding represent fundamentally different tradeoffs. Understanding when to use each is essential for system design.
Comprehensive Comparison:
| Characteristic | Range Sharding | Hash Sharding |
|---|---|---|
| Range queries | ✅ Efficient (single/few shards) | ❌ Expensive (all shards) |
| Point queries | ✅ Efficient (single shard) | ✅ Efficient (single shard) |
| Write distribution | ⚠️ Risk of hotspots | ✅ Naturally uniform |
| Hotspot resistance | ❌ Vulnerable to monotonic keys | ✅ Distributes evenly by design |
| Data locality | ✅ Related keys are adjacent | ❌ Related keys scattered |
| Rebalancing | ⚠️ Requires careful splits | ✅ Adding shards easy |
| Implementation complexity | ⚠️ Range map management | ✅ Simple hash function |
| Ordered scans | ✅ Preserved within shards | ❌ Requires merge across shards |
Decision Framework:
FUNCTION choose_sharding_strategy(workload):
    # Check for range sharding disqualifiers
    IF shard_key is monotonically_increasing (timestamp, auto-increment):
        RETURN "HASH"  -- Range would create write hotspot
    IF write_heavy AND uniform_distribution_required:
        RETURN "HASH"  -- Best for balanced writes

    # Check for range sharding qualifiers
    IF range_queries > 30% of workload:
        RETURN "RANGE"  -- Major benefit from range locality
    IF ordered_scans_required:
        RETURN "RANGE"  -- Preserves order
    IF data_lifecycle_by_key_ranges (archival, TTL):
        RETURN "RANGE"  -- Enables bulk operations on ranges

    # Default for general workloads
    IF point_queries_dominate AND no_special_requirements:
        RETURN "HASH"  -- Simpler, auto-balanced

    # Consider hybrid approaches
    IF mixed_requirements:
        RETURN "COMPOSITE"  -- e.g., hash prefix + range suffix

If your workload doesn't have strong requirements for range queries or ordered access, hash sharding is often the safer choice. It's simpler to implement, automatically distributes load, and resists hotspots. You can always migrate to range sharding later if specific needs emerge, though migration is expensive.
Range sharding is a powerful strategy with specific strengths and well-known weaknesses: it excels at range queries, ordered scans, and data lifecycle management, but it is vulnerable to hotspots from monotonic keys and demands careful boundary design, ongoing rebalancing, and disciplined metadata management.
What's Next:
The next page explores hash sharding—the alternative partitioning strategy that trades range query efficiency for automatic load distribution and hotspot resistance. Understanding hash sharding completes your toolkit for choosing the right partitioning approach for any workload.
You now possess comprehensive knowledge of range sharding—its mechanics, advantages, vulnerabilities, implementation patterns, and operational requirements. You can evaluate when range sharding is appropriate and design effective range-sharded systems.