When you send a message to Kafka that eventually reaches billions of devices, or run a Hadoop job that processes petabytes of data, there's a silent guardian ensuring the machinery doesn't fly apart: Apache Zookeeper and its ZAB protocol. These systems process staggering volumes of data, but their coordination layer—the part that keeps everything consistent—relies on the atomic broadcast guarantees that ZAB provides.
Understanding how Zookeeper is used in these production systems isn't just academic—it's essential for operating, debugging, and designing large-scale distributed systems. The patterns that Kafka and Hadoop use with Zookeeper have become templates for distributed coordination across the industry.
By the end of this page, you will understand how Kafka uses Zookeeper for controller election, topic metadata, and broker management; how the Hadoop ecosystem leverages Zookeeper for NameNode HA, HBase coordination, and service discovery; and the emerging Kafka Raft (KRaft) architecture that's replacing Zookeeper in Kafka.
Before diving into Kafka and Hadoop specifically, let's establish what Zookeeper provides that makes it so valuable for distributed coordination.
Zookeeper's Core Abstractions:
1. Hierarchical Namespace (ZNode Tree) Zookeeper organizes data in a tree structure similar to a filesystem. Each node (znode) can hold data and have children. This enables natural organization of distributed state.
2. Ephemeral Nodes Znodes can be ephemeral—they automatically disappear when the client session that created them ends. This is powerful for failure detection, service registration, and leader election: when a client dies or loses connectivity, its ephemeral znodes vanish and other clients learn about it automatically.
3. Sequential Nodes Znodes can be created with monotonically increasing sequence numbers appended. Combined with ephemeral nodes, this enables fair queuing, leader election without a thundering herd, and distributed locks.
4. Watches Clients can set watches on znodes. When the znode changes, the client receives a notification. This enables reactive programming without polling.
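An important subtlety is that watches are one-shot: a watch fires for the next change only and must be re-registered by the client. A minimal in-memory sketch of that semantics (the class and method names here are illustrative, not the real ZooKeeper client API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of ZooKeeper's one-shot watch semantics: a watch registered
// on a read fires once on the next change, then must be set again.
class WatchSketch {
    private final Map<String, byte[]> znodes = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Read data and optionally register a one-shot watch on the path
    byte[] getData(String path, Consumer<String> watch) {
        if (watch != null) {
            watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watch);
        }
        return znodes.get(path);
    }

    // Write data; fire and clear any pending watches on the path
    void setData(String path, byte[] data) {
        znodes.put(path, data);
        List<Consumer<String>> toFire = watches.remove(path); // one-shot: cleared before firing
        if (toFire != null) {
            toFire.forEach(w -> w.accept(path));
        }
    }
}
```

Because of the one-shot rule, a second change after the notification goes unseen unless the client re-reads and re-watches—a common source of bugs in watch-based code.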
| Primitive | Zookeeper Feature | Common Use Cases |
|---|---|---|
| Configuration Management | Persistent znodes + watches | Store configs, notify on change |
| Service Discovery | Ephemeral znodes + parent watch | Register instances, detect failures |
| Leader Election | Ephemeral sequential znodes | Elect controller, handle failover |
| Distributed Locks | Ephemeral sequential znodes | Mutual exclusion, read/write locks |
| Barriers | Znode existence + watches | Synchronize distributed processes |
| Group Membership | Ephemeral znodes under parent | Track cluster members |
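The group-membership row of the table can be sketched with a toy in-memory model. The names below are illustrative; in real ZooKeeper, ephemeral znodes are tied to server-side session liveness rather than an explicit close call:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: ephemeral znodes under a parent implement group membership.
// When a session ends, its znodes vanish and the member drops out.
class MembershipSketch {
    // znode path -> owning session id
    private final Map<String, String> znodeToSession = new HashMap<>();

    void createEphemeral(String sessionId, String path) {
        znodeToSession.put(path, sessionId);
    }

    // Session end (client crash, network loss) removes all its ephemeral znodes
    void closeSession(String sessionId) {
        znodeToSession.values().removeIf(s -> s.equals(sessionId));
    }

    // Listing the parent's children yields the current live membership
    List<String> children(String parent) {
        List<String> kids = new ArrayList<>();
        for (String path : znodeToSession.keySet()) {
            if (path.startsWith(parent + "/")) kids.add(path);
        }
        Collections.sort(kids);
        return kids;
    }
}
```

A consumer that sets a watch on the parent znode is notified whenever membership changes, which is exactly how Kafka brokers and HBase RegionServers are tracked.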
Why ZAB Matters for These Primitives:
All of Zookeeper's coordination primitives depend on ZAB's atomic broadcast guarantees:
Total ordering ensures that all observers see state changes in the same order. If node A's configuration change commits before node B's registration, every client observes those two events in that order.
Prefix consistency means you can rely on seeing complete history. If you see znode X created, you've seen all earlier state.
FIFO client ordering ensures that your operations are applied sequentially. If you create a lock then write data, the write won't happen before the lock.
Durability via quorum acknowledgment means committed changes survive failures.
Without these guarantees, Zookeeper's coordination primitives would be unreliable—race conditions and inconsistencies would undermine their purpose.
Apache Kafka has historically been deeply integrated with Zookeeper, relying on it for critical coordination functions. Understanding this architecture is essential both for operating existing Kafka deployments and appreciating the motivations behind KRaft.
What Kafka Stores in Zookeeper:
1. Broker Registration
Each Kafka broker creates an ephemeral znode under /brokers/ids/[broker-id] when it starts. This znode contains the broker's connection information.
2. Topic and Partition Metadata
/brokers/topics/[topic-name]/partitions/[partition-id] — Partition assignments
/config/topics/[topic-name] — Topic-level configuration overrides
3. Controller Election
The Kafka controller—the broker responsible for partition leadership elections and rebalancing—is elected using Zookeeper. First broker to successfully create /controller (ephemeral) becomes the controller.
4. Consumer Group Offsets (Legacy)
Older Kafka versions stored consumer group offsets in Zookeeper. Modern Kafka uses internal topics (__consumer_offsets) instead.
The Kafka Controller's Role:
The Kafka controller is the brain of the cluster, and its election via Zookeeper is critical:
Election Process:
Every broker attempts to create /controller (ephemeral, non-sequential). The first broker whose create of /controller succeeds wins; the others set a watch on the znode and retry when it disappears.
Controller Responsibilities:
- Electing partition leaders when brokers fail
- Tracking broker liveness (by watching /brokers/ids/*)
- Propagating metadata updates to all brokers
The Controller Epoch:
Kafka maintains a controller epoch (like ZAB's epoch) in /controller_epoch. This prevents stale controllers from making changes after being fenced. Every controller command includes the controller epoch; brokers reject commands from old epochs.
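The fencing rule can be sketched in a few lines. This is an illustrative model of the check, not Kafka's actual broker code:

```java
// Toy model of epoch-based fencing: a broker remembers the highest
// controller epoch it has seen and rejects commands from older epochs,
// so a deposed ("zombie") controller can no longer affect the cluster.
class FencingSketch {
    private int highestEpochSeen = 0;

    boolean acceptCommand(int controllerEpoch) {
        if (controllerEpoch < highestEpochSeen) {
            return false; // stale controller: command rejected
        }
        highestEpochSeen = controllerEpoch; // advance on newer epochs
        return true;
    }
}
```

Because /controller_epoch is updated through Zookeeper, ZAB's total ordering guarantees every broker agrees on which epoch is current.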
In traditional Kafka, if Zookeeper becomes unavailable, new broker registrations fail, partition leadership cannot change (existing replicas continue serving), and topic operations halt. Existing message production and consumption continue, but the cluster cannot adapt to failures.
Kafka's partition leadership model is where Zookeeper's coordination capabilities shine most visibly. Each partition has exactly one leader at any time, and all reads/writes go through the leader.
Partition State in Zookeeper:
For each partition, Zookeeper stores:
/brokers/topics/[topic]/partitions/[partition]/state

```json
{
  "controller_epoch": 123,
  "leader": 1,
  "version": 1,
  "leader_epoch": 45,
  "isr": [1, 2, 3]
}
```
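Updates to this state znode use a version-conditioned write, so two controllers racing on the same partition cannot both succeed. A toy sketch of that compare-and-set semantics (real ZooKeeper signals a mismatch with KeeperException.BadVersionException rather than a boolean):

```java
// Toy model of ZooKeeper's versioned setData: the write succeeds only
// when the caller's expected version matches the znode's current version.
class VersionedNode {
    private byte[] data;
    private int version = 0;

    synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != version) {
            return false; // concurrent writer already advanced the version
        }
        data = newData;
        version++;
        return true;
    }

    synchronized int getVersion() {
        return version;
    }
}
```

The loser of the race re-reads the znode, observes the new state, and decides whether its update still makes sense—an optimistic-concurrency pattern that ZAB's total ordering makes safe.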
ISR (In-Sync Replica) Management:
The ISR is crucial for Kafka's durability guarantees:
Replicas in the ISR are fully caught up with the leader. If a follower fails to keep up within the lag threshold (replica.lag.time.max.ms), it's removed from the ISR; once it catches back up, it rejoins.
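A rough in-memory sketch of that lag rule (names are illustrative; real Kafka tracks each follower's fetch progress on the partition leader):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model of ISR shrinking: followers that have not caught up within
// maxLagMs (cf. replica.lag.time.max.ms) are dropped from the ISR.
class IsrSketch {
    private final long maxLagMs;
    private final Map<Integer, Long> lastCaughtUpMs = new HashMap<>();
    private final Set<Integer> isr = new TreeSet<>();

    IsrSketch(long maxLagMs, List<Integer> replicas) {
        this.maxLagMs = maxLagMs;
        for (int r : replicas) {
            isr.add(r);
            lastCaughtUpMs.put(r, 0L); // no fetch observed yet
        }
    }

    // Record that a follower caught up to the leader's log end at nowMs
    void recordFetch(int brokerId, long nowMs) {
        lastCaughtUpMs.put(brokerId, nowMs);
    }

    // Drop any follower whose last catch-up is older than the threshold
    Set<Integer> currentIsr(long nowMs) {
        isr.removeIf(b -> nowMs - lastCaughtUpMs.get(b) > maxLagMs);
        return isr;
    }
}
```

In real Kafka the leader also re-adds a replica once it catches back up; the sketch shows only the shrinking side that protects durability.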
```
// Kafka Controller: Partition Leader Election
// Triggered when current leader fails (ephemeral znode disappears)

FUNCTION electPartitionLeader(topic, partition):
    // 1. Read current state from Zookeeper
    currentState = zk.getData("/brokers/topics/{topic}/partitions/{partition}/state")
    currentLeader = currentState.leader
    currentISR = currentState.isr
    currentLeaderEpoch = currentState.leader_epoch

    // 2. Verify leader is actually failed
    IF brokerIsAlive(currentLeader):
        RETURN  // Leader is fine, no election needed

    // 3. Select new leader from ISR
    newLeader = null
    FOR broker IN currentISR:
        IF brokerIsAlive(broker):
            newLeader = broker
            BREAK  // First alive ISR member becomes leader

    // 4. Handle unclean leader election (if all ISR failed)
    IF newLeader == null AND unclean_leader_election_enabled:
        // Pick any alive replica - may lose data!
        newLeader = selectAnyAliveReplica(topic, partition)
        LOG.WARN("Unclean leader election for {topic}/{partition}")

    IF newLeader == null:
        RETURN ERROR("No eligible leader found")

    // 5. Write new state to Zookeeper (atomic via ZAB)
    newState = {
        controller_epoch: myControllerEpoch,
        leader: newLeader,
        leader_epoch: currentLeaderEpoch + 1,
        isr: [newLeader],  // ISR resets to just the new leader
        version: currentState.version + 1
    }

    // Conditional write - only succeeds if version matches
    zk.setData("/brokers/topics/{topic}/partitions/{partition}/state",
               newState,
               expectedVersion = currentState.version)

    // 6. Notify all brokers of leadership change
    broadcastLeaderChangeToAllBrokers(topic, partition, newLeader)
```

Leader Epoch and Fencing:
The leader epoch is critical for preventing split-brain scenarios: a deposed leader that comes back online cannot overwrite committed data, because requests stamped with an old leader epoch are rejected.
Relationship to ZAB:
Kafka's leader_epoch is conceptually similar to ZAB's epoch: both increase monotonically on every leadership change, and both are used to fence stale leaders.
The difference is scope: ZAB's epoch is for the Zookeeper cluster; Kafka's leader_epoch is per-partition and stored in Zookeeper.
When all ISR replicas fail, Kafka can elect an out-of-sync replica as leader (if unclean.leader.election.enable=true). This trades availability for consistency—you get a leader, but may lose messages. Most production deployments disable this, preferring unavailability to data loss.
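The clean-versus-unclean choice can be condensed into a small selection function. This is an illustrative helper, not Kafka's implementation:

```java
import java.util.List;
import java.util.Set;

// Toy model of the leader-selection rule: prefer the first live ISR
// member (clean election); fall back to any live replica only when
// unclean leader election is enabled -- at the cost of possible data loss.
class LeaderChoice {
    static Integer elect(List<Integer> isr, List<Integer> allReplicas,
                         Set<Integer> aliveBrokers, boolean uncleanEnabled) {
        for (int broker : isr) {
            if (aliveBrokers.contains(broker)) {
                return broker; // clean: in-sync and alive
            }
        }
        if (uncleanEnabled) {
            for (int broker : allReplicas) {
                if (aliveBrokers.contains(broker)) {
                    return broker; // unclean: may have missed committed messages
                }
            }
        }
        return null; // no eligible leader -- partition stays offline
    }
}
```

With unclean election disabled, the partition remains unavailable until an ISR member returns—exactly the consistency-over-availability trade the text describes.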
Despite Zookeeper's proven reliability, Apache Kafka has been migrating away from it toward KRaft (Kafka Raft)—a built-in consensus protocol that eliminates the Zookeeper dependency. Understanding why reveals both Zookeeper's limitations and the maturation of the Kafka project.
Why Move Away from Zookeeper?
1. Operational Complexity — Running Kafka means operating and monitoring two distributed systems, each with its own configuration, upgrade cycle, and failure modes.
2. Scalability Limits — Cluster metadata must fit in Zookeeper and flow through a single controller, which caps the practical number of partitions per cluster.
3. Latency for Metadata Operations — Every metadata change requires round trips to the Zookeeper ensemble, slowing controller failover and administrative operations.
4. Single Point of Failure Perception — Although Zookeeper is replicated, operators must treat the ensemble as an extra critical dependency whose outage freezes all cluster changes.
| Aspect | Zookeeper Mode | KRaft Mode |
|---|---|---|
| Consensus protocol | ZAB (via Zookeeper cluster) | Raft (built into Kafka controllers) |
| Metadata storage | Zookeeper znodes | Kafka internal log topic (__cluster_metadata) |
| Controller count | 1 active (elected via ZK) | Raft quorum (one active leader, hot standbys) |
| Components to operate | Kafka brokers + Zookeeper cluster | Kafka brokers (some in controller role) |
| Partition scalability | ~100K partitions | Millions of partitions targeted |
| Failover time | Seconds (Zookeeper election + discovery) | Milliseconds (Raft election) |
| Monitoring | Kafka + Zookeeper metrics | Kafka metrics only |
KRaft Architecture:
In KRaft mode, a subset of Kafka brokers act as controllers, forming a Raft quorum:
The __cluster_metadata internal topic stores all cluster metadata as an event log; controllers replicate it via Raft, and brokers consume it to stay current.
Benefits Realized: simpler deployment, much faster controller failover, higher partition counts, and a single system to secure and monitor.
Migration Path:
Kafka supports migrating existing clusters from Zookeeper mode to KRaft mode.
KRaft uses Raft rather than ZAB because Kafka's team wanted tight integration without the overhead of the full Zookeeper coordination service. Raft's well-specified implementation guidance and familiarity made it a natural choice. The core consensus guarantees are equivalent—the practical differences are in the implementation details.
Apache Hadoop and its ecosystem have deep dependencies on Zookeeper for coordination. Unlike Kafka's evolution toward independence, most Hadoop components continue to rely on Zookeeper as their coordination service.
HDFS High Availability:
The Hadoop Distributed File System (HDFS) uses Zookeeper for NameNode high availability:
Problem: The NameNode is HDFS's brain, storing all filesystem metadata. Originally, it was a single point of failure.
Solution: Active/Standby NameNode pair with Zookeeper coordination. A ZKFailoverController (ZKFC) process alongside each NameNode monitors its health and competes for an ephemeral lock znode; the holder's NameNode is active. If the active fails, its Zookeeper session expires, the znode disappears, and the standby's ZKFC acquires the lock and promotes its NameNode (after fencing the old active).
Apache HBase Coordination:
HBase, the distributed wide-column store built on Hadoop, is heavily dependent on Zookeeper:
1. RegionServer Registration
Each RegionServer registers an ephemeral znode at /hbase/rs/[server-name]; when a server dies, the znode vanishes and the HMaster reassigns its regions.
2. HMaster Leader Election
Multiple HMasters can run; they compete for an ephemeral znode, and a backup takes over when the active master's session expires.
3. Table and Region Metadata
/hbase/table/[table-name] stores table state (enabled, disabled).
4. Distributed Locks
HBase has used Zookeeper-based locks to serialize administrative operations such as table schema changes.
Critical Path: If Zookeeper is unavailable, HBase cannot:
- Detect RegionServer failures or reassign their regions
- Elect a new HMaster
- Complete region assignments or table state changes
Existing read/write operations to healthy RegionServers continue, but failure handling stops.
HBase creates many ephemeral znodes (one per RegionServer, plus region metadata). Large HBase clusters require careful Zookeeper sizing—more memory, faster disks, and appropriate session timeouts. Session timeout tuning is critical: too short causes false failure detection; too long delays real failure detection.
Both Kafka and Hadoop implement common coordination patterns on top of Zookeeper. These patterns have become templates used throughout the industry.
Pattern 1: Leader Election
The standard pattern used by Kafka controllers, HBase masters, and many other systems:
/[service]/leader ← ephemeral znode, first to create is leader
Pattern 2: Fair Leader Election (Queue-Based)
For fairer ordering or when you need to know your position:
/[service]/candidates/candidate-[sequence]
```java
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;

// Pattern 1: Simple Leader Election (Kafka-style)
public class SimpleLeaderElection {
    private ZooKeeper zk;
    private String leaderPath = "/kafka/controller";
    private String myId;  // remembered so we can retry when the leader dies

    public boolean tryBecomeLeader(String myId) throws Exception {
        this.myId = myId;
        try {
            // Attempt to create ephemeral node
            zk.create(leaderPath, myId.getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE,
                      CreateMode.EPHEMERAL);
            return true;  // I am the leader
        } catch (KeeperException.NodeExistsException e) {
            // Someone else is leader, watch and wait
            zk.exists(leaderPath, this::onLeaderChange);
            return false;
        }
    }

    private void onLeaderChange(WatchedEvent event) {
        if (event.getType() == EventType.NodeDeleted) {
            try {
                tryBecomeLeader(myId);  // Leader died, try to take over
            } catch (Exception e) {
                // handle/log in real code
            }
        }
    }
}

// Pattern 2: Fair Queue-Based Election (HBase-style)
public class FairLeaderElection {
    private ZooKeeper zk;
    private String candidatesPath = "/hbase/master/candidates";
    private String myZnode;

    public void joinElection(String myId) throws Exception {
        // Create ephemeral sequential node
        myZnode = zk.create(
            candidatesPath + "/candidate-",
            myId.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL);
        checkLeadership();
    }

    private void checkLeadership() throws Exception {
        List<String> candidates = zk.getChildren(candidatesPath, false);
        Collections.sort(candidates);  // Sort by sequence number

        int myIndex = candidates.indexOf(
            myZnode.substring(candidatesPath.length() + 1));

        if (myIndex == 0) {
            becomeLeader();  // I have lowest sequence number
        } else {
            // Watch only the node before me (avoids the herd effect)
            String nodeToWatch = candidatesPath + "/" + candidates.get(myIndex - 1);
            zk.exists(nodeToWatch, event -> {
                if (event.getType() == EventType.NodeDeleted) {
                    try {
                        checkLeadership();
                    } catch (Exception e) {
                        // handle/log in real code
                    }
                }
            });
        }
    }

    private void becomeLeader() {
        // assume mastership
    }
}
```

Pattern 3: Configuration Management
Store configuration in znodes, get notified on changes:
/[service]/config/[setting-name] ← persistent znode, clients set a watch
Pattern 4: Service Registry (Service Discovery)
/services/[service-name]/instances/[instance-id] ← ephemeral
/services/[service-name]/instances ← consumers watch this parent for changes
Pattern 5: Distributed Barrier
Synchronize N processes:
/barriers/[barrier-id]/[process-id] ← each process creates a znode; all proceed once N exist
Operating Zookeeper for production Kafka and Hadoop deployments requires careful attention to several factors that directly impact ZAB's performance.
Cluster Sizing:
Why odd numbers? Zookeeper quorums need a majority: with N nodes, quorum = ⌊N/2⌋ + 1.
| Nodes | Quorum | Tolerated Failures |
|---|---|---|
| 3 | 2 | 1 |
| 4 | 3 | 1 |
| 5 | 3 | 2 |
| 6 | 4 | 2 |
| 7 | 4 | 3 |
A 4-node ensemble tolerates only 1 failure (the same as 3 nodes), so avoid even numbers. Most deployments use 3 or 5 nodes.
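The table's arithmetic fits in two lines (majority = ⌊N/2⌋ + 1, tolerated failures = N − majority):

```java
// Quorum arithmetic behind the sizing table above.
class QuorumMath {
    static int majority(int nodes) {
        return nodes / 2 + 1;          // integer division = floor(N/2)
    }

    static int toleratedFailures(int nodes) {
        return nodes - majority(nodes); // ensemble survives losing this many
    }
}
```

The even-number penalty falls out directly: adding a fourth node raises the quorum to 3 without raising the failures tolerated.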
Disk Considerations:
ZAB's performance is heavily disk-bound: every proposal is fsynced to the transaction log before it is acknowledged, so transaction-log latency directly bounds write latency. Put dataLogDir on a dedicated fast SSD, separate from snapshots and other workloads.
| Parameter | Default | Guidance | Why It Matters |
|---|---|---|---|
| tickTime | 2000ms | Keep default for most cases | Base time unit for all intervals |
| initLimit | 10 (× tickTime) | Increase for large snapshots | Time for followers to sync at startup |
| syncLimit | 5 (× tickTime) | Tune for network latency | Time for followers to lag before disconnect |
| maxSessionTimeout | 20 × tickTime | Match client needs | Longest session timeout clients can request |
| autopurge.purgeInterval | 0 (disabled) | Enable for production | Automatic log cleanup interval |
| dataDir | /var/zookeeper | Fast SSD, not shared | Main data directory (snapshots) |
| dataLogDir | (same as dataDir) | Separate fast SSD | Transaction log directory (CRITICAL) |
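An illustrative zoo.cfg fragment applying the table's guidance. The hostnames, paths, and retention values below are placeholders, not recommendations for every environment:

```properties
# Base time unit; most other intervals are multiples of this
tickTime=2000
initLimit=10
syncLimit=5

# Snapshots and transaction log on separate fast disks
dataDir=/var/zookeeper/data
dataLogDir=/var/zookeeper/txlog

# Automatic cleanup of old snapshots/logs (hours between purges)
autopurge.purgeInterval=24
autopurge.snapRetainCount=5

# Three-node ensemble: peer port 2888, leader-election port 3888
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888
```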
Monitoring ZAB Health:
Key Metrics to Watch: outstanding requests, average and max request latency, pending syncs, and the number of synced followers (all exposed via mntr).
JVM Considerations: size the heap so snapshots fit comfortably without swapping, and tune GC to keep pauses well below session timeouts.
Network Placement: keep ensemble members in low-latency proximity to each other, on a network path isolated from bulk data traffic.
Zookeeper exposes 'four letter word' commands for monitoring: 'stat', 'mntr', 'ruok', 'envi', etc. For production monitoring, prefer 'mntr' (metrics in key=value format) over 'stat'. Note: Admin Server (HTTP) is the modern alternative, enabled by default in newer versions.
When Zookeeper misbehaves, Kafka and Hadoop coordination breaks. Understanding common failure modes helps with rapid diagnosis.
Issue 1: Leader Election Storms
Symptoms:
- Frequent Zookeeper leader elections and rapidly climbing epoch numbers
- Clients repeatedly disconnecting and reconnecting
Causes:
- Network instability between ensemble members
- Long GC pauses on Zookeeper JVMs
- Slow or saturated transaction-log disks
Remediation:
- Isolate the transaction log on a dedicated fast disk
- Review tickTime, syncLimit settings
Issue 2: Session Expiration Floods
Symptoms:
- Many client sessions expiring at once; ephemeral znodes (broker registrations, locks) vanishing in bursts
Causes:
- GC pauses on clients or Zookeeper servers
- Network partitions or an overloaded ensemble
- Session timeouts set too aggressively
Issue 3: Slow Cluster Recovery After Full Outage
Symptoms:
- After restarting the ensemble, followers take a long time to rejoin and the quorum forms slowly
Causes:
- Large snapshots and transaction logs to replay
- initLimit too short for data volume
Remediation:
- Enable autopurge so snapshots and logs stay small
- Increase initLimit during recovery if needed
Issue 4: Kafka Controller Thrashing
Symptoms:
- The controller role bounces between brokers (repeated elections, churn visible in controller metrics and the __controller topic)
Causes:
- Broker sessions to Zookeeper expiring under load or GC pressure
- Unstable network between brokers and the ensemble
Remediation:
- Tune broker GC to avoid long pauses
- Increase zookeeper.session.timeout.ms
We've explored how ZAB powers Zookeeper's role in Kafka and Hadoop—critical production systems processing enormous data volumes.
Module Complete:
You've now completed the comprehensive study of ZAB (Zookeeper Atomic Broadcast). From the core protocol mechanics to leader-based ordering, comparisons with Raft and Paxos, and real-world usage in Kafka and Hadoop, you understand one of the most important consensus protocols in production distributed systems.
This knowledge enables you to operate and debug Zookeeper-backed deployments, reason about coordination failures, and evaluate alternatives such as KRaft.
Congratulations! You've mastered ZAB (Zookeeper Atomic Broadcast). You understand its core mechanics, how it provides leader-based ordering, how it compares to Raft and Paxos, and how it powers critical distributed systems like Kafka and Hadoop. This knowledge is essential for any engineer working with distributed systems at scale.