Online resharding—restructuring how data is partitioned while the database continues serving traffic—represents one of the most challenging operations in distributed systems. It requires simultaneously maintaining data consistency, ensuring availability, coordinating across distributed nodes, and handling the inevitable failures that occur in production environments.
Unlike scheduled maintenance windows where traffic can be paused, online resharding demands that the system remain fully operational. Users shouldn't notice that their data is being moved between shards. Transactions should complete correctly. Latencies should remain acceptable. Yet beneath this facade of normalcy, the database is undergoing fundamental surgery.
By the end of this page, you will understand the fundamental challenges of online resharding: data consistency during migration, coordination protocol complexity, failure handling during transitions, split-brain scenarios, performance degradation patterns, and the engineering tradeoffs that production systems must navigate.
The most fundamental challenge in online resharding is maintaining data consistency while data is in motion. When a key is being migrated from shard A to shard B, answering the question "Where is the authoritative copy?" becomes surprisingly complex.
The Window of Vulnerability:
During migration, data exists in a transitional state:
Time T0: Data is on Shard A only
Time T1: Migration starts, data begins copying to Shard B
Time T2: Data exists on both A and B (but which is authoritative?)
Time T3: Routing updated, Shard B is now authoritative
Time T4: Data deleted from Shard A (cleanup)
During the window T1-T3, multiple consistency problems can arise:
- A write to Shard A that lands after its key was copied to Shard B is lost at cutover
- A read routed to Shard B may return stale data that predates recent writes still on Shard A
- Clients with differently cached routing tables may write the same key to different shards
The Fundamental Tradeoff:
There's an inherent tension between:
- Consistency: every read sees the latest write, regardless of which shard serves it
- Availability: reads and writes succeed throughout the migration
- Performance: latency and throughput stay within SLA while data is copied
Perfect consistency during migration typically requires blocking writes to migrating keys, sacrificing availability; preserving full availability typically means accepting a window of eventual consistency.
Approaches to Migration Consistency:
| Approach | Consistency | Availability | Performance | Complexity |
|---|---|---|---|---|
| Stop-the-world | Strong | Zero during migration | N/A | Low |
| Lock-per-key | Strong | Degraded for migrating keys | High latency for locked keys | Medium |
| Double-write | Eventual | Full | 2x write latency | Medium |
| CDC + Cutover | Strong (at cutover) | Brief pause at cutover | Minimal overhead | High |
| Redirects | Strong | Full | Additional hop | High |
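As a minimal sketch of the double-write row above (assuming simple in-memory stand-in shards; all class and method names here are illustrative, not a specific product's API), writes go to both shards during migration while reads stay on the still-authoritative source:

```python
import threading

class DictShard:
    """Hypothetical in-memory shard used for illustration."""
    def __init__(self):
        self.data = {}
    def set(self, key, value):
        self.data[key] = value
    def get(self, key):
        return self.data.get(key)

class DoubleWriteRouter:
    """Double-write approach: during migration, every write goes to both
    source and destination; reads are served by the source until cutover."""
    def __init__(self, source, dest):
        self.source = source
        self.dest = dest
        self.migrating = False
        self._lock = threading.Lock()

    def write(self, key, value):
        with self._lock:
            self.source.set(key, value)   # source remains authoritative
            if self.migrating:
                self.dest.set(key, value)  # keep destination in sync

    def read(self, key):
        return self.source.get(key)        # reads stay on source until cutover
```

The cost visible here matches the table: each write is done twice, and a crash between the two `set` calls leaves the shards briefly divergent, which is why the approach is eventually consistent rather than strong.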
Every online resharding approach involves tradeoffs. The CAP theorem applies directly here: during migration, you're essentially operating in a partitioned state (data exists in multiple places), so you must choose between consistency and availability.
Online resharding requires coordinating state across multiple independent components: source shards, destination shards, routing layers, client caches, and orchestration controllers. The complexity of this coordination is often underestimated.
The Coordination Participants:
┌─────────────────────────────────────────────────────────────┐
│ Resharding Coordinator │
│ (Orchestrates the overall process, tracks progress) │
└─────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Router │ │ Router │ │ Router │
│ Layer │ │ Layer │ │ Layer │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Shard A │ ─────── │ Shard B │ ─────── │ Shard C │
│ (Source) │ migrate │ (Dest) │ │ │
└──────────┘ └──────────┘ └──────────┘
▲ ▲ ▲
│ │ │
Client Connections (with cached routing tables)
Coordination Challenges:
1. Distributed State Machine
All participants must agree on the current migration state:
States: IDLE → PREPARING → MIGRATING → SWITCHING → COMPLETED
Problem: What if coordinator sees state=MIGRATING but router sees state=IDLE?
Result: Router sends traffic to wrong shard; data inconsistency.
2. Ordering Guarantees
Certain operations must happen in specific order:
Required order:
1. Stop writes to source shard for migrating keys
2. Drain in-flight writes
3. Copy remaining data
4. Update routing
5. Enable writes on destination shard
Problem: Distributed systems have no global clock
Result: Operations may appear out of order to different participants
3. Participant Failures
Any participant can fail during any state:
| Failure Point | Critical Data at Risk | Recovery Strategy | Downtime Impact |
|---|---|---|---|
| Coordinator crash | Migration progress state | Fenced leadership, durable state log | Seconds to minutes |
| Source shard crash | Unmigrated keys | Promote replica, restart migration | Minutes to hours |
| Destination shard crash | Already migrated keys | Remigrate from source or replica | Minutes to hours |
| Router crash | None (stateless) | Automatic failover to other routers | Milliseconds |
| Network partition | Potentially all data | Halt migration, wait for partition heal | Depends on partition duration |
Robust online resharding typically requires a consensus system (ZooKeeper, etcd, Consul) to coordinate state across participants. This introduces dependency on the consensus system's availability but provides strong consistency guarantees for coordination metadata.
Split-brain occurs when different parts of the system have conflicting beliefs about the current state of the migration. This is one of the most dangerous scenarios in online resharding because it can lead to data corruption or loss.
Classic Split-Brain Scenario:
Timeline:
T0: Migration in progress, coordinator decides to switch routing
T1: Coordinator sends update to Router A (succeeds)
T2: Coordinator tries to send update to Router B (network partition!)
T3: Now split-brain:
- Router A believes Shard B is authoritative for key K
- Router B believes Shard A is authoritative for key K
T4: Client X writes K=100 through Router A → goes to Shard B
T5: Client Y writes K=200 through Router B → goes to Shard A
T6: Network heals, we have two different values for K
Fencing: The Primary Defense
Fencing uses monotonic tokens to prevent stale participants from accepting writes:
```python
class StaleRoutingError(Exception):
    pass

# Each routing update has a version number
routing_version = 42

# Shards reject writes that arrive with older routing versions
class Shard:
    def __init__(self):
        self.min_accepted_version = 0
        self.data = {}

    def write(self, key, value, routing_version):
        if routing_version < self.min_accepted_version:
            raise StaleRoutingError("Rejecting write from outdated router")
        return self._do_write(key, value)

    def set_min_version(self, new_version):
        # Called by the coordinator during migration; versions only move forward
        self.min_accepted_version = max(self.min_accepted_version, new_version)

    def _do_write(self, key, value):
        self.data[key] = value
```
Lease-Based Ownership:
The source shard holds a lease on the data being migrated:
1. Source shard has lease on keys [K1, K2, K3] until time T+60s
2. Migration must complete within lease period
3. If migration completes, source shard's lease is not renewed
4. If migration fails, source shard automatically regains authority when lease expires
5. No split-brain possible: only lease holder can serve writes
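The lease rule above can be sketched as follows (a minimal illustration with hypothetical names; `now` is passed explicitly so expiry is easy to reason about, where a real system would use a monotonic clock):

```python
import time

class LeaseHolder:
    """Lease-based ownership: only the shard holding an unexpired lease
    may serve writes, so a stale source cannot cause split-brain."""

    def __init__(self, lease_duration_s=60.0):
        self.lease_duration_s = lease_duration_s
        self.lease_expires_at = 0.0

    def acquire_lease(self, now=None):
        now = time.monotonic() if now is None else now
        self.lease_expires_at = now + self.lease_duration_s

    def holds_lease(self, now=None):
        now = time.monotonic() if now is None else now
        return now < self.lease_expires_at

    def write(self, key, value, store, now=None):
        if not self.holds_lease(now):
            raise RuntimeError("lease expired: refusing write to avoid split-brain")
        store[key] = value
```

The safety argument is step 5 of the list: after expiry the old holder rejects its own writes, so authority transfers cleanly even if the coordinator never reaches it.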
Once split-brain has occurred and conflicting writes have been accepted, there is no fully automated recovery. Either one set of writes must be discarded, or manual conflict resolution is required. Prevention is far better than cure.
Transactions that span keys being migrated present unique challenges. The ACID properties become extremely difficult to maintain when the underlying data location changes mid-transaction.
The Cross-Migration Transaction Problem:
Transaction T1:
BEGIN
READ key A (currently on Shard 1)
READ key B (currently on Shard 1)
WRITE key A = A + B
COMMIT
During T1 execution:
- Key B is being migrated from Shard 1 to Shard 2
- T1's read of B may see pre-migration or post-migration data
- T1's isolation guarantees are broken
| Isolation Level | Risk During Migration | Mitigation Strategy |
|---|---|---|
| Read Committed | May read different versions from different shards | Pin transaction to shard version at start |
| Repeatable Read | Second read may see data on different shard | Lock migrating keys for transaction duration |
| Serializable | Serialization order undefined across shards | Two-phase locking across shards |
| Snapshot Isolation | Snapshot may span migration boundary | Epoch-based snapshots with migration awareness |
Two-Phase Commit with Migration:
When transactions must commit across migrating keys:
Phase 1 - Prepare:
Coordinator: "Prepare to commit write to key K"
Source Shard: "K is migrating, forwarding to destination"
Destination Shard: "Prepared" (key locked)
Phase 2 - Commit:
Coordinator: "Commit"
Destination Shard: "Committed" (release lock)
Problem: 2PC is blocking; participant failure blocks all operations
Problem: Coordinator must know current key location
Problem: Location may change between phases
Practical Approaches:
1. Block Transactions on Migrating Keys
Simplest approach: reject or queue transactions touching migrating keys:
```python
class RetryableError(Exception):
    pass

def execute_transaction(txn, migrating_keys):
    if txn.touches_any(migrating_keys):
        raise RetryableError("Keys are migrating, please retry")
    return txn.execute()
```
Pros: Simple, correct. Cons: Reduced availability for migrating keys.
2. Forward to Authoritative Shard
Have source shard forward requests to destination:
Client → Source Shard: "Write K=100"
Source Shard → Destination Shard: "Write K=100" (internal forward)
Destination Shard → Source Shard: "OK"
Source Shard → Client: "OK"
Pros: Transparent to clients. Cons: Additional latency, complex error handling.
Advanced systems use epoch numbers that change at migration boundaries. Transactions are tagged with their start epoch. If a transaction's epoch differs from the current epoch, the transaction is aborted and retried. This provides clean semantics at the cost of some transaction retries.
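The epoch scheme described above can be sketched like this (a simplified single-process illustration; the class names and retry policy are assumptions, and a real system would read the epoch from shared coordination metadata):

```python
class EpochViolation(Exception):
    pass

class EpochGuard:
    """Epoch-tagged transactions: the epoch is bumped at each migration
    boundary, and a transaction whose start epoch no longer matches the
    current epoch is aborted so the client can retry."""

    def __init__(self):
        self.current_epoch = 0

    def begin(self):
        return self.current_epoch        # tag the transaction with its start epoch

    def bump(self):
        self.current_epoch += 1          # called at each migration cutover

    def commit(self, txn_epoch, apply_fn):
        if txn_epoch != self.current_epoch:
            raise EpochViolation("epoch changed mid-transaction; retry")
        apply_fn()

def run_with_retries(guard, apply_fn, max_retries=3):
    for _ in range(max_retries):
        epoch = guard.begin()
        try:
            guard.commit(epoch, apply_fn)
            return True
        except EpochViolation:
            continue
    return False
```

This makes the tradeoff concrete: correctness comes from aborting at the boundary, and the cost is the retry loop around every transaction.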
Even with well-designed resharding systems, performance degradation is nearly inevitable. Understanding and managing this degradation is crucial for maintaining SLAs during resharding operations.
Sources of Degradation:
| Metric | Normal | During Migration | Degradation |
|---|---|---|---|
| Read Latency (p50) | 2ms | 3ms | +50% |
| Read Latency (p99) | 10ms | 25ms | +150% |
| Write Latency (p50) | 5ms | 8ms | +60% |
| Write Latency (p99) | 20ms | 60ms | +200% |
| Throughput | 100K ops/s | 70K ops/s | -30% |
| Error Rate | 0.01% | 0.1% | +900% |
Mitigation Strategies:
1. Aggressive Throttling
Limit migration throughput based on observed latency:
```python
def adaptive_throttle(migration_rate):
    current_p99 = get_latency_p99()
    target_p99 = SLA_LATENCY * 0.8   # Leave 20% headroom
    if current_p99 > target_p99:
        migration_rate *= 0.8        # Slow down
    elif current_p99 < target_p99 * 0.5:
        migration_rate *= 1.2        # Speed up
    return migration_rate
```
2. Off-Peak Migration Windows
Run migration during low-traffic periods:
```yaml
migration_schedule:
  allowed_hours: "02:00-06:00 UTC"   # Lowest traffic
  max_rate_during_peak: "10MB/s"
  max_rate_during_offpeak: "100MB/s"
```
3. Priority Queuing
Give user traffic priority over migration traffic:
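One way to sketch this priority split (an illustration, with names of my choosing; lower numbers dequeue first, and a sequence counter preserves FIFO order within each priority class):

```python
import heapq

USER_PRIORITY = 0       # served first
MIGRATION_PRIORITY = 1  # only served when no user work is queued

class PriorityScheduler:
    """Priority queuing: user requests always dequeue ahead of migration
    batches, protecting foreground latency during resharding."""

    def __init__(self):
        self._heap = []
        self._seq = 0   # tie-breaker keeps FIFO order within a priority class

    def submit(self, priority, task):
        heapq.heappush(self._heap, (priority, self._seq, task))
        self._seq += 1

    def next_task(self):
        if not self._heap:
            return None
        _, _, task = heapq.heappop(self._heap)
        return task
```

A practical caveat: with this strict policy, sustained user load can starve migration traffic entirely, which is often acceptable (the migration just takes longer) but should be monitored.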
Average latency can look acceptable while p99 is a disaster. During resharding, always monitor tail latencies (p95, p99, p99.9). A migration that adds 10% to average latency might add 300% to p99, which is what users actually experience.
Failures during resharding are not exceptional—they're expected. The resharding system must be designed to handle failures gracefully and recover to a consistent state.
Failure Categories:
| Failure Type | Detection Time | Recovery Strategy | Data Risk |
|---|---|---|---|
| Process crash (restartable) | Seconds | Restart from checkpoint | None if checkpointing is frequent |
| Node failure (non-restartable) | 10-60 seconds | Failover to replica, restart migration | None if replicated |
| Network partition | Seconds to minutes | Pause migration, wait for healing | Potential if split-brain not prevented |
| Disk failure | Immediate | Reconstruct from replicas | None if replicated, full if not |
| Coordinator failure | Seconds | Leader election, resume from state log | None if state is durable |
| Cascading failure | Minutes | Circuit breakers, stop migration | Potentially significant |
Checkpointing for Recovery:
A well-designed resharding system maintains durable checkpoints:
```python
import uuid

class MigrationCheckpoint:
    def __init__(self):
        self.migration_id = uuid.uuid4()
        self.source_shard = "shard-1"
        self.dest_shard = "shard-2"
        self.key_range = ("user:0", "user:99999")
        self.state = "MIGRATING"
        self.last_migrated_key = "user:42000"
        self.bytes_migrated = 1_500_000_000
        self.checksum_so_far = "abc123..."
        self.started_at = "2024-01-15T02:00:00Z"
        self.last_checkpoint = "2024-01-15T04:30:00Z"

# On failure and restart:
def resume_migration(checkpoint):
    # Verify destination has data up to last_migrated_key
    dest_checksum = dest_shard.checksum(checkpoint.key_range[0],
                                        checkpoint.last_migrated_key)
    if dest_checksum != checkpoint.checksum_so_far:
        # Data corruption or incomplete write
        raise CorruptionError("Checkpoint mismatch")
    # Resume from last_migrated_key
    continue_migration(from_key=checkpoint.last_migrated_key)
```
Rollback Procedures:
When failure recovery isn't possible, clean rollback is essential:
Rollback Steps:
1. Stop all migration activity
2. Restore routing to source shard only
3. Delete partially migrated data on destination
4. Clear migration metadata
5. Verify source shard has complete data
6. Log rollback for post-mortem
7. Notify operators
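The rollback steps above can be sketched as one procedure (a skeleton only; `router`, the shards, and `metadata` are hypothetical collaborator objects standing in for real components):

```python
def rollback_migration(router, source, dest, metadata, key_range, log, notify):
    """Clean rollback of a failed migration, following the steps above."""
    metadata.set_state("ROLLING_BACK")            # 1. stop all migration activity
    router.route_range(key_range, source.name)    # 2. restore routing to source only
    dest.delete_range(key_range)                  # 3. delete partial data on destination
    metadata.clear_migration()                    # 4. clear migration metadata
    if not source.verify_complete(key_range):     # 5. verify source has complete data
        raise RuntimeError("source shard incomplete after rollback")
    log("rollback complete", key_range)           # 6. log rollback for post-mortem
    notify("migration rolled back")               # 7. notify operators
```

Note the ordering dependency: routing must revert (step 2) before destination data is deleted (step 3), or in-flight reads could briefly hit an empty destination.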
Failure paths are rarely tested in development but frequently exercised in production. Use chaos engineering techniques to deliberately inject failures during resharding in test environments. Every failure scenario should have a tested recovery procedure.
Testing online resharding is exceptionally difficult because the challenging scenarios involve complex timing, failures, and concurrent operations that are hard to reproduce in test environments.
Why Resharding Is Hard to Test:
- The dangerous bugs depend on precise timing between migration steps and concurrent traffic
- Failures must strike at specific points (mid-copy, mid-cutover) to trigger the bad paths
- Production data volumes and access patterns are difficult to replicate in test environments
- Split-brain and partition scenarios require coordinated, multi-node fault injection
Testing Strategies:
1. Shadow Testing
Run resharding on a shadow copy of production data:
Production: Shard A → Shard B (actual migration)
Shadow: Shard A' → Shard B' (test migration on replica)
Compare:
- Did shadow migration produce identical results?
- Did shadow migration have different error patterns?
- Did shadow migration performance match predictions?
2. Canary Resharding
Start with a small, low-risk portion:
Phase 1: Migrate 0.1% of keys (test traffic)
Phase 2: Migrate 1% of keys (limited blast radius)
Phase 3: Migrate 10% of keys (confidence building)
Phase 4: Migrate the remaining ~89% (production rollout)
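The phased rollout above can be sketched as a loop that verifies consistency before widening the blast radius (names and callbacks here are hypothetical; `migrate_fraction(lo, hi)` stands in for whatever mechanism moves the keys in that fraction range):

```python
def canary_reshard(phases, migrate_fraction, verify):
    """Canary resharding: migrate an increasing fraction of keys,
    halting at the first failed consistency check."""
    done = 0.0
    for target in phases:                 # e.g. [0.001, 0.01, 0.10, 1.0]
        migrate_fraction(done, target)    # migrate keys in (done, target]
        if not verify():
            raise RuntimeError(
                f"consistency check failed at {target:.1%}; halting rollout")
        done = target
    return done
```

The point of the structure is that each phase only migrates the delta, so a failure at 1% leaves 99% of keys untouched on the source.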
3. Consistency Checkers
Continuously verify data integrity:
```python
import random
import time

def verify_migration_consistency():
    for key in random.sample(migrated_keys, k=1000):
        src_value = source_shard.read(key)
        dst_value = dest_shard.read(key)
        if src_value != dst_value:
            alert("Consistency violation", key, src_value, dst_value)
            return False
    return True

# Run continuously during and after migration
while migration_in_progress:
    if not verify_migration_consistency():
        pause_migration()
    time.sleep(60)
```
4. Chaos Engineering
Deliberately inject failures:
- Kill the coordinator mid-migration and verify resumption from checkpoint
- Crash source and destination shards in each migration state
- Partition the network between routers and the coordinator
- Inject disk and latency faults while the consistency checker runs
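A simple fault-injection shim for test environments might look like this (an illustrative sketch; the wrapper name, the injected exception type, and the failure-rate knob are all assumptions, not a specific chaos tool's API):

```python
import random

def chaos_wrap(op, failure_rate, rng=None, failure=ConnectionError):
    """Wrap a shard operation so it randomly fails, exercising the
    migration's retry and recovery paths under test."""
    rng = rng or random.Random()
    def wrapped(*args, **kwargs):
        if rng.random() < failure_rate:
            raise failure("chaos: injected failure")
        return op(*args, **kwargs)
    return wrapped
```

Wrapping shard reads and writes this way lets tests confirm that every failure lands on a tested recovery path rather than an unhandled exception.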
Even after successful migration, run full consistency checks comparing source and destination data. Some bugs only manifest after the migration is 'complete' but before source data is deleted. This is your last chance to catch errors.
Online resharding represents some of the most demanding challenges in distributed systems engineering. The key insights from this page:
- Data in motion has no single authoritative location; the T1-T3 window must be closed with fencing, leases, or double-writes
- Coordination requires a durable, consensus-backed state machine shared by coordinator, routers, and shards
- Split-brain is prevented, not repaired: fencing tokens and leases are far cheaper than manual conflict resolution
- Cross-shard transactions during migration need blocking, forwarding, or epoch-based schemes
- Tail latency, not average latency, is the metric to watch; throttle migration adaptively against it
- Failures are expected: checkpoint durably, rehearse rollback, and exercise recovery paths with chaos engineering
What's Next:
The final page examines practical considerations for production resharding operations. We'll explore operational runbooks, monitoring requirements, rollback procedures, and the organizational aspects of executing large-scale database restructuring in real-world environments.
You now understand the deep technical challenges of online resharding. This knowledge is essential for designing robust resharding systems and anticipating the problems that will arise during execution.