At 2:47 AM on a Saturday, an undersea cable between your US and European data centers is severed by a ship's anchor. Your strongly consistent database, requiring transatlantic quorum for every write, becomes unavailable for European users. For the next 18 hours—until emergency repairs complete—millions of users see error pages.
Alternatively: your eventually consistent system continues serving requests from local replicas. European users see data that's a few seconds stale. American users' recent updates take minutes to propagate across the Atlantic. But everyone can still use the service. No revenue lost. No users churned.
This is the fundamental value proposition of eventual consistency: the guarantee that systems remain available and responsive even when network partitions make global coordination impossible. It's not a compromise or a weakness—it's a deliberate architectural choice that enables the global-scale, always-on systems that define modern internet infrastructure.
By the end of this page, you will understand the formal definition of eventual consistency and its BASE properties, the mathematical foundations of convergence, anti-entropy mechanisms (gossip protocols, Merkle trees, read repair), how to measure and bound consistency windows, conflict resolution strategies (LWW, vector clocks, CRDTs), and how to design applications that thrive with eventual consistency.
Eventual consistency is formally defined as a liveness guarantee:
If no new updates are made to a given data item, eventually all reads of that item will return the same value.
This definition, while precise, is intentionally weak. It provides no bounds on:
- How long "eventually" takes (milliseconds, minutes, or longer)
- Which value a read returns while convergence is still in progress
- The order in which different replicas observe updates
The weakness is the strength—by promising less, the system can deliver more availability.
BASE: The Eventually Consistent Philosophy
In contrast to ACID (Atomicity, Consistency, Isolation, Durability), eventually consistent systems follow BASE:
| Property | ACID | BASE |
|---|---|---|
| Consistency model | Strong/Immediate | Eventual |
| Availability | May be unavailable during partitions | Always available |
| State transitions | Atomic, all-or-nothing | Gradual propagation |
| Conflict handling | Prevention (locks, isolation) | Detection and resolution |
| Scalability | Limited by coordination | Horizontally scalable |
| Latency | Higher (coordination overhead) | Lower (local operations) |
The CAP theorem proves that during a network partition, a distributed system must choose between Consistency and Availability. Eventual consistency chooses Availability (AP systems). This isn't weakness—it's a principled choice. For many workloads, temporary inconsistency is far preferable to unavailability.
Understanding how eventual consistency systems converge requires examining the mathematical properties of update propagation.
Epidemic (Gossip) Propagation
Gossip protocols spread updates through a network like diseases spread through a population. The mathematics of epidemiology applies directly:
The differential equation governing spread:
dI/dt = β × I(t) × (N - I(t)) / N
where I(t) is the number of nodes that have received the update (the "infected" population), N is the total number of nodes, and β is the effective transmission rate per gossip round.
This is the logistic growth equation. Its solution shows that updates reach all nodes in O(log N) rounds of gossip—remarkably efficient for large clusters.
Convergence Bounds
For a gossip-based system with:
- N nodes in the cluster
- a fanout of k peers contacted per round
- a gossip interval of τ between rounds
The expected time for an update to reach all nodes:
T_convergence ≈ τ × (log N) / (log k)
For N=1000 nodes, k=3 peers, τ=100ms:
T_convergence ≈ 100ms × (log 1000) / (log 3) ≈ 100ms × 6.3 ≈ 630ms
This is why well-tuned eventually consistent systems converge in sub-second timeframes under normal operation.
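To get a feel for how gently this bound grows with cluster size, here is a minimal sketch that plugs a few cluster sizes into the formula (assuming the idealized model above with k = 3 and τ = 100ms; real deployments add message loss, jitter, and anti-entropy overhead, so treat these as optimistic estimates):

```python
# Back-of-the-envelope convergence estimate from the bound above.
import math

def convergence_estimate(n_nodes: int, fanout: int, gossip_interval_s: float) -> float:
    """T_convergence ~= tau * log(N) / log(k)."""
    return gossip_interval_s * math.log(n_nodes) / math.log(fanout)

for n in (100, 1_000, 10_000, 100_000):
    t = convergence_estimate(n, fanout=3, gossip_interval_s=0.1)
    print(f"N={n:>7}: ~{t * 1000:.0f}ms")
```

The fuller simulation below implements the protocol itself and measures convergence empirically.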
```python
# Gossip Protocol Implementation with Convergence Analysis
import random
import time
import math
from dataclasses import dataclass, field
from typing import Dict, Set, List, Optional
from threading import Thread, Lock
import logging

@dataclass
class GossipMessage:
    """Update propagated via gossip."""
    key: str
    value: any
    timestamp: float  # wall-clock timestamp used for last-write-wins ordering
    origin_node: str
    version: int

@dataclass
class NodeState:
    """State maintained by each node in the gossip cluster."""
    data: Dict[str, tuple] = field(default_factory=dict)  # key -> (value, timestamp, version)
    version_vector: Dict[str, int] = field(default_factory=dict)  # node_id -> max_version_seen

class GossipNode:
    """
    Full-featured gossip node demonstrating eventual consistency.
    Implements anti-entropy, rumor mongering, and convergence tracking.
    """

    def __init__(self, node_id: str, peers: List['GossipNode'],
                 gossip_interval: float = 0.1, fanout: int = 3):
        self.node_id = node_id
        self.peers = peers
        self.gossip_interval = gossip_interval
        self.fanout = fanout  # Number of peers to gossip to each round

        self.state = NodeState()
        self.pending_rumors: List[GossipMessage] = []
        self.rumor_counts: Dict[str, int] = {}  # message_id -> times_shared
        self.lock = Lock()

        # Metrics for convergence analysis
        self.messages_sent = 0
        self.messages_received = 0
        self.convergence_events: List[tuple] = []  # (key, time_to_converge)

        self._running = False
        self._gossip_thread: Optional[Thread] = None

    def start(self):
        """Start background gossip thread."""
        self._running = True
        self._gossip_thread = Thread(target=self._gossip_loop, daemon=True)
        self._gossip_thread.start()

    def stop(self):
        """Stop gossip thread."""
        self._running = False
        if self._gossip_thread:
            self._gossip_thread.join()

    def write(self, key: str, value: any) -> GossipMessage:
        """
        Write a value locally and initiate gossip propagation.
        Returns the message for tracking convergence.
        """
        with self.lock:
            # Increment our version
            current_version = self.state.version_vector.get(self.node_id, 0) + 1
            self.state.version_vector[self.node_id] = current_version
            timestamp = time.time()

            # Create gossip message
            message = GossipMessage(
                key=key,
                value=value,
                timestamp=timestamp,
                origin_node=self.node_id,
                version=current_version
            )

            # Apply locally
            self.state.data[key] = (value, timestamp, current_version)

            # Queue for rumor mongering
            self.pending_rumors.append(message)
            self.rumor_counts[self._message_id(message)] = 0

            return message

    def read(self, key: str) -> Optional[any]:
        """
        Read from local state. May return stale data!
        This is the eventual consistency trade-off.
        """
        with self.lock:
            if key in self.state.data:
                return self.state.data[key][0]
            return None

    def _gossip_loop(self):
        """Background loop performing periodic gossip."""
        while self._running:
            self._perform_gossip_round()
            time.sleep(self.gossip_interval)

    def _perform_gossip_round(self):
        """Execute one round of gossip protocol."""
        if not self.peers:
            return

        # Select random peers (fanout)
        selected_peers = random.sample(
            self.peers,
            min(self.fanout, len(self.peers))
        )

        with self.lock:
            # Rumor mongering: share pending updates
            for peer in selected_peers:
                for rumor in self.pending_rumors[:]:
                    msg_id = self._message_id(rumor)

                    # Send rumor (calling into the peer directly keeps this demo
                    # simple; a real implementation would send a network message
                    # rather than invoking the peer while holding our own lock)
                    peer.receive_gossip(rumor, self)
                    self.messages_sent += 1
                    self.rumor_counts[msg_id] += 1

                    # Stop spreading after sufficient attempts
                    # (prevents infinite propagation)
                    if self.rumor_counts[msg_id] >= math.ceil(math.log(len(self.peers) + 1)):
                        self.pending_rumors.remove(rumor)

            # Anti-entropy: exchange full state digest
            for peer in selected_peers:
                self._anti_entropy_exchange(peer)

    def receive_gossip(self, message: GossipMessage, sender: 'GossipNode'):
        """Receive a gossip message from another node."""
        with self.lock:
            self.messages_received += 1

            # Check if this is new information
            current = self.state.data.get(message.key)

            if current is None or message.timestamp > current[1]:
                # New or newer update - apply it
                self.state.data[message.key] = (
                    message.value,
                    message.timestamp,
                    message.version
                )

                # Update version vector
                origin_max = self.state.version_vector.get(message.origin_node, 0)
                self.state.version_vector[message.origin_node] = max(
                    origin_max, message.version
                )

                # Propagate rumor (if not seen before)
                msg_id = self._message_id(message)
                if msg_id not in self.rumor_counts:
                    self.pending_rumors.append(message)
                    self.rumor_counts[msg_id] = 0

    def _anti_entropy_exchange(self, peer: 'GossipNode'):
        """
        Exchange state digests and repair differences.
        This ensures convergence even if rumors are lost.
        """
        # Get peer's version vector
        peer_versions = peer.get_version_vector()

        # Find keys where we have newer data
        for key, (value, timestamp, version) in self.state.data.items():
            # Determine origin of this key's value
            # (simplified: assume key encodes origin or use separate tracking)
            for origin, our_max in self.state.version_vector.items():
                peer_max = peer_versions.get(origin, 0)

                if our_max > peer_max:
                    # We have updates the peer hasn't seen
                    # Send repair message
                    peer.receive_repair(key, value, timestamp, version, origin)

    def receive_repair(self, key: str, value: any, timestamp: float,
                       version: int, origin: str):
        """Receive anti-entropy repair."""
        with self.lock:
            current = self.state.data.get(key)
            if current is None or timestamp > current[1]:
                self.state.data[key] = (value, timestamp, version)
                origin_max = self.state.version_vector.get(origin, 0)
                self.state.version_vector[origin] = max(origin_max, version)

    def get_version_vector(self) -> Dict[str, int]:
        """Return current version vector for anti-entropy."""
        with self.lock:
            return self.state.version_vector.copy()

    def _message_id(self, message: GossipMessage) -> str:
        """Generate unique ID for message deduplication."""
        return f"{message.origin_node}:{message.key}:{message.version}"

    # ════════════════════════════════════════════════════════════════
    # Convergence Analysis
    # ════════════════════════════════════════════════════════════════

    @staticmethod
    def measure_convergence(nodes: List['GossipNode'], key: str,
                            expected_value: any, timeout: float = 5.0) -> float:
        """
        Measure time for all nodes to converge to expected value.
        Returns convergence time in seconds, or -1 if timeout.
        """
        start = time.time()

        while time.time() - start < timeout:
            converged = all(
                node.read(key) == expected_value
                for node in nodes
            )
            if converged:
                return time.time() - start
            time.sleep(0.01)  # 10ms polling

        return -1  # Timeout

# Example: Measuring gossip convergence
def demonstrate_convergence():
    """Show how gossip achieves eventual consistency."""
    # Create cluster of 100 nodes
    nodes = [GossipNode(f"node_{i}", [], gossip_interval=0.05)
             for i in range(100)]

    # Connect nodes (each knows all others for simplicity)
    for node in nodes:
        node.peers = [n for n in nodes if n != node]

    # Start gossip
    for node in nodes:
        node.start()

    # Write to one node
    write_time = time.time()
    nodes[0].write("important_key", "critical_value")

    # Measure convergence
    convergence_time = GossipNode.measure_convergence(
        nodes, "important_key", "critical_value"
    )

    print(f"100 nodes converged in {convergence_time*1000:.2f}ms")
    print(f"Total messages: {sum(n.messages_sent for n in nodes)}")

    # Cleanup
    for node in nodes:
        node.stop()
```

Eventually consistent systems employ multiple mechanisms to ensure convergence. Each addresses different failure modes:
1. Rumor Mongering (Push Gossip)
When a node receives an update, it actively pushes that update to random peers. This is fast but unreliable—messages can be lost, and there's no guarantee of delivery.
2. Anti-Entropy (State Exchange)
Periodically, nodes compare their complete state and repair differences. This is reliable but expensive—comparing large states consumes bandwidth.
3. Read Repair
During read operations, if a coordinator contacts multiple replicas and finds disagreement, it repairs the stale replicas. This piggybacks consistency maintenance on regular traffic.
4. Merkle Trees for Efficient Comparison
Comparing full states is expensive. Merkle trees enable efficient difference detection by organizing data into a hash tree. Only differing subtrees need examination.
```python
# Merkle Tree-Based Anti-Entropy
import hashlib
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

@dataclass
class MerkleNode:
    """Node in Merkle tree for efficient state comparison."""
    hash: str
    left: Optional['MerkleNode'] = None
    right: Optional['MerkleNode'] = None
    key_range: tuple = None  # (start_key, end_key) for leaf nodes
    leaf_data: Optional[Dict] = None  # Actual data at leaf level

class MerkleTree:
    """
    Merkle tree for efficient anti-entropy.
    Used by Cassandra, DynamoDB, and other eventually consistent systems.
    """

    def __init__(self, data: Dict[str, bytes], bucket_size: int = 100):
        self.bucket_size = bucket_size
        self.root = self._build_tree(data)

    def _build_tree(self, data: Dict[str, bytes]) -> MerkleNode:
        """Build Merkle tree from key-value data."""
        if not data:
            return MerkleNode(hash=self._hash(b"empty"))

        # Sort keys for deterministic ordering
        sorted_keys = sorted(data.keys())

        # Create leaf buckets
        buckets = []
        for i in range(0, len(sorted_keys), self.bucket_size):
            bucket_keys = sorted_keys[i:i + self.bucket_size]
            bucket_data = {k: data[k] for k in bucket_keys}

            # Hash the bucket
            bucket_hash = self._hash_bucket(bucket_data)

            buckets.append(MerkleNode(
                hash=bucket_hash,
                key_range=(bucket_keys[0], bucket_keys[-1]),
                leaf_data=bucket_data
            ))

        # Build tree bottom-up
        while len(buckets) > 1:
            new_level = []
            for i in range(0, len(buckets), 2):
                left = buckets[i]
                right = buckets[i + 1] if i + 1 < len(buckets) else None

                if right:
                    combined_hash = self._hash(
                        (left.hash + right.hash).encode()
                    )
                    new_level.append(MerkleNode(
                        hash=combined_hash,
                        left=left,
                        right=right
                    ))
                else:
                    new_level.append(left)

            buckets = new_level

        return buckets[0]

    def _hash(self, data: bytes) -> str:
        """Compute SHA-256 hash."""
        return hashlib.sha256(data).hexdigest()

    def _hash_bucket(self, bucket: Dict[str, bytes]) -> str:
        """Hash a bucket of key-value pairs."""
        # Sort for determinism
        items = sorted(bucket.items())
        combined = b"".join(k.encode() + v for k, v in items)
        return self._hash(combined)

    def get_root_hash(self) -> str:
        """Get root hash for quick equality check."""
        return self.root.hash

    def find_differences(self, other: 'MerkleTree') -> Set[str]:
        """
        Find keys that differ between two trees.
        O(log n) when trees are mostly similar.
        """
        differences = set()
        self._compare_nodes(self.root, other.root, differences)
        return differences

    def _compare_nodes(self, node1: MerkleNode, node2: MerkleNode,
                       differences: Set[str]):
        """Recursively compare tree nodes."""
        if node1.hash == node2.hash:
            # Subtrees are identical, no need to descend
            return

        if node1.leaf_data is not None and node2.leaf_data is not None:
            # Both are leaves - find differing keys
            all_keys = set(node1.leaf_data.keys()) | set(node2.leaf_data.keys())
            for key in all_keys:
                if node1.leaf_data.get(key) != node2.leaf_data.get(key):
                    differences.add(key)
        else:
            # Internal nodes - recurse
            if node1.left and node2.left:
                self._compare_nodes(node1.left, node2.left, differences)
            if node1.right and node2.right:
                self._compare_nodes(node1.right, node2.right, differences)

class ReadRepairCoordinator:
    """
    Implements read repair for eventual consistency.
    Used when reads contact multiple replicas.
    """

    def __init__(self, replicas: List['Replica']):
        self.replicas = replicas

    def read_with_repair(self, key: str, read_quorum: int = 2) -> any:
        """
        Read from multiple replicas and repair inconsistencies.
        """
        responses = []

        # Gather responses from replicas
        for replica in self.replicas[:read_quorum]:
            try:
                value, version, timestamp = replica.read_full(key)
                responses.append({
                    'replica': replica,
                    'value': value,
                    'version': version,
                    'timestamp': timestamp
                })
            except Exception:
                continue  # Skip failed replicas

        if not responses:
            raise Exception("No replicas available")

        # Find the newest value
        newest = max(responses, key=lambda r: (r['version'], r['timestamp']))

        # Repair stale replicas (async, don't block the read)
        self._async_repair(key, newest, responses)

        return newest['value']

    def _async_repair(self, key: str, newest: dict, all_responses: List[dict]):
        """Asynchronously repair stale replicas."""
        for response in all_responses:
            if response['version'] < newest['version']:
                # This replica is stale - repair it
                response['replica'].repair(
                    key,
                    newest['value'],
                    newest['version'],
                    newest['timestamp']
                )
```

Cassandra uses all three mechanisms: 1) Gossip for cluster membership and schema propagation, 2) Read repair during queries (configurable per-table), 3) Manual 'nodetool repair' using Merkle trees for full anti-entropy. Production clusters typically run repair weekly to catch any drift.
The consistency window is the time period during which different replicas may return different values. Understanding, measuring, and bounding this window is critical for application design.
Factors Affecting Consistency Windows:
- Gossip interval and fanout (how often and how widely updates propagate)
- Network latency between replicas, especially across regions
- Replication factor and topology (number of replicas and datacenters)
- System load (queued replication work lengthens the window)
- Failures and partitions (the window stays open until repair completes)
Measuring Consistency Windows:
Probabilistically Bounded Staleness (PBS) is a framework for reasoning about eventual consistency. It answers: "What is the probability that a read returns data older than T seconds?"
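The published PBS work models Dynamo-style quorum systems analytically; as a simplified, empirical stand-in, the sketch below turns a set of measured consistency windows (such as those gathered by the monitor further down) into an estimate of the probability that a read issued t seconds after a write still sees the old value. The sample data here is invented purely for illustration.

```python
# Simplified, empirical PBS-style estimate: fraction of observed consistency
# windows longer than t, i.e. the chance a read at t+write still sees old data.
from typing import List

def probability_stale(windows_s: List[float], t_seconds: float) -> float:
    """Fraction of observed windows longer than t seconds."""
    if not windows_s:
        return 0.0
    return sum(1 for w in windows_s if w > t_seconds) / len(windows_s)

# Example with made-up probe measurements (seconds from write to visibility):
samples = [0.012, 0.020, 0.035, 0.048, 0.150, 0.420, 1.100]
for t in (0.01, 0.05, 0.5, 1.0):
    print(f"P(stale after {t * 1000:.0f}ms) ~= {probability_stale(samples, t):.2f}")
```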
Key metrics to monitor:
- Consistency window percentiles (p50/p99/p999) from active probes
- Replication lag between replicas
- Read repair rate (how often reads find divergent replicas)
- Conflict rate on concurrent writes

For context, typical consistency windows observed in common systems:
| System | Normal Operation | High Load | Network Partition | Recovery Time |
|---|---|---|---|---|
| Cassandra (LOCAL_ONE) | 10-50ms | 100-500ms | Unbounded (available) | Seconds after heal |
| DynamoDB (eventual) | < 1 second | 1-5 seconds | Remains available | < 1 second |
| MongoDB (secondary reads) | 10-100ms | Seconds | Can be minutes | Seconds |
| Redis Cluster (async) | < 5ms | < 50ms | Data loss possible | Manual intervention |
| CouchDB (multi-master) | 100ms-seconds | Seconds | Unbounded | Manual conflict resolution |
```python
# Consistency Window Monitoring System
import time
import statistics
from dataclasses import dataclass
from typing import List, Dict
from collections import deque

@dataclass
class ConsistencyProbe:
    """Probe measuring consistency window between replicas."""
    write_time: float
    key: str
    value: str
    replica_observations: Dict[str, float]  # replica_id -> first_observed_time

class ConsistencyMonitor:
    """
    Monitors eventual consistency windows in production.
    Deploy as a background service to track SLOs.
    """

    def __init__(self, replicas: List['Replica'],
                 probe_interval: float = 1.0,
                 window_size: int = 1000):
        self.replicas = replicas
        self.probe_interval = probe_interval
        self.probes = deque(maxlen=window_size)
        self.completed_probes: List[ConsistencyProbe] = []

    def run_probe(self) -> ConsistencyProbe:
        """
        Execute a consistency probe:
        1. Write unique value to one replica
        2. Poll all replicas until they see it
        3. Record observation times
        """
        key = f"probe_{time.time()}"
        value = f"value_{time.time()}"

        # Write to first replica
        write_start = time.time()
        self.replicas[0].write(key, value)
        write_complete = time.time()

        probe = ConsistencyProbe(
            write_time=write_complete,
            key=key,
            value=value,
            replica_observations={}
        )

        # First replica observed immediately
        probe.replica_observations[self.replicas[0].id] = write_complete

        # Poll other replicas
        pending_replicas = set(r.id for r in self.replicas[1:])
        timeout = 30.0  # Max time to wait

        while pending_replicas and (time.time() - write_complete) < timeout:
            for replica in self.replicas[1:]:
                if replica.id in pending_replicas:
                    observed_value = replica.read(key)
                    if observed_value == value:
                        probe.replica_observations[replica.id] = time.time()
                        pending_replicas.remove(replica.id)

            if pending_replicas:
                time.sleep(0.01)  # 10ms between polls

        # Record any replicas that timed out
        for replica_id in pending_replicas:
            probe.replica_observations[replica_id] = float('inf')

        self.completed_probes.append(probe)
        return probe

    def get_consistency_percentiles(self) -> Dict[str, float]:
        """Calculate consistency window percentiles."""
        windows = []

        for probe in self.completed_probes:
            for replica_id, obs_time in probe.replica_observations.items():
                if obs_time != float('inf'):
                    window = obs_time - probe.write_time
                    windows.append(window)

        if not windows:
            return {}

        windows.sort()
        n = len(windows)

        return {
            'p50': windows[int(n * 0.50)] * 1000,  # ms
            'p90': windows[int(n * 0.90)] * 1000,
            'p99': windows[int(n * 0.99)] * 1000,
            'p999': windows[int(n * 0.999)] * 1000 if n >= 1000 else None,
            'max': windows[-1] * 1000,
            'mean': statistics.mean(windows) * 1000,
        }

    def check_slo(self, target_ms: float, percentile: float) -> bool:
        """
        Check if consistency SLO is met.
        Example: 99% of reads consistent within 500ms
        """
        windows = []
        for probe in self.completed_probes:
            for obs_time in probe.replica_observations.values():
                if obs_time != float('inf'):
                    windows.append((obs_time - probe.write_time) * 1000)

        if not windows:
            return True

        windows.sort()
        target_index = int(len(windows) * percentile)
        actual = windows[target_index]

        return actual <= target_ms
```

When multiple replicas accept concurrent writes, conflicts arise. Eventually consistent systems must detect and resolve these conflicts. Several strategies exist, each with trade-offs:
1. Last-Write-Wins (LWW)
The write with the highest timestamp wins. Simple but can lose data—if two users edit simultaneously, one edit silently disappears.
2. Vector Clocks / Version Vectors
Track causality to detect concurrent updates. When a conflict is detected, the system can either:
- Keep both versions as siblings and let the application resolve them on the next read (the Dynamo/Riak approach)
- Merge the versions automatically with a domain-specific function
- Fall back to a deterministic rule such as last-write-wins

A minimal sketch of detecting concurrency with version vectors appears below.
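The sketch below (names are illustrative, not from a particular library) shows the core comparison: one version dominates another if it has seen every update the other has; if neither dominates, the writes were concurrent and need resolution.

```python
# Minimal version-vector sketch: detect whether two versions are causally
# ordered or concurrent (i.e., a genuine conflict).
from typing import Dict

VersionVector = Dict[str, int]  # node_id -> counter

def dominates(a: VersionVector, b: VersionVector) -> bool:
    """True if 'a' has seen everything 'b' has seen (a >= b component-wise)."""
    return all(a.get(node, 0) >= count for node, count in b.items())

def compare(a: VersionVector, b: VersionVector) -> str:
    a_dom, b_dom = dominates(a, b), dominates(b, a)
    if a_dom and b_dom:
        return "equal"
    if a_dom:
        return "a_newer"
    if b_dom:
        return "b_newer"
    return "concurrent"  # neither dominates: keep siblings or merge

# Two writes that raced on different replicas:
print(compare({"node_a": 2, "node_b": 1}, {"node_a": 1, "node_b": 2}))  # concurrent
print(compare({"node_a": 3, "node_b": 2}, {"node_a": 1, "node_b": 2}))  # a_newer
```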
3. Conflict-Free Replicated Data Types (CRDTs)
Data structures mathematically guaranteed to converge without conflicts. Counters, sets, registers, and maps can all be implemented as CRDTs.
4. Operational Transformation (OT)
Transform operations to account for concurrent edits. Used in collaborative editing (Google Docs).
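The implementation below covers strategies 1 and 3 in depth; as a taste of OT, here is a heavily simplified sketch that transforms one concurrent text insertion against another (real OT engines also handle deletes, long multi-operation histories, and undo):

```python
# Heavily simplified OT sketch: transform insert-vs-insert only.
from dataclasses import dataclass

@dataclass
class Insert:
    position: int
    text: str
    site_id: str  # used to break ties deterministically

def transform(op: Insert, against: Insert) -> Insert:
    """Shift 'op' so it applies correctly after 'against' has been applied."""
    if (against.position < op.position or
            (against.position == op.position and against.site_id < op.site_id)):
        return Insert(op.position + len(against.text), op.text, op.site_id)
    return op

def apply(doc: str, op: Insert) -> str:
    return doc[:op.position] + op.text + doc[op.position:]

doc = "helo"
a = Insert(2, "l", "site_a")   # fix the typo
b = Insert(4, "!", "site_b")   # append punctuation
# Site A applies a, then b transformed against a:
print(apply(apply(doc, a), transform(b, a)))  # hello!
# Site B applies b, then a transformed against b:
print(apply(apply(doc, b), transform(a, b)))  # hello!
```

Applied in either order, both replicas end up with the same document, which is the convergence property collaborative editors rely on.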
```python
# Conflict Resolution Strategies Implementation
from dataclasses import dataclass
from typing import Dict, Set, List, Any, Optional
import time

# ════════════════════════════════════════════════════════════════════
# Last-Write-Wins (LWW)
# ════════════════════════════════════════════════════════════════════

@dataclass
class LWWValue:
    """Value with timestamp for LWW resolution."""
    value: Any
    timestamp: float
    writer_id: str  # Tiebreaker when timestamps equal

class LWWRegister:
    """
    Last-Write-Wins Register.
    Simple but can lose concurrent writes.
    """

    def __init__(self):
        self.current: Optional[LWWValue] = None

    def write(self, value: Any, writer_id: str) -> LWWValue:
        entry = LWWValue(value, time.time(), writer_id)
        self._merge(entry)
        return entry

    def merge_remote(self, remote: LWWValue):
        """Merge value received from another replica."""
        self._merge(remote)

    def _merge(self, incoming: LWWValue):
        if self.current is None:
            self.current = incoming
        elif incoming.timestamp > self.current.timestamp:
            self.current = incoming
        elif incoming.timestamp == self.current.timestamp:
            # Tiebreaker: lexicographic comparison of writer_id
            if incoming.writer_id > self.current.writer_id:
                self.current = incoming

    def read(self) -> Any:
        return self.current.value if self.current else None

# ════════════════════════════════════════════════════════════════════
# G-Counter CRDT (Grow-only Counter)
# ════════════════════════════════════════════════════════════════════

class GCounter:
    """
    Grow-only Counter CRDT.
    Each node maintains its own count; total is sum of all.
    Guaranteed to converge without conflicts.
    """

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counts: Dict[str, int] = {node_id: 0}

    def increment(self, amount: int = 1):
        """Increment local counter."""
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self) -> int:
        """Get total count across all nodes."""
        return sum(self.counts.values())

    def merge(self, other: 'GCounter'):
        """Merge with another counter (point-wise maximum)."""
        all_nodes = set(self.counts.keys()) | set(other.counts.keys())
        for node in all_nodes:
            self.counts[node] = max(
                self.counts.get(node, 0),
                other.counts.get(node, 0)
            )

    def get_state(self) -> Dict[str, int]:
        """Get state for transmission to other nodes."""
        return self.counts.copy()

# ════════════════════════════════════════════════════════════════════
# PN-Counter CRDT (Positive-Negative Counter)
# ════════════════════════════════════════════════════════════════════

class PNCounter:
    """
    Counter that supports both increment and decrement.
    Uses two G-Counters internally.
    """

    def __init__(self, node_id: str):
        self.positive = GCounter(node_id)
        self.negative = GCounter(node_id)

    def increment(self, amount: int = 1):
        self.positive.increment(amount)

    def decrement(self, amount: int = 1):
        self.negative.increment(amount)

    def value(self) -> int:
        return self.positive.value() - self.negative.value()

    def merge(self, other: 'PNCounter'):
        self.positive.merge(other.positive)
        self.negative.merge(other.negative)

# ════════════════════════════════════════════════════════════════════
# LWW-Element-Set CRDT
# ════════════════════════════════════════════════════════════════════

class LWWSet:
    """
    Set with LWW semantics for add/remove conflicts.
    Element is in set if add_timestamp > remove_timestamp.
    """

    def __init__(self):
        self.adds: Dict[Any, float] = {}     # element -> timestamp
        self.removes: Dict[Any, float] = {}  # element -> timestamp

    def add(self, element: Any):
        self.adds[element] = time.time()

    def remove(self, element: Any):
        self.removes[element] = time.time()

    def contains(self, element: Any) -> bool:
        if element not in self.adds:
            return False
        add_time = self.adds[element]
        remove_time = self.removes.get(element, 0)
        return add_time > remove_time

    def elements(self) -> Set[Any]:
        return {e for e in self.adds if self.contains(e)}

    def merge(self, other: 'LWWSet'):
        # Merge adds (keep latest timestamp)
        for elem, ts in other.adds.items():
            if elem not in self.adds or ts > self.adds[elem]:
                self.adds[elem] = ts

        # Merge removes (keep latest timestamp)
        for elem, ts in other.removes.items():
            if elem not in self.removes or ts > self.removes[elem]:
                self.removes[elem] = ts
```

Last-Write-Wins silently discards concurrent writes. If User A writes 'Alice' and User B writes 'Bob' at nearly the same time, one write vanishes. Always consider: is silent data loss acceptable for this use case? For shopping carts: probably not. For view counters: probably yes.
Building applications on eventually consistent storage requires deliberate design patterns. The goal is to make applications tolerant of temporary inconsistency while leveraging the availability benefits.
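As one illustrative example (a sketch with an assumed replica interface where write returns a version number and read returns a value/version pair, not a specific product API), a session wrapper can approximate read-your-writes on top of an eventually consistent store by remembering the highest version the client has seen:

```python
# Illustrative sketch of a read-your-writes session guarantee layered on an
# eventually consistent store. The 'store' interface is assumed for the example.
from typing import Any, Dict, Optional

class Session:
    def __init__(self, store):
        self.store = store
        self.min_versions: Dict[str, int] = {}  # highest version this session wrote or read

    def write(self, key: str, value: Any) -> None:
        version = self.store.write(key, value)  # assumed: returns the new version
        self.min_versions[key] = max(self.min_versions.get(key, 0), version)

    def read(self, key: str, retries: int = 5) -> Optional[Any]:
        required = self.min_versions.get(key, 0)
        for _ in range(retries):
            value, version = self.store.read(key)  # assumed: returns (value, version)
            if version >= required:
                self.min_versions[key] = version   # also gives monotonic reads
                return value
            # Replica is behind this session's writes; retry (or try another replica).
        raise TimeoutError(f"replica still behind version {required} for {key!r}")
```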
You now understand eventual consistency at a deep level—from the mathematical foundations of gossip convergence to practical implementation of CRDTs and conflict resolution. Next, we explore Causal Consistency, which preserves cause-and-effect ordering while maintaining high availability.