Triggers and application-level enforcement aim to maintain denormalized data consistency at write time, synchronously or with minimal delay. But even the most carefully designed systems can develop inconsistencies: bugs in enforcement logic, race conditions, failed partial writes, and out-of-band data changes can all slip through.
Batch synchronization provides a safety net. Scheduled processes periodically scan denormalized data, compare it against authoritative sources, and correct any discrepancies found. This approach accepts that perfect real-time consistency may not be achievable and instead guarantees that inconsistencies are bounded in duration.
This page provides a comprehensive exploration of batch synchronization—from fundamental concepts through production-grade implementations that handle terabyte-scale datasets efficiently.
Batch synchronization is not a replacement for real-time enforcement—it's an additional layer. Production systems should use triggers or application logic for immediate consistency, with batch processes as periodic verification and correction. Think of it as both audit and remediation.
Batch synchronization encompasses two related but distinct activities: detection, which identifies denormalized values that disagree with their authoritative source, and correction, which repairs the discrepancies found.
These can be run together or separately, depending on operational needs. Detection-only runs help assess data quality without making changes; correction runs fix identified issues.
| Mode | Detection | Correction | Use Case |
|---|---|---|---|
| Audit Only | Yes | No | Assess data quality; generate reports; no system changes |
| Detect and Alert | Yes | No | Identify issues and notify operators for manual review |
| Auto-Correct | Yes | Yes | Automatically fix all detected inconsistencies |
| Selective Correct | Yes | Conditional | Auto-fix low-risk issues; escalate high-risk for review |
| Full Rebuild | N/A | Yes | Regenerate all denormalized data from source (expensive) |
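As a minimal sketch, the modes in the table map onto a single correction decision. The `SyncMode` union and the risk labels below are illustrative, not from any particular library:

```typescript
type SyncMode = 'audit-only' | 'detect-and-alert' | 'auto-correct' | 'selective-correct';
type Risk = 'low' | 'high'; // hypothetical risk classification of a discrepancy

// Decide whether a detected discrepancy should be corrected in this run
function shouldCorrect(mode: SyncMode, risk: Risk): boolean {
  switch (mode) {
    case 'audit-only':
    case 'detect-and-alert':
      return false;               // detection only; report, never write
    case 'auto-correct':
      return true;                // fix everything detected
    case 'selective-correct':
      return risk === 'low';      // auto-fix low-risk, escalate high-risk
  }
}
```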
The Reconciliation Loop:
A typical batch sync process follows this pattern:
1. Select a batch of records to check
2. For each record:
   a. Fetch current denormalized values
   b. Compute expected values from source
   c. Compare current vs. expected
   d. If different: log discrepancy, optionally correct
3. Update progress tracking
4. If more records remain, repeat from step 1
5. Generate summary report
This simple loop can be optimized in many ways, but the fundamental pattern remains consistent across implementations.
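As a concrete sketch of that loop in TypeScript (`fetchBatch`, `computeExpected`, `logDiscrepancy`, and `applyFix` are hypothetical stand-ins for your data access layer):

```typescript
// Hypothetical data-layer hooks (declarations only, for the sketch)
declare function fetchBatch(cursor: string | null, limit: number):
  Promise<{ records: Array<{ id: string; denormalized: unknown }>; nextCursor: string | null }>;
declare function computeExpected(id: string): Promise<unknown>;
declare function logDiscrepancy(id: string, current: unknown, expected: unknown): Promise<void>;
declare function applyFix(id: string, expected: unknown): Promise<void>;

interface ReconcileStats { checked: number; mismatched: number; fixed: number; }

async function reconcile(correct: boolean): Promise<ReconcileStats> {
  const stats: ReconcileStats = { checked: 0, mismatched: 0, fixed: 0 };
  let cursor: string | null = null;

  do {
    const batch = await fetchBatch(cursor, 1000);          // 1. select a batch
    for (const record of batch.records) {
      stats.checked++;
      const expected = await computeExpected(record.id);   // 2b. recompute from source
      if (JSON.stringify(record.denormalized) !== JSON.stringify(expected)) { // 2c. compare
        stats.mismatched++;
        await logDiscrepancy(record.id, record.denormalized, expected);
        if (correct) {
          await applyFix(record.id, expected);             // 2d. optionally correct
          stats.fixed++;
        }
      }
    }
    cursor = batch.nextCursor;                             // 3. update progress
  } while (cursor !== null);                               // 4. repeat while records remain

  return stats;                                            // 5. summary report
}
```

The `correct` flag is what distinguishes a detection-only run from an auto-correcting one.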
When implementing batch sync for the first time, start with detection-only mode. Run it, analyze the results, and understand why inconsistencies exist before enabling auto-correction. Blind auto-correction can mask underlying bugs that should be fixed at the source.
When and how often to run batch synchronization depends on data volume, consistency requirements, and system load patterns.
| Data Volume | Consistency SLA | Recommended Approach |
|---|---|---|
| Small (< 100K rows) | Hours acceptable | Hourly or daily fixed schedule |
| Medium (100K - 10M rows) | Hours acceptable | Nightly full sync + hourly incremental |
| Large (10M - 1B rows) | Minutes desired | Continuous rolling sync with sharding |
| Very Large (> 1B rows) | Any | Event-driven with periodic sampling audits |
```typescript
// Scheduling configuration examples

interface SyncScheduleConfig {
  type: 'fixed' | 'interval' | 'continuous' | 'adaptive';

  // Fixed schedule options
  cronExpression?: string; // e.g., "0 2 * * *" for 2 AM daily

  // Interval options
  intervalMinutes?: number;
  allowOverlap?: boolean;

  // Continuous options
  batchSize?: number;
  pauseBetweenBatches?: number;

  // Adaptive options
  minIntervalMinutes?: number;
  maxIntervalMinutes?: number;
  increaseThreshold?: number; // Error rate to speed up
  decreaseThreshold?: number; // Error rate to slow down
}

// Example configurations for different scenarios

const smallDatasetConfig: SyncScheduleConfig = {
  type: 'fixed',
  cronExpression: '0 2 * * *', // Daily at 2 AM
};

const mediumDatasetConfig: SyncScheduleConfig = {
  type: 'interval',
  intervalMinutes: 60,
  allowOverlap: false,
};

const largeDatasetConfig: SyncScheduleConfig = {
  type: 'continuous',
  batchSize: 10000,
  pauseBetweenBatches: 100, // 100ms between batches to limit load
};

const adaptiveConfig: SyncScheduleConfig = {
  type: 'adaptive',
  minIntervalMinutes: 5,
  maxIntervalMinutes: 360, // Max 6 hours between runs
  increaseThreshold: 0.001, // Speed up if >0.1% errors
  decreaseThreshold: 0.0001, // Slow down if <0.01% errors
};
```
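The adaptive configuration above only declares thresholds; the scheduler that consumes them must implement the adjustment itself. A minimal sketch, assuming a simple halve/double policy (the `nextIntervalMinutes` helper is hypothetical, not part of the config interface):

```typescript
// Hypothetical adaptive-interval helper: widen or narrow the gap between
// runs based on the inconsistency rate observed in the last run.
function nextIntervalMinutes(
  config: SyncScheduleConfig,
  observedErrorRate: number,
  currentIntervalMinutes: number
): number {
  const {
    minIntervalMinutes = 5,
    maxIntervalMinutes = 360,
    increaseThreshold = 0.001,
    decreaseThreshold = 0.0001,
  } = config;

  if (observedErrorRate > increaseThreshold) {
    // Errors are common: run more often (halve the interval, floor at min)
    return Math.max(minIntervalMinutes, Math.floor(currentIntervalMinutes / 2));
  }
  if (observedErrorRate < decreaseThreshold) {
    // Errors are rare: back off (double the interval, cap at max)
    return Math.min(maxIntervalMinutes, currentIntervalMinutes * 2);
  }
  return currentIntervalMinutes; // In between: keep the current cadence
}
```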
The choice between incremental and full synchronization significantly impacts performance, resource usage, and consistency guarantees.

The Hybrid Approach:

Many production systems combine both strategies:

- Frequent incremental syncs (e.g., hourly) check only records changed since the last checkpoint, keeping each run cheap and responsive
- Periodic full scans (e.g., weekly) walk the entire dataset, catching anything the incremental passes missed
This provides responsive correction for most issues while guaranteeing eventual detection of any issue through full scans.
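Expressed with the `SyncScheduleConfig` shape defined above, a hybrid setup might simply pair two schedules. This is a sketch; the config names and cron string are illustrative:

```typescript
// Hybrid scheduling: a frequent incremental pass plus a periodic full scan.
const hourlyIncrementalConfig: SyncScheduleConfig = {
  type: 'interval',
  intervalMinutes: 60,   // incremental pass every hour
  allowOverlap: false,   // skip a run if the previous one is still going
};

const weeklyFullConfig: SyncScheduleConfig = {
  type: 'fixed',
  cronExpression: '0 3 * * 0', // full scan Sundays at 3 AM
};
```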
```typescript
// Incremental sync implementation with high-water mark

interface SyncCheckpoint {
  tableName: string;
  lastSyncedAt: Date;
  lastProcessedId: string | null;
  version: number;
}

class IncrementalSyncProcessor {
  constructor(
    private db: Database,
    private checkpointStore: CheckpointStore
  ) {}

  async syncCustomerNames(): Promise<SyncResult> {
    const checkpoint = await this.checkpointStore.get('customer_name_sync');
    const result: SyncResult = { checked: 0, fixed: 0, errors: 0 };

    // Fetch changes since last sync
    const changedCustomers = await this.db.customers.find({
      updatedAt: { $gt: checkpoint.lastSyncedAt },
      // Or use a cursor for ID-based pagination:
      // id: { $gt: checkpoint.lastProcessedId }
    }, {
      orderBy: { updatedAt: 'asc' },
      limit: 10000
    });

    for (const customer of changedCustomers) {
      try {
        result.checked++;

        // Find orders with stale customer name
        const staleOrders = await this.db.orders.find({
          customerId: customer.id,
          customerName: { $ne: customer.name }
        });

        if (staleOrders.length > 0) {
          await this.db.orders.updateMany(
            { customerId: customer.id },
            { customerName: customer.name, denormSyncedAt: new Date() }
          );
          result.fixed += staleOrders.length;
        }

        // Update checkpoint progressively
        await this.checkpointStore.update('customer_name_sync', {
          lastSyncedAt: customer.updatedAt,
          lastProcessedId: customer.id,
          version: checkpoint.version + 1
        });
      } catch (error) {
        result.errors++;
        await this.logError('customer_name_sync', customer.id, error);
      }
    }

    return result;
  }

  private async logError(job: string, recordId: string, error: unknown): Promise<void> {
    // Persist for later review; console fallback keeps the sketch self-contained
    console.error(`[${job}] failed for record ${recordId}:`, error);
  }
}

// Simple promise-based delay used for rate limiting between batches
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Full sync for periodic deep audit
class FullSyncProcessor {
  constructor(private db: Database) {}

  async syncAllCustomerNames(): Promise<SyncResult> {
    const result: SyncResult = { checked: 0, fixed: 0, errors: 0 };

    // Use cursor-based pagination for memory efficiency
    let cursor: string | null = null;

    do {
      const batch = await this.db.customers.findWithCursor({
        cursor,
        limit: 5000,
        orderBy: { id: 'asc' }
      });

      for (const customer of batch.data) {
        // Same logic as incremental...
        result.checked++;
        const updated = await this.db.orders.updateMany(
          { customerId: customer.id, customerName: { $ne: customer.name } },
          { customerName: customer.name, denormSyncedAt: new Date() }
        );
        result.fixed += updated.count;
      }

      cursor = batch.nextCursor;

      // Rate limiting to avoid overwhelming the database
      await sleep(50);
    } while (cursor !== null);

    return result;
  }
}
```

Batch sync jobs can be resource-intensive, potentially impacting production database performance. Careful optimization is essential for large-scale systems.
```sql
-- Optimized batch sync queries

-- Efficient detection query using index on updated_at
-- Assumes: CREATE INDEX idx_customers_updated ON customers(updated_at);
SELECT
  c.customer_id,
  c.customer_name,
  o.order_id,
  o.customer_name AS order_customer_name
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE c.updated_at > :last_sync_time
  AND c.customer_name != o.customer_name
ORDER BY c.updated_at
LIMIT 10000;

-- Bulk correction with single UPDATE per customer
-- More efficient than row-by-row updates
WITH stale_orders AS (
  SELECT o.order_id, c.customer_name AS correct_name
  FROM orders o
  JOIN customers c ON o.customer_id = c.customer_id
  WHERE o.customer_name != c.customer_name
    AND c.customer_id IN (:customer_id_batch)
)
UPDATE orders o
SET customer_name = s.correct_name,
    denorm_synced_at = CURRENT_TIMESTAMP
FROM stale_orders s
WHERE o.order_id = s.order_id;

-- For very large updates, use batched updates with progress tracking
DO $$
DECLARE
  batch_size INT := 10000;
  total_fixed INT := 0;
  batch_fixed INT;
BEGIN
  LOOP
    UPDATE orders o
    SET customer_name = c.customer_name,
        denorm_synced_at = CURRENT_TIMESTAMP
    FROM customers c
    WHERE o.customer_id = c.customer_id
      AND o.customer_name != c.customer_name
      AND o.order_id IN (
        SELECT order_id FROM orders
        WHERE customer_name != (
          SELECT customer_name FROM customers
          WHERE customer_id = orders.customer_id
        )
        LIMIT batch_size
      );

    GET DIAGNOSTICS batch_fixed = ROW_COUNT;
    total_fixed := total_fixed + batch_fixed;

    -- Commit progress and log
    COMMIT;
    RAISE NOTICE 'Fixed % orders this batch, % total', batch_fixed, total_fixed;

    -- Exit when no more to fix
    EXIT WHEN batch_fixed = 0;

    -- Rate limit
    PERFORM pg_sleep(0.1);
  END LOOP;
END $$;
```

Always monitor database CPU, I/O, and replication lag during batch sync runs. If metrics show stress, increase pauses between batches or reduce batch size. A sync job that takes 2 hours is better than one that impacts production traffic.
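One way to act on that advice mid-run is to scale the pause between batches with replication lag. A minimal sketch, assuming a hypothetical `getReplicationLagMs()` metric source:

```typescript
// Hypothetical within-run throttle: stretch the pause between batches
// when replication lag climbs, shrink it back when the replica catches up.
async function adaptivePause(
  getReplicationLagMs: () => Promise<number>, // metric source is an assumption
  basePauseMs: number,
  lagBudgetMs: number
): Promise<void> {
  const lag = await getReplicationLagMs();
  // Scale the pause linearly with how far we are over budget (capped at 10x)
  const factor = Math.min(10, Math.max(1, lag / lagBudgetMs));
  await new Promise<void>(resolve => setTimeout(resolve, basePauseMs * factor));
}
```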
For very large datasets, a single sync process may take too long or consume too many resources. Sharding the workload across multiple parallel workers dramatically improves throughput.
```typescript
// Parallel batch sync with hash-based sharding

interface ShardConfig {
  shardIndex: number;
  totalShards: number;
}

class ShardedSyncWorker {
  constructor(
    private db: Database,
    private shardConfig: ShardConfig
  ) {}

  async sync(): Promise<SyncResult> {
    const result: SyncResult = { checked: 0, fixed: 0, errors: 0 };

    // Only process records that hash to this shard
    const myRecords = await this.db.customers.find({
      // Hash-based shard assignment
      $where: `(HASHTEXT(customer_id) % ${this.shardConfig.totalShards}) = ${this.shardConfig.shardIndex}`
    });

    for (const customer of myRecords) {
      result.checked++;
      const fixed = await this.syncCustomerOrders(customer);
      result.fixed += fixed;
    }

    return result;
  }

  private async syncCustomerOrders(customer: Customer): Promise<number> {
    return this.db.orders.updateMany(
      { customerId: customer.id, customerName: { $ne: customer.name } },
      { customerName: customer.name }
    ).then(r => r.count);
  }
}

// Coordinator launches multiple workers
class SyncCoordinator {
  constructor(private db: Database) {}

  async runParallelSync(numWorkers: number): Promise<SyncResult> {
    const workers = Array.from({ length: numWorkers }, (_, i) =>
      new ShardedSyncWorker(this.db, { shardIndex: i, totalShards: numWorkers })
    );

    // Run all workers in parallel
    const results = await Promise.all(
      workers.map(w => w.sync())
    );

    // Aggregate results
    return results.reduce((acc, r) => ({
      checked: acc.checked + r.checked,
      fixed: acc.fixed + r.fixed,
      errors: acc.errors + r.errors
    }), { checked: 0, fixed: 0, errors: 0 });
  }
}

// Usage with distributed job system (e.g., Kubernetes Jobs)
async function launchDistributedSync() {
  const totalShards = 10;
  for (let i = 0; i < totalShards; i++) {
    await kubernetes.createJob({
      name: `denorm-sync-shard-${i}`,
      image: 'sync-worker:latest',
      env: {
        SHARD_INDEX: i.toString(),
        TOTAL_SHARDS: totalShards.toString()
      }
    });
  }
}
```

When parallelizing, ensure sharding is deterministic and complete. Every record should be processed by exactly one worker. Use stable hash functions and verify that mod arithmetic correctly covers all cases.
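The `$where`/`HASHTEXT` predicate above pushes shard assignment into the database. If you assign shards in application code instead, a stable hash such as FNV-1a keeps the assignment deterministic across processes, and an unsigned modulo guarantees every record lands in exactly one shard. A sketch:

```typescript
// FNV-1a 32-bit hash: stable across processes and runs, unlike object identity.
function fnv1a32(s: string): number {
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime, with 32-bit overflow semantics
  }
  return h >>> 0; // force unsigned so the modulo below is never negative
}

// Every customerId maps to exactly one shard in [0, totalShards)
function shardFor(customerId: string, totalShards: number): number {
  return fnv1a32(customerId) % totalShards;
}
```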
Batch sync jobs must be resilient to failures. A job that crashes halfway through should be able to resume without re-processing or missing records.
```typescript
// Resilient batch sync with checkpoint recovery

interface SyncJobState {
  jobId: string;
  startedAt: Date;
  status: 'running' | 'completed' | 'failed';
  lastCheckpoint: {
    processedCount: number;
    lastProcessedId: string;
    lastProcessedAt: Date;
  };
  result: SyncResult;
  errors: SyncError[];
}

class ResilientBatchSync {
  constructor(
    private db: Database,
    private stateStore: StateStore
  ) {}

  async run(jobId: string): Promise<SyncResult> {
    // Try to resume existing job or start new one
    let state = await this.stateStore.get(jobId) || this.initializeState(jobId);

    if (state.status === 'completed') {
      console.log(`Job ${jobId} already completed`);
      return state.result;
    }

    state.status = 'running';
    await this.stateStore.save(state);

    try {
      await this.processFromCheckpoint(state);
      state.status = 'completed';
      await this.stateStore.save(state);
      return state.result;
    } catch (error) {
      state.status = 'failed';
      state.errors.push({
        timestamp: new Date(),
        message: error.message,
        stack: error.stack
      });
      await this.stateStore.save(state);
      throw error;
    }
  }

  private async processFromCheckpoint(state: SyncJobState): Promise<void> {
    // Resume from the last saved cursor; an empty checkpoint means start over
    let cursor: string | null = state.lastCheckpoint.lastProcessedId || null;
    const batchSize = 1000;
    const checkpointInterval = 10; // Save checkpoint every 10 batches
    let batchCount = 0;

    while (true) {
      const batch = await this.db.customers.findWithCursor({
        cursor,
        limit: batchSize,
        orderBy: { id: 'asc' }
      });

      if (batch.data.length === 0) break;

      for (const customer of batch.data) {
        try {
          const fixed = await this.syncCustomer(customer);
          state.result.checked++;
          state.result.fixed += fixed;
        } catch (error) {
          state.result.errors++;
          if (state.errors.length < 1000) { // Limit error log size
            state.errors.push({
              timestamp: new Date(),
              recordId: customer.id,
              message: error.message
            });
          }
          // Continue processing other records
          console.error(`Error syncing customer ${customer.id}:`, error);
        }
      }

      // Update progress
      cursor = batch.nextCursor;
      state.lastCheckpoint = {
        processedCount: state.result.checked,
        lastProcessedId: cursor ?? '',
        lastProcessedAt: new Date()
      };
      batchCount++;

      // Periodic checkpoint save
      if (batchCount % checkpointInterval === 0) {
        await this.stateStore.save(state);
        console.log(`Checkpoint saved: ${state.result.checked} processed`);
      }
    }
  }

  private initializeState(jobId: string): SyncJobState {
    return {
      jobId,
      startedAt: new Date(),
      status: 'running',
      lastCheckpoint: {
        processedCount: 0,
        lastProcessedId: '',
        lastProcessedAt: new Date()
      },
      result: { checked: 0, fixed: 0, errors: 0 },
      errors: []
    };
  }
}
```

Batch sync jobs generate valuable data about system health and data quality. Proper reporting and alerting turn this data into actionable insights.
| Metric | Description | Alert Threshold |
|---|---|---|
| Inconsistency Rate | Percentage of checked records with discrepancies | > 0.1% (investigate), > 1% (critical) |
| Time to Sync | Duration of sync job completion | > 2x historical average |
| Records Processed | Total records checked per run | Unexpected drop (data access issues) |
| Records Fixed | Total corrections applied | Sudden spike (indicates new bug) |
| Error Count | Processing errors during sync | > 0 should trigger review |
| Lag (for continuous) | Time since oldest unprocessed change | > SLA target (e.g., 1 hour) |
```typescript
// Comprehensive sync reporting and alerting

interface SyncReport {
  jobId: string;
  startTime: Date;
  endTime: Date;
  durationMs: number;
  recordsChecked: number;
  recordsFixed: number;
  errorCount: number;
  inconsistencyRate: number; // fixed / checked
  throughput: number; // records / second
  topErrors: Array<{ message: string; count: number }>;
  fixesByType: Record<string, number>;
}

class SyncReporter {
  constructor(
    private metricsService: MetricsService,
    private alertService: AlertService
  ) {}

  async processResult(result: SyncJobState): Promise<SyncReport> {
    const report = this.buildReport(result);

    // Emit metrics
    await this.metricsService.recordGauge('sync.inconsistency_rate', report.inconsistencyRate, { job: result.jobId });
    await this.metricsService.recordCounter('sync.records_checked', report.recordsChecked);
    await this.metricsService.recordCounter('sync.records_fixed', report.recordsFixed);
    await this.metricsService.recordHistogram('sync.duration_ms', report.durationMs);

    // Evaluate alert conditions
    await this.evaluateAlerts(report);

    // Store report for historical analysis
    await this.storeReport(report);

    return report;
  }

  private async evaluateAlerts(report: SyncReport): Promise<void> {
    // High inconsistency rate
    if (report.inconsistencyRate > 0.01) { // > 1%
      await this.alertService.critical(
        `Critical: ${(report.inconsistencyRate * 100).toFixed(2)}% data inconsistency detected`,
        {
          jobId: report.jobId,
          recordsFixed: report.recordsFixed,
          recordsChecked: report.recordsChecked
        }
      );
    } else if (report.inconsistencyRate > 0.001) { // > 0.1%
      await this.alertService.warning(
        `Elevated inconsistency rate: ${(report.inconsistencyRate * 100).toFixed(3)}%`,
        { jobId: report.jobId }
      );
    }

    // Processing errors
    if (report.errorCount > 0) {
      await this.alertService.warning(
        `Sync job encountered ${report.errorCount} errors`,
        { jobId: report.jobId, topErrors: report.topErrors }
      );
    }

    // Duration anomaly (compare to historical average)
    const avgDuration = await this.getHistoricalAvgDuration(report.jobId);
    if (report.durationMs > avgDuration * 2) {
      await this.alertService.warning(
        `Sync job took ${report.durationMs}ms, 2x longer than average`,
        { jobId: report.jobId, avgDuration }
      );
    }
  }

  private buildReport(result: SyncJobState): SyncReport {
    const durationMs = result.lastCheckpoint.lastProcessedAt.getTime() - result.startedAt.getTime();
    return {
      jobId: result.jobId,
      startTime: result.startedAt,
      endTime: result.lastCheckpoint.lastProcessedAt,
      durationMs,
      recordsChecked: result.result.checked,
      recordsFixed: result.result.fixed,
      errorCount: result.result.errors,
      inconsistencyRate: result.result.checked > 0 ? result.result.fixed / result.result.checked : 0,
      throughput: result.result.checked / (durationMs / 1000),
      topErrors: this.aggregateErrors(result.errors),
      fixesByType: {} // Populated based on your categorization
    };
  }
}
```

A 0.1% inconsistency rate might be normal—or it might be 10x higher than last week. Always compare current results to historical baselines. Sudden spikes indicate new bugs; gradual increases suggest systemic issues.
Batch synchronization provides a critical safety net for denormalized data consistency. It detects and corrects issues that slip through real-time enforcement, ensuring bounded inconsistency windows.
What's Next:
With real-time enforcement and batch synchronization in place, how do we know our consistency mechanisms are working? The final page explores monitoring for data integrity—building observability systems that detect anomalies, track consistency metrics, and provide confidence that denormalized data remains accurate.
You now understand how to design, implement, and operate batch synchronization processes for denormalized data. These scheduled reconciliation jobs provide the safety net that ensures inconsistencies—however they arise—are detected and corrected within bounded time windows.