12 Controller Channel Manager How the Controller Manages Request Sending

12 ControllerChannelManager How the Controller Manages Request Sending #

Hello, I’m Hu Xi. In the previous lesson, we delved into the source code file ControllerContext.scala and understood the important metadata defined in the Kafka cluster. Today, we will learn how the Controller sends requests to other Brokers.

By understanding the implementation details of this part, you will have a better understanding of how the Controller interacts with the cluster Brokers to manage cluster metadata. Moreover, reading this part of the source code can also help you locate and solve issues in production. Let me share a real-life case with you first.

Back in the time of Kafka 0.10.0.1, we suddenly found that many metadata changes were not being synchronized across all Brokers in the production environment. Specifically, some Brokers were still unaware of the creation of certain topics.

My initial suspicion was that there was a problem with the Controller, but I was unsure of how to investigate and verify it. Later, I thought, could it be due to an excessive backlog of requests in the request queue of the Controller? This was highly likely because the Broker hosting the Controller itself was under heavy load due to its own business responsibilities.

After examining the relevant code, we added a monitoring metric in the corresponding source code to track the length of the Controller’s request queue in real-time. When this was deployed to the production environment, we easily identified the problem. Indeed, the high load on the Broker hosting the Controller resulted in a backlog of requests, causing a delay in updating the metadata. Once we accurately identified the problem, solving it became straightforward. Later on, the community officially introduced related monitoring metrics in version 0.11.

You see, by reading the source code, besides learning from well-written code by excellent developers, we can also customize solutions based on our own circumstances to achieve some out-of-the-box functionality.

Controller Request Types #

Now, let’s officially enter the study of the Controller request sending management part. You might ask, “Does the Controller also send requests to the Broker?” Of course! The Controller will send network requests to all Brokers in the cluster, including the Broker it resides in. The purpose of sending requests is to instruct the Broker to perform the corresponding actions. I will use a diagram to show which requests the Controller will send:

Currently, the Controller only sends three types of requests to the Broker, which are LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest. Note that I said “currently”! I mean that currently there are only these three types, but it doesn’t mean that there won’t be any changes in the future. In fact, I can almost guarantee that the types of RPC protocols that can be sent in the future will definitely change. Therefore, you need to understand the principles of request sending. After all, all request sending is done through the same mechanism.

Do you remember the control class requests I mentioned in Lesson 8? That’s right, these three types of requests are typical control class requests. Let me explain their functions.

  • LeaderAndIsrRequest: The main function is to inform the Broker which Broker the Leader replica of each topic partition is located on and which replicas are in the ISR. In my opinion, it should be given the highest priority, because it has the ability to make data class requests invalid. Just imagine, if the Leader replica changes in this request, won’t all the PRODUCE requests sent to the old Leader be invalid? Therefore, I consider it a very important control class request.
  • StopReplicaRequest: Informs the specified Broker to stop its replica objects, and this request can even delete the underlying log data of the replica. The main use cases for this request are partition replica migration and topic deletion. In both of these cases, it is necessary to stop the replicas on the Broker.
  • UpdateMetadataRequest: As the name suggests, this request updates the metadata cache on the Broker. All metadata changes in the cluster first happen on the Controller side and are then broadcast to all Brokers on the cluster through this request. In the case I just shared, it is because this request was not processed in a timely manner that the cluster Brokers couldn’t get the latest metadata information.

Now, the community is increasingly inclined to move important data structure source code from the server-side core project to the client-side clients project. The definitions of these three types of request Java classes are encapsulated in the clients project, and their abstract base class is the AbstractControlRequest class, which defines the common fields of these three types of requests.

Let me show you the definitions of these three types of requests and their abstract parent class in code, so that you can have a basic understanding of the types of requests sent by the Controller. These classes are located in the path clients/src/main/java/org/apache/kafka/common/requests.

Let’s start with the main code of the AbstractControlRequest class:

public abstract class AbstractControlRequest extends AbstractRequest {
    public static final long UNKNOWN_BROKER_EPOCH = -1L;
    public static abstract class Builder<T extends AbstractRequest> extends AbstractRequest.Builder<T> {
        protected final int controllerId;
        protected final int controllerEpoch;
        protected final long brokerEpoch;
        ......
}

In contrast to other data class requests, abstract class requests must include three fields.

  • controllerId: The ID of the Broker where the Controller is located.
  • controllerEpoch: The version information of the Controller.
  • brokerEpoch: The Epoch of the target Broker.

The latter two Epoch fields are used to isolate the Zombie Controller and Zombie Broker to ensure the consistency of the cluster.

In the same source code path, you can find the definitions of LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest, as shown below:

public class LeaderAndIsrRequest extends AbstractControlRequest { ...... }
public class StopReplicaRequest extends AbstractControlRequest { ...... }
public class UpdateMetadataRequest extends AbstractControlRequest { ...... }

RequestSendThread #

After discussing what requests the Controller sends, let’s talk about how they are sent.

The Kafka source code is fond of the producer-consumer pattern. This pattern offers the advantage of decoupling the logic of producers and consumers, separating their centralized interaction. Now that you have learned about the “Request Handling” module, you must agree with this statement. Do you remember the SocketServer component on the Broker side? It defines an internally shared request queue: its Processor threads act as producers, while the KafkaRequestHandler threads act as consumers.

Similarly, the Controller uses this pattern as well. It is also a thread-safe blocking queue. The Controller event handling thread (which we will discuss in lesson 13) is responsible for writing requests to this queue, while a thread named RequestSendThread is responsible for actually sending the requests. The diagram below illustrates this:

The Controller creates a corresponding RequestSendThread for each Broker in the cluster. This thread on the Broker continuously retrieves requests waiting to be sent from the blocking queue.

So what data does the Controller put on the blocking queue? It is actually defined by the QueueItem class in the source code. Here is the code:

case class QueueItem(apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest], callback: AbstractResponse => Unit, enqueueTimeMs: Long)

The core field of each QueueItem is the AbstractControlRequest.Builder object. You can basically consider it as an AbstractControlRequest type on the blocking queue.

Note the “<:” symbol, which represents an upper bound in Scala. It means that the request field must be a subclass of AbstractControlRequest, referring to the three types of requests mentioned earlier.

In other words, each QueueItem actually stores one of the three types of requests. If we use a BlockingQueue object to store these QueueItems, the code implements a request blocking queue. This is what the RequestSendThread class does.

Now let’s learn about the definition of the RequestSendThread class. I will add comments to some of the main fields.

class RequestSendThread(val controllerId: Int, // Controller's Broker Id
    val controllerContext: ControllerContext, // Controller metadata information
    val queue: BlockingQueue[QueueItem], // Request blocking queue
    val networkClient: NetworkClient, // Network I/O class used for sending
    val brokerNode: Node, // Target Broker node
    val config: KafkaConfig, // Kafka configuration information
    val time: Time, 
    val requestRateAndQueueTimeMetrics: Timer,
    val stateChangeLogger: StateChangeLogger,
    name: String) extends ShutdownableThread(name = name) {
    ......
}

In fact, the most important part of RequestSendThread is its doWork method, which is the method that executes the thread logic:

override def doWork(): Unit = {
    def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
    val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take() // Remove a request from the blocking queue in a blocking way
    requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS) // Update statistics
    var clientResponse: ClientResponse = null
    try {
      var isSendSuccessful = false
      while (isRunning && !isSendSuccessful) {
        try {
          // If there is no TCP connection with the target Broker, or the connection is temporarily unavailable
          if (!brokerReady()) {
            isSendSuccessful = false
            backoff() // Wait for retry
          }
          else {
            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
              time.milliseconds(), true)
            // Send the request and wait for the response
            clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
            isSendSuccessful = true
          }
        } catch {
          case e: Throwable =>
            warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " +
              s"to broker $brokerNode. Reconnecting to broker.", e)
            // If an exception occurs, close the connection with the corresponding Broker
            networkClient.close(brokerNode.idString)
            isSendSuccessful = false
            backoff()
        }
      }
      // If a response is received
      if (clientResponse != null) {
        val requestHeader = clientResponse.requestHeader
        val api = requestHeader.apiKey
        // The request type of this response must be one of LeaderAndIsrRequest, StopReplicaRequest or UpdateMetadataRequest
        if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA)
          throw new KafkaException(s"Unexpected apiKey received: $apiKey")
        val response = clientResponse.responseBody
        stateChangeLogger.withControllerEpoch(controllerContext.epoch)
          .trace(s"Received response " +
          s"${response.toString(requestHeader.apiVersion)} for request $api with correlation id " +
          s"${requestHeader.correlationId} sent to broker $brokerNode")

        if (callback != null) {
          callback(response) // Handle the callback
        }
      }
    } catch {
      case e: Throwable =>
        error(s"Controller $controllerId fails to send a request to broker $brokerNode", e)
        networkClient.close(brokerNode.idString)
    }
  }

I will use a diagram to explain the execution logic of doWork:

On the whole, the logic of doWork is straightforward. Its main purpose is to retrieve pending requests from the blocking queue, send them, and then wait for the response to return. The thread will be in a blocking state while waiting for the response. After receiving the response and executing the callback logic, the thread can retrieve the next pending request from the blocking queue.

It is worth noting that the RequestSendThread thread handles the request sending in a different way from the Broker. The sendAndReceive method it calls enters a blocking state after sending the request, waiting for the response to return. Only after receiving the response and executing the callback logic, the thread can retrieve the next pending request from the blocking queue for processing.

ControllerChannelManager #

After understanding the source code of the RequestSendThread thread, let’s study the ControllerChannelManager class.

This class has a cooperative relationship with RequestSendThread. In my opinion, it has two main tasks.

  • Manage the connection between the Controller and the cluster Brokers, and create a RequestSendThread instance for each Broker.
  • Put the requests to be sent into the blocking queue of the specified Broker, and wait for the dedicated RequestSendThread of that Broker to process them.

It can be seen that they are closely connected.

The most important data structure in the ControllerChannelManager class is brokerStateInfo, which is defined in the following line of code:

protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]

This is a HashMap with Integer types as keys, representing the IDs of brokers in the cluster, and ControllerBrokerStateInfo as values.

You may not be familiar with the ControllerBrokerStateInfo class, so let me explain it first. It is essentially a POJO class, serving as a container for several data structures, as shown below:

case class ControllerBrokerStateInfo(networkClient: NetworkClient,
    brokerNode: Node,
    messageQueue: BlockingQueue[QueueItem],
    requestSendThread: RequestSendThread,
    queueSizeGauge: Gauge[Int],
    requestRateAndTimeMetrics: Timer,
    reconfigurableChannelBuilder: Option[Reconfigurable])

It has three key fields.

  • brokerNode: The target Broker node object, which encapsulates the connection information of the target Broker, such as the hostname and port number.
  • messageQueue: The blocking queue for request messages. You can see that the Controller creates a message queue for each target Broker.
  • requestSendThread: The Controller uses this thread to send requests to the target Broker.

Remember these three fields, as they are key factors in implementing the Controller’s request sending.

Why is that so? Let’s think about it. If the Controller wants to send a request to a Broker, it needs to address three questions: Who to send it to? What to send? How to send? The “Who to send it to” is determined by brokerNode; the messageQueue contains the request to be sent, thus resolving the “What to send” question; finally, the “How to send” relies on the requestSendThread variable.

Now, let’s go back to ControllerChannelManager. It defines five public methods, which I will introduce one by one.

  • startup: When the Controller component starts up, it calls the startup method of ControllerChannelManager. This method finds the list of brokers in the metadata information and, one by one, calls addBroker for them to add them to the brokerStateInfo variable. Finally, it starts the RequestSendThread threads in brokerStateInfo.
  • shutdown: Closes all RequestSendThread threads and clears necessary resources.
  • sendRequest: As the name suggests, it sends a request by submitting the request object to the request queue.
  • addBroker: Adds the target broker to the brokerStateInfo data structure and creates necessary resources such as request queues and RequestSendThread objects accordingly. Finally, the RequestSendThread starts the thread.
  • removeBroker: Removes the data related to the target broker from brokerStateInfo.

Most of the methods here have simple logic, evident from their names. I will focus on addBroker and the underlying private methods addNewBroker and startRequestSendThread.

After all, addBroker is the most important logic. Whenever a new broker is added to the cluster, the Controller calls this method to add a new RequestSendThread for the new broker.

Let’s first take a look at addBroker:

def addBroker(broker: Broker): Unit = {
    brokerLock synchronized {
        // If the broker is a new broker
        if (!brokerStateInfo.contains(broker.id)) {
            // Add the new broker to Controller management and create the corresponding RequestSendThread
            addNewBroker(broker) 
            // Start the RequestSendThread
            startRequestSendThread(broker.id)
        }
    }
}

The entire code block is protected by brokerLock. Do you remember the definition of brokerStateInfo? It is simply a HashMap object, and because it is not thread-safe, any access to this variable needs to be protected by a lock.

The logic of this code block is as follows: it checks whether the ID of the target broker is already saved in brokerStateInfo. If it is, it means that this broker has already been added, so there is no need to add it again. Otherwise, addBroker performs two operations for the current broker:

  1. Adds the broker node to brokerStateInfo.
  2. Starts the RequestSendThread corresponding to the broker.

These two steps are implemented by the addNewBroker and startRequestSendThread methods, respectively.

The logic of the addNewBroker method is quite complex. I will indicate the main steps with comments:

private def addNewBroker(broker: Broker): Unit = {
    // Construct a request blocking queue for the broker
}

Here the key logic is to construct a request blocking queue for the broker. The remaining implementation details are not directly relevant to the translation, so I will omit them. // Get information about the target broker node to be connected val brokerNode = broker.node(controllerToBrokerListenerName) val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] “) val (networkClient, reconfigurableChannelBuilder) = { val channelBuilder = ChannelBuilders.clientChannelBuilder( controllerToBrokerSecurityProtocol, JaasContext.Type.SERVER, config, controllerToBrokerListenerName, config.saslMechanismInterBrokerProtocol, time, config.saslInterBrokerHandshakeRequestEnable, logContext ) val reconfigurableChannelBuilder = channelBuilder match { case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable) Some(reconfigurable) case _ => None } // Create NIO Selector instance for network data transmission val selector = new Selector( NetworkReceive.UNLIMITED, Selector.NO_IDLE_TIMEOUT_MS, metrics, time, “controller-channel”, Map(“broker-id” -> brokerNode.idString).asJava, false, channelBuilder, logContext ) // Create NetworkClient instance // The NetworkClient class is the top-level network client API encapsulated in the Kafka clients project // It provides rich methods for network-layer IO data transmission val networkClient = new NetworkClient( selector, new ManualMetadataUpdater(Seq(brokerNode).asJava), config.brokerId.toString, 1, 0, 0, Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE, config.requestTimeoutMs, ClientDnsLookup.DEFAULT, time, false, new ApiVersions, logContext ) (networkClient, reconfigurableChannelBuilder) } // Set the thread name for this RequestSendThread val threadName = threadNamePrefix match { case None => s"Controller-${config.brokerId}-to-broker-${broker.id}-send-thread” case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread" } // Create request processing rate monitoring metrics val requestRateAndQueueTimeMetrics = newTimer( RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id) ) // Create RequestSendThread instance val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient, brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName) requestThread.setDaemon(false)

  val queueSizeGauge = newGauge(QueueSizeMetricName, () => messageQueue.size, brokerMetricTags(broker.id))
  // Create a ControllerBrokerStateInfo instance exclusively for this broker
  // and add it to the brokerStateInfo for unified management
  brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
    requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))
}

To help you understand, I also created a flowchart to illustrate its execution process:

The key point of addNewBroker is to create a series of matching resources for the target broker, such as NetworkClient for network I/O operations, messageQueue for the blocking queue, requestThread for sending requests, etc.

As for the startRequestSendThread method, it is much simpler, with just a few lines of code.

protected def startRequestSendThread(brokerId: Int): Unit = {
  // Get the dedicated RequestSendThread instance for the specified broker
  val requestThread = brokerStateInfo(brokerId).requestSendThread
  if (requestThread.getState == Thread.State.NEW)
    // Start the thread
    requestThread.start()
}

First, it finds the corresponding ControllerBrokerStateInfo object from brokerStateInfo based on the given broker ID. Having this object means having all the matching resources to serve that target broker. The next step is to retrieve the RequestSendThread object from ControllerBrokerStateInfo and start it.

Summary #

Today, I focused on analyzing the implementation principles of the request sending mechanism from the Controller to the Broker based on the ControllerChannelManager.scala file.

The Controller mainly sends requests to the Broker through the ControllerChannelManager class. The RequestSendThread defined in the ControllerChannelManager class is the main thread implementation class responsible for actually sending requests to the cluster’s Brokers. In addition to the RequestSendThread, the ControllerChannelManager also defines corresponding management methods, such as adding Brokers and removing Brokers. Through these management methods, the Controller can quickly respond to cluster scaling changes and complete the creation and destruction of corresponding Broker connections.

Let’s review the key points of this lesson.

  • Controller-side requests: The Controller sends three types of requests to the Broker, namely LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest.
  • RequestSendThread: This thread is responsible for sending requests to the relevant or all Brokers in the cluster.
  • Request blocking queue + RequestSendThread: The Controller will create corresponding request blocking queues and RequestSendThread threads for all Brokers in the cluster.

In fact, everything we talked about today is only the “consumer” part in the second diagram of this lesson. We haven’t delved into how requests are placed in the request queue. Next, we will explore how the Controller’s single-threaded event processor is implemented in depth for this issue.

Post-class discussion #

What are the advantages and disadvantages of creating a RequestSendThread for each Broker?

Please feel free to write down your thoughts and answers in the comments section, and let’s discuss and exchange ideas. You are also welcome to share today’s content with your friends.