13 | ControllerEventManager: How the Controller Handles Events After Switching to a Single-Threaded Model

Hello, I’m Hu Xi. Today, let’s learn about the source code of the single-threaded event handler for the controller.

The so-called single-threaded event handler is a component defined on the controller side. This component contains a dedicated thread that is responsible for handling controller events sent by other threads. In addition, it defines some management methods to deliver events to the dedicated thread for processing.

Before version 0.11.0.0, the controller source code was very complex. Cluster metadata was accessed by multiple threads concurrently, so the code was full of monitor locks, Lock objects, and other thread-safety mechanisms. This made the code hard to understand and hard to change, because you could never be sure whether modifying data accessed by one thread would affect other threads. Developers also struggled to fix controller bugs.

For this reason, starting with version 0.11.0.0, the community gradually redesigned the structure of the controller code. One of the most important changes was replacing multi-threaded concurrent access with a single-threaded event queue.

Here, “single-threaded” does not mean that the controller has only one thread. It means that access to the controller’s local state is confined to one dedicated thread, which is therefore the only thread that operates on the controller’s metadata.

As a result, the entire component code no longer needs to worry about various thread safety issues caused by multi-threaded access, and the source code can abandon unnecessary lock mechanisms, ultimately greatly simplifying the code structure on the controller side.

This part of the source code is very important. It explains the principles behind how the controller handles events, and understanding it will greatly improve your ability to troubleshoot controller problems in practice. I therefore recommend reading it several times until you thoroughly understand how the controller handles the various events.

Basic Terminology and Concepts #

Next, let’s take a macroscopic overview of the Controller’s single-threaded event queue processing model and its basic components.

From the diagram, we can see that there are multiple threads on the Controller side writing events of different types to the event queue, such as the Watcher thread registered on the ZooKeeper side, the KafkaRequestHandler thread, the Kafka scheduled task thread, and so on. On the other end of the event queue, there is only one thread named ControllerEventThread that is responsible for “consuming” or processing the events in the queue. This is called the single-threaded event queue model.
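To make the model concrete, here is a minimal, self-contained Scala sketch of the same idea. It is not Kafka code: SingleThreadedModelDemo, Increment, Stop and newThread are hypothetical names. Several producer threads put events into a LinkedBlockingQueue, and a single consumer thread takes them off and is the only thread that mutates counter, so that variable needs no lock. Stop plays a role similar to the ShutdownEventThread event discussed later.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical toy model, not Kafka's API: many producers, one consumer.
object SingleThreadedModelDemo {
  sealed trait Event
  final case class Increment(by: Int) extends Event
  case object Stop extends Event

  private def newThread(name: String)(body: => Unit): Thread =
    new Thread(new Runnable { def run(): Unit = body }, name)

  def run(producers: Int, eventsPerProducer: Int): Long = {
    val queue = new LinkedBlockingQueue[Event]()
    // State owned by the consumer thread only, so no lock is needed.
    var counter = 0L

    val consumer = newThread("demo-event-thread") {
      var running = true
      while (running) {
        queue.take() match {            // blocks until an event is available
          case Increment(by) => counter += by
          case Stop          => running = false
        }
      }
    }
    consumer.start()

    val producerThreads = (1 to producers).map { i =>
      newThread(s"producer-$i") {
        (1 to eventsPerProducer).foreach(_ => queue.put(Increment(1)))
      }
    }
    producerThreads.foreach(_.start())
    producerThreads.foreach(_.join())
    queue.put(Stop)     // every real event is already ahead of Stop in the queue
    consumer.join()     // join() also makes the consumer's writes visible here
    counter
  }
}
```

Calling SingleThreadedModelDemo.run(4, 1000) returns 4000: no increment is lost even though only the queue, not the counter, is thread-safe.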

There are four source code classes involved in implementing this model.

  • ControllerEventProcessor: The event processor interface on the Controller side.
  • ControllerEvent: The Controller event, i.e., the object being processed in the event queue.
  • ControllerEventManager: The event manager, used to create and manage the ControllerEventThread.
  • ControllerEventThread: The dedicated event processing thread whose sole purpose is to handle the different types of ControllerEvent. It is defined as an inner class of ControllerEventManager.

Today, our important goal is to understand these four classes. As I mentioned earlier, they together fully construct the single-threaded event queue model. Below, we will study their source code one by one, and you should focus on understanding the implementation of the event queue and how the dedicated thread accesses the event queue.

ControllerEventProcessor #

This interface is located in the ControllerEventManager.scala file in the controller package. It defines an interface for supporting normal and preemptive processing of Controller events, as shown in the code below:

trait ControllerEventProcessor {
  def process(event: ControllerEvent): Unit
  def preempt(event: ControllerEvent): Unit
}

This interface defines two methods:

  • process: Accepts a Controller event and processes it.
  • preempt: Accepts a Controller event and preemptively processes it before other events in the queue.

Currently, in the Kafka source code, the KafkaController class is the implementation class for the Controller component and it is also the only implementation class for the ControllerEventProcessor interface.

For this interface, focus on the process method, because it is the main entry point for processing Controller events. You should understand how process is structured to handle the different types of Controller events, and be able to quickly locate the method that handles each type of event.

As for the preempt method, you only need to understand that Kafka uses it to implement preemptive processing for certain high-priority events. Currently, only two types of events (ShutdownEventThread and Expire) require preemptive processing, and their occurrence rate is not very high.

ControllerEvent #

This is the Controller event mentioned earlier, which corresponds to the ControllerEvent interface in the source code. This interface is defined in the KafkaController.scala file and is essentially a trait type, as shown below:

sealed trait ControllerEvent {
  def state: ControllerState
}

Each ControllerEvent defines a state. When handling specific events, the Controller will make corresponding state changes. This state is defined by the abstract class ControllerState in the source code file ControllerState.scala, as shown below:

sealed abstract class ControllerState {
  def value: Byte
  def rateAndTimeMetricName: Option[String] =
    if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None
  protected def hasRateAndTimeMetric: Boolean = true
}

Each ControllerState defines a value that represents the ordinal of the state, starting from 0. In addition, the rateAndTimeMetricName method builds the name of the rate-and-time metric used to monitor that state.

For example, TopicChange is a type of ControllerState that represents a change in the total number of topics. In order to monitor the rate of this state change, the rateAndTimeMetricName method in the code will define a metric named TopicChangeRateAndTimeMs. However, not all ControllerStates have corresponding rate monitoring metrics. For example, the Idle state, which represents an idle state, does not have a corresponding metric.
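The naming rule can be checked with a small sketch. The ControllerState class below copies the definition shown above; the two case objects follow the text (TopicChange has a metric, Idle does not), but their byte values are illustrative placeholders rather than Kafka’s real ones. The metric name relies on the fact that a Scala case object’s toString is its own name.

```scala
// Copied from the definition above.
sealed abstract class ControllerState {
  def value: Byte
  def rateAndTimeMetricName: Option[String] =
    if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None
  protected def hasRateAndTimeMetric: Boolean = true
}

// Two example states; the byte values are illustrative placeholders.
object ControllerState {
  case object Idle extends ControllerState {
    def value: Byte = 0
    // Idle opts out of the rate-and-time metric.
    override protected def hasRateAndTimeMetric: Boolean = false
  }
  case object TopicChange extends ControllerState {
    def value: Byte = 1
  }
}
```

With these definitions, ControllerState.TopicChange.rateAndTimeMetricName is Some("TopicChangeRateAndTimeMs"), while ControllerState.Idle.rateAndTimeMetricName is None.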

Currently, the Controller defines a total of 25 types of events and 17 states. Their corresponding relationships are shown in the following table:

The content may seem overwhelming, so how should we use this table?

In practice, you don’t need to memorize every row. The table is better used as a tool: when you notice an abnormal rate for some Controller state, you can look up which Controller events map to that state, identify the event that may be causing the bottleneck, and locate the method that handles it. This helps you debug the issue further.

Also, it’s important to note that multiple ControllerEvents may belong to the same ControllerState.

For example, both the TopicChange and PartitionModifications events belong to the TopicChange state, because both relate to changes in topics. The former is triggered by changes to the set of topics, such as topic creation, while the latter is triggered by changes to an existing topic’s partitions, such as adding partitions.

Similarly, both the BrokerChange and BrokerModifications events belong to the BrokerChange state, because both concern brokers: the former is triggered when brokers join or leave the cluster, while the latter is triggered when the registration information of an existing broker changes.
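The many-to-one relationship can be sketched with simplified types. The event and state names mirror those in the text, but the constructor parameters (topic, brokerId) and the EventStateDemo wrapper are assumptions for illustration; the real definitions live in KafkaController.scala and ControllerState.scala. Grouping events by their state shows several events collapsing onto one state.

```scala
object EventStateDemo {
  // Simplified states (a subset).
  sealed trait State
  object State {
    case object TopicChange extends State
    case object BrokerChange extends State
  }

  // Simplified events; each reports the state it belongs to.
  sealed trait Event { def state: State }
  case object TopicChange extends Event { def state: State = State.TopicChange }
  final case class PartitionModifications(topic: String) extends Event { def state: State = State.TopicChange }
  case object BrokerChange extends Event { def state: State = State.BrokerChange }
  final case class BrokerModifications(brokerId: Int) extends Event { def state: State = State.BrokerChange }

  // Group events by state: several events can share one state.
  def eventsByState(events: Seq[Event]): Map[State, Seq[Event]] = events.groupBy(_.state)
}
```

Grouping the four events above yields two buckets, TopicChange and BrokerChange, each holding two events.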

ControllerEventManager #

With these preparations, we can now start learning the implementation code for the event handler.

In Kafka, the Controller event handler code is located in the ControllerEventManager.scala file under the controller package. I will use a diagram to show the structure of this file:

ControllerEventManager Structure

As shown in the diagram, this file mainly consists of 4 parts.

  • ControllerEventManager Object: It stores some string constants, such as thread names.
  • ControllerEventProcessor: As mentioned before, it is the event handler interface, and currently only KafkaController implements this interface.
  • QueuedEvent: Represents an event object on the event queue.
  • ControllerEventManager Class: The companion class of ControllerEventManager, mainly used for creating and managing event processing threads and the event queue. As I mentioned before, this class defines the important ControllerEventThread thread class and some other important methods worth learning. We will discuss them in detail in a moment.

The ControllerEventManager object only defines 3 public variables and does not have any logic. You can simply take a look at it. As for the ControllerEventProcessor interface, we have just learned about it. Next, we will focus on the last two classes.

QueuedEvent #

Let’s start with the definition of QueuedEvent. The complete code is as follows:

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Each QueuedEvent has two fields.
// event: the ControllerEvent being queued.
// enqueueTimeMs: the timestamp at which the event was put into the event queue.
class QueuedEvent(val event: ControllerEvent,
                  val enqueueTimeMs: Long) {
  // Counted down when processing starts; awaitProcessing() waits on it.
  val processingStarted = new CountDownLatch(1)
  // Set once the event has been handled, either processed or preempted.
  val spent = new AtomicBoolean(false)
  // Process the event (at most once).
  def process(processor: ControllerEventProcessor): Unit = {
    if (spent.getAndSet(true))
      return
    processingStarted.countDown()
    processor.process(event)
  }
  // Preemptively process the event (at most once).
  def preempt(processor: ControllerEventProcessor): Unit = {
    if (spent.getAndSet(true))
      return
    processor.preempt(event)
  }
  // Block until processing of the event has started.
  def awaitProcessing(): Unit = {
    processingStarted.await()
  }
  override def toString: String = {
    s"QueuedEvent(event=$event, enqueueTimeMs=$enqueueTimeMs)"
  }
}

As you can see, each QueuedEvent instance encapsulates a ControllerEvent. In addition, each QueuedEvent defines the process, preempt, and awaitProcessing methods, which represent processing the event, preemptively processing the event, and waiting for the event to be processed, respectively.

The process and preempt methods simply delegate to the process and preempt methods of the given ControllerEventProcessor, which is very straightforward.

In QueuedEvent, we once again see the CountDownLatch that I mentioned in [Lesson 7]. Kafka’s source code often uses CountDownLatch to coordinate conditions between threads, such as detecting whether a thread has started or shut down successfully.

Here, QueuedEvent uses it for one purpose only: to ensure that the Expire event has started processing before a new ZooKeeper session is established.

Apart from this scenario, the code relies on spent to record whether the event has already been handled, either processed or preempted. If it has, calling process or preempt again returns immediately without doing anything.
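The interplay of spent and processingStarted can be seen in a runnable sketch. SimpleQueuedEvent repeats the logic of QueuedEvent above, but uses String events and a hypothetical Processor trait instead of ControllerEvent and ControllerEventProcessor, so it compiles on its own. It shows that an event is handled at most once whichever method runs first, that in this code a preempted event never counts down processingStarted, and that a thread blocked in awaitProcessing() is released as soon as another thread starts processing.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

object QueuedEventDemo {
  // Hypothetical stand-in for ControllerEventProcessor, with String events.
  trait Processor {
    def process(event: String): Unit
    def preempt(event: String): Unit
  }

  // Same logic as QueuedEvent above, specialized to String events.
  final class SimpleQueuedEvent(val event: String, val enqueueTimeMs: Long) {
    val processingStarted = new CountDownLatch(1)
    val spent = new AtomicBoolean(false)

    def process(p: Processor): Unit = {
      if (spent.getAndSet(true)) return    // already processed or preempted
      processingStarted.countDown()        // releases awaitProcessing() callers
      p.process(event)
    }

    def preempt(p: Processor): Unit = {
      if (spent.getAndSet(true)) return    // note: does not count down the latch
      p.preempt(event)
    }

    def awaitProcessing(): Unit = processingStarted.await()
  }

  // Records which path handled each event (used from one thread at a time here).
  final class RecordingProcessor extends Processor {
    val log = ArrayBuffer.empty[String]
    def process(event: String): Unit = log += s"process:$event"
    def preempt(event: String): Unit = log += s"preempt:$event"
  }

  // A waiter blocked in awaitProcessing() is released once process() starts.
  def waiterUnblocksAfterProcess(): Boolean = {
    val e = new SimpleQueuedEvent("expire", System.currentTimeMillis())
    val waiter = new Thread(new Runnable { def run(): Unit = e.awaitProcessing() })
    waiter.start()
    e.process(new RecordingProcessor)
    waiter.join(5000)
    !waiter.isAlive
  }
}
```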

ControllerEventThread #

After understanding QueuedEvent, let’s take a look at the ControllerEventThread class that consumes them.

First, here is the definition of this class:

class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
  logIdent = s"[ControllerEventThread controllerId=$controllerId] "
  ...
}

This class is an ordinary thread class that extends ShutdownableThread, the common base class Kafka defines for many of its thread classes. ShutdownableThread in turn extends Java’s Thread class, and the core of its run method, which holds the thread’s main logic, looks like this:

def doWork(): Unit
override def run(): Unit = {
  ...
  try {
    while (isRunning)
      doWork()
  } catch {
    ...
  }
  ...
}

As you can see, this base class repeatedly executes the logic of the doWork method in a loop, and the implementation of this method is left to the subclasses.
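For readers who want to experiment, the following is a heavily simplified, hypothetical analogue of ShutdownableThread. LoopingThread and CountingThread are not Kafka classes, and the real base class handles interruption, error logging and shutdown in its own way. The sketch keeps only the pattern described above: run() calls doWork() repeatedly while isRunning is true, and subclasses supply doWork().

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified analogue of ShutdownableThread (not Kafka code).
abstract class LoopingThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  private val shutdownComplete = new CountDownLatch(1)

  // Subclasses put one unit of work here; run() calls it in a loop.
  def doWork(): Unit

  def isRunning: Boolean = running.get()

  // Ask the loop to stop once the current doWork() call returns.
  def initiateShutdown(): Unit = running.set(false)

  // Block until run() has exited.
  def awaitShutdown(): Unit = shutdownComplete.await()

  override def run(): Unit =
    try {
      while (isRunning) doWork()
    } finally {
      shutdownComplete.countDown()
    }
}

// Example subclass: does `limit` units of work, then stops itself.
final class CountingThread(limit: Int) extends LoopingThread("counting-thread") {
  @volatile var iterations = 0
  def doWork(): Unit = {
    iterations += 1
    if (iterations >= limit) initiateShutdown()
  }
}
```

Starting a CountingThread(3) and calling awaitShutdown() returns after exactly three doWork() calls.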

Because this is the Controller’s only event-processing thread, we need to keep a close eye on its status, which means knowing the thread’s name in the JVM so that we can monitor it specifically. The name is defined by the ControllerEventThreadName variable in the ControllerEventManager object, as shown below:

object ControllerEventManager {
  val ControllerEventThreadName = "controller-event-thread"
  ...
}
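Knowing the name lets you pick the thread out of a thread dump, for example in jstack output. Inside a JVM, the same lookup can be done with the standard Thread.getAllStackTraces method; the small helper below (ThreadLookup is a hypothetical name, not part of Kafka) returns the live threads whose name matches exactly. It only sees threads of the JVM it runs in.

```scala
import scala.jdk.CollectionConverters._

object ThreadLookup {
  // Live threads in the current JVM whose name equals `name`.
  def findThreads(name: String): Seq[Thread] =
    Thread.getAllStackTraces.keySet.asScala.toSeq.filter(_.getName == name)

  // Convenience check for the controller's event thread name.
  def controllerEventThreadAlive(): Boolean =
    findThreads("controller-event-thread").exists(_.isAlive)
}
```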

Now let’s take a look at how the doWork method of the ControllerEventThread class is implemented. The code is as follows:

override def doWork(): Unit = {
  // Take the next pending Controller event from the event queue, blocking if it is empty.
  val dequeued = queue.take()
  dequeued.event match {
    // Shutdown event: do nothing. Shutting down the thread is initiated externally.
    case ShutdownEventThread =>
    case controllerEvent =>
      _state = controllerEvent.state
      // Record how long the event waited in the queue
      eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)
      try {
        def process(): Unit = dequeued.process(processor)
        // Process the event, recording rate and time if this state has a metric
        rateAndTimeMetrics.get(state) match {
          case Some(timer) => timer.time { process() }
          case None => process()
        }
      } catch {
        case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
      }
      _state = ControllerState.Idle
  }
}

The execution flow is summarized in the diagram below:

In general, the logic is simple.

First, the thread uses the take method of LinkedBlockingQueue to obtain a QueuedEvent object to be processed. Note that the use of the take method indicates that if there is no QueuedEvent in the event queue, the ControllerEventThread thread will be blocked until a new event is inserted into the event queue.

Once a QueuedEvent is obtained, the thread checks whether it is a ShutdownEventThread event. When the ControllerEventManager is shut down, a ShutdownEventThread event is explicitly added to the event queue to signal that the ControllerEventThread should stop. For this event, the ControllerEventThread does nothing, since it is about to be closed. For any other event, it calls the process method of QueuedEvent to run the corresponding processing logic and records the event’s processing rate and time.
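The flow just described can be sketched in isolation. EventLoop, DemoEvent, Work and Queued are hypothetical stand-ins for ControllerEventThread, ControllerEvent and QueuedEvent, and the queueTimesMs buffer replaces the eventQueueTimeHist histogram. As in doWork above, take() blocks on an empty queue and the ShutdownEventThread sentinel is ignored. In this sketch, shutdown is driven from outside: shutdown() first clears the running flag and then enqueues the sentinel, so that a blocked take() returns and the loop can exit.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for ControllerEvent, ShutdownEventThread and QueuedEvent.
sealed trait DemoEvent
case object ShutdownEventThread extends DemoEvent
final case class Work(name: String) extends DemoEvent
final case class Queued(event: DemoEvent, enqueueTimeMs: Long)

final class EventLoop {
  private val queue = new LinkedBlockingQueue[Queued]()
  @volatile private var running = true
  // Mutated only by the thread that calls doWork(), so no lock is used.
  val processed = ArrayBuffer.empty[String]
  // Stand-in for eventQueueTimeHist: how long each event waited in the queue.
  val queueTimesMs = ArrayBuffer.empty[Long]

  def put(event: DemoEvent): Unit = queue.put(Queued(event, System.currentTimeMillis()))

  // One iteration, modeled on ControllerEventThread.doWork().
  def doWork(): Unit = {
    val dequeued = queue.take()                 // blocks while the queue is empty
    dequeued.event match {
      case ShutdownEventThread =>               // nothing to do here
      case Work(name) =>
        queueTimesMs += System.currentTimeMillis() - dequeued.enqueueTimeMs
        try {
          processed += name
        } catch {
          case e: Throwable => System.err.println(s"Uncaught error processing $name: $e")
        }
    }
  }

  def runLoop(): Unit = while (running) doWork()

  // Called from another thread: clear the flag, then enqueue the sentinel so a
  // take() that is currently blocked wakes up and the loop sees the flag.
  def shutdown(): Unit = {
    running = false
    put(ShutdownEventThread)
  }
}
```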

QueuedEvent’s process method in turn calls the process method of ControllerEventProcessor. Here is the relevant code:

def process(processor: ControllerEventProcessor): Unit = {
  // If already processed, return directly
  if (spent.getAndSet(true))
    return
  processingStarted.countDown()
  // Call the process method of ControllerEventProcessor to process the event
  processor.process(event)
}

The method first checks if the event has already been processed. If it has, it returns directly. Otherwise, it calls the process method of ControllerEventProcessor to process the event.

You may be wondering where the process method of ControllerEventProcessor is actually implemented. It lives in the KafkaController.scala file: remember that, as I mentioned earlier, the KafkaController class is the only implementation of the ControllerEventProcessor interface in the current source code.

So the process method is implemented by the KafkaController class. Because the code is long and repetitive, I will only show part of it:

override def process(event: ControllerEvent): Unit = {
  try {
    // Match ControllerEvent events one by one
    event match {
      case event: MockEvent =>
        event.process()
      case ShutdownEventThread =>
        error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread")
      case AutoPreferredReplicaLeaderElection =>
        processAutoPreferredReplicaLeaderElection()
      ......
    }
  } catch {
    // If the Controller has moved to another Broker
    case e: ControllerMovedException =>
      info(s"Controller moved to another broker when processing $event.", e)
      // Execute the Controller resignation logic
      maybeResign()
    case e: Throwable =>
      error(s"Error processing event $event", e)
  } finally {
    updateMetrics()
  }
}

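The process method receives a ControllerEvent instance, determines which type of Controller event it is, and calls the corresponding handler method. For example, an AutoPreferredReplicaLeaderElection event is handled by the processAutoPreferredReplicaLeaderElection method, and every other event type is likewise dispatched to its own process-prefixed method. If handling throws ControllerMovedException, meaning the Controller role has moved to another broker, maybeResign runs the resignation logic; in every case, updateMetrics is called in the finally block.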
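The dispatch pattern can be reproduced in a few lines. The sketch below is a hypothetical, trimmed-down analogue of KafkaController.process, not its real code: only processAutoPreferredReplicaLeaderElection, maybeResign and the event names come from the excerpt, while Controller, handleBrokerModification, failOnBroker and the local ControllerMovedException class are assumptions. It keeps the three parts of the real method: a match over a sealed event type, a dedicated catch clause that triggers resignation when the controller has moved, and a metrics update in finally.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, trimmed-down analogue of KafkaController.process.
object DispatchDemo {
  sealed trait Event
  case object AutoPreferredReplicaLeaderElection extends Event
  final case class BrokerModifications(brokerId: Int) extends Event
  case object ShutdownEventThread extends Event

  // Local stand-in for Kafka's ControllerMovedException.
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  final class Controller(failOnBroker: Option[Int]) {
    val log = ArrayBuffer.empty[String]
    var metricsUpdates = 0

    def process(event: Event): Unit = {
      try {
        event match {                               // dispatch by event type
          case AutoPreferredReplicaLeaderElection => processAutoPreferredReplicaLeaderElection()
          case BrokerModifications(id)           => handleBrokerModification(id)
          case ShutdownEventThread               => log += "error: ShutdownEventThread belongs to the event thread"
        }
      } catch {
        case e: ControllerMovedException =>         // controller role moved elsewhere
          log += s"moved: ${e.getMessage}"
          maybeResign()
        case e: Throwable =>
          log += s"error processing $event: ${e.getMessage}"
      } finally {
        metricsUpdates += 1                         // stands in for updateMetrics()
      }
    }

    private def processAutoPreferredReplicaLeaderElection(): Unit =
      log += "preferred leader election"

    private def handleBrokerModification(id: Int): Unit =
      if (failOnBroker.contains(id)) throw new ControllerMovedException(s"broker $id")
      else log += s"broker $id modified"

    private def maybeResign(): Unit = log += "resign"
  }
}
```

Because the event type is sealed, the compiler can warn when a new event is added without a matching case, which helps keep a long dispatch method like the real one complete.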

Other Methods #

In addition to QueuedEvent and ControllerEventThread, the put and clearAndPut methods are also important as they are responsible for producing elements into the event queue.

Here, put inserts the given ControllerEvent into the event queue, while clearAndPut first calls preempt on every event still waiting in the queue, then clears the entire queue, and finally inserts the given event.

The following code snippets correspond to these two methods:

// put method
def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  // Create a QueuedEvent instance
  val queuedEvent = new QueuedEvent(event, time.milliseconds())
  // Insert it into the event queue
  queue.put(queuedEvent)
  // Return the newly created QueuedEvent instance
  queuedEvent
}
// clearAndPut method
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  // First call preempt on every event still waiting in the queue
  queue.forEach(_.preempt(processor))
  // Clear the event queue
  queue.clear()
  // Call the put method above to insert the given event into the event queue
  put(event)
}
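A stripped-down model of these two methods may help. MiniEventManager, withLock and the string events are hypothetical; withLock stands in for the inLock helper used above, and recording an event in preempted stands in for calling preempt on a real processor. Because a ReentrantLock can be re-acquired by the thread that already holds it, clearAndPut can call put while still holding putLock, just as the original code does.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified event manager: events are strings, and
// "preempting" an event just records its name.
final class MiniEventManager {
  private val putLock = new ReentrantLock()
  private val queue = new LinkedBlockingQueue[String]()
  val preempted = ArrayBuffer.empty[String]

  // Plays the role of inLock: run body while holding the lock.
  private def withLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  def put(event: String): Unit = withLock(putLock) {
    queue.put(event)
  }

  def clearAndPut(event: String): Unit = withLock(putLock) {
    // 1. Give every pending event a chance to be preempted.
    val it = queue.iterator()
    while (it.hasNext) preempted += it.next()
    // 2. Drop all pending events.
    queue.clear()
    // 3. Enqueue the new event; ReentrantLock allows re-acquiring putLock here.
    put(event)
  }

  // Snapshot of the events currently waiting, in queue order.
  def pending: List[String] = queue.toArray(new Array[String](0)).toList
}
```

After put("a"), put("b") and clearAndPut("expire"), preempted holds a and b, and the queue contains only expire.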

The overall code is simple, and there are not many points that need explanation. However, I would like to discuss one question with you.

Have you noticed that the put method is protected by the putLock in the source code?

Personally, I don’t think this putLock is necessary because the LinkedBlockingQueue data structure itself is already thread-safe. The put method only deals with the shared global variable queue, so its thread safety can be delegated to the implementation of LinkedBlockingQueue. Moreover, LinkedBlockingQueue already maintains a putLock and a takeLock internally to protect read and write operations.

Of course, I do agree with using the lock in clearAndPut, because we need the preempt pass over the queued events and the clear operation to form a single atomic operation.
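The narrower point above, that LinkedBlockingQueue.put is already safe to call from many threads at once, can be checked with a small stress sketch (ConcurrentPutDemo is a hypothetical name). Several threads put distinct integers into one queue without any external lock, and afterwards every element is present exactly once.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

object ConcurrentPutDemo {
  // Returns (number of elements drained, number of distinct elements).
  def putConcurrently(threads: Int, perThread: Int): (Int, Int) = {
    val queue = new LinkedBlockingQueue[Integer]()
    val startGate = new CountDownLatch(1)
    val workers = (0 until threads).map { t =>
      new Thread(new Runnable {
        def run(): Unit = {
          startGate.await()                                 // release all producers at once
          for (i <- 0 until perThread)
            queue.put(Integer.valueOf(t * perThread + i))  // no external lock
        }
      })
    }
    workers.foreach(_.start())
    startGate.countDown()
    workers.foreach(_.join())
    val drained = new java.util.ArrayList[Integer]()
    queue.drainTo(drained)
    (drained.size, new java.util.HashSet[Integer](drained).size)
  }
}
```

This only exercises concurrent put calls on their own. It does not examine how put interleaves with clearAndPut, which is where the post-class question invites further thought.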

Summary #

Today, we focused on how the single-threaded event queue is implemented on the Controller side. ControllerEventManager brings together ControllerEvent, ControllerState, and ControllerEventThread, and combines them with a dedicated event queue to handle events. Let’s review the key points of this lesson.

  • ControllerEvent: Defines various event names that the Controller can handle. Currently, a total of 25 types of events are defined.
  • ControllerState: Defines the state of the Controller. You can think of it as the higher-level category of ControllerEvent, so ControllerEvent and ControllerState have a many-to-one relationship.
  • ControllerEventManager: The event manager defined by the Controller, dedicated to defining and maintaining the dedicated thread and the corresponding event queue.
  • ControllerEventThread: The event processing thread created by the event manager. This thread exclusively reads the event queue and processes all the events in the queue.

In the next lesson, we will formally begin studying KafkaController. It is a large file with more than 2,100 lines, but most of the code implements the handling logic for the 25 types of ControllerEvent, so don’t be intimidated by it. We will first learn how the Controller is elected and then discuss the Controller’s specific responsibilities in detail.

Post-class Discussion #

Do you think it is necessary to protect the code of the put method in ControllerEventManager with a Lock?

Feel free to express your thoughts in the comments and discuss with me. You are also welcome to share today’s content with your friends.