24 How Are Requests Processed #

Hello, I am Hu Xi. Today, I want to share with you the topic of how Kafka requests are processed.

Whether it is the Kafka client or the broker, they interact through a “request/response” mechanism. For example, the client sends a message-production request to the broker over the network, and after the broker has processed it, it sends the corresponding response back to the client.

Apache Kafka defines its own set of request protocols to implement various types of interactions. For example, the commonly used PRODUCE request is used for message production, the FETCH request is used for message consumption, and the METADATA request is used to request metadata information from the Kafka cluster.

In short, Kafka defines many request formats like these. I counted: as of the latest version, 2.3, Kafka defines as many as 45 different request formats. All requests are transmitted over TCP through sockets.

Today, let’s discuss in detail the entire process of how Kafka brokers handle requests.

Two Common Approaches to Handling Requests #

When it comes to handling requests, there are two commonly used approaches.

  1. Sequential request handling. If represented in pseudocode, it would look like this:

    while (true) {
       Request request = accept(connection); // block until the next request arrives
       handle(request);                      // fully process it before accepting the next one
    }
    

    This method is simple to implement, but it has a fatal flaw: poor throughput. Because requests are processed strictly in sequence, each request must wait for the previous one to finish. This approach is only suitable for systems that receive requests infrequently.

  2. Dedicated thread per request. This means creating a new thread for each inbound request to handle it asynchronously. Let’s take a look at the pseudocode for this approach:

    while (true) {
       Request request = accept(connection);
       // spawn a dedicated thread per request so the accept loop never blocks
       Thread thread = new Thread(() -> {
          handle(request);
       });
       thread.start();
    }
    

    This method is the polar opposite of the first: it fully embraces asynchrony, creating a dedicated thread for each inbound request. Its advantage is that handling one request never blocks the next. The drawback is just as apparent: spawning a thread per request is expensive, and under load it can overwhelm the entire service. In other words, this method is only suitable for business scenarios with a very low request rate.

Since neither of these two approaches is ideal, how does Kafka handle requests? In a nutshell, Kafka uses the Reactor pattern.

How Does Kafka Handle Requests? #

When it comes to the Reactor pattern, Doug Lea’s “Scalable IO in Java” is probably the best introductory material. Even if you haven’t heard of Doug Lea, you’ve probably used ConcurrentHashMap, which he wrote. In fact, the entire java.util.concurrent package is his masterpiece!

Ok, let’s get back to the Reactor pattern. In simple terms, the Reactor pattern is an implementation of event-driven architecture, which is particularly suitable for scenarios where multiple clients concurrently send requests to the server. I will use one of Doug Lea’s PowerPoint slides to explain the architecture of the Reactor pattern and introduce Kafka’s request handling model.

The architecture of the Reactor pattern is shown in the following diagram:

From this diagram, we can see that multiple clients send requests to the Reactor. The Reactor has a request dispatch thread called the Dispatcher, which is the Acceptor in the diagram. The Dispatcher dispatches different requests to multiple worker threads for processing.

In this architecture, the Acceptor thread only distributes requests and is not involved in the actual business logic, so it is very lightweight and can achieve high throughput. The number of worker threads can be adjusted to match the actual business load, which gives a form of dynamic load balancing.
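
To make the pattern concrete, here is a minimal sketch of a Reactor-style server in plain Java NIO, in the spirit of Doug Lea’s slides. It is not Kafka’s actual implementation; the class name, port, buffer size, and pool size are all illustrative:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class MiniReactor {
       public static void main(String[] args) throws IOException {
          Selector selector = Selector.open();
          ServerSocketChannel server = ServerSocketChannel.open();
          server.bind(new InetSocketAddress(9092));   // illustrative port
          server.configureBlocking(false);
          server.register(selector, SelectionKey.OP_ACCEPT);

          // The worker pool plays the role of the Reactor's worker threads.
          ExecutorService workers = Executors.newFixedThreadPool(3);

          while (true) {
             selector.select();                       // wait for I/O events
             Iterator<SelectionKey> it = selector.selectedKeys().iterator();
             while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                   // The Acceptor's only job: register the new connection.
                   SocketChannel client = server.accept();
                   client.configureBlocking(false);
                   client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                   // Read the bytes, then hand the real work to a worker.
                   SocketChannel client = (SocketChannel) key.channel();
                   ByteBuffer buf = ByteBuffer.allocate(1024);
                   if (client.read(buf) < 0) { client.close(); continue; }
                   buf.flip();
                   workers.submit(() -> handle(buf));
                }
             }
          }
       }

       private static void handle(ByteBuffer request) {
          // Decode and process the request; omitted in this sketch.
       }
    }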

If we were to draw a similar diagram for Kafka, it would look like this:

Obviously, these two diagrams look very similar. Kafka’s broker side has a SocketServer component, similar to the Dispatcher in the Reactor pattern. It, too, has a corresponding Acceptor thread and a worker thread pool, except that in Kafka this worker thread pool goes by a special name: the network thread pool. Kafka provides a broker-side parameter, num.network.threads, for adjusting the size of this pool. Its default value is 3, meaning each broker creates 3 network threads at startup dedicated to handling client requests.

The Acceptor thread distributes inbound requests across all network threads in a round-robin fashion, so in practice every network thread has an equal chance of being assigned pending requests. Round-robin is simple to implement, avoids skew in request distribution, and thus helps achieve fair request scheduling.
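
The round-robin handoff can be sketched roughly like this. This is not Kafka’s actual source: Processor is a hypothetical stand-in for a network thread, and in Kafka’s SocketServer it is each new connection, rather than each individual request, that gets handed off this way:

    import java.nio.channels.SocketChannel;

    // Hypothetical sketch of round-robin dispatch across network threads.
    class Acceptor {
       private final Processor[] processors; // the "network threads"
       private int next = 0;

       Acceptor(Processor[] processors) {
          this.processors = processors;
       }

       void dispatch(SocketChannel connection) {
          processors[next].accept(connection);    // hand off the work
          next = (next + 1) % processors.length;  // advance the round-robin index
       }
    }

    interface Processor {
       void accept(SocketChannel connection);
    }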

Now that you know that incoming client requests are handed by the broker’s Acceptor thread to one of the network threads, let’s look at what happens after a network thread receives a request. You might expect the network thread to process it directly, right? In fact, Kafka inserts another layer of asynchronous thread-pool processing at this stage. Let’s take a look at the following diagram together.

When a network thread receives a request, instead of processing it itself, it places the request into a shared request queue. The broker side also has an I/O thread pool, which is responsible for taking requests off this queue and performing the actual processing: for a PRODUCE request, writing the messages to the underlying disk log; for a FETCH request, reading the messages from the page cache or from disk.

The threads in the I/O thread pool are the ones that perform the request logic. The broker-side parameter num.io.threads controls the number of threads in this thread pool. The current default value is 8, which means that each broker automatically creates 8 I/O threads to handle requests after startup. You can set the number of threads in this thread pool based on the actual hardware conditions.

For example, if the CPU resources on your machine are abundant, you can increase this parameter, allowing more concurrent requests to be processed simultaneously. After the I/O thread processes a request, it sends the generated response to the response queue of the network thread pool, which is then responsible for returning the response to the client.
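
Both pool sizes are ordinary broker-side settings in server.properties; the snippet below simply restates the defaults mentioned above:

    # server.properties
    num.network.threads=3   # size of the network thread pool (default 3)
    num.io.threads=8        # size of the I/O thread pool (default 8)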

If you have been paying close attention, you may have noticed a difference between the request queue and the response queues: the request queue is shared by all network threads, while each network thread has its own response queue. The reason for this design is that the Dispatcher only distributes requests; it plays no part in sending responses. Each network thread must send responses back to its own clients itself, so there is no need to keep the responses in one shared place.
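
A minimal sketch of this queue topology might look like the following. RequestChannel, the field names, and the per-thread queue capacity are illustrative rather than Kafka’s real internals, although the broker does bound the shared request queue with the queued.max.requests parameter, whose default is 500:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    class RequestChannel {
       // One queue shared by all network threads and all I/O threads.
       final BlockingQueue<Request> requestQueue = new ArrayBlockingQueue<>(500);

       // One response queue per network thread: only the thread that owns
       // the client connection sends the response back.
       final List<BlockingQueue<Response>> responseQueues = new ArrayList<>();

       RequestChannel(int numNetworkThreads) {
          for (int i = 0; i < numNetworkThreads; i++) {
             responseQueues.add(new ArrayBlockingQueue<>(100));
          }
       }

       // What an I/O thread does, in outline: take a request from the shared
       // queue, handle it, then route the response back to the network
       // thread that read the request.
       void ioThreadLoop() throws InterruptedException {
          while (true) {
             Request req = requestQueue.take();
             Response resp = handle(req); // e.g. append to or read from the log
             responseQueues.get(req.processorId).put(resp);
          }
       }

       private Response handle(Request req) {
          return new Response(); // real processing omitted in this sketch
       }
    }

    class Request {
       final int processorId; // which network thread read this request
       Request(int processorId) { this.processorId = processorId; }
    }

    class Response { }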

Let’s take another look at the previous diagram. You will see a component called Purgatory, Kafka’s famous “purgatory” component, which is used to cache delayed requests. Delayed requests are requests that cannot be completed immediately because some condition is not yet met. For example, with acks=all, a PRODUCE request cannot return until every replica in the ISR (in-sync replica set) has received the message, so the I/O thread handling it must wait for the write results from the other brokers. When a request cannot be completed immediately, it is parked in Purgatory; once its completion condition is later satisfied, the I/O thread resumes processing it and puts the response into the response queue of the corresponding network thread.
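
On the client side, the setting that routes PRODUCE requests into this wait-for-the-full-ISR path is just the producer’s acks configuration. A minimal sketch, with the broker address, topic, and serializers as placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AcksAllProducer {
       public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
          // Wait until every replica in the ISR has the message; such
          // PRODUCE requests may sit in Purgatory on the broker side.
          props.put(ProducerConfig.ACKS_CONFIG, "all");
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
          try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             producer.send(new ProducerRecord<>("test-topic", "key", "value"));
          }
       }
    }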

With that, our analysis of Kafka’s request processing is complete. I believe you now understand how a Kafka broker handles a request from start to finish. But we won’t wrap up just yet; I want to give you a little bonus and tell you something extra today.

Separating Control Requests from Data Requests #

So far, the request-processing flow I have described applies to all requests; in other words, the Kafka broker treats all requests equally. But inside Kafka, besides the PRODUCE and FETCH requests sent by clients, there are many other types of requests performing other operations, such as the LeaderAndIsr request, which updates Leader replicas, Follower replicas, and the ISR set, and the StopReplica request, which orders replicas offline. Compared with PRODUCE and FETCH requests, these have one obvious difference: they are not data requests but control requests. That is, they do not operate on message data; they execute specific internal actions within Kafka.

The Kafka community calls requests such as PRODUCE and FETCH data requests, and requests such as LeaderAndIsr and StopReplica control requests. On closer inspection, the current scheme of treating all requests equally turns out to be unreasonable where control requests are concerned. Why? Because control requests have a special power: they can directly invalidate data requests!

Let me give an example. Suppose we have a topic with a single partition configured with two replicas, the Leader replica on Broker 0 and the Follower replica on Broker 1, and suppose Broker 0 has a large backlog of PRODUCE requests. If at this point you use a Kafka command to forcibly switch the Leader and Follower roles for this partition, Kafka’s internal controller component sends a LeaderAndIsr request to Broker 0, explicitly telling it that it is no longer the Leader but now a Follower, while the Follower replica on Broker 1, having been chosen as the new Leader, stops fetching messages from Broker 0.

Now an awkward situation arises: if the backlogged PRODUCE requests were sent with acks=all, then the requests made before the LeaderAndIsr request arrived can never complete normally. As mentioned earlier, they would sit in Purgatory, waiting, until they eventually time out and an error is returned to the client.

Now imagine if Kafka could prioritize the LeaderAndIsr request: Broker 0 would immediately return a NOT_LEADER_FOR_PARTITION error, quickly failing the backlogged PRODUCE requests, so clients learn the outcome right away instead of waiting out the Purgatory timeout, which shortens request latency. Even if acks is not set to all, so the backlogged PRODUCE requests can successfully be written to the Leader replica’s log, once the LeaderAndIsr request is processed and the Leader on Broker 0 becomes a Follower, it must still perform explicit log truncation (when the original Leader replica becomes a Follower, it deletes all messages that were written but not committed), which is still a lot of wasted work.

Let’s take another example. On a broker with a large number of backlogged data requests, when you delete a topic, the Kafka controller (which I will specifically introduce later in the column) sends a StopReplica request to that broker. If this request cannot be processed in a timely manner, the topic deletion operation will hang indefinitely, thereby increasing the delay of deleting the topic.

Because of these problems, the community officially implemented the separation of data requests and control requests in version 2.3. In fact, before the community settled on this solution, I had tried to modify the design myself. My idea was to implement a priority queue inside the broker and give control requests a higher priority. It was such a natural idea that I assumed the community would implement it the same way, but my approach was later explicitly recorded in the list of rejected alternatives.

The biggest problem with that solution is that it cannot handle the case where the request queue is full. When the queue can no longer accommodate any new requests, it cannot admit new control requests either, no matter how high their priority.

So how did the community solve it? Quite simply: take another look at the third diagram above. The community duplicated the entire set of components in that diagram to separate the two types of requests. In other words, after a Kafka broker starts, it creates two sets of network thread pools and I/O thread pools in the background, one handling data requests and the other handling control requests. As for the socket ports, the two sets naturally listen on different ports, and you must provide different listeners configurations to explicitly specify which ports handle which type of request.
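
For reference, this wiring is done through listener configuration. Here is a sketch in server.properties, assuming the control.plane.listener.name parameter the community added for this purpose; the listener names, ports, and security protocols are illustrative:

    # Two listeners: one for data requests, one dedicated to control requests.
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
    # Send control-plane traffic to its own listener, and therefore to its
    # own Acceptor, network threads, and request/response queues.
    control.plane.listener.name=CONTROLLER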

Summary #

With that said, the analysis of Kafka Broker’s request processing flow should now be relatively complete. Understanding the process of request processing is crucial for future Kafka performance optimization. If you can think about the working principle of Kafka from the perspective of requests, you will find that optimizing Kafka is not a difficult task.

Open Discussion #

To be honest, I am still somewhat reluctant to accept the community’s rejection of the priority-queue solution. If it were up to you, how would you avoid the full-queue problem in the priority-queue approach?

Feel free to share your thoughts and answers. Let’s discuss together. If you find this article helpful, please feel free to share it with your friends.