In the previous page, we established that data duplication is often necessary in microservices. The question becomes: how do we keep those copies synchronized? The answer, in most modern systems, is event-driven data synchronization.
The fundamental idea is simple: when the owner of data changes it, the owner publishes an event. Services that maintain copies subscribe to these events and update their local views accordingly. There's no direct coupling between producer and consumer—only a shared understanding of event structure.
This pattern enables the loose coupling that microservices promise. The Customer Service doesn't know which services consume customer events or what they do with them. The Order Service doesn't care when Customer Service publishes—it just updates when events arrive. Each service evolves independently.
By the end of this page, you will understand the mechanics of event-driven synchronization, event design patterns for data sync, handling event ordering and idempotency, building reliable event consumers, and the infrastructure considerations for production event systems.
Event-driven data synchronization follows a publish-subscribe pattern. The data owner (publisher) emits events when data changes. Services that need copies (subscribers) consume events and update their local state.
The flow:
1. Customer Service updates customer email
2. Customer Service writes to its database
3. Customer Service publishes CustomerEmailChanged event
4. Event broker (Kafka, RabbitMQ, etc.) stores and delivers event
5. Order Service receives event
6. Order Service updates its local customer view
7. Notification Service receives event
8. Notification Service updates its contact preferences
Each step is asynchronous. The Customer Service doesn't wait for consumers to process. Consumers process at their own pace. If a consumer is down, events queue until it recovers.
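The flow above can be sketched with an in-memory stand-in for the broker. This is a minimal sketch, not a real broker API — all class and event names here are illustrative; a real broker (Kafka, RabbitMQ) adds durability, ordering, and delivery guarantees this omits.

```typescript
// Minimal in-memory pub/sub illustrating the flow: the publisher
// emits once; every subscriber updates its own local view.

interface CustomerEmailChanged {
  type: 'CustomerEmailChanged';
  customerId: string;
  email: string;
}

type Handler = (event: CustomerEmailChanged) => void;

class InMemoryBroker {
  private subscribers: Handler[] = [];

  subscribe(handler: Handler): void {
    this.subscribers.push(handler);
  }

  publish(event: CustomerEmailChanged): void {
    // Deliver to every subscriber; the publisher doesn't know who they are
    for (const handler of this.subscribers) handler(event);
  }
}

const broker = new InMemoryBroker();

// Order Service keeps a local copy of customer emails
const orderServiceView = new Map<string, string>();
broker.subscribe((e) => orderServiceView.set(e.customerId, e.email));

// Notification Service keeps its own copy, independently
const notificationView = new Map<string, string>();
broker.subscribe((e) => notificationView.set(e.customerId, e.email));

// Customer Service writes to its database (not shown), then publishes
broker.publish({
  type: 'CustomerEmailChanged',
  customerId: 'cust_123',
  email: 'jane.new@example.com',
});
```

Note that the publisher's code never mentions the Order or Notification Service — that is the loose coupling the pattern buys.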
| Component | Role | Responsibility |
|---|---|---|
| Publisher | Data owner service | Emit events after state changes; ensure at-least-once delivery to broker |
| Event Broker | Message infrastructure | Store events durably; deliver to all subscribers; maintain ordering |
| Subscriber | Consumer service | Process events idempotently; update local state; handle failures |
| Event | Data change description | Carry enough information for consumers to update; include metadata |
| Consumer Group | Subscriber instances | Allow scaling consumers; balance load; track processing position |
Why events, not direct updates?
You might wonder: why doesn't Customer Service just call Order Service directly when customer data changes? Several reasons:

- Coupling: the publisher would need to know every consumer and its API; adding a consumer means changing the publisher.
- Availability: if a consumer is down, a direct call fails or blocks the publisher's write path; with events, the broker buffers until the consumer recovers.
- Scalability: one publish serves any number of consumers, instead of one synchronous call per consumer per change.
Event-driven sync is inherently eventually consistent. There's a window between the publisher writing and the subscriber updating. This window is typically milliseconds to seconds in healthy systems, but can extend during backpressure or failures. Your design must tolerate this latency.
Event design significantly impacts how well your sync works. Poor event design leads to lost updates, duplicate processing, and debugging nightmares. Good design makes sync reliable and understandable.
Thin Events contain minimal information—typically just an identifier and the type of change.
```json
{
  "type": "CustomerUpdated",
  "customerId": "cust_123",
  "timestamp": "2024-01-15T10:30:00Z"
}
```
Consumers must call the source service to get current data.
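That callback to the source is the defining cost of thin events. A sketch below makes it concrete; the `fetchCustomer` client is a hypothetical stand-in for a call to the owning service, written synchronously for brevity (a real client call would be async HTTP).

```typescript
// Thin-event consumer: the event only says *what* changed, so the
// consumer must call back to the source service for current state.

interface ThinEvent {
  type: 'CustomerUpdated';
  customerId: string;
  timestamp: string;
}

interface Customer {
  customerId: string;
  name: string;
  email: string;
}

// Hypothetical stand-in for GET /customers/{id} on the owning service
function fetchCustomer(customerId: string): Customer {
  return { customerId, name: 'Jane Doe', email: 'jane.new@example.com' };
}

const thinLocalView = new Map<string, Customer>();

function handleThinEvent(event: ThinEvent): void {
  // One extra round-trip per event: the price of thin events
  const current = fetchCustomer(event.customerId);
  thinLocalView.set(event.customerId, current);
}

handleThinEvent({
  type: 'CustomerUpdated',
  customerId: 'cust_123',
  timestamp: '2024-01-15T10:30:00Z',
});
```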
Fat Events contain the full entity state or all changed fields.
```json
{
  "type": "CustomerUpdated",
  "customerId": "cust_123",
  "timestamp": "2024-01-15T10:30:00Z",
  "data": {
    "name": "Jane Doe",
    "email": "jane.new@example.com",
    "status": "active"
  }
}
```
Consumers have all needed information without additional calls.
Recommendation for data sync: Use fat events. For synchronization purposes, fat events are generally preferred because:

- Consumers update their local views without calling back to the source, so sync keeps working even when the source service is down or overloaded.
- A burst of thin events would become a burst of API calls against the source (read amplification); fat events avoid it.
- Replayed or delayed events still carry the state they described, which makes replay and debugging tractable.
Every sync event should include:
| Field | Purpose | Example |
|---|---|---|
| eventId | Unique identifier for deduplication | evt_a1b2c3d4 |
| eventType | Distinguishes event types | CustomerCreated |
| aggregateId | The entity this event concerns | cust_123 |
| timestamp | When the change occurred | ISO 8601 date |
| version | Entity version after change | 15 |
| correlationId | Trace across services | Request ID |
| causationId | What triggered this event | Previous event ID |
```typescript
// ===================================================
// COMPREHENSIVE EVENT STRUCTURE FOR DATA SYNC
// ===================================================
// A well-designed event includes everything consumers
// need for reliable, idempotent processing.
// ===================================================

interface BaseEvent {
  // Unique event identifier - for deduplication
  eventId: string;

  // Event type discriminator
  eventType: string;

  // The entity this event describes
  aggregateType: string;
  aggregateId: string;

  // Ordering within the entity - critical for sync
  version: number;

  // Timing information
  timestamp: Date;

  // Tracing
  correlationId: string; // Original request ID
  causationId: string;   // Event that caused this event

  // Source information
  sourceService: string;
  sourceInstance: string;
}

interface CustomerCreatedEvent extends BaseEvent {
  eventType: 'CustomerCreated';
  aggregateType: 'Customer';
  data: {
    email: string;
    name: string;
    status: 'active' | 'pending';
    createdAt: Date;
  };
}

interface CustomerUpdatedEvent extends BaseEvent {
  eventType: 'CustomerUpdated';
  aggregateType: 'Customer';

  // Include both old and new values for debugging
  // (optional but helpful)
  changes: Array<{
    field: string;
    oldValue: unknown;
    newValue: unknown;
  }>;

  // Full current state after update
  data: {
    email: string;
    name: string;
    status: 'active' | 'suspended' | 'deleted';
    updatedAt: Date;
  };
}

interface CustomerDeletedEvent extends BaseEvent {
  eventType: 'CustomerDeleted';
  aggregateType: 'Customer';

  // Deletion reason for audit
  reason: string;
  deletedBy: string;
}

// Type union for type-safe handling
type CustomerEvent =
  | CustomerCreatedEvent
  | CustomerUpdatedEvent
  | CustomerDeletedEvent;

// ===================================================
// EVENT PUBLISHING EXAMPLE
// ===================================================

class CustomerService {
  private eventPublisher: EventPublisher;

  async updateCustomer(
    customerId: string,
    updates: CustomerUpdates
  ): Promise<Customer> {
    const customer = await this.repository.findById(customerId);

    // Track changes for the event
    const changes = this.calculateChanges(customer, updates);

    // Apply updates
    Object.assign(customer, updates);
    customer.version += 1;
    customer.updatedAt = new Date();

    // Persist
    await this.repository.save(customer);

    // Publish comprehensive event
    await this.eventPublisher.publish<CustomerUpdatedEvent>({
      eventId: generateEventId(),
      eventType: 'CustomerUpdated',
      aggregateType: 'Customer',
      aggregateId: customerId,
      version: customer.version, // NEW version
      timestamp: customer.updatedAt,
      correlationId: getCurrentCorrelationId(),
      causationId: getCausationId(),
      sourceService: 'customer-service',
      sourceInstance: getInstanceId(),
      changes: changes,
      data: {
        email: customer.email,
        name: customer.name,
        status: customer.status,
        updatedAt: customer.updatedAt,
      },
    });

    return customer;
  }

  private calculateChanges(
    before: Customer,
    updates: CustomerUpdates
  ): Array<{ field: string; oldValue: unknown; newValue: unknown }> {
    const changes = [];
    for (const [key, value] of Object.entries(updates)) {
      if (before[key] !== value) {
        changes.push({
          field: key,
          oldValue: before[key],
          newValue: value,
        });
      }
    }
    return changes;
  }
}
```

Two challenges haunt event-driven synchronization: ordering and duplicate processing. Understanding and handling these is essential for reliable sync.
Events for the same entity can arrive out of order due to:

- Publisher retries after timeouts (the original may arrive after the retry)
- Network delays and broker redelivery
- Events for one entity spread across partitions or queues
- Parallel consumer instances processing at different speeds
Example:

- Version 1: customer created
- Version 2: email updated
- Version 3: name updated

If the consumer receives them in order 1 → 3 → 2, applying the fat event for version 2 after version 3 would overwrite the newer state and revert the name change!
Solution: Version-Based Updates
When processing an event:

1. Load the local view and read its stored version.
2. If the event's version is less than or equal to the stored version, skip it — it is stale or a duplicate.
3. Otherwise, apply the event and persist the new version atomically with the data.
```typescript
// ===================================================
// VERSION-BASED ORDERING PROTECTION
// ===================================================
// Only apply events that move the version forward.
// Stale events are safely ignored.
// ===================================================

interface LocalCustomerView {
  customerId: string;
  name: string;
  email: string;
  version: number;     // Track which version we have
  lastEventId: string; // Track last processed event
  lastUpdated: Date;
}

class CustomerViewUpdater {
  private repository: LocalCustomerViewRepository;

  async handleCustomerEvent(event: CustomerEvent): Promise<void> {
    const local = await this.repository.findById(event.aggregateId);

    if (local) {
      // ORDERING CHECK: Only apply if event advances version
      if (event.version <= local.version) {
        console.log(
          `Skipping stale event. Local version: ${local.version}, ` +
          `Event version: ${event.version}`
        );
        return; // Safely ignore out-of-order event
      }

      // IDEMPOTENCY CHECK: Have we processed this exact event?
      if (event.eventId === local.lastEventId) {
        console.log(`Duplicate event ${event.eventId}, skipping`);
        return; // Safely ignore duplicate
      }
    }

    // Process based on event type
    switch (event.eventType) {
      case 'CustomerCreated':
        await this.handleCreated(event);
        break;
      case 'CustomerUpdated':
        await this.handleUpdated(event, local);
        break;
      case 'CustomerDeleted':
        await this.handleDeleted(event);
        break;
    }
  }

  private async handleUpdated(
    event: CustomerUpdatedEvent,
    local: LocalCustomerView | null
  ): Promise<void> {
    if (!local) {
      // We're missing the Created event - fetch full state
      console.warn(
        `Received update for unknown customer ${event.aggregateId}. ` +
        `Fetching from source.`
      );
      await this.fetchAndStoreCustomer(event.aggregateId);
      return;
    }

    // Apply update
    await this.repository.update(event.aggregateId, {
      name: event.data.name,
      email: event.data.email,
      version: event.version,
      lastEventId: event.eventId,
      lastUpdated: new Date(),
    });
  }

  private async fetchAndStoreCustomer(customerId: string): Promise<void> {
    // Fallback: call source service to get current state
    // This handles missed events, startup, etc.
    const current = await this.customerServiceClient.getCustomer(customerId);
    await this.repository.upsert({
      customerId: current.id,
      name: current.name,
      email: current.email,
      version: current.version,
      lastEventId: 'fetched-from-source',
      lastUpdated: new Date(),
    });
  }
}
```

At-least-once delivery is the only practical guarantee for distributed events. Networks fail, consumers crash, acknowledgments get lost. Events will be delivered multiple times.
Your consumer must be idempotent: processing the same event twice produces the same result as processing once.
Techniques for idempotency:

- Deduplication store: record each processed eventId; skip events you have already seen.
- Version checks: apply an event only if it advances the entity's version (a conditional `UPDATE ... WHERE version = ?`).
- Natural idempotency: write full state as an upsert, so reapplying the same event produces the same row.

Some systems claim "exactly-once" semantics. Don't trust it for business logic. Always design consumers to be idempotent. Even "exactly-once" systems have edge cases where duplicates occur.
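The version-check technique can be sketched against an in-memory store that mimics a conditional SQL update. This is illustrative only — the `applyIfNewer` helper and table shape are assumptions, not a real data-access API.

```typescript
// Idempotent apply via a version check: the update succeeds only if
// the event's version is greater than what we already have. Stale
// events and exact duplicates both become safe no-ops.

interface Row {
  customerId: string;
  email: string;
  version: number;
}

const table = new Map<string, Row>();
table.set('cust_123', {
  customerId: 'cust_123',
  email: 'old@example.com',
  version: 4,
});

// Mimics: UPDATE customers SET email = ?, version = ?
//         WHERE customer_id = ? AND version < ?
function applyIfNewer(
  customerId: string,
  email: string,
  eventVersion: number
): boolean {
  const row = table.get(customerId);
  if (!row || row.version >= eventVersion) return false; // stale or duplicate
  table.set(customerId, { customerId, email, version: eventVersion });
  return true;
}

const event = { customerId: 'cust_123', email: 'new@example.com', version: 5 };

const first = applyIfNewer(event.customerId, event.email, event.version);  // applies
const second = applyIfNewer(event.customerId, event.email, event.version); // no-op
```

Processing the same event twice leaves the row identical to processing it once — the definition of idempotency above.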
A reliable event consumer must handle failures gracefully, process efficiently, and never lose events. This section covers the patterns that production consumers use.
The publisher must ensure events are published if and only if the database write succeeds. The Transactional Outbox pattern achieves this:

1. Within the same database transaction as the state change, insert the event into an outbox table.
2. Commit — the state change and the pending event become durable atomically.
3. A separate relay process (or CDC connector) reads unsent outbox rows and publishes them to the broker.
4. After the broker confirms delivery, the relay marks the rows as sent.

This ensures no event is lost if publish fails, and no event is published for rolled-back transactions.
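A minimal sketch of the outbox mechanics, with an in-memory "database" standing in for real transactions. All names are illustrative, and the transactional atomicity is sketched, not real ACID.

```typescript
// Transactional outbox sketch: the entity write and the outbox insert
// commit together; a relay publishes from the outbox afterwards.

interface OutboxRow {
  id: number;
  payload: string;
  sentAt?: Date;
}

const customers = new Map<string, { email: string }>();
const outbox: OutboxRow[] = [];
let nextId = 1;

// "Transaction": both writes happen together (in a real database,
// inside one ACID transaction so a rollback discards both).
function updateEmailTx(customerId: string, email: string): void {
  customers.set(customerId, { email });
  outbox.push({
    id: nextId++,
    payload: JSON.stringify({
      type: 'CustomerEmailChanged',
      customerId,
      email,
    }),
  });
}

// Relay: publish unsent rows, then mark them sent.
// If publish throws, the row stays unsent and is retried later.
const published: string[] = [];
function runRelay(publish: (payload: string) => void): void {
  for (const row of outbox) {
    if (!row.sentAt) {
      publish(row.payload);
      row.sentAt = new Date();
    }
  }
}

updateEmailTx('cust_123', 'jane.new@example.com');
runRelay((p) => published.push(p));
```

Note the relay itself is at-least-once: if it crashes between publishing and marking sent, the row is published again — another reason consumers must be idempotent.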
Consumers should only acknowledge (ack) events after successful processing:
1. Receive event from broker
2. Start database transaction
3. Update local view
4. Commit transaction
5. Acknowledge event to broker
If step 4 fails, the event remains unacknowledged and will be redelivered. This is the at-least-once guarantee in action.
Critical: Your processing must be idempotent, because steps 4 and 5 can succeed or fail independently — the transaction may commit but the acknowledgment may be lost, in which case the broker redelivers an already-applied event.
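The ack-after-processing sequence can be sketched with a toy queue; the event is only removed after the handler succeeds. The `ToyQueue` class is an illustrative stand-in for a broker's delivery/acknowledgment protocol.

```typescript
// Ack-after-processing: the queue forgets an event only once the
// handler has completed. A thrown error leaves the event queued for
// redelivery -- the at-least-once guarantee in miniature.

class ToyQueue<T> {
  private events: T[] = [];

  enqueue(event: T): void {
    this.events.push(event);
  }

  // Deliver head of queue; ack (remove) only if the handler succeeds
  deliverOnce(handler: (event: T) => void): void {
    const event = this.events[0];
    if (event === undefined) return;
    try {
      handler(event);      // steps 2-4: process (transactionally in real life)
      this.events.shift(); // step 5: acknowledge
    } catch {
      // No ack: event stays queued and will be redelivered
    }
  }

  get depth(): number {
    return this.events.length;
  }
}

const queue = new ToyQueue<string>();
queue.enqueue('evt_1');

let attempts = 0;
const flakyHandler = (_e: string) => {
  attempts++;
  if (attempts === 1) throw new Error('transient failure');
};

queue.deliverOnce(flakyHandler); // fails -> not acked, still queued
const depthAfterFailure = queue.depth;
queue.deliverOnce(flakyHandler); // succeeds -> acked
```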
Some events consistently fail processing—malformed data, unexpected types, bugs. After N retries, move these to a Dead Letter Queue (DLQ):
```typescript
// ===================================================
// RELIABLE EVENT CONSUMER IMPLEMENTATION
// ===================================================
// Demonstrates: idempotency, retry logic, DLQ handling,
// and proper acknowledgment sequencing.
// ===================================================

interface ConsumerConfig {
  maxRetries: number;
  retryDelayMs: number;
  deadLetterTopic: string;
}

class ReliableEventConsumer {
  private config: ConsumerConfig;
  private processor: EventProcessor;
  private publisher: EventPublisher;
  private processedEvents: ProcessedEventStore;

  async consumeEvent(event: CustomerEvent): Promise<void> {
    const consumeId = `${event.eventId}-${event.aggregateId}`;

    // ==========================================
    // STEP 1: Check idempotency
    // ==========================================
    if (await this.processedEvents.exists(consumeId)) {
      console.log(`Already processed ${event.eventId}, skipping`);
      return; // Already processed, acknowledge and move on
    }

    // ==========================================
    // STEP 2: Process with retry logic
    // ==========================================
    let lastError: Error | null = null;

    for (let attempt = 1; attempt <= this.config.maxRetries; attempt++) {
      try {
        await this.processWithTransaction(event);

        // ==========================================
        // STEP 3: Record successful processing
        // ==========================================
        await this.processedEvents.markProcessed(consumeId, {
          processedAt: new Date(),
          eventType: event.eventType,
        });

        // Success! Return to acknowledge
        return;
      } catch (error) {
        lastError = error as Error;
        console.error(
          `Attempt ${attempt} failed for ${event.eventId}: ${lastError.message}`
        );

        if (attempt < this.config.maxRetries) {
          // Exponential backoff
          const delay = this.config.retryDelayMs * Math.pow(2, attempt - 1);
          await sleep(delay);
        }
      }
    }

    // ==========================================
    // STEP 4: Send to Dead Letter Queue
    // ==========================================
    console.error(
      `All retries exhausted for ${event.eventId}. Sending to DLQ.`
    );

    await this.publisher.publish(this.config.deadLetterTopic, {
      originalEvent: event,
      error: lastError?.message,
      failedAt: new Date(),
      attempts: this.config.maxRetries,
    });

    // Acknowledge original event to unblock queue
    // The DLQ event now tracks the failure
  }

  private async processWithTransaction(event: CustomerEvent): Promise<void> {
    // Use database transaction for atomic local update
    await this.db.transaction(async (tx) => {
      switch (event.eventType) {
        case 'CustomerCreated':
          await this.processor.handleCreated(tx, event);
          break;
        case 'CustomerUpdated':
          await this.processor.handleUpdated(tx, event);
          break;
        case 'CustomerDeleted':
          await this.processor.handleDeleted(tx, event);
          break;
        default:
          // Unknown event type - log and skip
          console.warn(`Unknown event type: ${(event as any).eventType}`);
      }
    });
  }
}

// ===================================================
// DLQ PROCESSOR - For manual investigation
// ===================================================

class DeadLetterProcessor {
  async reviewAndRetry(dlqEvent: DeadLetterEvent): Promise<void> {
    // Manual review determined event is now processable
    // (e.g., bug was fixed, data was corrected)

    // Re-publish to original topic for reprocessing
    await this.publisher.publish('customer-events', dlqEvent.originalEvent);

    // Mark DLQ entry as resolved
    await this.dlqStore.markResolved(dlqEvent.id, {
      resolution: 'retried',
      resolvedAt: new Date(),
      resolvedBy: getCurrentUser(),
    });
  }
}
```

The event broker is the critical infrastructure for event-driven sync. Choosing and configuring it correctly determines system reliability.
| Broker | Strengths | Considerations |
|---|---|---|
| Apache Kafka | Massive scale; log-based; replay support; strong ordering per partition | Operational complexity; requires ZooKeeper or KRaft |
| RabbitMQ | Feature-rich; mature; flexible routing; good for smaller scale | No built-in replay; messages deleted after consumption |
| Amazon SQS + SNS | Fully managed; scales automatically; pay-per-use | Limited replay; eventual consistency between SNS→SQS |
| Google Pub/Sub | Fully managed; global; message retention with replay | Ordering requires configuration; GCP-specific |
| Azure Event Hubs | Fully managed; Kafka-compatible; strong Azure integration | Azure-specific ecosystem |
Partitioning for Ordering
Most brokers only guarantee ordering within a partition. For entity-based sync, partition by entity ID:
Partition key = customerId
→ All events for customer X go to same partition
→ Events for customer X processed in order
Be careful: a partition carries events for many entities, and they are consumed strictly in sequence — so one entity with a burst of events can delay every other entity in the same partition (head-of-line blocking).
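A sketch of key-based partitioning — hashing the entity ID so every event for one customer lands on the same partition. The hash function here is illustrative; real brokers use their own (Kafka's default producer partitioner uses murmur2, for example).

```typescript
// Stable partition assignment: same key -> same partition, always.
// That is what gives per-entity ordering on a partitioned broker.

function partitionFor(key: string, partitionCount: number): number {
  // Simple deterministic string hash (illustrative only)
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) | 0;
  }
  return Math.abs(hash) % partitionCount;
}

const partitions = 12;
const p1 = partitionFor('cust_123', partitions);
const p2 = partitionFor('cust_123', partitions);
// Every event for cust_123 maps to the same partition,
// so its events are consumed in publish order.
```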
Retention for Replay
Log-based brokers (Kafka) retain events for a configurable period. This enables:

- Bootstrapping new consumers by replaying history
- Recovering from consumer bugs: fix the code, rewind the offset, reprocess
- Auditing and debugging past data changes
Set retention based on:

- The longest consumer outage you must survive without data loss
- Whether new consumers bootstrap from the log or from an API
- Storage cost at your event volume
Consumer Groups for Scaling
Consumer groups allow horizontal scaling:

- Partitions are divided among the instances in a group
- Each partition is consumed by exactly one instance at a time, preserving per-partition order
- Adding instances raises throughput, up to the number of partitions
Important: During rebalancing, some events may be processed twice. Idempotency handles this.
Unless you have specific reasons, start with managed brokers (AWS MSK, Confluent Cloud, GCP Pub/Sub). Operating Kafka or RabbitMQ clusters is non-trivial. Focus your engineering on business logic, not broker maintenance.
Event-driven sync has a fundamental challenge: what if you miss events? Retention expires, bugs cause events to be dropped, or a new consumer comes online without historical data. You need strategies for catching up.
Periodically, the publisher emits a "snapshot" event containing the full current state. Consumers can use these to reset/verify their view.
CustomerSnapshot event:
- Contains complete customer data
- Emitted on schedule (daily) and/or on demand
- Version allows ordering with regular events
- Consumer can fully reset local view from snapshot
Trade-off: Larger events; only periodic consistency check.
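On the consumer side, a snapshot participates in the same version-ordering check as regular events, since it carries a version. A sketch, with illustrative names:

```typescript
// Snapshot consumption: reset the local view from the snapshot,
// but only if the snapshot is newer than what we already hold --
// the same "never go backwards" rule used for regular events.

interface LocalView {
  name: string;
  email: string;
  version: number;
}

interface CustomerSnapshot {
  type: 'CustomerSnapshot';
  customerId: string;
  version: number;
  data: { name: string; email: string };
}

const views = new Map<string, LocalView>();
views.set('cust_123', {
  name: 'Old Name',
  email: 'old@example.com',
  version: 7,
});

function applySnapshot(snap: CustomerSnapshot): boolean {
  const local = views.get(snap.customerId);
  if (local && local.version >= snap.version) return false; // stale snapshot
  views.set(snap.customerId, { ...snap.data, version: snap.version });
  return true;
}

const applied = applySnapshot({
  type: 'CustomerSnapshot',
  customerId: 'cust_123',
  version: 10,
  data: { name: 'Jane Doe', email: 'jane.new@example.com' },
});
```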
The publisher provides an API for bulk fetching:
GET /customers?updatedSince=2024-01-01T00:00:00Z
→ Returns all customers modified after that time
GET /customers?ids=cust_123,cust_456
→ Returns specific customers on demand
Consumers use this for:

- Initial bootstrap before subscribing to events
- Fetching an entity when an event references data they don't have
- Periodic reconciliation against the source of truth
With a log-based broker (Kafka), consumers can replay from any offset:
1. Consumer starts fresh
2. Set consumer offset to earliest retained event
3. Process all historical events
4. Reach end of log → now tracking real-time
Critical: Ensure your consumer is idempotent and handles version ordering. Replaying years of events will include duplicates and out-of-order segments.
```typescript
// ===================================================
// HYBRID CATCH-UP STRATEGY
// ===================================================
// Combines initial API fetch with ongoing event processing
// for reliable state synchronization.
// ===================================================

class CustomerViewSynchronizer {
  private customerClient: CustomerServiceClient;
  private eventConsumer: EventConsumer;
  private localView: LocalCustomerViewRepository;

  async initialize(): Promise<void> {
    console.log('Starting customer view synchronization...');

    // ==========================================
    // PHASE 1: Initial bulk load from API
    // ==========================================
    console.log('Phase 1: Fetching current state from API...');

    let cursor: string | undefined;
    let totalLoaded = 0;

    do {
      const batch = await this.customerClient.listCustomers({
        cursor,
        limit: 1000,
      });

      for (const customer of batch.customers) {
        await this.localView.upsert({
          customerId: customer.id,
          name: customer.name,
          email: customer.email,
          version: customer.version,
          lastEventId: 'initial-load',
          lastUpdated: new Date(),
        });
      }

      totalLoaded += batch.customers.length;
      cursor = batch.nextCursor;
      console.log(`Loaded ${totalLoaded} customers...`);
    } while (cursor);

    console.log(`Phase 1 complete: ${totalLoaded} customers loaded`);

    // ==========================================
    // PHASE 2: Subscribe to real-time events
    // ==========================================
    console.log('Phase 2: Starting real-time event subscription...');

    // Note: We might process some events for data we just loaded,
    // but idempotency and version checks handle this safely.
    await this.eventConsumer.subscribe('customer-events', {
      // Start from recent events to catch anything that happened
      // during API fetch
      startFrom: 'earliest-retained',
      handler: (event) => this.handleEvent(event),
    });

    console.log('Synchronization initialized successfully');
  }

  async handleMissingEntity(customerId: string): Promise<void> {
    // Called when we receive an event for an unknown entity
    // Fetch it from the source API
    console.warn(`Fetching missing customer ${customerId} from API`);

    try {
      const customer = await this.customerClient.getCustomer(customerId);
      await this.localView.upsert({
        customerId: customer.id,
        name: customer.name,
        email: customer.email,
        version: customer.version,
        lastEventId: 'api-fetch',
        lastUpdated: new Date(),
      });
    } catch (error) {
      if (error.code === 'NOT_FOUND') {
        console.warn(
          `Customer ${customerId} not found in source - may be deleted`
        );
      } else {
        throw error;
      }
    }
  }

  async reconcile(): Promise<ReconciliationResult> {
    // Periodic reconciliation to catch any drift
    console.log('Running periodic reconciliation...');

    // Get checksums from source
    const sourceChecksums = await this.customerClient.getChecksums();

    // Compare with local
    const localChecksums = await this.localView.getChecksums();

    const discrepancies: string[] = [];
    for (const [customerId, sourceChecksum] of Object.entries(sourceChecksums)) {
      if (localChecksums[customerId] !== sourceChecksum) {
        discrepancies.push(customerId);
      }
    }

    // Fetch and fix discrepancies
    for (const customerId of discrepancies) {
      await this.handleMissingEntity(customerId);
    }

    return {
      totalChecked: Object.keys(sourceChecksums).length,
      discrepanciesFound: discrepancies.length,
      discrepanciesFixed: discrepancies.length,
    };
  }
}
```

No matter how reliable your event pipeline, run periodic reconciliation. It catches bugs, edge cases, and the unexpected. Many production systems reconcile daily or weekly as a background job.
At scale, event-driven sync faces performance challenges. Understanding these helps you design for high throughput and low latency.
Measuring Sync Latency:
Track the time between event publication and consumer processing complete:
Sync Latency = ConsumerProcessedAt - EventGeneratedAt
Set alerts for:

- Sync latency exceeding your tolerance (e.g., p99 above a few seconds)
- Consumer lag that grows over time instead of holding steady
- Dead Letter Queue depth above its normal baseline
Consumer Lag is especially critical: it indicates your consumer can't keep up with event rate, and latency will continuously increase.
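Computing the latency metric is straightforward once events carry a generation timestamp; a sketch follows, where the helper names and the 5-second threshold are illustrative choices, not prescriptions.

```typescript
// Sync latency: time from event generation to processing complete.
// Flag samples that exceed an alert threshold.

interface LatencySample {
  eventId: string;
  latencyMs: number;
}

function syncLatencyMs(eventGeneratedAt: Date, consumerProcessedAt: Date): number {
  return consumerProcessedAt.getTime() - eventGeneratedAt.getTime();
}

function breachesThreshold(
  samples: LatencySample[],
  thresholdMs: number
): LatencySample[] {
  return samples.filter((s) => s.latencyMs > thresholdMs);
}

const latency = syncLatencyMs(
  new Date('2024-01-15T10:30:00.000Z'),
  new Date('2024-01-15T10:30:00.250Z')
);

const alerts = breachesThreshold(
  [
    { eventId: 'evt_1', latencyMs: latency },
    { eventId: 'evt_2', latencyMs: 7500 },
  ],
  5000 // e.g., alert if sync takes longer than 5 seconds
);
```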
If consumers fall behind, event brokers queue more data, using more memory and disk. Eventually, oldest events may be dropped (retention exceeded) or the broker slows/crashes. Monitor consumer lag aggressively and scale consumers before lag becomes critical.
Event-driven data synchronization is the standard pattern for maintaining data consistency across microservices. It enables loose coupling, high availability, and independent service evolution while providing eventual consistency.
What's next:
Event-driven sync works well for keeping local views updated. But sometimes you need to query data across multiple services to fulfill a request. The next page explores patterns for cross-service queries—joining distributed data without sacrificing service autonomy.
You now understand event-driven data synchronization—the backbone of distributed data consistency. By publishing rich events, handling ordering and idempotency, building reliable consumers, and planning for catch-up scenarios, you can maintain eventually consistent views across your microservices architecture. Next, we'll tackle the challenge of querying across service boundaries.