In an ideal world, your initial sharding design would serve forever. But reality is rarely so accommodating. Data grows beyond projections. Access patterns shift as products evolve. Hotspots emerge from unexpected sources. Performance requirements tighten. And eventually, every sharded system faces a moment of truth: the existing sharding scheme must change.
Resharding—the process of modifying a database's sharding configuration—is one of the most complex and risky operations in distributed systems. It requires moving potentially terabytes or petabytes of data while maintaining system availability, data consistency, and application correctness. It's expensive in engineering time, infrastructure resources, and organizational attention. And yet, it's sometimes unavoidable.
This page provides a comprehensive guide to resharding: understanding when it's necessary, the available strategies, the risks and mitigations, and the operational procedures that enable successful resharding operations even in the most demanding production environments.
By the end of this page, you will understand the triggers that necessitate resharding, the major resharding strategies (offline, online, shadow, and incremental), the risks and how to mitigate them, and operational best practices from organizations that have successfully resharded at scale. You'll be prepared to plan, execute, and recover from resharding operations.
Resharding is never undertaken lightly. It's expensive, risky, and disruptive. Understanding the legitimate triggers for resharding helps distinguish necessary evolution from premature optimization.
Trigger 1: Capacity Exhaustion
The most common driver is running out of capacity:
Symptom Timeline:
Month 1: Shard response time: 10ms avg
Month 6: Shard response time: 25ms avg ← Warning
Month 9: Shard response time: 100ms avg ← Critical
Month 10: Shard unresponsive ← Too late
Trigger 2: Shard Key Inadequacy
The original shard key no longer serves the workload:
| Trigger Category | Specific Indicator | Urgency | Typical Response |
|---|---|---|---|
| Storage Growth | Largest shard > 80% of target size | Medium | Add shards, split ranges |
| Hotspot Formation | One shard > 3x average load | High | Split hot shard, review shard key |
| Query Pattern Shift | 50% queries are cross-shard | High | Evaluate new shard key |
| Architecture Change | Moving to different database system | Planned | Migration project with resharding |
| Performance Degradation | P99 latency > 2x historical | High | Urgent investigation and remediation |
| Cardinality Limits | distinct shard keys ≈ # shards | Critical | Must change shard key strategy |
Trigger 3: Architectural Evolution
Sometimes resharding is necessary for architectural reasons:
Trigger 4: Disaster Recovery
In rare cases, resharding is forced by failures:
If you can predict that resharding will be needed within 6 months based on current growth trajectories, start planning now. Resharding projects typically take 2-4 months to plan and execute safely. Starting too late leads to rushed decisions and increased risk.
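A rough planning aid is to extrapolate the current growth rate and estimate when a shard will cross a capacity threshold. A minimal sketch, assuming linear growth and an illustrative 80% threshold:

```python
def months_until_threshold(current_gb, growth_gb_per_month, capacity_gb,
                           threshold=0.8):
    """Estimate months until a shard reaches `threshold` of its capacity,
    assuming linear growth. Returns 0 if already at or past the threshold."""
    limit = capacity_gb * threshold
    if current_gb >= limit:
        return 0.0
    return (limit - current_gb) / growth_gb_per_month

# Example: 500 GB used, growing 50 GB/month, 1 TB capacity
# 80% of 1000 GB = 800 GB; (800 - 500) / 50 = 6 months of runway
print(months_until_threshold(500, 50, 1000))  # 6.0
```

If the answer is under six months, the planning clock has already started.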
Resharding strategies vary dramatically in complexity, risk, and impact on system availability. The choice depends on data volume, downtime tolerance, and engineering resources.
Strategy 1: Offline Resharding (Stop-the-World)
The simplest approach: stop all traffic, move data, update routing, restart.
Timeline:
1. Announce maintenance window
2. Stop all application traffic to database
3. Export all data from current shards
4. Import data to new shards using new shard key/strategy
5. Update routing configuration
6. Verify data integrity
7. Resume traffic
Pros: Simplest to implement, easiest to reason about, clean cutover
Cons: Requires downtime (hours to days for large datasets); not viable for high-availability requirements
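The export/import step above is essentially a full rehash of every record under the new topology. A minimal in-memory sketch (real tooling would stream from dumps rather than hold everything in memory; the modulo layout is illustrative):

```python
def offline_reshard(records, new_shards, shard_key):
    """Redistribute a full export of records across a new shard count.
    Returns {new_shard_id: [records]} under the new modulo layout."""
    new_layout = {i: [] for i in range(new_shards)}
    for record in records:
        # Route purely by the new topology; old placement is irrelevant
        new_layout[hash(shard_key(record)) % new_shards].append(record)
    return new_layout

# Example: move 100 user rows from a 4-shard layout to 8 shards
rows = [{"user_id": i} for i in range(100)]
layout = offline_reshard(rows, new_shards=8,
                         shard_key=lambda r: r["user_id"])
assert sum(len(v) for v in layout.values()) == 100  # nothing lost
```

The integrity check at the end mirrors step 6 of the timeline: record counts must match before traffic resumes.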
Strategy 2: Online Resharding (Live Migration)
Migrate data while continuing to serve traffic:
Strategy 3: Shadow Resharding
Build the new sharding scheme entirely in shadow, then cutover:
Strategy 4: Incremental/Rolling Resharding
Migrate one shard or key range at a time:
Week 1: Migrate shard 1 → new topology
Week 2: Migrate shard 2 → new topology
...
Week N: All shards migrated, old topology retired
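Incremental migration hinges on routing that consults a per-shard migration flag, so each week's cutover reduces to flipping one entry. A minimal sketch (class and method names are illustrative):

```python
class IncrementalRouter:
    """Routes each key to the old or new topology depending on whether
    its original shard has been migrated yet."""
    def __init__(self, num_old_shards):
        self.num_old_shards = num_old_shards
        self.migrated = set()  # old shard ids already on the new topology

    def mark_migrated(self, shard_id):
        """Flip one shard to the new topology (one week's cutover)."""
        self.migrated.add(shard_id)

    def route(self, key):
        old_shard = hash(key) % self.num_old_shards
        topology = "new" if old_shard in self.migrated else "old"
        return topology, old_shard

router = IncrementalRouter(num_old_shards=4)
router.mark_migrated(1)   # Week 1: shard 1 is done
print(router.route(1))    # keys on shard 1 now hit the new topology
```

A failure during week 2 affects only shard 2's keys, which is exactly the isolation property the comparison matrix below credits to this strategy.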
Comparison Matrix:
| Strategy | Downtime | Complexity | Risk | Data Volume Suitability |
|---|---|---|---|---|
| Offline | Hours-Days | Low | Low (but all-or-nothing) | Small-Medium (< 1TB) |
| Online/Live | Zero (planned) | High | Medium (gradual failures) | Large (1TB-100TB) |
| Shadow | Minutes (cutover) | Very High | Low (easy rollback) | Any size |
| Incremental | Zero | Medium-High | Low (isolated failures) | Very Large (> 100TB) |
Database systems like CockroachDB, TiDB, and Vitess have built-in online resharding capabilities. Rather than implementing custom resharding logic, consider adopting one of these systems or its resharding tooling (for example, Vitess for MySQL sharding).
Online resharding—migrating data while the system continues serving traffic—is the gold standard for production systems that cannot afford downtime. It's also significantly more complex than offline approaches.
The Dual-Write Pattern:
During migration, writes must go to both old and new shards to maintain consistency:
```python
import time


class DualWriteRouter:
    """
    Routes writes to both old and new shard topologies during migration.

    Critical properties:
    - Old shards remain source of truth until cutover
    - New shards receive all writes for consistency
    - Backfill process copies historical data
    """
    def __init__(self, old_router, new_router, migration_status):
        self.old_router = old_router
        self.new_router = new_router
        self.status = migration_status

    def write(self, key, value):
        """
        Dual-write during migration.
        Write to old first (source of truth), then new.
        """
        # Write to old shard (must succeed for operation to succeed)
        old_shard = self.old_router.route(key)
        result = old_shard.write(key, value)
        if not result.success:
            return result  # Fail fast

        # Write to new shard (best-effort; log failures for retry)
        new_shard = self.new_router.route(key)
        try:
            new_shard.write(key, value)
        except Exception as e:
            # Don't fail the operation, but log for reconciliation
            self.status.log_dual_write_failure(key, e)

        return result

    def read(self, key):
        """
        During migration, reads come from old shards (source of truth).
        After cutover, reads switch to new shards.
        """
        if self.status.phase == 'migration':
            return self.old_router.route(key).read(key)
        elif self.status.phase == 'cutover_complete':
            return self.new_router.route(key).read(key)
        else:
            raise ValueError(f"Unknown phase: {self.status.phase}")


class BackfillProcess:
    """
    Copies historical data from old shards to new shards.
    Must handle data that's being concurrently modified.
    """
    def __init__(self, old_router, new_router, status):
        self.old = old_router
        self.new = new_router
        self.status = status

    def backfill_range(self, start_key, end_key, batch_size=1000):
        """
        Copy a key range from old to new topology.
        Uses checkpointing for resumability.
        """
        cursor = start_key
        while cursor < end_key:
            # Read batch from old shard
            # (assumes scan returns keys > start, so the checkpointed
            # key is not re-read on the next iteration)
            old_shard = self.old.route(cursor)
            batch = old_shard.scan(start=cursor, limit=batch_size)
            if not batch:
                break

            # Write to new shards (records may land on different shards)
            for record in batch:
                new_shard = self.new.route(record.key)
                new_shard.write(record.key, record.value)

            # Checkpoint progress
            cursor = batch[-1].key
            self.status.update_backfill_progress(cursor)

            # Respect rate limits
            self._apply_throttle()

    def _apply_throttle(self):
        """Rate limit to avoid overwhelming target shards."""
        time.sleep(self.status.throttle_ms / 1000)
```

The Reconciliation Problem:
During backfill, data is being concurrently written. This creates race conditions:
Time 1: Backfill reads key K with value V1 from old shard
Time 2: Application writes key K with value V2 to old shard (dual-write succeeds)
Time 3: Backfill writes key K with value V1 to new shard
Result: New shard has stale value V1!
Solutions:
```python
class CDCBackfillStrategy:
    """
    Use Change Data Capture for consistent backfill.

    Approach:
    1. Start CDC stream from current position
    2. Perform full snapshot backfill
    3. Apply CDC events that occurred during backfill
    4. Catch up to live traffic, then cutover
    """
    def __init__(self, cdc_stream, old_shards, new_router):
        self.cdc = cdc_stream
        self.old = old_shards
        self.new = new_router

    def execute_migration(self):
        # Step 1: Mark CDC starting position
        cdc_position = self.cdc.get_current_position()
        print(f"Starting CDC from position: {cdc_position}")

        # Step 2: Full snapshot backfill (may take hours/days)
        print("Beginning snapshot backfill...")
        for shard in self.old:
            self.backfill_shard(shard)

        # Step 3: Apply CDC events since snapshot started
        print("Applying CDC events...")
        events_applied = 0
        while not self.cdc.is_caught_up():
            event = self.cdc.consume_next(since=cdc_position)
            self.apply_event_to_new_shard(event)
            events_applied += 1
        print(f"Applied {events_applied} CDC events, fully caught up")

        # Step 4: Verify consistency
        print("Running consistency verification...")
        if self.verify_consistency():
            print("✓ Consistency verified, ready for cutover")
        else:
            raise Exception("Consistency check failed!")

    def apply_event_to_new_shard(self, event):
        """Apply a CDC event (insert/update/delete) to new topology."""
        new_shard = self.new.route(event.key)
        if event.type in ('INSERT', 'UPDATE'):
            new_shard.write(event.key, event.value)
        elif event.type == 'DELETE':
            new_shard.delete(event.key)
```

Change Data Capture (Debezium, Maxwell, proprietary CDC) is now the preferred approach for online migrations. It provides a reliable stream of changes that can be replayed to bring the new topology into sync with the old, handling the complex race conditions transparently.
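If CDC is unavailable, the stale-backfill race can also be closed with timestamp-based resolution: the backfill only writes a record if the copy already on the new shard isn't newer. A minimal sketch, assuming records are stored as `(value, timestamp)` pairs (that storage format is illustrative):

```python
def reconcile_write(new_shard, key, value, ts):
    """Last-write-wins: apply a backfilled record only if the new shard
    doesn't already hold a more recent version (e.g. from a dual-write
    that landed while the backfill was in flight)."""
    existing = new_shard.get(key)          # (value, ts) or None
    if existing is not None and existing[1] >= ts:
        return False                       # the dual-write already won
    new_shard[key] = (value, ts)
    return True

shard = {}
assert reconcile_write(shard, "k", "v1", ts=100) is True   # backfill copy lands
assert reconcile_write(shard, "k", "v0", ts=50) is False   # stale copy ignored
assert shard["k"] == ("v1", 100)
```

This replays the race from the timeline above correctly: the Time 3 backfill write of V1 loses to the Time 2 dual-write of V2 because its timestamp is older.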
The cutover is the moment when traffic switches from the old sharding topology to the new. It's the highest-risk phase of any resharding operation. The goal is to make it as fast and safe as possible.
Cutover Approach 1: Big Bang (Instant Switch)
At a predetermined moment, all traffic switches simultaneously:
Time 0: 100% traffic → old shards
Time 1: Update routing configuration
Time 2: 0% traffic → old shards, 100% → new shards
Pros: Simple, clean, fast
Cons: All-or-nothing risk, no gradual validation
Cutover Approach 2: Gradual Traffic Shift
Ramp traffic to new shards progressively:
```python
import random
import time


class GradualCutoverController:
    """
    Shifts traffic from old to new shards progressively.
    Allows monitoring and rollback at each step.
    """
    def __init__(self, old_router, new_router):
        self.old = old_router
        self.new = new_router
        self.traffic_percentage_new = 0.0

    def route_read(self, key):
        """Route reads based on current traffic percentage."""
        if random.random() < self.traffic_percentage_new:
            return self.new.route(key).read(key)
        else:
            return self.old.route(key).read(key)

    def execute_gradual_cutover(self, steps=(5, 10, 25, 50, 75, 100)):
        """
        Execute gradual cutover with validation at each step.
        Default: 5% → 10% → 25% → 50% → 75% → 100%
        """
        for percentage in steps:
            print(f"--- Ramping to {percentage}% new shards ---")
            self.traffic_percentage_new = percentage / 100.0

            # Allow traffic to flow at new ratio
            print(f"Running at {percentage}% for validation period...")
            time.sleep(300)  # 5 minutes

            # Check health metrics
            if not self.health_check():
                print(f"Health check failed at {percentage}%, rolling back!")
                self.rollback()
                return False

            # Check latency
            if not self.latency_check():
                print(f"Latency degradation at {percentage}%, pausing...")
                if not self.wait_for_recovery():
                    self.rollback()
                    return False

            print(f"✓ {percentage}% validation passed")

        print("🎉 Cutover complete: 100% traffic on new shards")
        return True

    def rollback(self):
        """Emergency rollback to old shards."""
        print("ROLLBACK: Reverting all traffic to old shards")
        self.traffic_percentage_new = 0.0

    def health_check(self):
        """Verify both old and new shards are healthy."""
        error_rate = self.get_error_rate()
        return error_rate < 0.01  # Less than 1% errors

    def latency_check(self):
        """Verify latency hasn't degraded significantly."""
        current_p99 = self.get_current_p99_latency()
        baseline_p99 = self.get_baseline_p99_latency()
        return current_p99 < baseline_p99 * 1.5  # Allow 50% increase
```

Cutover Approach 3: Key-Range Based Cutover
Migrate specific key ranges while others remain on old shards:
Phase 1: Keys [A-M] → new shards, Keys [N-Z] → old shards
Phase 2: Keys [A-Z] → new shards (full cutover)
Cutover Approach 4: Read-Then-Write Staged Cutover
Separate the cutover of reads and writes:
Phase 1: Reads → new shards, Writes → both (dual-write)
Phase 2: Validate read correctness
Phase 3: Writes → new shards only
Phase 4: Decommission old shards
This is safer because read errors are less critical than write errors. If reads from new shards show problems, you can still fall back to old shards for reads.
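The staged approach amounts to routing reads and writes by phase independently. A minimal sketch (the phase names and table are illustrative):

```python
class StagedCutoverRouter:
    """Read-then-write cutover: reads move first; writes stay dual
    until reads are validated on the new topology."""
    PHASES = {
        # phase name: (where reads go, where writes go)
        "dual_write": ("old", ["old", "new"]),
        "reads_new":  ("new", ["old", "new"]),  # Phase 1
        "writes_new": ("new", ["new"]),         # Phase 3
    }

    def __init__(self, phase="dual_write"):
        self.phase = phase

    def read_target(self):
        return self.PHASES[self.phase][0]

    def write_targets(self):
        return self.PHASES[self.phase][1]

r = StagedCutoverRouter("reads_new")
print(r.read_target())     # reads already on new shards
print(r.write_targets())   # writes still dual, so falling back is safe
```

Because writes keep landing on both topologies through Phase 2, reverting reads to the old shards is a one-line phase change with no data loss.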
| Strategy | Duration | Rollback Ease | Risk Level | Monitoring Requirement |
|---|---|---|---|---|
| Big Bang | Seconds | Hard (all data on new) | High | All-or-nothing |
| Gradual Percentage | Hours-Days | Easy (just reduce %) | Low | Continuous |
| Key-Range Based | Days-Weeks | Moderate | Low | Per-range |
| Read-Then-Write | Hours | Easy (reads are safe) | Low | Read vs. Write metrics |
Schedule cutovers for Tuesday or Wednesday mornings when the full team is available to monitor and respond. Never before weekends, holidays, or launches. The 24-48 hours after cutover are critical for catching subtle issues.
Resharding operations have many failure modes. Identifying risks upfront and preparing mitigations is essential for successful execution.
Risk Category 1: Data Consistency Failures
| Risk | Cause | Mitigation |
|---|---|---|
| Missing records | Backfill race conditions | Use CDC, verify counts |
| Stale data | Writes during backfill overwritten | Timestamp-based resolution |
| Duplicate records | Retry logic without idempotency | Idempotent operations |
| Corrupt data | Serialization/encoding issues | Hash verification |
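The "verify counts" and "hash verification" mitigations above combine naturally into a checker that compares a record count and a content digest per key range. A minimal sketch, assuming each topology can expose a range as an iterable of `(key, value)` pairs:

```python
import hashlib

def range_digest(records):
    """Order-insensitive digest of (key, value) pairs, plus a count."""
    h = hashlib.sha256()
    count = 0
    for key, value in sorted(records):
        h.update(f"{key}={value};".encode())
        count += 1
    return count, h.hexdigest()

def verify_range(old_records, new_records):
    """True only if both topologies hold identical data for the range."""
    return range_digest(old_records) == range_digest(new_records)

old = [("u1", "a"), ("u2", "b")]
assert verify_range(old, [("u2", "b"), ("u1", "a")]) is True   # order differs, data same
assert verify_range(old, [("u1", "a"), ("u2", "X")]) is False  # corrupted value caught
```

Count mismatches catch missing or duplicated records; digest mismatches catch stale and corrupt values that counts alone would miss.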
Risk Category 2: Availability Failures
| Risk | Cause | Mitigation |
|---|---|---|
| Extended downtime | Migration takes longer than planned | Buffer time, incremental approach |
| Cascading overload | New shards can't handle load | Load testing, gradual cutover |
| Routing failures | Stale routing configuration | Aggressive cache invalidation |
| Application errors | Code doesn't handle new topology | Feature flags, compatibility testing |
Risk Category 3: Operational Failures
| Risk | Cause | Mitigation |
|---|---|---|
| Team unavailable | Key person sick/unreachable | Document runbooks, train multiple operators |
| Insufficient monitoring | Can't detect problems during migration | Pre-build dashboards, alerts |
| Rollback fails | Old shards no longer have current data | Maintain old topology until verified |
| External dependencies | Downstream systems can't handle changes | Coordinate with all stakeholders |
# Resharding Risk Checklist

## Pre-Migration
- [ ] Full backup of all shards verified and tested
- [ ] Rollback procedure documented and tested in staging
- [ ] Load testing completed on new topology
- [ ] Monitoring dashboards prepared for all critical metrics
- [ ] On-call team identified and briefed
- [ ] Stakeholders notified and communication plan in place
- [ ] Maintenance window scheduled (if needed)
- [ ] Feature flags ready to disable problematic features

## During Migration
- [ ] Backfill progress tracked and within expected timeline
- [ ] Error rates monitored and below threshold
- [ ] Dual-write failures logged and accumulated count acceptable
- [ ] Resource utilization on new shards within capacity
- [ ] Application error rates unchanged

## Pre-Cutover
- [ ] Data consistency verification completed
- [ ] Record counts match between old and new topologies
- [ ] Sample data spot-checked for correctness
- [ ] Performance benchmarks meet requirements
- [ ] Rollback tested on production-like environment

## Cutover Day
- [ ] Team assembled and ready
- [ ] Communication channels open
- [ ] Monitoring displayed on shared screen
- [ ] Rollback command ready to execute
- [ ] Customer support briefed

## Post-Cutover
- [ ] Monitor for 24-48 hours minimum
- [ ] Compare metrics against baseline
- [ ] Verify no data loss or corruption
- [ ] Confirm rollback is still possible (keep old shards)
- [ ] Collect lessons learned

The most critical risk mitigation is a working rollback. Before any cutover, prove you can roll back safely. Keep the old topology intact and capable of serving traffic until the new topology is proven stable (typically 7-30 days post-cutover).
Learning from organizations that have successfully (and unsuccessfully) resharded provides invaluable lessons.
Case Study 1: Stripe's Sharding Migration
Stripe migrated their MongoDB cluster to a new sharding topology while maintaining their strict reliability requirements.
Challenge: Move billions of documents across shards without downtime or data loss
Approach:
Key Lesson: Investment in observability paid off. They caught subtle issues through shadow reads that would have caused production incidents.
Case Study 2: Slack's Vitess Migration
Slack migrated from custom MySQL sharding to Vitess to gain better operational tooling.
Challenge: Re-shard ~20TB of data across 400+ MySQL instances
Approach:
Key Lesson: Tenant-based incremental migration isolated failures and made rollback granular.
| Company | Data Volume | Duration | Approach | Downtime |
|---|---|---|---|---|
| Stripe | Billions of docs | ~4 months | CDC + Dual-write + Shadow | Zero |
| Slack | 20+ TB | ~6 months | Tenant-by-tenant via Vitess | Zero |
| Shopify | 100+ TB | ~1 year | Ghost tables + incremental | Zero |
| — | 50+ TB | ~8 months | Custom CDC pipeline | Zero |
| GitHub | Multiple PB | Ongoing | ProxySQL + incremental | Minimal windows |
Case Study 3: A Failed Resharding (Lessons Learned)
A mid-sized fintech company attempted resharding that resulted in a 6-hour outage:
What Went Wrong:
Lessons:
The most successful resharding operations run the new topology in shadow mode for extended periods (weeks), comparing every response against the old topology. This catches subtle bugs, performance issues, and data discrepancies before they affect real users.
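A shadow-read comparator can be as simple as issuing every read to both topologies, serving the old result, and recording any mismatch for later analysis. A minimal sketch (the class shape and the dict-backed "routers" are illustrative):

```python
class ShadowReadComparator:
    """Serves reads from the old topology while diffing against the new."""
    def __init__(self, old_router, new_router):
        self.old = old_router
        self.new = new_router
        self.mismatches = []

    def read(self, key):
        old_value = self.old[key]        # source of truth; always returned
        try:
            new_value = self.new.get(key)
            if new_value != old_value:
                # Record the discrepancy; users never see it
                self.mismatches.append((key, old_value, new_value))
        except Exception:
            pass                         # shadow failures must never hurt users
        return old_value

shadow = ShadowReadComparator({"k": 1}, {"k": 2})
assert shadow.read("k") == 1                  # user sees the old topology's answer
assert shadow.mismatches == [("k", 1, 2)]     # discrepancy captured for review
```

Running this for weeks before cutover turns subtle data bugs into log entries instead of incidents.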
The best resharding is the one you never have to do. Proactive design decisions can dramatically reduce the likelihood and scope of future resharding needs.
Strategy 1: Overprovision Shards Initially
Start with more shards than immediately necessary, leaving room for data growth:
```python
# Anticipated need: 8 shards for current data
# Better approach: start with 32 shards
#   - Current load distributed across 32 (headroom per shard)
#   - Can grow 4x without resharding
#   - Use consistent hashing to allow easy growth to 64, 128...
```
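In practice, overprovisioning means creating many logical shards up front and packing several onto each physical node; growth is then a remap of logical shards to nodes, not a reshard of keys. A minimal sketch (the round-robin assignment is illustrative):

```python
def logical_to_physical(num_logical, physical_nodes):
    """Assign logical shards round-robin to physical nodes.
    Keys never change their logical shard, so adding nodes only
    re-spreads whole logical shards -- no per-key rehashing."""
    return {s: physical_nodes[s % len(physical_nodes)]
            for s in range(num_logical)}

# Day 1: 32 logical shards packed onto 4 nodes
mapping = logical_to_physical(32, ["node-a", "node-b", "node-c", "node-d"])
assert mapping[0] == "node-a" and mapping[5] == "node-b"

# Growth: same 32 logical shards spread across 8 nodes
bigger = logical_to_physical(32, [f"node-{i}" for i in range(8)])
assert len(set(bigger.values())) == 8
```

The application's `hash(key) % 32` routing never changes; only the logical-to-physical map does, and moving a whole logical shard is an ordinary replica promotion rather than a resharding project.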
Strategy 2: Choose Shard Keys for Longevity
Invest heavily upfront in shard key analysis:
Strategy 3: Use Databases with Built-in Resharding
Modern distributed databases handle resharding automatically:
| Database | Resharding Capability |
|---|---|
| CockroachDB | Automatic range splitting and rebalancing |
| TiDB | Automatic region splitting based on size/load |
| Vitess | Online resharding with VReplication |
| MongoDB | Live resharding (4.4+) via chunk migration |
| YugabyteDB | Automatic tablet splitting |
| Google Spanner | Fully automated, transparent resharding |
Strategy 4: Design for Incremental Resharding
Even with manual sharding, design for incremental changes:
```python
# Bad: keys must live on exactly one of N shards
shard = hash(key) % 16  # Adding a 17th shard moves ~all data

# Better: consistent hashing allows gradual addition
ring = ConsistentHashRing(virtual_nodes=200)
ring.add_shard('shard-0')  # Adding shard-1 moves only ~1/N of the data

# Best: hybrid with directory-based overrides
def route(key):
    if key in special_routing_table:
        return special_routing_table[key]
    return ring.route(key)
# Individual keys can be moved without a full reshard
```
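For reference, a `ConsistentHashRing` like the one used above can be sketched in a few lines with a sorted ring of virtual-node hash points. This is a minimal illustration, not a production implementation (md5 is used here only for stable placement, not security):

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, virtual_nodes=200):
        self.vnodes = virtual_nodes
        self.ring = []  # sorted list of (hash_point, shard) pairs

    def _hash(self, s):
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    def add_shard(self, shard):
        # Each shard occupies many points, smoothing the distribution
        for i in range(self.vnodes):
            bisect.insort(self.ring, (self._hash(f"{shard}#{i}"), shard))

    def route(self, key):
        h = self._hash(str(key))
        # First ring point clockwise from the key's hash (wrapping around)
        idx = bisect.bisect(self.ring, (h, "")) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing()
ring.add_shard("shard-0")
ring.add_shard("shard-1")
assert ring.route("user:42") in ("shard-0", "shard-1")
```

Adding a shard inserts only its own points, so only keys falling between those new points and their predecessors move, which is the ~1/N property the comment above relies on.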
The engineering effort to prevent resharding (better initial design, proper tooling, proactive monitoring) is typically 1/10th the cost of executing a resharding project. Invest upfront.
Resharding is one of the most challenging operations in distributed systems—but with proper preparation, tooling, and execution, it can be performed safely at any scale. Let's consolidate the essential knowledge:
Module Complete:
You have now completed the comprehensive study of database sharding—from foundational concepts through shard key selection, range and hash partitioning strategies, to the challenging realities of resharding. You possess the knowledge to design, implement, and evolve sharded database architectures at any scale.
Sharding is not just a technical skill but an architectural discipline that requires balancing immediate needs against future evolution, simplicity against capability, and risk against reward. The best database architects approach sharding with both deep technical knowledge and the wisdom to know when simpler solutions suffice.
Congratulations! You now understand sharding comprehensively—from core concepts and shard key selection through range and hash partitioning to the operational complexities of resharding. This knowledge positions you to architect distributed databases that scale gracefully while minimizing operational burden.