09 SocketServer (Part 2): Source Code Analysis of the Complete Request Handling Workflow

Hello, I am Hu Xi. In the previous lessons, we spent a lot of time studying the source code of the core components of SocketServer, including the Acceptor and Processor threads. We also explored how the Data plane and the Control plane handle different types of requests.

Today, I will guide you through the complete Kafka request processing workflow. This workflow involves multiple source code files, and in order to understand the principles behind it, we must “jump around” between different methods. Compared to studying a single source code file, it is much more challenging to piece together methods from multiple files to form a complete workflow. Therefore, it is best for you to spend more time carefully studying all the methods related to this workflow.

Of course, you may ask: “Why should I care about the request processing workflow? What is the point of reading this part of the source code?” In fact, understanding these principles is extremely helpful when we want to tune request processing performance on the Broker side in a targeted way.

For example, the Broker side has two parameters related to this workflow: num.network.threads and num.io.threads. Without understanding the request processing workflow, we wouldn’t be able to adjust these parameters effectively.

You see, the Kafka official website doesn’t tell us what network threads and I/O threads are. Suppose we don’t understand that “requests are received by network threads and placed in a request queue.” We might then blindly increase num.network.threads when the request queue is almost full, and the result could be counterproductive. A nearly full request queue means the threads that consume it cannot keep up, and adding network threads only adds more producers feeding that same queue. After today’s lesson, I believe you will know what to do in such a situation: increase num.io.threads instead.

The num.io.threads parameter represents the size of the I/O thread pool. The so-called I/O thread pool is the KafkaRequestHandlerPool, also known as the request processing thread pool. In this lesson, I will first explain the source code of KafkaRequestHandlerPool, and then provide a detailed analysis of the code for the complete request processing workflow.

KafkaRequestHandlerPool #

The KafkaRequestHandlerPool is the place where Kafka requests are actually processed. Note that the class responsible for processing requests in Kafka is not the SocketServer or RequestChannel, but the KafkaRequestHandlerPool.

The KafkaRequestHandlerPool is defined in the KafkaRequestHandler.scala file, located in the kafka.server package of the core module. It is a small file of fewer than 400 lines and is relatively easy to understand.

Let me show you which components are included in this file with a diagram:

  • KafkaRequestHandler: This is the request processing thread class. Each instance of the request processing thread is responsible for fetching request objects from the request queue of SocketServer’s RequestChannel and processing them.
  • KafkaRequestHandlerPool: This is the request processing thread pool, responsible for creating, maintaining, managing, and destroying the request processing threads under it.
  • BrokerTopicMetrics: This is the management class for broker-related monitoring metrics.
  • BrokerTopicStats (class): This class defines the management operations for broker-related monitoring metrics.
  • BrokerTopicStats (object): This is the companion object of the BrokerTopicStats class. It defines the names of broker-related monitoring metrics, such as MessagesInPerSec and BytesOutPerSec.

Let’s focus on the code for the first two components. The remaining three classes or objects are related to monitoring metrics and mostly consist of utility methods or constant definitions. They are easy to understand, so we don’t need to spend much time on them.

KafkaRequestHandler #

Let’s start by looking at its definition:

// Key field descriptions
// id: I/O thread number
// brokerId: Broker number, i.e., broker.id value
// totalHandlerThreads: I/O thread pool size
// requestChannel: Request channel for request processing
// apis: KafkaApis class, the actual implementation of the request processing logic
class KafkaRequestHandler(
  id: Int,
  brokerId: Int,
  val aggregateIdleMeter: Meter,
  val totalHandlerThreads: AtomicInteger,
  val requestChannel: RequestChannel,
  apis: KafkaApis,
  time: Time) extends Runnable with Logging {
  ......
}

From the definition, we can see that KafkaRequestHandler is a Runnable object, so you can think of it as a thread. Each instance of KafkaRequestHandler has four key properties.

  • id: The number of the request processing thread, similar to the ID of a Processor thread. It is used only to identify the thread within the thread pool.
  • brokerId: The broker ID, identifying which broker this request processing thread belongs to.
  • requestChannel: The RequestChannel object in SocketServer. Why does KafkaRequestHandler need this field? We said that it is responsible for processing requests, so where are those requests stored? They are stored in the request queue of the RequestChannel. Therefore, when constructing a KafkaRequestHandler instance, Kafka must associate it with the RequestChannel instance of the SocketServer component, so that the I/O thread knows where to fetch requests from.
  • apis: This is a KafkaApis class. If KafkaRequestHandler is responsible for processing requests, then KafkaApis is the place where the actual request processing logic is executed. In Lesson 10, I will explain the code of KafkaApis in detail. For now, what you need to know is that it has a handle method for executing the request processing logic.

Since KafkaRequestHandler is a thread class, apart from the regular close, stop, initiateShutdown, and awaitShutdown methods, the most important aspect is the implementation of the run method, as shown below:

def run(): Unit = {
  // As long as the thread is not stopped, keep looping to execute the processing logic
  while (!stopped) {
    val startSelectTime = time.nanoseconds
    // Get the next request to be processed from the request queue
    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    // Calculate the idle time of the thread
    val idleTime = endTime - startSelectTime
    // Update the idle percentage metric
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)
    req match {
      // Shutdown request
      case RequestChannel.ShutdownRequest =>
        debug(s"Kafka request handler $id on broker $brokerId received shut down command")
        // Shutdown the thread
        shutdownComplete.countDown()
        return
      // Normal request
      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          trace(s"Kafka request handler $id on broker $brokerId handling request $request")
          // Execute the corresponding processing logic using the KafkaApis.handle method
          apis.handle(request)
        } catch {
          // If a fatal error occurs, close the thread immediately
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          // If it is a normal exception, log the error
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          // Release the memory buffer resources occupied by the request object
          request.releaseBuffer()
        }
      case null => // Continue
    }
  }
  shutdownComplete.countDown()
}

Although I have added comments to some important code, to help you understand better, I will draw a diagram to show you the processing logic of the KafkaRequestHandler thread:

Let me explain the main logic of the run method. All of its execution logic sits inside the while loop. As long as the stopped flag (which marks whether the thread has been stopped) is false, the run method keeps executing the loop body.

The first step is to fetch the next request from the request queue and update the idle-time statistics. The fetch waits at most 300 milliseconds. If no request arrives within that time, receiveRequest returns null and the loop simply moves on to the next iteration. If the request is a ShutdownRequest, the broker has initiated a shutdown.

When the broker shuts down, the shutdown method of KafkaRequestHandlerPool calls initiateShutdown on each KafkaRequestHandler. That method in turn calls the sendShutdownRequest method of RequestChannel, which writes a ShutdownRequest into the request queue.

Once the ShutdownRequest is taken from the request queue, the run method calls countDown on shutdownComplete, which formally completes the shutdown of the KafkaRequestHandler thread. You can see how this works in the shutdown method of KafkaRequestHandlerPool:

// KafkaRequestHandler.scala
def shutdown(): Unit = synchronized {
  info("shutting down")
  for (handler <- runnables)
    // Initiate shutdown: each call writes a ShutdownRequest into the request queue
    handler.initiateShutdown()
  for (handler <- runnables)
    // Once the countDown method is called in the run method, the waiting state is released here
    handler.awaitShutdown()
  info("shut down completely")
}

As the code comment says, once the countDown method is called in the run method, the waiting state at awaitShutdown is released, completing the entire thread shutdown operation.
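Why does the pool first call initiateShutdown on every handler and only then wait? The handshake can be illustrated with a minimal sketch in plain Scala. It is not Kafka’s code. BaseRequest, Request, ShutdownRequest, Handler and Pool are hypothetical stand-ins, and the shared request queue is assumed to be a java.util.concurrent.ArrayBlockingQueue. The handlers’ run loops are driven one after another on the calling thread so the outcome is deterministic; in Kafka each handler runs on its own daemon thread. All handlers read from one shared queue, so any handler may pick up any ShutdownRequest. What matters is that there is exactly one per handler.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for RequestChannel.BaseRequest / Request / ShutdownRequest.
sealed trait BaseRequest
final case class Request(name: String) extends BaseRequest
case object ShutdownRequest extends BaseRequest

final class Handler(val id: Int, queue: ArrayBlockingQueue[BaseRequest]) extends Runnable {
  private val shutdownComplete = new CountDownLatch(1)
  val handled = ListBuffer.empty[String]

  // Like initiateShutdown -> sendShutdownRequest: one marker per call.
  def initiateShutdown(): Unit = queue.put(ShutdownRequest)
  // Blocks until run() has counted the latch down.
  def awaitShutdown(): Unit = shutdownComplete.await()
  def isShutDown: Boolean = shutdownComplete.getCount == 0

  def run(): Unit = {
    var done = false
    while (!done) {
      queue.poll(300, TimeUnit.MILLISECONDS) match {
        case ShutdownRequest => done = true        // this handler consumed one marker
        case Request(name)   => handled.append(name)
        case null            => ()                 // poll timed out; try again
      }
    }
    shutdownComplete.countDown()                   // releases awaitShutdown()
  }
}

final class Pool(numThreads: Int) {
  val queue = new ArrayBlockingQueue[BaseRequest](500)
  val runnables = Vector.tabulate(numThreads)(i => new Handler(i, queue))
  // First loop of the pool's shutdown: one ShutdownRequest per handler.
  def initiateAll(): Unit = runnables.foreach(_.initiateShutdown())
  // Second loop: wait until every handler has counted its latch down.
  def awaitAll(): Unit = runnables.foreach(_.awaitShutdown())
}

val pool = new Pool(3)
pool.queue.put(Request("produce-1"))
pool.queue.put(Request("fetch-1"))
pool.initiateAll()               // queue: produce-1, fetch-1, ShutdownRequest x 3
pool.runnables.foreach(_.run())  // in Kafka each run() has its own thread; here they run one by one
pool.awaitAll()                  // returns at once: every latch is already at zero
```

In this sketch the queue is FIFO, so the two pending requests are handled before any marker is reached. Handler 0 processes both of them, and the other two handlers only see their ShutdownRequest.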

Let’s continue with the run method. If a normal request is obtained from the request queue, the timestamp of the request being removed from the queue is updated first, and then the actual request processing logic is executed by the handle method in KafkaApis. Once the request processing is completed and the buffer resources are released, the code enters the next iteration, repeatedly executing the aforementioned logic.
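The normal-request branch can also be exercised on its own. The sketch below is a simplified model built on several assumptions:

- Request, Handler and fakeHandle are hypothetical definitions, with fakeHandle standing in for KafkaApis.handle.
- System.nanoTime replaces Kafka’s Time.
- The queue is pre-filled with requests and a final ShutdownRequest, so a single call to run terminates.
- It catches Exception rather than Throwable and omits the FatalExitError branch.

It shows the two properties described above: a failing request does not end the loop, and the finally block releases every request’s buffer.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

sealed trait BaseRequest
// Hypothetical request; `released` stands in for freeing the request's memory buffer.
final class Request(val name: String) extends BaseRequest {
  var dequeueTimeNanos = 0L
  var released = false
  def releaseBuffer(): Unit = released = true
}
case object ShutdownRequest extends BaseRequest

// `handle` plays the role of KafkaApis.handle.
final class Handler(queue: ArrayBlockingQueue[BaseRequest], handle: Request => Unit) {
  val errors = ListBuffer.empty[String]

  def run(): Unit = {
    var stopped = false
    while (!stopped) {
      val req = queue.poll(300, TimeUnit.MILLISECONDS)  // null after 300 ms without a request
      val endTime = System.nanoTime()
      req match {
        case ShutdownRequest => stopped = true
        case request: Request =>
          try {
            request.dequeueTimeNanos = endTime
            handle(request)
          } catch {
            // A failing request is recorded and skipped; the loop keeps running.
            case e: Exception => errors.append(s"${request.name}: ${e.getMessage}")
          } finally {
            request.releaseBuffer()  // runs whether handle succeeded or threw
          }
        case null => ()              // nothing arrived: go round the loop again
      }
    }
  }
}

val handled = ListBuffer.empty[String]
// Hypothetical request logic: any request whose name starts with "bad" fails.
def fakeHandle(r: Request): Unit =
  if (r.name.startsWith("bad")) throw new IllegalStateException("cannot parse")
  else handled.append(r.name)

val queue = new ArrayBlockingQueue[BaseRequest](500)
val requests = List(new Request("produce-1"), new Request("bad-1"), new Request("fetch-1"))
requests.foreach(r => queue.put(r))
queue.put(ShutdownRequest)

val handler = new Handler(queue, fakeHandle)
handler.run()
```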

KafkaRequestHandlerPool #

From the analysis above, the logic of KafkaRequestHandler is relatively simple. Now let’s look at the implementation of the KafkaRequestHandlerPool thread pool. It manages the I/O threads, and its logic is not complex either. I have already covered its shutdown method, so here we will focus on how and when it creates these threads.

First, let’s look at its definition:

    // Key field explanations
    // brokerId: The ID of the corresponding broker, i.e., the broker.id value
    // requestChannel: The `RequestChannel` object under `SocketServer` component
    // apis: `KafkaApis` class, the actual request processing logic class
    // numThreads: The initial size of the I/O thread pool
    class KafkaRequestHandlerPool(
      val brokerId: Int, 
      val requestChannel: RequestChannel,
      val apis: KafkaApis,
      time: Time,
      numThreads: Int,
      requestHandlerAvgIdleMetricName: String,
      logAndThreadNamePrefix : String) 
      extends Logging with KafkaMetricsGroup {
      // Size of the I/O thread pool
      private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
      // I/O thread pool
      val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
      ......
    }

The KafkaRequestHandlerPool class defines 7 properties, 4 of which are key. Let me explain each of them.

  • brokerId: Same as in KafkaRequestHandler, it stores the broker ID.
  • requestChannel: The request handling channel of SocketServer. The request queue under it is shared by all the I/O threads. The requestChannel is also an important property of the KafkaRequestHandler class.
  • apis: An instance of KafkaApis that executes the actual request processing logic. It is also an important property of the KafkaRequestHandler class.
  • numThreads: The initial number of threads in the thread pool. It is the value of the broker parameter num.io.threads. Currently, Kafka supports dynamically modifying the size of the I/O thread pool, so numThreads is the initial number of threads, and the actual size of the I/O thread pool after adjustment may not be the same as numThreads.

Now I will further explain the relationship between the numThreads property and the actual number of threads in the thread pool. As I just mentioned, the size of the I/O thread pool can be modified. If you check the KafkaServer.scala file, you will see the following two lines of code:

    // KafkaServer.scala
    dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

    controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, 1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)

From the code, we can see that the initial number of threads in the Data plane’s dataPlaneRequestHandlerPool is config.numIoThreads, which is the value of the broker parameter num.io.threads. The Control plane’s thread pool, by contrast, is hardcoded to 1 thread. So the num.io.threads parameter on the Broker side controls how many KafkaRequestHandler threads the Broker starts with. If you want the Broker to start with more request handling capacity, you can try increasing this parameter.

In addition to the above four properties, this class also defines a variable called threadPoolSize. Essentially, it just wraps the numThreads with an AtomicInteger.

Why is this done? Kafka supports dynamically adjusting the number of threads in the KafkaRequestHandlerPool, but numThreads cannot be changed once it has been passed in as a constructor parameter. A separate variable is therefore needed to track the updated pool size. As for why AtomicInteger is used, you can probably guess: the pool size may be accessed by multiple threads. The atomic operations provided by AtomicInteger make such concurrent access safe while also providing the necessary memory visibility.
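The shared AtomicInteger also matters for metrics. createHandler (see its code later in this section) passes threadPoolSize to every KafkaRequestHandler as its totalHandlerThreads field. run then divides each idle sample by it before marking aggregateIdleMeter.

The sketch below assumes the aggregate metric is simply the total marked idle time divided by the length of the measurement window. SumMeter and IdleRecorder are hypothetical classes, not Kafka’s Meter. Under that assumption, dividing by the pool size turns four fully idle threads into 100% average idle rather than 400%. A resize also only needs to update the one shared object.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the pool-wide Meter: it only accumulates marked values.
final class SumMeter {
  private var total = 0L
  def mark(n: Long): Unit = synchronized { total += n }
  def count: Long = synchronized { total }
}

// Hypothetical handler that records idle time the way KafkaRequestHandler.run does:
// every sample is divided by the current pool size read from the shared AtomicInteger.
final class IdleRecorder(meter: SumMeter, totalHandlerThreads: AtomicInteger) {
  def recordIdle(idleNanos: Long): Unit = meter.mark(idleNanos / totalHandlerThreads.get)
}

val windowNanos = 1000000000L                 // a 1-second measurement window
val threadPoolSize = new AtomicInteger(4)     // one instance, shared by every handler
val meter = new SumMeter
val handlers = Vector.fill(4)(new IdleRecorder(meter, threadPoolSize))

// All four handlers are idle for the entire window.
handlers.foreach(_.recordIdle(windowNanos))
val avgIdle = meter.count.toDouble / windowNanos   // 1.0 (100% idle), not 4.0

// A resize only has to update the single shared AtomicInteger; every handler sees it.
threadPoolSize.set(8)
val countBefore = meter.count
handlers.head.recordIdle(windowNanos)
val perSampleAfterResize = meter.count - countBefore   // windowNanos / 8
```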

Since KafkaRequestHandlerPool manages the I/O thread pool, its most important field is the thread pool itself, runnables. In terms of code, Kafka implements the I/O thread pool as a Scala mutable.ArrayBuffer of KafkaRequestHandler objects.

createHandler method

When the thread pool is initialized, Kafka uses the following code to create threads in batches and add them to the thread pool:

for (i <- 0 until numThreads) {
  createHandler(i) // Create numThreads I/O threads
}

// Create an I/O thread object with the specified id and start the thread
def createHandler(id: Int): Unit = synchronized {
  // Create a KafkaRequestHandler instance and add it to runnables
  runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
  // Start the KafkaRequestHandler thread
  KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}

Let me explain this code. The source code uses a for loop to call the createHandler method in batches to create multiple I/O threads. The body of the createHandler method can be divided into three steps:

  1. Create a KafkaRequestHandler instance.
  2. Add the created thread instance to the thread pool array.
  3. Start the thread.
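Here is a minimal sketch of this creation path that makes one subtle point explicit. createHandler appends the new handler and then looks it up with runnables(id). That lookup only works because handler ids are always handed out as 0, 1, 2, and so on, so that id equals the buffer’s size just before the append.

Handler and HandlerPool are hypothetical simplifications. java.lang.Thread with setDaemon(true) stands in for KafkaThread.daemon, the threads are deliberately not started, and "data-plane" is just an example prefix.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for KafkaRequestHandler; the real run() is the request loop.
final class Handler(val id: Int) extends Runnable {
  def run(): Unit = ()
}

// Sketch of the pool's creation path. Threads are configured but not started,
// so the example stays free of side effects.
final class HandlerPool(numThreads: Int, threadNamePrefix: String) {
  val runnables = new ArrayBuffer[Handler](numThreads)
  val threads = new ArrayBuffer[Thread](numThreads)

  for (i <- 0 until numThreads) createHandler(i)

  def createHandler(id: Int): Unit = synchronized {
    runnables += new Handler(id)                 // steps 1 and 2: create and add
    // runnables(id) finds the new handler only because id == runnables.size before the append.
    val thread = new Thread(runnables(id), threadNamePrefix + "-kafka-request-handler-" + id)
    thread.setDaemon(true)                       // like KafkaThread.daemon(...)
    threads += thread                            // step 3 would be thread.start()
  }
}

val pool = new HandlerPool(3, "data-plane")
val names = pool.threads.map(_.getName).toList
```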

resizeThreadPool method

Now let’s talk about the code of the resizeThreadPool method. The purpose of this method is to resize the number of threads in the I/O thread pool to the specified value. The code is as follows:

def resizeThreadPool(newSize: Int): Unit = synchronized {
  val currentSize = threadPoolSize.get
  info(s"Resizing request handler thread pool size from $currentSize to $newSize")
  if (newSize > currentSize) {
    for (i <- currentSize until newSize) {
      createHandler(i)
    }
  } else if (newSize < currentSize) {
    for (i <- 1 to (currentSize - newSize)) {
      runnables.remove(currentSize - i).stop()
    }
  }
  threadPoolSize.set(newSize)
}

This method first gets the current thread count. If the target count newSize is greater than the current count, it calls the createHandler method described earlier to top the pool up to newSize. Otherwise, it removes the excess threads from the tail of the thread pool and stops them. Note that stop only sets the thread’s stopped flag. A removed thread therefore exits once its current loop iteration ends, that is, after at most one more request or one 300 ms poll timeout. Finally, the method updates threadPoolSize, the variable that records the number of threads.
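The resize logic can be checked with a hypothetical, thread-free model of the pool. Handler here only records whether stop was called, and thread creation is omitted. The resize method itself follows the code above. Removing from the tail keeps the surviving ids contiguous, which is exactly what createHandler’s runnables(id) lookup needs when the pool grows again later.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer

// Hypothetical handler: stop() only raises a flag.
final class Handler(val id: Int) {
  @volatile var stopped = false
  def stop(): Unit = stopped = true
}

final class ResizablePool(numThreads: Int) {
  val threadPoolSize = new AtomicInteger(numThreads)
  val runnables = new ArrayBuffer[Handler](numThreads)
  for (i <- 0 until numThreads) createHandler(i)

  // Creating and starting the actual thread is omitted in this sketch.
  def createHandler(id: Int): Unit = synchronized { runnables += new Handler(id) }

  def resizeThreadPool(newSize: Int): Unit = synchronized {
    val currentSize = threadPoolSize.get
    if (newSize > currentSize) {
      for (i <- currentSize until newSize) createHandler(i)
    } else if (newSize < currentSize) {
      // Remove from the tail so the survivors keep ids 0 until newSize.
      for (i <- 1 to (currentSize - newSize)) runnables.remove(currentSize - i).stop()
    }
    threadPoolSize.set(newSize)
  }
}

val pool = new ResizablePool(8)
pool.resizeThreadPool(12)
val afterGrow = pool.runnables.map(_.id).toList
val beforeShrink = pool.runnables.toList
pool.resizeThreadPool(5)
val afterShrink = pool.runnables.map(_.id).toList
val stoppedIds = beforeShrink.filter(_.stopped).map(_.id)
```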

So now we have covered the three methods of the KafkaRequestHandlerPool class: shutdown, createHandler, and resizeThreadPool. Overall, it is a class responsible for managing the I/O thread pool.

Overall Processing Flow #

With the above introduction, we can now learn about the code path for handling Kafka requests.

Let’s take a look at this image from [Lesson 7] again. Last time, I mainly wanted to use it to give you a general understanding of the network thread pool. Today, I will explain in detail the complete request processing logic shown in this image.

There are a total of 6 steps in the image. I will explain each step and also show you the corresponding source code.

Step 1: Clients or Other Brokers Send Requests to Acceptor Threads #

In [Lesson 7], I explained that the Acceptor thread continuously accepts incoming connection requests from external sources. Once a connection is accepted, a corresponding socket channel is created, as shown in the following code:

// Fragment of the run() method in SocketServer.scala
// Get the number of I/O operations ready on the underlying channel
val ready = nioSelector.select(500)
// If there are ready I/O events
if (ready > 0) {
  // Get the corresponding SelectionKey collection
  val keys = nioSelector.selectedKeys()
  val iter = keys.iterator()
  // Iterate through these SelectionKeys
  while (iter.hasNext && isRunning) {
    try {
      val key = iter.next
      iter.remove()
      // Test if the underlying channel of the SelectionKey can accept new socket connections
      if (key.isAcceptable) {
        // Accept the connection and allocate the corresponding Processor thread
        accept(key).foreach { socketChannel =>
          var processor: Processor = null
          do {
            retriesLeft -= 1
            processor = synchronized {
              currentProcessorIndex = currentProcessorIndex % processors.length
              processors(currentProcessorIndex)
            }
            currentProcessorIndex += 1
            // Add the new socket connection to the Processor thread's pending connection queue
            // and wait for the Processor thread to handle it later
          } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
        }
      } else {
        ......
      }
  ......
}

As you can see, the Acceptor thread creates a SocketChannel by calling the accept method and picks a Processor thread in round-robin order. It then passes both to the assignNewConnection method, which places the SocketChannel in that Processor thread’s pending connection queue. If assignNewConnection returns false, the loop moves on to the next Processor. The run method of the Processor thread continuously takes these new connections out of the queue and sets them up for request processing.

The main purpose of the assignNewConnection method is to store the newly created SocketChannel object in the newConnections queue of the Processor thread. The Processor thread then continuously polls the pending channels in this queue (see the configureNewConnections method in [Lesson 7]). It registers them with its Java NIO Selector, which performs the actual I/O operations of reading requests and sending responses.

Strictly speaking, this step of the Acceptor thread is not truly fetching requests. It simply creates the required Socket channels and passes them to the subsequent Processor threads, paving the way for the Processor threads to fetch requests.
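The round-robin loop above can be modeled without sockets. This is a simplified, single-threaded sketch, not Kafka’s implementation. Processor and Acceptor are hypothetical classes, and connections are plain strings. Two details are assumptions because the excerpt elides them:

- retriesLeft starts at the number of Processors.
- The third argument of assignNewConnection (retriesLeft == 0) allows only the final attempt to block on a full queue.

The synchronized blocks are dropped because only one thread is involved.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical Processor: only its bounded queue of pending connections (plain strings here).
final class Processor(val id: Int, capacity: Int) {
  val newConnections = new ArrayBlockingQueue[String](capacity)

  // Assumption: try a non-blocking offer first, and fall back to a blocking put
  // only when the caller allows it.
  def accept(conn: String, mayBlock: Boolean): Boolean =
    if (newConnections.offer(conn)) true
    else if (mayBlock) { newConnections.put(conn); true }
    else false
}

final class Acceptor(processors: IndexedSeq[Processor]) {
  private var currentProcessorIndex = 0

  // Round-robin assignment: each Processor is tried at most once,
  // and only the last attempt may block.
  def assign(conn: String): Processor = {
    var retriesLeft = processors.length
    var assigned: Processor = null
    while (assigned == null) {
      retriesLeft -= 1
      currentProcessorIndex = currentProcessorIndex % processors.length
      val candidate = processors(currentProcessorIndex)
      currentProcessorIndex += 1
      if (candidate.accept(conn, retriesLeft == 0)) assigned = candidate
    }
    assigned
  }
}

// Processor 1 can hold only one pending connection, so it fills up first.
val processors = Vector(new Processor(0, 2), new Processor(1, 1), new Processor(2, 2))
val acceptor = new Acceptor(processors)
val assignedTo = (0 until 5).map(i => acceptor.assign(s"conn-$i").id).toList
```

The first four connections are spread evenly. The fifth would go to Processor 1, but that queue is full, so the non-blocking attempt fails and the connection lands on Processor 2.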

Steps 2 & 3: Processor Threads Handle Requests and Put Them into the Request Queue #

Once the Processor thread has registered a SocketChannel with its Selector, it can read the requests sent by clients or other brokers through that channel. The method that does this is processCompletedReceives in the Processor:

// SocketServer.scala
private def processCompletedReceives(): Unit = {
    // Extract all received request data from the Selector
    selector.completedReceives.asScala.foreach { receive =>
      try {
        // Open the Socket Channel corresponding to the sender, throw an exception if no available Channel is found
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            ......
            val header = RequestHeader.parse(receive.payload)
            if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
              ……
            else {
              val nowNanos = time.nanoseconds()
              if (channel.serverAuthenticationSessionExpired(nowNanos)) {
                ……
              } else {
                val connectionId = receive.source
                val context = new RequestContext(header, connectionId, channel.socketAddress,
                  channel.principal, listenerName, securityProtocol,
                  channel.channelMetadataRegistry.clientInformation)
                // Build a Request object based on the Receive object obtained from the Channel
                val req = new RequestChannel.Request(processor = id, context = context,
                  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)

                ……
                // Put the request into the request queue
                requestChannel.sendRequest(req)
                ......
              }
            }
        }
      } catch {
        case e: Throwable =>
          ......
      }
    }
}

For each completed receive, the method first looks up the sender’s socket channel and parses the request header. It then builds a RequestChannel.Request object from the header and the received payload.

Because there is a lot of code, I have simplified it and only kept the most crucial logic. This method will convert all the Receive objects obtained by the Selector into corresponding Request objects, and then put these Request instances into the request queue, just as shown in steps 2 and 3 in the diagram.

So the “request handling” done by the Processor thread simply means three things: receiving data from the underlying I/O, converting it into Request object instances, and adding them to the request queue.
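Steps 2 and 3 form the producer side of the request queue. The sketch below models them with hypothetical Receive, Request, RequestChannel and Processor classes. It assumes the request queue is a bounded java.util.concurrent.ArrayBlockingQueue (its capacity of 500 is an arbitrary value here). It enqueues with put, so a full queue blocks the Processor, and it skips header parsing and authentication. Each Request records the id of the Processor that built it, which is what later lets the response find its way back.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical, heavily simplified stand-ins for a completed receive and a Request.
final case class Receive(source: String, payload: String)
final case class Request(processor: Int, connectionId: String, payload: String)

// One bounded queue shared by every Processor (producers) and every I/O thread (consumers).
final class RequestChannel(queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[Request](queueSize)
  def sendRequest(request: Request): Unit = requestQueue.put(request)   // blocks while full
  def receiveRequest(timeoutMs: Long): Request =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)                  // null on timeout
}

final class Processor(val id: Int, requestChannel: RequestChannel) {
  // Steps 2 and 3: wrap each completed receive in a Request that remembers which
  // Processor created it, then enqueue it.
  def processCompletedReceives(completed: Seq[Receive]): Unit =
    completed.foreach { receive =>
      requestChannel.sendRequest(Request(id, receive.source, receive.payload))
    }
}

val channel = new RequestChannel(500)
new Processor(0, channel).processCompletedReceives(
  Seq(Receive("client-a", "PRODUCE"), Receive("client-b", "FETCH")))
new Processor(1, channel).processCompletedReceives(Seq(Receive("broker-2", "FETCH")))

// Drain the shared queue the way an I/O thread would, stopping at the first timeout.
val drained = Iterator.continually(channel.receiveRequest(10)).takeWhile(_ != null).toList
```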

Step 4: I/O Threads Handle Requests #

The so-called "I/O Thread" refers to the KafkaRequestHandler thread mentioned at the beginning, and its processing logic is in the "run" method of the KafkaRequestHandler class:

```scala
// KafkaRequestHandler.scala
def run(): Unit = {
  while (!stopped) {
    .......
    // Get a Request instance from the request queue
    val req = requestChannel.receiveRequest(300)
    .......
    req match {
      case RequestChannel.ShutdownRequest =>
        .......
      case request: RequestChannel.Request =>
        try {
          .......
          apis.handle(request)
        } finally {
          .......
        }
      case null => // Do nothing
    }
  }
  .......
}
```

The KafkaRequestHandler thread continuously gets a Request instance from the request queue and then hands it over to the “handle” method of KafkaApis to execute the actual request processing logic.

Step 5: KafkaRequestHandler thread puts Response into Processor thread’s response queue #

This step is performed by code in the KafkaApis class, running on the KafkaRequestHandler thread. KafkaApis.scala has a “sendResponse” method that sends out the processing result of the Request, that is, the Response. Essentially, it calls the “sendResponse” method of RequestChannel, shown in the following code:

def sendResponse(response: RequestChannel.Response): Unit = {
  .......
  // Find out which Processor thread processed this Request
  val processor = processors.get(response.processor)
  if (processor != null) {
    // Add the Response to the response queue of that Processor thread
    processor.enqueueResponse(response)
  }
}
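Step 5 works because every Request remembers the id of the Processor that created it. Below is a hedged sketch of that routing with hypothetical Response, Processor and RequestChannel classes. The ConcurrentHashMap registry and the LinkedBlockingQueue response queue are assumptions chosen for the sketch, and the Selector wakeup that the real Processor performs is omitted. In this model, a response whose Processor has been removed is simply dropped.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Hypothetical response type: `processor` is copied from the originating Request.
final case class Response(processor: Int, connectionId: String, body: String)

final class Processor(val id: Int) {
  private val responseQueue = new LinkedBlockingQueue[Response]()
  // The real Processor also wakes up its Selector here; omitted in this sketch.
  def enqueueResponse(response: Response): Unit = responseQueue.put(response)
  def dequeueResponse(): Response = responseQueue.poll()   // null when the queue is empty
}

final class RequestChannel {
  private val processors = new ConcurrentHashMap[Int, Processor]()
  def addProcessor(processor: Processor): Unit = processors.put(processor.id, processor)
  def removeProcessor(id: Int): Unit = processors.remove(id)

  // Step 5: look up the Processor that created the request and hand it the response.
  // Returns false when that Processor no longer exists (the response is dropped).
  def sendResponse(response: Response): Boolean = {
    val processor = processors.get(response.processor)
    if (processor != null) { processor.enqueueResponse(response); true }
    else false
  }
}

val channel = new RequestChannel
val p0 = new Processor(0)
val p1 = new Processor(1)
channel.addProcessor(p0)
channel.addProcessor(p1)
channel.sendResponse(Response(1, "client-a", "FETCH response"))
channel.sendResponse(Response(0, "client-b", "PRODUCE response"))
channel.removeProcessor(1)
val delivered = channel.sendResponse(Response(1, "client-c", "METADATA response"))
```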

Step 6: Processor thread sends Response to the sender of the Request #

The last step is for the Processor thread to take out the Response from the response queue and return it to the sender of the Request. The specific code is in the “processNewResponses” method of the Processor thread:

// SocketServer.scala
private def processNewResponses(): Unit = {
    var currentResponse: RequestChannel.Response = null
    while ({currentResponse = dequeueResponse(); currentResponse != null}) { // Loop to get Responses from the response queue
      val channelId = currentResponse.request.context.connectionId
      try {
        currentResponse match {
          case response: NoOpResponse => // No need to send the Response
            updateRequestMetrics(response)
            trace(s"Socket server received empty response to send, registering for read: $response")
            handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
            tryUnmuteChannel(channelId)

          case response: SendResponse => // Need to send the Response
            sendResponse(response, response.responseSend)
        ......
        }
      }
      ......
    }
  }

From this code snippet, we can see that the core part is the “sendResponse” method, which sends the Response. This method uses the Selector to implement the actual sending logic at a lower level. This completes the entire process of handling a request.
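The draining loop of processNewResponses can be sketched as follows. NoOpResponse and SendResponse are simplified hypothetical case classes. The sent buffer stands in for handing bytes to the Selector, and unmuted stands in for handleChannelMuteEvent/tryUnmuteChannel. The assumption behind unmuted is that a channel stops being read while one of its requests is being processed. A NoOpResponse must therefore still re-enable reading even though nothing is sent.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical response types mirroring the two cases in processNewResponses.
sealed trait Response { def connectionId: String }
final case class NoOpResponse(connectionId: String) extends Response
final case class SendResponse(connectionId: String, payload: String) extends Response

final class Processor {
  private val responseQueue = new LinkedBlockingQueue[Response]()
  val sent = ListBuffer.empty[(String, String)]  // stands in for handing data to the Selector
  val unmuted = ListBuffer.empty[String]         // channels allowed to read their next request

  def enqueueResponse(response: Response): Unit = responseQueue.put(response)

  // Step 6: drain whatever is queued right now, then return to the event loop.
  def processNewResponses(): Unit = {
    var current = responseQueue.poll()
    while (current != null) {
      current match {
        case NoOpResponse(channelId) =>
          unmuted.append(channelId)             // nothing to send; resume reading
        case SendResponse(channelId, payload) =>
          sent.append((channelId, payload))     // the real code calls sendResponse here
      }
      current = responseQueue.poll()
    }
  }
}

val processor = new Processor
processor.enqueueResponse(SendResponse("client-a", "produce-ack"))
processor.enqueueResponse(NoOpResponse("client-b"))
processor.enqueueResponse(SendResponse("client-c", "fetch-data"))
processor.processNewResponses()
```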

Finally, recall that, as I mentioned before, some Responses carry callback logic.

After step 6 is completed, the Processor thread attempts to execute the callback logic carried by the Response. However, not every Request or Response specifies a callback; in fact, only a few Responses do. For example, after the Response to a FETCH request has been sent, the Broker needs to update its statistics related to message format conversion.
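Here is a minimal sketch of the callback idea. It assumes a hypothetical SendResponse whose onComplete field is an Option[() => Unit], and a hypothetical completeSend method that runs the callback after the payload is sent. Kafka’s real response and callback types differ. The counter conversionStatsUpdates is only a placeholder for the FETCH format-conversion statistics mentioned above.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical response: the callback is optional, since only a few responses carry one.
final case class SendResponse(connectionId: String, payload: String,
                              onComplete: Option[() => Unit] = None)

final class Processor {
  val sent = ListBuffer.empty[String]
  // Sketch: once the payload has been sent, run the response's callback if it has one.
  def completeSend(response: SendResponse): Unit = {
    sent.append(response.payload)
    response.onComplete.foreach(callback => callback())
  }
}

var conversionStatsUpdates = 0   // placeholder for format-conversion statistics
val processor = new Processor
processor.completeSend(SendResponse("client-a", "metadata"))
processor.completeSend(SendResponse("client-b", "fetch-data",
  Some(() => conversionStatsUpdates += 1)))
```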

Summary #

Today, we have learned about the KafkaRequestHandlerPool thread pool and its subordinate KafkaRequestHandler threads. These threads are referred to as I/O threads by the Kafka community. Additionally, I have explained Kafka’s request handling process by reviewing the source code. Let’s review the key points of this lesson.

  • KafkaRequestHandler: An I/O thread responsible for processing Request objects dispatched by the Processor thread.
  • KafkaRequestHandlerPool: Creates and manages a group of KafkaRequestHandler threads.
  • Request handling process: Consists of 6 steps.
  1. Clients or other brokers initiate connection requests through the Selector mechanism.
  2. The Processor thread receives the requests and converts them into processable Request objects.
  3. The Processor thread places the Request objects into the Request queue.
  4. KafkaRequestHandler threads retrieve pending requests from the Request queue and process them.
  5. KafkaRequestHandler threads return the Response to the Response queue of the corresponding Processor thread.
  6. The Processor thread sends the Response back to the original request sender.

In fact, I left a teaser today when discussing how Request logic is executed: I mentioned that KafkaApis is where the request processing logic is actually executed. In other words, the handling logic for all types of requests is encapsulated in the KafkaApis class, but I did not go into it in depth. In the next lesson, I will focus on the KafkaApis class and discuss it with you. I have always believed that this class is the primary entry point for examining all Kafka source code, and it is definitely worth spending an entire lesson learning about it.

After-class Discussion #

Finally, please think about a question based on today’s content: In the request handling process, which parts do you think apply the classical “producer-consumer” pattern?

You are welcome to express your thoughts and discuss with me in the comments section. Feel free to share today’s content with your friends.