Once a leader is elected, its primary responsibility is log replication—ensuring that every server in the cluster has an identical copy of the log. This is where Raft actually achieves consensus: by agreeing on the sequence of log entries, servers agree on the sequence of state machine commands, and therefore on the final state.
Log replication in Raft is elegantly asymmetric. The leader is the single source of truth: it accepts client commands, appends them to its log, and pushes those entries to followers. Followers never initiate replication—they simply accept what the leader sends (if it's valid) or reject it (if something is wrong).
This simplicity has a cost: the leader is a bottleneck. But it also has a profound benefit: the replication protocol is easy to reason about. Data flows one way, from leader to followers. There's no conflict resolution, no merge logic, no vector clocks. Just append, replicate, commit.
By the end of this page, you will understand:
• The structure of log entries and why each field exists
• The AppendEntries RPC and its dual role as replication and heartbeat
• The Log Matching Property that guarantees consistency
• How leaders handle follower inconsistencies and bring them into sync
• The commitment rule that determines when entries are safely applied
• Optimization techniques used in production Raft implementations
The Raft log is conceptually simple: an ordered sequence of entries, each containing a command to be executed by the state machine. But the details of log entry structure are crucial to understanding replication.
Each log entry contains:
Index — The position of the entry in the log (1-indexed). Once assigned, an entry's index never changes.
Term — The term in which the entry was created. This captures "when" the entry was written in logical time.
Command — The actual operation to apply to the state machine (e.g., "SET x = 5").
The combination of index and term uniquely identifies a log entry across the entire cluster. Two entries with the same index and term are guaranteed to contain the same command—a property we'll explore in depth.
```python
from dataclasses import dataclass
from typing import List, Any, Optional


@dataclass(frozen=True)
class LogEntry:
    """
    A single entry in the Raft log.
    Immutable once created - index and term never change.
    """
    index: int    # Position in log (1-indexed, never changes)
    term: int     # Term when entry was created (for consistency checking)
    command: Any  # Client command (opaque to Raft, interpreted by state machine)

    def __repr__(self):
        return f"[{self.index}|T{self.term}]: {self.command}"


class RaftLog:
    """
    The Raft log: an ordered, append-only sequence of entries.

    Key invariants:
    1. Entries are indexed from 1 (index 0 doesn't exist)
    2. Once an entry is at index i, it stays at index i
    3. Committed entries are never removed
    4. Uncommitted entries MAY be removed (for conflict resolution)
    """

    def __init__(self):
        # Internal storage (0-indexed, but entries have 1-indexed .index)
        self._entries: List[LogEntry] = []
        # Index of highest committed entry
        self._commit_index: int = 0
        # Index of highest entry applied to state machine
        self._last_applied: int = 0

    # === QUERY METHODS ===

    def get_last_index(self) -> int:
        """Return index of last entry, or 0 if log is empty."""
        if not self._entries:
            return 0
        return self._entries[-1].index

    def get_last_term(self) -> int:
        """Return term of last entry, or 0 if log is empty."""
        if not self._entries:
            return 0
        return self._entries[-1].term

    def get_entry(self, index: int) -> Optional[LogEntry]:
        """Get entry at given index, or None if not present."""
        if index < 1 or index > len(self._entries):
            return None
        return self._entries[index - 1]  # Convert to 0-indexed

    def get_term_at(self, index: int) -> int:
        """Get term of entry at index, or 0 if not present."""
        entry = self.get_entry(index)
        return entry.term if entry else 0

    def get_entries_from(self, start_index: int) -> List[LogEntry]:
        """Get all entries from start_index onwards."""
        if start_index < 1:
            return list(self._entries)
        if start_index > len(self._entries):
            return []
        return list(self._entries[start_index - 1:])

    # === MODIFICATION METHODS ===

    def append(self, term: int, command: Any) -> LogEntry:
        """
        Append a new entry to the log.
        Used by leader when receiving client commands.
        """
        new_index = len(self._entries) + 1
        entry = LogEntry(index=new_index, term=term, command=command)
        self._entries.append(entry)
        return entry

    def append_entries(self, prev_index: int, prev_term: int,
                       entries: List[LogEntry]) -> bool:
        """
        Append entries from leader (AppendEntries RPC).

        Args:
            prev_index: Index of entry immediately before new entries
            prev_term: Term of entry at prev_index
            entries: New entries to append

        Returns:
            True if successful, False if log doesn't match at prev_index
        """
        # Special case: prev_index 0 means entries start at beginning
        if prev_index == 0:
            pass  # No consistency check needed
        else:
            # Consistency check: our log must have matching entry at prev_index
            if prev_index > len(self._entries):
                return False  # We don't have that entry
            if self._entries[prev_index - 1].term != prev_term:
                return False  # We have different entry (conflict!)

        # If we got here, log matches up to prev_index
        # Now handle new entries
        for entry in entries:
            existing = self.get_entry(entry.index)
            if existing is None:
                # No entry at this index - append
                self._entries.append(entry)
            elif existing.term != entry.term:
                # Conflict! Existing entry has different term.
                # Delete this entry and all that follow, then append new
                self._entries = self._entries[:entry.index - 1]
                self._entries.append(entry)
            # else: Same entry already present (from previous attempt), skip

        return True

    def truncate_from(self, index: int):
        """
        Remove all entries from index onwards.
        Used when leader detects conflict in follower's log.
        """
        if index > self._commit_index:
            # Never truncate committed entries!
            self._entries = self._entries[:index - 1]


# Example: Log evolution over time
log = RaftLog()

# Leader in term 1 receives client commands
log.append(term=1, command={"op": "SET", "key": "x", "value": 1})
log.append(term=1, command={"op": "SET", "key": "y", "value": 2})
log.append(term=1, command={"op": "SET", "key": "x", "value": 3})
# Log now: [1|T1]:SET x=1, [2|T1]:SET y=2, [3|T1]:SET x=3

# New leader in term 2
log.append(term=2, command={"op": "DELETE", "key": "y"})
# Log now: [1|T1]:SET x=1, [2|T1]:SET y=2, [3|T1]:SET x=3, [4|T2]:DEL y
```

The term in each entry isn't just metadata—it's essential for detecting conflicts. If two servers have entries at the same index with different terms, at least one of them holds an entry from a leader that was later superseded. When the current leader replicates its log, its entry wins and the conflicting entry is discarded; such entries were never committed, so removing them is safe.
The AppendEntries RPC is the workhorse of Raft. It serves two purposes:
• Replication: carrying new log entries from the leader to followers
• Heartbeat: asserting the leader's authority and preventing followers from starting new elections when there is nothing to replicate
The same RPC handler processes both cases. The only difference is whether the entries list is empty.
```python
from dataclasses import dataclass
from typing import List


@dataclass
class AppendEntriesRequest:
    """
    Arguments for AppendEntries RPC (sent by leader).
    """
    # Leader's current term
    # Used by followers to:
    #   1. Reject if our term is higher (stale leader)
    #   2. Update our term if leader's is higher
    term: int

    # Leader's ID
    # So followers know who to redirect clients to
    leader_id: int

    # === LOG CONSISTENCY ARGUMENTS ===

    # Index of log entry immediately preceding new entries
    # If entries=[e5, e6], then prev_log_index=4
    prev_log_index: int

    # Term of entry at prev_log_index
    # For the consistency check (Log Matching Property)
    prev_log_term: int

    # === ENTRIES TO APPEND ===

    # Log entries to append (empty for heartbeat)
    entries: List[LogEntry]

    # === COMMITMENT INFORMATION ===

    # Leader's commit index
    # Followers use this to know which entries are safe to apply
    leader_commit: int


@dataclass
class AppendEntriesResponse:
    """
    Response from follower.
    """
    # Current term (for leader to update itself if stale)
    term: int

    # True if follower matched prev_log_index/prev_log_term
    # and appended entries successfully
    success: bool

    # OPTIMIZATION: If success=False, where follower diverges
    # Leader uses this to skip back more efficiently
    # (Not in original Raft, but common in implementations)
    conflict_index: int = 0
    conflict_term: int = 0


class AppendEntriesHandler:
    """
    Handles incoming AppendEntries RPCs (follower side).
    """

    def __init__(self, server):
        self.server = server

    def handle(self, request: AppendEntriesRequest) -> AppendEntriesResponse:
        """
        Process AppendEntries RPC. Complete implementation:
        """
        # ========================================
        # STEP 1: Term check
        # ========================================
        if request.term < self.server.current_term:
            # Leader is from past term - reject
            return AppendEntriesResponse(
                term=self.server.current_term,
                success=False
            )

        # If leader's term is higher, update and become follower
        if request.term > self.server.current_term:
            self.server.become_follower(request.term)

        # Valid leader! Reset election timer.
        self.server.reset_election_timer()
        self.server.leader_id = request.leader_id

        # ========================================
        # STEP 2: Log consistency check
        # ========================================
        if request.prev_log_index > 0:
            # Check that we have matching entry at prev_log_index
            our_term = self.server.log.get_term_at(request.prev_log_index)

            if our_term == 0:
                # We don't have an entry at that index
                return AppendEntriesResponse(
                    term=self.server.current_term,
                    success=False,
                    conflict_index=self.server.log.get_last_index() + 1,
                    conflict_term=0
                )

            if our_term != request.prev_log_term:
                # We have an entry, but with different term (conflict!)
                # Find first entry of conflicting term for faster backoff
                conflict_idx = self._find_first_of_term(our_term)
                return AppendEntriesResponse(
                    term=self.server.current_term,
                    success=False,
                    conflict_index=conflict_idx,
                    conflict_term=our_term
                )

        # ========================================
        # STEP 3: Append entries
        # ========================================
        for entry in request.entries:
            existing = self.server.log.get_entry(entry.index)
            if existing is None:
                # New entry - append
                self.server.log.append_entry_at(entry)
            elif existing.term != entry.term:
                # Conflict - truncate and append
                self.server.log.truncate_from(entry.index)
                self.server.log.append_entry_at(entry)
            # else: Already have this exact entry, skip

        # ========================================
        # STEP 4: Update commit index
        # ========================================
        if request.leader_commit > self.server.commit_index:
            # Commit index is min of leader's commit and our last entry
            # (We might not have all entries the leader has committed)
            self.server.commit_index = min(
                request.leader_commit,
                self.server.log.get_last_index()
            )
            # Trigger application of newly committed entries
            self.server.apply_committed_entries()

        # ========================================
        # STEP 5: Persist and respond
        # ========================================
        self.server.persist()  # Log changes must be durable before responding

        return AppendEntriesResponse(
            term=self.server.current_term,
            success=True
        )

    def _find_first_of_term(self, term: int) -> int:
        """Find index of first entry with given term (for fast backoff)."""
        for i in range(1, self.server.log.get_last_index() + 1):
            if self.server.log.get_term_at(i) == term:
                return i
        return 1  # Shouldn't happen, but safe fallback
```

The Heartbeat Mechanism:
When AppendEntries is sent with an empty entries list, it functions as a heartbeat. Heartbeats:
• Reset each follower's election timer, preventing unnecessary elections
• Assert the leader's continued authority for its term
• Carry leader_commit, so followers can advance their commit index even when no new entries are flowing
Leaders typically send heartbeats every 50-150ms. The election timeout must be significantly longer (150-300ms) to account for network delays and prevent spurious elections.
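As a concrete illustration, here is a minimal sketch of the leader-side heartbeat timer. It is not part of the implementation shown on this page: `is_leader()` and `peers` mirror attributes used in the code above, but `send_heartbeat()` is a hypothetical coroutine that issues an empty AppendEntries, and the interval values are simply illustrative defaults within the ranges mentioned.

```python
import asyncio
import random

HEARTBEAT_INTERVAL = 0.075               # 75 ms: inside the typical 50-150 ms range
ELECTION_TIMEOUT_RANGE = (0.150, 0.300)  # followers randomize within 150-300 ms


async def heartbeat_loop(server):
    """Leader side: send an empty AppendEntries to every follower at a
    fixed interval, whether or not there is new data to replicate."""
    while server.is_leader():
        for peer in server.peers:
            # A heartbeat still carries term, prev_log_index/term, and
            # leader_commit, so followers run the same consistency check
            # and can advance their commit index.
            asyncio.create_task(server.send_heartbeat(peer))  # hypothetical helper
        await asyncio.sleep(HEARTBEAT_INTERVAL)


def random_election_timeout() -> float:
    """Follower side: pick a fresh randomized timeout each time the timer resets."""
    return random.uniform(*ELECTION_TIMEOUT_RANGE)
```

Randomizing the election timeout on every reset is what keeps followers from timing out simultaneously and splitting the vote.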
The Log Matching Property is a key invariant that Raft maintains:
If two logs contain an entry with the same index and term, then:
- The entries contain the same command
- All preceding entries are also identical
This powerful guarantee simplifies reasoning about log consistency. Let's understand why it holds.
Part 1: Same command
Each leader creates at most one entry per index per term: it assigns each new client command the next index in its own log and never changes or reorders its own entries.
Therefore, within a single term, there's exactly one entry at each index. If two logs have entries with the same (index, term), they came from the same leader and contain the same command.
Part 2: All preceding entries match
AppendEntries includes prev_log_index and prev_log_term: the follower accepts the new entries only if its own log contains an entry at prev_log_index with a matching term; otherwise it rejects the request.
This is a simple consistency check that guarantees the entire log prefix matches.
"""Log Matching Property Illustrated The Log Matching Property is maintained through a simple inductive mechanismin the AppendEntries protocol. === EXAMPLE: How Matching Propagates === Leader log: [1|T1]:a [2|T1]:b [3|T2]:c [4|T2]:d ↑ ↑ ↑ ↑ Entry 1 Entry 2 Entry 3 Entry 4 To send entry 4, leader includes: prev_log_index = 3 prev_log_term = T2 Follower checks: "Do I have entry 3 with term T2?" - If yes: Accept entry 4, log now matches up to entry 4 - If no: Reject, leader will backtrack === WHY THIS CREATES A CHAIN === Entry 4 accepted → Entry 3 must matchEntry 3 was accepted earlier → Entry 2 must matchEntry 2 was accepted earlier → Entry 1 must matchEntry 1 was first entry → No predecessor to check By induction: If entry N matches, entries 1..N-1 also match. === EXAMPLE WITH DIVERGENT LOGS === Scenario: Leader crashed while replicating, new leader has different entries. Leader (term 3) log: [1|T1]:a [2|T1]:b [3|T2]:c [4|T3]:xFollower log: [1|T1]:a [2|T1]:b [3|T2]:c [4|T2]:y [5|T2]:z ↑ ↑ Conflict! Different term! Step 1: Leader sends AppendEntries with entry 4 prev_log_index = 3, prev_log_term = T2 entries = [[4|T3]:x] Step 2: Follower checks prev_log Entry 3 = [3|T2]:c ✓ matches! Step 3: Follower processes entry 4 Existing entry 4 = [4|T2]:y (term T2 ≠ T3!) CONFLICT DETECTED Step 4: Follower truncates from index 4 Log becomes: [1|T1]:a [2|T1]:b [3|T2]:c Step 5: Follower appends new entry 4 Log becomes: [1|T1]:a [2|T1]:b [3|T2]:c [4|T3]:x Result: Follower log now matches leader log!(Entries [4|T2]:y and [5|T2]:z were uncommitted and correctly removed)""" def demonstrate_log_matching(): """ Show how log matching property is enforced. """ # Scenario: Two followers with different states leader_log = [ LogEntry(1, 1, "a"), LogEntry(2, 1, "b"), LogEntry(3, 2, "c"), LogEntry(4, 3, "x"), ] follower_a_log = [ LogEntry(1, 1, "a"), LogEntry(2, 1, "b"), # Missing entries 3 and 4 ] follower_b_log = [ LogEntry(1, 1, "a"), LogEntry(2, 1, "b"), LogEntry(3, 2, "c"), LogEntry(4, 2, "y"), # Different term! LogEntry(5, 2, "z"), ] print("Leader's task: Bring all followers to match its log") print() # For Follower A (missing entries) print("Follower A: Missing entries") print(" Leader sends prev_log_index=2, prev_log_term=1") print(" Follower A has [2|T1]:b ✓ match") print(" Follower A accepts entries 3 and 4") print(" After: Follower A log = Leader log ✓") print() # For Follower B (conflicting entries) print("Follower B: Has conflicting entries") print(" Leader sends prev_log_index=3, prev_log_term=2") print(" Follower B has [3|T2]:c ✓ match for prev_log") print(" But entry 4 conflicts: [4|T2]:y vs [4|T3]:x") print(" Follower B truncates from index 4") print(" Follower B accepts entry [4|T3]:x") print(" After: Follower B log = Leader log ✓")When a follower truncates conflicting entries, those entries were necessarily uncommitted. Committed entries are replicated to a majority and can only exist if a leader replicated them. Since the new leader must have won an election (requiring majority votes with the election restriction), it must have all committed entries. Therefore, any conflicting entries on followers were never committed.
When a leader receives a client command, it goes through a precise sequence of steps:
• Append the command to its own log as a new entry
• Send AppendEntries RPCs carrying the entry to all followers
• Wait until a majority of servers (including itself) have stored the entry
• Advance its commit index, apply the entry to its state machine, and respond to the client
• Inform followers of the new commit index in subsequent AppendEntries
The leader maintains two pieces of state for each follower:
• next_index: the index of the next log entry to send to that follower, initialized optimistically to the leader's last index + 1
• match_index: the highest index known to be replicated on that follower, initialized to 0
```python
import asyncio
from typing import Any, Dict
from dataclasses import dataclass


@dataclass
class PendingCommand:
    """Tracks a client command awaiting commitment."""
    index: int
    term: int
    command: Any
    future: asyncio.Future  # Resolved when command is committed and applied


class LeaderReplicator:
    """
    Handles log replication from leader to followers.
    """

    def __init__(self, server, peers: list):
        self.server = server
        self.peers = peers

        # For each peer: next index to send
        self.next_index: Dict[int, int] = {}
        # For each peer: highest replicated index
        self.match_index: Dict[int, int] = {}

        # Pending client commands awaiting commitment
        self.pending: Dict[int, PendingCommand] = {}  # index -> PendingCommand

        # Background replication tasks
        self._replication_tasks: Dict[int, asyncio.Task] = {}

    def initialize_leader_state(self):
        """
        Called when becoming leader.
        Initialize replication state for all peers.
        """
        last_index = self.server.log.get_last_index()
        for peer in self.peers:
            # Optimistic: Assume peer is caught up
            # Will correct if AppendEntries fails
            self.next_index[peer] = last_index + 1
            self.match_index[peer] = 0

    async def handle_client_command(self, command: Any) -> Any:
        """
        Process a client command:
        1. Append to local log
        2. Replicate to followers
        3. Wait for commitment
        4. Apply and return result
        """
        # Step 1: Append to local log
        entry = self.server.log.append(
            term=self.server.current_term,
            command=command
        )

        # Step 2: Create pending command tracker
        future = asyncio.get_event_loop().create_future()
        self.pending[entry.index] = PendingCommand(
            index=entry.index,
            term=entry.term,
            command=command,
            future=future
        )

        # Step 3: Trigger replication to all peers
        self._trigger_replication_all()

        # Step 4: Wait for commitment
        result = await future
        return result

    def _trigger_replication_all(self):
        """
        Start/restart replication to all peers.
        Called after appending new entries or on heartbeat timer.
        """
        for peer in self.peers:
            self._trigger_replication(peer)

    def _trigger_replication(self, peer: int):
        """
        Ensure replication task is running for this peer.
        """
        if peer not in self._replication_tasks or self._replication_tasks[peer].done():
            self._replication_tasks[peer] = asyncio.create_task(
                self._replicate_to_peer(peer)
            )

    async def _replicate_to_peer(self, peer: int):
        """
        Replicate log entries to a specific peer.
        Handles retries and backoff on failure.
        """
        while self.server.is_leader():
            next_idx = self.next_index[peer]
            prev_idx = next_idx - 1
            prev_term = self.server.log.get_term_at(prev_idx)

            # Get entries to send
            entries = self.server.log.get_entries_from(next_idx)

            # Build request
            request = AppendEntriesRequest(
                term=self.server.current_term,
                leader_id=self.server.id,
                prev_log_index=prev_idx,
                prev_log_term=prev_term,
                entries=entries,
                leader_commit=self.server.commit_index
            )

            try:
                response = await self.server.rpc.send_append_entries(peer, request)

                if response.term > self.server.current_term:
                    # We're stale! Step down.
                    self.server.become_follower(response.term)
                    return

                if response.success:
                    # Replication succeeded!
                    if entries:
                        self.next_index[peer] = entries[-1].index + 1
                        self.match_index[peer] = entries[-1].index
                    # Check if we can commit more entries
                    self._advance_commit_index()
                    return  # Done with this peer for now
                else:
                    # Replication failed - need to backtrack
                    self._handle_replication_failure(peer, response)
                    # Loop will retry with new next_index

            except asyncio.TimeoutError:
                # Peer didn't respond - will retry on next heartbeat
                return

    def _handle_replication_failure(self, peer: int,
                                    response: AppendEntriesResponse):
        """
        Handle failed AppendEntries by adjusting next_index.

        Basic version: Decrement by 1
        Optimized version: Use conflict info to skip back faster
        """
        if response.conflict_term > 0:
            # Optimized: Skip all entries of conflicting term
            # Find our last entry of conflict_term
            found = False
            for i in range(self.server.log.get_last_index(), 0, -1):
                if self.server.log.get_term_at(i) == response.conflict_term:
                    self.next_index[peer] = i + 1
                    found = True
                    break
            if not found:
                # We don't have any entries of that term
                # Back up to conflict_index
                self.next_index[peer] = response.conflict_index
        else:
            # Basic: Decrement by 1
            self.next_index[peer] = max(1, self.next_index[peer] - 1)

    def _advance_commit_index(self):
        """
        Check if we can commit more entries.

        Entry is committed if it's:
        1. At an index replicated to a majority
        2. From the current term
        """
        for n in range(self.server.commit_index + 1,
                       self.server.log.get_last_index() + 1):
            if self.server.log.get_term_at(n) != self.server.current_term:
                # Can only commit entries from current term directly
                # (Entries from previous terms are committed indirectly)
                continue

            # Count replications
            replicated_count = 1  # Leader has it
            for peer in self.peers:
                if self.match_index.get(peer, 0) >= n:
                    replicated_count += 1

            # Check majority
            if replicated_count > (len(self.peers) + 1) / 2:
                self.server.commit_index = n
                self._resolve_pending_commands(n)

    def _resolve_pending_commands(self, up_to_index: int):
        """
        Apply committed entries and resolve pending client commands.
        """
        while self.server.last_applied < up_to_index:
            self.server.last_applied += 1
            entry = self.server.log.get_entry(self.server.last_applied)

            # Apply to state machine
            result = self.server.state_machine.apply(entry.command)

            # Resolve pending command if we have one
            if entry.index in self.pending:
                pending = self.pending.pop(entry.index)
                if not pending.future.done():
                    pending.future.set_result(result)
```

An entry is committed when it's safe to apply to the state machine—meaning it will never be overwritten or lost. The commitment rule in Raft is more subtle than it first appears.
Basic idea: An entry is committed when it's been replicated to a majority of servers.
The subtlety: A leader can only commit entries from its current term directly. Entries from previous terms are committed indirectly when a current-term entry is committed.
"""Why the Commitment Rule Exists=============================== Consider this scenario (the "figure 8" problem from the Raft paper): Time →======= State 1: S1 is leader of term 2, replicates entry to S2 S1 (leader T2): [1|T1] [2|T2] ← entry 2 created S2: [1|T1] [2|T2] ← replicated S3: [1|T1] S4: [1|T1] S5: [1|T1] Entry [2|T2] is on 2 servers. Is it committed? NO! State 2: S1 crashes. S5 becomes leader of term 3. S1 (dead): [1|T1] [2|T2] S2: [1|T1] [2|T2] S3: [1|T1] S4: [1|T1] S5 (leader T3): [1|T1] [3|T3] ← S5 wins election (S3,S4,S5 voted) ↑ Different entry at index 2! S5 can win because:- S5's log ends at [1|T1]- S3 and S4 also end at [1|T1] - Majority (S3,S4,S5) can elect S5- S2 wouldn't vote for S5 (S2's log is more up-to-date) State 3: S5 replicates entry [3|T3] to majority S1 (dead): [1|T1] [2|T2] S2: [1|T1] [2|T2] S3: [1|T1] [3|T3] ← Overwrites nothing, S3 had no entry 2 S4: [1|T1] [3|T3] S5 (leader T3): [1|T1] [3|T3] Entry [3|T3] is now on S3,S4,S5 (majority). It IS committed.Entry [2|T2] on S1,S2 will be OVERWRITTEN when S1 recovers. THE LESSON: Entry [2|T2] was on 2 servers (S1,S2) but was NOT committed because:1. S1 was leader of term 22. Entry was from term 23. S1 crashed before replicating to majority If S1 had replicated [2|T2] to S3 before crashing, then:- S5 couldn't have won the election (S3's log would be more up-to-date)- [2|T2] would have been committed CONCLUSION: Majority replication is necessary but not sufficient for commitmentwhen replicating entries from previous terms.""" def is_entry_committed( entry: LogEntry, current_term: int, replication_count: int, cluster_size: int) -> bool: """ Determine if an entry is committed. Rules: 1. Must be replicated to majority 2. If entry is from current term, that's sufficient 3. If entry is from previous term, need current-term entry committed after it """ majority = cluster_size // 2 + 1 if replication_count < majority: return False # Not on majority if entry.term == current_term: return True # Current term entry on majority = committed # Previous term entry - can only be committed indirectly # (This function can't determine this alone - need to check # if there's a committed current-term entry after it) return False # Caller needs to verify indirectly def demonstrate_indirect_commitment(): """ Show how previous-term entries are committed indirectly. """ print("Scenario: S1 is leader of term 4, has entries from terms 2 and 3") print() print("S1's log: [1|T1] [2|T2] [3|T3] [4|T4]") print(" ↑ ↑ ↑") print(" prev prev current") print(" term term term") print() print("When [4|T4] is replicated to majority and committed:") print(" - [4|T4] is committed (current term, on majority)") print(" - [3|T3] is committed (before committed entry)") print(" - [2|T2] is committed (before committed entry)") print() print("Why? Log Matching Property guarantees if [4|T4] matches,") print("all preceding entries also match. They can't be overwritten.")Leaders never directly commit entries from previous terms by counting replicas. They only commit entries from their current term. This ensures that once an entry is committed, the Log Matching Property guarantees it exists on any future leader (who must have voted for by a majority that includes servers with the committed entry).
In a Raft cluster, followers can diverge from the leader for various reasons:
• A follower crashed or was partitioned and missed recent entries
• A follower is slow and lags several entries behind
• A follower was the leader of an earlier term and still holds uncommitted entries that the current leader never saw
Raft handles all these cases with the same mechanism: find the point of agreement, then overwrite everything after.
"""Divergent Log Resolution in Raft The leader brings followers into sync by:1. Finding where logs diverge2. Overwriting follower's log from that point This works because any divergent entries are UNCOMMITTED.(Committed entries are on a majority; leader has them; election restriction.) === EXAMPLE: Multiple Divergent Followers === Current leader S1 (term 8) has authoritative log:S1: [1|T1] [2|T1] [3|T1] [4|T4] [5|T5] [6|T5] [7|T6] [8|T6] [9|T6] [10|T6] Followers with various divergent states:S2: [1|T1] [2|T1] [3|T1] [4|T4] [5|T5] [6|T5] [7|T6] [8|T6] [9|T6] ↑ Complete except last entry. next_index starts at 10. S3: [1|T1] [2|T1] [3|T1] [4|T4] ↑ Way behind. Was down for a while. next_index starts at 5. S4: [1|T1] [2|T1] [3|T1] [4|T4] [5|T5] [6|T5] [7|T6] ↑ Missing some entries. next_index starts at 8. S5: [1|T1] [2|T1] [3|T1] [4|T4] [5|T5] [6|T5] [7|T6] [8|T7] [9|T7] [10|T7] [11|T7] ↑ Was leader in term 7, added uncommitted entries 8-11. These conflict with leader's [8|T6] etc. Must be TRUNCATED and replaced. === RESOLUTION PROCESS FOR S5 === Step 1: Leader sends AppendEntries starting at predicted next_index next_index[S5] = 11 (optimistic - leader's last + 1) Leader sends: prev_log_index=10, prev_log_term=T6, entries=[] Step 2: S5 checks prev_log S5's entry 10 = [10|T7], term T7 ≠ T6 S5 rejects with conflict_term=T7, conflict_index=8 Step 3: Leader backs up Leader has no entries of term T7, so uses conflict_index next_index[S5] = 8 Step 4: Leader retries Leader sends: prev_log_index=7, prev_log_term=T6, entries=[8|T6],... Step 5: S5 checks prev_log S5's entry 7 = [7|T6], term T6 = T6 ✓ S5 accepts, truncates [8|T7]-[11|T7], appends [8|T6]-[10|T6] Step 6: S5 is now synchronized S5: [1|T1] [2|T1] [3|T1] [4|T4] [5|T5] [6|T5] [7|T6] [8|T6] [9|T6] [10|T6]""" class LogReconciliation: """ Demonstrates the log reconciliation process. """ @staticmethod def find_common_ancestor(leader_log: list, follower_log: list) -> int: """ Find the highest index where logs agree. This is the 'common ancestor' we replicate from. """ common = 0 for i in range(min(len(leader_log), len(follower_log))): if leader_log[i] == follower_log[i]: common = i + 1 # 1-indexed else: break return common @staticmethod def simulate_reconciliation( leader_log: list, follower_log: list ) -> list: """ Simulate bringing a follower into sync. Returns the reconciled follower log. """ # Find where logs match common = LogReconciliation.find_common_ancestor(leader_log, follower_log) print(f"Logs match up to index {common}") print(f"Truncating follower from {common + 1}") print(f"Appending {len(leader_log) - common} entries from leader") # Truncate follower log reconciled = follower_log[:common] # Append from leader reconciled.extend(leader_log[common:]) return reconciled # Exampleleader = ["1:T1:a", "2:T1:b", "3:T2:c", "4:T3:d"]follower = ["1:T1:a", "2:T1:b", "3:T2:x", "4:T2:y", "5:T2:z"]# ↑ Divergence starts here result = LogReconciliation.simulate_reconciliation(leader, follower)# Output: Logs match up to index 2# Truncating follower from 3# Appending 2 entries from leader# Result: ["1:T1:a", "2:T1:b", "3:T2:c", "4:T3:d"]Optimizing Backoff:
The naive approach decrements next_index by 1 on each failure. With large log divergence, this could require many round trips.
The optimized approach (used in production implementations) includes conflict information in the response:
• conflict_term: the term of the follower's conflicting entry at prev_log_index
• conflict_index: the first index the follower stores for that term (or one past its last entry if its log is too short)
The leader can then skip back to the appropriate point in one step.
Real-world Raft implementations include several optimizations beyond the basic protocol. Here are the most important ones:
"""Key Replication Optimizations 1. BATCHING Instead of one AppendEntries per command, batch multiple commands: Without batching: 1000 commands = 1000 RPCs = 1000 * RTT latency With batching: 1000 commands = 10 RPCs (100 each) = 10 * RTT latency 2. PIPELINING Don't wait for ACK before sending next batch: Without pipelining: [Send batch 1] → [Wait for ACK] → [Send batch 2] → [Wait] → ... With pipelining: [Send batch 1] → [Send batch 2] → [Send batch 3] → [ACK 1] → ... 3. PARALLEL COMMIT Leader commits when ANY majority responds (doesn't wait for slowest): Followers: S2 (50ms), S3 (51ms), S4 (200ms) Commit at 51ms (S2+S3+leader = majority), not 200ms""" import asynciofrom typing import List, Set class OptimizedReplicator: """ Replicator with batching and pipelining. """ BATCH_SIZE = 100 BATCH_TIMEOUT_MS = 1 # Max wait before sending partial batch MAX_INFLIGHT_BATCHES = 4 # Pipeline depth def __init__(self, server): self.server = server self.pending_commands: List[Any] = [] self.inflight_count: Dict[int, int] = {} # peer -> inflight batches async def add_command(self, command: Any): """Add command to pending batch.""" self.pending_commands.append(command) # Send immediately if batch is full if len(self.pending_commands) >= self.BATCH_SIZE: await self._flush_batch() async def _flush_batch(self): """Send current pending commands as a batch.""" if not self.pending_commands: return batch = self.pending_commands[:self.BATCH_SIZE] self.pending_commands = self.pending_commands[self.BATCH_SIZE:] # Append all to local log entries = [] for cmd in batch: entry = self.server.log.append( term=self.server.current_term, command=cmd ) entries.append(entry) # Replicate to all peers with pipelining await self._replicate_entries(entries) async def _replicate_entries(self, entries: List[LogEntry]): """Replicate entries using pipelining.""" tasks = [] for peer in self.server.peers: # Check pipeline depth if self.inflight_count.get(peer, 0) < self.MAX_INFLIGHT_BATCHES: self.inflight_count[peer] = self.inflight_count.get(peer, 0) + 1 task = asyncio.create_task( self._send_and_track(peer, entries) ) tasks.append(task) # Wait for majority await self._wait_for_majority(tasks, entries) async def _wait_for_majority(self, tasks: list, entries: list): """Wait for majority acknowledgment.""" success_count = 1 # Leader majority = (len(self.server.peers) + 1) // 2 + 1 for coro in asyncio.as_completed(tasks): try: success = await coro if success: success_count += 1 if success_count >= majority: # Commit! self.server.commit_index = entries[-1].index return # Don't wait for slow replicas except Exception: continue # Didn't get majority - entries not committed # (but will be retried on next heartbeat)| Component | Purpose | Key Detail |
|---|---|---|
| Log Entry | Unit of replication | (index, term) uniquely identifies an entry |
| AppendEntries RPC | Replicate/heartbeat | Includes prev_log_index/term for consistency |
| next_index[] | Track replication progress | Per-follower, where to send next |
| match_index[] | Track confirmed replication | Per-follower, highest replicated index |
| Commit Index | What's safe to apply | Updated when majority confirms |
What's Next:
With log replication understood, we can examine the safety guarantees that tie everything together. The next page covers the formal safety properties of Raft—State Machine Safety, Leader Completeness, and the proofs that show why Raft never loses committed data.
You now understand Raft's log replication mechanism in complete detail: the structure of log entries, the AppendEntries RPC protocol, the Log Matching Property that guarantees consistency, how leaders commit entries, how divergent followers are brought into sync, and the optimizations used in production systems.