Consider this scenario from a social media platform: Alice posts "I got fired." Bob sees her post and replies "I'm sorry to hear that." Because of replication lag, Carol's replica receives Bob's reply before Alice's post.

To Carol, Bob appears to be randomly expressing sympathy to no one. The context—the cause of Bob's response—is missing.
This is a causal consistency violation. The system delivered an effect (Bob's reply) before its cause (Alice's post). From a user's perspective, the conversation is incomprehensible.
Strong consistency would prevent this by totally ordering all operations globally—expensive and sometimes impossible. Eventual consistency provides no ordering guarantees at all—effects routinely arrive before causes.
Causal consistency threads the needle: it guarantees that if operation B was influenced by operation A, then every observer who sees B must first see A. It preserves the logical structure of cause and effect while allowing unrelated operations to be observed in different orders by different observers.
By the end of this page, you will understand:

- Lamport's happens-before relation and its formal properties
- How vector clocks and version vectors capture causal dependencies
- The difference between causal consistency and other consistency models
- Implementation strategies for causal broadcast and causal memory
- How production systems (MongoDB, Riak, COPS) implement causal consistency
- The overhead costs of causality tracking and when they're justified
Leslie Lamport's 1978 paper "Time, Clocks, and the Ordering of Events in a Distributed System" introduced the happens-before relation (→), the foundational concept for reasoning about causality in distributed systems.
Definition: The happens-before relation is the smallest relation satisfying:
Program Order: If events a and b occur in the same process, and a occurs before b in that process, then a → b.
Message Causality: If a is the sending of a message and b is the receipt of that same message, then a → b.
Transitivity: If a → b and b → c, then a → c.
Concurrent Events: Two events a and b are concurrent (written a || b) if neither a → b nor b → a. Concurrent events have no causal relationship—they cannot have influenced each other.
Critical Insight: The happens-before relation captures potential causality, not actual causality. If a → b, then a could have influenced b (information could have flowed from a to b). If a || b, then a and b cannot have influenced each other (no information could have flowed between them).
```python
# Visualizing Happens-Before Relation
"""
Consider three processes P1, P2, P3 and events:

P1: ──●a──●b────────●e──────────●g──►
           ╲        ↗           ↗
            ╲      ╱           ╱
P2: ─────────●c──●d──────────●f─────►
                   ╲
                    ↘
P3: ─────────────────●h─────────────►

Message sends: b→c, d→e, d→h, f→g

Happens-before relationships:
- a → b (program order in P1)
- b → c (message from P1 to P2)
- a → c (transitivity: a → b → c)
- c → d (program order in P2)
- d → e (message from P2 to P1)
- d → h (message from P2 to P3)
- d → f (program order in P2)
- f → g (message from P2 to P1)

Concurrent pairs:
- a || h (no causal path)
- e || h (both depend on d, but neither depends on the other)
- b || f (b happens before things that happen before f, but not f itself)
"""

from dataclasses import dataclass
from typing import Set, Dict, Optional, List, Tuple
from enum import Enum


class Ordering(Enum):
    BEFORE = "before"
    AFTER = "after"
    CONCURRENT = "concurrent"
    SAME = "same"


@dataclass
class Event:
    """An event in a distributed system."""
    process_id: str
    sequence_num: int               # Local sequence number within process
    event_type: str                 # 'local', 'send', or 'receive'
    message_id: Optional[str] = None

    @property
    def id(self) -> str:
        return f"{self.process_id}:{self.sequence_num}"


class HappensBeforeAnalyzer:
    """
    Analyzes happens-before relationships in distributed executions.
    Used for debugging, testing, and understanding causality.
    """

    def __init__(self):
        self.events: Dict[str, Event] = {}
        self.program_order: Dict[str, List[str]] = {}   # process_id -> [event_ids]
        self.message_pairs: List[Tuple[str, str]] = []  # (send_id, receive_id)
        # Cached transitive closure
        self._happens_before_cache: Dict[Tuple[str, str], bool] = {}

    def add_event(self, event: Event):
        """Register an event."""
        self.events[event.id] = event
        if event.process_id not in self.program_order:
            self.program_order[event.process_id] = []
        self.program_order[event.process_id].append(event.id)
        # Invalidate cache
        self._happens_before_cache.clear()

    def add_message(self, send_event_id: str, receive_event_id: str):
        """Register that send_event caused receive_event."""
        self.message_pairs.append((send_event_id, receive_event_id))
        self._happens_before_cache.clear()

    def happens_before(self, a_id: str, b_id: str) -> bool:
        """
        Determine if event a happens-before event b.
        Uses memoized transitive closure computation.
        """
        cache_key = (a_id, b_id)
        if cache_key in self._happens_before_cache:
            return self._happens_before_cache[cache_key]
        result = self._compute_happens_before(a_id, b_id, set())
        self._happens_before_cache[cache_key] = result
        return result

    def _compute_happens_before(self, a_id: str, b_id: str,
                                visited: Set[str]) -> bool:
        """Compute happens-before via graph traversal."""
        if a_id == b_id:
            return False
        if a_id in visited:
            return False
        visited.add(a_id)

        a = self.events[a_id]
        b = self.events[b_id]

        # Rule 1: Program order
        if a.process_id == b.process_id:
            a_idx = self.program_order[a.process_id].index(a_id)
            b_idx = self.program_order[b.process_id].index(b_id)
            if a_idx < b_idx:
                return True

        # Rules 2 & 3: Message causality + transitivity
        # Find all events that immediately follow a
        successors = self._immediate_successors(a_id)
        for succ_id in successors:
            if succ_id == b_id:
                return True
            if self._compute_happens_before(succ_id, b_id, visited.copy()):
                return True
        return False

    def _immediate_successors(self, event_id: str) -> List[str]:
        """Get events immediately after this one."""
        successors = []
        event = self.events[event_id]

        # Next event in program order
        if event.process_id in self.program_order:
            events = self.program_order[event.process_id]
            idx = events.index(event_id)
            if idx + 1 < len(events):
                successors.append(events[idx + 1])

        # Message receives
        for send_id, recv_id in self.message_pairs:
            if send_id == event_id:
                successors.append(recv_id)

        return successors

    def compare(self, a_id: str, b_id: str) -> Ordering:
        """Determine the causal relationship between two events."""
        if a_id == b_id:
            return Ordering.SAME
        if self.happens_before(a_id, b_id):
            return Ordering.BEFORE
        if self.happens_before(b_id, a_id):
            return Ordering.AFTER
        return Ordering.CONCURRENT

    def find_concurrent_pairs(self) -> List[Tuple[str, str]]:
        """Find all pairs of concurrent events."""
        concurrent = []
        event_ids = list(self.events.keys())
        for i, a_id in enumerate(event_ids):
            for b_id in event_ids[i+1:]:
                if self.compare(a_id, b_id) == Ordering.CONCURRENT:
                    concurrent.append((a_id, b_id))
        return concurrent
```

Happens-before is based on logical ordering, not physical time. Two events can be concurrent even if one occurred 'earlier' by wall-clock time, because information could not have traveled between them. Physical clocks in distributed systems cannot be perfectly synchronized, making logical ordering the only reliable basis for causality.
Lamport clocks (scalar logical clocks) can order events consistently with happens-before, but they cannot detect concurrency. If L(a) < L(b), we know b does not happen-before a, but we cannot distinguish whether a → b or a || b.
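A minimal sketch makes the limitation concrete (the `LamportClock` helper here is illustrative, not from any particular library): two processes that never exchange messages still produce ordered scalar timestamps.

```python
# Minimal Lamport (scalar) clock: increment on local/send events,
# take max(local, received) + 1 when receiving a message.
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Advance for a local or send event."""
        self.time += 1
        return self.time

    def receive(self, msg_time: int) -> int:
        """Advance past the sender's timestamp on receipt."""
        self.time = max(self.time, msg_time) + 1
        return self.time


# P1 and P2 never communicate, so all their events are concurrent
p1, p2 = LamportClock(), LamportClock()
a = p1.tick()                 # event a on P1: L(a) = 1
p2.tick(); p2.tick()
b = p2.tick()                 # event b on P2: L(b) = 3

# L(a) < L(b), yet a || b: from the scalar values alone we cannot
# tell whether a -> b or a and b are concurrent.
assert a < b
```

The `receive` path is what makes Lamport clocks consistent with happens-before; the example deliberately never uses it, which is exactly why the resulting order is not causal.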
Vector clocks solve this by maintaining a vector of logical times, one entry per process. They fully characterize the happens-before relation.
Vector Clock Rules:
Initialization: Each process i starts with VC = [0, 0, ..., 0]
Local Event: Before each event, process i increments VC[i] by 1
Send: When sending message m, include current VC as timestamp: m.timestamp = VC
Receive: When receiving message m with timestamp m.ts, process i sets VC[j] = max(VC[j], m.ts[j]) for every entry j (point-wise maximum), then increments VC[i] to count the receive event
Comparing Vector Clocks: VC1 ≤ VC2 iff VC1[i] ≤ VC2[i] for every i, and VC1 < VC2 iff VC1 ≤ VC2 and VC1 ≠ VC2. If neither VC1 ≤ VC2 nor VC2 ≤ VC1, the clocks—and the events they timestamp—are concurrent.
```python
# Comprehensive Vector Clock Implementation
from typing import Any, Dict, Optional, List, Tuple
from dataclasses import dataclass, field
from enum import Enum


class ClockComparison(Enum):
    BEFORE = "before"            # VC1 < VC2
    AFTER = "after"              # VC1 > VC2
    CONCURRENT = "concurrent"    # VC1 || VC2
    EQUAL = "equal"              # VC1 == VC2


@dataclass
class VectorClock:
    """
    Vector clock for tracking causality in distributed systems.

    Mathematical Properties:
    - Characterizes happens-before: VC(a) < VC(b) ⟺ a → b
    - Detects concurrency:
      neither VC(a) ≤ VC(b) nor VC(b) ≤ VC(a) ⟺ a || b
    """
    clock: Dict[str, int] = field(default_factory=dict)

    def increment(self, node_id: str) -> 'VectorClock':
        """
        Increment this node's logical time.
        Called before every local event.
        """
        new_clock = self.clock.copy()
        new_clock[node_id] = new_clock.get(node_id, 0) + 1
        return VectorClock(new_clock)

    def merge(self, other: 'VectorClock') -> 'VectorClock':
        """
        Merge with another vector clock (point-wise maximum).
        Called when receiving a message.
        """
        merged = {}
        all_keys = set(self.clock.keys()) | set(other.clock.keys())
        for key in all_keys:
            merged[key] = max(
                self.clock.get(key, 0),
                other.clock.get(key, 0)
            )
        return VectorClock(merged)

    def compare(self, other: 'VectorClock') -> ClockComparison:
        """
        Compare two vector clocks.

        Returns:
            BEFORE: self happens-before other
            AFTER: other happens-before self
            CONCURRENT: neither happens-before the other
            EQUAL: identical clocks
        """
        all_keys = set(self.clock.keys()) | set(other.clock.keys())
        self_less = False
        other_less = False
        for key in all_keys:
            self_val = self.clock.get(key, 0)
            other_val = other.clock.get(key, 0)
            if self_val < other_val:
                self_less = True
            if other_val < self_val:
                other_less = True

        if self_less and not other_less:
            return ClockComparison.BEFORE
        if other_less and not self_less:
            return ClockComparison.AFTER
        if self_less and other_less:
            return ClockComparison.CONCURRENT
        return ClockComparison.EQUAL

    def happens_before(self, other: 'VectorClock') -> bool:
        """Returns True if self → other."""
        return self.compare(other) == ClockComparison.BEFORE

    def concurrent_with(self, other: 'VectorClock') -> bool:
        """Returns True if self || other."""
        return self.compare(other) == ClockComparison.CONCURRENT

    def dominates(self, other: 'VectorClock') -> bool:
        """Returns True if self ≥ other (self sees everything other has seen)."""
        for key in other.clock:
            if self.clock.get(key, 0) < other.clock[key]:
                return False
        return True

    def copy(self) -> 'VectorClock':
        """Create a deep copy."""
        return VectorClock(self.clock.copy())


@dataclass
class VersionedValue:
    """A value tagged with a vector clock for conflict detection."""
    value: Any
    vector_clock: VectorClock

    def supersedes(self, other: 'VersionedValue') -> bool:
        """Returns True if this version supersedes (is newer than) other."""
        return other.vector_clock.happens_before(self.vector_clock)

    def conflicts_with(self, other: 'VersionedValue') -> bool:
        """Returns True if this version conflicts with other."""
        return self.vector_clock.concurrent_with(other.vector_clock)


class CausalMemory:
    """
    Key-value store with causal consistency using vector clocks.

    Guarantees:
    - A read will never return a causally older version than a previous read
    - A read will see all writes that causally precede it
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock = VectorClock()
        self.data: Dict[str, List[VersionedValue]] = {}  # key -> list of versions

    def write(self, key: str, value: Any) -> VectorClock:
        """Write a value, establishing a new causal version."""
        # Increment local clock
        self.clock = self.clock.increment(self.node_id)
        versioned = VersionedValue(value, self.clock.copy())

        if key not in self.data:
            self.data[key] = []

        # Remove versions superseded by this write
        self.data[key] = [
            v for v in self.data[key]
            if not versioned.supersedes(v)
        ]
        self.data[key].append(versioned)
        return self.clock.copy()

    def read(self, key: str) -> Tuple[Optional[Any], Optional[VectorClock]]:
        """
        Read the latest version of a key.

        If multiple concurrent versions exist (conflict), returns one
        arbitrarily. Use read_all() to get all concurrent versions.
        """
        if key not in self.data or not self.data[key]:
            return None, None
        # Return the latest version by our current clock
        latest = self.data[key][-1]
        return latest.value, latest.vector_clock.copy()

    def read_all(self, key: str) -> List[VersionedValue]:
        """
        Read all concurrent versions of a key.

        In a causally consistent system, there should typically be only
        one version. Multiple versions indicate a conflict.
        """
        return self.data.get(key, []).copy()

    def merge_remote(self, key: str, value: Any, remote_clock: VectorClock):
        """
        Merge a remotely written value into our state.
        Called during replication.
        """
        # Update our clock to reflect seeing this write
        self.clock = self.clock.merge(remote_clock)
        versioned = VersionedValue(value, remote_clock.copy())

        if key not in self.data:
            self.data[key] = [versioned]
            return

        # Filter out versions superseded by incoming
        remaining = [
            v for v in self.data[key]
            if not versioned.supersedes(v)
        ]

        # Check if incoming is superseded by any existing version
        for v in remaining:
            if v.supersedes(versioned):
                # Incoming is older than what we have; ignore
                return

        # Check for existing version with same clock (duplicate)
        for v in remaining:
            if v.vector_clock.compare(remote_clock) == ClockComparison.EQUAL:
                return  # Already have this version

        remaining.append(versioned)
        self.data[key] = remaining
```

| VC1 | VC2 | Relationship | Meaning |
|---|---|---|---|
| [2,0,0] | [3,1,0] | VC1 < VC2 | Event with VC1 happens-before event with VC2 |
| [3,1,0] | [2,0,0] | VC1 > VC2 | Event with VC1 happens-after event with VC2 |
| [2,0,1] | [1,1,0] | VC1 || VC2 | Events are concurrent (neither caused the other) |
| [2,1,1] | [2,1,1] | VC1 = VC2 | Same logical time (same event or replicas of it) |
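The table rows can be verified with a small standalone comparison function (a sketch independent of the `VectorClock` class above, for fixed-length clocks):

```python
from typing import List

def compare_vc(vc1: List[int], vc2: List[int]) -> str:
    """Compare two equal-length vector clocks entry by entry."""
    less = any(a < b for a, b in zip(vc1, vc2))
    greater = any(a > b for a, b in zip(vc1, vc2))
    if less and not greater:
        return "VC1 < VC2"
    if greater and not less:
        return "VC1 > VC2"
    if less and greater:
        return "VC1 || VC2"     # some entries ahead, some behind: concurrent
    return "VC1 = VC2"

# The four rows of the table above:
print(compare_vc([2, 0, 0], [3, 1, 0]))  # VC1 < VC2
print(compare_vc([3, 1, 0], [2, 0, 0]))  # VC1 > VC2
print(compare_vc([2, 0, 1], [1, 1, 0]))  # VC1 || VC2
print(compare_vc([2, 1, 1], [2, 1, 1]))  # VC1 = VC2
```

Note that concurrency shows up as a mixed comparison: [2,0,1] is ahead of [1,1,0] in one entry and behind in another, so neither event could have seen the other.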
Causal broadcast is a communication primitive that ensures messages are delivered in causal order to all recipients. If message m1 causally precedes message m2, then every process that delivers both will deliver m1 before m2.
Delivery vs Receipt: A message is received when it arrives at a node over the network; it is delivered when it is handed to the application. The two can differ: causal broadcast may hold received messages in a buffer until their causal predecessors have been delivered.
Implementation Strategy:
Attach a vector clock to each message. Upon receipt, check if all causal dependencies are satisfied (the sender's previous messages have been delivered). If not, buffer the message until dependencies are met.
```python
# Causal Broadcast Implementation
from dataclasses import dataclass
from typing import Any, List, Dict, Callable
from threading import Lock


@dataclass
class CausalMessage:
    """Message with vector clock timestamp for causal ordering."""
    sender_id: str
    content: Any
    vector_clock: VectorClock
    message_id: str


class CausalBroadcast:
    """
    Causal broadcast protocol ensuring causal delivery order.

    Safety: If deliver(m1) → deliver(m2) for any process, then for all
    processes that deliver both, m1 is delivered before m2.

    Liveness: Every message broadcast is eventually delivered to all
    correct processes.
    """

    def __init__(self, node_id: str, peers: List['CausalBroadcast'],
                 on_deliver: Callable[[CausalMessage], None]):
        self.node_id = node_id
        self.peers = peers
        self.on_deliver = on_deliver
        self.clock = VectorClock()
        self.pending: List[CausalMessage] = []
        # Track what we've delivered from each sender
        # delivered[p] = highest sequence number delivered from p
        self.delivered: Dict[str, int] = {}
        self.lock = Lock()
        self.message_counter = 0

    def broadcast(self, content: Any):
        """Broadcast a message to all peers with causal ordering."""
        with self.lock:
            # Increment local clock (broadcast is a send event)
            self.clock = self.clock.increment(self.node_id)
            self.message_counter += 1
            message = CausalMessage(
                sender_id=self.node_id,
                content=content,
                vector_clock=self.clock.copy(),
                message_id=f"{self.node_id}:{self.message_counter}"
            )
            # Deliver locally first
            self._do_deliver(message)

        # Send to all peers (outside lock to avoid deadlock)
        for peer in self.peers:
            peer.receive(message)

    def receive(self, message: CausalMessage):
        """
        Receive a message from another node.
        Buffer if causal dependencies not satisfied.
        """
        with self.lock:
            if self._can_deliver(message):
                self._do_deliver(message)
                # Check if pending messages can now be delivered
                self._process_pending()
            else:
                # Buffer until dependencies are satisfied
                self.pending.append(message)

    def _can_deliver(self, message: CausalMessage) -> bool:
        """
        Check if we can deliver this message (all dependencies satisfied).

        A message m from sender s with vector clock VC(m) can be
        delivered iff:
        1. For the sender s: VC(m)[s] == delivered[s] + 1
           (This is the next expected message from s)
        2. For all other j ≠ s: VC(m)[j] <= delivered[j]
           (We've seen all messages that s had seen before sending m)
        """
        msg_clock = message.vector_clock.clock
        sender = message.sender_id

        # Check condition 1: next expected from sender
        expected_from_sender = self.delivered.get(sender, 0) + 1
        sender_seq = msg_clock.get(sender, 0)
        if sender_seq != expected_from_sender:
            return False

        # Check condition 2: all dependencies satisfied
        for node_id, count in msg_clock.items():
            if node_id == sender:
                continue
            if count > self.delivered.get(node_id, 0):
                return False
        return True

    def _do_deliver(self, message: CausalMessage):
        """Actually deliver the message to the application."""
        sender = message.sender_id
        sender_seq = message.vector_clock.clock.get(sender, 0)

        # Update delivered tracking
        self.delivered[sender] = sender_seq
        # Merge sender's clock into ours
        self.clock = self.clock.merge(message.vector_clock)
        # Deliver to application
        self.on_deliver(message)

    def _process_pending(self):
        """Try to deliver pending messages whose dependencies are now satisfied."""
        made_progress = True
        while made_progress:
            made_progress = False
            for msg in self.pending[:]:  # Iterate over copy
                if self._can_deliver(msg):
                    self.pending.remove(msg)
                    self._do_deliver(msg)
                    made_progress = True

    def get_pending_count(self) -> int:
        """Get number of messages waiting for dependencies."""
        with self.lock:
            return len(self.pending)


# Example usage demonstrating causal ordering
def demo_causal_broadcast():
    """Demonstrate that messages are delivered in causal order."""
    delivered_order = []

    def on_deliver(msg: CausalMessage):
        delivered_order.append((msg.sender_id, msg.content))
        print(f"Delivered: {msg.sender_id} said '{msg.content}'")

    # Create three nodes
    nodes = {}
    for name in ['Alice', 'Bob', 'Carol']:
        nodes[name] = CausalBroadcast(name, [], on_deliver)

    # Connect them
    for name, node in nodes.items():
        node.peers = [n for n_name, n in nodes.items() if n_name != name]

    # Scenario:
    # 1. Alice broadcasts "I got fired"
    # 2. Bob receives Alice's message, then broadcasts "I'm sorry"
    # 3. Network delay: Carol receives Bob's message before Alice's

    # Alice broadcasts
    nodes['Alice'].broadcast("I got fired")

    # Bob receives and responds
    # (In real scenario, Bob's broadcast causally depends on seeing Alice's)
    nodes['Bob'].broadcast("I'm sorry to hear that")

    # Even though messages might arrive out of order at Carol,
    # causal broadcast ensures "I got fired" is delivered before "I'm sorry"
    return delivered_order
```

Several production systems implement causal consistency, each with different mechanisms and trade-offs:
| System | Mechanism | Scope | Key Features |
|---|---|---|---|
| MongoDB 3.6+ | Cluster time + operation timestamps | Session-scoped | Causal sessions with read concern 'majority' |
| Riak/Riak TS | Dotted version vectors | Per-key | Conflict detection at read time |
| COPS (research) | Explicit dependency tracking | Global | Causal+ consistency with convergent conflict resolution |
| Eiger (research) | Dependency metadata | Global | Read-only transactions on causally consistent snapshots |
| SwiftCloud | Shadow operations + timestamps | Global | Designed for geo-replication |
```python
# MongoDB Causal Consistency Sessions
from pymongo import MongoClient

"""
MongoDB's Causal Consistency Implementation:

1. Cluster Time: A logical clock maintained across the cluster.
   - Advances on every operation
   - Total ordering of operations cluster-wide

2. Operation Time: Timestamp assigned to each operation.
   - For writes: assigned by primary
   - For reads: timestamp of data read

3. Session: Client-side construct tracking causal dependencies.
   - operationTime: highest operation time seen
   - clusterTime: highest cluster time seen

4. Read Concern 'majority': Ensures reads reflect acknowledged writes.
   - Combined with session, provides causal consistency
"""


def mongodb_causal_session_example():
    """Demonstrate MongoDB's causal consistency sessions."""
    client = MongoClient(
        "mongodb://localhost:27017",
        replicaSet="myReplicaSet"
    )
    db = client.mydb
    collection = db.mycollection

    # Start a causally consistent session
    with client.start_session(causal_consistency=True) as session:
        # Write with the session
        # MongoDB tracks this operation's timestamp
        collection.insert_one(
            {"user": "alice", "status": "posted"},
            session=session
        )

        # Read with the session
        # MongoDB ensures this read sees the previous write
        # (and all writes that causally precede it)
        result = collection.find_one(
            {"user": "alice"},
            session=session
        )

        # Even if this read goes to a secondary replica,
        # MongoDB ensures the secondary has replicated
        # at least up to our last operation's timestamp

        # The session tracks:
        # - operationTime: timestamp of last operation
        # - clusterTime: logical timestamp of cluster state seen
        print(f"Session cluster time: {session.cluster_time}")
        print(f"Session operation time: {session.operation_time}")


def mongodb_causal_guarantees():
    """
    MongoDB causal sessions provide these guarantees:

    1. Read your writes: A read in a session sees prior writes
       in that session.
    2. Monotonic reads: A read in a session never sees state older
       than prior reads.
    3. Monotonic writes: Writes in a session are ordered and durable
       before subsequent writes.
    4. Writes follow reads: A write in a session occurs after reads
       in that session.

    These four properties together constitute causal consistency.
    """
    client = MongoClient("mongodb://localhost:27017", replicaSet="rs")
    db = client.mydb
    posts = db.posts
    comments = db.comments

    with client.start_session(causal_consistency=True) as session:
        # Write a post
        post_result = posts.insert_one(
            {"author": "alice", "content": "Hello world!"},
            session=session
        )
        post_id = post_result.inserted_id

        # Write a comment that references the post
        # Causal consistency ensures the comment is never
        # visible before the post it references
        comments.insert_one(
            {"post_id": post_id, "author": "bob", "content": "Great post!"},
            session=session
        )

        # Any read in this session will see both documents
        # Any read in a different session might see neither (not yet
        # replicated), but will NEVER see the comment without the post
```

COPS (Clusters of Order-Preserving Servers) introduced 'Causal+ Consistency,' which adds convergent conflict resolution to causal consistency. This means concurrent writes are allowed (no blocking), and conflicts are resolved deterministically (e.g., LWW). The system is both highly available AND causally consistent.
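The 'convergent conflict resolution' idea can be sketched with a deterministic last-writer-wins merge (an illustrative example, not COPS's actual code): as long as every replica applies the same total order to concurrent versions, all replicas converge to the same value.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Version:
    value: str
    timestamp: int    # e.g., a Lamport timestamp
    node_id: str      # tie-breaker for equal timestamps

def lww_merge(a: Version, b: Version) -> Version:
    """Deterministic last-writer-wins: highest (timestamp, node_id) wins.
    Commutative: every replica picks the same winner regardless of the
    order in which it learns about the two versions."""
    return a if (a.timestamp, a.node_id) >= (b.timestamp, b.node_id) else b

# Two concurrent writes with the same logical timestamp:
v1 = Version("draft", 5, "node-a")
v2 = Version("final", 5, "node-b")

# node_id breaks the tie identically on every replica
winner = lww_merge(v1, v2)
assert winner == lww_merge(v2, v1)   # argument order doesn't matter
```

The key property is not that LWW picks the "right" value (it may discard a legitimate concurrent write) but that the choice is a pure function of the versions themselves, so no replica-to-replica coordination is needed to converge.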
Causal consistency comes with overhead costs that must be weighed against the benefits:
Metadata Overhead: Vector clocks and dependency lists grow with the number of nodes, and every message and stored version must carry this metadata.

Computation Overhead: Comparing and merging vector clocks costs O(n) in the number of entries, paid on every read, write, and message delivery.

Latency Overhead: A message whose causal predecessors have not yet arrived must be buffered, delaying its delivery until those dependencies are met.

Mitigation Strategies: Track only direct dependencies rather than full history (as COPS does), scope causality tracking to sessions rather than the whole system (as MongoDB does), and prune clock entries for retired or inactive nodes.
If message N cannot be delivered until message N-1 arrives, and N-1 is lost or delayed, N is blocked. This creates head-of-line blocking. In practice, systems use timeouts, speculation, or probabilistic dependency tracking to mitigate this.
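The timeout mitigation can be sketched as a bounded buffer that holds a message for at most a deadline, then delivers it anyway and surfaces the violation to the application (names like `TimeoutBuffer` are illustrative, not from any system described above):

```python
import time
from typing import Callable, List, Tuple

class TimeoutBuffer:
    """Buffer out-of-order messages, but never block longer than max_wait.
    After the deadline, deliver anyway and flag the causal violation."""

    def __init__(self, max_wait_secs: float,
                 deps_satisfied: Callable[[object], bool],
                 deliver: Callable[[object, bool], None]):
        self.max_wait = max_wait_secs
        self.deps_satisfied = deps_satisfied
        self.deliver = deliver               # (msg, violated_causality)
        self.pending: List[Tuple[float, object]] = []  # (arrival_time, msg)

    def receive(self, msg: object):
        if self.deps_satisfied(msg):
            self.deliver(msg, False)         # delivered in causal order
        else:
            self.pending.append((time.monotonic(), msg))

    def tick(self):
        """Called periodically: flush now-deliverable or expired messages."""
        now = time.monotonic()
        still_pending = []
        for arrived, msg in self.pending:
            if self.deps_satisfied(msg):
                self.deliver(msg, False)
            elif now - arrived >= self.max_wait:
                self.deliver(msg, True)      # deadline hit: causality violated
            else:
                still_pending.append((arrived, msg))
        self.pending = still_pending


# Demo: with max_wait=0, a message with missing dependencies is
# delivered on the next tick, flagged as a causal violation.
delivered = []
buf = TimeoutBuffer(0.0, lambda m: m == "has-deps",
                    lambda m, violated: delivered.append((m, violated)))
buf.receive("orphan")      # dependencies missing: buffered
buf.receive("has-deps")    # dependencies satisfied: delivered immediately
buf.tick()                 # orphan expires and is delivered with the flag
```

This trades a bounded ordering violation for bounded latency: the application sees at worst a `max_wait` window in which effects may precede causes, rather than unbounded head-of-line blocking.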
You now understand causal consistency at a deep, implementable level—from the mathematical foundations of Lamport's happens-before relation to the practical implementation of vector clocks and causal broadcast. Next, we explore Read-Your-Writes Consistency, a specific session guarantee that ensures users always see their own updates.