06 Request Channels: How Kafka Request Queues Are Implemented

Hello, I am Hu Xi. We have finished the log module. How well have you mastered it? If you run into problems while exploring the source code, remember to write down your questions in the comments section. I promise to address any doubts you may have.

Now, let’s embark on a new journey: studying the source code of the “request processing module”. To be honest, this is a module I defined myself for the Kafka source code. The source code has no specific “request processing module”. But I find that this kind of division helps you clearly see the roles of different parts of the source code, so you can read more purposefully and learn more efficiently.

In this module, I will guide you through the key topics of request processing, including the request channel, the complete request-processing flow, and the request entry class. Today, let’s start with how Kafka implements the request queue. The main implementation class is in the RequestChannel.scala file under core/src/main/scala/kafka/network.

When we talk about the Kafka server, also known as the broker, we often say that it performs the function of message persistence. But in essence, it is just a Java process that continuously receives external requests, processes them, and then sends back the result of the processing.

You may find this strange: isn’t the broker written in Scala, so why call it a Java process here? This is because Scala code is compiled into .class files containing JVM bytecode, just like compiled Java code. Therefore, once the broker is started, it is still an ordinary Java process.

Managing queued requests efficiently is essential to the broker’s processing performance. So you are probably eager to know how the request queue is implemented on the broker. Next, let’s look at how the broker models the underlying request objects, how the request queue is implemented, and the core monitoring metrics for request processing.

Currently, the interaction between the broker and clients is mainly based on the Request/Response mechanism. Therefore, it is necessary for us to learn how the source code models or defines the Request and Response.

Request #

Let’s take a look at the code defining the Request in the RequestChannel source code.

sealed trait BaseRequest
case object ShutdownRequest extends BaseRequest

class Request(val processor: Int,
              val context: RequestContext,
              val startTimeNanos: Long,
              memoryPool: MemoryPool,
              @volatile private var buffer: ByteBuffer,
              metrics: RequestChannel.Metrics) extends BaseRequest {
  ......
}

To start with, the “trait” keyword in Scala is roughly equivalent to the “interface” in Java. From the code, we can see that BaseRequest is a trait interface that defines the basic request types. It has two implementation classes: ShutdownRequest and Request.

ShutdownRequest serves as a flag. When the Broker process is shutting down, the request handler (RequestHandler, which I will discuss in detail in Lesson 9) sends a ShutdownRequest to the dedicated request handling thread. Upon receiving this request, the thread will initiate a series of broker shutdown logic.
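ShutdownRequest follows the classic “poison pill” pattern. A sentinel object travels through the same queue as real requests and tells the consuming thread to stop. The Java sketch below illustrates only that pattern, under simplifying assumptions. HandlerLoop, the name-only Request class, and the loop itself are hypothetical. They are not Kafka’s actual request handler code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Mirrors Scala's "sealed trait BaseRequest" (Java 8 has no sealed types)
interface BaseRequest {}

// Singleton sentinel playing the role of "case object ShutdownRequest"
final class ShutdownRequest implements BaseRequest {
    static final ShutdownRequest INSTANCE = new ShutdownRequest();
    private ShutdownRequest() {}
}

// Simplified stand-in for RequestChannel.Request: carries only a name
final class Request implements BaseRequest {
    final String name;
    Request(String name) { this.name = name; }
}

// Hypothetical handler loop; not Kafka's request handler implementation
final class HandlerLoop {
    // Takes requests until the sentinel arrives and returns the names it handled
    static List<String> run(BlockingQueue<BaseRequest> queue) throws InterruptedException {
        List<String> handled = new ArrayList<>();
        while (true) {
            BaseRequest req = queue.take();          // blocks until something is queued
            if (req == ShutdownRequest.INSTANCE) {   // identity check, like matching a case object
                return handled;                      // leave the loop so the thread can end
            }
            handled.add(((Request) req).name);       // "process" an ordinary request
        }
    }
}
```

The sentinel is a single shared instance, so an identity comparison is enough to recognize it. This is similar to how Scala code matches on the ShutdownRequest case object. Anything queued behind the sentinel stays unprocessed.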

Request is the actual implementation class that defines various client or broker requests. Its defined properties include processor, context, startTimeNanos, memoryPool, buffer, and metrics. Let’s take a closer look at each of them.

processor #

processor is the number (ID) of the Processor thread that received the request. The num.network.threads parameter controls how many Processor threads the broker creates for each listener.

Suppose your listeners configuration is PLAINTEXT://localhost:9092,SSL://localhost:9093. With the default num.network.threads value of 3, the broker creates six Processor threads at startup. They form two groups of three, one group per listener. Each Processor gets a number that is unique within the broker, so here the numbers run from 0 to 5.

You may wonder why we need to keep track of the Processor thread number. An I/O thread processes the Request, but the response must still go back to the requester through the same Processor thread that received it. Therefore, the Request must record which Processor thread received it. Note also that Processor threads only handle network I/O, which means receiving requests and sending responses. They do not execute the actual request-processing logic; the I/O threads handle that.
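The sketch below shows one way to make Processor numbers unique across listeners: a single counter shared by every listener. This uniqueness matters because RequestChannel keys its processors map by this number. ProcessorIdPlanner and its plan method are hypothetical. They only model the numbering for the two-listener example above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical planner: numbers Processors across listeners with one shared counter
final class ProcessorIdPlanner {
    static Map<String, List<Integer>> plan(List<String> listeners, int numNetworkThreads) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int nextProcessorId = 0;                        // shared by every listener
        for (String listener : listeners) {
            List<Integer> ids = new ArrayList<>();
            for (int i = 0; i < numNetworkThreads; i++) {
                ids.add(nextProcessorId++);             // never restarts at 0 per listener
            }
            result.put(listener, ids);
        }
        return result;
    }
}
```

With listeners PLAINTEXT and SSL and three threads each, this assigns 0, 1, 2 to the first listener and 3, 4, 5 to the second. A response can then be routed back by number alone.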

context #

context holds the request’s context information. The Kafka source code defines a RequestContext class for this. As the name suggests, it holds all the context information about a request. RequestContext is defined in the clients module. Its main code is shown below, with comments explaining the key fields.

public class RequestContext implements AuthorizableRequestContext {
    public final RequestHeader header; // Request header data, mainly metadata invisible to users, such as request type, request API version, clientId, etc.
    public final String connectionId; // TCP connection identification string of the request sender, defined by Kafka according to certain rules, mainly used to indicate the TCP connection
    public final InetAddress clientAddress; // IP address of the request sender
    public final KafkaPrincipal principal;  // Authenticated identity (principal) of the request sender, used for authorization
    public final ListenerName listenerName; // Listener name, can be predefined listener (e.g., PLAINTEXT) or self-defined
    public final SecurityProtocol securityProtocol; // Security protocol type, currently supports 4 types: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    public final ClientInformation clientInformation; // Client software name and version reported by the client
    // Extract the request and corresponding size value from the given ByteBuffer
    public RequestAndSize parseRequest(ByteBuffer buffer) {
         ......
    }
    // Other getter methods
    ......
}

startTimeNanos #

startTimeNanos records when the Request object was created. It is mainly used to compute various time statistics.

Many of the request’s JMX metrics, especially the time-related ones, are computed from startTimeNanos. Note that it is a nanosecond-precision timestamp, which allows very fine-grained timing.
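A nanosecond timestamp such as startTimeNanos only becomes a metric once a later reading is subtracted from it. The small Java sketch below shows that conversion into milliseconds. It assumes readings come from System.nanoTime. In the broker they come from Kafka’s Time abstraction; the Response code later in this lesson calls Time.SYSTEM.nanoseconds. The helper names, and clamping negative intervals to zero, are assumptions for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers for turning nanosecond readings into millisecond metrics
final class RequestTiming {
    // Converts a nanosecond interval to fractional milliseconds, clamping negatives to zero
    static double nanosToMs(long nanos) {
        long positive = Math.max(nanos, 0L);
        return TimeUnit.NANOSECONDS.toMicros(positive) / 1000.0;
    }

    // Elapsed time between a start reading and a later reading from the same clock
    static double elapsedMs(long startTimeNanos, long nowNanos) {
        return nanosToMs(nowNanos - startTimeNanos);
    }
}
```

System.nanoTime values have no fixed origin, so only the difference between two readings means anything. For example, elapsedMs(start, System.nanoTime()) gives how long a request has existed so far.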

memoryPool #

memoryPool is a non-blocking memory pool defined in the source code. It mainly prevents Request objects from consuming unbounded memory.

Currently, the interface and implementation classes of this memory pool are MemoryPool and SimpleMemoryPool, respectively. Pay special attention to the tryAllocate method of SimpleMemoryPool to see how it allocates memory for Request objects.
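In a non-blocking bounded pool, an allocation attempt either succeeds immediately or returns null; it never waits for memory to be freed. The sketch below illustrates that contract with an AtomicLong budget. BoundedPool is a hypothetical class and much simpler than Kafka’s SimpleMemoryPool. It has no strictness options and no metrics.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bounded pool illustrating a non-blocking tryAllocate contract
final class BoundedPool {
    private final AtomicLong available;

    BoundedPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    // Reserves sizeBytes from the budget, or returns null at once if there is not enough
    ByteBuffer tryAllocate(int sizeBytes) {
        if (sizeBytes <= 0) throw new IllegalArgumentException("size must be positive");
        while (true) {
            long current = available.get();
            if (current < sizeBytes) return null;                    // out of budget: fail fast
            if (available.compareAndSet(current, current - sizeBytes)) {
                return ByteBuffer.allocate(sizeBytes);               // budget reserved
            }
            // another thread changed the budget concurrently; retry with the fresh value
        }
    }

    // Returns a buffer's capacity to the budget once the request no longer needs it
    void release(ByteBuffer buffer) {
        available.addAndGet(buffer.capacity());
    }

    long availableBytes() {
        return available.get();
    }
}
```

A caller that gets null can back off and retry later instead of blocking a thread. The compare-and-set loop keeps concurrent allocations from overshooting the budget.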

buffer #

The buffer is the byte buffer that actually holds the content of the Request object. The request sender must write bytes to this buffer in the format specified by the Kafka RPC protocol, otherwise an InvalidRequestException will be thrown. This logic is mainly implemented by the parseRequest method of RequestContext.

public RequestAndSize parseRequest(ByteBuffer buffer) {
    if (isUnsupportedApiVersionsRequest()) {
        // An unsupported ApiVersions request type is regarded as a request of version V0 and no parsing operation is performed, directly returned
        ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), (short) 0, header.apiVersion());
        return new RequestAndSize(apiVersionsRequest, 0);
    } else {
        // Get ApiKeys information from the request header data
        ApiKeys apiKey = header.apiKey();
        try {
            // Get version information from the request header data
            short apiVersion = header.apiVersion();
            // Parse the request
            Struct struct = apiKey.parseRequest(apiVersion, buffer);
            AbstractRequest body = AbstractRequest.parseRequest(apiKey, apiVersion, struct);
            // Wrap the parsed request object and request size and return
            return new RequestAndSize(body, struct.sizeOf());
        } catch (Throwable ex) {
            // Any problem during parsing is regarded as an invalid request and an exception is thrown
            throw new InvalidRequestException("Error getting request for apiKey: " + apiKey +
                    ", apiVersion: " + header.apiVersion() +
                    ", connectionId: " + connectionId +
                    ", listenerName: " + listenerName +
                    ", principal: " + principal, ex);
        }
    }
}

As mentioned before, the main purpose of this method is to extract the corresponding Request object and its size from the ByteBuffer.

First, the code checks whether this Request is an ApiVersions request with a version the broker does not support. If so, it skips parsing and directly constructs and returns a version 0 ApiVersions request. Otherwise, it continues with the code below.

Here I will briefly explain the purpose of ApiVersions request. When the Broker receives an ApiVersionsRequest, it returns a list of request types currently supported by the Broker, including request type names, the earliest supported version, and the latest supported version. If you check the bin directory of Kafka, you should be able to find a script tool named kafka-broker-api-versions.sh. Its implementation principle is to construct an ApiVersionsRequest object and send it to the corresponding Broker.

You may wonder why ApiVersions requests get this special version check. Unlike other request types, ApiVersions requests must be handled even when their version is higher than the latest version the broker supports. This is for compatibility between client and server versions. When a client sends a request to a Broker, it may not know which request versions the Broker actually supports. It uses the ApiVersionsRequest to get the complete list. Without this check, the Broker would reject an ApiVersionsRequest newer than it understands, and the client could never learn which versions the Broker supports.

After this check, the code gets the ApiKeys information and the corresponding version information from the request header data, then parses the request, and finally wraps the parsed request object and request size, and returns them.
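ApiVersions gets this special treatment because clients use its answer to pick a version that both sides understand. The sketch below shows that negotiation step under a simplifying assumption. Each side supports a contiguous range [min, max] for an API, and the client picks the highest version in the overlap. VersionRange and latestUsableVersion are hypothetical names for this illustration.

```java
// Hypothetical model of one side's supported version range for a single API
final class VersionRange {
    final short min;
    final short max;

    VersionRange(int min, int max) {
        if (min > max) throw new IllegalArgumentException("min > max");
        this.min = (short) min;
        this.max = (short) max;
    }

    // Highest version both sides support, or -1 if the ranges do not overlap
    static short latestUsableVersion(VersionRange client, VersionRange broker) {
        short low = (short) Math.max(client.min, broker.min);
        short high = (short) Math.min(client.max, broker.max);
        return low <= high ? high : (short) -1;
    }
}
```

A newer client supporting versions 0 to 9 and an older broker supporting 0 to 7 would settle on 7. This exchange only works if the broker can answer an ApiVersions request that is newer than it understands, which is why parseRequest falls back to version 0 instead of throwing.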

metrics #

metrics is a management class for various monitoring metrics related to the Request. It builds a Map inside, encapsulating all request JMX metrics. In addition to these important field properties mentioned above, most of the code in the Request class is related to monitoring metrics, which we will discuss in detail later.

Response #

After discussing the Request code, let’s talk about the Response. Kafka defines one abstract superclass and five concrete subclasses for Response, as shown in the diagram below:

Seeing so many classes, you may feel a bit confused. What are these used for? Don’t worry, now I will introduce the purpose of each class separately.

  • Response: The abstract base class for all responses. Each Response object holds its corresponding Request object. The most important method in this class is onComplete, which supplies the callback logic to run once the Response has been sent.
  • SendResponse: Most Kafka requests need some callback logic to run after processing. SendResponse is the Response subclass that holds the result to return. Its most important field is onCompleteCallback, which specifies that callback logic.
  • NoOpResponse: Some requests need no response to be sent back and no extra callback logic after processing. NoOpResponse is designed for this kind of Response.
  • CloseConnectionResponse: Used when the TCP connection must be closed after an error occurs. When the Processor gets a CloseConnectionResponse, it closes the connection instead of sending a response.
  • StartThrottlingResponse: Notifies the broker’s SocketServer component (which I will explain in a later lesson) that throttling has started on a TCP connection’s communication channel.
  • EndThrottlingResponse: The counterpart of StartThrottlingResponse. It notifies the broker’s SocketServer component that throttling on a TCP connection’s communication channel has ended.

You may ask again, “Do I need to master all these classes?” Actually, you don’t. You only need to understand three of them. SendResponse represents a normal Response that needs to be sent. NoOpResponse represents a Response that does not need to be sent. CloseConnectionResponse marks a Response that closes the connection channel. The last two classes are rarely used. They only matter when throttling Socket connections, so I won’t go into detail here.
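The practical distinction above is between sending a response, sending nothing, and closing the connection. It maps naturally onto dispatching on the Response subtype. The Java sketch below mirrors this with a small class hierarchy and instanceof checks. The class names follow the source, which calls the no-response class NoOpResponse. The describe method and its action strings are hypothetical and do not reproduce what the Processor actually does. The two throttling classes are left out.

```java
// Simplified hierarchy: every Response remembers which Processor owns it
abstract class Response {
    final int processor;
    Response(int processor) { this.processor = processor; }
}

final class SendResponse extends Response {
    final int sizeInBytes;                     // size of the payload to write back
    SendResponse(int processor, int sizeInBytes) {
        super(processor);
        this.sizeInBytes = sizeInBytes;
    }
}

final class NoOpResponse extends Response {
    NoOpResponse(int processor) { super(processor); }
}

final class CloseConnectionResponse extends Response {
    CloseConnectionResponse(int processor) { super(processor); }
}

final class ResponseActions {
    // Hypothetical: what a network thread would do with each kind of Response
    static String describe(Response response) {
        if (response instanceof SendResponse) {
            return "send " + ((SendResponse) response).sizeInBytes + " bytes";
        } else if (response instanceof NoOpResponse) {
            return "nothing to send";
        } else if (response instanceof CloseConnectionResponse) {
            return "close connection";
        }
        throw new IllegalArgumentException(
            "unhandled response type: " + response.getClass().getSimpleName());
    }
}
```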

Okay, now let’s take a look at the code related to Response.

abstract class Response(val request: Request) {
  locally {
    val nowNs = Time.SYSTEM.nanoseconds
    request.responseCompleteTimeNanos = nowNs
    if (request.apiLocalCompleteTimeNanos == -1L)
      request.apiLocalCompleteTimeNanos = nowNs
  }
  def processor: Int = request.processor
  def responseString: Option[String] = Some("")
  def onComplete: Option[Send => Unit] = None
  override def toString: String
}

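This abstract base class has only one property field, request, so every Response object stores its corresponding Request object. The locally block runs when a Response is constructed. It records responseCompleteTimeNanos, and it also sets apiLocalCompleteTimeNanos if that is still -1. These timestamps feed the timing metrics. As I mentioned earlier, onComplete supplies the callback logic to run; the base class returns None. The SendResponse class overrides this method, as shown below: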

class SendResponse(request: Request,
                   val responseSend: Send,
                   val responseAsString: Option[String],
                   val onCompleteCallback: Option[Send => Unit]) 
  extends Response(request) {
    ......
    override def onComplete: Option[Send => Unit] = onCompleteCallback
}

Here, the SendResponse class inherits from the Response superclass and overrides the onComplete method. The override is very simple: it just returns the constructor parameter onCompleteCallback. There is not much to say about the method itself, but the Scala syntax here is worth mentioning.

In Scala, Unit is similar to void in Java, and “Send => Unit” is a function type. It describes a function that takes an instance of the Send class as input, runs some logic, and returns nothing. Scala is a functional programming language in which functions are “first-class citizens”. Therefore, you can pass a function as a parameter to another function or return a function as a result. Here, onComplete uses the second form: it returns a function, wrapped in an Option, as its result. The advantage is flexibility: passing a different onCompleteCallback gives different callback logic.
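In Java, the same idea can be expressed with Optional<Consumer<T>>. onComplete hands back a callback object rather than running it. Whoever finishes sending the response decides when to invoke it. In this sketch, a plain String payload stands in for Kafka’s Send type. CallbackResponse and completeSend are hypothetical names.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical Java analogue of SendResponse's callback field
final class CallbackResponse {
    final String payload;                                  // stands in for Kafka's Send
    private final Optional<Consumer<String>> onCompleteCallback;

    CallbackResponse(String payload, Optional<Consumer<String>> onCompleteCallback) {
        this.payload = payload;
        this.onCompleteCallback = onCompleteCallback;
    }

    // Like "def onComplete: Option[Send => Unit]": returns the function without calling it
    Optional<Consumer<String>> onComplete() {
        return onCompleteCallback;
    }

    // Hypothetical caller: once the payload has been "sent", run the callback if present
    static void completeSend(CallbackResponse response) {
        response.onComplete().ifPresent(callback -> callback.accept(response.payload));
    }
}
```

You can swap in a different Consumer without touching the response class. The Scala version gets the same flexibility from its Send => Unit parameter.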

RequestChannel #

As the name suggests, RequestChannel is the channel for transmitting Request/Response. With the foundation of Request and Response, now we can learn about the implementation of the RequestChannel class.

Let’s first take a look at the definition and important fields of the RequestChannel class.

class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
 import RequestChannel._
 val metrics = new RequestChannel.Metrics
 private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
 private val processors = new ConcurrentHashMap[Int, Processor]()
 val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
 val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)

 ......
}

The RequestChannel class extends the KafkaMetricsGroup trait, which provides many useful methods for creating monitoring metrics. For example, newGauge creates gauge metrics, whose value is read on demand, and newHistogram creates histogram metrics.

As for the main functionality of the RequestChannel class itself, it defines three core attributes: requestQueue, queueSize, and processors. Let me explain their meanings separately.

Each instance of the RequestChannel object creates a queue to store various types of requests received by the Broker. This queue is called the request queue or Request queue. Kafka uses the ArrayBlockingQueue provided by Java to implement this request queue and utilizes its built-in thread-safety to ensure that multiple threads can access the request queue concurrently in a safe and efficient manner. In the code, this queue is defined by the variable requestQueue.

The field queueSize is the maximum length of the Request queue. When the Broker starts, the SocketServer component creates a RequestChannel object and passes the value of the Broker-side parameter queued.max.requests as queueSize. Since queued.max.requests defaults to 500, each RequestChannel’s queue holds at most 500 requests by default.
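ArrayBlockingQueue has a fixed capacity, so queueSize acts as back-pressure. Once the queue is full, sendRequest’s put blocks the enqueuing thread until an I/O thread takes a request out. The sketch below demonstrates the full-queue condition with offer, which returns false instead of blocking, so it finishes instantly. BoundedQueueDemo is hypothetical. A small capacity in a call stands in for the default of 500.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo of a fixed-capacity request queue
final class BoundedQueueDemo {
    // Offers n requests to a queue of the given capacity and returns {accepted, rejected}
    static int[] offerAll(int capacity, int n) {
        ArrayBlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < n; i++) {
            // offer never blocks: it returns false when the queue is full,
            // whereas put (what sendRequest uses) would wait for free space
            if (requestQueue.offer("request-" + i)) {
                accepted++;
            } else {
                rejected++;
            }
        }
        return new int[] {accepted, rejected};
    }
}
```

For example, offerAll(2, 5) accepts two requests and rejects three. With put, the third call would wait instead.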

The processors field holds the Processor thread pool managed by the RequestChannel. Each Processor thread handles network I/O for its connections: it reads incoming requests and sends responses back. The I/O threads do the actual request processing. Let me explain how Processors are managed in detail.

Processor Management #

The processors field in the code above holds the Processor thread pool, stored in a Java ConcurrentHashMap. The Key in the Map is the processor number we mentioned earlier, and the Value is the corresponding Processor thread object.

The existence of this thread pool tells us that all network threads on the Kafka Broker side are maintained in the RequestChannel. Since there is a thread pool, the code must also manage it. The addProcessor and removeProcessor methods of RequestChannel do exactly that.

def addProcessor(processor: Processor): Unit = {
  // Add the Processor to the Processor thread pool
  if (processors.putIfAbsent(processor.id, processor) != null)
    warn(s"Unexpected processor with processorId ${processor.id}")
  // Create the response queue size metric for the given Processor object
  newGauge(responseQueueSizeMetricName,
    () => processor.responseQueueSize,
    Map(ProcessorMetricTag -> processor.id.toString))
}

def removeProcessor(processorId: Int): Unit = {
  // Remove the given Processor thread from the Processor thread pool
  processors.remove(processorId)
  // Remove the corresponding monitoring metric for the Processor
  removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString))
}

The code is quite simple. It calls the putIfAbsent and remove methods of ConcurrentHashMap to add and remove Processor threads. At the same time, it registers or removes the per-Processor response queue size metric. When the Broker starts, it calls addProcessor to add num.network.threads Processor threads per listener to the RequestChannel object.

If you refer to the Kafka official documentation, you will find that the update mode of the num.network.threads parameter is cluster-wide. This means Kafka allows you to change its value dynamically. For example, suppose you set num.network.threads to 8 when the Broker starts and later change it to 3 with the kafka-configs command. This reduces the number of threads in the Processor thread pool. In this scenario, removeProcessor is called for each Processor being removed.
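The sketch below models such a resize with a ConcurrentHashMap, using the same two calls that addProcessor and removeProcessor rely on: putIfAbsent and remove. Several parts are assumptions for illustration and not Kafka’s actual resize logic: ProcessorRegistry, the String stand-in for a Processor thread, the single listener numbered from 0, and removing the highest numbers first.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry mirroring RequestChannel's processors map
final class ProcessorRegistry {
    // Processor ID -> stand-in for a Processor thread (here just its name)
    private final ConcurrentHashMap<Integer, String> processors = new ConcurrentHashMap<>();

    // Mirrors addProcessor: putIfAbsent returns the existing value on a duplicate ID
    boolean addProcessor(int id) {
        return processors.putIfAbsent(id, "processor-" + id) == null;
    }

    // Mirrors removeProcessor
    void removeProcessor(int id) {
        processors.remove(id);
    }

    // Hypothetical resize for one listener whose IDs are 0 .. currentCount-1
    void resize(int currentCount, int newCount) {
        for (int id = currentCount; id < newCount; id++) addProcessor(id);          // grow
        for (int id = currentCount - 1; id >= newCount; id--) removeProcessor(id);  // shrink
    }

    int size() { return processors.size(); }

    boolean contains(int id) { return processors.containsKey(id); }
}
```

Going from 8 to 3 threads removes IDs 7 down to 3 and leaves 0, 1 and 2 registered. Adding an ID that is already present is rejected, which is the case the real addProcessor logs as a warning.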

Handling Requests and Responses #

In addition to Processor management, another important functionality of the RequestChannel is to handle Requests and Responses, which includes sending Requests and sending Responses. For example, the methods for sending and receiving Requests are sendRequest and receiveRequest, respectively:

def sendRequest(request: RequestChannel.Request): Unit = {
 requestQueue.put(request)
}
def receiveRequest(timeout: Long): RequestChannel.BaseRequest =
 requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
def receiveRequest(): RequestChannel.BaseRequest =
 requestQueue.take()

Sending a Request is just putting the Request object into the Request queue, while receiving a Request is taking the Request out from the queue. The entire process forms a mini version of the “producer-consumer” pattern and relies on the thread safety provided by ArrayBlockingQueue to ensure the thread safety of the entire process, as shown below:
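Here is a minimal producer-consumer sketch of sendRequest and receiveRequest. One thread enqueues with put. The calling thread drains with poll(timeout), which returns null if nothing arrives in time, so the consumer must handle that case. MiniRequestChannel is hypothetical and carries Strings instead of Request objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, String-based miniature of RequestChannel's request side
final class MiniRequestChannel {
    private final ArrayBlockingQueue<String> requestQueue;

    MiniRequestChannel(int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
    }

    // Producer side (network thread): blocks while the queue is full
    void sendRequest(String request) throws InterruptedException {
        requestQueue.put(request);
    }

    // Consumer side (I/O thread): waits up to timeoutMs, returns null on timeout
    String receiveRequest(long timeoutMs) throws InterruptedException {
        return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Runs one producer thread and consumes on the calling thread until all requests arrive
    static List<String> runDemo(int count) throws InterruptedException {
        MiniRequestChannel channel = new MiniRequestChannel(2);   // tiny queue: producer must wait
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) channel.sendRequest("req-" + i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> received = new ArrayList<>();
        while (received.size() < count) {
            String req = channel.receiveRequest(100);
            if (req != null) received.add(req);   // null only means "nothing yet": poll again
        }
        producer.join();
        return received;
    }
}
```

The queue is FIFO and there is a single producer, so requests come out in the order they were sent. The queue’s own locking is all the synchronization needed.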

For Responses, RequestChannel has no receive method; it only sends them. The sendResponse method sends a Response object, that is, it adds the Response to the response queue of the Processor thread that will deliver it:

def sendResponse(response: RequestChannel.Response): Unit = {
 if (isTraceEnabled) {  // Construct the Trace log output string
  val requestHeader = response.request.header
  val message = response match {
   case sendResponse: SendResponse =>
    s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${sendResponse.responseSend.size} bytes."
   case _: NoOpResponse =>
    s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
   case _: CloseConnectionResponse =>
    s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
   case _: StartThrottlingResponse =>
    s"Notifying channel throttling has started for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
   case _: EndThrottlingResponse =>
    s"Notifying channel throttling has ended for client ${requestHeader.clientId} for ${requestHeader.apiKey}"
  }
  trace(message)
 }
 // Find out which Processor thread the response corresponds to, i.e., the Processor thread that processed the request initially
 val processor = processors.get(response.processor)
 // Put the Response object into the Response queue of the corresponding Processor thread
 if (processor != null) {
  processor.enqueueResponse(response)
 }
}

The logic of the sendResponse method is actually very simple.

The large if block at the beginning only builds the trace log message, and the text of that message depends on the Response type.

Next, the code finds the Processor thread corresponding to the Response object. The Processor ID comes from the Request: Response.processor simply returns request.processor, the ID of the Processor thread that originally received the Request. Once that Processor is found, the code calls its enqueueResponse method to put the Response into that Processor’s response queue, where it waits to be sent. Suppose the Processor no longer exists, for example because it was removed after num.network.threads was reduced. Then processors.get returns null and the Response is not enqueued.
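The routing step can be sketched as a map from Processor ID to that Processor’s own response queue. The ID recorded on the Request selects the queue. A response whose Processor has been removed is dropped, as in sendResponse when processors.get returns null. ResponseRouter, its String responses, and the choice of ConcurrentLinkedQueue are hypothetical simplifications.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical router: one response queue per Processor ID
final class ResponseRouter {
    private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<String>> responseQueues =
        new ConcurrentHashMap<>();

    void addProcessor(int id) {
        responseQueues.putIfAbsent(id, new ConcurrentLinkedQueue<>());
    }

    void removeProcessor(int id) {
        responseQueues.remove(id);
    }

    // Enqueues on the queue of the Processor that received the request;
    // returns false (response dropped) if that Processor no longer exists
    boolean sendResponse(int processorId, String response) {
        ConcurrentLinkedQueue<String> queue = responseQueues.get(processorId);
        if (queue == null) return false;
        queue.add(response);
        return true;
    }

    // What a Processor would take from its own queue before writing to the socket
    String dequeueResponse(int processorId) {
        ConcurrentLinkedQueue<String> queue = responseQueues.get(processorId);
        return queue == null ? null : queue.poll();
    }
}
```

Each Processor drains only its own queue, so it never has to coordinate with other network threads about whose connection a response belongs to.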

Monitoring Metrics Implementation #

The RequestChannel class also defines rich monitoring metrics to dynamically monitor the performance of requests and responses. Let’s take a look at the specific metrics.

object RequestMetrics {
  val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
  val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
  val RequestsPerSec = "RequestsPerSec"
  val RequestQueueTimeMs = "RequestQueueTimeMs"
  val LocalTimeMs = "LocalTimeMs"
  val RemoteTimeMs = "RemoteTimeMs"
  val ThrottleTimeMs = "ThrottleTimeMs"
  val ResponseQueueTimeMs = "ResponseQueueTimeMs"
  val ResponseSendTimeMs = "ResponseSendTimeMs"
  val TotalTimeMs = "TotalTimeMs"
  val RequestBytes = "RequestBytes"
  val MessageConversionsTimeMs = "MessageConversionsTimeMs"
  val TemporaryMemoryBytes = "TemporaryMemoryBytes"
  val ErrorsPerSec = "ErrorsPerSec"
}

As you can see, there are many metrics, but don’t worry, we just need to understand a few important ones.

  • RequestsPerSec: The number of requests processed per second, used to evaluate the busy state of the broker.
  • RequestQueueTimeMs: Calculates the average waiting time of requests in the request queue, in milliseconds. If the waiting time of requests in the queue is too long, you usually need to increase the number of backend I/O threads to speed up the retrieval of requests from the queue.
  • LocalTimeMs: Calculates the actual processing time of requests, in milliseconds. If the value of this metric is large, you need to further investigate the logic of request processing and analyze which step consumes too much time.
  • RemoteTimeMs: Kafka’s read and write requests (PRODUCE and FETCH requests) involve waiting for other broker operations. RemoteTimeMs represents the time waiting for other brokers to complete specific operations. Since the waiting is for other brokers, it is called “Remote Time”. This metric is very important! In Kafka production environments, the main reason for high message sending delay in Producer programs with acks=all setting is often high Remote Time. Therefore, if you encounter such a problem, it is recommended to first determine if Remote Time is the bottleneck.
  • TotalTimeMs: The complete processing time of a request. This is without doubt the most practical monitoring metric. We usually judge whether the system has a problem based on TotalTimeMs. Once a problem is found, we use the metrics above to pinpoint the cause.

The RequestChannel defines the updateMetrics method to update the metrics. The logic is very simple, so I won’t go into detail. You can read it yourself.
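To see how the time metrics fit together, picture a request stamped at each stage boundary:

- created
- dequeued by an I/O thread
- local processing done
- response created
- response picked up by the Processor
- response fully sent

Each stage metric is then the gap between two consecutive stamps, so the stages add up exactly to TotalTimeMs. The sketch below is built on that assumed stage model. The RequestTimeline class and its field names are hypothetical. It leaves out ThrottleTimeMs and MessageConversionsTimeMs; the source code shows how those are recorded.

```java
// Hypothetical per-request timeline; each metric is the gap between two stamps
final class RequestTimeline {
    final long startNanos;            // request object created
    final long dequeueNanos;          // taken from the request queue by an I/O thread
    final long localCompleteNanos;    // local processing finished
    final long responseCompleteNanos; // response object created
    final long responseDequeueNanos;  // response taken by the Processor
    final long endNanos;              // response fully sent

    RequestTimeline(long startNanos, long dequeueNanos, long localCompleteNanos,
                    long responseCompleteNanos, long responseDequeueNanos, long endNanos) {
        this.startNanos = startNanos;
        this.dequeueNanos = dequeueNanos;
        this.localCompleteNanos = localCompleteNanos;
        this.responseCompleteNanos = responseCompleteNanos;
        this.responseDequeueNanos = responseDequeueNanos;
        this.endNanos = endNanos;
    }

    private static double ms(long nanos) { return Math.max(nanos, 0L) / 1_000_000.0; }

    double requestQueueTimeMs()  { return ms(dequeueNanos - startNanos); }
    double localTimeMs()         { return ms(localCompleteNanos - dequeueNanos); }
    double remoteTimeMs()        { return ms(responseCompleteNanos - localCompleteNanos); }
    double responseQueueTimeMs() { return ms(responseDequeueNanos - responseCompleteNanos); }
    double responseSendTimeMs()  { return ms(endNanos - responseDequeueNanos); }
    double totalTimeMs()         { return ms(endNanos - startNanos); }

    // The stage with the largest share of TotalTimeMs: where to look first
    String slowestStage() {
        String[] names = {"RequestQueueTimeMs", "LocalTimeMs", "RemoteTimeMs",
                          "ResponseQueueTimeMs", "ResponseSendTimeMs"};
        double[] values = {requestQueueTimeMs(), localTimeMs(), remoteTimeMs(),
                           responseQueueTimeMs(), responseSendTimeMs()};
        int slowest = 0;
        for (int i = 1; i < values.length; i++) {
            if (values[i] > values[slowest]) slowest = i;
        }
        return names[slowest];
    }
}
```

For instance, take stamps at 0, 2, 5, 45, 46 and 48 ms. They give stage times of 2, 3, 40, 1 and 2 ms, summing to the 48 ms total. slowestStage() then reports RemoteTimeMs, the pattern described above for acks=all producers waiting on other brokers.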

Summary #

Alright, it’s time for a summary again.

Today, I took you through the source code implementation of the Kafka request queue. Around this topic, we covered several key points.

  • Request: Defines various types of requests supported by Kafka Brokers.
  • Response: Defines various types of responses corresponding to requests.
  • RequestChannel: Implements the Kafka request queue.
  • Monitoring Metrics: Encapsulates important monitoring metrics related to the request queue.

I hope you can take some time to reflect on the flow of requests and responses in the request channel and even the SocketServer based on what we discussed today. This will greatly help you understand how Kafka handles external requests. Of course, if you find this difficult, don’t worry, because I will dedicate a whole lesson to explaining these contents later.

After-class discussion #

If I want to monitor the current usage of the Request queue (e.g., how many Requests have been saved so far), can you point out, with reference to the source code, which monitoring metric should be used?

Feel free to express your thoughts and communicate with me in the comments section. You are also welcome to share today’s content with your friends.