Special Release Five Kafka Community's Key Functionality Removing Zoo Keeper Dependency

Special Release Five Kafka Community’s Key Functionality- Removing ZooKeeper Dependency #

Hello, I’m Hu Xi. Today, let’s talk about a major feature in the Kafka community: removing the ZooKeeper dependency.

Since its inception, Kafka has been tightly coupled with ZooKeeper. It can be said that without ZooKeeper, Kafka wouldn’t have achieved its success today.

However, as Kafka has continued to improve and evolve, the community gradually discovered some issues that arose from the combination of ZooKeeper and Kafka. For example, ZooKeeper is not suitable for frequent write operations, but in the Kafka 0.8 era, consumers used ZooKeeper to store their offset information. Therefore, by removing ZooKeeper and using Kafka internal topics to store offsets, this fundamentally avoids the drawbacks of ZooKeeper.

Another benefit of getting rid of ZooKeeper dependency is that it allows Kafka to become a standalone framework. In the future, when using Kafka, there will no longer be a need to maintain an additional ZooKeeper cluster. Clearly, the cost of installing, operating, and optimizing a distributed cluster is high, so removing such a dependency is certainly a good thing.

Now, you must be curious about how the community plans to remove ZooKeeper. Don’t worry, I’ll guide you through the community’s plan step by step.

Evolution of Kafka Clients #

First, let’s take a look at two diagrams. These two diagrams summarize the functionality changes in versions 0.8.x and 0.11.x (whether the changes actually started from version 0.11 is not important) and subsequent versions.

In the Kafka 0.8 era, Kafka had three clients:

  • Producer, responsible for writing messages to Kafka;
  • Consumer, responsible for reading messages from Kafka;
  • Admin Tool, used for various operational tasks, such as creating or deleting topics.

Among them, the Consumer’s offset data was stored in ZooKeeper, so both offset commit and offset retrieval operations of the Consumer required access to ZooKeeper. In addition, the Admin Tool also accessed ZooKeeper to execute operational tasks, such as creating a temporary node on the corresponding ZooKeeper znode and triggering the corresponding processing logic through a predefined watch.

Later, as Kafka evolved, the community introduced the concept of offset topic (__consumer_offsets) and defined new RPC protocols such as OffsetFetch and OffsetCommit. This way, offset commit and offset retrieval operations of the Consumer directly interacted with the offset topic, avoiding the need to access ZooKeeper.

In addition, the community introduced a new operational tool called AdminClient and corresponding RPC protocols such as CreateTopics, DeleteTopics, and AlterConfigs, replacing the original Admin Tool. As a result, operational tasks such as creating and deleting topics were completely moved to Kafka, as shown in the second diagram above.

Now, the three Kafka clients no longer need to interact with ZooKeeper. It can be said that the work of removing ZooKeeper is mostly complete. However, there is still a part of the work that needs to be completed with the help of ZooKeeper, which is Consumer rebalancing.

In the 0.8 era, the management of Consumer Groups was handled by ZooKeeper, including managing group members and allocating subscribed partitions. This design has also been modified in the new version of the Consumer. All group management operations are now handled by the Coordinator component newly introduced in Kafka Broker. To complete these tasks, the Broker added many RPC protocols, such as JoinGroup, SyncGroup, Heartbeat, and LeaveGroup.

At this point, except for AdminClient, the Java clients of Kafka no longer depend on ZooKeeper, as all other components have eliminated their dependence on ZooKeeper.

Later, the community introduced the Kafka security layer, which implemented user authentication and authorization. This additional security layer does not require access to ZooKeeper, so the Clients that previously depended on ZooKeeper cannot “benefit” from this security layer. Once enabled, new version Clients need to first connect to this layer and can only access Brokers after being approved, as shown in the following diagram:

The benefit of doing this is that it “standardizes the pattern of Clients accessing Brokers”, that is, by defining a cross-language RPC protocol stack, it enables Clients to interact with Kafka Brokers. With this approach, developers of different languages only need to develop their own RPC protocol according to this specification to interact with Kafka Broker. If more functionality needs to be implemented in the future, the community only needs to define new RPC protocols. At the same time, the newly introduced security layer is responsible for performing security checks on this set of RPC protocols, unifying the access pattern. In addition, these protocols are versioned, allowing independent evolution while considering compatibility aspects.

Broker Interactions #

After discussing the Clients side, let’s talk about the current state of the Broker side. Currently, it can be said that Kafka Broker is heavily dependent on ZooKeeper, mainly in the following aspects:

  • Broker registration and management;
  • ACL security layer configuration management;
  • Dynamic parameter management;
  • Replica ISR management;
  • Controller election.

Let’s use a diagram to illustrate.

The diagram shows 4 Broker nodes and 1 ZooKeeper. The Broker in the top left corner acts as the Controller. Currently, all Brokers must maintain a session with ZooKeeper after they are started. Kafka relies on this session for Broker registration. Furthermore, all configuration information, replica information, and topic information in the Kafka cluster are stored in ZooKeeper. Finally, the Controller maintains a TCP long connection with each Broker in the cluster to send RPC requests. The current Controller RPC types mainly include:

  1. LeaderAndIsr: Mainly used to broadcast the changes of topic partition leaders and ISR (InSyncReplicas) in the cluster, such as which Broker should be the Leader or Follower of a specific partition;
  2. StopReplica: Broadcasts commands to stop replicas in the cluster;
  3. UpdateMetadata: Broadcasts commands to update metadata information in the cluster.

The diagram also introduces a new AlterISR RPC, which is part of the KIP-497 to be implemented. Currently, the ISR information for each topic in Kafka is stored in ZooKeeper. If ZooKeeper is to be abandoned in the future, this information must be moved out of ZooKeeper and handled by the Controller. At the same time, support for managing ISR at the program level is also required. Therefore, the community plans to add the AlterISR protocol on top of KIP-497. By the way, currently the Controller election is also done through ZooKeeper.

Therefore, the future evolution of the Broker side may be similar to that of the Clients side: first, eliminate all interactions between Brokers and ZooKeeper, and only let the Controller interact with ZooKeeper, while all other Brokers only interact with the Controller, as shown in the following diagram:

It seems that the community has already become familiar with this evolutionary path, but in reality, there are still some lingering issues that need to be addressed.

Broker Liveness #

The first issue is the Broker liveness problem, that is, how Kafka determines whether a Broker is alive or not. In the current design, the liveness monitoring of the Broker depends entirely on the session with ZooKeeper. Once the session times out or is disconnected, the Controller automatically triggers the removal of the Broker from ZooKeeper and performs cleanup operations on the partitions it was responsible for. If ZooKeeper is removed, the mechanism to determine the liveness of Brokers becomes a problem.

Network Partition #

How to prevent network partitions is also a topic that needs discussion. Currently, there are 4 possible types of network partition:

  1. A single Broker is completely isolated from the cluster;
  2. Broker cannot communicate with each other;
  3. Broker cannot communicate with ZooKeeper;
  4. Broker cannot communicate with Controller.

The following four diagrams show these four scenarios:

-

Let’s discuss each of them separately.

Scenario 1: A single Broker is isolated from other Brokers in the cluster. This is not a serious problem. The current design can handle this situation well. Once a Broker is isolated, the Controller removes it from the cluster. Although the availability is reduced, the consistency of the entire cluster is still guaranteed.

Scenario 2: Brokers cannot communicate with each other, resulting in the inability to execute the message replication mechanism and Kafka needs to shrink ISR. There is still a decrease in availability, but the consistency is not compromised.

Scenario 3: Broker cannot communicate with ZooKeeper. The Broker can still operate normally, but it is unable to communicate with ZooKeeper. At this point, we say that the Broker is in a zombie state. In the Jira community, there has been a consistent bug introduced by the Zoobie state. The community has been addressing this issue for years, mainly through mechanisms like fencing, such as Leader Epoch.

Scenario 4: Broker cannot communicate with the Controller. In this situation, all metadata update channels are blocked. Even if this Broker is still healthy, the metadata information it maintains may be very outdated. In such cases, clients connecting to this Broker may encounter various strange issues. I have answered similar questions before, and you can click the link to take a look.

This scenario is more complicated, so let me explain it further. In fact, the community currently does not have a good solution for this situation, mainly because the liveliness of the Broker is completely handled by ZooKeeper. Once there is no problem with the interaction between Broker and ZooKeeper, it is impossible to completely avoid liveliness problems caused by other reasons.

The fourth category, Network Partition, introduces a classic scenario: inconsistent metadata. Currently, each Broker caches a copy of the cluster’s metadata, and this data is updated asynchronously. When the fourth category partition occurs, the metadata information cached on the Broker side will inevitably be out of sync with the Controller, resulting in various problems.

Next, let me briefly introduce the process of metadata update. The main process is that the Controller synchronously pulls the full metadata information of the cluster from ZooKeeper when it starts, and then asynchronously synchronizes it to other Brokers. There is often a time difference between the synchronization between other Brokers and the Controller, which means that the metadata accessed by clients may not be the latest. Personally, I believe that many flaky test failures in the community are caused by this reason.

In fact, in actual usage scenarios, there are many cases where the metadata on the Broker side is never synchronized with the Controller side. Usually, if we don’t restart the Broker, the metadata on this Broker will continue to be “incorrect” forever.

Fortunately, the community also provides a final “trick”: log in to the ZooKeeper SHELL, manually execute rmr /controller to force Controller re-election, then reload metadata, and refresh all Brokers. However, I doubt whether anyone in the actual production environment would really do this, as it has a significant cost and, most importantly, it may still have two problems:

  1. How can we ensure that the data between Controller and Broker is consistent?
  2. The process of loading metadata is usually very slow.

Let me elaborate on the second point, which is the performance issue of loading metadata.

Overall, loading metadata is an O(N) time complexity process, where N is the total number of partitions in your cluster. Considering that the Controller needs to push it to other Brokers after loading from ZooKeeper, the total time complexity of performing this task is O(N * M), where M is the number of Brokers in the cluster. It can be seen that when both M and N are large, broadcasting metadata in the cluster is not a fast process.

Considering all the problems mentioned earlier, how does the community solve them after Kafka abandons ZooKeeper? The overall idea is Metadata as an Event Log + Controller quorum. Let’s first talk about metadata as an event log.

Metadata as an Event Log #

If you have read Jay Kreps’ “I ❤️ Logs”, you should be aware that the entire architecture of Kafka is built on logs. Each partition of a topic is essentially a commit log, but the storage of metadata information is not in log form. In the current architectural design, you can basically think of the data structure of metadata as key-value pairs (KV form). This time, the community has adopted the same data storage method as messages, that is, storing metadata as logs, as shown in the table below:

Here are four advantages of using Kafka’s own log mechanism to store metadata:

  • High Availability: Each metadata change is saved as a message in the log, and this log can be considered as a regular Kafka topic that is replicated to multiple brokers.
  • Sequentiality: One advantage of logs is that they have a clear chronological order, which means that the time of each event can be sorted. With appropriate processing logic, we can ensure that the handling of metadata changes is processed in the order they occur, without any disorder.
  • Incremental Synchronization: After using the log mechanism, the synchronization of metadata between brokers can use synchronous incremental data (delta) instead of synchronizing the full data every time. Currently, metadata synchronization between Kafka brokers is done in a full state synchronization manner. As mentioned earlier, this can be quite costly when the number of cluster partitions is large. If we can only synchronize the incremental state, it will greatly reduce the synchronization cost.
  • Monitorability: Logs provide rich monitoring metrics. Based on these metrics, we can easily obtain the progress of metadata synchronization.

With the log mechanism, other brokers are like regular consumers, pulling metadata change messages or events from the controller. Since each broker is a consumer, they maintain their own consumer offsets, as shown in the diagram below:

In this design, the broker where the controller resides must take on the management of all metadata topics, including creating topics, managing the leader of topic partitions, and creating corresponding events for each metadata change. Since the community chose a processing method similar to __consumer_offsets, a natural question is whether the management of this metadata topic can reuse Kafka’s existing replication mechanism. The answer is: not feasible. The reason is that the existing replication mechanism relies on the controller, so Kafka cannot rely on the existing replication mechanism to implement the controller. As our common saying goes, this is a typical “chicken and egg” problem, which belongs to a typical circular dependency.

To achieve this, Kafka needs a leader election protocol, and this protocol or algorithm does not depend on the controller, that is, it is a self-managing quorum cluster (I apologize, but I have not found an appropriate translation for quorum in the distributed field, especially in the field of distributed consensus algorithms, so I used the original word quorum directly). In the end, the community decided to use Raft to implement this group of quorums. This is the second solution we mentioned above: Controller quorum.

Controller Quorum #

Unlike relying on a Controller to select a Leader, Raft allows nodes to autonomously choose a Leader and eventually reach consensus among all nodes. This is a good feature for selecting a Controller. In fact, Kafka’s existing replication mechanism is already very similar to Raft. You can take a look at the table below and compare them.

Comparison

At a glance, you will find that Kafka’s replication mechanism is similar to Raft, for example, the offset in Kafka is equivalent to the index in Raft, and epoch corresponds to term. However, Raft uses a quorum mechanism to ensure message commits and Leader elections, while Kafka has an In-Sync Replica (ISR) mechanism for achieving these two points. Overall, the community believes that with only a few minor modifications to the replication mechanism, it should be easy to switch to a Raft-based algorithm.

The following figure illustrates a more intuitive Controller quorum:

Controller Quorum

The entire Controller quorum functions like a small cluster. Similar to ZooKeeper, this quorum typically consists of 3 or 5 machines, and not every Broker in Kafka needs to become a node in this quorum automatically.

There is one Leader in this quorum responsible for handling read and write requests from the Clients. This Leader is known as the active Controller in Kafka. According to ZooKeeper’s Zab protocol, the Leader handles all write requests, while the Followers can handle read requests. When a write request is sent to a Follower, the Follower forwards the request to the Leader for processing.

However, I guess Kafka may not implement it this way, as it is likely that only the Leader (i.e., the active Controller) handles all read and write requests, and the Clients (i.e., other Brokers) do not send any read and write requests to the Followers. In this regard, this design is consistent with the existing Kafka request processing mechanism.

Now, one question remains: How is the Leader selected? Since it is Raft-based, it also adopts the Leader election strategy of the Raft algorithm. The Leader elected by Raft is referred to as the active Controller. There are many articles on the Internet about Raft leader election, so I won’t go into detail here. If you are interested, you can read the Raft paper: In Search of an Understandable Consensus Algorithm(Extended Version).

This Raft quorum has two benefits.

The first benefit is that it naturally provides low-latency failover, so the Leader switch will be very quick and timely. In theory, there is no longer a metadata loading process, as all metadata is now synchronously stored in the memory of the Follower nodes. They already have all the metadata information that other Brokers need to fetch!

What’s even cooler is that it avoids the inefficient behavior of fully fetching metadata when the Controller switches. The Broker does not need to re-fetch the previously “consumed” metadata changes; it only needs to continue “consuming” from the new Leader.

The other benefit is that with this mechanism, Kafka can cache metadata (metadata caching). This means that the Broker can save metadata on disk. Just like I mentioned earlier, the Broker only needs to read the portion of data it is concerned about. Moreover, similar to the current snapshot mechanism, if a Broker’s saved metadata lags behind the Controller too much or if it is a brand new Broker, Kafka can even directly send a snapshot file like Raft to quickly bring it up to date. Of course, in most cases, the Broker only needs to fetch delta increment data.

Summary #

Based on the above solutions, the community plans to complete the dependency on ZooKeeper for Kafka in three steps:

  1. Step 1: Remove the dependency on ZooKeeper for clients. This step has been mostly completed, with the exception of a few APIs in the AdminClient that still rely on ZooKeeper. Other client-side components should no longer need to access ZooKeeper.

  2. Step 2: Remove the dependency on ZooKeeper for brokers. This involves removing the code that requires brokers to access ZooKeeper, adding new broker-side APIs such as AlterISR, and finally consolidating all access to ZooKeeper in the controller.

  3. Final step: Implement controller quorum, which means implementing a Raft-based quorum responsible for controller election.

It can be said that removing ZooKeeper functionality is one of the most significant proposals in the community in recent years. This proposal is very rare in terms of the scope of components involved, the length of time it takes, and the complexity. Once this functionality is fully implemented in the future, Apache Kafka will greatly improve its maintainability and appear before us in a “refreshed” image. As for the final outcome, let’s wait and see.

After-class discussion #

As I mentioned earlier, the community plans to write their own Raft-based algorithm to implement Controller election. Why do you think the community chose not to directly use mature third-party Raft libraries?

Feel free to share your thoughts in the comment section and discuss it with me. You are also welcome to share today’s content with your friends.