Unit tests verify that individual components work correctly in isolation. But event-driven systems are fundamentally about connections: events flowing between producers, message brokers, and consumers. A system where every handler passes its unit tests can still fail catastrophically once those pieces are wired together.
Integration tests exercise the real plumbing. They verify that events actually traverse your infrastructure, handlers actually process them, and the end-to-end system produces correct outcomes.
By the end of this page, you will master: (1) Strategies for integration testing event flows without production infrastructure, (2) Test containers and embedded brokers for realistic testing, (3) Patterns for asserting eventual outcomes in asynchronous systems, (4) Handling timing and ordering challenges in event tests, and (5) Best practices from large-scale event-driven systems.
Understanding when to use integration tests versus unit tests is crucial for an effective testing strategy. Each type serves different purposes and catches different bugs.
| Aspect | Unit Tests | Integration Tests |
|---|---|---|
| Scope | Single handler or producer in isolation | End-to-end event flow through infrastructure |
| Dependencies | All mocked/faked | Real or realistic (containers, embedded brokers) |
| Speed | Milliseconds (thousands per second) | Seconds (dozens per minute) |
| Determinism | Fully deterministic | May have timing-related flakiness |
| Bugs Caught | Logic errors, calculation mistakes | Serialization, routing, concurrency, infrastructure |
| Maintenance | Low maintenance, rarely break | Higher maintenance, sensitive to infrastructure changes |
| When to Run | Every commit, pre-commit hooks | CI pipeline, before deployment |
The classic testing pyramid applies to event-driven systems with a specific twist:
The twist: Event-driven systems require proportionally more integration tests than traditional applications because events introduce failure modes that can't be caught in isolation.
Many teams achieve excellent unit test coverage, feel confident, and then face production failures from serialization bugs or routing issues. Unit tests verify that handlers process constructed events correctly—they don't verify that events are correctly constructed or correctly delivered in production.
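The gap described above can be shown with a minimal sketch. The `OrderPlaced` shape and `handleOrderPlaced` function are hypothetical stand-ins, not part of the course code. The handler works on an event constructed in the test, but the same event breaks after a plain JSON round trip, because `JSON.parse` returns the timestamp as a string rather than a `Date`.

```typescript
// Hypothetical event shape and handler, for illustration only
interface OrderPlaced {
  orderId: string;
  occurredAt: Date;
}

function handleOrderPlaced(event: OrderPlaced): number {
  // Relies on occurredAt being a real Date instance
  return event.occurredAt.getTime();
}

// Unit-test style: the event is constructed in-process, so the handler works
const constructed: OrderPlaced = {
  orderId: 'order-1',
  occurredAt: new Date('2024-01-15T10:30:00Z'),
};
const unitResult = handleOrderPlaced(constructed);

// Integration style: the event crosses a serialization boundary first
const wire = JSON.stringify(constructed);
const received = JSON.parse(wire) as OrderPlaced; // the cast hides the real runtime type

// After the round trip, occurredAt is an ISO string, not a Date
const receivedIsDate = received.occurredAt instanceof Date;

let failure: string | null = null;
try {
  handleOrderPlaced(received);
} catch (e) {
  // getTime does not exist on a string, so the handler throws at runtime
  failure = (e as Error).message;
}

console.log({ unitResult, receivedIsDate, failure });
```

The type assertion on `JSON.parse` compiles cleanly, which is why this class of bug tends to surface only when a real serializer sits between producer and consumer.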
Integration tests require infrastructure—but using production infrastructure is impractical. Let's examine options from simplest to most realistic.
The simplest approach: use an in-process event bus that executes handlers synchronously or with minimal async delay.
```typescript
// Small helper used to simulate processing latency
const sleep = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

/**
 * In-memory event bus for integration testing
 * Simulates async behavior with configurable delays
 */
export class InMemoryEventBus implements EventBus {
  private handlers: Map<string, EventHandler[]> = new Map();
  private publishedEvents: DomainEvent[] = [];
  private processingDelay: number = 0;

  subscribe<T extends DomainEvent>(
    eventType: new (...args: any[]) => T,
    handler: EventHandler<T>
  ): void {
    const eventName = eventType.name;
    const existing = this.handlers.get(eventName) || [];
    existing.push(handler);
    this.handlers.set(eventName, existing);
  }

  async publish(event: DomainEvent): Promise<void> {
    this.publishedEvents.push(event);

    const eventName = event.constructor.name;
    const handlers = this.handlers.get(eventName) || [];

    // Simulate async processing
    if (this.processingDelay > 0) {
      await sleep(this.processingDelay);
    }

    // Execute all handlers
    await Promise.all(
      handlers.map(handler => handler.handle(event))
    );
  }

  // Test configuration
  withProcessingDelay(ms: number): this {
    this.processingDelay = ms;
    return this;
  }

  // Test assertions
  getPublishedEvents(): DomainEvent[] {
    return [...this.publishedEvents];
  }

  getPublishedEventsOfType<T extends DomainEvent>(
    type: new (...args: any[]) => T
  ): T[] {
    return this.publishedEvents.filter(e => e instanceof type) as T[];
  }

  reset(): void {
    this.publishedEvents.length = 0;
  }
}

// Usage in tests
describe('Order Flow Integration', () => {
  let eventBus: InMemoryEventBus;
  let orderService: OrderService;
  let inventoryHandler: InventoryReservationHandler;
  let emailHandler: EmailNotificationHandler;

  beforeEach(() => {
    eventBus = new InMemoryEventBus();

    // Wire up real handlers with test dependencies
    inventoryHandler = new InventoryReservationHandler(
      new InMemoryInventoryRepository(),
      new InMemoryReservationRepository()
    );
    emailHandler = new EmailNotificationHandler(
      new MockEmailService(),
      new InMemoryCustomerRepository()
    );

    eventBus.subscribe(OrderPlacedEvent, inventoryHandler);
    eventBus.subscribe(OrderPlacedEvent, emailHandler);

    orderService = new OrderService(eventBus);
  });

  it('should trigger all handlers when order is placed', async () => {
    // Act
    await orderService.placeOrder({
      customerId: 'cust-123',
      items: [{ productId: 'prod-1', quantity: 2 }],
    });

    // Assert - Both handlers executed
    expect(inventoryHandler.reservationCount).toBe(1);
    expect(emailHandler.emailsSent).toBe(1);
  });
});
```

Test containers spin up real infrastructure (RabbitMQ, Kafka, Redis) in Docker containers for tests. This is more realistic than an in-memory bus, but slower.
```typescript
import { GenericContainer, StartedTestContainer, Wait } from 'testcontainers';

/**
 * Integration tests with real RabbitMQ via Testcontainers
 */
describe('RabbitMQ Integration', () => {
  let rabbitContainer: StartedTestContainer;
  let connection: RabbitMQConnection;
  let eventBus: RabbitMQEventBus;

  beforeAll(async () => {
    // Start RabbitMQ container (takes 5-10 seconds)
    rabbitContainer = await new GenericContainer('rabbitmq:3-management')
      .withExposedPorts(5672, 15672)
      .withWaitStrategy(Wait.forLogMessage('Server startup complete'))
      .start();

    const port = rabbitContainer.getMappedPort(5672);

    connection = await RabbitMQConnection.create({
      host: 'localhost',
      port: port,
      username: 'guest',
      password: 'guest',
    });

    eventBus = new RabbitMQEventBus(connection);
  }, 60000); // 60 second timeout for container startup

  afterAll(async () => {
    await connection.close();
    await rabbitContainer.stop();
  });

  beforeEach(async () => {
    // Clear queues between tests
    await eventBus.purgeAllQueues();
  });

  it('should deliver events to all subscribers', async () => {
    // Arrange
    const receivedEvents: DomainEvent[] = [];
    await eventBus.subscribe(OrderPlacedEvent, async (event) => {
      receivedEvents.push(event);
    });

    const event = new OrderPlacedEvent({
      eventId: EventId.generate(),
      orderId: OrderId.from('order-123'),
      customerId: CustomerId.from('cust-456'),
      items: [{ productId: 'prod-1', quantity: 1, price: Money.of(10.00) }],
      totalAmount: Money.of(10.00),
      occurredAt: new Date(),
    });

    // Act
    await eventBus.publish(event);

    // Assert - Wait for async delivery.
    // waitFor polls a boolean condition, so assertions go after it.
    await waitFor(() => receivedEvents.length === 1);
    expect(receivedEvents[0]).toMatchObject({
      orderId: expect.objectContaining({ value: 'order-123' }),
    });
  });

  it('should survive connection interruption', async () => {
    // Arrange
    const receivedEvents: DomainEvent[] = [];
    await eventBus.subscribe(OrderPlacedEvent, async (event) => {
      receivedEvents.push(event);
    });

    // Act - Pause container to simulate network failure
    await rabbitContainer.pause();

    const publishPromise = eventBus.publish(new OrderPlacedEvent({
      eventId: EventId.generate(),
      orderId: OrderId.from('order-resilient'),
      customerId: CustomerId.from('cust-1'),
      items: [],
      totalAmount: Money.of(0),
      occurredAt: new Date(),
    }));

    // Resume after short delay
    setTimeout(() => rabbitContainer.unpause(), 1000);

    // Assert - Event eventually delivered after reconnection
    await publishPromise;
    await waitFor(() => receivedEvents.length === 1, { timeout: 10000 });
  });
});
```

- Reuse containers across tests: use beforeAll to start once, beforeEach to reset state.
- Do not assume the broker is ready as soon as start() returns. Wait for specific log messages or health checks.
- Always stop containers in afterAll. Leaked containers consume resources and cause flaky tests.

Event-driven systems are inherently eventually consistent. When you publish an event, you can't immediately assert on its effects; handlers need time to process. This introduces the fundamental challenge of async testing: how do you assert on outcomes that haven't happened yet?
The most common pattern: repeatedly check a condition until it's true or timeout elapses.
```typescript
// Helper used by the polling loops below
const sleep = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

/**
 * Wait for a condition to become true within a timeout
 */
export async function waitFor(
  condition: () => boolean | Promise<boolean>,
  options: { timeout?: number; interval?: number; message?: string } = {}
): Promise<void> {
  const { timeout = 5000, interval = 50, message = 'Condition not met' } = options;
  const startTime = Date.now();

  while (Date.now() - startTime < timeout) {
    const result = await condition();
    if (result) {
      return;
    }
    await sleep(interval);
  }

  throw new Error(`Timeout after ${timeout}ms: ${message}`);
}

/**
 * Wait for and return a value once available
 */
export async function waitForValue<T>(
  getValue: () => T | null | undefined | Promise<T | null | undefined>,
  options: { timeout?: number; interval?: number } = {}
): Promise<T> {
  const { timeout = 5000, interval = 50 } = options;
  const startTime = Date.now();

  while (Date.now() - startTime < timeout) {
    const value = await getValue();
    if (value !== null && value !== undefined) {
      return value;
    }
    await sleep(interval);
  }

  throw new Error(`Value not available after ${timeout}ms`);
}

// Usage in tests
describe('Order Processing Flow', () => {
  it('should reserve inventory after order placed', async () => {
    // Arrange
    const orderId = OrderId.generate();

    // Act - Place order (publishes event)
    await orderService.placeOrder({
      orderId: orderId.value,
      customerId: 'cust-123',
      items: [{ productId: 'prod-1', quantity: 5 }],
    });

    // Assert - Wait for handler to create reservation
    await waitFor(
      async () => {
        const reservation = await reservationRepository
          .findByOrderId(orderId);
        return reservation !== null;
      },
      { timeout: 3000, message: 'Reservation not created' }
    );

    // Now we can make specific assertions
    const reservation = await reservationRepository.findByOrderId(orderId);
    expect(reservation!.quantity).toBe(5);
    expect(reservation!.status).toBe(ReservationStatus.Active);
  });
});
```

For testing event ordering and completeness, collect events as they arrive and assert on the collection.
```typescript
import { EventEmitter } from 'events';

/**
 * Collects events for assertion in integration tests
 */
export class EventCollector {
  private events: DomainEvent[] = [];
  private eventArrived = new EventEmitter();

  collect(event: DomainEvent): void {
    this.events.push(event);
    this.eventArrived.emit('event', event);
  }

  getEvents(): DomainEvent[] {
    return [...this.events];
  }

  getEventsOfType<T extends DomainEvent>(
    type: new (...args: any[]) => T
  ): T[] {
    return this.events.filter(e => e instanceof type) as T[];
  }

  async waitForEventCount(count: number, timeout = 5000): Promise<void> {
    if (this.events.length >= count) {
      return;
    }

    await new Promise<void>((resolve, reject) => {
      const timer = setTimeout(() => {
        reject(new Error(
          `Expected ${count} events, got ${this.events.length} ` +
          `after ${timeout}ms`
        ));
      }, timeout);

      const checkCount = () => {
        if (this.events.length >= count) {
          clearTimeout(timer);
          this.eventArrived.off('event', checkCount);
          resolve();
        }
      };

      this.eventArrived.on('event', checkCount);
    });
  }

  async waitForEvent<T extends DomainEvent>(
    type: new (...args: any[]) => T,
    predicate?: (event: T) => boolean,
    timeout = 5000
  ): Promise<T> {
    // Return immediately if a matching event has already arrived
    const existing = this.getEventsOfType(type)
      .find(e => !predicate || predicate(e));
    if (existing) {
      return existing;
    }

    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        reject(new Error(`Event ${type.name} not received in ${timeout}ms`));
      }, timeout);

      const handler = (event: DomainEvent) => {
        if (event instanceof type && (!predicate || predicate(event))) {
          clearTimeout(timer);
          this.eventArrived.off('event', handler);
          resolve(event);
        }
      };

      this.eventArrived.on('event', handler);
    });
  }

  reset(): void {
    this.events.length = 0;
  }
}

// Usage in tests
describe('Order Lifecycle Flow', () => {
  let eventCollector: EventCollector;
  let eventBus: EventBus;

  beforeEach(() => {
    eventCollector = new EventCollector();
    eventBus = createEventBus();

    // Subscribe collector to all events
    eventBus.subscribeAll((event) => eventCollector.collect(event));
  });

  it('should emit complete event sequence for order lifecycle', async () => {
    // Act - Complete order workflow
    const order = await orderService.placeOrder({
      customerId: 'cust-123',
      items: [{ productId: 'prod-1', quantity: 2 }],
    });
    await orderService.confirmOrder(order.id);
    await orderService.shipOrder(order.id, 'TRACK123');

    // Assert - Wait for all events
    await eventCollector.waitForEventCount(3, 5000);

    const events = eventCollector.getEvents();
    expect(events[0]).toBeInstanceOf(OrderPlacedEvent);
    expect(events[1]).toBeInstanceOf(OrderConfirmedEvent);
    expect(events[2]).toBeInstanceOf(OrderShippedEvent);
  });

  it('should receive specific event with matching data', async () => {
    // Act
    const customerId = CustomerId.from('vip-customer');
    await orderService.placeOrder({
      customerId: customerId.value,
      items: [{ productId: 'luxury-item', quantity: 1 }],
    });

    // Assert - Wait for specific event
    const event = await eventCollector.waitForEvent(
      OrderPlacedEvent,
      (e) => e.customerId.equals(customerId)
    );

    expect(event.items[0].productId).toBe('luxury-item');
  });
});
```

Never write `await sleep(2000)` and then assert. This creates slow, flaky tests. Instead, poll for the expected condition with a timeout. If the condition is met in 50ms, the test completes in 50ms. If it takes 1.5 seconds, you still pass. Only if it exceeds the timeout do you fail.
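The arithmetic behind that advice can be made concrete with a small model. This is a simplified sketch, not a measurement: `fixedSleepOutcome` and `pollingOutcome` are hypothetical helpers that compute when a test would finish if the expected condition becomes true at `readyAtMs`, assuming each condition check takes no time.

```typescript
interface WaitOutcome {
  passed: boolean;
  elapsedMs: number;
}

// Fixed sleep: always pays the full sleep, and fails if the system is slower
function fixedSleepOutcome(readyAtMs: number, sleepMs: number): WaitOutcome {
  return { passed: readyAtMs <= sleepMs, elapsedMs: sleepMs };
}

// Polling: checks at t = 0, interval, 2 * interval, ... while t < timeout
function pollingOutcome(
  readyAtMs: number,
  intervalMs: number,
  timeoutMs: number
): WaitOutcome {
  for (let t = 0; t < timeoutMs; t += intervalMs) {
    if (t >= readyAtMs) {
      return { passed: true, elapsedMs: t };
    }
  }
  // Gave up: roughly the whole timeout was spent waiting
  return { passed: false, elapsedMs: timeoutMs };
}

// Condition ready after 50ms, 1.5s, and 2.5s respectively
const fastSleep = fixedSleepOutcome(50, 2000);
const fastPoll = pollingOutcome(50, 50, 5000);
const slowPoll = pollingOutcome(1500, 50, 5000);
const tooSlowForSleep = fixedSleepOutcome(2500, 2000);
const stillFineForPoll = pollingOutcome(2500, 50, 5000);

console.log({ fastSleep, fastPoll, slowPoll, tooSlowForSleep, stillFineForPoll });
```

In this model the fixed sleep costs two seconds even when the handler finishes in 50ms, and it fails outright when the handler needs 2.5 seconds, while polling pays only what the system actually needs, up to the timeout.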
The most valuable integration tests verify complete business flows—from initial user action through all event handlers to final state. These tests catch integration bugs that unit tests miss.
```typescript
describe('Order Fulfillment Flow', () => {
  // Full integration test setup (could use TestContainers or in-memory)
  let eventBus: EventBus;
  let orderService: OrderService;
  let inventoryService: InventoryService;
  let paymentService: PaymentService;
  let notificationService: NotificationService;
  let eventCollector: EventCollector;

  beforeAll(async () => {
    // Set up real or containerized infrastructure
    eventBus = await createEventBus();

    // Initialize services with real implementations
    orderService = new OrderService(
      new PostgresOrderRepository(),
      eventBus
    );
    inventoryService = new InventoryService(
      new PostgresInventoryRepository(),
      eventBus
    );
    paymentService = new PaymentService(
      new StripePaymentGateway(TEST_API_KEY),
      eventBus
    );
    notificationService = new NotificationService(
      new MockEmailProvider(), // Still mock external services
      eventBus
    );

    // Register event handlers
    eventBus.subscribe(OrderPlacedEvent,
      new InventoryReservationHandler(inventoryService));
    eventBus.subscribe(InventoryReservedEvent,
      new PaymentCaptureHandler(paymentService));
    eventBus.subscribe(PaymentCapturedEvent,
      new OrderConfirmationHandler(notificationService));

    eventCollector = new EventCollector();
    eventBus.subscribeAll(e => eventCollector.collect(e));
  });

  beforeEach(async () => {
    await clearDatabase();
    eventCollector.reset();

    // Seed test data
    await inventoryService.addStock('prod-1', 100);
    await inventoryService.addStock('prod-2', 50);
  });

  afterAll(async () => {
    await closeAll();
  });

  describe('Successful Order Flow', () => {
    it('should complete full order flow from placement to confirmation', async () => {
      // Arrange
      const customerId = 'cust-flow-test';
      const orderRequest = {
        customerId,
        items: [
          { productId: 'prod-1', quantity: 5, price: 10.00 },
          { productId: 'prod-2', quantity: 2, price: 25.00 },
        ],
      };

      // Act - Single action triggers entire flow
      const order = await orderService.placeOrder(orderRequest);

      // Assert - Wait for complete flow
      await waitFor(async () => {
        const finalOrder = await orderService.findById(order.id);
        return finalOrder?.status === OrderStatus.Confirmed;
      }, { timeout: 10000, message: 'Order not confirmed' });

      // Verify all side effects occurred

      // 1. Inventory was reserved
      const inventory1 = await inventoryService.getStock('prod-1');
      expect(inventory1.available).toBe(95); // 100 - 5
      expect(inventory1.reserved).toBe(5);

      const inventory2 = await inventoryService.getStock('prod-2');
      expect(inventory2.available).toBe(48); // 50 - 2
      expect(inventory2.reserved).toBe(2);

      // 2. Payment was captured
      const payment = await paymentService.findByOrderId(order.id);
      expect(payment).toBeDefined();
      expect(payment!.status).toBe(PaymentStatus.Captured);
      expect(payment!.amount.equals(Money.of(100.00))).toBe(true); // (5*10) + (2*25)

      // 3. Confirmation email was sent
      const emails = notificationService.getSentNotifications();
      expect(emails).toHaveLength(1);
      expect(emails[0].type).toBe('order_confirmation');
      expect(emails[0].orderId).toBe(order.id.value);

      // 4. All events were published in correct order
      const events = eventCollector.getEvents();
      expect(events.map(e => e.constructor.name)).toEqual([
        'OrderPlacedEvent',
        'InventoryReservedEvent',
        'PaymentCapturedEvent',
        'OrderConfirmedEvent',
      ]);
    });

    it('should maintain data consistency across all services', async () => {
      // Place multiple orders concurrently
      const orders = await Promise.all([
        orderService.placeOrder({
          customerId: 'cust-1',
          items: [{ productId: 'prod-1', quantity: 10, price: 10.00 }],
        }),
        orderService.placeOrder({
          customerId: 'cust-2',
          items: [{ productId: 'prod-1', quantity: 15, price: 10.00 }],
        }),
        orderService.placeOrder({
          customerId: 'cust-3',
          items: [{ productId: 'prod-1', quantity: 20, price: 10.00 }],
        }),
      ]);

      // Wait for all orders to complete
      await waitFor(async () => {
        const statuses = await Promise.all(
          orders.map(async o => (await orderService.findById(o.id))?.status)
        );
        return statuses.every(s => s === OrderStatus.Confirmed);
      }, { timeout: 15000 });

      // Verify inventory is consistent
      const inventory = await inventoryService.getStock('prod-1');
      expect(inventory.available).toBe(55); // 100 - (10+15+20)
      expect(inventory.reserved).toBe(45);

      // Verify all payments captured
      const payments = await Promise.all(
        orders.map(o => paymentService.findByOrderId(o.id))
      );
      expect(payments.every(p => p?.status === PaymentStatus.Captured)).toBe(true);
    });
  });

  describe('Failure and Compensation', () => {
    it('should compensate inventory when payment fails', async () => {
      // Arrange - Set up payment to fail
      paymentService.simulateNextPaymentFailure('card_declined');

      // Act
      const order = await orderService.placeOrder({
        customerId: 'cust-payment-fail',
        items: [{ productId: 'prod-1', quantity: 10, price: 10.00 }],
      });

      // Wait for compensation flow
      await waitFor(async () => {
        const finalOrder = await orderService.findById(order.id);
        return finalOrder?.status === OrderStatus.PaymentFailed;
      }, { timeout: 10000 });

      // Assert - Inventory was released (compensated)
      const inventory = await inventoryService.getStock('prod-1');
      expect(inventory.available).toBe(100); // Fully restored
      expect(inventory.reserved).toBe(0);

      // Failed order notification sent
      const notifications = notificationService.getSentNotifications();
      expect(notifications).toContainEqual(
        expect.objectContaining({
          type: 'payment_failed',
          orderId: order.id.value,
        })
      );
    });

    it('should handle insufficient inventory gracefully', async () => {
      // Arrange - Request more than available
      const order = await orderService.placeOrder({
        customerId: 'cust-no-stock',
        items: [{ productId: 'prod-2', quantity: 100, price: 25.00 }], // Only 50 available
      });

      // Wait for failure handling
      await waitFor(async () => {
        const finalOrder = await orderService.findById(order.id);
        return finalOrder?.status === OrderStatus.InventoryUnavailable;
      }, { timeout: 10000 });

      // Assert - No payment attempted
      const payment = await paymentService.findByOrderId(order.id);
      expect(payment).toBeNull();

      // Customer notified of stock issue
      const notifications = notificationService.getSentNotifications();
      expect(notifications).toContainEqual(
        expect.objectContaining({
          type: 'out_of_stock',
          orderId: order.id.value,
        })
      );
    });
  });
});
```

Notice that external services are still kept out of the real flow in integration tests: the email provider is mocked, and the payment gateway runs against a test API key rather than a live account. The goal is to test your event flow, not external APIs. Mock or sandbox external boundaries, but use real implementations for internal services and infrastructure.
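One way to mock an external boundary is a recording fake that implements the same interface as the real provider. The sketch below is illustrative only: `EmailProvider`, `RecordingEmailProvider`, and `OrderConfirmationNotifier` are hypothetical names, not the classes used in the test above.

```typescript
// Hypothetical boundary interface; a real provider would call an external API
interface EmailMessage {
  to: string;
  type: string;
  orderId: string;
}

interface EmailProvider {
  send(message: EmailMessage): Promise<void>;
}

// Fake for tests: records messages instead of sending them
class RecordingEmailProvider implements EmailProvider {
  readonly sent: EmailMessage[] = [];

  async send(message: EmailMessage): Promise<void> {
    // Store a copy so later mutation by the caller cannot change the record
    this.sent.push({ ...message });
  }
}

// Internal code under test depends only on the interface
class OrderConfirmationNotifier {
  private readonly email: EmailProvider;

  constructor(email: EmailProvider) {
    this.email = email;
  }

  async onPaymentCaptured(orderId: string, customerEmail: string): Promise<void> {
    await this.email.send({
      to: customerEmail,
      type: 'order_confirmation',
      orderId,
    });
  }
}

const emailProvider = new RecordingEmailProvider();
const notifier = new OrderConfirmationNotifier(emailProvider);

notifier
  .onPaymentCaptured('order-123', 'customer@example.com')
  .then(() => {
    console.log(`recorded ${emailProvider.sent.length} message(s)`);
  });
```

Because the notifier only sees the interface, the same wiring can take a real provider in production and the recording fake in tests, and assertions can inspect `sent` directly.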
Async integration tests are prone to flakiness—tests that sometimes pass and sometimes fail without code changes. This is the bane of event-driven testing. Let's examine strategies to eliminate flakiness.
| Cause | Symptom | Solution |
|---|---|---|
| Insufficient Wait Time | Sometimes passes, fails under load | Use adaptive polling with generous timeout |
| Event Ordering Assumptions | Occasional assertion failures on order | Assert on final state, not intermediate events |
| Shared State Between Tests | Tests pass individually, fail together | Isolate test data, reset state in beforeEach |
| Container Startup Race | First test after container start fails | Use proper wait strategies for containers |
| Clock Skew | Timestamp comparisons fail intermittently | Use time windows, not exact comparisons |
| Resource Contention | Failures under parallel test execution | Use unique identifiers per test, avoid global state |
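The "time windows, not exact comparisons" remedy for clock skew can be sketched as a small helper. `isWithinWindow` is a hypothetical name, and the tolerance is an assumption to tune against how far clocks can drift in your environment.

```typescript
// Returns true when two timestamps differ by at most toleranceMs
function isWithinWindow(actual: Date, expected: Date, toleranceMs: number): boolean {
  return Math.abs(actual.getTime() - expected.getTime()) <= toleranceMs;
}

// A timestamp recorded by another process may differ by a few milliseconds
const publishedAt = new Date('2024-01-15T10:30:00.000Z');
const recordedAt = new Date('2024-01-15T10:30:00.120Z');

const exactMatch = recordedAt.getTime() === publishedAt.getTime();
const windowMatch = isWithinWindow(recordedAt, publishedAt, 500);

console.log({ exactMatch, windowMatch });
```

In a test this would typically replace an exact equality assertion on a timestamp with an assertion that `isWithinWindow(...)` is true.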
```typescript
/**
 * Anti-flakiness Patterns for Event Integration Tests
 */

// Pattern 1: Unique Identifiers Per Test
describe('Isolated Test Data', () => {
  it('should process order independently', async () => {
    // Use unique IDs to prevent collision with parallel tests
    const testId = `test-${Date.now()}-${Math.random().toString(36).slice(2)}`;
    const orderId = OrderId.from(`order-${testId}`);
    const customerId = CustomerId.from(`cust-${testId}`);

    await orderService.placeOrder({
      orderId: orderId.value,
      customerId: customerId.value,
      items: [{ productId: 'prod-1', quantity: 1, price: 10.00 }],
    });

    // Assertions can safely query by unique ID
    await waitFor(async () => {
      const order = await orderRepository.findById(orderId);
      return order?.status === OrderStatus.Confirmed;
    });
  });
});

// Pattern 2: Generous Timeouts with Fast Polling
describe('Adaptive Waiting', () => {
  it('should handle variable processing times', async () => {
    const order = await orderService.placeOrder({ /* ... */ });

    // Short poll interval (50ms) = fast when condition is met quickly
    // Long timeout (30s) = survives slow CI environments
    await waitFor(
      async () => {
        const found = await orderRepository.findById(order.id);
        return found?.status === OrderStatus.Confirmed;
      },
      {
        interval: 50,   // Check frequently
        timeout: 30000, // But wait a long time if needed
      }
    );
  });
});

// Pattern 3: Assert Final State, Not Events
describe('State-Based Assertions', () => {
  // BAD: Flaky due to event timing
  it.skip('FLAKY: waits for specific event count', async () => {
    await orderService.placeOrder({ /* ... */ });

    // FLAKY: Race condition if events process faster than subscription
    await eventCollector.waitForEventCount(4); // May miss events
    expect(eventCollector.getEvents()).toHaveLength(4);
  });

  // GOOD: Assert on database state
  it('should result in correct final state', async () => {
    const order = await orderService.placeOrder({ /* ... */ });

    // Wait for final state - this is deterministic
    await waitFor(async () => {
      const finalOrder = await orderRepository.findById(order.id);
      return finalOrder?.status === OrderStatus.Confirmed;
    });

    // Then verify all side effects of that state
    const inventory = await inventoryRepository.findByProductId('prod-1');
    expect(inventory.reserved).toBeGreaterThan(0);
  });
});

// Pattern 4: Complete Isolation Between Tests
describe('State Isolation', () => {
  beforeEach(async () => {
    // Clear ALL state before each test
    await database.truncateAllTables();
    await eventBus.purgeAllQueues();

    // Re-seed base data each test needs
    await seedInventory('prod-1', 1000);
  });

  // Each test starts from known state
});

// Pattern 5: Event Eventual Consistency Guards
class EventualConsistencyGuard {
  /**
   * Wait until system reaches consistent state after event processing
   */
  static async waitForConsistency(
    stateCheck: () => Promise<boolean>,
    maxWait = 30000
  ): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWait) {
      if (await stateCheck()) {
        // Additional delay to ensure any in-flight operations complete
        await sleep(100);

        // Double-check consistency
        if (await stateCheck()) {
          return;
        }
      }
      await sleep(50);
    }

    throw new Error('System did not reach consistent state');
  }
}

// Usage
it('should reach consistent state', async () => {
  await orderService.placeMultipleOrders([/* ... */]);

  await EventualConsistencyGuard.waitForConsistency(async () => {
    const orders = await orderRepository.findAll();
    const allConfirmed = orders.every(o => o.status === OrderStatus.Confirmed);
    const inventoryConsistent = await inventoryService.isConsistent();
    return allConfirmed && inventoryConsistent;
  });
});
```

Some teams add automatic test retries to paper over flakiness. This masks bugs that will appear in production. Instead, investigate root causes and fix the underlying issues. Every flaky test is a signal that your system has timing-dependent behavior that production will also experience.
One of the most common integration failures is serialization mismatch. The producer serializes events one way; consumers expect another format. These bugs are invisible to unit tests.
```typescript
describe('Event Serialization', () => {
  /**
   * Test that events survive round-trip through broker serialization
   */
  describe('Round-Trip Serialization', () => {
    it('should preserve all OrderPlacedEvent fields through JSON', () => {
      // Arrange
      const originalEvent = new OrderPlacedEvent({
        eventId: EventId.from('evt-123'),
        orderId: OrderId.from('order-456'),
        customerId: CustomerId.from('cust-789'),
        items: [
          { productId: 'prod-1', quantity: 5, price: Money.of(10.50) },
          { productId: 'prod-2', quantity: 3, price: Money.of(25.00) },
        ],
        totalAmount: Money.of(127.50),
        occurredAt: new Date('2024-01-15T10:30:00Z'),
        correlationId: CorrelationId.from('corr-abc'),
      });

      // Act - Serialize and deserialize (simulates broker transport)
      const serialized = EventSerializer.toJSON(originalEvent);
      const deserialized = EventSerializer.fromJSON(serialized);

      // Assert - All fields preserved
      expect(deserialized).toBeInstanceOf(OrderPlacedEvent);
      const restored = deserialized as OrderPlacedEvent;

      expect(restored.eventId).toEqual(originalEvent.eventId);
      expect(restored.orderId).toEqual(originalEvent.orderId);
      expect(restored.customerId).toEqual(originalEvent.customerId);
      expect(restored.items).toHaveLength(2);
      expect(restored.items[0].productId).toBe('prod-1');
      expect(restored.items[0].quantity).toBe(5);
      expect(restored.items[0].price.equals(Money.of(10.50))).toBe(true);
      expect(restored.totalAmount.equals(Money.of(127.50))).toBe(true);
      expect(restored.occurredAt.getTime())
        .toBe(originalEvent.occurredAt.getTime());
      expect(restored.correlationId).toEqual(originalEvent.correlationId);
    });

    it('should handle edge cases in serialization', () => {
      // Test with edge case values
      const edgeCaseEvent = new OrderPlacedEvent({
        eventId: EventId.from('evt-edge'),
        orderId: OrderId.from('order-edge'),
        customerId: CustomerId.from('cust-edge'),
        items: [],                   // Empty array
        totalAmount: Money.of(0.00), // Zero amount
        occurredAt: new Date(0),     // Epoch
      });

      const serialized = EventSerializer.toJSON(edgeCaseEvent);
      const restored = EventSerializer.fromJSON(serialized) as OrderPlacedEvent;

      expect(restored.items).toEqual([]);
      expect(restored.totalAmount.equals(Money.of(0))).toBe(true);
      expect(restored.occurredAt.getTime()).toBe(0);
    });

    it('should handle special characters in string fields', () => {
      const specialEvent = new OrderPlacedEvent({
        eventId: EventId.from('evt-special'),
        orderId: OrderId.from('order-ñ-日本語-🚀'),
        customerId: CustomerId.from('cust-<script>alert("xss")</script>'),
        items: [
          // Backslash is escaped so the literal actually contains one
          { productId: 'prod-with-"quotes"-and-\\slashes', quantity: 1, price: Money.of(10.00) },
        ],
        totalAmount: Money.of(10.00),
        occurredAt: new Date(),
      });

      const serialized = EventSerializer.toJSON(specialEvent);
      const restored = EventSerializer.fromJSON(serialized) as OrderPlacedEvent;

      expect(restored.orderId.value).toBe('order-ñ-日本語-🚀');
      expect(restored.customerId.value).toBe('cust-<script>alert("xss")</script>');
      expect(restored.items[0].productId).toBe('prod-with-"quotes"-and-\\slashes');
    });
  });

  /**
   * Test backward compatibility when event schemas evolve
   */
  describe('Schema Evolution', () => {
    it('should deserialize events from older schema versions', () => {
      // Simulate receiving event serialized by old producer
      const oldFormatJson = {
        eventId: 'evt-old',
        orderId: 'order-123',
        customerId: 'cust-456',
        // Old schema: items as simple array of product IDs
        // New schema: items as objects with quantity and price
        productIds: ['prod-1', 'prod-2'],   // OLD FORMAT
        amount: 100.00,                     // OLD: 'amount' vs NEW: 'totalAmount'
        timestamp: '2024-01-01T00:00:00Z',  // OLD: 'timestamp' vs NEW: 'occurredAt'
        version: 1,
      };

      // Act - Deserialize with adapter for old version
      const restored = EventSerializer.fromJSON(
        JSON.stringify(oldFormatJson),
        { schemaVersion: 1 }
      ) as OrderPlacedEvent;

      // Assert - Adapter transformed to new format
      expect(restored).toBeInstanceOf(OrderPlacedEvent);
      expect(restored.orderId.value).toBe('order-123');
      expect(restored.items).toHaveLength(2);
      expect(restored.totalAmount.equals(Money.of(100.00))).toBe(true);
    });

    it('should include schema version in serialized events', () => {
      const event = new OrderPlacedEvent({
        eventId: EventId.from('evt-versioned'),
        orderId: OrderId.from('order-ver'),
        customerId: CustomerId.from('cust-ver'),
        items: [],
        totalAmount: Money.of(0),
        occurredAt: new Date(),
      });

      const serialized = JSON.parse(EventSerializer.toJSON(event));

      expect(serialized._schemaVersion).toBeDefined();
      expect(serialized._schemaVersion).toBe(2); // Current version
    });
  });

  /**
   * Contract tests: verify producer and consumer agree on format
   */
  describe('Contract Compatibility', () => {
    it('should match consumer expected format', () => {
      // This test uses a contract/fixture that both producer and consumer agree on
      const contract = loadContract('order-placed-v2.json');

      // Producer generates event
      const event = OrderPlacedEventBuilder.fromContract(contract.example);
      const produced = EventSerializer.toJSON(event);

      // Consumer expects specific fields
      const parsed = JSON.parse(produced);

      // Verify against contract requirements
      contract.requiredFields.forEach((field: string) => {
        expect(parsed).toHaveProperty(field);
      });

      expect(parsed.orderId).toMatch(contract.formats.orderId);
      expect(parsed.occurredAt).toMatch(contract.formats.timestamp);
    });
  });
});
```

When producers and consumers are owned by different teams, use contract testing tools like Pact. Contracts become the source of truth that both sides verify against. Breaking changes are caught before deployment, not in production.
Integration tests are inherently slower than unit tests. Managing this speed penalty is crucial for maintaining developer productivity.
| Approach | Startup Time | Test Execution | Total for 50 Tests |
|---|---|---|---|
| In-Memory Event Bus | 0ms | 10-50ms | ~1-2 seconds |
| Embedded Broker (e.g., ActiveMQ Embedded) | 500ms | 50-100ms | ~3-5 seconds |
| Testcontainers (warm) | 0ms (reused) | 100-500ms | ~5-25 seconds |
| Testcontainers (cold) | 10-30s per container | 100-500ms | ~1-3 minutes |
| Full Docker Compose stack | 30-60s | 200-1000ms | ~2-5 minutes |
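The gap between the "warm" and "cold" rows comes from paying startup once rather than repeatedly. A minimal sketch of that idea is a memoized async initializer. `once` and `getBroker` are hypothetical names, and the factory here only returns a placeholder URL instead of starting a real container.

```typescript
// Wraps an async factory so the expensive startup runs at most once
function once<T>(factory: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;
  return () => {
    if (cached === undefined) {
      // Cache the promise itself so concurrent callers share one startup
      cached = factory();
    }
    return cached;
  };
}

let startCount = 0;

// Stand-in for container startup; a real factory would start infrastructure
const getBroker = once(async () => {
  startCount += 1;
  return { url: 'amqp://localhost:5672' }; // placeholder value
});

// Two suites asking for the broker get the same in-flight startup
const first = getBroker();
const second = getBroker();

Promise.all([first, second]).then(([a, b]) => {
  console.log({ sameInstance: a === b, startCount });
});
```

One caveat of this sketch: a failed startup is cached too, so every later caller sees the same rejection. A production helper would likely clear the cache on failure.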
Integration tests are essential for event-driven systems because they catch bugs that unit tests fundamentally cannot. Let's consolidate the key practices:
What's Next:
Even with comprehensive unit and integration tests, event-driven systems fail in production. The final page of this module covers event debugging—techniques for understanding what happened when things go wrong, from distributed tracing to event replay and dead letter queue analysis.
You now have the techniques to build comprehensive integration testing suites for event-driven systems. These tests provide the confidence that your event flows work correctly end-to-end. Next, we'll explore debugging techniques for when production issues do occur.