Understanding CDC concepts and use cases is necessary but not sufficient for production systems. Architecture decisions—how you structure pipelines, handle failures, scale throughput, and manage operations—determine whether your CDC implementation is robust or fragile.
This page synthesizes architectural patterns proven in production CDC deployments. We'll cover topologies for different scales, patterns for handling the inevitable failures, strategies for monitoring and operations, and guidance for making sound architectural decisions.
By the end of this page, you will understand the architectural patterns for building production CDC systems, be able to design pipelines that handle failures gracefully, implement monitoring and alerting strategies, and make informed decisions about scaling and topology based on your requirements.
The way you structure your CDC pipeline significantly impacts scalability, manageability, and failure domains. Let's examine the common topologies:
The Hub and Spoke Pattern:
All source databases connect to a central streaming platform (the "hub"), and consumers read from the hub.
```
                     ┌─────────────┐
   ┌────────────────▶│    Kafka    │──────────────┐
   │                 │    (Hub)    │              │
   │   ┌────────────▶│             │─────┐        │
   │   │             └─────────────┘     │        │
   │   │                                 │        │
┌──┴───┴────┐                       ┌────▼────┐   │
│ Debezium  │                       │Consumer │   │
│ Connectors│                       │  Apps   │   │
└───────────┘                       └─────────┘   │
   ▲   ▲                                          │
   │   │                                          │
┌──┴───┴────┐                           ┌─────────▼┐
│ PostgreSQL│                           │Analytics │
│ MySQL     │                           └──────────┘
│ MongoDB   │
└───────────┘
```
Advantages:
- A single platform to secure, monitor, and scale, instead of many point-to-point links between sources and targets.
- Producers and consumers are decoupled: new consumers subscribe to existing topics without touching the source databases.
- Kafka retains change events, so consumers can replay history or catch up after downtime.

Disadvantages:
- The hub is a critical dependency; a Kafka or Kafka Connect outage stalls every pipeline that flows through it.
- Operating Kafka and a Connect cluster carries its own operational complexity.
- The extra hop adds some end-to-end latency compared to a direct source-to-target connection.

Best For:
- Organizations with several source databases and multiple downstream consumers that need the same change streams, and teams prepared to operate (or buy) a streaming platform.
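One operational consequence of this topology is that adding a new source never touches existing consumers: you register another Debezium connector with the central Kafka Connect cluster and its change topics simply appear on the hub. A minimal sketch, assuming a Connect worker at http://connect:8083 and hypothetical database details (hostnames, credentials, and table names are placeholders):

```typescript
// Sketch: register a new Debezium PostgreSQL source with the central Connect cluster.
async function registerSource(): Promise<void> {
  const response = await fetch('http://connect:8083/connectors', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      name: 'inventory-postgres-connector',
      config: {
        'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
        'database.hostname': 'inventory-db.internal',          // placeholder
        'database.port': '5432',
        'database.user': 'cdc_user',
        'database.password': process.env.CDC_DB_PASSWORD ?? '',
        'database.dbname': 'inventory',
        'topic.prefix': 'inventory',                            // Debezium 2.x: topics become inventory.<schema>.<table>
        'plugin.name': 'pgoutput',
        'table.include.list': 'public.products,public.stock_levels'
      }
    })
  });

  if (!response.ok) {
    throw new Error(`Failed to register connector: ${response.status} ${await response.text()}`);
  }
}
```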
CDC pipelines must scale with data growth. Unlike application scaling, CDC scaling has unique constraints because transaction logs must be read sequentially. Here's how to scale each component:
| Dimension | Challenge | Solution |
|---|---|---|
| More tables | Single connector handles all tables | Multiple connectors, each with table subset |
| Higher throughput | Log reading is sequential | Parallel consumers on partitioned topics |
| More consumers | Shared change stream | Kafka topics with consumer groups |
| Larger transactions | Memory exhaustion | Tune queue sizes, streaming mode |
| More databases | Connector proliferation | Centralized Kafka Connect cluster |
Scaling the Connector (Ingestion):
Debezium connectors read logs sequentially—you can't parallelize a single connector's log reading. But you can:
- Split high-throughput tables onto dedicated connectors, as in the configs below
- Increase snapshot.max.threads to parallelize the initial snapshot load
- Tune max.batch.size and max.queue.size to match each connector's throughput

```
// High-throughput table gets dedicated connector
{
  "name": "orders-connector",
  "config": {
    "table.include.list": "public.orders,public.order_items",
    "max.batch.size": 4096,
    "max.queue.size": 32768
  }
}

// Lower-throughput tables share a connector
{
  "name": "reference-data-connector",
  "config": {
    "table.include.list": "public.products,public.categories,public.users",
    "max.batch.size": 1024
  }
}
```
Scaling Consumers (Processing):
Consumer scaling leverages Kafka's partitioning:
```typescript
// Producer partitions by entity ID
producer.send({
  topic: 'cdc.orders',
  key: event.after.order_id,    // Partition key
  value: JSON.stringify(event)
});

// Consumers in same group divide partitions
// 12 partitions with 4 consumers = 3 partitions each
consumer.subscribe({ topic: 'cdc.orders', fromBeginning: true });

// Each consumer processes its partition subset in parallel
// Order guaranteed within partition (same order_id → same partition)
```
Partitioning Strategy:
| Strategy | Description | Use When |
|---|---|---|
| By record key | Hash of primary key | General purpose |
| By customer | Hash of customer_id | Customer-centric processing |
| By table | Route tables to partitions | Table-specific consumers |
| Round-robin | No ordering guarantee | Maximum parallelism, order doesn't matter |
Kafka guarantees order only within a partition. If you partition by order_id, all changes to order 12345 stay ordered. But changes across different orders may be processed out of order. If you need strict global ordering (rare), you need a single partition—which limits throughput to one consumer.
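To make that guarantee concrete, here is a simplified sketch of keyed partition assignment. Kafka's default partitioner hashes the record key (murmur2 in the Java client) and takes it modulo the partition count; the toy hash below only illustrates why the same key always lands on the same partition:

```typescript
// Conceptual sketch of keyed partition assignment (not Kafka's actual hash).
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) | 0;  // simple deterministic string hash
  }
  return Math.abs(hash) % numPartitions;
}

// Every change event for order 12345 maps to the same partition,
// so one consumer sees that order's changes in commit order.
const p1 = partitionFor('12345', 12);
const p2 = partitionFor('12345', 12);
console.log(p1 === p2); // true
```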
CDC pipelines will fail. Networks partition, databases restart, consumers crash. Robust architectures anticipate failures and recover gracefully.
{ "name": "resilient-connector", "config": { // Resume behavior on failure "errors.tolerance": "all", // Continue on errors (vs "none") "errors.log.enable": "true", // Log errors for investigation "errors.log.include.messages": "true", // Include event data in logs // Dead letter queue for failed events "errors.deadletterqueue.topic.name": "cdc-dlq", "errors.deadletterqueue.context.headers.enable": "true", // Offset management "offset.flush.interval.ms": "1000", // Commit offsets frequently "offset.flush.timeout.ms": "5000", // Timeout for offset commits // Connection retry "connect.poll.interval": "1000", "connect.max.retries": "10", // Heartbeat for detecting hung connections "heartbeat.interval.ms": "10000", // Signal table for controlled operations "signal.data.collection": "public.debezium_signal" }}Achieving exactly-once semantics in CDC pipelines is critical for data integrity. A payment processed twice or an inventory deduction missed causes real business harm.
The Challenge:
With at-least-once delivery (the default), duplicates occur when:
- A consumer processes an event but crashes before committing its offset, so the event is redelivered and processed again after restart.
- A Kafka Connect worker restarts after publishing events but before flushing its source offsets, so the connector re-reads part of the transaction log.
- A consumer group rebalance reassigns partitions while some events have been processed but not yet committed.

The sketch below shows where that window sits in a typical consumer loop.
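A minimal sketch, assuming a kafkajs consumer with manual offset commits; applyChange, the broker address, and the group id are placeholders:

```typescript
import { Kafka } from 'kafkajs';

declare function applyChange(event: unknown): Promise<void>;  // placeholder side effect

const kafka = new Kafka({ brokers: ['kafka:9092'] });          // placeholder broker
const consumer = kafka.consumer({ groupId: 'orders-sync' });

await consumer.connect();
await consumer.subscribe({ topic: 'cdc.orders' });
await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await applyChange(JSON.parse(message.value!.toString()));  // 1. side effect is applied
    // A crash here means the offset below is never committed,
    // so the same event is redelivered and applied again on restart.
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() }
    ]);                                                        // 2. offset committed afterwards
  }
});
```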
The Solution: Idempotent Consumers + Transactional Semantics
```typescript
class IdempotentCDCConsumer {
  constructor(
    private db: Database,
    private redis: Redis,        // For deduplication
    private kafka: KafkaConsumer
  ) {}

  async processEvent(event: CDCEvent): Promise<void> {
    // Create unique idempotency key from source position
    const idempotencyKey = this.createIdempotencyKey(event);

    // Check if already processed (cheap Redis lookup)
    const alreadyProcessed = await this.redis.get(idempotencyKey);
    if (alreadyProcessed) {
      console.log(`Skipping duplicate event: ${idempotencyKey}`);
      metrics.increment('cdc.duplicates.skipped');
      return;
    }

    // Process with database transaction
    await this.db.transaction(async (tx) => {
      // Apply the change
      await this.applyChange(tx, event);

      // Record processing in same transaction
      await tx.execute(`
        INSERT INTO processed_events (event_key, processed_at)
        VALUES ($1, NOW())
        ON CONFLICT (event_key) DO NOTHING
      `, [idempotencyKey]);
    });

    // Mark as processed in Redis (with TTL for cleanup)
    await this.redis.set(idempotencyKey, '1', 'EX', 86400 * 7);  // 7 days
  }

  private createIdempotencyKey(event: CDCEvent): string {
    // Use source position as globally unique key
    // Format: connector-name:table:lsn or txId:position
    return `${event.source.name}:${event.source.table}:${event.source.lsn}`;
  }

  private async applyChange(tx: Transaction, event: CDCEvent): Promise<void> {
    // Use upsert semantics for natural idempotency
    switch (event.op) {
      case 'c':
      case 'u':
      case 'r':
        await tx.execute(`
          INSERT INTO orders (id, status, amount, updated_at)
          VALUES ($1, $2, $3, $4)
          ON CONFLICT (id) DO UPDATE SET
            status = EXCLUDED.status,
            amount = EXCLUDED.amount,
            updated_at = EXCLUDED.updated_at
          WHERE orders.updated_at < EXCLUDED.updated_at
        `, [
          event.after.id,
          event.after.status,
          event.after.amount,
          new Date(event.source.ts_ms)
        ]);
        break;

      case 'd':
        await tx.execute(`
          DELETE FROM orders WHERE id = $1
        `, [event.before.id]);
        break;
    }
  }
}
```

CDC pipelines require comprehensive monitoring. Unlike request-response systems where failures are immediately visible, CDC failures can silently accumulate stale data until a user notices inconsistencies.
| Metric | Description | Alert Threshold |
|---|---|---|
| cdc.lag.seconds | Time between source commit and Kafka publish | > 10 seconds |
| cdc.consumer.lag.messages | Messages waiting in Kafka for consumers | > 10,000 messages |
| cdc.connector.status | Connector running/paused/failed | != RUNNING |
| cdc.replication.slot.lag.bytes | Bytes the slot lags behind the source log | > 1 GB |
| cdc.dlq.messages.count | Dead letter queue size | > 0 |
| cdc.events.processed.rate | Events per second flowing through the pipeline | Drops below 50% of baseline |
| cdc.errors.rate | Errors per second | > 1/minute |
| cdc.snapshot.progress | Snapshot completion percentage | Stalled > 1 hour |
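The first metric can be derived directly from the Debezium envelope, which carries both the commit timestamp at the source (source.ts_ms) and the time the connector processed the event (the top-level ts_ms). A sketch with a minimal MetricsClient interface (the monitoring class later on this page assumes a fuller client); the end-to-end metric name is illustrative:

```typescript
interface MetricsClient {
  gauge(name: string, value: number): void;
}

interface DebeziumEnvelope {
  ts_ms: number;               // when the connector processed the event
  source: { ts_ms: number };   // when the change was committed at the source
}

function recordLagMetrics(event: DebeziumEnvelope, metrics: MetricsClient): void {
  // Source commit -> connector publish: approximates cdc.lag.seconds from the table above
  metrics.gauge('cdc.lag.seconds', (event.ts_ms - event.source.ts_ms) / 1000);

  // Source commit -> this consumer: end-to-end staleness as seen downstream
  metrics.gauge('cdc.end_to_end.lag.seconds', (Date.now() - event.source.ts_ms) / 1000);
}
```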
```typescript
class CDCMonitor {
  constructor(
    private metrics: MetricsClient,
    private alertManager: AlertManager,
    private kafkaConnect: KafkaConnectClient,  // Kafka Connect REST client
    private kafka: Kafka,                      // Kafka admin client factory
    private db: Database                       // source database (for slot checks)
  ) {}

  async monitorPipeline(): Promise<void> {
    // Monitor every 30 seconds
    setInterval(async () => {
      await Promise.all([
        this.checkConnectorHealth(),
        this.checkConsumerLag(),
        this.checkReplicationSlot(),
        this.checkDLQ()
      ]);
    }, 30000);
  }

  private async checkConnectorHealth(): Promise<void> {
    const connectors = await this.kafkaConnect.getConnectors();

    for (const connector of connectors) {
      const status = await this.kafkaConnect
        .getConnectorStatus(connector.name);

      this.metrics.gauge(
        'cdc.connector.status',
        status.state === 'RUNNING' ? 1 : 0,
        { connector: connector.name }
      );

      if (status.state !== 'RUNNING') {
        await this.alertManager.fire({
          severity: 'critical',
          summary: `CDC Connector ${connector.name} is ${status.state}`,
          description: `Connector stopped: ${status.trace || 'Unknown error'}`
        });
      }

      // Check individual tasks
      for (const task of status.tasks) {
        if (task.state !== 'RUNNING') {
          this.metrics.increment('cdc.task.failures', {
            connector: connector.name,
            taskId: task.id
          });
        }
      }
    }
  }

  private async checkConsumerLag(): Promise<void> {
    const consumerGroups = await this.kafka.admin().describeGroups();

    for (const group of consumerGroups) {
      const offsets = await this.kafka.admin()
        .fetchOffsets({ groupId: group.groupId });

      for (const topicOffset of offsets) {
        const endOffsets = await this.kafka.admin()
          .fetchTopicOffsets(topicOffset.topic);

        const lag = endOffsets.reduce((sum, partition) => {
          const consumed = topicOffset.partitions
            .find(p => p.partition === partition.partition)?.offset || '0';
          return sum + (parseInt(partition.high) - parseInt(consumed));
        }, 0);

        this.metrics.gauge('cdc.consumer.lag.messages', lag, {
          topic: topicOffset.topic,
          consumerGroup: group.groupId
        });

        if (lag > 10000) {
          await this.alertManager.fire({
            severity: 'warning',
            summary: `High consumer lag for ${group.groupId}`,
            description: `Lag: ${lag} messages on ${topicOffset.topic}`
          });
        }
      }
    }
  }

  private async checkReplicationSlot(): Promise<void> {
    // PostgreSQL specific
    const slots = await this.db.query(`
      SELECT slot_name,
             pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag_bytes
      FROM pg_replication_slots
      WHERE slot_type = 'logical'
    `);

    for (const slot of slots) {
      this.metrics.gauge('cdc.replication.slot.lag.bytes', slot.lag_bytes, {
        slot: slot.slot_name
      });

      // 1GB lag is critical
      if (slot.lag_bytes > 1073741824) {
        await this.alertManager.fire({
          severity: 'critical',
          summary: `Replication slot ${slot.slot_name} critically behind`,
          description: `Lag: ${(slot.lag_bytes / 1e9).toFixed(2)} GB. Risk of WAL overflow.`
        });
      }
    }
  }
}
```

Even with robust CDC pipelines, data drift can occur. Schema changes, bugs, or missed events can cause source and target to diverge. Periodic reconciliation verifies data consistency and detects issues before they become customer-visible.
```typescript
class CDCReconciler {
  async reconcileTable(
    sourceDb: Database,
    targetDb: Database,
    table: string,
    options: ReconcileOptions = {}
  ): Promise<ReconciliationResult> {
    const {
      sampleRate = 0.01,        // Check 1% of rows
      batchSize = 1000,
      compareColumns = ['*']    // Or specific columns
    } = options;

    const result: ReconciliationResult = {
      table,
      sampledRows: 0,
      matchingRows: 0,
      missingInTarget: 0,
      extraInTarget: 0,
      differentValues: 0,
      discrepancies: []
    };

    // Get row count for sampling
    const [{ count }] = await sourceDb.query(
      `SELECT COUNT(*) FROM ${table}`
    );
    const sampleSize = Math.ceil(count * sampleRate);

    // Sample random rows from source
    const sourceRows = await sourceDb.query(`
      SELECT * FROM ${table}
      ORDER BY RANDOM()
      LIMIT ${sampleSize}
    `);

    // Check each sampled row against target
    for (const batch of this.chunkArray(sourceRows, batchSize)) {
      const ids = batch.map(r => r.id);
      const targetRows = await targetDb.query(`
        SELECT * FROM ${table} WHERE id = ANY($1)
      `, [ids]);

      const targetMap = new Map(targetRows.map(r => [r.id, r]));

      for (const sourceRow of batch) {
        result.sampledRows++;
        const targetRow = targetMap.get(sourceRow.id);

        if (!targetRow) {
          result.missingInTarget++;
          result.discrepancies.push({
            type: 'missing',
            id: sourceRow.id,
            source: sourceRow,
            target: null
          });
        } else if (!this.rowsEqual(sourceRow, targetRow, compareColumns)) {
          result.differentValues++;
          result.discrepancies.push({
            type: 'different',
            id: sourceRow.id,
            source: sourceRow,
            target: targetRow,
            differences: this.findDifferences(sourceRow, targetRow)
          });
        } else {
          result.matchingRows++;
        }
      }
    }

    // Log and alert
    const matchRate = result.matchingRows / result.sampledRows;
    if (matchRate < 0.99) {  // Less than 99% match
      await this.alertManager.fire({
        severity: 'warning',
        summary: `CDC reconciliation found discrepancies in ${table}`,
        description: `Match rate: ${(matchRate * 100).toFixed(2)}%. ` +
          `Missing: ${result.missingInTarget}, Different: ${result.differentValues}`
      });
    }

    return result;
  }

  // Repair function to fix discrepancies
  async repairDiscrepancies(
    result: ReconciliationResult,
    targetDb: Database
  ): Promise<void> {
    for (const discrepancy of result.discrepancies) {
      if (discrepancy.type === 'missing' || discrepancy.type === 'different') {
        // Upsert correct value from source
        await targetDb.upsert(result.table, discrepancy.source);
      }
    }

    console.log(`Repaired ${result.discrepancies.length} discrepancies`);
  }
}
```

Production CDC requires documented procedures for common operational scenarios. Here are runbooks for critical situations:
Runbook: CDC Connector Failed
Symptoms: Alert: connector status != RUNNING, no new events in Kafka
Diagnosis Steps:
Check connector status:
```bash
curl -s http://connect:8083/connectors/<name>/status | jq
```
Check connector logs:
```bash
kubectl logs -l app=kafka-connect --tail=100 | grep ERROR
```
Common causes and resolutions:
| Cause | Fix |
|---|---|
| DB unreachable | Verify connectivity, credentials |
| Slot deleted | Recreate slot, may need re-snapshot |
| Schema break | Fix schema or update connector config |
| OOM | Increase connector memory, reduce batch size |
Recovery:
```bash
# Restart connector
curl -X POST http://connect:8083/connectors/<name>/restart

# If tasks specifically failed
curl -X POST http://connect:8083/connectors/<name>/tasks/0/restart
```
Post-Incident:
- Confirm the connector and all tasks report RUNNING and that consumer lag drains back to baseline.
- Inspect the dead letter queue for events that failed while the connector was down.
- Run a targeted reconciliation on the tables the connector captures to catch any missed or duplicated changes (a sketch follows this list).
- Document the root cause and update this runbook if the failure mode was new.
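A minimal sketch of that reconciliation step, assuming a CDCReconciler instance (reconciler) plus sourceDb and targetDb handles are already wired up; the table names are placeholders for whatever the failed connector captures:

```typescript
// Hypothetical post-incident check: the tables behind the failed connector
const affectedTables = ['orders', 'order_items'];

for (const table of affectedTables) {
  const result = await reconciler.reconcileTable(sourceDb, targetDb, table, {
    sampleRate: 0.05  // sample more heavily than a routine run
  });

  if (result.missingInTarget > 0 || result.differentValues > 0) {
    await reconciler.repairDiscrepancies(result, targetDb);
  }
}
```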
We've covered the architectural patterns essential for production CDC systems. Here are the key takeaways:
- Topology shapes everything: a hub-and-spoke design around Kafka decouples sources from consumers and keeps failure domains manageable.
- Scale ingestion by splitting tables across connectors; scale processing with partitioned topics and consumer groups. Your partition key determines your ordering guarantee.
- Design for failure: error tolerance, dead letter queues, and idempotent consumers turn at-least-once delivery into exactly-once results.
- Monitor lag, connector health, replication slots, and the DLQ; CDC failures are silent until someone notices stale data.
- Reconcile source and target periodically, and keep runbooks ready for the failures you will eventually hit.
Module Complete:
You've now completed the comprehensive coverage of Change Data Capture, from the fundamentals and use cases through the production architecture patterns on this page.
This knowledge positions you to design, implement, and operate CDC pipelines that form the backbone of real-time data infrastructure.
Congratulations! You've mastered Change Data Capture—from fundamentals through production architecture. You now have the knowledge to implement reliable CDC pipelines that enable real-time data synchronization, event-driven architectures, and streaming analytics. Apply these patterns to build systems that treat data changes as first-class events.