No matter how sophisticated your delta indexing pipeline, there will come a day when you need to rebuild your search index from scratch. Perhaps you're changing the analyzer to improve search relevance. Maybe you're adding new fields or restructuring the document schema. Or perhaps accumulated inconsistencies have caused the index to drift away from the source of truth.
Reindexing is not a failure—it's a feature.
Search indexes are derived data stores, and treating them as immutable artifacts that can be rebuilt at will is a sign of mature architecture. The challenge is performing this rebuild without disrupting the users who depend on search being available and consistent.
Consider the stakes: For a major e-commerce platform, search is often responsible for 50% or more of product discovery. An hour of search downtime during peak shopping can cost millions in lost revenue. Even degraded search quality (stale data, missing products) erodes customer trust.
This page teaches you how to reindex at scale with zero downtime, zero data loss, and minimal operational stress.
By the end of this page, you will understand when full reindexing is necessary, how to plan and execute zero-downtime reindexing, strategies for handling reindexing at different scales, and the operational patterns that make reindexing routine rather than risky.
Understanding when reindexing is required—versus when delta updates suffice—is crucial for operational planning. Some changes can be applied to existing indexes; others fundamentally require rebuilding.
Analyzer Changes
Analyzers determine how text is tokenized and normalized. Changing an analyzer (e.g., adding stemming, changing tokenization rules) means existing documents were indexed with different token representations than new documents. Search quality degrades without reindexing.
Mapping Type Changes
Certain field type changes are incompatible. Changing a field from keyword to text (or vice versa), or from integer to float, requires reindexing because the underlying data structures differ.
New Computed Fields
If you add a field that requires computation at index time (ML embeddings, derived scores), existing documents don't have this field. Backfilling via delta updates works for small datasets but becomes impractical at scale.

Index Settings Changes
Some settings (number of primary shards, specific analyzer configurations) are fixed at index creation and cannot be modified. Changing these requires a new index.

Data Corruption or Drift
Over time, bugs, failed updates, or edge cases can cause the search index to diverge from the source of truth. Periodic full reindexing acts as a reconciliation mechanism.
Some organizations adopt a regular reindexing cadence—weekly or monthly—regardless of whether changes require it. This ensures the index never drifts too far from source truth, simplifies operational processes (reindexing becomes routine), and catches issues early. If reindexing is rare and scary, you'll postpone necessary changes. If it's routine and safe, you'll iterate faster.
The core principle of zero-downtime reindexing is simple: build the new index alongside the old one, then atomically switch traffic. The implementation requires careful coordination.
During reindexing, your system operates in dual-write mode:
- Live updates continue to go to the old index (products_v1)
- The same updates are also written to the new index being built (products_v2)

The critical challenge is ensuring no updates are lost between the bulk historical load completing and the live stream being established.
Consider this timeline: at T0 the bulk load of historical data begins; at T1 the bulk load completes and the live change stream would normally be attached. The key insight: you need to capture changes starting from T0, not from T1. If you start streaming changes only after the bulk load completes, you've lost every update that happened during the bulk load.
Solution: Start your CDC or change capture before beginning the bulk load. Buffer changes while bulk loading proceeds. After bulk load completes, replay the buffered changes to catch up.
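As a minimal sketch of that buffering idea (the full orchestrator below hides it behind a CDC cursor), here is the shape of the flow using an asyncio queue; `cdc_stream`, `bulk_load`, and `apply_change` are hypothetical stand-ins for your own change feed, loader, and indexer:

```python
import asyncio


async def reindex_with_buffered_changes(cdc_stream, bulk_load, apply_change):
    """Sketch: capture changes from T0 (before the bulk load starts),
    buffer them, and replay them once the bulk load finishes."""
    buffer: asyncio.Queue = asyncio.Queue()

    async def capture():
        # Started BEFORE the bulk load, so nothing between T0 and T1 is missed
        async for change in cdc_stream():
            await buffer.put(change)

    capture_task = asyncio.create_task(capture())

    await bulk_load()  # T0 -> T1: historical load; live changes pile up in the buffer

    while not buffer.empty():  # catchup: drain everything that accumulated
        await apply_change(await buffer.get())

    # In a real pipeline, dual-write mode takes over here before capture stops,
    # so changes arriving after the drain are not lost.
    capture_task.cancel()
```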
"""Zero-Downtime Reindexing with Change Capture This implementation demonstrates the dual-write pattern withproper change buffering to ensure no updates are lost.""" from datetime import datetimefrom typing import Optionalimport asyncio class ZeroDowntimeReindexer: """ Orchestrates zero-downtime reindexing with these guarantees: - All documents from source are indexed - All changes during reindex are captured - Production traffic is unaffected - Switchover is atomic """ def __init__( self, source_db: 'DatabaseClient', search_client: 'ElasticsearchClient', change_source: 'ChangeDataCapture', index_alias: str, ): self.source_db = source_db self.es = search_client self.cdc = change_source self.alias = index_alias async def reindex( self, new_index_name: str, new_mappings: dict, new_settings: dict, ) -> ReindexResult: """ Execute full reindex with zero downtime. Returns details about the reindex operation including timing, document counts, and any issues encountered. """ start_time = datetime.utcnow() # Step 1: Create new index with target mappings/settings await self._create_new_index( new_index_name, new_mappings, new_settings ) # Step 2: Start capturing changes BEFORE bulk load # This is critical - we need changes from this moment forward change_cursor = await self.cdc.start_capture() capture_start_time = datetime.utcnow() # Step 3: Bulk load all documents from source bulk_load_result = await self._bulk_load_from_source( new_index_name, snapshot_time=capture_start_time ) # Step 4: Replay accumulated changes to catch up catchup_result = await self._replay_changes( new_index_name, change_cursor, until=datetime.utcnow() ) # Step 5: Enter dual-write mode briefly # New changes go to both old and new index await self._enable_dual_write(new_index_name) # Step 6: Verify new index consistency verification = await self._verify_index(new_index_name) if not verification.passed: raise ReindexVerificationError(verification.issues) # Step 7: Atomic alias switch old_index = await self._get_current_index() await self._atomic_alias_switch(old_index, new_index_name) # Step 8: Cleanup await self._disable_dual_write() await self._schedule_old_index_deletion(old_index) return ReindexResult( success=True, old_index=old_index, new_index=new_index_name, documents_loaded=bulk_load_result.count, changes_replayed=catchup_result.count, duration=datetime.utcnow() - start_time ) async def _create_new_index( self, name: str, mappings: dict, settings: dict ) -> None: """Create new index with bulk-optimized settings.""" # Start with settings optimized for bulk loading bulk_settings = { **settings, "index": { **settings.get("index", {}), "refresh_interval": "-1", # Disable refresh during load "number_of_replicas": 0, # No replicas during load } } await self.es.indices.create( index=name, body={ "mappings": mappings, "settings": bulk_settings } ) async def _bulk_load_from_source( self, index_name: str, snapshot_time: datetime ) -> BulkLoadResult: """ Stream all documents from source database to new index. Uses cursor-based pagination to handle billions of docs without memory issues. 
""" total_indexed = 0 batch_size = 5000 async for batch in self.source_db.stream_all_documents( batch_size=batch_size, as_of=snapshot_time # Consistent read as of snapshot time ): transformed = [ self._transform_document(doc) for doc in batch ] await self.es.bulk_index( index=index_name, documents=transformed ) total_indexed += len(batch) # Log progress periodically if total_indexed % 100000 == 0: self._log_progress(total_indexed, "bulk_load") return BulkLoadResult(count=total_indexed) async def _replay_changes( self, index_name: str, cursor: 'ChangeCursor', until: datetime ) -> CatchupResult: """ Apply changes that occurred during bulk load. This is the "catchup" phase that brings the new index up to current state. """ changes_applied = 0 async for change_batch in self.cdc.read_changes( cursor=cursor, until=until ): for change in change_batch: if change.operation == 'DELETE': await self.es.delete( index=index_name, id=change.document_id, ignore=[404] # May not exist yet ) else: doc = self._transform_document(change.document) await self.es.index( index=index_name, id=change.document_id, body=doc ) changes_applied += 1 return CatchupResult(count=changes_applied) async def _atomic_alias_switch( self, old_index: str, new_index: str ) -> None: """ Atomically move alias from old to new index. This is a single atomic operation - queries see either the old index or the new one, never both or neither. """ await self.es.indices.update_aliases( body={ "actions": [ {"remove": {"index": old_index, "alias": self.alias}}, {"add": {"index": new_index, "alias": self.alias}} ] } )Reindexing strategies vary dramatically based on data volume. What works for millions of documents fails catastrophically for billions.
At this scale, simplicity wins: a simple script around the built-in _reindex API can copy documents between indexes in minutes.

As volume grows into the tens and hundreds of millions of documents, complexity increases: bulk loads need parallel workers, chunking, progress tracking, and restartability.

At the largest scales, specialized infrastructure becomes necessary: distributed processing frameworks, dedicated reindexing capacity, and staged rollouts.
| Scale | Duration | Approach | Key Challenges |
|---|---|---|---|
| < 1M docs | Minutes | Simple script, _reindex API | None significant |
| 1-10M docs | 30-60 min | Bulk load from source | Network throughput |
| 10-100M docs | 1-4 hours | Parallel workers, chunking | Progress tracking, restartability |
| 100-500M docs | 4-12 hours | Distributed processing | Resource coordination, verification |
| 500M-1B docs | 12-48 hours | Spark/Flink, dedicated infra | Consistency window, catchup volume |
| > 1B docs | Multiple days | Specialized pipeline, staged rollout | Everything above + operational coordination |
Reindexing time isn't just about the bulk load—it's also about the catchup phase. If your system sees 1M changes per hour and bulk load takes 6 hours, you'll have ~6M changes to replay. If replay is slower than incoming changes, you'll never catch up. Model this before starting.
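A back-of-the-envelope way to model this, as a hypothetical helper (the rates are inputs you measure, not defaults):

```python
def estimate_catchup_hours(bulk_load_hours: float,
                           changes_per_hour: float,
                           replay_rate_per_hour: float) -> float:
    """Estimate the catchup phase duration after a bulk load.

    Changes accumulate during the bulk load and keep arriving while you
    replay, so the backlog only shrinks if replay outpaces incoming changes.
    """
    if replay_rate_per_hour <= changes_per_hour:
        raise ValueError("Replay is slower than incoming changes; catchup never converges")
    backlog = bulk_load_hours * changes_per_hour
    return backlog / (replay_rate_per_hour - changes_per_hour)


# Example from above: 1M changes/hour, a 6-hour bulk load, replaying at 3M/hour
# -> 6M backlog shrinking at 2M/hour -> ~3 hours of catchup
print(estimate_catchup_hours(6, 1_000_000, 3_000_000))  # 3.0
```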
Elasticsearch provides a built-in _reindex API that can copy documents between indexes within a cluster or across clusters. Understanding its capabilities and limitations is essential.
Good fit: copying documents within a cluster (or from a remote cluster) when the only changes needed are new mappings, analyzer updates, or transformations expressible as a Painless script.

Poor fit: rebuilds that need data Elasticsearch doesn't already have, such as enrichment from the source database or external services (see the note after the examples below).
Source Transformations: Use scripts to modify documents during copy
Slicing: Parallelize reindexing across multiple slices
Throttling: Limit requests per second to avoid overwhelming cluster
Async Execution: Run as background task with status checking
```
// Basic reindex: copy all documents from source to dest
POST /_reindex
{
  "source": { "index": "products_v1" },
  "dest": { "index": "products_v2" }
}

// Reindex with transformation using Painless script
POST /_reindex
{
  "source": { "index": "products_v1" },
  "dest": { "index": "products_v2" },
  "script": {
    "source": """
      // Add new computed field
      ctx._source.search_boost =
        ctx._source.sales_count * 0.01 +
        ctx._source.rating * 0.5;

      // Rename field
      ctx._source.product_name = ctx._source.remove('name');

      // Remove deprecated field
      ctx._source.remove('legacy_category');

      // Conditional transformation
      if (ctx._source.price < 0) {
        ctx._source.price = 0;
      }
    """,
    "lang": "painless"
  }
}

// Parallel reindex with slicing (automatic slicing)
POST /_reindex?slices=auto&wait_for_completion=false
{
  "source": {
    "index": "products_v1",
    "size": 5000  // Batch size per slice
  },
  "dest": { "index": "products_v2" }
}
// Returns task ID for monitoring

// Check reindex progress
GET /_tasks/node_id:task_id

// Reindex with throttling to limit cluster impact
POST /_reindex?requests_per_second=5000
{
  "source": { "index": "products_v1" },
  "dest": { "index": "products_v2" }
}

// Reindex from remote cluster
POST /_reindex
{
  "source": {
    "remote": {
      "host": "http://old-cluster:9200",
      "username": "reindex_user",
      "password": "secret"
    },
    "index": "products"
  },
  "dest": { "index": "products_v2" }
}

// Reindex subset with query
POST /_reindex
{
  "source": {
    "index": "products_v1",
    "query": {
      "range": {
        "updated_at": { "gte": "2024-01-01" }
      }
    }
  },
  "dest": { "index": "products_v2" }
}
```

The _reindex API operates at the Elasticsearch level—it doesn't have access to your source database. If your transformation requires enrichment from external systems (fetching related entities, computing ML features), you need to build a custom reindexing pipeline instead of using the built-in API.
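When enrichment is required, the copy loop has to live in your own code. A minimal sketch, assuming an async source cursor, a hypothetical embedding service, and elasticsearch-py's async bulk helper:

```python
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk


async def reindex_with_enrichment(source_db, embedding_service,
                                  es: AsyncElasticsearch, dest_index: str) -> None:
    """Copy documents from the source database into dest_index, enriching
    each batch with externally computed fields that _reindex cannot produce.

    source_db.stream_all_documents and embedding_service.embed are placeholders
    for your own data-access and ML infrastructure.
    """
    async for batch in source_db.stream_all_documents(batch_size=1000):
        # The step _reindex cannot do: call out to an external system per batch
        vectors = await embedding_service.embed([doc["description"] for doc in batch])

        actions = [
            {
                "_index": dest_index,
                "_id": doc["id"],
                "_source": {**doc, "description_embedding": vector},
            }
            for doc, vector in zip(batch, vectors)
        ]
        await async_bulk(es, actions)
```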
Switching production traffic to a new index without verification is like deploying code without tests. A rigorous validation process is essential.
The simplest check: does the new index have the expected number of documents?
GET /products_v2/_count
Compare the count against the source-of-truth database and against the old index. Watch for differences beyond a small tolerance; the verification suite later on this page fails the check if the new index diverges from the source by more than 0.01%.
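A quick way to script this check with elasticsearch-py; `source_count` is whatever your source-of-truth query returns (for example, a SELECT COUNT(*)), and the 0.01% tolerance mirrors the verification suite shown later:

```python
from elasticsearch import Elasticsearch


def check_document_count(es: Elasticsearch, new_index: str,
                         source_count: int, tolerance_pct: float = 0.01) -> bool:
    """Compare the new index's document count against the source of truth."""
    new_count = es.count(index=new_index)["count"]
    diff_pct = abs(new_count - source_count) / max(source_count, 1) * 100
    print(f"source={source_count} new_index={new_count} diff={diff_pct:.4f}%")
    return diff_pct < tolerance_pct
```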
Randomly sample documents and verify that field values match the source of truth, after applying any index-time transformations; the verification suite below implements this check.
Run samples of production queries against both the old and the new index:
Acceptable divergence: New analyzers might produce different results that are better. Review manually.
Unacceptable divergence: Missing documents, errors on valid queries, or dramatically different result counts.
```typescript
/**
 * Comprehensive verification suite for reindexed indexes.
 *
 * Run this BEFORE switching the production alias.
 */

interface VerificationResult {
  passed: boolean;
  checks: CheckResult[];
  summary: string;
}

interface CheckResult {
  name: string;
  passed: boolean;
  details: string;
  severity: 'critical' | 'warning' | 'info';
}

class ReindexVerification {
  constructor(
    private esClient: ElasticsearchClient,
    private sourceDb: DatabaseClient,
    private oldIndex: string,
    private newIndex: string,
  ) {}

  async runFullVerification(): Promise<VerificationResult> {
    const checks: CheckResult[] = [];

    // 1. Document count verification
    checks.push(await this.verifyDocumentCount());

    // 2. Random sample verification
    checks.push(await this.verifyRandomSample(1000));

    // 3. Edge case verification
    checks.push(await this.verifyEdgeCases());

    // 4. Query parity verification
    checks.push(await this.verifyQueryParity());

    // 5. Performance baseline
    checks.push(await this.verifyPerformance());

    // 6. Mapping verification
    checks.push(await this.verifyMappings());

    const criticalFailures = checks.filter(
      c => !c.passed && c.severity === 'critical'
    );

    return {
      passed: criticalFailures.length === 0,
      checks,
      summary: this.generateSummary(checks),
    };
  }

  private async verifyDocumentCount(): Promise<CheckResult> {
    const [sourceCount, newIndexCount, oldIndexCount] = await Promise.all([
      this.sourceDb.count('products'),
      this.esClient.count({ index: this.newIndex }),
      this.esClient.count({ index: this.oldIndex }),
    ]);

    const percentDiff = Math.abs(
      (newIndexCount - sourceCount) / sourceCount * 100
    );
    const passed = percentDiff < 0.01; // Less than 0.01% difference

    return {
      name: 'Document Count',
      passed,
      severity: 'critical',
      details: `Source: ${sourceCount}, New Index: ${newIndexCount}, \
Old Index: ${oldIndexCount}, Diff: ${percentDiff.toFixed(4)}%`,
    };
  }

  private async verifyRandomSample(sampleSize: number): Promise<CheckResult> {
    // Get random document IDs from source
    const randomIds = await this.sourceDb.getRandomIds('products', sampleSize);

    let matches = 0;
    let mismatches: Array<{ id: string; field: string; expected: any; actual: any }> = [];

    for (const id of randomIds) {
      const [sourceDoc, indexDoc] = await Promise.all([
        this.sourceDb.get('products', id),
        this.esClient.get({ index: this.newIndex, id }).catch(() => null),
      ]);

      if (!indexDoc) {
        mismatches.push({ id, field: '_exists', expected: true, actual: false });
        continue;
      }

      // Compare key fields
      const fieldsToCheck = ['name', 'price', 'category', 'brand'];
      let docMatches = true;

      for (const field of fieldsToCheck) {
        const expected = this.transformField(field, sourceDoc[field]);
        const actual = indexDoc._source[field];

        if (JSON.stringify(expected) !== JSON.stringify(actual)) {
          mismatches.push({ id, field, expected, actual });
          docMatches = false;
        }
      }

      if (docMatches) matches++;
    }

    const matchRate = matches / sampleSize;
    const passed = matchRate >= 0.999; // 99.9% match rate required

    return {
      name: 'Random Sample',
      passed,
      severity: 'critical',
      details: `${matches}/${sampleSize} documents matched (\
${(matchRate * 100).toFixed(2)}%). \
Sample mismatches: ${JSON.stringify(mismatches.slice(0, 5))}`,
    };
  }

  private async verifyQueryParity(): Promise<CheckResult> {
    // Load sample queries from production logs
    const sampleQueries = await this.loadProductionQueries(100);

    let parityCases = 0;
    let divergentCases: Array<{ query: string; oldCount: number; newCount: number }> = [];

    for (const query of sampleQueries) {
      const [oldResult, newResult] = await Promise.all([
        this.esClient.search({ index: this.oldIndex, body: query }),
        this.esClient.search({ index: this.newIndex, body: query }),
      ]);

      const oldCount = oldResult.hits.total.value;
      const newCount = newResult.hits.total.value;

      // Allow 10% divergence (new analyzers may change results)
      const divergence = Math.abs(oldCount - newCount) / Math.max(oldCount, 1);

      if (divergence <= 0.1) {
        parityCases++;
      } else {
        divergentCases.push({
          query: JSON.stringify(query).slice(0, 100),
          oldCount,
          newCount
        });
      }
    }

    const parityRate = parityCases / sampleQueries.length;
    const passed = parityRate >= 0.9; // 90% of queries have parity

    return {
      name: 'Query Parity',
      passed,
      severity: 'warning', // Not critical - intentional changes may cause divergence
      details: `${parityCases}/${sampleQueries.length} queries had result parity. \
Divergent examples: ${JSON.stringify(divergentCases.slice(0, 3))}`,
    };
  }

  private async verifyPerformance(): Promise<CheckResult> {
    const testQueries = await this.loadPerformanceTestQueries();
    const results: Array<{ query: string; oldP99: number; newP99: number }> = [];

    for (const query of testQueries) {
      const oldLatencies: number[] = [];
      const newLatencies: number[] = [];

      // Run each query 10 times to get stable latency
      for (let i = 0; i < 10; i++) {
        const [oldResult, newResult] = await Promise.all([
          this.timeQuery(this.oldIndex, query),
          this.timeQuery(this.newIndex, query),
        ]);
        oldLatencies.push(oldResult);
        newLatencies.push(newResult);
      }

      results.push({
        query: JSON.stringify(query).slice(0, 50),
        oldP99: this.percentile(oldLatencies, 99),
        newP99: this.percentile(newLatencies, 99),
      });
    }

    // New index should not be more than 20% slower
    const regressions = results.filter(r => r.newP99 > r.oldP99 * 1.2);
    const passed = regressions.length === 0;

    return {
      name: 'Performance',
      passed,
      severity: 'warning',
      details: passed
        ? 'No significant performance regressions detected'
        : `Regressions: ${JSON.stringify(regressions)}`,
    };
  }
}
```

Despite verification, issues can emerge after switching to the new index. Having a reliable rollback plan is essential for operational confidence.
After switching to the new index, maintain the old index for a rollback window—typically 24-72 hours. During this period, keep the old index intact and, ideally, keep dual writes flowing to it so that it can be promoted back into service immediately if problems surface.
If using aliases properly, rollback is a single API call:
```
POST /_aliases
{
  "actions": [
    { "remove": { "index": "products_v2", "alias": "products" } },
    { "add": { "index": "products_v1", "alias": "products" } }
  ]
}
```
This is atomic—no queries see an inconsistent state.
Data Freshness After Rollback: If you've been dual-writing to both indexes, the old index has current data. If not, you're rolling back to stale data.
Changed Mappings: If clients have started using new fields only available in the new index, rollback may cause client errors.
Database Schema Changes: If source database schemas changed alongside the reindex, rollback may be complicated.
Don't wait for an emergency to test your rollback procedure. Periodically practice rollbacks in staging environments. Time how long it takes. Document the exact commands. When you need to roll back in production at 3 AM, you'll be glad you practiced.
Reindexing is inherently risky. These operational practices reduce risk and improve reliability.
Maintain a runbook that includes the pre-flight checklist below, the step-by-step execution procedure, the verification suite to run before switchover, and the rollback commands.
| Category | Check | Verification |
|---|---|---|
| Cluster | Sufficient disk space (3x current index) | GET /_cat/allocation |
| Cluster | All nodes healthy | GET /_cluster/health |
| Cluster | No pending tasks | GET /_cluster/pending_tasks |
| New Index | Mappings validated | Create index, verify settings |
| Pipeline | CDC cursor established | Verify change capture is running |
| Pipeline | Bulk ingestion tested | Index sample batch successfully |
| Monitoring | Dashboards ready | Verify metrics flowing to Grafana |
| Communication | Stakeholders notified | Send pre-reindex notification |
| Rollback | Old index will be retained | Document retention plan |
| Team | On-call engineer available | Confirm availability during window |
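A hedged sketch of automating the "Cluster" rows of this checklist with elasticsearch-py (the disk threshold is an illustrative assumption, not a recommendation):

```python
from elasticsearch import Elasticsearch


def preflight_cluster_checks(es: Elasticsearch, max_disk_used_pct: float = 75.0) -> list:
    """Run the 'Cluster' rows of the pre-flight checklist; return a list of failures."""
    failures = []

    # All nodes healthy (GET /_cluster/health); yellow may be acceptable in some setups
    health = es.cluster.health()
    if health["status"] != "green":
        failures.append(f"cluster status is {health['status']}")

    # No pending tasks (GET /_cluster/pending_tasks)
    pending = es.cluster.pending_tasks()
    if pending["tasks"]:
        failures.append(f"{len(pending['tasks'])} pending cluster tasks")

    # Sufficient disk headroom for the new index (GET /_cat/allocation)
    for node in es.cat.allocation(format="json"):
        used = node.get("disk.percent")
        if used is not None and float(used) > max_disk_used_pct:
            failures.append(f"node {node['node']} is at {used}% disk usage")

    return failures
```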
Reindexing is a fundamental capability for any search system. Done poorly, it's a source of outages and stress. Done well, it becomes a routine operation that enables continuous improvement.
What's next:
We've covered full reindexing, but production systems need an abstraction layer to manage multiple index versions seamlessly. Next, we'll explore index aliases—the mechanism that enables atomic switching, query routing, and version management.
You now understand how to plan and execute zero-downtime reindexing operations, a capability that transforms reindexing from a dreaded maintenance task into a routine operational procedure.