26 the Kafka Controller You Definitely Shouldn't Miss

26 The Kafka Controller You Definitely Shouldn’t Miss #

Hello, I’m Hu Xi. Today, I want to share with you the topic of the controller component in Kafka.

The controller component, as a core component of Apache Kafka, plays a vital role in managing and coordinating the entire Kafka cluster with the help of Apache ZooKeeper. Any broker in the cluster can act as the controller, but during runtime, only one broker can become the controller to perform its management and coordination responsibilities. In other words, in any properly functioning Kafka cluster, there is always only one controller at any given time. There is a JMX metric called “activeController” available on the official website, which can help us monitor the live status of the controller in real-time. This JMX metric is crucial, and you must regularly check its value during operational tasks. Now, let’s discuss the principles and internal workings of the controller in detail.

Before we begin, let me briefly introduce Apache ZooKeeper framework. It’s important to know that the controller heavily relies on ZooKeeper. Therefore, it’s necessary to spend some time understanding what ZooKeeper does.

Apache ZooKeeper is a highly reliable distributed coordination service framework. It uses a data model similar to a file system’s tree structure, with the root directory starting with “/”. Each node on this structure is called a “znode” and is used to store some metadata coordination information.

In terms of znode persistence, znodes can be classified as either persistent or ephemeral. Persistent znodes do not disappear when the ZooKeeper cluster restarts, while ephemeral znodes are associated with the ZooKeeper session that created them and are automatically deleted once the session ends.

ZooKeeper allows clients to monitor changes in znodes through a feature called “Watch”. When a znode is created, deleted, or when the number of its child nodes changes, or when the data stored in the znode itself changes, ZooKeeper explicitly notifies the client through node change handlers.

Based on the aforementioned features, ZooKeeper is often used to implement functionalities such as cluster membership management, distributed locks, leader election, etc. The Kafka controller extensively utilizes the watch feature to coordinate and manage the cluster. Let’s take a look at an image showing the znode distribution created by Kafka in ZooKeeper. You don’t need to understand the role of each znode, but you can get a rough idea of Kafka’s dependence on ZooKeeper.

Znode Distribution in ZooKeeper

With this basic knowledge of ZooKeeper, we can now proceed with the discussion on the Kafka controller.

How is the controller selected? #

You must be curious about how the controller is selected. As we mentioned earlier, every broker has the ability to act as a controller. So, when the cluster starts, how does Kafka determine which broker is the controller?

In reality, when a broker starts up, it tries to create a /controller node in ZooKeeper. The current rule for electing the controller in Kafka is: the first broker to successfully create the /controller node will be designated as the controller.

What does a controller do? #

We often say that a controller is a component that plays a coordinating role, but what exactly does this coordinating role refer to? After pondering for a while, I think the responsibilities of a controller can be roughly divided into five categories. Let’s take a look together.

  1. Topic management (creation, deletion, adding partitions)

Topic management refers to the controller helping us complete operations such as creating, deleting, and adding partitions to Kafka topics. In other words, when we execute the kafka-topics script, most of the background work is done by the controller. In the following sections, I will provide a detailed explanation of how to use the kafka-topics script.

  1. Partition reassignment

Partition reassignment mainly refers to the fine-grained partition allocation provided by the kafka-reassign-partitions script (which I will also introduce later). This functionality is also implemented by the controller.

  1. Preferred leader election

Preferred leader election is mainly a solution provided by Kafka to avoid overloading a certain broker. I will discuss the tool in detail later in this column, but for now, you just need to understand that this is also within the scope of the controller’s responsibilities.

  1. Cluster member management (adding brokers, active broker shutdown, broker failure)

This is the fourth category of functionality provided by the controller, including automatic detection of newly added brokers, active broker shutdown, and passive broker failures. This automatic detection relies on the Watch mechanism mentioned earlier and the combination with ZooKeeper’s ephemeral nodes.

For example, the controller component will use the Watch mechanism to check changes in the number of child nodes under the /brokers/ids node in ZooKeeper. Currently, when a new broker starts, it will create a dedicated znode under /brokers. Once the creation is complete, ZooKeeper will push a notification message to the controller through the Watch mechanism, allowing the controller to automatically perceive this change and initiate subsequent operations for the new broker.

Detecting broker liveliness relies on another mechanism mentioned earlier: ephemeral nodes. After each broker starts, it creates an ephemeral znode under /brokers/ids. When a broker fails or is actively shut down, the broker’s session with ZooKeeper ends and this znode will be automatically deleted. Similarly, ZooKeeper’s Watch mechanism pushes this change to the controller, allowing the controller to know that a broker has been shut down or has failed, and therefore take appropriate measures.

  1. Data service

The last major category of work performed by the controller is providing data services to other brokers. The controller holds the most complete cluster metadata information, and all other brokers regularly receive metadata update requests from the controller to update their in-memory cached data.

What data does the controller save? #

Next, let’s take a detailed look at what data is saved in the controller. I will use a chart to illustrate.

As you can see in the chart, there is a lot of data displayed. It includes almost all the data related to the Kafka cluster that we can think of. The important data stored in the controller includes:

  • All topic information, including specific partition information such as leader replica, and the set of in-sync replicas (ISR).
  • All broker information, including currently running brokers, and brokers that are in the process of being shut down.
  • All partitions involved in maintenance tasks, including the list of partitions currently undergoing Preferred Leader Election and partition reassignment.

It is worth noting that this data is also stored in ZooKeeper. Every time the controller is initialized, it reads the corresponding metadata from ZooKeeper and populates its own cache. With this data, the controller can provide data services externally. Here, “externally” refers to other brokers, as the controller synchronizes this data to other brokers by sending requests to them.

Controller Failure Failover #

As mentioned earlier, during the operation of a Kafka cluster, only one broker can act as the controller. This poses a risk of single point of failure (SPOF). How does Kafka handle SPOF? The answer is by providing failover capability for the controller.

Failover refers to the ability of Kafka to quickly detect and immediately activate a standby controller to replace a failed controller when it unexpectedly crashes or terminates during operation. This process is automatically carried out and does not require manual intervention.

Next, let’s look at a diagram that briefly illustrates the process of controller failover.

At the beginning, Broker 0 is the controller. When Broker 0 crashes, ZooKeeper detects it through a watch mechanism and deletes the /controller ephemeral node. Afterwards, all surviving brokers start competing for the new controller role. Eventually, Broker 3 wins the election and successfully rebuilds the /controller node in ZooKeeper. After that, Broker 3 reads the cluster metadata information from ZooKeeper and initializes it in its own cache. At this point, the failover of the controller is completed, and it can perform its normal duties.

Internal Design Principles of the Controller #

Before Kafka version 0.11, the design of the controller was quite cumbersome, and the code was somewhat chaotic, which resulted in many controller-related bugs in the community that could not be fixed. The controller is designed with multiple threads and creates many threads internally. For example, the controller needs to create a corresponding socket connection for each broker, and then create a dedicated thread to send specific requests to these brokers. If there are many brokers in the cluster, the controller needs to create many threads. In addition, the controller connects to ZooKeeper and creates a separate thread to handle the notification callback of the watch mechanism. In addition to these threads, the controller also creates additional I/O threads for topic deletion.

Even worse than the multi-threaded design is that these threads also access shared controller cache data. We all know that multi-threaded access to shared mutable data is the biggest challenge in maintaining thread safety. In order to protect the data security, the controller had to use a large number of ReentrantLock synchronization mechanisms in the code, which further slows down the processing speed of the entire controller.

In view of these reasons, the community refactored the underlying design of the controller in version 0.11, and the biggest improvement is to change the multi-threaded solution to a single-threaded solution with an event queue. I will use a diagram provided by the community to illustrate it directly.

From this diagram, we can see that the community has introduced an event handling thread to handle various controller events, and then the controller models all the operations that were previously executed as independent events and sends them to a dedicated event queue for consumption by this thread. This is what is called a single-threaded + queue implementation.

It is worth noting that the single thread here does not mean that all the threads mentioned earlier are “killed off”. The controller simply delegates the work of cache state changes to this thread.

The biggest advantage of this solution is that the status stored in the controller cache is handled by only one thread, so there is no longer a need for heavyweight thread synchronization mechanisms to maintain thread safety. Kafka no longer has to worry about the problem of concurrent access by multiple threads, which is very beneficial for the community to locate and diagnose various controller problems. In fact, since the refactoring of the controller code in version 0.11, there have been significantly fewer bugs related to the controller in the community, which also demonstrates the effectiveness of this solution.

The second improvement for the controller is to change all synchronous operations with ZooKeeper to asynchronous operations. The ZooKeeper API itself provides both synchronous and asynchronous write methods. Previously, the controller used synchronous API for operations with ZooKeeper, resulting in poor performance, mainly manifested when there are a large number of topic partitions undergoing changes, ZooKeeper can easily become a bottleneck in the system. The new version of Kafka has modified this part of the design, completely abandoning the previous synchronous API calls, and instead using asynchronous API to write to ZooKeeper, greatly improving performance. According to the community’s tests, after changing to asynchronous, ZooKeeper write performance has improved by 10 times!

In addition to the above, the community has recently made a major improvement! Previously, all requests received by the broker were treated equally, without any differentiation. This design is very unfair to the requests sent by the controller because such requests should have higher priority.

As a simple example, let’s say we delete a certain topic, and then the controller will send a request called StopReplica to the brokers where all the replicas of this topic reside. If there are a large number of backlogged Produce requests on the broker at this time, then this StopReplica request can only wait in line. If these Produce requests are meant to send messages to this topic, it becomes ironic: the topic is going to be deleted, is it meaningful to process these Produce requests? In this case, the most reasonable processing order should be to give the StopReplica request higher priority so that it can be preemptively processed.

Before version 2.2, this was not possible. But starting from version 2.2, Kafka officially supports the handling of requests with different priorities. Simply put, Kafka separates the requests sent by the controller from ordinary data requests and implements separate processing logic for controller requests. Since this improvement is a very new feature, let’s wait and see the specific effects.

Summary #

Alright, I have finished discussing the content about Kafka controllers. Finally, I will share with you a little trick. When you encounter problems with the controller component, such as being unable to delete a topic or partition reassignment hanging, you don’t need to restart Kafka Broker or the controller. There is a simple and fast way to manually delete the /controller node in ZooKeeper. The specific command is rmr /controller. By doing this, you can trigger controller re-election and avoid message processing interruption caused by broker restart.

Open Discussion #

Currently, the controller still heavily relies on ZooKeeper. In the future, what do you think are the possible directions to reduce dependence on ZooKeeper?

Please feel free to share your thoughts and answers, and let’s discuss together. If you find it helpful, you are also welcome to share this article with your friends.