Search indexes are not databases of record—they're derived data stores that must stay synchronized with authoritative sources. This synchronization is straightforward when you can simply reindex everything, but that approach breaks down at scale.
Imagine you're running the search infrastructure for a major e-commerce platform with 500 million products. A full reindex takes 6 hours and consumes significant cluster resources. Now consider that 10 million products change every day—prices update, descriptions change, inventory fluctuates. You can't reindex everything daily, but you can't ignore changes either.
Delta indexing solves this problem by identifying and processing only the documents that have changed since the last index update. Done well, delta indexing reduces a 6-hour full reindex to a 15-minute incremental job. Done poorly, it becomes a source of subtle bugs, inconsistent search results, and operational nightmares.
Mastering delta indexing is what separates teams that struggle with search from teams that make it look effortless.
By the end of this page, you will understand how to track document changes, implement efficient delta detection, handle updates, deletes, and partial modifications, and design robust synchronization pipelines. You'll learn the patterns used by companies like Shopify, Airbnb, and Netflix to keep billions of documents synchronized.
Before designing delta indexing strategies, we must understand what happens when we update a search index. The mechanics differ significantly from traditional database updates.
Most search engines, including Lucene-based systems (Elasticsearch, Solr, OpenSearch), use immutable segments. When you "update" a document, the engine does not modify it in place: the existing version is marked as deleted, a complete new version is written to a fresh segment, and the deleted version is only physically removed later during segment merges.
This design has profound implications: every update is effectively a delete plus a reinsert, partial updates are not cheaper internally than full replacements, and a high update rate inflates deleted-document counts and merge activity. The examples below show the update APIs you'll work with in practice.
```
// 1. Full document update (replaces entire document)
PUT /products/_doc/12345
{
  "product_id": "12345",
  "name": "Wireless Headphones",
  "price": 79.99,
  "inventory": 150,
  "category": "electronics",
  "last_updated": "2024-01-15T10:30:00Z"
}

// 2. Partial update (preferred for single field changes)
// Still rewrites document internally, but API is simpler
POST /products/_update/12345
{
  "doc": {
    "price": 69.99,
    "last_updated": "2024-01-15T11:00:00Z"
  }
}

// 3. Scripted update (for computed changes)
POST /products/_update/12345
{
  "script": {
    "source": "ctx._source.inventory -= params.sold",
    "params": { "sold": 5 }
  }
}

// 4. Upsert (update or insert if not exists)
POST /products/_update/12345
{
  "doc": {
    "price": 69.99,
    "last_updated": "2024-01-15T11:00:00Z"
  },
  "upsert": {
    "product_id": "12345",
    "name": "New Product",
    "price": 69.99,
    "inventory": 100,
    "last_updated": "2024-01-15T11:00:00Z"
  }
}

// 5. Bulk update (efficient for multiple documents)
POST /_bulk
{"update": {"_index": "products", "_id": "12345"}}
{"doc": {"price": 69.99}}
{"update": {"_index": "products", "_id": "12346"}}
{"doc": {"price": 89.99}}
{"update": {"_index": "products", "_id": "12347"}}
{"doc": {"price": 49.99}}
```

While APIs like _update appear to modify single fields, internally the search engine must fetch the current document, merge changes, and write the complete new version. This means partial updates to large documents are nearly as expensive as full replacements. Design your documents with update patterns in mind.
The foundation of delta indexing is change detection—identifying which documents have been created, modified, or deleted since the last indexing run. There are several approaches, each with distinct trade-offs.
The most common approach: every document has a last_modified timestamp, and delta queries select documents modified after the last checkpoint.
Advantages:

- Simple to implement against any database that has a reliable last_modified column
- No extra infrastructure: just a query and a stored checkpoint
- Easy to reason about and to backfill (rewind the checkpoint)

Disadvantages:

- Cannot detect hard deletes; the row is gone, so there is no timestamp left to query
- Depends on every write path updating the timestamp correctly
- Clock skew and long-running transactions can cause missed or reordered changes, so ordering is only approximate
- Polling adds load and latency proportional to the polling interval

A minimal sketch of this approach follows.
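To make the trade-offs concrete, here is a minimal sketch of timestamp-based delta detection, assuming a hypothetical `products` table with an `updated_at` column, a one-row `index_checkpoints` table holding the watermark, and psycopg2 for database access. The overlap window is one common way to tolerate clock skew and late-committing transactions, at the cost of re-reading a few rows.

```python
# Minimal sketch of timestamp-based delta detection (hypothetical schema).
from datetime import datetime, timedelta

import psycopg2  # assumed driver; any DB-API connection works the same way

OVERLAP = timedelta(seconds=30)  # re-read a small window to tolerate clock skew / late commits


def fetch_changed_products(conn, last_checkpoint: datetime) -> tuple[list[dict], datetime]:
    """Return rows modified since the checkpoint, plus the new watermark."""
    watermark = datetime.utcnow()
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, name, price, category_id, updated_at
            FROM products
            WHERE updated_at > %s AND updated_at <= %s
            ORDER BY updated_at
            """,
            (last_checkpoint - OVERLAP, watermark),
        )
        columns = [col[0] for col in cur.description]
        rows = [dict(zip(columns, row)) for row in cur.fetchall()]
    return rows, watermark


def save_checkpoint(conn, watermark: datetime) -> None:
    """Persist the watermark only after the batch was indexed successfully."""
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE index_checkpoints SET last_run = %s WHERE pipeline = 'products'",
            (watermark,),
        )
    conn.commit()
```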
Capture changes at the database level using transaction logs, triggers, or database features like PostgreSQL's logical replication.
Advantages:

- Captures every committed change, including deletes
- Exact commit ordering and low latency
- No application code changes and no reliance on timestamp discipline

Disadvantages:

- Requires additional infrastructure (e.g., Kafka Connect and Debezium) and operational expertise
- Replication slots and connector state must be monitored and managed
- Schema changes in the source database need an evolution strategy
Every change is published as an event to a message queue, consumed by the indexing pipeline.
Advantages:

- Changes carry application-level meaning, not just row diffs
- Decouples producers from the indexing pipeline and supports fan-out to other consumers
- Deletes are explicit events

Disadvantages:

- Requires application changes to publish an event on every write path
- Dual writes (database plus queue) can silently drop events unless they are published transactionally, for example via an outbox table
- Ordering is only guaranteed per partition

A minimal consumer sketch follows.
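As referenced above, here is a minimal consumer sketch, assuming kafka-python and a hypothetical `product-changes` topic carrying JSON change events. Committing offsets only after indexing succeeds makes the offset itself the checkpoint; this gives at-least-once delivery, which is why indexing must be idempotent (discussed later on this page).

```python
# Minimal sketch of consuming change events from a queue (hypothetical topic and payload).
import json

from kafka import KafkaConsumer  # kafka-python

consumer = KafkaConsumer(
    "product-changes",
    bootstrap_servers=["kafka-1:9092"],
    group_id="search-indexer",
    enable_auto_commit=False,  # commit only after indexing succeeds
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)


def index_event(event: dict) -> None:
    """Placeholder for enrich + transform + bulk index."""
    ...


for message in consumer:
    event = message.value  # e.g. {"entity_id": "...", "operation": "UPDATE", ...}
    index_event(event)
    consumer.commit()  # the committed offset acts as the pipeline's checkpoint
```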
| Strategy | Detects Creates | Detects Updates | Detects Deletes | Ordering | Complexity |
|---|---|---|---|---|---|
| Timestamp Query | ✓ | ✓ | ✗ | Approximate | Low |
| Full Diff (Hash) | ✓ | ✓ | ✓ | N/A | High |
| CDC (Debezium) | ✓ | ✓ | ✓ | Exact | Medium |
| Event Sourcing | ✓ | ✓ | ✓ | Per-partition | Medium |
| Soft Deletes + Timestamps | ✓ | ✓ | ✓ | Approximate | Low-Medium |
Deletes deserve special attention because they're inherently difficult to detect. If a record is deleted from the source database, it's gone—there's nothing to query for "modified since" timestamps.
Common solutions:
Soft Deletes: Never delete records; mark them with a deleted_at timestamp. Delta queries include these, and the indexer removes them from the search index.
Tombstone Tables: On delete, write the deleted ID to a separate "tombstones" table with a timestamp. Delta queries check both the main table and tombstones.
Full ID Reconciliation: Periodically, compare all IDs in the search index against all IDs in the source database. Missing IDs in source = deleted.
CDC: Database-level change capture naturally includes delete events.
Most production systems use a combination: CDC or soft deletes for operational deletes, with periodic full reconciliation as a safety net.
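As a concrete illustration, here is a minimal sketch of delete detection that combines soft deletes and a tombstone table, assuming hypothetical `products.deleted_at` and `product_tombstones(id, deleted_at)` schemas and the same psycopg2-style connection as the earlier sketch.

```python
# Minimal sketch: collect IDs to remove from the index since the last checkpoint.
from datetime import datetime


def fetch_deletions(conn, last_checkpoint: datetime) -> list[str]:
    """Return IDs that must be removed from the search index."""
    deleted_ids: set[str] = set()
    with conn.cursor() as cur:
        # Soft deletes: rows still present in the main table, but flagged
        cur.execute(
            "SELECT id FROM products WHERE deleted_at IS NOT NULL AND deleted_at > %s",
            (last_checkpoint,),
        )
        deleted_ids.update(row[0] for row in cur.fetchall())

        # Tombstones: rows that were physically removed from the main table
        cur.execute(
            "SELECT id FROM product_tombstones WHERE deleted_at > %s",
            (last_checkpoint,),
        )
        deleted_ids.update(row[0] for row in cur.fetchall())
    return sorted(deleted_ids)
```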
A robust delta indexing pipeline consists of several coordinated components. Let's examine a production-grade architecture used by major platforms.
1. Change Source: The authoritative database or event stream where changes originate. This could be a relational database with reliable timestamps, a CDC stream (e.g., Debezium topics), or an application event bus.
2. Change Collector: A service that polls or subscribes to the change source, maintaining a checkpoint of the last processed change. This component must be fault-tolerant, idempotent on replay, and able to resume exactly from its checkpoint after a crash.
3. Document Enricher: Changes often contain only IDs or partial data. The enricher fetches complete documents from source systems, resolving foreign keys, joining related data, and computing derived fields.
4. Document Transformer: Converts source documents into the search index schema, applying normalization, tokenization, and field mappings.
5. Bulk Indexer: Batches transformed documents and writes them to the search cluster using bulk APIs.
6. Checkpoint Manager: Persists the latest successfully indexed change marker, enabling resumption after failures.
"""Production Delta Indexing Pipeline This implementation demonstrates patterns used at companiesprocessing millions of document updates per hour.""" from dataclasses import dataclassfrom datetime import datetime, timedeltafrom typing import Generator, Optionalimport json @dataclassclass ChangeEvent: """Represents a single change in the source system.""" event_id: str # Unique identifier for idempotency entity_type: str # e.g., 'product', 'user', 'order' entity_id: str # Primary key of changed entity operation: str # 'INSERT', 'UPDATE', 'DELETE' timestamp: datetime # When the change occurred payload: Optional[dict] # Changed data (may be partial) @dataclassclass IndexCheckpoint: """Tracks progress through the change stream.""" last_event_id: str last_timestamp: datetime events_processed: int last_updated: datetime class DeltaIndexingPipeline: """ Coordinates delta indexing from change detection through bulk indexing with fault tolerance and checkpointing. """ def __init__( self, change_source: 'ChangeSource', document_enricher: 'DocumentEnricher', search_client: 'SearchClient', checkpoint_store: 'CheckpointStore', batch_size: int = 1000, max_batch_wait_seconds: int = 5 ): self.change_source = change_source self.enricher = document_enricher self.search = search_client self.checkpoints = checkpoint_store self.batch_size = batch_size self.max_wait = max_batch_wait_seconds async def run(self) -> None: """ Main loop: consume changes, enrich, transform, index. This loop is designed to run continuously in production, handling backpressure and failures gracefully. """ while True: checkpoint = await self.checkpoints.load() try: async for batch in self._consume_batches(checkpoint): # Separate deletes from upserts deletes = [c for c in batch if c.operation == 'DELETE'] upserts = [c for c in batch if c.operation != 'DELETE'] # Process upserts: enrich, transform, index if upserts: documents = await self._process_upserts(upserts) await self._bulk_index(documents) # Process deletes: remove from index if deletes: await self._bulk_delete( [c.entity_id for c in deletes] ) # Checkpoint after successful batch new_checkpoint = IndexCheckpoint( last_event_id=batch[-1].event_id, last_timestamp=batch[-1].timestamp, events_processed=( checkpoint.events_processed + len(batch) ), last_updated=datetime.utcnow() ) await self.checkpoints.save(new_checkpoint) checkpoint = new_checkpoint except Exception as e: # Log error, wait, retry from checkpoint await self._handle_failure(e, checkpoint) async def _consume_batches( self, checkpoint: IndexCheckpoint ) -> Generator[list[ChangeEvent], None, None]: """ Consume changes in batches, balancing latency and throughput. Uses time-based batching: emit batch when either size limit or time limit is reached, whichever comes first. """ batch = [] batch_start = datetime.utcnow() async for change in self.change_source.stream(checkpoint): batch.append(change) batch_age = (datetime.utcnow() - batch_start).seconds if len(batch) >= self.batch_size or batch_age >= self.max_wait: yield batch batch = [] batch_start = datetime.utcnow() # Emit remaining items if batch: yield batch async def _process_upserts( self, changes: list[ChangeEvent] ) -> list[dict]: """ Enrich and transform changes into index-ready documents. Handles the common case where change events contain only IDs, requiring enrichment from the source database. 
""" # Deduplicate: if same entity changed multiple times in batch, # only process the latest latest_changes = {} for change in changes: key = f"{change.entity_type}:{change.entity_id}" existing = latest_changes.get(key) if not existing or change.timestamp > existing.timestamp: latest_changes[key] = change # Enrich: fetch full documents from source enriched = await self.enricher.enrich_batch( list(latest_changes.values()) ) # Transform: convert to search schema documents = [] for entity_id, data in enriched.items(): doc = self._transform_to_search_schema(data) documents.append(doc) return documents async def _bulk_index(self, documents: list[dict]) -> None: """ Write documents to search index using bulk API. Implements retry with exponential backoff for transient failures. """ for attempt in range(3): try: result = await self.search.bulk_index(documents) if result.errors: # Handle partial failures failed_ids = [ item['_id'] for item in result.items if 'error' in item ] raise PartialIndexingError(failed_ids) return except TransientError as e: wait = 2 ** attempt await asyncio.sleep(wait) raise IndexingError("Max retries exceeded") async def _bulk_delete(self, entity_ids: list[str]) -> None: """Remove deleted entities from search index.""" await self.search.bulk_delete(entity_ids) def _transform_to_search_schema(self, data: dict) -> dict: """ Map source data to search index schema. This is where you apply field mappings, compute derived fields, and normalize data for search. """ return { '_id': data['id'], 'name': data['name'], 'description': data.get('description', ''), 'price': float(data['price']), 'category': data['category']['name'], 'brand': data.get('brand', {}).get('name'), 'in_stock': data.get('inventory', 0) > 0, 'popularity_score': self._compute_popularity(data), 'indexed_at': datetime.utcnow().isoformat() }Delta indexing pipelines must handle numerous edge cases that can cause subtle data inconsistencies if not addressed properly.
In distributed systems, updates can arrive out of order. If a product's price changes from $100 → $80 → $90, but updates arrive as $80, $90, $100, the index will show $100 (the last processed) instead of $90 (the current value).
Solutions:

- External versioning: use the source row's version or a monotonic timestamp as the document version, so stale writes are rejected
- Optimistic concurrency control (if_seq_no and if_primary_term in Elasticsearch)
- Partition the change stream by entity ID so all updates to one entity are processed in order by a single worker

A minimal sketch of external versioning follows.
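Here is a minimal sketch of the external-versioning approach, assuming documents carry a hypothetical `source_version` field (a row version or epoch-millisecond timestamp from the source system). With external versioning, a write whose version is not greater than the one already stored is rejected, so a stale update arriving late simply produces a version conflict the pipeline can ignore. Bulk metadata key names vary slightly across Elasticsearch versions, so treat this as illustrative.

```python
# Minimal sketch: build an NDJSON _bulk body where each action carries an external version.
import json


def build_versioned_bulk_body(documents: list[dict]) -> str:
    """Serialize bulk actions that let the search engine reject out-of-order writes."""
    lines = []
    for doc in documents:
        action = {
            "index": {
                "_index": "products",
                "_id": doc["id"],
                "version": doc["source_version"],  # e.g. row version or epoch millis
                "version_type": "external",
            }
        }
        lines.append(json.dumps(action))
        lines.append(json.dumps({k: v for k, v in doc.items() if k != "source_version"}))
    return "\n".join(lines) + "\n"
```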
When fetching full documents during enrichment, related data might be missing (e.g., a product references a deleted category).
Solutions:

- Treat missing relations defensively: index with a placeholder or null value rather than failing the whole document
- Re-queue the document for later enrichment if the relation is expected to appear (eventual consistency between systems)
- Emit a data-quality metric so persistent dangling references get investigated
When indexing 1,000 documents, 995 might succeed and 5 might fail due to mapping conflicts or validation errors.
Solutions:

- Inspect the bulk response item by item rather than treating the batch as all-or-nothing
- Retry transient failures (e.g., backpressure rejections) with backoff; send permanent failures (mapping conflicts, validation errors) to a dead-letter queue
- Alert on dead-letter queue growth so root causes get fixed

A minimal sketch of this handling follows.
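A minimal sketch of that handling, assuming the standard Elasticsearch bulk response shape (`{"errors": ..., "items": [...]}`) and a hypothetical dead-letter destination for documents that will never succeed:

```python
# Minimal sketch: split a bulk response into retryable failures and dead letters.
RETRYABLE_STATUSES = {429, 503}  # backpressure / transient unavailability


def split_bulk_failures(bulk_response: dict) -> tuple[list[str], list[dict]]:
    """Return (ids to retry, failed items to dead-letter)."""
    retry_ids, dead_letters = [], []
    if not bulk_response.get("errors"):
        return retry_ids, dead_letters

    for item in bulk_response["items"]:
        # Each item is keyed by the operation type: "index", "update", or "delete"
        op, result = next(iter(item.items()))
        if "error" not in result:
            continue
        if result.get("status") in RETRYABLE_STATUSES:
            retry_ids.append(result["_id"])
        else:
            # Mapping conflicts, validation errors, etc.: retrying will not help
            dead_letters.append({"_id": result["_id"], "op": op, "error": result["error"]})
    return retry_ids, dead_letters
```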
Every operation in your delta indexing pipeline must be idempotent—safe to execute multiple times with the same result. When failures occur (and they will), you'll restart from the last checkpoint and reprocess some events. Non-idempotent operations cause data corruption or duplicates.
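A small illustration of what idempotency means for the indexing step: deriving the document `_id` from the source primary key means a replayed batch overwrites documents rather than duplicating them. Names here are illustrative.

```python
# Minimal sketch: idempotent bulk action keyed by the source primary key.
def to_idempotent_action(entity: dict) -> tuple[dict, dict]:
    """Build a bulk action whose identity is derived from the source record."""
    action = {"index": {"_index": "products", "_id": entity["id"]}}  # never auto-generate IDs
    document = {
        "name": entity["name"],
        "price": entity["price"],
        # Prefer source timestamps over wall-clock time for fields used in comparisons,
        # so replaying the same change produces the same document
        "last_updated": entity["updated_at"],
    }
    return action, document
```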
Delta indexing performance directly impacts how fresh your search results are. Slow delta processing means stale data. Here are proven optimization techniques:
The optimal batch size balances latency and throughput:

- Larger batches amortize per-request overhead and raise throughput, but increase end-to-end latency and memory pressure
- Very large bulk requests can overwhelm the search cluster; a few thousand documents (or a few megabytes) per bulk request is a common starting point
- Use time-based flushing (as in the pipeline above) so small trickles of changes still get indexed promptly

Delta pipelines often have embarrassingly parallel workloads:

- Run multiple enrichment workers and multiple bulk-indexing streams concurrently
- Partition work by entity ID to preserve per-entity ordering while parallelizing across entities
- Pipeline the stages so enrichment of one batch overlaps with indexing of another

Enrichment (fetching full documents) is often the bottleneck:

- Fetch entities in batches with a single query instead of one query per document (avoid N+1 queries)
- Cache slowly changing relations such as categories and brands
- Select only the columns the index actually needs

Minimize data transfer and connection overhead:

- Compress bulk request bodies and reuse connections
- Disable per-request refresh on bulk writes and let the index refresh on its normal interval
- Co-locate indexers with the search cluster and source database where possible

The implementation below applies these techniques.
```typescript
/**
 * Performance-optimized delta indexing with parallel processing.
 *
 * Achieves 50,000+ documents/second on typical hardware.
 */

interface PerformanceConfig {
  // Parallelism settings
  enrichmentConcurrency: number;   // Parallel enrichment workers
  indexingConcurrency: number;     // Parallel bulk request streams

  // Batch settings
  enrichmentBatchSize: number;     // IDs per enrichment query
  indexingBatchSize: number;       // Documents per bulk request

  // Caching settings
  relationCacheTTLSeconds: number; // TTL for category/brand cache
  relationCacheMaxSize: number;    // Max cached relations
}

const PRODUCTION_CONFIG: PerformanceConfig = {
  enrichmentConcurrency: 8,
  indexingConcurrency: 4,
  enrichmentBatchSize: 500,
  indexingBatchSize: 2000,
  relationCacheTTLSeconds: 300,
  relationCacheMaxSize: 100_000,
};

class OptimizedDeltaPipeline {
  private enrichmentSemaphore: Semaphore;
  private indexingSemaphore: Semaphore;
  private relationCache: LRUCache<string, Relation>;

  constructor(private config: PerformanceConfig) {
    this.enrichmentSemaphore = new Semaphore(config.enrichmentConcurrency);
    this.indexingSemaphore = new Semaphore(config.indexingConcurrency);
    this.relationCache = new LRUCache({
      maxSize: config.relationCacheMaxSize,
      ttl: config.relationCacheTTLSeconds * 1000,
    });
  }

  /**
   * Process changes with optimal parallelism.
   *
   * Uses a pipeline pattern: enrichment and indexing
   * happen concurrently on different batches.
   */
  async processBatch(changes: ChangeEvent[]): Promise<void> {
    // Split into enrichment batches
    const enrichmentBatches = chunk(
      changes,
      this.config.enrichmentBatchSize
    );

    // Process enrichment batches in parallel
    const enrichedDocs = await Promise.all(
      enrichmentBatches.map(batch =>
        this.enrichmentSemaphore.runExclusive(() =>
          this.enrichBatch(batch)
        )
      )
    );

    // Flatten and split into indexing batches
    const allDocs = enrichedDocs.flat();
    const indexingBatches = chunk(allDocs, this.config.indexingBatchSize);

    // Index batches in parallel
    await Promise.all(
      indexingBatches.map(batch =>
        this.indexingSemaphore.runExclusive(() =>
          this.indexBatch(batch)
        )
      )
    );
  }

  /**
   * Batch enrichment with caching for relations.
   */
  private async enrichBatch(changes: ChangeEvent[]): Promise<Document[]> {
    const entityIds = changes.map(c => c.entity_id);

    // Single query for all entities
    const entities = await this.db.query(`
      SELECT * FROM products WHERE id = ANY($1::uuid[])
    `, [entityIds]);

    // Collect unique relation IDs
    const categoryIds = new Set(entities.map(e => e.category_id));
    const brandIds = new Set(entities.map(e => e.brand_id).filter(Boolean));

    // Fetch uncached relations
    const uncachedCategoryIds = [...categoryIds].filter(
      id => !this.relationCache.has(`category:${id}`)
    );
    const uncachedBrandIds = [...brandIds].filter(
      id => !this.relationCache.has(`brand:${id}`)
    );

    if (uncachedCategoryIds.length > 0) {
      const categories = await this.db.query(
        'SELECT * FROM categories WHERE id = ANY($1::uuid[])',
        [uncachedCategoryIds]
      );
      categories.forEach(c =>
        this.relationCache.set(`category:${c.id}`, c)
      );
    }

    if (uncachedBrandIds.length > 0) {
      const brands = await this.db.query(
        'SELECT * FROM brands WHERE id = ANY($1::uuid[])',
        [uncachedBrandIds]
      );
      brands.forEach(b =>
        this.relationCache.set(`brand:${b.id}`, b)
      );
    }

    // Build enriched documents
    return entities.map(entity => ({
      ...entity,
      category: this.relationCache.get(`category:${entity.category_id}`),
      brand: entity.brand_id
        ? this.relationCache.get(`brand:${entity.brand_id}`)
        : null,
    }));
  }

  /**
   * Bulk index with compression and connection reuse.
   */
  private async indexBatch(documents: Document[]): Promise<void> {
    const body = documents.flatMap(doc => [
      { index: { _index: 'products', _id: doc.id } },
      this.transformDocument(doc)
    ]);

    await this.esClient.bulk({
      body,
      refresh: false,  // Don't refresh after each bulk
      timeout: '30s',
      // Enable request compression
      headers: { 'Content-Encoding': 'gzip' }
    });
  }
}
```

A delta indexing pipeline is only as good as your ability to understand its behavior. Comprehensive observability is essential for maintaining data freshness and diagnosing issues.
Lag Metrics: time since the newest committed change was indexed, consumer lag in events behind the head of the change stream, and checkpoint age.

Throughput Metrics: changes consumed per second, documents indexed per second, and average batch size.

Error Metrics: enrichment failures, bulk indexing failure rate, retry counts, and dead letter queue size.

Resource Metrics: indexer CPU and heap usage, source database query latency, and search cluster indexing pressure (bulk queue depth and rejections).
| Alert | Condition | Severity | Response |
|---|---|---|---|
| Lag Exceeds Threshold | Consumer lag > 1 hour | High | Scale indexers, check for bottlenecks |
| Indexing Failure Rate | Failures > 1% of documents | High | Check mapping conflicts, document validation |
| Enrichment Timeout | DB queries > 30s | Medium | Check database performance, add indexes |
| Checkpoint Staleness | No checkpoint update in 10min | Critical | Check pipeline health, possible crash |
| Memory Pressure | Heap > 90% | Medium | Reduce batch sizes, check for memory leaks |
| Dead Letter Queue Growth | DLQ size increasing | Medium | Investigate failed documents, fix root cause |
Define a freshness Service Level Objective (SLO), such as '99% of changes indexed within 5 minutes.' Monitor this continuously, alert when violated, and use it in capacity planning. This transforms abstract 'delta indexing performance' into a concrete, measurable target.
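As one way to make the SLO measurable, here is a minimal sketch that exposes indexing lag as a metric, assuming prometheus_client and the checkpoint timestamps used earlier on this page.

```python
# Minimal sketch: publish indexing lag so it can be charted and alerted on.
from datetime import datetime

from prometheus_client import Gauge, start_http_server

indexing_lag_seconds = Gauge(
    "search_delta_indexing_lag_seconds",
    "Seconds between now and the timestamp of the last indexed change",
)


def report_lag(last_indexed_change: datetime) -> None:
    """Call after every checkpoint save; alert when this exceeds the freshness SLO."""
    lag = (datetime.utcnow() - last_indexed_change).total_seconds()
    indexing_lag_seconds.set(max(lag, 0.0))


if __name__ == "__main__":
    start_http_server(9108)  # expose /metrics for Prometheus to scrape
```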
Change Data Capture (CDC) represents the most robust approach to delta detection. By tapping into database transaction logs, CDC captures every change reliably, including deletes, with guaranteed ordering.
Traditional timestamp-based polling has fundamental limitations:

- Hard deletes are invisible
- Ordering is approximate, and clock skew or long-running transactions can drop changes
- Freshness is bounded by the polling interval, while frequent polling adds database load
- It depends on every writer maintaining the timestamp column correctly

CDC solves all of these by streaming changes as they're committed, providing:

- Every committed change, including deletes
- Exact commit ordering
- Near-real-time latency without polling load
- No reliance on application-level timestamp discipline
Debezium: Open-source platform supporting PostgreSQL, MySQL, MongoDB, SQL Server. Runs on Kafka Connect.
Maxwell: Lightweight MySQL CDC to Kafka. Simple but limited to MySQL.
AWS DMS: Managed CDC service for AWS databases. Supports various targets.
Google Datastream: CDC for Cloud SQL, AlloyDB to BigQuery, GCS, or Pub/Sub.
```
// Debezium PostgreSQL connector configuration
// Captures changes from products, categories, and brands tables

{
  "name": "search-indexing-connector",
  "config": {
    // Connector class for PostgreSQL
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

    // Database connection
    "database.hostname": "primary.db.example.com",
    "database.port": "5432",
    "database.user": "replication_user",
    "database.password": "${secrets.db_password}",
    "database.dbname": "ecommerce",

    // Logical replication slot name
    "slot.name": "search_indexing_slot",
    "plugin.name": "pgoutput",

    // What to capture
    "table.include.list": "public.products,public.categories,public.brands",

    // Kafka topic routing
    "topic.prefix": "cdc",
    // Produces topics: cdc.public.products, cdc.public.categories, etc.

    // Schema handling
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",

    // Snapshot mode: what to do on first start
    "snapshot.mode": "initial",  // Take snapshot then stream changes

    // Heartbeat for health monitoring
    "heartbeat.interval.ms": "10000",

    // Transforms for convenience
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}

// Sample Debezium change event (after ExtractNewRecordState transform)
{
  "id": "12345",
  "name": "Wireless Headphones",
  "price": 79.99,
  "category_id": "cat-001",
  "brand_id": "brand-abc",
  "updated_at": "2024-01-15T10:30:00.000000Z",
  "__op": "u",  // 'c' = create, 'u' = update, 'd' = delete
  "__ts_ms": 1705315800000,
  "__deleted": "false"
}
```

CDC introduces new failure modes: replication slot growth during connector outages, schema changes breaking parsing, and slot invalidation from long-running transactions. Plan for these scenarios with monitoring, slot size limits, and schema evolution strategies.
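Tying this back to the pipeline: here is a minimal sketch of routing flattened events like the sample above into upserts and deletes, assuming the `ExtractNewRecordState` transform configured with `add.fields=op,ts_ms` and `delete.handling.mode=rewrite` as shown.

```python
# Minimal sketch: route a flattened Debezium event to the indexing pipeline.
def route_cdc_event(event: dict) -> tuple[str, str, dict]:
    """Return (action, entity_id, payload) for the indexing pipeline."""
    if event.get("__deleted") == "true" or event.get("__op") == "d":
        return "delete", event["id"], {}
    # Creates and updates are handled identically: index the current row state
    payload = {k: v for k, v in event.items() if not k.startswith("__")}
    return "upsert", event["id"], payload
```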
Delta indexing is the bridge between your authoritative data stores and your search indexes. Mastering it ensures fresh, consistent search results without the cost of full rebuilds. Let's consolidate our learning:

- Search indexes are derived data; keeping them synchronized means reliably detecting creates, updates, and deletes
- Timestamp polling is simple but misses deletes; CDC and event streams capture everything with better ordering at higher operational cost
- A production pipeline separates change collection, enrichment, transformation, bulk indexing, and checkpointing, and makes every step idempotent
- Edge cases such as out-of-order updates, missing relations, and partial bulk failures need explicit handling, and a freshness SLO backed by lag monitoring tells you whether the pipeline is doing its job
What's next:
Delta indexing handles incremental updates, but sometimes you need to completely rebuild an index—due to schema changes, data structure evolution, or accumulated inconsistencies. Next, we'll explore reindexing strategies for performing full rebuilds without disrupting production search.
You now understand how to keep search indexes synchronized with source data through delta indexing. This capability is essential for maintaining fresh search results at scale. Next, we'll tackle the challenge of full reindexing when incremental updates aren't enough.