14 Controller Election: How It's Implemented #

Hello, I am Hu Xi.

In the last class, we learned the code for handling Controller events using the single-threaded event queue model. The Controller component constructs a blocking queue through the ControllerEventManager class and is accompanied by a dedicated event processing thread to handle various ControllerEvents.

This design approach ensures both thread safety required for multithreaded access and simplifies the code structure on the Controller side, greatly improving code maintainability.

Today, let’s learn about the source code for the Controller election part.

Do you remember the "recovery method" I mentioned in the [11th class] case study, namely deleting the /controller node in ZooKeeper? At the time, that trick got us out of trouble: it restored the inconsistent cluster state without us having to restart the entire production environment.

But have you ever wondered why deleting the /controller node brings the cluster metadata back in sync? Without understanding the underlying principle, we would not dare take such an action in a production environment. Today, we will walk through the implementation logic behind it, focusing on how the Controller is elected.

I always believe that only by mastering this knowledge can we truly get started with the server-side code of Kafka. As one of the most important components on the Broker side, the Controller is irreplaceable in Kafka. The entire Kafka cluster has only one Controller, and in a sense, it is the only “single point” in the distributed system of Kafka.

Therefore, understanding the triggering scenarios for the election of this “single point” and how it is elected is very helpful for us to deeply understand the role of the Controller in the cluster. After all, some services provided by the Controller to the outside world also adopt similar implementation principles.

Overview #

ZooKeeper /controller node #

Let me emphasize again that in a Kafka cluster, only one broker can be elected as the Controller at a given time. Over time, different brokers may take turns serving as the Controller, but at any given moment, only one broker can be the Controller.

So which broker should be chosen as the Controller? Currently, the Controller election relies on ZooKeeper. Besides serving as the "source of truth" for cluster metadata, ZooKeeper also hosts the ephemeral /controller node, which Kafka uses to drive the Controller election process.

The following snippet shows the data and stat of the /controller node in a two-broker Kafka cluster, as printed by the ZooKeeper CLI:

{"version":1,"brokerid":0,"timestamp":"1585098432431"}
cZxid = 0x1a
ctime = Wed Mar 25 09:07:12 CST 2020
mZxid = 0x1a
mtime = Wed Mar 25 09:07:12 CST 2020
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002d3a1f0000
dataLength = 54
numChildren = 0

There are two key points you should pay attention to:

  • The brokerid field is 0, indicating that the broker with ID 0 is the cluster's Controller.
  • The ephemeralOwner field is not 0x0, indicating that this is an ephemeral node.

Since it is an ephemeral node, once the session between the broker and ZooKeeper is terminated, the node will disappear. The Controller election relies on this feature. Each broker listens to the /controller node at all times, ready to apply for the Controller role. The following diagram shows the interaction between the broker and the /controller node:

Interaction between Broker and /controller node

As shown in the diagram, all brokers in the cluster are continuously monitoring this node on ZooKeeper in real-time. “Monitoring” has two meanings here:

  • Monitoring the existence of the node. If the node is found to be missing, the broker immediately tries to claim the role by creating the /controller node itself; the broker that successfully creates the node is elected as the new Controller (a minimal sketch of this create-or-watch pattern follows the list).
  • Monitoring changes in the node data. Similarly, if changes are detected in the content of the node, the broker immediately starts a new round of Controller election.
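
To make this "create-or-watch" pattern concrete, here is a minimal sketch written against the raw ZooKeeper Java API rather than Kafka's own KafkaZkClient. The object and method names are mine, and the real implementation additionally records a timestamp in the payload and handles retries and session expiration:

import java.nio.charset.StandardCharsets
import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

// Illustrative only: brokers race to create the ephemeral /controller node; the losers
// set a watch so they are notified when the node is deleted or its data changes.
object ControllerZNodeSketch {
  val ControllerPath = "/controller"

  def tryClaimController(zk: ZooKeeper, brokerId: Int): Boolean = {
    try {
      val payload = s"""{"version":1,"brokerid":$brokerId}""".getBytes(StandardCharsets.UTF_8)
      zk.create(ControllerPath, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
      true // this broker won the election
    } catch {
      case _: KeeperException.NodeExistsException =>
        // Someone else is already the Controller; watch the node so that its deletion
        // or data change is reported (in Kafka, as an event on the Controller event queue).
        zk.exists(ControllerPath, new Watcher {
          override def process(event: WatchedEvent): Unit = ()
        })
        false
    }
  }
}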

Now that you understand these fundamentals, let’s read the specific source code file: KafkaController.scala. This is a large file consisting of 2200 lines. Let me first give you an overview of the structure of this file so that you don’t get lost in the details.

Source code structure #

The code structure of KafkaController file is illustrated in the following diagram:

Code structure of KafkaController

On the whole, this file consists of five main parts.

  • Election Trigger: this election refers to the election of leaders for topic partitions, i.e., selecting the leader replica of each partition. We will cover it in detail when we study the replica manager and partition manager later.
  • KafkaController Object: The companion object of KafkaController, only defining some constants and callback function types.
  • ControllerEvent: Defines the types of Controller events. In the previous lesson, we learned in detail about Controller events and the event-based single-threaded event queue model. Although there is a lot of code in this part, they are actually repetitive. Once you understand the definition of one event, the others will be straightforward.
  • Various ZooKeeper listeners: Define ZooKeeper listeners to monitor changes in various nodes in ZooKeeper. Today, we will focus on the listener that monitors changes in the /controller node. This is also the core part for us to understand the Controller election process.
  • KafkaController Class: Defines the KafkaController class and its actual processing logic. This is the main object of our study today.

Next, I will walk you through the KafkaController class, the ZooKeeper listeners, and the Controller election process, focusing in particular on the listener that monitors changes to the /controller node, since it is the key to understanding the election.

The KafkaController class #

This class contains approximately 1900 lines of code and defines a large number of variables and methods. Most of these methods handle different controller events. When we talk about the election process later, I will select some representative methods to introduce. I hope you can use this as a guide to understand the code of other methods. After all, they do similar things, and at least the code style is very similar.

Before studying the important methods, we first need to understand the definition of the KafkaController class. Next, we will study it from four dimensions: native fields, auxiliary fields, various ZooKeeper listener fields, and statistical fields.

After understanding the meanings of these fields, it will be easier to understand the methods that operate on these fields.

Native Fields #

First, let’s look at the native fields. The so-called native fields refer to the fields that need to be specified when creating a KafkaController instance.

Let’s first look at the definition code of the KafkaController class:

// Field Meanings:
// config: Kafka configuration information, through which you can obtain the values of all parameters on the broker side
// zkClient: ZooKeeper client, all interactions between the Controller and ZooKeeper are completed through this property
// time: Utility class that provides time services (such as getting the current time)
// metrics: Utility class that implements metric monitoring services (such as creating monitoring metrics)
// initialBrokerInfo: Broker node information, including host name, port number, and the listener used
// initialBrokerEpoch: Broker Epoch value used to isolate requests sent by the old Controller
// tokenManager: Utility class that implements Delegation token management. Delegation token is a lightweight authentication mechanism
// threadNamePrefix: The prefix of the Controller-side event processing thread name
class KafkaController(val config: KafkaConfig,
                      zkClient: KafkaZkClient,
                      time: Time,
                      metrics: Metrics,
                      initialBrokerInfo: BrokerInfo,
                      initialBrokerEpoch: Long,
                      tokenManager: DelegationTokenManager,
                      threadNamePrefix: Option[String] = None)
  extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
  ......
}

As I mentioned in the previous lesson, KafkaController implements the ControllerEventProcessor interface, thereby implementing the process method for handling Controller events. There are three important fields in it.

  • config: An instance of the KafkaConfig class, which encapsulates the values of all parameters on the broker side.
  • zkClient: A ZooKeeper client class that defines all methods for interacting with ZooKeeper.
  • initialBrokerEpoch: The Epoch value of the Broker where the Controller is located. Kafka uses it to ensure that the Broker does not process requests sent by the old Controller.

Other fields are either utility class fields like time and metrics, or limited-use fields like initialBrokerInfo and tokenManager which I won’t go into detail about.
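
To make the epoch-based fencing idea behind initialBrokerEpoch a bit more concrete, here is a tiny, purely illustrative sketch; the class and method names are hypothetical, and the real checks live in the broker-side request handling:

// Hypothetical names; a minimal sketch of epoch-based fencing, not Kafka's actual code.
class EpochFencingSketch {
  // Highest epoch this broker has seen so far.
  @volatile private var highestSeenEpoch: Long = -1L

  // Accept a request only if its epoch is not older than the highest one seen.
  def shouldProcess(requestEpoch: Long): Boolean = synchronized {
    if (requestEpoch < highestSeenEpoch) {
      false // stale request from an old sender; drop it
    } else {
      highestSeenEpoch = requestEpoch
      true
    }
  }
}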

Auxiliary Fields #

In addition to the native fields, KafkaController also defines many auxiliary fields to help implement various controller functions.

Let’s take a look at some important auxiliary fields:

......
// Cluster metadata class that stores all cluster metadata
val controllerContext = new ControllerContext
// Controller-side channel manager class responsible for sending requests from the Controller to the Broker
var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
  stateChangeLogger, threadNamePrefix)
// Thread scheduler; its only current duty is to periodically run preferred-leader election for partition rebalancing
private[controller] val kafkaScheduler = new KafkaScheduler(1)
// Controller event manager that is responsible for managing event processing threads
private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
  controllerContext.stats.rateAndTimeMetrics)
......
// Replica state machine responsible for replica state transitions
val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
// Partition state machine responsible for partition state transitions
val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
  new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
// Topic deletion manager responsible for deleting topics and their logs
val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
  partitionStateMachine, new ControllerDeletionClient(this, zkClient))
......

Among them, there are 7 fields that are of great importance.

  • controllerContext: Cluster metadata class that stores all cluster metadata.
  • controllerChannelManager: Controller-side channel manager class responsible for sending requests from the Controller to the Broker.
  • kafkaScheduler: Thread scheduler; its only current duty is to periodically run preferred-leader election for partition rebalancing (a stand-in sketch follows this list).
  • eventManager: Controller event manager, responsible for managing event handling threads.
  • replicaStateMachine: Replica state machine, responsible for replica state transitions.
  • partitionStateMachine: Partition state machine, responsible for partition state transitions.
  • topicDeletionManager: Topic deletion manager, responsible for deleting topics and logs.
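
As a stand-in for the kafkaScheduler item above, the following sketch uses Java's ScheduledExecutorService to show what "a single-threaded scheduler whose only current duty is the periodic preferred-leader rebalance" amounts to. The real class is kafka.utils.KafkaScheduler, and the callback and interval here are illustrative:

import java.util.concurrent.{Executors, TimeUnit}

// Stand-in sketch, not KafkaController's code: a one-thread scheduler that periodically
// fires a rebalance action. In Kafka this roughly corresponds to putting an
// AutoPreferredReplicaLeaderElection event on the Controller event queue.
class RebalanceSchedulerSketch(enqueueRebalanceEvent: () => Unit, intervalSeconds: Long) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def startup(): Unit =
    scheduler.scheduleWithFixedDelay(
      () => enqueueRebalanceEvent(),
      intervalSeconds, intervalSeconds, TimeUnit.SECONDS)

  def shutdown(): Unit = scheduler.shutdownNow()
}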

Various ZooKeeper Listeners #

The ControllerChangeHandler that we will study in detail shortly is just one of them. In fact, this class defines many listeners, as shown below:

// ZooKeeper listener for the Controller node
private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
// ZooKeeper listener for the number of brokers
private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
// Collection of ZooKeeper listeners for broker information changes
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
// ZooKeeper listener for the number of topics
private val topicChangeHandler = new TopicChangeHandler(eventManager)
// ZooKeeper listener for topic deletion
private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
// ZooKeeper listener for topic partition changes
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
// ZooKeeper listener for partition reassignment
private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
// ZooKeeper listener for Preferred Leader election
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
// ZooKeeper listener for ISR replica set changes
private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
// ZooKeeper listener for log directory changes
private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)

Let me explain the functions of these ZooKeeper listeners individually:

  • controllerChangeHandler: As mentioned earlier, it listens for changes in the /controller node. These changes include node creation, deletion, and data changes.
  • brokerChangeHandler: Listens for changes in the number of brokers.
  • brokerModificationsHandlers: Listens for data changes in brokers, such as changes in broker configuration information.
  • topicChangeHandler: Monitors changes in the number of topics.
  • topicDeletionHandler: Listens for changes in the number of child nodes in the /admin/delete_topics node for topic deletion.
  • partitionModificationsHandlers: Monitors changes in partition data for topics, such as addition of replicas or leader replica changes.
  • partitionReassignmentHandler: Listens for partition replica reassignment tasks. Once a new task is detected, replica reassignment is performed for the target partition.
  • preferredReplicaElectionHandler: Listens for Preferred Leader election tasks. Once a new task is detected, Preferred Leader election is performed for the target topic.
  • isrChangeNotificationHandler: Listens for changes in the ISR replica set. Once triggered, the list of partitions with ISR changes needs to be obtained, and then the corresponding leader and ISR cache metadata on the Controller side are updated.
  • logDirEventNotificationHandler: Listens for log directory changes. Once triggered, the list of affected brokers needs to be obtained, and then the invalid log directories on these brokers are handled.

I have created a mind map to help you remember these ZooKeeper listeners more efficiently:

ZooKeeper Listeners Mind Map
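
Most of these handlers are only a few lines long. As a preview of the ControllerChangeHandler we will examine below, here is roughly what a child-change handler such as BrokerChangeHandler looks like; this is paraphrased from the 2.x sources, so the exact constructor parameters may differ slightly between versions:

// Paraphrased sketch: watches the /brokers/ids path and turns any child-node change
// into a BrokerChange event on the Controller event queue.
class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = BrokerIdsZNode.path
  override def handleChildChange(): Unit = eventManager.put(BrokerChange)
}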

Statistics Fields #

Finally, let’s look at the statistics fields.

These fields are mostly used to compute monitoring metrics, and some of them back very important Controller metrics, such as ActiveControllerCount. Now, let's see which statistics fields are defined in KafkaController. Their meanings are clear at a glance, and I have annotated each field with a comment:

// Current Broker Id where the Controller is located
@volatile private var activeControllerId = -1
// Number of offline partitions
@volatile private var offlinePartitionCount = 0
// Number of partitions that satisfy the Preferred Leader election conditions
@volatile private var preferredReplicaImbalanceCount = 0
// Total number of topics
@volatile private var globalTopicCount = 0
// Total number of topic partitions
@volatile private var globalPartitionCount = 0
// Number of topics to be deleted
@volatile private var topicsToDeleteCount = 0
// Number of replicas to be deleted
@volatile private var replicasToDeleteCount = 0
// Number of topics that cannot be deleted temporarily
@volatile private var ineligibleTopicsToDeleteCount = 0
// Number of replicas that cannot be deleted temporarily
@volatile private var ineligibleReplicasToDeleteCount = 0
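
As a self-contained illustration of how such a @volatile field typically backs a metric, here is a sketch that uses the Yammer metrics-core API directly. KafkaController itself registers its gauges through the KafkaMetricsGroup mixin; the metric names below simply echo well-known controller metrics:

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge

// Illustrative sketch: read-only gauges over volatile statistics fields.
class ControllerStatsSketch(brokerId: Int) {
  @volatile var activeControllerId: Int = -1
  @volatile var offlinePartitionCount: Int = 0

  private val registry = Metrics.defaultRegistry()

  // 1 when this broker currently believes it is the Controller, 0 otherwise.
  registry.newGauge(getClass, "ActiveControllerCount", new Gauge[Int] {
    override def value: Int = if (activeControllerId == brokerId) 1 else 0
  })

  // Exposes the offline-partition counter maintained by the Controller event thread.
  registry.newGauge(getClass, "OfflinePartitionsCount", new Gauge[Int] {
    override def value: Int = offlinePartitionCount
  })
}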

Alright, we have finished introducing the KafkaController class. Once again, I strongly recommend that you familiarize yourself with the meanings of these fields because all the methods that follow operate around these fields.

Next, I will use the Controller election process as an example to introduce the implementation of some methods in KafkaController. However, before that, we need to learn about the ControllerChangeHandler, the ZooKeeper listener for monitoring Controller changes.

ControllerChangeHandler Listener #

As I mentioned earlier, KafkaController defines over a dozen ZooKeeper listeners. The listener related to the Controller is ControllerChangeHandler, which is used to monitor changes in the Controller. The code is defined as follows:

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  // The path of the Controller node in ZooKeeper, i.e., /controller
  override val path: String = ControllerZNode.path
  // Listen for the creation of the /controller node
  override def handleCreation(): Unit = eventManager.put(ControllerChange)
  // Listen for the deletion of the /controller node
  override def handleDeletion(): Unit = eventManager.put(Reelect)
  // Listen for data changes in the /controller node
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

This listener receives an instance of ControllerEventManager and implements three methods from the ZNodeChangeHandler interface: handleCreation, handleDeletion, and handleDataChange. The path variable under this listener is actually the string “/controller”, indicating that it is monitoring this node in ZooKeeper.

All three handle methods are used to monitor changes in the /controller node, but with slight differences in implementation details.

For handleCreation and handleDataChange, the handling method is to add a ControllerChange event to the event queue. For handleDeletion, the handling method is to add a Reelect event to the event queue.

Deletion indicates that the /controller node in ZooKeeper no longer exists, which means that the Controller in the Kafka cluster is temporarily vacant. Because it is a different state from Creation and DataChange, it needs to be treated differently. Therefore, the Reelect event involves more than the ControllerChange event: handling the ControllerChange event only requires the current Broker to execute the logic of “resigning” as the Controller, while the Reelect event requires the Broker to participate in the re-election in addition to executing the “resign” logic.

Since the process method in KafkaController is very long, I have excerpted the processing code for those two events mentioned earlier:

// partial process method
override def process(event: ControllerEvent): Unit = {
    try {
      event match {
       ......
       // ControllerChange event
       case ControllerChange =>
          processControllerChange()
       // Reelect event
       case Reelect =>
          processReelect()
        ......
      }
    }
    ......
}
// If it is a ControllerChange event, only the logic of resigning is executed
private def processControllerChange(): Unit = {
  maybeResign()
}
// If it is a Reelect event, the resign logic is executed, and the elect method is also executed to participate in a new round of election
private def processReelect(): Unit = {
  maybeResign()
  elect()
}

As you can see, although the code is very long, the overall structure is neat and clear, all relying on pattern matching for event handling. The process method calls the corresponding process*** method to handle the event based on the given Controller event type. I have only listed the two types of events that the /controller node listener in ZooKeeper listens to, along with their corresponding handling methods.

For the ControllerChange event, the handling is to call maybeResign to execute the logic of resigning as the Controller. If it is a Reelect event, in addition to executing the resign logic, the elect method is also executed to participate in a new round of Controller election.

Controller Election Process #

After discussing the ControllerChangeHandler source code, let’s take a look at the Controller election process. The Controller election in Kafka refers to the selection of a Broker in the cluster to perform the Controller role. The entire election process consists of two steps: triggering the election and starting the election.

Triggering the Election #

First, there are three scenarios that can trigger a Controller election:

  1. When the cluster is started from scratch.
  2. When a Broker detects that the /controller node has disappeared.
  3. When a Broker detects a change in the data of the /controller node.

Although these scenarios are different, they all lead to the action of electing a Controller in the end. Let me explain each of these scenarios and then introduce the specific steps for electing the Controller.

Scenario 1: Cluster Start from Scratch #

When the cluster is started for the first time, a Controller has not been elected yet. So, when a Broker starts, it first writes the Startup ControllerEvent into the event queue, then starts the corresponding event handling thread and the ControllerChangeHandler ZooKeeper listener. Finally, it relies on the event handling thread to elect the Controller.

In the source code, the startup method of the KafkaController class does these things. When a Broker starts, it calls this method to start the ControllerEventThread. It’s important to note that every Broker needs to do these things, not just the Broker where the Controller resides.

Here is the main code of the startup method:

def startup() = {
  // Step 1: Register ZooKeeper state change handler, which is used to listen for ZooKeeper session expiration
  zkClient.registerStateChangeHandler(new StateChangeHandler {
    override val name: String = StateChangeHandlers.ControllerHandler
    override def afterInitializingSession(): Unit = {
      eventManager.put(RegisterBrokerAndReelect)
    }
    override def beforeInitializingSession(): Unit = {
      val queuedEvent = eventManager.clearAndPut(Expire)
      queuedEvent.awaitProcessing()
    }
  })
  // Step 2: Write the Startup event to the event queue
  eventManager.put(Startup)
  // Step 3: Start the ControllerEventThread to process ControllerEvents in the event queue
  eventManager.start()
}

First, the startup method registers a ZooKeeper state change handler that watches for expiration of the session between the Broker and ZooKeeper: before a new session is initialized, the handler clears the event queue and processes an Expire event; once the new session is established, it enqueues a RegisterBrokerAndReelect event so that the Broker re-registers itself in ZooKeeper and takes part in a new election. Then, startup writes the Startup event to the event queue and starts the ControllerEventThread to process it.

Next, let’s learn about the process method of the KafkaController, which handles the Startup event:

override def process(event: ControllerEvent): Unit = {
  try {
    event match {
     ......
     case Startup =>
        processStartup() // handle the Startup event
    }
  }
  ......
}
  
private def processStartup(): Unit = {
   // Register the ControllerChangeHandler ZooKeeper listener
   zkClient.registerZNodeChangeHandlerAndCheckExistence(
    controllerChangeHandler)
   // Execute the Controller election
   elect()
}

From this code, we can see that the process method calls processStartup to handle the Startup event. processStartup first registers the ControllerChangeHandler listener via the zkClient's registerZNodeChangeHandlerAndCheckExistence method, and then calls the elect method to run the election.

It’s worth noting that although the three scenarios mentioned earlier are parallel, in practice, the last two scenarios must wait for the successful execution of this step in the first scenario before they can be triggered.

Since all three scenarios end with a call to elect, I will cover the implementation of the elect method in a dedicated section after describing the remaining two triggering scenarios.

In summary, when the cluster starts, the Broker triggers the election of the Controller by “inserting” the Startup event into the event queue.

Scenario 2: Disappearance of the /controller Node #

When a Broker detects the disappearance of the /controller node, it means that there is no Controller in the entire cluster. Therefore, all Brokers that detect the disappearance of the /controller node will immediately call the elect method to execute the election logic.

You may wonder, “How does the Broker detect this change in the /controller node on ZooKeeper?” In fact, this is a feature provided by the ZooKeeper listener. In other words, this is implemented by Apache ZooKeeper itself, which is why we say that Kafka relies on ZooKeeper to complete the Controller election.

Speaking of this, let me digress a bit. The community is currently discussing removing the dependency on ZooKeeper completely. Specifically, for the Controller, the change will be to implement a consensus algorithm similar to Raft within Kafka to elect the Controller. I will provide more details on the community’s plan to remove ZooKeeper in a special follow-up.

Scenario 3: Change in the Data of the /controller Node #

When a Broker detects a change in the data of the /controller node, it usually means that there has been a change of Controller. There are two possibilities in this case:

  • If the Broker was previously the Controller, it needs to first resign and then attempt to compete for the Controller role.
  • If the Broker was not previously the Controller, it can directly compete for the new Controller role.

In terms of code, the maybeResign method captures these two possibilities. Pay attention to the word "maybe" in the method name: it indicates that the Broker may or may not need to perform the resignation. The Kafka source code often uses maybe*** as a method-name prefix for logic that only runs under specific conditions. Here is the implementation of maybeResign:

private def maybeResign(): Unit = {
  // A very important step! This is an important criterion for determining whether the resignation logic needs to be executed!
  // Check if the Broker was the Controller before
  val wasActiveBeforeChange = isActive
  // Register ControllerChangeHandler listener
  zkClient.registerZNodeChangeHandlerAndCheckExistence(
    controllerChangeHandler)
  // Get the Broker Id where the current Controller is located, if there is no Controller, return -1
  activeControllerId = zkClient.getControllerId.getOrElse(-1)
  // If the Broker was previously the Controller but not anymore
  if (wasActiveBeforeChange && !isActive) {
    onControllerResignation() // Execute the resignation logic
  }
}

The first line of this code is crucial: it records whether the Broker was the Controller before the change, which is the key criterion for deciding whether the resignation logic needs to run. After all, if the Broker was not the Controller before, there is no "resignation" to speak of. The code then registers the ControllerChangeHandler listener and reads the Broker ID of the current cluster Controller from ZooKeeper (or -1 if there is none). With this information, maybeResign checks whether the Broker was the Controller before but is not anymore; if so, it calls the onControllerResignation method to execute the resignation logic.
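
For reference, the isActive check used above is essentially a comparison between the cached activeControllerId and the Broker's own ID, along the lines of the following paraphrased definition:

// Paraphrased: the Broker considers itself the active Controller when the cached
// Controller ID equals its own broker ID.
def isActive: Boolean = activeControllerId == config.brokerId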

So what does "resignation" actually involve? The onControllerResignation method does this work: it clears the values of various data structures, unregisters ZooKeeper listeners, shuts down the state machines and managers, and so on. I have annotated its implementation with comments:

private def onControllerResignation(): Unit = {
  debug("Resigning")
  // Unregister ZooKeeper listeners
  zkClient.unregisterZNodeChildChangeHandler(
    isrChangeNotificationHandler.path)
  zkClient.unregisterZNodeChangeHandler(
    partitionReassignmentHandler.path)
  zkClient.unregisterZNodeChangeHandler(
    preferredReplicaElectionHandler.path)
  zkClient.unregisterZNodeChildChangeHandler(
    logDirEventNotificationHandler.path)
  unregisterBrokerModificationsHandler(
    brokerModificationsHandlers.keySet)
  // Shutdown the Kafka thread scheduler, which is essentially canceling regular leader re-election
  kafkaScheduler.shutdown()
  // Clear all statistical fields
  offlinePartitionCount = 0
  preferredReplicaImbalanceCount = 0
  globalTopicCount = 0
  globalPartitionCount = 0
  topicsToDeleteCount = 0
  replicasToDeleteCount = 0
  ineligibleTopicsToDeleteCount = 0
  ineligibleReplicasToDeleteCount = 0
  // Shutdown the token expiration check scheduler
  if (tokenCleanScheduler.isStarted)
    tokenCleanScheduler.shutdown()
  // Unregister partition reassignment listener
  unregisterPartitionReassignmentIsrChangeHandlers()
  // Shutdown the partition state machine
  partitionStateMachine.shutdown()
  // Unregister topic change listener
  zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
  // Unregister partition change listener
  unregisterPartitionModificationsHandlers(
    partitionModificationsHandlers.keys.toSeq)
  // Unregister topic deletion listener
  zkClient.unregisterZNodeChildChangeHandler(
    topicDeletionHandler.path)
  // Shutdown the replica state machine
  replicaStateMachine.shutdown()
  // Unregister broker change listener
  zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
  // Shutdown the Controller channel manager
  controllerChannelManager.shutdown()
  // Reset the cluster metadata
  controllerContext.resetContext()
  info("Resigned")
}

Electing a Controller #

After discussing the triggering scenarios, we will now learn about the source code for electing a Controller. As mentioned earlier, the elect method is called to execute the election logic for all three election scenarios. Let’s take a look at its implementation:

private def elect(): Unit = {
  // Step 1: Get the Broker ID of the current Controller; if no Controller exists, explicitly mark it as -1
  activeControllerId = zkClient.getControllerId.getOrElse(-1)

  // Step 2: If the current Controller has already been elected, return directly
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
    return
  }

  try {
    // Step 3: Register Controller-related information
    // Mainly creating the /controller node
    val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
    controllerContext.epoch = epoch
    controllerContext.epochZkVersion = epochZkVersion
    activeControllerId = config.brokerId

    info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
      s"and epoch zk version is now ${controllerContext.epochZkVersion}")

    // Step 4: Execute the subsequent logic after being elected as Controller
    onControllerFailover()
  } catch {
    case e: ControllerMovedException =>
      maybeResign()

      if (activeControllerId != -1)
        debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
      else
        warn("A controller has been elected but just resigned, this will result in another round of election", e)

    case t: Throwable =>
      error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
        s"Trigger controller movement immediately", t)
      triggerControllerMove()
  }
}

Let me walk you through the logic of this method.

This method first checks if the Controller has already been elected. It’s important to note that all Brokers in the cluster execute this logic, so it is very possible that some Brokers may find that the Controller is already elected when executing the elect method. If the Controller has been elected, then there is nothing else to do. On the other hand, if the Controller has not been elected, the code will attempt to create the /controller node to contend for the Controller.

Once the /controller node is successfully created, the onControllerFailover method is called to carry out the post-election work: registering the various ZooKeeper listeners, deleting the pending log directory change and ISR change notification events, starting the Controller channel manager, and starting the replica state machine and the partition state machine.

If the registration fails, the code throws a ControllerMovedException. This usually means another Broker has claimed the Controller role first, so the code calls the maybeResign method to execute the resignation logic.

Summary #

Today, we have gone through the entire process of Controller election: how Kafka watches the /controller node with the help of ZooKeeper listeners, the scenarios that trigger an election, and the complete election flow. Let's review the key points of this lesson.

  • The Controller relies on ZooKeeper to implement Controller election, mainly by using the /controller ephemeral node and the ZooKeeper listener mechanism.
  • There are three scenarios that trigger a Controller election: when the cluster starts; when the /controller node is deleted; and when the data of the /controller node changes.
  • The source code ultimately calls the elect method to implement Controller election.

Controller Election

In the next lesson, I will guide you through other important functions of the Controller, including how it manages Brokers and replicas, and so on. Don’t miss it.

Post-class discussion #

At the beginning of this class, I mentioned that deleting /controller triggers a Controller election, after which the cluster's metadata is synchronized again. So, do you know where in the source code the metadata update requests are sent?

Feel free to share your thoughts and discuss with me in the comments section. You are also welcome to share today’s content with your friends.