08 In SocketServer, Requests Also Need to Be Distinguished by Priority #

Hello, I am Hu Xi.

In the previous lesson, I provided you with a detailed introduction to the architecture of the Kafka network layer, as well as how the Acceptor and Processor threads in the SocketServer component utilize Java NIO for network communication. I also briefly mentioned the request queue and response queue.

Today, we will continue discussing the SocketServer source code and focus on how the community prioritizes different types of requests.

Case study #

In Kafka, request processing is not prioritized by default: Kafka treats all requests equally. This absolutely fair strategy is sometimes problematic. Let me share a real case with you and you will see why. I would bet you have encountered similar issues in a real production system.

Once, we created a topic with a single partition and two replicas in the production environment. At that time, Broker A in the cluster hosted the leader replica of the partition, while Broker B hosted the follower replica. One day, there was a sudden surge in external business volume, which caused Broker A to be overwhelmed with a large number of pending PRODUCE requests. To make matters worse, the operations personnel “unfortunately” executed a Preferred Leader election, explicitly switching Broker B to become the leader.

Now, here’s the problem: if the producer application had set acks to all, then the PRODUCE requests that were backlogged ahead of the LeaderAndIsr request (the request responsible for adjusting replica roles, such as leader and follower transitions) could not complete properly, because they had to wait for all follower replicas in the in-sync replica set (ISR) to catch up.

However, at that point Broker B had already become the leader, so its replica stopped fetching messages from Broker A. As a result, those unfinished PRODUCE requests kept sitting in the Purgatory cache on Broker A: the leader/follower role switch had broken replica synchronization, so the requests could never complete. In the end, Broker A threw a timeout exception and returned it to the producer application.

It is worth mentioning that the Purgatory cache is where the broker temporarily stores delayed requests. I will explain this component in detail later in the course.

This problem stemmed from the lack of request prioritization, which we later confirmed by reading the SocketServer source code. Combining what I had learned from the source, I then found the corresponding ticket on the official Jira site, which gave me a complete picture of how the community resolved this issue.

In fact, this is the approach I most recommend for digging into Kafka: find the source code behind the issues you run into in your environment, read it carefully, form your own solution, and then check its strengths and weaknesses against the community’s. As you keep iterating on this process, you will find yourself increasingly familiar with Kafka’s code and able to solve all kinds of production problems with ease.

After saying so much, I believe you are eager to read this part of the source code with me. So, let’s get started.

Essential Terms and Concepts #

Before reading the SocketServer code and delving into the implementation of the request priority mechanism, we first need to grasp a few basic concepts that serve as the foundation for the rest of this lesson.

1. Data Plane and Control Plane

The Kafka community classifies Kafka request types into two categories: data requests and control requests. Data plane and control plane correspond to data requests and control requests, respectively. In other words, the data plane is responsible for handling data requests, while the control plane is responsible for handling control requests.

Currently, there are three types of requests for interactions between the Controller and Broker: LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest. These three types of requests belong to control requests and are typically assigned high priority. PRODUCE and FETCH requests, which we are familiar with, are typical examples of data requests.
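
To make the split concrete, here is a small, purely illustrative Scala sketch that lists the three control request types by their API keys. Note that Kafka itself does not route requests by ApiKey; as we will see, the separation is done per listener. The helper isControlRequest is hypothetical and exists only for illustration.

import org.apache.kafka.common.protocol.ApiKeys

// Illustration only: the three Controller-to-Broker request types that count as control requests
val controlRequestKeys: Set[ApiKeys] =
  Set(ApiKeys.LEADER_AND_ISR, ApiKeys.STOP_REPLICA, ApiKeys.UPDATE_METADATA)

// Hypothetical helper; data requests such as ApiKeys.PRODUCE and ApiKeys.FETCH fall outside the set
def isControlRequest(apiKey: ApiKeys): Boolean =
  controlRequestKeys.contains(apiKey)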

Distinguishing between these two categories of requests is the core logic implemented in the SocketServer source code.

2. Listener

Currently, the main approach used in the source code to differentiate the handling of data requests and control requests is through listeners. In other words, multiple sets of listeners are created to execute the processing code for data requests and control requests separately.

In Kafka, the Broker-side parameters listeners and advertised.listeners are used to configure listeners. In the source code, listeners are defined using the EndPoint class, as shown in the code snippet below:

case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
  // Constructs the complete listener connection string
  // The format is: listenerName://hostname:port
  // For example: PLAINTEXT://kafka-host:9092
  def connectionString: String = {
    val hostport =
      if (host == null)
        ":"+port
      else
        Utils.formatAddress(host, port)
    listenerName.value + "://" + hostport
  }
  // There is a Java version of the Endpoint class available in the clients project for use in client-side code.
  // This method constructs an instance of the Java version of the Endpoint class.
  def toJava: JEndpoint = {
    new JEndpoint(listenerName.value, securityProtocol, host, port)
  }
}
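
As a quick sanity check, here is a minimal usage sketch with made-up host and port values (they are not from the lesson), showing how connectionString assembles the listener string:

import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol

// Hypothetical values, for illustration only
val endpoint = EndPoint("kafka-host", 9092,
  new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT)

// Prints: PLAINTEXT://kafka-host:9092
println(endpoint.connectionString)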

Each EndPoint object defines four attributes, which we will go through one by one:

  • host: Broker hostname.
  • port: Broker port number.
  • listenerName: Listener name. The predefined names include PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL; Kafka also allows you to define custom listener names, such as CONTROLLER and INTERNAL.
  • securityProtocol: Security protocol used by the listener. Kafka supports four security protocols: PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL.

Here, I briefly mention that the Broker-side parameter listener.security.protocol.map is used to specify the security protocol used by listeners with different names.

Let me give you an example. If the corresponding Broker-side parameters are configured as follows:

listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093

This means that Kafka has configured three sets of listeners with names CONTROLLER, INTERNAL, and EXTERNAL, which use the security protocols PLAINTEXT, PLAINTEXT, and SSL, respectively.
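
If you want to see what such a mapping looks like programmatically, here is a toy parser, written for illustration only (KafkaConfig has its own, more robust parsing logic), that turns the listener.security.protocol.map value above into a Scala Map:

import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol

// Toy parser for illustration; not Kafka's actual parsing code
def parseProtocolMap(value: String): Map[ListenerName, SecurityProtocol] =
  value.split(",").map { entry =>
    val Array(name, protocol) = entry.trim.split(":")
    ListenerName.normalised(name) -> SecurityProtocol.forName(protocol)
  }.toMap

// parseProtocolMap("CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL")
// => CONTROLLER -> PLAINTEXT, INTERNAL -> PLAINTEXT, EXTERNAL -> SSL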

With this basic knowledge, let’s now see how the SocketServer separates the data plane and control plane.

Of course, before that, we need to understand the definition of SocketServer.

Definition of SocketServer #

First, let’s take a look at the basic properties of the SocketServer class. I will use a mind map to show you the fields or properties related to request priority implementation:

What do these fields mean? Let’s examine the code together.

class SocketServer(val config: KafkaConfig, 
  val metrics: Metrics,
  val time: Time,  
  val credentialProvider: CredentialProvider) 
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
  // SocketServer implements the BrokerReconfigurable trait, which means some of its configurations can be modified dynamically without restarting the broker
  // The length of the request queue in SocketServer is determined by the broker-side parameter "queued.max.requests" and the default value is 500
  private val maxQueuedRequests = config.queuedMaxRequests
  ......
  // data-plane
  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() // Processor thread pool for handling data-class requests
  // Acceptor thread pool for handling data-class requests, each listener corresponds to an Acceptor thread
  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
  // RequestChannel exclusively for handling data-class requests
  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
  // control-plane
  // Processor thread for handling control-class requests
  // Note: Currently, a dedicated Processor thread is defined for handling control-class requests instead of a thread pool
  private var controlPlaneProcessorOpt : Option[Processor] = None
  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
  // RequestChannel exclusively for handling control-class requests
  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))
  ......
}

First, the SocketServer class defines the field maxQueuedRequests, which specifies the maximum length of the request queue. Its value is determined by the broker-side parameter queued.max.requests, whose default is 500.

Second, you must have noticed in the code above that SocketServer implements the BrokerReconfigurable interface (a trait in Scala). This indicates that some of SocketServer’s configurations can be modified dynamically. If you check the companion object of SocketServer, you will find the following code:

object SocketServer {
  ......
  val ReconfigurableConfigs = Set(
    KafkaConfig.MaxConnectionsPerIpProp,
    KafkaConfig.MaxConnectionsPerIpOverridesProp,
    KafkaConfig.MaxConnectionsProp)
  ......
}

Based on this code, we can know that the broker-side parameters max.connections.per.ip, max.connections.per.ip.overrides, and max.connections can be dynamically modified.

Additionally, in the SocketServer definition we just looked at, a group of variables is defined under the comments “data-plane” and “control-plane”: instances of the Processor thread pool, the Acceptor thread pool, and the RequestChannel.

  • Processor thread pool: This is the network thread pool mentioned in the previous lesson, responsible for rapidly putting requests into the request queue.
  • Acceptor thread pool: It contains the Acceptor threads defined by SocketServer for each listener. Each Acceptor thread is responsible for dispatching inbound connection establishment requests on that listener.
  • RequestChannel: It carries the request queue for request processing.

Strictly speaking, for the data plane it is accurate to talk about thread pools: there are indeed multiple Processor threads, and there may also be multiple Acceptor threads, because SocketServer creates one Acceptor thread for each EndPoint (that is, for each listener).

But for the control plane, the situation is different.

You may have noticed that the variables in the control plane section all end with Opt, meaning they are of the Option type. This reveals an important fact: you can choose not to use the control plane at all, that is, have Kafka make no distinction between request types, just as it was designed before version 2.2.0.

However, once you enable the control plane, there is only one Processor thread and one Acceptor thread. Also note that the length of the corresponding request queue in its RequestChannel is hardcoded to 20 rather than being configurable. This reveals an assumption made by the community: the number of control requests should be far smaller than the number of data requests, so there is no need for a thread pool or a deeper request queue.

Creating resources for the Data plane #

After understanding the definition of the SocketServer class, let’s learn about how SocketServer creates the necessary resources for the Data plane and the Control plane. Let’s first look at how resources are created for the Data plane.

The method createDataPlaneAcceptorsAndProcessors of SocketServer is responsible for creating the necessary resources for the Data plane. Let’s take a look at its implementation:

private def createDataPlaneAcceptorsAndProcessors(
  dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = {
  // Iterate through the endpoints
  endpoints.foreach { endpoint =>
    // Add the listener to the connection quotas
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create an Acceptor thread for the listener
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    // Create multiple Processor threads for the listener. The specific number is determined by num.network.threads
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    // Save the mapping of <listener, Acceptor thread> for unified management
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
    info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
  }
}

The logic of this code is very clear. Let me explain it with a diagram:

The createDataPlaneAcceptorsAndProcessors method iterates through all the listeners configured, and then executes the following logic for each listener:

  1. Initializes the maximum connection counter for the listener. These counters will be used later to ensure that there are no quota violations.
  2. Creates an Acceptor thread for the listener, which means calling the constructor of the Acceptor class to generate an instance of the Acceptor thread.
  3. Creates a Processor thread pool for the listener. For the Data plane, the number of threads in the pool is determined by the broker-side parameter num.network.threads.
  4. Adds the <listener, Acceptor thread> mapping to the Acceptor thread pool for unified management.

Please note that the source code performs these four steps for each listener configured.

For example, if you configure listeners as PLAINTEXT://localhost:9092,SSL://localhost:9093, by default the source code will create one Acceptor thread and one Processor thread pool for each of the PLAINTEXT and SSL listeners.

Note that which listeners get these resources depends entirely on your configuration; most importantly, Kafka only creates these resources for the listeners used by the Data plane. I will explain in more detail shortly how to specify whether a listener is intended for the Data plane or the Control plane.

Creating resources for Control plane #

As mentioned earlier, on the assumption that control requests put far less load on the control plane than data requests put on the data plane, the control plane only needs 1 Acceptor thread + 1 Processor thread + a request queue with a depth of 20. Compared with the data plane, these resources are modest, but in most cases they are sufficient.

The SocketServer provides the createControlPlaneAcceptorAndProcessor method to create the necessary resources for the control plane. Here is the source code:

private def createControlPlaneAcceptorAndProcessor(
  endpointOpt: Option[EndPoint]): Unit = {
  // If a listener is configured for the control plane
  endpointOpt.foreach { endpoint =>
    // Add the listener to the connection quotas management
    connectionQuotas.addListener(config, endpoint.listenerName)
    // Create the Acceptor thread for the listener
    val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix)
    // Create the Processor thread for the listener
    val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
    controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
    controlPlaneProcessorOpt = Some(controlPlaneProcessor)
    val listenerProcessors = new ArrayBuffer[Processor]()
    listenerProcessors += controlPlaneProcessor
    // Add the Processor thread to the RequestChannel dedicated to control requests
    // i.e., add it to the Processor thread pool saved in the RequestChannel instance
    controlPlaneRequestChannelOpt.foreach(
      _.addProcessor(controlPlaneProcessor))
    nextProcessorId += 1
    // Add the Processor object to the Processor thread pool managed by the Acceptor thread
    controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix)
    info(s"Created control-plane acceptor and processor for endpoint: ${endpoint.listenerName}")
  }
}

I also use a flowchart to illustrate:

The overall process is very similar to createDataPlaneAcceptorsAndProcessors, except that at the beginning of the method, it checks if a listener is configured for the control plane. Currently, Kafka stipulates that only one set of listeners can be used for the control plane, unlike the data plane, which can have multiple sets of listeners.

If you look closely, you will notice that the flowcharts above do not mention starting the Acceptor and Processor threads. So when are these threads started?

Actually, the Processor and Acceptor threads are started after the SocketServer component is started. The specific code is in the startup method of the KafkaServer.scala file, as shown below:

// KafkaServer.scala
def startup(): Unit = {
    try {
      info("starting")
      ......
      // Create the SocketServer component
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      // Start the SocketServer, but do not start the Processor threads
      socketServer.startup(startProcessingRequests = false)
      ......
      // Start all the threads for the data plane and control plane
      socketServer.startProcessingRequests(authorizerFutures)
      ......
    } catch {
      ......
    }
}

Huh? I still didn’t see the code to start the Acceptor and Processor threads. In fact, the startProcessingRequests method in SocketServer is the method that starts these threads. Let’s take a look at the logic of this method:

def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = {
  info("Starting socket server acceptors and processors")
  this.synchronized {
    if (!startedProcessingRequests) {
      // Start the Processor and Acceptor threads for handling control requests
      startControlPlaneProcessorAndAcceptor(authorizerFutures)
      // Start the Processor and Acceptor threads for handling data requests
      startDataPlaneProcessorsAndAcceptors(authorizerFutures)
      startedProcessingRequests = true
    } else {
      info("Socket server acceptors and processors already started")
    }
  }
  info("Started socket server acceptors and processors")
}

If you cannot find the startProcessingRequests method in your environment, there is no need to panic: it was only added on April 16 of this year. Just use git to fetch the latest trunk branch code and you will see it.

This method further calls startDataPlaneProcessorsAndAcceptors and startControlPlaneProcessorAndAcceptor methods to start the threads for the Data plane and Control plane, respectively. Since these two methods have similar logic, let’s focus on the implementation of the startDataPlaneProcessorsAndAcceptors method.

private def startDataPlaneProcessorsAndAcceptors(
  authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
  // Get the listener used for inter-broker communication, which is PLAINTEXT by default
  val interBrokerListener = dataPlaneAcceptors.asScala.keySet
    .find(_.listenerName == config.interBrokerListenerName)
    .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
  val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
    dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
  orderedAcceptors.foreach { acceptor =>
    val endpoint = acceptor.endPoint
    // Start the Processor and Acceptor threads
    startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
  }
}

The main logic of this method is to start the Acceptor and Processor threads by calling the startAcceptorAndProcessors method. Before that, the code needs to get the listener used for inter-broker communication and find the Acceptor thread and its associated Processor thread pool for that listener.

Now, let’s talk about where the listener for the Control plane is configured. The control.plane.listener.name Broker-side parameter is used to set the listener for the Control plane.

By default, this parameter is empty (null). Null means that Kafka does not enable the request priority differentiation mechanism. But if you set this parameter, Kafka will use it to find the corresponding listener in the listeners property.

Let me give you an example to explain. Suppose the Broker-side configuration is as follows:

listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL

listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093

control.plane.listener.name=CONTROLLER

In this case, the listener with the name CONTROLLER will be used for the Control plane. In other words, the listeners with the names INTERNAL and EXTERNAL will be used for the Data plane. In the code, how does Kafka know that the CONTROLLER listener is for the Control plane? Simply put, this is done through three methods in the KafkaConfig class. The KafkaConfig class encapsulates all the Broker-side parameter information and defines many useful utility methods.

Now, let me explain the calling relationship and the main logic of the three methods in the code using a diagram.

The diagram involves three methods, and the calling relationship is from bottom to top, meaning that the method at the bottom calls the one in the middle, and the one in the middle calls the one at the top. Now, I will explain how the code finds the listener for the Control plane.

First, the code needs to get the value of the control.plane.listener.name Broker-side parameter. In this example, the value is the string “CONTROLLER”.

After that, the code reads the value of the listener.security.protocol.map Broker-side parameter and finds the security protocol corresponding to CONTROLLER; in this example, it is PLAINTEXT. The controlPlaneListenerName method obtains this information by calling getControlPlaneListenerNameAndSecurityProtocol, which returns the <listener name, security protocol> tuple.

Finally, the controlPlaneListener method takes this tuple and uses the listener name CONTROLLER to find the corresponding listener in the listeners Broker-side parameter. In this case, the listener is CONTROLLER://192.1.1.8:9091. This is how the complete logic of finding the Control plane listener is implemented.
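
To make the bottom two steps concrete, here is a simplified sketch of what controlPlaneListenerName and controlPlaneListener do, based purely on the description above; the actual code in KafkaConfig.scala differs in its details.

// Simplified sketch, not the verbatim KafkaConfig code
def controlPlaneListenerName: Option[ListenerName] =
  getControlPlaneListenerNameAndSecurityProtocol.map { case (name, _) => name }

def controlPlaneListener: Option[EndPoint] =
  controlPlaneListenerName.map { name =>
    // Find the EndPoint with the matching name among the configured listeners
    listeners.find(_.listenerName.value == name.value).getOrElse(
      throw new ConfigException(
        s"Listener ${name.value} set in control.plane.listener.name not found in listeners"))
  }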

You can open the KafkaConfig.scala file to find the implementation code for these three methods. Here, we will focus on the implementation of the getControlPlaneListenerNameAndSecurityProtocol method:

private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
  // Get the value of the control.plane.listener.name Broker-side parameter
  // to determine if the control plane listener is enabled
  Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
    // If it's enabled
    case Some(name) => 
      val listenerName = ListenerName.normalised(name)
      // You must also set the listener.security.protocol.map Broker-side parameter
      // and extract the security protocol corresponding to this listener from that parameter value
      val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
        throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
          s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
      // Return the <listener name, security protocol> tuple
      Some(listenerName, securityProtocol)
    // If this parameter is not set, directly return None, indicating that the control plane listener is not enabled
    case None => None  
 }
}

The core of this code is the getString call: Kafka extracts the value of the ControlPlaneListenerNameProp parameter, which corresponds to the control.plane.listener.name parameter.

After getting this value, the controlPlaneListener method uses it to locate the corresponding listener, which is then passed to SocketServer’s createControlPlaneAcceptorAndProcessor method. This is how SocketServer knows whether you have configured a dedicated listener for the Control plane.

At this point, I have covered the Data plane, the Control plane, and request priority. Strictly speaking, Kafka does not assign numeric priorities to requests, so requests cannot simply be sorted by priority. So far, Kafka has only implemented coarse-grained prioritization by splitting requests into data requests and control requests, without defining mutually comparable priorities for the two categories. So how does everything we just discussed relate to request priority?

From what we have just learned, the community has defined multiple sets of listeners and underlying processing thread mechanisms to differentiate these two major categories of requests. Although it is difficult to directly compare the priorities of these two categories of requests, in practice, since the number of data requests is much greater than the number of control requests, separating the processing resources for control requests effectively prioritizes their processing. From this perspective, this approach indirectly implements differentiated treatment based on priority.

Summary #

Alright, let’s summarize what we have learned. Today, we focused on how the community implements priority handling for different request types. Building on the concept of listeners, we delved into the SocketServer source code and analyzed the implementation principles of the Data plane and Control plane. Let’s review the key points of this lesson.

  • Data plane: Responsible for handling data requests, which usually do not require high-priority processing.
  • Control plane: Responsible for handling control requests, which require high-priority processing.
  • Listeners: Kafka allows brokers to define multiple sets of listeners, each of which can be used for the Data plane or Control plane.
  • Priority implementation principle: What you need to know is that the community has designed two sets of resources to handle Data plane and Control plane requests, respectively.

In the next lesson, I will guide you through connecting all the components of the network I/O layer and, with the help of the source code, deepen your understanding of how a request is processed in Kafka. Stay tuned.

Post-discussion #

Finally, let’s think about a question: if we don’t use a multi-resource approach and instead make improvements at the level of the request queue, do you think we can meet the requirements of different priorities for different requests? For example, could we transform the request queue into a preemptive priority queue solution? Can you discuss the advantages and disadvantages of these two approaches?
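
If it helps to anchor the discussion, here is a minimal thought-starter sketch, entirely hypothetical and not anything in Kafka’s code base, of what a single priority-aware request queue could look like, built on a PriorityBlockingQueue:

import java.util.concurrent.PriorityBlockingQueue

// Hypothetical request wrapper; Kafka's RequestChannel.Request has no such priority field
case class PrioritizedRequest(isControlRequest: Boolean, payload: Any)
  extends Comparable[PrioritizedRequest] {
  // Control requests sort ahead of data requests
  override def compareTo(other: PrioritizedRequest): Int =
    java.lang.Boolean.compare(other.isControlRequest, this.isControlRequest)
}

// A single shared queue in which control requests jump ahead of pending data requests
val requestQueue = new PriorityBlockingQueue[PrioritizedRequest]()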

Feel free to leave your comments in the message area and discuss with me. You are also welcome to share today’s content with your friends.