Partition-based ordering tells us that messages on the same partition should arrive in order. But how do we verify this? How do we detect when something goes wrong? How do we recover when messages arrive out of sequence? The answer lies in sequence numbers—explicit markers that encode message order into the message itself.
Sequence numbers transform ordering from an infrastructure assumption into an observable, verifiable property. They enable consumers to detect gaps (missing messages), duplicates (same message twice), and reversals (newer messages before older ones). Beyond detection, they enable sophisticated recovery strategies: buffering for reordering, gap filling through re-requests, and graceful degradation under ordering failures.
This page explores sequence numbers in depth—their design, assignment, validation, and the powerful patterns they enable for building robust ordered message processing systems.
By the end of this page, you will understand how to design and implement sequence number schemes, how consumers can detect and respond to ordering violations, and how to build reordering buffers for handling out-of-order arrivals.
A sequence number is a monotonically increasing value assigned to each message within an ordering scope. It explicitly encodes the message's position in the ordered sequence, allowing consumers to verify that they're processing messages in the correct order.
The core invariant is seq(n+1) = seq(n) + 1, though some designs tolerate gaps.
Sequence Number vs. Offset:
In Kafka, the 'offset' is essentially a sequence number assigned by the broker. However, application-level sequence numbers serve different purposes:
| Property | Broker Offset | Application Sequence Number |
|---|---|---|
| Assigned By | Broker (infrastructure) | Producer (application) |
| Scope | Per partition | Per entity (within partition) |
| Semantics | Position in log | Position in entity's event stream |
| Visibility | Consumer metadata | Part of message payload |
| Control | Broker controls | Application controls |
| Use Case | Resume consumption | Detect per-entity ordering issues |
Example:
Imagine a single Kafka partition receiving events for Users A, B, and C:
Offset 0: User A event (seq: 1)
Offset 1: User B event (seq: 1)
Offset 2: User A event (seq: 2)
Offset 3: User C event (seq: 1)
Offset 4: User A event (seq: 3)
Offset 5: User B event (seq: 2)
Broker offsets (0-5) ensure partition-level ordering. Application sequence numbers (per user) enable verifying ordering within each user's event stream specifically.
Even if your messaging system guarantees ordering (e.g., Kafka's per-partition guarantee), application-level sequence numbers provide defense in depth. They catch bugs in producer code, infrastructure misconfigurations, and subtle edge cases that infrastructure-level guarantees don't cover.
Generating correct sequence numbers requires careful consideration of concurrency, failures, and ordering scope. Several strategies exist, each with trade-offs.
| Strategy | Mechanism | Ordering Guarantee | Throughput | Failure Resilience |
|---|---|---|---|---|
| Single-Threaded Counter | Increment counter in single producer | Strong per-producer | Limited by producer | Numbers lost on crash |
| Database Sequence | Database auto-increment or sequence | Strong (serializable) | Database dependent | Durable across restarts |
| Atomic Counter Service | Centralized counter with atomic increment | Strong | Limited by service | Depends on service durability |
| Lamport Clock | Logical clock incremented on send | Causal only | High (local operation) | No gap detection possible |
| Vector Clock | One entry per producer, merged on receive | Causal only | High (local operation) | Complex, no total order |
| Hybrid Logical Clock | Physical time + logical counter | Causal with time correlation | High | Clock sync required |
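Of the strategies above, the Lamport clock is the simplest to sketch. A minimal illustration (the class name and method names are my own, not a standard API): each process increments a local counter on send, and on receive advances to the maximum of its own time and the received time, plus one. As the table notes, this yields causal ordering only and cannot detect gaps.

```typescript
// Minimal Lamport clock sketch. Causal ordering only: a message's timestamp
// is always greater than the timestamps of events that causally precede it,
// but there is no way to tell whether intermediate numbers are missing.
class LamportClock {
  private time = 0;

  // Called before sending a message; the returned value is attached to it.
  send(): number {
    this.time += 1;
    return this.time;
  }

  // Called on receipt: jump past both local and remote history.
  receive(remoteTime: number): number {
    this.time = Math.max(this.time, remoteTime) + 1;
    return this.time;
  }

  now(): number {
    return this.time;
  }
}
```

A receiver that merges a remote timestamp always ends up strictly ahead of it, which is what preserves the causal relationship across processes.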
Strategy 1: Database Sequence
For entities stored in a database, leverage the database's sequencing capabilities:
-- PostgreSQL sequence per entity
CREATE TABLE entity_sequence (
entity_id VARCHAR(255) PRIMARY KEY,
current_seq BIGINT NOT NULL DEFAULT 0
);
-- Atomic increment and return
UPDATE entity_sequence
SET current_seq = current_seq + 1
WHERE entity_id = 'order_12345'
RETURNING current_seq;
This approach provides strong, durable per-entity ordering that survives restarts, at the cost of a database round-trip for every sequence number.
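A hedged sketch of wrapping that query in application code. The `QueryFn` shape stands in for your database client's query method (e.g. `pg`'s `Pool.query`); injecting it keeps the example testable without a live database. The SQL here extends the original `UPDATE ... RETURNING` with an upsert so that the first message for an entity also succeeds (an assumption, not part of the original schema).

```typescript
// Assumed minimal shape of a Postgres client's query method.
type QueryFn = (
  sql: string,
  params: unknown[]
) => Promise<{ rows: { current_seq: string }[] }>;

// Atomically fetch the next sequence number for an entity.
// The upsert initializes the row to 1 on first use, then increments.
async function nextSequence(query: QueryFn, entityId: string): Promise<bigint> {
  const result = await query(
    `INSERT INTO entity_sequence (entity_id, current_seq)
     VALUES ($1, 1)
     ON CONFLICT (entity_id)
     DO UPDATE SET current_seq = entity_sequence.current_seq + 1
     RETURNING current_seq`,
    [entityId]
  );
  return BigInt(result.rows[0].current_seq);
}
```

Because the increment-and-return happens in a single statement, two concurrent producers for the same entity can never receive the same number.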
Strategy 2: In-Memory Atomic Counter
For high-throughput scenarios where durability is less critical:
import { Mutex } from 'async-mutex';

class SequenceGenerator {
private counters = new Map<string, { value: bigint; lock: Mutex }>();
async nextSequence(entityId: string): Promise<bigint> {
let entry = this.counters.get(entityId);
if (!entry) {
entry = { value: 0n, lock: new Mutex() };
this.counters.set(entityId, entry);
}
// Atomic increment under lock
return await entry.lock.runExclusive(() => {
entry!.value += 1n;
return entry!.value;
});
}
}
This approach offers high throughput (every increment is a local, in-memory operation), but counter state is lost on crash or restart, so sequence numbers may repeat or leave gaps unless checkpointed.
Strategy 3: Event Store Sequence
When using event sourcing, the event store provides sequences:
interface EventStore {
// Appends event with optimistic concurrency
// Returns assigned sequence number
append(
streamId: string,
event: Event,
expectedVersion: number | 'any' | 'no_stream'
): Promise<{ sequenceNumber: number }>;
}
// Usage
const result = await eventStore.append(
'order-12345',
{ type: 'OrderShipped', data: { trackingNumber: 'XYZ' } },
5 // Fails if current version is not 5
);
// result.sequenceNumber === 6
This approach makes sequencing atomic with the append itself: the event store assigns the version number and enforces optimistic concurrency in the same operation, so no separate counter can drift out of sync with the stream.
If you have an event store, use its built-in sequencing. If you have a database with entity state, consider database sequences. If you need maximum throughput and can tolerate restart-related gaps, use in-memory counters with periodic checkpointing.
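The "in-memory counters with periodic checkpointing" option can be sketched like this (class and parameter names are illustrative). On restart, the counter resumes above the last persisted checkpoint plus a safety gap, trading a burned range of numbers for a guarantee of no reuse:

```typescript
// In-memory counter with checkpoint-based crash recovery.
// Assumption: up to `gapOnRestart` numbers may have been handed out after the
// last checkpoint was persisted; skipping past them avoids ever reissuing one,
// at the cost of an intentional gap in the sequence.
class CheckpointedCounter {
  private value: bigint;

  constructor(lastCheckpoint: bigint, private gapOnRestart: bigint = 1000n) {
    this.value = lastCheckpoint + gapOnRestart;
  }

  next(): bigint {
    this.value += 1n;
    return this.value;
  }

  // Persist this value periodically (e.g. every N increments or T seconds).
  checkpoint(): bigint {
    return this.value;
  }
}
```

Consumers of such sequences must treat gaps at multiples of the restart window as expected rather than as lost messages, which is exactly the "gaps may occur in some designs" caveat from earlier.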
With sequence numbers embedded in messages, consumers can detect three categories of ordering problems: gaps, duplicates, and reversals. Each indicates a different underlying issue and requires a different response.
interface Message {
  entityId: string;
  sequenceNumber: number;
  payload: unknown;
}

type ViolationType = 'gap' | 'duplicate' | 'reversal';

interface Violation {
  type: ViolationType;
  entityId: string;
  expected: number;
  received: number;
  timestamp: Date;
}

class OrderingViolationDetector {
  // Track last seen sequence per entity
  private lastSequence = new Map<string, number>();
  private violations: Violation[] = [];

  detect(message: Message): Violation | null {
    const lastSeq = this.lastSequence.get(message.entityId);

    // First message for this entity
    if (lastSeq === undefined) {
      // Optionally validate it's sequence 1
      if (message.sequenceNumber !== 1) {
        // Could be a gap if we expect to start from 1,
        // or acceptable if we're joining mid-stream
      }
      this.lastSequence.set(message.entityId, message.sequenceNumber);
      return null;
    }

    const expected = lastSeq + 1;

    // Normal case: sequential
    if (message.sequenceNumber === expected) {
      this.lastSequence.set(message.entityId, message.sequenceNumber);
      return null;
    }

    // Violation detected
    let violation: Violation;
    if (message.sequenceNumber === lastSeq) {
      // Exact duplicate
      violation = {
        type: 'duplicate',
        entityId: message.entityId,
        expected,
        received: message.sequenceNumber,
        timestamp: new Date(),
      };
    } else if (message.sequenceNumber < lastSeq) {
      // Out-of-order (reversal)
      violation = {
        type: 'reversal',
        entityId: message.entityId,
        expected,
        received: message.sequenceNumber,
        timestamp: new Date(),
      };
    } else {
      // Gap: skipped sequence number(s)
      violation = {
        type: 'gap',
        entityId: message.entityId,
        expected,
        received: message.sequenceNumber,
        timestamp: new Date(),
      };
      // Still update last seen, as we may need to continue
      this.lastSequence.set(message.entityId, message.sequenceNumber);
    }

    this.violations.push(violation);
    return violation;
  }

  getViolationStats(): { gaps: number; duplicates: number; reversals: number } {
    const stats = { gaps: 0, duplicates: 0, reversals: 0 };
    for (const v of this.violations) {
      if (v.type === 'gap') stats.gaps++;
      else if (v.type === 'duplicate') stats.duplicates++;
      else if (v.type === 'reversal') stats.reversals++;
    }
    return stats;
  }
}

Responding to Violations:
| Violation | Immediate Response | Recovery Strategy |
|---|---|---|
| Gap | Log warning, emit metric | Wait-and-buffer, or proceed and reconcile later |
| Duplicate | Skip processing | Idempotent design ensures safety |
| Reversal | Buffer or reject | Hold out-of-order messages until expected arrives |
Gap Response Strategies:
Proceed and Reconcile — Accept the gap, process the new message, mark gap for later backfill. Suitable when eventual completeness is acceptable.
Wait and Buffer — Hold the new message, wait for the missing one(s) with timeout. If timeout expires, proceed with gap handling.
Request Missing — If the producer or a replay service exists, request retransmission of missing sequence numbers.
Fail Fast — Treat gaps as fatal errors, halt processing, alert operators. Appropriate for strict ordering requirements.
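The four strategies above can be expressed as a single policy dispatch. This is an illustrative sketch: the policy names and the `GapAction` shape are my own assumptions, not a standard API.

```typescript
// Configured gap-handling policies, mirroring the four strategies above.
type GapPolicy = 'proceed' | 'wait_and_buffer' | 'request_missing' | 'fail_fast';

interface GapAction {
  process: boolean;        // process the newly arrived message now?
  requestReplay: number[]; // sequence numbers to re-request, if any
}

function onGap(policy: GapPolicy, missing: number[]): GapAction {
  switch (policy) {
    case 'proceed':
      // Accept the gap; mark the missing range for later reconciliation.
      return { process: true, requestReplay: [] };
    case 'wait_and_buffer':
      // Hold the message; a timeout elsewhere decides when to give up.
      return { process: false, requestReplay: [] };
    case 'request_missing':
      // Ask the producer or a replay service to resend the missing range.
      return { process: false, requestReplay: missing };
    case 'fail_fast':
      // Strict ordering: halt and alert.
      throw new Error(`Gap detected: missing sequence(s) ${missing.join(', ')}`);
  }
}
```

Making the policy explicit configuration, rather than hard-coding one behavior, lets different entity types or deployments choose different trade-offs with the same consumer code.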
Duplicate Response:
Duplicates are generally safe if consumers are idempotent. The standard approach is to skip processing and log for visibility. Maintain a set of recently processed sequence numbers for detection.
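A minimal sketch of such a recently-processed set, bounded per entity to cap memory (class name and eviction policy are illustrative assumptions):

```typescript
// Bounded per-entity record of recently seen sequence numbers.
// Sequence numbers evicted from the window are assumed safe to reprocess
// because the consumer is idempotent.
class DuplicateTracker {
  private seen = new Map<string, Set<number>>();

  constructor(private maxRemembered = 1000) {}

  isDuplicate(entityId: string, seq: number): boolean {
    return this.seen.get(entityId)?.has(seq) ?? false;
  }

  record(entityId: string, seq: number): void {
    let set = this.seen.get(entityId);
    if (!set) {
      set = new Set();
      this.seen.set(entityId, set);
    }
    set.add(seq);
    if (set.size > this.maxRemembered) {
      // Evict the oldest remembered entry (Set preserves insertion order).
      const oldest = set.values().next().value as number;
      set.delete(oldest);
    }
  }
}
```

The bound matters: an unbounded seen-set grows forever on long-lived entities, so duplicate detection here is best-effort over a recent window, with idempotency as the real safety net.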
Reversal Response:
Reversals are the trickiest violation. Options depend on requirements: buffer the out-of-order message until its predecessors arrive, reject it and request a replay, or apply it only if the resulting state transition is still valid.
If your consumer maintains state (e.g., order status), processing out of order can corrupt state. An 'OrderShipped' event applying before 'OrderCreated' would fail. Always validate state transitions, not just sequence numbers.
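That state-transition check can be as simple as a lookup table. A sketch, where the order statuses and allowed transitions are illustrative assumptions about the domain:

```typescript
// Validate state transitions in addition to sequence numbers.
// An event whose implied transition is not allowed is rejected even if its
// sequence number looks plausible.
type OrderStatus = 'created' | 'submitted' | 'paid' | 'shipped';

const allowedTransitions: Record<OrderStatus, OrderStatus[]> = {
  created: ['submitted'],
  submitted: ['paid'],
  paid: ['shipped'],
  shipped: [], // terminal state
};

function canTransition(from: OrderStatus, to: OrderStatus): boolean {
  return allowedTransitions[from].includes(to);
}
```

Sequence validation and transition validation catch different failures: the former detects transport-level reordering, the latter detects events that are semantically impossible regardless of how they arrived.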
When messages can arrive out of order but must be processed in order, a reordering buffer collects messages and emits them sequentially. This pattern is essential for systems receiving unordered feeds that require ordered output.
Reordering Buffer Mechanics:
Incoming (out of order): [seq:5] [seq:3] [seq:7] [seq:4] [seq:6]
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌─────────────────────────────────────────┐
│ Reordering Buffer │
│ ┌───┬───┬───┬───┬───┬───┬───┬───┐ │
│ │ 3 │ 4 │ 5 │ 6 │ 7 │ │ │ │ │
│ └───┴───┴───┴───┴───┴───┴───┴───┘ │
│ Wait for seq 3 (expected next) │
└──────────────────┬──────────────────────┘
│
▼
Output (in order): [seq:3] [seq:4] [seq:5] [seq:6] [seq:7]
Key Design Decisions:
Buffer Size — How many messages to hold? Too small = drop messages; too large = memory exhaustion.
Timeout — How long to wait for missing messages? Too short = premature gaps; too long = latency.
Gap Handling — When timeout expires, skip the gap or fail?
Per-Entity vs Global — Separate buffer per entity (isolated, more memory) or shared (complex, less memory)?
interface SequencedMessage<T> {
  entityId: string;
  sequenceNumber: number;
  payload: T;
  receivedAt: Date;
}

interface BufferConfig {
  maxBufferSize: number; // Max messages to hold per entity
  gapTimeoutMs: number;  // How long to wait for a missing sequence
  onMessage: <T>(msg: SequencedMessage<T>) => Promise<void>;
  onGapTimeout: (entityId: string, missing: number[]) => Promise<void>;
}

class ReorderingBuffer<T> {
  private buffers = new Map<string, Map<number, SequencedMessage<T>>>();
  private expectedSeq = new Map<string, number>();
  private gapTimers = new Map<string, NodeJS.Timeout>();

  constructor(private config: BufferConfig) {}

  async receive(message: SequencedMessage<T>): Promise<void> {
    const { entityId, sequenceNumber } = message;

    // Initialize expected sequence for a new entity
    if (!this.expectedSeq.has(entityId)) {
      this.expectedSeq.set(entityId, sequenceNumber);
    }

    // Get or create the entity's buffer
    let buffer = this.buffers.get(entityId);
    if (!buffer) {
      buffer = new Map();
      this.buffers.set(entityId, buffer);
    }

    const expected = this.expectedSeq.get(entityId)!;

    if (sequenceNumber === expected) {
      // Expected message: process it and any buffered successors
      await this.processMessageAndSuccessors(entityId, message);
    } else if (sequenceNumber > expected) {
      // Future message: buffer it
      if (buffer.size >= this.config.maxBufferSize) {
        console.warn(`Buffer full for ${entityId}, dropping seq ${sequenceNumber}`);
        return;
      }
      buffer.set(sequenceNumber, message);
      // Start a gap timer if not already running
      if (!this.gapTimers.has(entityId)) {
        this.startGapTimer(entityId, expected);
      }
    } else {
      // Past message: already processed or duplicate
      console.log(`Ignoring past/duplicate: entity=${entityId} seq=${sequenceNumber} expected=${expected}`);
    }
  }

  private async processMessageAndSuccessors(
    entityId: string,
    message: SequencedMessage<T>
  ): Promise<void> {
    // Cancel any gap timer
    this.cancelGapTimer(entityId);

    // Process this message
    await this.config.onMessage(message);

    let nextExpected = message.sequenceNumber + 1;
    const buffer = this.buffers.get(entityId);

    // Process any buffered successors
    while (buffer?.has(nextExpected)) {
      const nextMessage = buffer.get(nextExpected)!;
      buffer.delete(nextExpected);
      await this.config.onMessage(nextMessage);
      nextExpected++;
    }

    this.expectedSeq.set(entityId, nextExpected);

    // If the buffer still has messages, they're waiting on gaps
    if (buffer && buffer.size > 0) {
      this.startGapTimer(entityId, nextExpected);
    }
  }

  private startGapTimer(entityId: string, expectedSeq: number): void {
    const timer = setTimeout(async () => {
      // Gap timeout: determine missing sequence numbers
      const buffer = this.buffers.get(entityId);
      if (!buffer) return;
      const missing: number[] = [];
      const minBuffered = Math.min(...buffer.keys());
      for (let seq = expectedSeq; seq < minBuffered; seq++) {
        missing.push(seq);
      }
      await this.config.onGapTimeout(entityId, missing);
      // Option: skip the gap and continue from the next available message
      const nextSeq = Math.min(...buffer.keys());
      const nextMsg = buffer.get(nextSeq)!;
      buffer.delete(nextSeq);
      await this.processMessageAndSuccessors(entityId, nextMsg);
    }, this.config.gapTimeoutMs);
    this.gapTimers.set(entityId, timer);
  }

  private cancelGapTimer(entityId: string): void {
    const timer = this.gapTimers.get(entityId);
    if (timer) {
      clearTimeout(timer);
      this.gapTimers.delete(entityId);
    }
  }
}

In production, consider: (1) persistent buffers for crash recovery, (2) separating buffer state from processing state, (3) metrics for buffer depth and gap frequency, (4) alerting when buffers approach max size or gaps exceed thresholds.
Event sourcing has a natural affinity with sequence numbers. Each event in an aggregate's stream has a version number that serves as its sequence number. This version is critical for both ordering and optimistic concurrency control.
Event Store Versioning Model:
Stream: Order-12345
┌─────────────────────────────────────────────────────────────────┐
│ Version │ Event Type │ Timestamp │ Data │
├─────────┼─────────────────┼────────────────────────┼───────────┤
│ 1 │ OrderCreated │ 2024-01-15T10:00:00Z │ {...} │
│ 2 │ ItemAdded │ 2024-01-15T10:05:00Z │ {...} │
│ 3 │ ItemAdded │ 2024-01-15T10:07:00Z │ {...} │
│ 4 │ OrderSubmitted │ 2024-01-15T10:10:00Z │ {...} │
│ 5 │ PaymentReceived │ 2024-01-15T10:15:00Z │ {...} │
│ 6 │ OrderShipped │ 2024-01-16T09:00:00Z │ {...} │
└─────────────────────────────────────────────────────────────────┘
Optimistic Concurrency with Versions:
When appending events, the producer specifies the expected version:
interface AppendResult {
success: boolean;
newVersion?: number;
error?: 'WrongExpectedVersion' | 'StreamDeleted';
}
async function appendEvents(
streamId: string,
events: Event[],
expectedVersion: number
): Promise<AppendResult> {
// Atomically:
// 1. Check current version === expectedVersion
// 2. Append events with versions (expectedVersion + 1), (expectedVersion + 2), ...
// 3. Return new version
// If current version !== expectedVersion, fail with WrongExpectedVersion
}
This ensures: no lost updates (a stale writer's append fails instead of silently interleaving), versions are assigned sequentially without gaps, and consumers can treat an aggregate's stream versions as exact sequence numbers.
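A minimal in-memory sketch of this contract (illustrative only; a real event store makes the version check and the append atomic and durable, and would also handle the 'any' and 'no_stream' expected-version modes from the earlier interface):

```typescript
// Events and the append result shape from the contract above.
interface StoredEvent {
  type: string;
  data: unknown;
}

interface AppendResult {
  success: boolean;
  newVersion?: number;
  error?: 'WrongExpectedVersion' | 'StreamDeleted';
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  appendEvents(
    streamId: string,
    events: StoredEvent[],
    expectedVersion: number
  ): AppendResult {
    const stream = this.streams.get(streamId) ?? [];
    // Versions start at 1, so the current version equals the stream length.
    const currentVersion = stream.length;
    if (currentVersion !== expectedVersion) {
      return { success: false, error: 'WrongExpectedVersion' };
    }
    stream.push(...events);
    this.streams.set(streamId, stream);
    return { success: true, newVersion: stream.length };
  }
}
```

The key property to notice: two writers that both load version N and try to append can never both succeed, because the second append sees version N+1 and fails with WrongExpectedVersion.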
Projecting Events with Sequence Tracking:
Projections (read models) that consume events must track their position:
interface EventEnvelope {
  streamId: string;
  version: number;
  globalPosition: bigint; // Position across all streams
  eventType: string;
  data: unknown;
  metadata: { timestamp: Date; correlationId: string };
}

interface ProjectionCheckpoint {
  projectionName: string;
  lastProcessedPosition: bigint;
  lastProcessedAt: Date;
}

class EventProjection {
  private checkpointStore: CheckpointStore;
  private projectionName: string;

  constructor(name: string, checkpointStore: CheckpointStore) {
    this.projectionName = name;
    this.checkpointStore = checkpointStore;
  }

  async start(): Promise<void> {
    // Load the last checkpoint
    const checkpoint = await this.checkpointStore.get(this.projectionName);
    const startPosition = checkpoint?.lastProcessedPosition ?? 0n;
    console.log(`Starting projection ${this.projectionName} from position ${startPosition}`);
    // Subscribe to events from the checkpoint onward
    await this.subscribeFromPosition(startPosition);
  }

  private async processEvent(event: EventEnvelope): Promise<void> {
    // Validate sequence
    const expected = await this.getExpectedPosition();
    if (event.globalPosition !== expected) {
      // Gap or out-of-order
      if (event.globalPosition < expected) {
        console.log(`Skipping already-processed event at position ${event.globalPosition}`);
        return;
      } else {
        throw new Error(
          `Gap detected: expected position ${expected}, received ${event.globalPosition}`
        );
      }
    }

    // Apply the event to the projection
    await this.applyEvent(event);

    // Update the checkpoint atomically with the apply (ideally in the same transaction)
    await this.checkpointStore.save({
      projectionName: this.projectionName,
      lastProcessedPosition: event.globalPosition,
      lastProcessedAt: new Date(),
    });
  }

  private async applyEvent(event: EventEnvelope): Promise<void> {
    // Projection-specific logic
    switch (event.eventType) {
      case 'OrderCreated':
        await this.handleOrderCreated(event.streamId, event.data);
        break;
      case 'OrderShipped':
        await this.handleOrderShipped(event.streamId, event.data);
        break;
      // ... other event types
    }
  }

  private async getExpectedPosition(): Promise<bigint> {
    const checkpoint = await this.checkpointStore.get(this.projectionName);
    return (checkpoint?.lastProcessedPosition ?? -1n) + 1n;
  }
}

Event stores often track two positions: the version within a stream (per-aggregate ordering) and the global position across all streams (for projections reading everything). Projections typically track global position for resumption, while aggregates use stream version for concurrency control.
Beyond basic detection and buffering, sequence numbers enable sophisticated patterns for complex ordering scenarios.
Epoch-Based Sequences:
When sequences must reset (system migration, aggregate split), epochs prevent confusion:
interface EpochSequence {
epoch: number; // Incremented on reset events
sequence: number; // Resets to 1 on new epoch
}
function compare(a: EpochSequence, b: EpochSequence): number {
// Epoch takes precedence
if (a.epoch !== b.epoch) return a.epoch - b.epoch;
return a.sequence - b.sequence;
}
// Example usage
const oldEvent: EpochSequence = { epoch: 1, sequence: 500 };
const newEvent: EpochSequence = { epoch: 2, sequence: 1 };
// newEvent comes after oldEvent despite lower sequence
console.log(compare(newEvent, oldEvent) > 0); // true
Hash Chain for Integrity:
import crypto from 'node:crypto';

interface ChainedMessage {
sequenceNumber: number;
previousHash: string; // Hash of previous message
payload: unknown;
hash: string; // Hash of this message (including previousHash)
}
function computeHash(msg: Omit<ChainedMessage, 'hash'>): string {
const content = JSON.stringify({
sequenceNumber: msg.sequenceNumber,
previousHash: msg.previousHash,
payload: msg.payload,
});
return crypto.createHash('sha256').update(content).digest('hex');
}
function verifyChain(messages: ChainedMessage[]): boolean {
for (let i = 0; i < messages.length; i++) {
const msg = messages[i];
// Verify hash
const expectedHash = computeHash(msg);
if (msg.hash !== expectedHash) return false;
// Verify chain link
if (i > 0 && msg.previousHash !== messages[i - 1].hash) return false;
}
return true;
}
Hash chains provide: tamper evidence (modifying any message invalidates every subsequent hash), gap detection (a missing message breaks the chain link), and ordering verification that doesn't require trusting the transport.
Simple sequential integers suffice for most cases. Reach for advanced patterns when: (1) Multiple writers need concurrent access (vector clocks), (2) Sequence resets are possible (epochs), (3) Auditability and tampering detection are requirements (hash chains), (4) Hierarchical ordering is needed (compound sequences).
Sequence numbers transform message ordering from an infrastructure assumption into an observable, verifiable property. They're the foundation for reliable ordered processing in distributed systems.
What's Next:
We've covered how to detect ordering violations with sequence numbers. But what happens when messages inevitably arrive out of order? The next page explores handling out-of-order messages—strategies from dropping to buffering to reprocessing, with deep dives into the trade-offs of each approach.
You now understand how to design sequence number schemes, detect ordering violations, implement reordering buffers, and apply advanced patterns like hash chains and epochs. Next, we'll explore strategies for handling out-of-order messages in production systems.