07 On SocketServer: How Kafka Applies NIO to Implement Network Communication #

Hello, I’m Hu Xi. In this lesson, we will talk about the source code of Kafka’s underlying NIO communication mechanism.

When it comes to the principles behind Kafka’s high performance and high throughput, many people are fascinated by its use of Java NIO. Understanding how Kafka actually applies NIO to implement network communication is not only a prerequisite for mastering the full process of Kafka request handling, but also greatly beneficial for understanding the implementation of the Reactor pattern and solving practical problems.

For example, when a Broker processes requests too slowly and needs optimization, you can only come up with the right solution and choose targeted tuning parameters if you clearly understand how the SocketServer component works.

So, today let’s conquer this crucial NIO communication mechanism together.

Network Communication Layer #

Before diving into the various network components of Kafka, let’s first take a look at the overall architecture of the network communication layer as shown in the diagram below:

From the diagram, we can see that the Kafka network communication components mainly consist of two parts: SocketServer and KafkaRequestHandlerPool.

The core component is the SocketServer, which implements the Reactor pattern. It is used to handle concurrent requests from multiple external clients (broadly referring to producers, consumers, or other brokers) and is responsible for packaging the processing results into responses and returning them to the clients.

The KafkaRequestHandlerPool component is what we commonly refer to as the I/O thread pool. It defines several I/O threads that are used to execute the actual request processing logic.

The interaction between the two components happens through the RequestChannel object and the processor threads defined in the SocketServer. By the way, when I mention threads, they are essentially of the Runnable type in the code, whether it’s the Acceptor class, the Processor class, or the KafkaRequestHandler class that we will discuss separately later.

Here's a quick heads-up: in Lesson 9, I will provide a detailed introduction to the KafkaRequestHandlerPool thread pool. For now, what you need to know is that it defines multiple KafkaRequestHandler threads, which are where the actual request logic is executed. Compared to KafkaRequestHandler, the Acceptor and Processor threads we discuss today are, in a sense, mere “porters” of requests and responses.

After getting a picture of the network communication layer as a whole, we need to focus on the SocketServer component. This component is the most important sub-module of Kafka's network communication layer. The Acceptor threads, Processor threads, and RequestChannel objects it oversees are all key building blocks of network communication. You might be surprised to learn that the source code actually creates this combination of threads more than once, with each combination serving a different purpose. In the next lesson, I will share the specific scenarios in which the different thread combinations are used.

Now let’s delve into learning about the SocketServer component.

Overview of SocketServer #

The source code of the SocketServer component is located in the “core” package of the Kafka project. Specifically, it can be found in the “src/main/scala/kafka/network” directory, in a file named “SocketServer.scala”.

SocketServer.scala is a venerable source file. Over the history of Kafka's development, many source files have come and gone, but this one has “stood its ground” and continues to be improved. If you look at its Git history, you will find that its earliest commit dates back to August 2011, which speaks to its long history.

Currently, SocketServer.scala is a large file with nearly 2000 lines of code, divided into 8 sections. Let me provide a brief introduction to these components, and then we will focus on studying the source code of the Acceptor class and the Processor class. After all, these two classes are critical components for implementing network communication. Also, the information I’m giving you today is just a general overview of the SocketServer component. In the next lesson, I will provide a more detailed explanation of its definition.

  1. AbstractServerThread class: This is the abstract base class for the Acceptor thread and the Processor thread. It defines common methods for these two threads, such as shutdown (to close the thread). I won’t go into detail about the code in this abstract class, but you should pay attention to the usage of the CountDownLatch class for starting and closing threads.

If you're looking for best practices in thread-safe Java programming, you shouldn't miss the CountDownLatch class. Kafka's thread-control code makes extensive use of CountDownLatch-based techniques to implement elegant thread startup, thread shutdown, and similar operations. I recommend becoming proficient with it and applying it in your own work (I sketch this pattern in code right after this list).

  2. Acceptor thread class: This is the thread responsible for accepting and creating external TCP connections. Each SocketServer instance creates only one Acceptor thread. Its sole purpose is to create connections and pass received Requests to the downstream Processor threads for processing.

  3. Processor thread class: This is the thread responsible for handling all requests on a single TCP connection. Each SocketServer instance creates num.network.threads Processor threads (3 by default). Processor threads add received Requests to the Request queue of the RequestChannel and are also responsible for returning Responses to the Request senders.

  4. Processor companion object: This object defines monitoring metrics and constants commonly used by the Processor thread, such as the Processor thread idle rate.

  5. ConnectionQuotas class: This class controls connection quotas: the maximum number of connections a single IP address may create to a Broker, as well as the maximum total number of connections a single Broker allows.

  6. TooManyConnectionsException class: This is an exception class defined by SocketServer, used to indicate that the connection quota has been exceeded.

  7. SocketServer class: This class implements the management and operation of all the above components, such as creating and closing Acceptor and Processor threads.

  8. SocketServer companion object: This object defines useful constants and clarifies which parameters of the SocketServer component can be dynamically modified.
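
Returning to the CountDownLatch pattern mentioned in item 1: below is a minimal, self-contained Scala sketch of how latches can implement the start/await/shutdown lifecycle. The class and method names are illustrative, not Kafka's actual AbstractServerThread code.

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// A minimal sketch of the CountDownLatch-based lifecycle pattern;
// names are illustrative and the error handling of Kafka's real
// AbstractServerThread is omitted.
abstract class LifecycleThread extends Runnable {
  private val startupLatch  = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive         = new AtomicBoolean(true)

  protected def isRunning: Boolean = alive.get

  // The thread calls this itself once its initialization is done,
  // releasing anyone blocked in awaitStartup()
  protected def startupComplete(): Unit = startupLatch.countDown()

  // The thread calls this itself just before exiting its run loop
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()

  // An external thread blocks here until the worker has fully started
  def awaitStartup(): Unit = startupLatch.await()

  // An external thread requests shutdown, then waits for the run loop to exit
  def shutdown(): Unit = {
    alive.set(false)
    shutdownLatch.await()
  }
}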

Acceptor Thread #

In the classic Reactor pattern, there is a role called “Dispatcher” that receives external requests and dispatches them to the actual processing threads below. In Kafka, this Dispatcher is the Acceptor thread.

Let’s take a look at its definition:

private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  // Create the underlying NIO Selector object
  // The Selector object is responsible for executing the actual I/O operations, such as listening for connection creation requests, read/write requests, etc.
  private val nioSelector = NSelector.open() 
  // Create the corresponding ServerSocketChannel instance in the Broker
  // Register this Channel with the Selector object in the previous step
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
  // Create the Processor thread pool, which is actually an array of Processor threads
  private val processors = new ArrayBuffer[Processor]()
  private val processorsStarted = new AtomicBoolean

  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
  ......
}

From the definition, the Acceptor thread takes 6 parameters, of which 3 are the most important:

  • endPoint: It is the connection information for your Kafka Broker, such as PLAINTEXT://localhost:9092. The Acceptor needs to use the host name and port information in endPoint to create a Server Socket.
  • sendBufferSize: It sets the SocketOptions SO_SNDBUF, which is used to set the size of the underlying buffer for outbound network I/O. The default value is the value of the Broker parameter socket.send.buffer.bytes, which is 100KB.
  • recvBufferSize: It sets the SocketOptions SO_RCVBUF, which is used to set the size of the underlying buffer for inbound network I/O. The default value is the value of the Broker parameter socket.receive.buffer.bytes, which is 100KB.

While we're on the subject, here is an optimization suggestion: if the network latency between clients and Brokers is high in your production environment (e.g., RTT > 10 ms), I suggest increasing these two buffer-size parameters, sendBufferSize and recvBufferSize, since the 100KB default is usually too small in that case.
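
For example, you could raise both underlying Broker parameters in server.properties. The values below are purely illustrative, not recommendations:

# Illustrative values for a high-RTT link; the default for both is 102400 (100KB)
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576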

In addition to its constructor parameters, the Acceptor thread also defines two very important properties of its own.

  • nioSelector: It is an instance of the Java NIO Selector and is the foundation on which all of the network communication components implement the Java NIO mechanism. If you are not familiar with Java NIO, I recommend you learn from this tutorial: Java NIO.
  • processors: Network Processor thread pool. When the Acceptor thread is initialized, it needs to create the corresponding network Processor thread pool. As can be seen, Processor threads are managed and maintained in the Acceptor thread.

Since the Acceptor manages the Processor threads, it must also define methods for doing so. The Acceptor code contains three Processor-related methods: addProcessors, startProcessors, and removeProcessors. Their code is very simple, so I have annotated the main logic steps in comments:

addProcessors #

private[network] def addProcessors(
  newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
  processors ++= newProcessors // Add a group of new Processor threads
  if (processorsStarted.get) // If the Processor thread pool has already started
    startProcessors(newProcessors, processorThreadPrefix) // Start the new Processor threads
}

startProcessors #

private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
  if (!processorsStarted.getAndSet(true)) {  // If the Processor thread pool has not started
    startProcessors(processors, processorThreadPrefix) // Start the given Processor threads
  }
}

private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
  processors.foreach { processor => // Create and start Processor threads one by one
    // Thread naming convention: processor thread prefix-kafka-network-thread-brokerId-listenerName-securityProtocol-ProcessorId
    // Assume the Broker with ID 0 sets PLAINTEXT://localhost:9092 as the connection information, then the names of the three Processor threads will be:
    // data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0
    // data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-1
    // data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-2
    KafkaThread.nonDaemon(s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor).start()
  }
}

removeProcessors #

private[network] def removeProcessors(removeCount: Int, requestChannel: RequestChannel): Unit = synchronized {
  // Get the last removeCount processors from the processors thread pool
  val toRemove = processors.takeRight(removeCount)
  // Remove the last removeCount processors
  processors.remove(processors.size - removeCount, removeCount)
  // Shutdown the last removeCount processors
  toRemove.foreach(_.shutdown())
  // Remove these processors from the RequestChannel
  toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}

To illustrate the logic of these methods more vividly, I drew a diagram that includes the execution flow of these 3 methods, as shown in the following figure:

The addProcessors, startProcessors, and removeProcessors methods we just covered manage the Processor thread pool; together they give the Acceptor class the basic machinery for doing so. However, the main dispatching logic of the Reactor pattern actually lives in the Acceptor's run method. Below, I annotate the overall flow of the run method in comments:

def run(): Unit = {
  // Register the OP_ACCEPT event
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // Wait for the Acceptor thread to complete startup
  startupComplete()
  try {
    // The current index of the Processor being used, starting from 0, with a maximum value of num.network.threads - 1
    var currentProcessorIndex = 0
    while (isRunning) {
      try {
        // Wait up to 500 ms for ready I/O events
        val ready = nioSelector.select(500)
        if (ready > 0) { // If there are ready I/O events
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable) {
                // Call the `accept` method to create a socket connection
                accept(key).foreach { socketChannel =>
                  var retriesLeft = synchronized(processors.length)
                  var processor: Processor = null
                  do {
                    retriesLeft -= 1
                    // Specify which Processor thread will handle the connection
                    processor = synchronized {
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    // Update the Processor thread index
                    currentProcessorIndex += 1
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) // Whether the Processor accepts this connection
                }
              } else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally { // Perform various resource closing logic
    debug("Closing server socket and selector.")
    CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
    CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
    shutdownComplete()
  }
}

The code looks a bit long, so let me use a diagram to explain the main processing logic of the run method. The key point is that the Acceptor thread first decides which Processor thread will handle each newly accepted connection, and then calls the assignNewConnection method to hand that connection over to the chosen Processor.

Essentially, the Acceptor thread uses Java NIO Selector + SocketChannel to loop and poll for ready I/O events. Here, the I/O events mainly refer to the creation of network connections, i.e., the SelectionKey.OP_ACCEPT in the code. Once an external connection request is received, the Acceptor specifies a Processor thread and hands the request to it to create the actual network connection. In summary, this is what the Acceptor thread does.
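
If you strip away Kafka's bookkeeping, the skeleton of this pattern in plain Java NIO looks roughly like the sketch below. This is a simplified illustration only: the println stands in for assignNewConnection, and quota checks and error handling are omitted.

import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

// A stripped-down sketch of the Acceptor's dispatch loop; not Kafka's actual code
object AcceptorSketch {
  def main(args: Array[String]): Unit = {
    val nioSelector = Selector.open()
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.bind(new InetSocketAddress("localhost", 9092))
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

    val numProcessors = 3 // stand-in for num.network.threads
    var currentProcessorIndex = 0

    while (true) {
      if (nioSelector.select(500) > 0) { // wait up to 500 ms for ready events
        val iter = nioSelector.selectedKeys().iterator()
        while (iter.hasNext) {
          val key = iter.next()
          iter.remove()
          if (key.isAcceptable) {
            val socketChannel: SocketChannel = serverChannel.accept()
            socketChannel.configureBlocking(false)
            // Round-robin: pick the next Processor for this new connection
            val processorId = currentProcessorIndex % numProcessors
            currentProcessorIndex += 1
            println(s"Assigning ${socketChannel.getRemoteAddress} to processor $processorId")
          }
        }
      }
    }
  }
}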

Processor Thread #

Now let’s dive into the source code for the Processor thread.

If the Acceptor is responsible for handling inbound connections, then the Processor code is where the connections are actually created and requests are dispatched. Clearly, there is much more that the Processor needs to do compared to the Acceptor. Let’s start by looking at the run method of the Processor thread:

override def run(): Unit = {
    startupComplete() // Wait for Processor thread to start up
    try {
      while (isRunning) {
        try {
          configureNewConnections() // Create new connections
          // Register any new responses for writing
          processNewResponses() // Send response and add it to the inflightResponses temporary queue
          poll() // Execute NIO poll to get the I/O operations that are ready on the corresponding SocketChannel
          processCompletedReceives() // Add received Requests to the Request queue
          processCompletedSends() // Perform callback logic for Responses in the temporary Response queue
          processDisconnected() // Handle disconnections caused by failed sending
          closeExcessConnections() // Close connections that exceed the quota limit
        } catch {
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally { // Close underlying resources
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
}

The logic of the run method is well organized, and the boundaries between its sub-methods are very clear, so at a high level the method reads as a model of clean encapsulation: each sub-method does one well-defined thing. I will use a diagram to illustrate what this method does:

Before going into the details of the run method, let’s first take a look at what needs to be done when the Processor thread is initialized.

Each Processor thread creates three queues when it is instantiated. Note that “queue” is used loosely here: the underlying data structure may be a genuine blocking queue or simply a Map, as shown below:

private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

Queue 1: newConnections

It stores the new connections to be created, in the form of SocketChannel objects. The queue is bounded, and its capacity (connectionQueueSize) is hardcoded to 20, so you cannot change its length.

Whenever the Processor thread receives a new connection request, it puts the corresponding SocketChannel into this queue. Later, when creating a connection (i.e., when calling configureNewConnections), it retrieves the SocketChannel from this queue and registers the new connection.
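
To make the hand-off concrete, here is a hedged sketch of the two sides of this bounded producer/consumer queue. The class and method names are invented for illustration and are not Kafka's actual API:

import java.nio.channels.SocketChannel
import java.util.concurrent.ArrayBlockingQueue

// Illustrative sketch of the Acceptor-to-Processor hand-off; not Kafka's actual code
class ConnectionHandoff(queueSize: Int = 20) {
  private val newConnections = new ArrayBlockingQueue[SocketChannel](queueSize)

  // Acceptor side: offer the channel; returns false if the queue is full,
  // so the Acceptor can try another Processor instead of blocking
  def acceptConnection(channel: SocketChannel): Boolean =
    newConnections.offer(channel)

  // Processor side: drain pending channels and register each one for reads
  def drainNewConnections(register: SocketChannel => Unit): Unit = {
    var channel = newConnections.poll()
    while (channel != null) {
      register(channel)
      channel = newConnections.poll()
    }
  }
}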

Queue 2: inflightResponses

Strictly speaking, this is a temporary Response queue. Once the Processor thread has handed a Response to the underlying channel for sending, it also puts that Response into this temporary queue.

Why do we need this temporary queue? This is because some callback logic needs to be executed after the Response is sent back to the requester, so the Response needs to be temporarily stored in a queue. This is the purpose of inflightResponses.

Queue 3: responseQueue

From the name, we can tell that this is a Response queue, not a Request queue. This tells us a fact: each Processor thread maintains its own Response queue; it is not shared among threads or stored in the RequestChannel, as some online articles claim. The responseQueue stores all the Response objects that need to be returned to the requesters.
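
How does a Response find its way back to the right Processor? Each Request records the id of the Processor that read it (you will see processor = id later in processCompletedReceives), so the response side can route each Response to exactly that Processor's own queue. Below is a simplified, illustrative sketch of this routing; the types and names are stand-ins, not Kafka's actual code:

import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingDeque}

// Illustrative sketch of routing a Response back to the Processor that
// read the original Request; types and names are simplified stand-ins.
class ResponseRouter {
  private val processorQueues =
    new ConcurrentHashMap[Int, LinkedBlockingDeque[String]]()

  def registerProcessor(processorId: Int): Unit =
    processorQueues.put(processorId, new LinkedBlockingDeque[String]())

  // Called by an I/O thread: the Request carried the id of the Processor
  // that read it, so the Response lands in exactly that Processor's queue
  def sendResponse(processorId: Int, response: String): Unit =
    processorQueues.get(processorId).put(response)

  // Called by Processor `processorId` inside its own run loop
  def dequeueResponse(processorId: Int): String =
    processorQueues.get(processorId).poll()
}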

Now that we understand these queues, let’s delve into the working logic of the Processor thread. Based on the method invocations in the run method, let’s start by examining the configureNewConnections method.

configureNewConnections #

As I mentioned earlier, configureNewConnections is responsible for handling new connection requests. The main logic of this method is as follows:

private def configureNewConnections(): Unit = {
    var connectionsProcessed = 0 // Counter for the number of connections configured so far
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) { // Process at most connectionQueueSize connections per call, while new connections are pending
      val channel = newConnections.poll() // Retrieve the SocketChannel from the connection queue
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        // Register the channel with the given Selector; Underneath, it is equivalent to calling SocketChannel.register(selector, SelectionKey.OP_READ)
        selector.register(connectionId(channel.socket), channel)
        connectionsProcessed += 1 // Update the counter
      } catch {
        case e: Throwable =>
          val remoteAddress = channel.socket.getRemoteSocketAddress
          close(listenerName, channel)
          processException(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
}

The most important logic of this method is the registration of the SocketChannel via selector.register. Each Processor thread maintains its own instance of the Selector class. Note that this is Kafka's own Selector (org.apache.kafka.common.network.Selector), built on top of the Java NIO Selector, for performing non-blocking, multiplexed network I/O. In terms of core functionality, Kafka's Selector is consistent with the one Java provides.
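
In raw Java NIO terms, the effect of this registration is roughly the sketch below; Kafka's own Selector layers connection ids, metrics, and authentication on top of it:

import java.nio.channels.{SelectionKey, Selector, SocketChannel}

// Sketch of the raw-NIO equivalent of registering a new channel for reads;
// not Kafka's Selector internals.
object RegisterSketch {
  def registerForReads(nioSelector: Selector, channel: SocketChannel): SelectionKey = {
    channel.configureBlocking(false)
    // From now on, nioSelector.select() reports this channel whenever
    // it has data available to read
    channel.register(nioSelector, SelectionKey.OP_READ)
  }
}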

processNewResponses #

This method is responsible for sending the Response back to the requesters and adding the Response to the temporary Response queue. The processing logic is as follows:

private def processNewResponses(): Unit = {
    var currentResponse: RequestChannel.Response = null
    while ({currentResponse = dequeueResponse(); currentResponse != null}) { // If there are pending Responses in the Response queue
      val channelId = currentResponse.request.context.connectionId // Get the connection channel ID
      try {
        currentResponse match {
          case response: NoOpResponse => // No need to send the Response
            updateRequestMetrics(response)
            trace(s"Socket server received empty response to send, registering for read: $response")
            handleChannelMuteEvent(channelId, ChannelMuteEvent.RESPONSE_SENT)
            tryUnmuteChannel(channelId)
          case response: SendResponse => // Send the Response and add it to inflightResponses
            sendResponse(response, response.responseSend)
          case response: CloseConnectionResponse => // Close the corresponding connection
            updateRequestMetrics(response)
            trace("Closing socket connection actively according to the response code.")
            close(channelId)
          case _: StartThrottlingResponse =>
            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_STARTED)
          case _: EndThrottlingResponse =>
            handleChannelMuteEvent(channelId, ChannelMuteEvent.THROTTLE_ENDED)
            tryUnmuteChannel(channelId)
          case _ =>
            throw new IllegalArgumentException(s"Unknown response type: ${currentResponse.getClass}")
        }
      } catch {
        case e: Throwable =>
          processChannelException(channelId, s"Exception while processing response for $channelId", e)
      }
    }
}

The key here is the sendResponse method in the SendResponse branch. The core code of this method consists of just three lines:

if (openOrClosingChannel(connectionId).isDefined) { // If the connection is in a connectable state
  selector.send(responseSend) // Send the Response
  inflightResponses += (connectionId -> response) // Add the Response to inflightResponses
}

poll #

Strictly speaking, none of the sending logic described above actually performs the send. The method that truly carries out the I/O actions is the poll method.

The core code of the poll method is only one line: selector.poll(pollTimeout). At the lower level, it actually calls the select method of the Java NIO Selector to perform the I/O operations that are ready, whether it’s receiving a request or sending a response. Therefore, it is important to remember that the poll method is where the actual I/O operations are performed.
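
To make “performing the ready I/O operations” concrete, here is a rough raw-NIO sketch of what a single poll does; Kafka's Selector performs this internally and then exposes the results as completedReceives and completedSends:

import java.nio.channels.Selector

// Sketch of one poll-style pass: a single select() surfaces every channel
// that is ready, whether for reading or writing. Not Kafka's Selector code.
object PollSketch {
  def pollOnce(nioSelector: Selector, timeoutMs: Long): Unit = {
    if (nioSelector.select(timeoutMs) > 0) {
      val iter = nioSelector.selectedKeys().iterator()
      while (iter.hasNext) {
        val key = iter.next()
        iter.remove()
        if (key.isValid && key.isReadable) {
          // read bytes from key.channel(); a fully read request becomes
          // a "completed receive"
        }
        if (key.isValid && key.isWritable) {
          // write pending response bytes; once fully flushed, the response
          // becomes a "completed send"
        }
      }
    }
  }
}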

processCompletedReceives #

This method is responsible for receiving and processing requests. The code is as follows:

private def processCompletedReceives(): Unit = {
  // Iterate through all the received requests
  selector.completedReceives.asScala.foreach { receive =>
    try {
      // Make sure the corresponding connection channel has been established
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          val header = RequestHeader.parse(receive.payload)
          if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
            trace(s"Begin re-authentication: $channel")
          else {
            val nowNanos = time.nanoseconds()
            // If the authentication session has expired, close the connection
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              debug(s"Disconnecting expired channel: $channel : $header")
              close(channel.id)
              expiredConnectionsKilledCount.record(null, 1, 0)
            } else {
              val connectionId = receive.source
              val context = new RequestContext(header, connectionId, channel.socketAddress,
                channel.principal, listenerName, securityProtocol,
                channel.channelMetadataRegistry.clientInformation)
              val req = new RequestChannel.Request(processor = id, context = context,
                startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
              if (header.apiKey == ApiKeys.API_VERSIONS) {
                val apiVersionsRequest = req.body[ApiVersionsRequest]
                if (apiVersionsRequest.isValid) {
                  channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                    apiVersionsRequest.data.clientSoftwareName,
                    apiVersionsRequest.data.clientSoftwareVersion))
                }
              }
              // Core code: add the request to the request queue
              requestChannel.sendRequest(req)
              selector.mute(connectionId)
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          }
        case None =>
          throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
      }
    } catch {
      case e: Throwable =>
        processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
    }
  }
}

The code may seem lengthy, but the most important line is requestChannel.sendRequest(req), which adds the request to the request queue. The other code is just some validation and auxiliary logic.

The meaning of this method is that the Processor continuously reads received network requests from the underlying socket channel, converts them into Request instances, and adds them to the request queue. The logic is quite simple, isn’t it?

processCompletedSends #

This method is responsible for handling the callback logic of responses. As mentioned before, the response needs to be sent before the corresponding callback logic can be executed. Here is the code that implements this functionality:

private def processCompletedSends(): Unit = {
  // Iterate through the responses that have been sent by the underlying SocketChannels
  selector.completedSends.asScala.foreach { send =>
    try {
      // Retrieve the response from the inflightResponses corresponding to the send
      val response = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      updateRequestMetrics(response) // Update some monitoring metrics
      // Execute the callback logic
      response.onComplete.foreach(onComplete => onComplete(send))
      handleChannelMuteEvent(send.destination, ChannelMuteEvent.RESPONSE_SENT)
      tryUnmuteChannel(send.destination)
    } catch {
      case e: Throwable => processChannelException(send.destination,
        s"Exception while processing completed send to ${send.destination}", e)
    }
  }
}

Here, the onComplete method of the Response object is called to execute the callback function.

processDisconnected #

As the name suggests, this method handles connections that have already been disconnected. Its logic is quite simple, and the main execution steps are annotated in the code:

private def processDisconnected(): Unit = {
  // Iterate through the disconnected connections of the underlying SocketChannels
  selector.disconnected.keySet.asScala.foreach { connectionId =>
    try {
      // Get the remote hostname information of the disconnected connection
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      // Remove the connection from inflightResponses and update some monitoring metrics
      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
      // Update quota data
      connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
    } catch {
      case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
    }
  }
}

The crucial code is to get the disconnected connections from the underlying Selector, remove them from the inflightResponses, and update their quota data.

closeExcessConnections #

This is the last step in the Processor thread's run loop: closing connections that exceed the quota. The code is very simple:

private def closeExcessConnections(): Unit = {
    // If the quota is exceeded
    if (connectionQuotas.maxConnectionsExceeded(listenerName)) {
      // Find the least recently used connection to close
      val channel = selector.lowestPriorityChannel() 
      if (channel != null)
        close(channel.id) // Close the connection
    }
}

“Least recently used” means finding, among the open TCP connections, the one that has gone the longest without being used. “Unused” means that no requests have been sent to the Processor thread over that connection recently.
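
As a rough illustration (not Kafka's actual lowestPriorityChannel implementation), picking the least recently used connection amounts to scanning for the channel with the oldest last-use timestamp:

// Illustrative sketch of least-recently-used selection among connections
final case class TrackedChannel(id: String, lastUsedMs: Long)

object LruSketch {
  // The channel with the oldest last-use timestamp is the LRU candidate
  def lowestPriorityChannel(channels: Seq[TrackedChannel]): Option[TrackedChannel] =
    if (channels.isEmpty) None
    else Some(channels.minBy(_.lastUsedMs))
}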

Summary #

Today, I have introduced you to the overall view of the Kafka network communication layer, providing a brief introduction to the core component SocketServer, as well as spending considerable time studying the code of the Acceptor and Processor threads under SocketServer. Let’s summarize briefly.

  • The network communication layer is composed of the SocketServer component and the KafkaRequestHandlerPool component.
  • SocketServer implements the Reactor pattern for high-performance concurrent processing of I/O requests.
  • SocketServer uses Java’s Selector implementation of NIO communication at the lower level.

In the next lesson, I will focus on how SocketServer is designed to handle different types of requests, along with the corresponding code. This is a significant community initiative to improve the Broker's handling of control-class requests, as well as an effort to improve Broker consistency, so it deserves our attention.

Post-Class Discussion #

Finally, please consider the following question: why is the Request queue designed to be shared among threads, while the Response queue is exclusive to each Processor thread?

Feel free to express your thoughts and ideas in the comments section, and have a discussion with me. You are also welcome to share today’s content with your friends.