Change Data Capture isn't merely a technical pattern—it's a capability enabler that unlocks entire categories of solutions previously requiring complex, fragile custom implementations. Once you have reliable change streams flowing from your databases, a world of possibilities opens.
This page examines the most impactful CDC use cases in production systems. For each, we'll explore the problem, why CDC is the right solution, and implementation patterns with working examples.
By the end of this page, you will understand the major production use cases for CDC, have implementation patterns for each scenario, and be able to recognize opportunities to apply CDC in your own systems. You'll see how CDC transforms challenging distributed systems problems into straightforward data pipeline tasks.
The Challenge:
In microservices architectures, each service owns its data. But services frequently need to query data owned by other services. Making synchronous calls creates tight coupling, latency, and availability concerns. The solution is data replication—maintaining local copies of data from other services.
Why CDC Solves This:
CDC enables continuous, reliable replication without source application changes. The consuming service maintains a local replica that stays synchronized with sub-second latency.
interface ProductCDCEvent {
  before: Product | null;
  after: Product | null;
  op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
  source: {
    ts_ms: number;
    table: string;
    lsn: number;
  };
}

class ProductReplicaConsumer {
  constructor(
    private readonly replicaDb: Database,
    private readonly kafka: KafkaConsumer
  ) {}

  async start(): Promise<void> {
    await this.kafka.subscribe({
      topic: 'product-db.public.products',
      fromBeginning: true // Start from snapshot for new replicas
    });

    await this.kafka.run({
      eachMessage: async ({ message }) => {
        const event: ProductCDCEvent = JSON.parse(message.value!.toString());
        await this.handleEvent(event);
      }
    });
  }

  private async handleEvent(event: ProductCDCEvent): Promise<void> {
    switch (event.op) {
      case 'c': // Create
      case 'r': // Snapshot read (treat as create)
        await this.replicaDb.upsert('products', event.after!);
        break;
      case 'u': // Update
        await this.replicaDb.upsert('products', event.after!);
        break;
      case 'd': // Delete
        await this.replicaDb.delete('products', event.before!.id);
        break;
    }

    // Track replication lag for monitoring
    const lagMs = Date.now() - event.source.ts_ms;
    metrics.recordHistogram('replication_lag_ms', lagMs);
  }
}

The Challenge:
Caches are essential for performance, but cache invalidation is famously "one of the two hard problems in computer science." The classic approaches are:
Why CDC Solves This:
CDC provides reactive cache invalidation. When data changes in the database, the cache is automatically invalidated or updated—no application code changes, no missed invalidations, no dual writes.
class CDCCacheInvalidator {
  constructor(
    private readonly redis: RedisClient,
    private readonly kafka: KafkaConsumer
  ) {}

  async start(): Promise<void> {
    // Subscribe to all product-related tables
    await this.kafka.subscribe({
      topics: [
        'inventory-db.public.products',
        'inventory-db.public.product_prices',
        'inventory-db.public.product_inventory'
      ]
    });

    await this.kafka.run({
      eachMessage: async ({ topic, message }) => {
        const event = JSON.parse(message.value!.toString());
        await this.invalidate(topic, event);
      }
    });
  }

  private async invalidate(topic: string, event: CDCEvent): Promise<void> {
    // Extract entity ID from before or after state
    const entityId = event.after?.product_id || event.before?.product_id;

    // Invalidate all cache keys related to this entity
    const keysToInvalidate = this.getCacheKeys(topic, entityId);

    // Use pipeline for efficiency
    const pipeline = this.redis.pipeline();
    for (const key of keysToInvalidate) {
      pipeline.del(key);
    }
    await pipeline.exec();

    console.log(`Invalidated ${keysToInvalidate.length} keys for ${entityId}`);
  }

  private getCacheKeys(topic: string, entityId: string): string[] {
    // Different tables affect different cache keys
    const keys: string[] = [];

    keys.push(`product:${entityId}`);          // Single product
    keys.push(`product:${entityId}:details`);  // Product details
    keys.push('products:featured');            // Listing may include this product
    keys.push('products:catalog');             // Catalog view

    if (topic.includes('product_prices')) {
      keys.push(`product:${entityId}:price`);
      keys.push('products:on-sale');           // Sale listings
    }

    if (topic.includes('product_inventory')) {
      keys.push(`product:${entityId}:stock`);
      keys.push('products:in-stock');
    }

    return keys;
  }
}

Use invalidation when: cache keys are complex/unpredictable, cache population is expensive and rarely needed, or you prefer simplicity. Use write-through when: cache hits must remain high (latency-sensitive), derived data (aggregations, sorted sets) must stay synchronized, or stale data is unacceptable.
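As a contrast to the invalidation approach above, here is a minimal write-through sketch. It reuses the RedisClient and CDCEvent shapes from the example (the set-with-TTL call is an assumption about that hypothetical client): instead of deleting keys, the consumer writes the fresh row state into the cache so hot keys never go cold.

class CDCCacheWriteThrough {
  constructor(private readonly redis: RedisClient) {}

  // Called for each CDC event, e.g. from the same eachMessage handler as above
  async apply(event: CDCEvent): Promise<void> {
    const id = event.after?.product_id ?? event.before?.product_id;

    if (event.op === 'd') {
      // No "after" state to write, so deletes still remove the key
      await this.redis.del(`product:${id}`);
      return;
    }

    // Write the new row state directly into the cache; the TTL is a safety net
    // in case an event is ever missed (assumed client API: set(key, value, opts))
    await this.redis.set(`product:${id}`, JSON.stringify(event.after), { EX: 3600 });
  }
}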
The Challenge:
Search engines like Elasticsearch or Algolia require denormalized, search-optimized documents. Keeping these indexes synchronized with the source of truth database is challenging:
Why CDC Solves This:
CDC streams changes to a search index consumer that applies incremental updates. Each change updates only the affected document(s), and indexing happens in near-real-time.
class ProductSearchIndexer {
  constructor(
    private readonly es: ElasticsearchClient,
    private readonly db: Database, // replica used to rebuild denormalized documents
    private readonly kafka: KafkaConsumer
  ) {}

  async start(): Promise<void> {
    // Subscribe to all tables that contribute to the product document
    await this.kafka.subscribe({
      topics: [
        'inventory-db.public.products',
        'inventory-db.public.product_categories',
        'inventory-db.public.product_reviews',
        'inventory-db.public.product_inventory'
      ]
    });

    await this.kafka.run({
      eachBatch: async ({ batch }) => {
        // Process in batches for Elasticsearch bulk API efficiency
        const bulkOperations: any[] = [];

        for (const message of batch.messages) {
          const event = JSON.parse(message.value!.toString());
          const ops = await this.convertToEsBulkOps(batch.topic, event);
          bulkOperations.push(...ops);
        }

        if (bulkOperations.length > 0) {
          await this.es.bulk({ operations: bulkOperations });
        }
      }
    });
  }

  private async convertToEsBulkOps(topic: string, event: CDCEvent): Promise<any[]> {
    const productId = this.extractProductId(topic, event);

    if (event.op === 'd') {
      // For deletes on main product table, delete document
      if (topic.includes('.products')) {
        return [
          { delete: { _index: 'products', _id: productId } }
        ];
      }
      // For related tables, fall through and reindex to update the document
    }

    // For creates/updates, build denormalized document
    const document = await this.buildSearchDocument(productId);
    return [
      { index: { _index: 'products', _id: productId } },
      document
    ];
  }

  private async buildSearchDocument(productId: string): Promise<object> {
    // Fetch current state from replica or source
    // This denormalized document includes data from multiple tables
    const product = await this.db.query(`
      SELECT
        p.*,
        array_agg(DISTINCT c.name) as categories,
        avg(r.rating) as avg_rating,
        count(r.id) as review_count,
        i.quantity as stock_quantity
      FROM products p
      LEFT JOIN product_categories pc ON p.id = pc.product_id
      LEFT JOIN categories c ON pc.category_id = c.id
      LEFT JOIN product_reviews r ON p.id = r.product_id
      LEFT JOIN product_inventory i ON p.id = i.product_id
      WHERE p.id = $1
      GROUP BY p.id, i.quantity
    `, [productId]);

    return {
      id: product.id,
      name: product.name,
      description: product.description,
      price: product.price,
      categories: product.categories,
      avgRating: product.avg_rating || 0,
      reviewCount: product.review_count || 0,
      inStock: (product.stock_quantity || 0) > 0,
      // Full-text search fields
      searchText: `${product.name} ${product.description}`.toLowerCase(),
      // Facets
      priceRange: this.getPriceRange(product.price),
      ratingBucket: Math.floor(product.avg_rating || 0)
    };
  }
}

When a search document spans multiple source tables (products + reviews + inventory), a change to ANY of these tables requires reindexing the parent document. Use careful topic subscription and document rebuild logic. Consider caching intermediate states to reduce database queries during reindexing.
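A small sketch of that idea, using the hypothetical extractProductId helper from the indexer above: collapse each batch to the distinct product IDs it touches, so a product affected by many related-table changes is rebuilt only once per batch.

// Collapse a CDC batch to the distinct products it touches, so each
// product document is rebuilt at most once per batch (sketch only).
function collectProductsToReindex(
  batch: { topic: string; messages: { value: Buffer | null }[] },
  extractProductId: (topic: string, event: CDCEvent) => string
): Set<string> {
  const productIds = new Set<string>();
  for (const message of batch.messages) {
    const event: CDCEvent = JSON.parse(message.value!.toString());
    productIds.add(extractProductId(batch.topic, event));
  }
  return productIds; // rebuild one document per ID instead of one per event
}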
The Challenge:
Analytics requires loading operational data into warehouses (Snowflake, BigQuery, Redshift). Traditional ETL runs batch jobs that:
Why CDC Solves This:
CDC provides streaming ETL (ELT)—changes flow continuously to the warehouse, enabling near-real-time analytics without impacting source systems.
The Raw Landing Pattern:
Append ALL CDC events to raw tables, preserving full history. Transform during query time.
-- Raw table structure (immutable)
CREATE TABLE raw.orders_cdc (
    -- CDC metadata
    event_id   STRING,      -- Unique event identifier
    event_ts   TIMESTAMP,   -- When event was captured
    operation  STRING,      -- 'c', 'u', 'd'
    source_lsn STRING,      -- Log sequence number

    -- Before image (JSON for flexibility)
    before_state VARIANT,
    -- After image (JSON for flexibility)
    after_state  VARIANT,

    -- Partitioning for query efficiency
    event_date DATE,

    -- Processing metadata
    loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

-- Latest state view (for analytics)
CREATE VIEW curated.orders_current AS
SELECT
    after_state:order_id::number    AS order_id,
    after_state:customer_id::number AS customer_id,
    after_state:status::string      AS status,
    after_state:total::number(10,2) AS total,
    event_ts                        AS last_updated
FROM (
    SELECT *
    FROM raw.orders_cdc
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY after_state:order_id
        ORDER BY source_lsn DESC
    ) = 1
)
WHERE operation != 'd';  -- Exclude orders whose latest event is a delete
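The other half of the pattern is the loader that appends every CDC event to the raw table. A minimal sketch, assuming a generic WarehouseClient with a row-insert method (in practice this would be Snowpipe, the BigQuery streaming API, or a connector sink):

// Append-only loader for the raw landing table (sketch; `WarehouseClient`
// and its insert method are assumptions, not a specific vendor API).
class RawLandingLoader {
  constructor(private readonly warehouse: WarehouseClient) {}

  async load(event: CDCEvent): Promise<void> {
    const eventTime = new Date(event.source.ts_ms);
    await this.warehouse.insert('raw.orders_cdc', {
      event_id: `${event.source.lsn}-${event.source.table}`, // unique per change
      event_ts: eventTime,
      operation: event.op,
      source_lsn: String(event.source.lsn),
      before_state: event.before, // lands in the VARIANT column
      after_state: event.after,   // lands in the VARIANT column
      event_date: eventTime.toISOString().slice(0, 10)
    });
  }
}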
Benefits:
Trade-offs:
The Challenge:
Microservices need to react to events from other services. Options include:
Why CDC Solves This:
CDC generates events from any database write, making the database itself an event source. Services subscribe to change topics and react asynchronously.
// Order Service writes to database - nothing special
class OrderService {
  async createOrder(orderData: CreateOrderDto): Promise<Order> {
    // Just write to database - CDC handles event emission
    return await this.db.orders.create(orderData);
  }

  async updateOrderStatus(orderId: string, status: OrderStatus): Promise<void> {
    await this.db.orders.update({
      where: { id: orderId },
      data: { status, updatedAt: new Date() }
    });
    // No need to emit events - CDC captures this automatically
  }
}

// Notification Service reacts to order changes via CDC
class NotificationService {
  async startEventProcessor(): Promise<void> {
    await this.kafka.subscribe({ topic: 'order-db.public.orders' });

    await this.kafka.run({
      eachMessage: async ({ message }) => {
        const event: CDCEvent = JSON.parse(message.value!.toString());

        // Only process updates where status changed
        if (event.op === 'u' && this.statusChanged(event)) {
          await this.handleStatusChange(event.before!, event.after!);
        }

        // New order created
        if (event.op === 'c') {
          await this.sendOrderConfirmation(event.after!);
        }
      }
    });
  }

  private statusChanged(event: CDCEvent): boolean {
    return event.before?.status !== event.after?.status;
  }

  private async handleStatusChange(before: Order, after: Order): Promise<void> {
    const transitions: Record<string, () => Promise<void>> = {
      'pending→confirmed': () => this.sendOrderConfirmed(after),
      'confirmed→shipped': () => this.sendShippingNotification(after),
      'shipped→delivered': () => this.sendDeliveryConfirmation(after),
      'any→cancelled': () => this.sendCancellationNotice(after)
    };

    const transitionKey = `${before.status}→${after.status}`;
    const handler = transitions[transitionKey] || transitions['any→' + after.status];
    if (handler) {
      await handler();
    }
  }
}

// Inventory Service reacts to order creation
class InventoryService {
  async startEventProcessor(): Promise<void> {
    await this.kafka.subscribe({ topic: 'order-db.public.order_items' });

    await this.kafka.run({
      eachMessage: async ({ message }) => {
        const event: CDCEvent = JSON.parse(message.value!.toString());

        if (event.op === 'c') {
          // New order item - reserve inventory
          await this.reserveInventory(
            event.after!.product_id,
            event.after!.quantity
          );
        }

        if (event.op === 'd') {
          // Order item cancelled - release inventory
          await this.releaseInventory(
            event.before!.product_id,
            event.before!.quantity
          );
        }
      }
    });
  }
}

CDC events are low-level (row changed) while domain events are high-level (order placed). For complex business logic, consider using CDC to trigger a service that then publishes rich domain events. This gives you both the reliability of CDC and the semantic richness of domain events.
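One way to apply that advice is a thin translator between the CDC topic and a domain-event topic. The sketch below assumes a hypothetical publishDomainEvent function (for example, a Kafka producer writing to an order-events topic) and the order CDC event shape used above.

// Translate low-level row changes into high-level domain events (sketch).
// `publishDomainEvent` is a hypothetical publisher, not a real library call.
async function translateOrderChange(event: CDCEvent): Promise<void> {
  if (event.op === 'c') {
    await publishDomainEvent({
      type: 'OrderPlaced',
      orderId: event.after!.id,
      occurredAt: new Date(event.source.ts_ms)
    });
  } else if (event.op === 'u' && event.before?.status !== event.after?.status) {
    await publishDomainEvent({
      type: 'OrderStatusChanged',
      orderId: event.after!.id,
      from: event.before!.status,
      to: event.after!.status,
      occurredAt: new Date(event.source.ts_ms)
    });
  }
  // Other row-level changes (timestamps, internal bookkeeping) emit nothing.
}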
The Challenge:
Regulations (GDPR, SOX, HIPAA, PCI-DSS) require detailed audit trails of data access and changes. Building audit systems is complex:
Why CDC Solves This:
CDC captures every change with before/after states and timestamps—exactly what auditors need. The audit trail is external to the source database, tamper-resistant, and complete.
interface AuditRecord {
  auditId: string;
  timestamp: Date;
  tableName: string;
  operation: 'INSERT' | 'UPDATE' | 'DELETE';
  recordId: string;
  beforeState: Record<string, any> | null;
  afterState: Record<string, any> | null;
  changedFields: string[];
  source: {
    transactionId: string;
    lsn: string;
    connector: string;
  };
}

class AuditTrailConsumer {
  constructor(
    private readonly auditStore: AuditStore, // Immutable store (S3, immutable DB)
    private readonly kafka: KafkaConsumer
  ) {}

  async start(): Promise<void> {
    // Subscribe to ALL tables for comprehensive audit
    await this.kafka.subscribe({
      topicPattern: /inventory-db\.public\..*/
    });

    await this.kafka.run({
      eachMessage: async ({ topic, message }) => {
        const event = JSON.parse(message.value!.toString());
        const auditRecord = this.createAuditRecord(topic, event);

        // Store to immutable audit log
        await this.auditStore.append(auditRecord);

        // Check for sensitive data access (PII columns)
        if (this.containsSensitiveData(auditRecord)) {
          await this.alertSecurityTeam(auditRecord);
        }
      }
    });
  }

  private createAuditRecord(topic: string, event: CDCEvent): AuditRecord {
    const tableName = topic.split('.').pop()!;
    const recordId = this.extractRecordId(event);

    return {
      auditId: uuid(),
      timestamp: new Date(event.source.ts_ms),
      tableName,
      operation: this.mapOperation(event.op),
      recordId,
      beforeState: event.before,
      afterState: event.after,
      changedFields: this.detectChangedFields(event.before, event.after),
      source: {
        transactionId: event.source.txId,
        lsn: event.source.lsn,
        connector: event.source.connector
      }
    };
  }

  private detectChangedFields(before: any, after: any): string[] {
    if (!before) return Object.keys(after || {});   // INSERT
    if (!after) return Object.keys(before || {});   // DELETE

    // UPDATE: find which fields actually changed
    const allFields = new Set([...Object.keys(before), ...Object.keys(after)]);
    return Array.from(allFields).filter(field =>
      JSON.stringify(before[field]) !== JSON.stringify(after[field])
    );
  }

  private containsSensitiveData(record: AuditRecord): boolean {
    const sensitiveFields = ['ssn', 'credit_card', 'password_hash', 'email', 'phone'];
    return record.changedFields.some(f => sensitiveFields.includes(f.toLowerCase()));
  }
}

| Regulation | Requirement | CDC Solution |
|---|---|---|
| GDPR | Data subject access requests | Query audit log for all changes to user's data |
| GDPR | Right to erasure proof | Audit trail shows deletion event with timestamp |
| SOX | Financial data integrity | Immutable log of all financial record changes |
| HIPAA | Access logging | Every PHI access/modification captured |
| PCI-DSS | Cardholder data protection | Track all access to payment data fields |
The Challenge:
Migrating databases—whether schema changes, technology changes (MySQL→PostgreSQL), or cloud migrations—traditionally requires downtime. The larger the dataset, the longer the migration window.
Why CDC Solves This:
CDC enables online migration by continuously syncing changes while the bulk migration progresses. The cutover window shrinks from hours to seconds.
For extra safety, run 'shadow writes' where the application writes to both old and new databases (but reads only from old). CDC validates that the new database is receiving correct data. This pattern catches application-level issues before cutover.
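A sketch of the validation side, reusing the consumer style from the earlier examples: a CDC consumer on the old database re-reads the same row from the new database and flags drift before cutover. The topic name, Database client, and findById method here are assumptions for illustration.

// CDC-driven consistency checker for an online migration (sketch).
class MigrationValidator {
  constructor(
    private readonly newDb: Database,      // target database client (assumed API)
    private readonly kafka: KafkaConsumer
  ) {}

  async start(): Promise<void> {
    await this.kafka.subscribe({ topic: 'legacy-db.public.orders' });

    await this.kafka.run({
      eachMessage: async ({ message }) => {
        const event: CDCEvent = JSON.parse(message.value!.toString());
        if (event.op === 'd') return; // deletes are reconciled separately

        // Read the same row from the target and compare field by field
        const migrated = await this.newDb.findById('orders', event.after!.id);
        const mismatched = Object.keys(event.after!).filter(
          field => JSON.stringify(event.after![field]) !== JSON.stringify(migrated?.[field])
        );

        if (!migrated || mismatched.length > 0) {
          // Surface drift before cutover rather than discovering it after
          console.warn('Migration drift detected', { id: event.after!.id, mismatched });
        }
      }
    });
  }
}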
We've explored the major production use cases for Change Data Capture, the problem each one solves, and the implementation patterns behind them.
What's Next:
Now that you understand what CDC can do, we'll examine how to architect CDC pipelines for production. The next page covers CDC Architecture Patterns—topologies, scalability, failure handling, and operational patterns for building robust change data capture systems.
You now have a comprehensive toolkit of CDC use cases with implementation patterns. You can recognize opportunities to apply CDC in your systems and understand the architecture for each scenario. Next, we'll dive into the patterns for building production-grade CDC architectures.