Sometimes, despite your best efforts, you need to make breaking changes to an event schema. Perhaps a fundamental design flaw needs correction, a data type was wrong from the start, or business requirements have shifted so dramatically that the old schema no longer makes sense.
When backward and forward compatibility can't be maintained, you need migration strategies—systematic approaches for transitioning producers and consumers from old schemas to new ones without data loss or system downtime.
Migration is where schema evolution meets organizational coordination. Technical elegance matters, but so do clear communication, phased rollouts, and contingency planning.
By the end of this page, you will master migration strategies for schema evolution: when migrations are necessary, how to plan and execute them, specific patterns like dual writes and event upcasting, and how to handle the long tail of legacy events in event-sourced systems.
Migration becomes necessary when schema changes cannot be made backward or forward compatible. Understanding which changes require migration helps you plan accordingly.
Changes requiring migration:
| Change Type | Example | Why Breaking | Migration Approach |
|---|---|---|---|
| Remove required field | Drop customerId | Old consumers expect it | Dual write → deprecation |
| Change field type (incompatible) | quantity: string → int | Cannot coerce all values | New field + deprecate old |
| Rename without alias support | email → contactEmail | Old consumers can't find data | Bridge event / transform |
| Split into multiple events | One OrderCompleted → OrderShipped + OrderDelivered | Consumers expect single event | Fan-out migration |
| Merge multiple events | UserCreated + ProfileCreated → UserRegistered | Consumers subscribed to original | Merge event producer |
| Structural reorganization | Flatten nested object; change array to map | Shape mismatch | Full schema replacement |
The migration decision matrix:
```typescript
// Decision tree for schema changes
function determineEvolutionStrategy(change: SchemaChange): EvolutionStrategy {
  // Can it be made optional with a default?
  if (change.type === 'ADD_FIELD') {
    if (change.canHaveDefault) {
      return { type: 'COMPATIBLE', action: 'Add optional field with default' };
    } else {
      return { type: 'MIGRATION', reason: 'Required field addition' };
    }
  }

  // Can the old field be aliased?
  if (change.type === 'RENAME_FIELD') {
    if (registry.supportsAliases) {
      return { type: 'COMPATIBLE', action: 'Add alias, keep original' };
    } else {
      return { type: 'MIGRATION', reason: 'Rename without alias support' };
    }
  }

  // Can the type be widened?
  if (change.type === 'CHANGE_TYPE') {
    if (isTypeWidenable(change.oldType, change.newType)) {
      return { type: 'COMPATIBLE', action: 'Type widening (int → long)' };
    } else {
      return { type: 'MIGRATION', reason: 'Incompatible type change' };
    }
  }

  // Field removals always require migration
  if (change.type === 'REMOVE_FIELD') {
    return {
      type: 'MIGRATION',
      reason: 'Field removal breaks consumers',
      timeline: 'Multi-phase deprecation',
    };
  }

  return { type: 'ANALYSIS_NEEDED' };
}
```

Migrations involve coordination across teams, phased deployments, monitoring, and potential rollbacks. Before committing to migration, exhaust all options for compatible evolution. Often what seems to require migration can be solved with careful schema design.
Successful migrations require careful planning before any code is written. The plan must address technical, organizational, and timeline considerations.
Migration planning checklist:
```markdown
# Schema Migration Plan: OrderCreated v2 → v3

## Overview
- **Schema**: OrderCreated
- **Change**: Restructure shipping info from nested object to flat fields
- **Breaking**: Yes (structural change)
- **Start Date**: 2024-03-01
- **Target Completion**: 2024-05-01

## Impact Assessment
| Consumer | Team | Current Version | Impact | Contact |
|----------|------|-----------------|--------|---------|
| Analytics | Data | v2.1 | High - ETL pipelines | data@co.com |
| Shipping | Ops | v2.0 | High - Core logic | ops-team@ |
| Notifications | CX | v2.1 | Medium - Template | cx-team@ |
| Fraud | Risk | v2.2 | Low - Uses subset | risk@ |

## Migration Phases

### Phase 1: Dual Write (March 1-15)
- Producer emits BOTH v2 and v3 to separate topics
- Consumers continue using v2 topic
- Validation: Ensure v3 events are correct
- Rollback: Disable v3 writes

### Phase 2: Consumer Migration (March 15 - April 15)
- Consumer teams migrate one by one
- Each validates with production traffic
- Weekly check-ins for progress
- Rollback: Consumer switches back to v2

### Phase 3: V2 Deprecation (April 15 - May 1)
- Announce v2 sunset date
- Monitor v2 consumption
- Chase remaining consumers
- Rollback: Extend deadline if needed

### Phase 4: V2 Sunset (May 1)
- Stop v2 writes
- Archive v2 topic
- Remove v2 code from producer

## Success Criteria
- [ ] All consumers confirm v3 compatibility
- [ ] v2 topic consumption drops to 0
- [ ] No errors related to schema mismatch
- [ ] Performance within 5% of pre-migration
```

Consumer migrations always take longer than expected. Some teams will be slow to respond, have competing priorities, or discover issues late. Build buffer into your timeline—double your initial estimate for consumer migration phases.
The dual write pattern is the most common migration strategy. The producer writes events in both old and new formats during the transition period, allowing consumers to migrate at their own pace.
How it works:
```typescript
// Dual write implementation
class OrderEventProducer {
  private readonly config: MigrationConfig;

  async publishOrderCreated(order: Order): Promise<void> {
    // Always write to v3 topic (new format)
    await this.publishV3(order);

    // Write to v2 topic during migration period
    if (this.config.dualWriteEnabled) {
      await this.publishV2(order);
    }
  }

  private async publishV3(order: Order): Promise<void> {
    const event: OrderCreatedV3 = {
      orderId: order.id,
      customerId: order.customer.id,
      totalAmount: order.total,
      currency: order.currency,
      // v3 structure: flat shipping fields
      shippingStreet: order.shipping.address.street,
      shippingCity: order.shipping.address.city,
      shippingCountry: order.shipping.address.country,
      shippingPostalCode: order.shipping.address.postalCode,
    };

    await this.kafka.send({
      topic: 'orders.created.v3',
      messages: [{ key: order.id, value: serialize(event) }],
    });
  }

  private async publishV2(order: Order): Promise<void> {
    const event: OrderCreatedV2 = {
      orderId: order.id,
      customerId: order.customer.id,
      totalAmount: order.total,
      // v2 structure: nested shipping object
      shippingAddress: {
        street: order.shipping.address.street,
        city: order.shipping.address.city,
        country: order.shipping.address.country,
        postalCode: order.shipping.address.postalCode,
      },
    };

    await this.kafka.send({
      topic: 'orders.created.v2',
      messages: [{ key: order.id, value: serialize(event) }],
    });
  }

  // Gradual v2 wind-down: reduce the share of v2 writes over time
  async publishWithGradualReduction(order: Order): Promise<void> {
    await this.publishV3(order);

    // Gradually reduce v2 writes
    if (Math.random() * 100 < this.config.v2WritePercentage) {
      await this.publishV2(order);
    }
  }
}

// Configuration for migration phases
interface MigrationConfig {
  dualWriteEnabled: boolean;      // Phase 1-3: true, Phase 4: false
  v2WritePercentage: number;      // Gradual reduction: 100 → 0
  monitorV2Consumption: boolean;  // Track remaining v2 consumers
}
```

Dual writes must be atomic—if v3 write succeeds but v2 fails, consumers see inconsistent data. Use Kafka transactions, or implement a saga pattern that compensates for partial failures. Never leave dual writes in a half-completed state.
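The note above mentions Kafka transactions. Below is a minimal sketch of what an atomic dual write could look like with the kafkajs client; the client setup, transactional ID, and topic names are assumptions for illustration rather than part of the example above.

```typescript
import { Kafka } from 'kafkajs';

// Hypothetical transactional producer for atomic dual writes.
const kafka = new Kafka({ clientId: 'order-service', brokers: ['broker:9092'] });
const producer = kafka.producer({
  transactionalId: 'order-dual-write', // required for producer.transaction()
  idempotent: true,
});
await producer.connect(); // connect once at startup (assumes an ESM module with top-level await)

async function publishBothAtomically(key: string, v2Payload: string, v3Payload: string) {
  const transaction = await producer.transaction();
  try {
    // Both sends belong to one transaction: consumers reading with
    // isolation.level=read_committed see either both events or neither.
    await transaction.send({
      topic: 'orders.created.v3',
      messages: [{ key, value: v3Payload }],
    });
    await transaction.send({
      topic: 'orders.created.v2',
      messages: [{ key, value: v2Payload }],
    });
    await transaction.commit();
  } catch (error) {
    // Abort rolls back both writes, avoiding a half-completed dual write
    await transaction.abort();
    throw error;
  }
}
```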
Event upcasting transforms old events into new formats at read time, rather than migrating the stored data. It's particularly powerful for event-sourced systems where historical events are immutable.
The concept:
When a consumer reads an old event, an upcaster intercepts it and transforms it to match the current schema. The stored event remains unchanged; only the in-memory representation is upgraded.
```typescript
// Event upcaster implementation
interface Upcaster<TOld, TNew> {
  fromVersion: string;
  toVersion: string;
  upcast(event: TOld): TNew;
}

// Upcaster from v1 to v2: Add currency field
const orderCreatedV1ToV2: Upcaster<OrderCreatedV1, OrderCreatedV2> = {
  fromVersion: '1.0.0',
  toVersion: '2.0.0',
  upcast(event: OrderCreatedV1): OrderCreatedV2 {
    return {
      ...event,
      currency: 'USD',            // Default for pre-currency events
      currencySource: 'UPCASTED', // Track that this was inferred
    };
  },
};

// Upcaster from v2 to v3: Flatten shipping address
const orderCreatedV2ToV3: Upcaster<OrderCreatedV2, OrderCreatedV3> = {
  fromVersion: '2.0.0',
  toVersion: '3.0.0',
  upcast(event: OrderCreatedV2): OrderCreatedV3 {
    const addr = event.shippingAddress ?? {};
    return {
      orderId: event.orderId,
      customerId: event.customerId,
      totalAmount: event.totalAmount,
      currency: event.currency,
      // Flatten nested object
      shippingStreet: addr.street ?? '',
      shippingCity: addr.city ?? '',
      shippingCountry: addr.country ?? 'US',
      shippingPostalCode: addr.postalCode ?? '',
    };
  },
};

// Upcaster chain applies transformations in sequence
class UpcasterChain {
  private upcasters: Map<string, Upcaster<any, any>> = new Map();

  register(upcaster: Upcaster<any, any>): void {
    const key = `${upcaster.fromVersion}->${upcaster.toVersion}`;
    this.upcasters.set(key, upcaster);
  }

  upcast(event: StoredEvent, targetVersion: string): any {
    let current = event.payload;
    let currentVersion = event.schemaVersion;

    // Apply upcasters until we reach target version
    while (currentVersion !== targetVersion) {
      const upcaster = this.findUpcaster(currentVersion);
      if (!upcaster) {
        throw new Error(`No upcaster from ${currentVersion}`);
      }
      current = upcaster.upcast(current);
      currentVersion = upcaster.toVersion;
    }

    return current;
  }

  // Find the single upcaster registered for a given source version
  private findUpcaster(fromVersion: string): Upcaster<any, any> | undefined {
    for (const upcaster of this.upcasters.values()) {
      if (upcaster.fromVersion === fromVersion) {
        return upcaster;
      }
    }
    return undefined;
  }
}

// Usage in event-sourced aggregate
class OrderAggregate {
  private upcasters = new UpcasterChain();

  async loadFromHistory(orderId: string): Promise<Order> {
    const events = await eventStore.getEvents(orderId);
    let state = initialOrderState();

    for (const storedEvent of events) {
      // Upcast to current version before applying
      const event = this.upcasters.upcast(storedEvent, CURRENT_VERSION);
      state = this.apply(state, event);
    }

    return state;
  }
}
```

Upcasters form a chain: v1→v2→v3→v4. Each upcaster only knows how to upgrade by one version. To read a v1 event with v4 code, apply three upcasters in sequence. This keeps each upcaster simple and testable.
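As a concrete illustration of that chaining behavior, here is a small usage sketch that registers the two upcasters above and reads a stored v1 event with v3 code. The StoredEvent shape follows the fields UpcasterChain reads (payload and schemaVersion); the sample field values are illustrative.

```typescript
// Minimal usage sketch: chain the two upcasters defined above.
const chain = new UpcasterChain();
chain.register(orderCreatedV1ToV2); // 1.0.0 -> 2.0.0
chain.register(orderCreatedV2ToV3); // 2.0.0 -> 3.0.0

// A stored v1 event: no currency, no shipping fields yet
const storedEvent: StoredEvent = {
  schemaVersion: '1.0.0',
  payload: { orderId: 'order-123', customerId: 'cust-456', totalAmount: 42 },
};

// Both upcasters are applied in sequence: v1 -> v2 -> v3
const current = chain.upcast(storedEvent, '3.0.0');
console.log(current.currency);       // 'USD' (defaulted by the v1 -> v2 upcaster)
console.log(current.shippingStreet); // ''    (defaulted by the v2 -> v3 upcaster)
```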
When to use upcasting:
When NOT to use upcasting:
For systems where upcasting is impractical or performance is critical, you can migrate the event stream itself—transforming old events into new format and writing a new stream.
Stream migration process:
```typescript
// Event stream migration worker
class StreamMigrationWorker {
  async migrate(config: MigrationConfig): Promise<MigrationResult> {
    const { sourceTopic, targetTopic, transformer } = config;

    // Track progress for resumability
    let checkpoint = await this.loadCheckpoint(sourceTopic);
    let migratedCount = 0;
    let errorCount = 0;

    // Connect to source with specific offset
    const consumer = await this.createConsumer(sourceTopic, checkpoint);
    const producer = await this.createProducer();

    try {
      while (true) {
        const batch = await consumer.fetchBatch(1000);
        if (batch.length === 0) break; // Caught up

        const transformed = [];
        for (const event of batch) {
          try {
            // Transform old event to new format
            const newEvent = transformer.transform(event);
            transformed.push({
              key: event.key,
              value: newEvent,
              timestamp: event.timestamp, // Preserve original timestamp
            });
            migratedCount++;
          } catch (error) {
            errorCount++;
            await this.logError(event, error);
            if (errorCount > config.maxErrors) {
              throw new Error('Too many transformation errors');
            }
          }
        }

        // Write batch to target topic
        await producer.sendBatch(targetTopic, transformed);

        // Checkpoint for resumability
        checkpoint = batch[batch.length - 1].offset;
        await this.saveCheckpoint(sourceTopic, checkpoint);

        this.reportProgress(migratedCount, checkpoint);
      }
    } finally {
      await consumer.close();
      await producer.close();
    }

    return { migratedCount, errorCount, finalCheckpoint: checkpoint };
  }
}

// Transformer with validation
class OrderEventTransformer {
  transform(oldEvent: OrderCreatedV2): OrderCreatedV3 {
    const transformed: OrderCreatedV3 = {
      orderId: oldEvent.orderId,
      customerId: oldEvent.customerId,
      totalAmount: oldEvent.totalAmount,
      currency: oldEvent.currency ?? 'USD',
      shippingStreet: oldEvent.shippingAddress?.street ?? '',
      shippingCity: oldEvent.shippingAddress?.city ?? '',
      shippingCountry: oldEvent.shippingAddress?.country ?? 'US',
      shippingPostalCode: oldEvent.shippingAddress?.postalCode ?? '',
      // Add metadata about migration
      _migrated: true,
      _migratedAt: new Date().toISOString(),
      _originalVersion: '2.0.0',
    };

    // Validate transformation
    this.validate(transformed);
    return transformed;
  }

  private validate(event: OrderCreatedV3): void {
    if (!event.orderId || !event.customerId) {
      throw new ValidationError('Required fields missing after transform');
    }
    if (event.totalAmount < 0) {
      throw new ValidationError('Invalid totalAmount');
    }
  }
}
```

When migrating events, preserve original timestamps and event IDs where possible. Changing these can break idempotency checks in consumers and distort time-based analytics. Add migration metadata as new fields rather than modifying originals.
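To see why preserving event IDs matters, here is a minimal sketch of the kind of consumer-side idempotency check the note refers to; the eventId field and the in-memory dedupe store are assumptions for illustration (real consumers often persist processed IDs).

```typescript
// Hypothetical consumer-side dedupe keyed on the original event ID.
// If a migration rewrites eventId, previously processed events are no
// longer recognized and may be applied twice.
const processedIds = new Set<string>();

function handleOrderEvent(event: { eventId: string; orderId: string }): void {
  if (processedIds.has(event.eventId)) {
    return; // Duplicate delivery (or replayed migrated event): skip
  }
  processedIds.add(event.eventId);
  // ...apply business logic exactly once per eventId
}
```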
The bridge event pattern introduces an intermediate component that transforms events between schemas in real-time. Unlike migration workers, bridges operate continuously on live traffic.
Use cases:
```typescript
// Bridge service for event transformation
class EventBridge {
  private readonly transformers: Map<string, EventTransformer>;

  async start(): Promise<void> {
    // Subscribe to source topics
    const consumer = await this.kafka.consumer({
      groupId: 'event-bridge',
    });

    await consumer.subscribe({
      topics: ['orders.created.v2', 'payments.processed.v1'],
    });

    await consumer.run({
      eachMessage: async ({ topic, message }) => {
        await this.bridgeEvent(topic, message);
      },
    });
  }

  private async bridgeEvent(sourceTopic: string, message: Message): Promise<void> {
    const transformer = this.transformers.get(sourceTopic);
    if (!transformer) {
      logger.warn(`No transformer for topic: ${sourceTopic}`);
      return;
    }

    try {
      // Parse source event
      const sourceEvent = JSON.parse(message.value.toString());

      // Transform to target format
      const targetEvent = transformer.transform(sourceEvent);

      // Publish to target topic
      await this.producer.send({
        topic: transformer.targetTopic,
        messages: [{
          key: message.key,
          value: JSON.stringify(targetEvent),
          headers: {
            'bridged-from': sourceTopic,
            'original-timestamp': message.timestamp,
          },
        }],
      });

      this.metrics.increment('events.bridged', { source: sourceTopic });
    } catch (error) {
      this.metrics.increment('events.bridge_failed', { source: sourceTopic });
      await this.handleError(sourceTopic, message, error);
    }
  }
}

// Complex transformation: Split one event into multiple
class OrderCompletedBridge implements EventTransformer {
  targetTopics = ['orders.shipped', 'orders.delivered'];

  transform(orderCompleted: OrderCompletedV1): BridgedEvents {
    // Old system had one OrderCompleted event
    // New system has separate Shipped and Delivered events
    return {
      events: [
        {
          topic: 'orders.shipped',
          event: {
            orderId: orderCompleted.orderId,
            shippedAt: orderCompleted.shippedTimestamp,
            carrier: orderCompleted.shippingCarrier,
            trackingNumber: orderCompleted.trackingNumber,
          },
        },
        {
          topic: 'orders.delivered',
          event: {
            orderId: orderCompleted.orderId,
            deliveredAt: orderCompleted.deliveredTimestamp,
            signedBy: orderCompleted.recipientName,
          },
        },
      ],
    };
  }
}
```

Bridges can be temporary (during migration) or permanent (for ongoing integration). If a legacy producer cannot be updated, a permanent bridge lets modern consumers use current schemas while the legacy system continues unchanged.
Sometimes independent migration isn't feasible—you need a coordinated cutover where all producers and consumers switch simultaneously. This is the riskiest approach but occasionally necessary.
When coordinated cutover is necessary:
```typescript
// Coordinated cutover with feature flags
class CutoverCoordinator {
  private readonly featureFlag: FeatureFlagService;

  async prepareCutover(services: string[]): Promise<void> {
    // 1. Verify all services are ready
    for (const service of services) {
      const status = await this.checkServiceReady(service);
      if (!status.ready) {
        throw new Error(`Service ${service} not ready: ${status.reason}`);
      }
    }

    // 2. Create cutover flag (disabled)
    await this.featureFlag.create('schema-v3-cutover', {
      enabled: false,
      services: services,
    });

    console.log('Cutover prepared. All services ready.');
  }

  async executeCutover(): Promise<CutoverResult> {
    const startTime = Date.now();

    try {
      // 1. Pause event production (optional)
      await this.pauseProducers();

      // 2. Wait for in-flight events to drain
      await this.waitForDrain(30_000); // 30 second timeout

      // 3. Flip the switch - all services read flag
      await this.featureFlag.enable('schema-v3-cutover');

      // 4. Resume production with new schema
      await this.resumeProducers();

      // 5. Verify all services healthy
      await this.verifyHealth();

      return {
        success: true,
        duration: Date.now() - startTime,
      };
    } catch (error) {
      // Rollback on failure
      await this.rollback();
      return {
        success: false,
        error: error.message,
        duration: Date.now() - startTime,
      };
    }
  }

  private async rollback(): Promise<void> {
    // Disable flag - services revert to old behavior
    await this.featureFlag.disable('schema-v3-cutover');

    // Resume producers with old schema
    await this.resumeProducers();
  }
}

// Service-side cutover handling
class OrderService {
  async publishOrderCreated(order: Order): Promise<void> {
    const useV3 = await featureFlags.isEnabled('schema-v3-cutover');

    if (useV3) {
      await this.publisherV3.publish(this.serializeV3(order));
    } else {
      await this.publisherV2.publish(this.serializeV2(order));
    }
  }
}

// Consumer-side cutover handling
class AnalyticsConsumer {
  async processEvent(event: unknown): Promise<void> {
    const useV3 = await featureFlags.isEnabled('schema-v3-cutover');

    if (useV3) {
      const order = this.parseV3(event);
      await this.processOrderV3(order);
    } else {
      const order = this.parseV2(event);
      await this.processOrderV2(order);
    }
  }
}
```

Coordinated cutover is high-risk: if any service fails to switch, you have schema mismatch. Practice the cutover in staging, have quick rollback ready, and choose a low-traffic window. Alert on-call teams before attempting.
Every migration has stragglers—consumers that don't migrate on schedule. Handling the "long tail" of lagging consumers is often the most frustrating part of schema migrations.
Long tail strategies:
```typescript
// Monitoring and enforcement for migration stragglers
class MigrationTracker {
  async getConsumerMigrationStatus(): Promise<ConsumerStatus[]> {
    const consumers = await this.registry.getConsumersForSubject('order-created');

    return Promise.all(consumers.map(async (consumer) => {
      const metrics = await this.getConsumerMetrics(consumer.id);
      return {
        consumerId: consumer.id,
        team: consumer.owner,
        currentSchemaVersion: metrics.schemaVersionUsed,
        targetSchemaVersion: '3.0.0',
        migrated: metrics.schemaVersionUsed === '3.0.0',
        lastV2EventProcessed: metrics.lastV2Timestamp,
        v2EventsLastDay: metrics.v2CountLast24h,
      };
    }));
  }

  async enforceDeadline(deadline: Date): Promise<void> {
    const status = await this.getConsumerMigrationStatus();
    const stragglers = status.filter(c => !c.migrated);

    if (stragglers.length === 0) {
      console.log('All consumers migrated!');
      return;
    }

    const daysRemaining = Math.floor(
      (deadline.getTime() - Date.now()) / (1000 * 60 * 60 * 24)
    );

    for (const straggler of stragglers) {
      if (daysRemaining <= 7) {
        // 1 week warning: Daily escalation
        await this.sendEscalation(straggler, 'URGENT', daysRemaining);
      } else if (daysRemaining <= 14) {
        // 2 week warning: Weekly reminder
        await this.sendReminder(straggler, 'WARNING', daysRemaining);
      } else if (daysRemaining <= 30) {
        // 1 month warning: Initial notification
        await this.sendReminder(straggler, 'INFO', daysRemaining);
      }
    }
  }

  async handleDeadlinePassed(): Promise<void> {
    const status = await this.getConsumerMigrationStatus();
    const stragglers = status.filter(c => !c.migrated);

    for (const straggler of stragglers) {
      const decision = await this.getMigrationDecision(straggler);

      switch (decision) {
        case 'BREAK':
          // Low-priority consumer; accept breakage
          logger.warn(`Consumer ${straggler.consumerId} will break`);
          break;
        case 'BRIDGE':
          // Set up permanent bridge for this consumer
          await this.createBridgeForConsumer(straggler);
          break;
        case 'EXTEND':
          // Extend v2 writes for critical consumer
          await this.extendDualWrite(straggler);
          break;
      }
    }
  }
}
```

Draft clear, escalating communications: friendly reminder (30 days), warning (14 days), urgent notice (7 days), final notice (48 hours). Include the consumer team's owner, their manager, and a skip-level if needed. Attach clear migration steps and offer help.
Migration strategies must be tested thoroughly before production. The stakes are high—failed migrations can cause data loss or extended outages.
Testing layers:
Test transformation logic with comprehensive event samples:
```typescript
describe('OrderEventTransformer', () => {
  const transformer = new OrderEventTransformer();

  it('transforms standard v2 event to v3', () => {
    const v2Event = {
      orderId: 'order-123',
      customerId: 'cust-456',
      totalAmount: 99.99,
      currency: 'USD',
      shippingAddress: {
        street: '123 Main St',
        city: 'Seattle',
        country: 'US',
        postalCode: '98101',
      },
    };

    const v3Event = transformer.transform(v2Event);

    expect(v3Event.shippingStreet).toBe('123 Main St');
    expect(v3Event.shippingCity).toBe('Seattle');
  });

  it('handles missing optional fields', () => {
    const v2EventMinimal = {
      orderId: 'order-123',
      customerId: 'cust-456',
      totalAmount: 99.99,
      // currency and shippingAddress missing
    };

    const v3Event = transformer.transform(v2EventMinimal);

    expect(v3Event.currency).toBe('USD');       // Default
    expect(v3Event.shippingStreet).toBe('');    // Empty string
    expect(v3Event.shippingCountry).toBe('US'); // Default
  });

  it('preserves all original data', () => {
    const v2Event = generateRandomV2Event();
    const v3Event = transformer.transform(v2Event);

    // Core fields unchanged
    expect(v3Event.orderId).toBe(v2Event.orderId);
    expect(v3Event.customerId).toBe(v2Event.customerId);
    expect(v3Event.totalAmount).toBe(v2Event.totalAmount);
  });
});
```

Production data contains edge cases you didn't anticipate: null values where you expected strings, unicode characters, extremely long fields, malformed data that was accepted by old validation. Sample real production events for testing.
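One way to act on that advice is a replay-style test that runs the transformer over a sample of real events exported from production. The sample file name and line-delimited JSON format below are assumptions for illustration, not part of the test suite above.

```typescript
import { readFileSync } from 'fs';

// Hypothetical sample: one JSON-encoded production v2 event per line,
// exported from the v2 topic ahead of time.
const lines = readFileSync('./samples/order-created-v2.jsonl', 'utf-8')
  .split('\n')
  .filter(line => line.trim().length > 0);

describe('OrderEventTransformer against sampled production events', () => {
  const transformer = new OrderEventTransformer();

  it('transforms every sampled event without throwing', () => {
    const failures: Array<{ line: number; error: unknown }> = [];

    lines.forEach((line, index) => {
      try {
        transformer.transform(JSON.parse(line));
      } catch (error) {
        failures.push({ line: index + 1, error });
      }
    });

    // Surface every failing sample at once rather than stopping at the first
    expect(failures).toEqual([]);
  });
});
```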
Migration strategies are essential when schema changes cannot be made compatible. Let's consolidate the key takeaways:

- Exhaust compatible evolution first; migrate only when a change truly breaks backward or forward compatibility.
- Plan before coding: impact assessment, phased timeline, success criteria, and a rollback path for every phase.
- Dual writes let consumers migrate at their own pace; keep the writes atomic and wind the old version down gradually.
- Event upcasting upgrades old events at read time, which suits event-sourced systems with immutable history.
- Stream migration and bridge events transform the data itself, either as a one-off backfill or continuously on live traffic.
- Coordinated cutover is a last resort: rehearse it, gate it behind a feature flag, and keep rollback one switch away.
- Expect stragglers: track consumer versions, escalate on a schedule, and decide up front whether to break, bridge, or extend.
- Test transformations against real production samples, not just hand-written fixtures.
Module complete!
Congratulations! You've now mastered event schema evolution: versioning strategies, backward and forward compatibility, schema registries, and migration techniques. You can version schemas strategically, maintain compatibility, use schema registries effectively, and execute migrations when breaking changes are necessary. These skills let you evolve event-driven systems without coordination nightmares or production incidents, and they complete the Event Schema Evolution module.