11 Controller Metadata: What Kind of Data Does the Controller Store, and What Are Its Various States? #

Hello, I’m Hu Xi. Starting today, we officially begin our study of the third major module: the Controller module.

When it comes to the Controller component in Kafka, I believe you are already quite familiar with it. In a sense, it is the most critical component in Kafka. On one hand, it is responsible for electing leader replicas for all topic partitions in the cluster; on the other hand, it holds all of the cluster's metadata and is responsible for synchronizing that metadata to the other Brokers. Since we are studying the Kafka source code, we certainly cannot skip such a heavyweight component.

I have drawn a picture to help you establish an overall understanding of this module. Today, let’s start by learning about the Controller Metadata.

Case Study #

Before formally studying the source code, I would like to share a real case with you.

In our company’s Kafka cluster environment, we once encountered a rather “strange” issue: certain core business topic partitions were always in an “unavailable” state.

By using the “kafka-topics” command to query, we found that the Leader of these partitions was displayed as -1. Previously, the Broker machine where these Leaders were located crashed due to high load, and when the Broker restarted, the Controller failed to successfully elect Leaders for these partitions. As a result, they remained in an “unavailable” state.

Since it was a production environment, our top priority was to immediately recover the damaged partitions and then investigate the cause of the problem. Someone proposed restarting all the Broker machines where the previous Leaders of these partitions were located - this was an easy idea to come up with, as the “restart method” has always been effective. However, this time it had no effect at all.

Later, someone suggested escalating the restart approach and restarting every Broker in the cluster. That was not acceptable at the time: besides the many business workloads still running, restarting the entire Kafka cluster on short notice was an entirely unplanned operation. After all, how could we casually restart a production environment?!

Then I suddenly remembered the Controller re-election logic in the source code. Once a new Controller is elected, it pushes the cluster metadata to all Brokers, which effectively "refreshes" the state of these partitions.

So the question is, how can we kill the existing Controller and perform a new Controller election without restarting the cluster? The answer lies in the ControllerZNode.path in the source code, which is the /controller node in ZooKeeper. If we manually delete the /controller node, the Kafka cluster will trigger a Controller election. Therefore, we immediately implemented this solution, and the results were surprisingly good: all the previously damaged partitions were restored to normal, and business data could be produced and consumed normally.
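
For reference, here is a minimal sketch of that operation using the standard Apache ZooKeeper Java client from Scala. The connection string and session timeout are placeholders for your own environment, and, as stressed in the next paragraph, deleting /controller is a risky operation that should not be performed casually.

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

// A minimal sketch, not a production tool: delete the /controller znode to force a new election.
object DeleteControllerNode extends App {
  // Placeholder connection string and timeout; replace with your own ZooKeeper ensemble
  val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = () // no-op watcher
  })
  try {
    // version = -1 means "delete regardless of the node's current version"
    zk.delete("/controller", -1)
  } finally {
    zk.close()
  }
}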

Of course, the purpose of sharing this case with you is not to make you remember that you can casually delete the /controller node - this operation is actually a bit risky. In fact, I just want to use this real example to show you that many keys to “mastering Kafka” are hidden in the source code. So, let’s start looking for the “keys” next.

Cluster Metadata #

To fully understand the working principle of the Controller, we first need to learn about the data it manages. After all, much of the Controller’s code is only for data management operations. Today, let’s focus on learning about the metadata in the Kafka cluster.

If we say that ZooKeeper is the “Source of Truth” for the entire Kafka cluster’s metadata, then the Controller can be called the “Backup Source of Truth” for the cluster metadata. Well, the latter term is just something I made up. You just need to understand that the Controller carries all the metadata on ZooKeeper.

In fact, the cluster Brokers do not directly interact with ZooKeeper to obtain metadata. Instead, they always communicate with the Controller to get and update the latest cluster data. Moreover, the community plans to “kill off” ZooKeeper (I will explain the community’s action to eliminate ZooKeeper in a “special episode” later on) and make the Controller the new “Source of Truth.”

We often mention metadata, so what exactly is the metadata of a cluster, or in other words, what does the metadata of a Kafka cluster define? I will show you a complete picture that illustrates all the cluster metadata information currently defined by Kafka.

As you can see, currently the Controller defines 17 metadata items. However, not all metadata is equally important, and you don’t have to remember them all. We only need to focus on the most important metadata and understand what they are used for by combining them with the source code.

Before we dive into the specific metadata, let me introduce the ControllerContext class. All the metadata information I just mentioned is encapsulated in this class. It should be said that this class is the data container class of the Controller component.

ControllerContext #

The source code of the Controller component is located in the src/main/scala/kafka/controller directory of the core package. The ControllerContext class is located in the ControllerContext.scala file within this directory.

This file contains only a few hundred lines of code, and its most important data structure is the ControllerContext class. As mentioned earlier, it holds all of the cluster metadata and also defines many utility methods, such as the allPartitions method, which retrieves all topic partition objects in the cluster, and the partitionReplicaAssignment method, which retrieves the replica list of a specific topic partition.

First, let’s take a look at the definition of the ControllerContext class, as shown below:

class ControllerContext {
  val stats = new ControllerStats // Controller statistics class
  var offlinePartitionCount = 0 // Offline partition counter
  val shuttingDownBrokerIds = mutable.Set.empty[Int] // List of broker IDs that are currently shutting down
  private val liveBrokers = mutable.Set.empty[Broker] // List of currently running broker objects
  private val liveBrokerEpochs = mutable.Map.empty[Int, Long] // List of running broker epochs
  var epoch: Int = KafkaController.InitialControllerEpoch // Current controller epoch value
  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion // Epoch value of the corresponding ZooKeeper node for the controller
  val allTopics = mutable.Set.empty[String] // List of cluster topics
  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]] // Replica list of topic partitions
  val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch] // Leader/ISR replica information of topic partitions
  val partitionsBeingReassigned = mutable.Set.empty[TopicPartition] // List of topic partitions currently undergoing replica reassignment
  val partitionStates = mutable.Map.empty[TopicPartition, PartitionState] // List of topic partition states
  val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState] // List of replica states for topic partitions
  val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]] // List of replicas on offline directories
  val topicsToBeDeleted = mutable.Set.empty[String] // List of topics to be deleted
  val topicsWithDeletionStarted = mutable.Set.empty[String] // List of topics with deletion started
  val topicsIneligibleForDeletion = mutable.Set.empty[String] // List of topics currently ineligible for deletion
  ...
}

This code defines exactly 17 fields, each corresponding to the metadata information shown in the previous diagram. Next, I will explain the meaning of some important metadata in detail.

These metadata fields are relatively easy to understand, and once you have a grasp of them, understanding the Broker-side MetadataCache (the metadata cache) will be much easier. For example, the liveBrokers information I will discuss next is exactly the data the Controller synchronizes to other Brokers' MetadataCache via UpdateMetadataRequest.

ControllerStats #

The first is the stats field, an instance of the ControllerStats class. Its complete code is as follows:

private[controller] class ControllerStats extends KafkaMetricsGroup {
  // Meter for tracking the rate of unclean leader elections per second
  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
  // Common rate and time metrics for Controller events
  val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state =>
    state.rateAndTimeMetricName.map { metricName =>
      state -> new KafkaTimer(newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
    }
  }.toMap
}

As the name suggests, it holds statistics about the Controller. Currently, the source code defines two kinds of statistics: UncleanLeaderElectionsPerSec, and the execution rate and time of each Controller event state. The former is the number of Unclean Leader elections performed by the Controller per second. Since an Unclean Leader election may cause data loss, enabling it is generally not recommended. If you do enable it, you need to keep a close eye on this metric and make sure the election rate stays low; otherwise significant data loss can occur.

The latter records the rate and processing time, in milliseconds, of each Controller state. The Controller currently defines many events, such as TopicDeletion, which performs topic deletion, and ControllerChange, which performs Controller re-election. For each type of Controller event, ControllerStats creates a corresponding rate-and-time metric by appending the string "RateAndTimeMs" to the event name.

Since there are many types of Controller events, there are also many corresponding rate monitoring metrics. Some Controller events require additional attention from you.

For example, the IsrChangeNotification event denotes a change in the ISR list. If this event occurs frequently, it indicates that the ISR list of replicas frequently changes, which is usually considered abnormal. Therefore, it is best to monitor the rate monitoring metric of this event.
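
If you want to watch these metrics, they are exposed through JMX; for instance, the unclean leader election meter is registered as kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec. Below is a minimal sketch that reads its one-minute rate from the local JVM's MBean server; it assumes the code runs inside the Broker JVM that currently hosts the Controller (for a remote Broker you would obtain an MBeanServerConnection via JMXConnectorFactory instead).

import java.lang.management.ManagementFactory
import javax.management.ObjectName

// A minimal sketch: read the unclean leader election rate from the local MBean server.
object UncleanElectionRateProbe extends App {
  val server = ManagementFactory.getPlatformMBeanServer
  val meter = new ObjectName("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec")
  // Yammer meters expose Count, MeanRate, OneMinuteRate, FiveMinuteRate and FifteenMinuteRate attributes
  val oneMinuteRate = server.getAttribute(meter, "OneMinuteRate")
  println(s"Unclean leader elections per second (1-min rate): $oneMinuteRate")
}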

offlinePartitionCount #

This field counts the number of offline or unavailable topic partitions in the cluster. The so-called unavailable state refers to the situation mentioned earlier where “Leader=-1” in the example.

The updatePartitionStateMetrics method in ControllerContext determines whether a partition is in an offline state based on the current state and target state of the given topic partition. If it is, the value of the offlinePartitionCount field is incremented, otherwise it is decremented. The method code is as follows:

// Update metadata for offlinePartitionCount field
private def updatePartitionStateMetrics(
  partition: TopicPartition,
  currentState: PartitionState,
  targetState: PartitionState): Unit = {
  // If the topic is not currently in the process of deletion
  if (!isTopicDeletionInProgress(partition.topic)) {
    // targetState represents the state the partition is going to change to
    // If the current state is not OfflinePartition (offline state) and the target state is OfflinePartition
    // This if statement determines whether to change the partition status to OfflinePartition
    if (currentState != OfflinePartition && targetState == OfflinePartition) {
      offlinePartitionCount = offlinePartitionCount + 1
    // If the current state is already OfflinePartition, but the target state is not
    // This else if statement determines whether to change the partition status to non-offline
    } else if (currentState == OfflinePartition && targetState != OfflinePartition) {
      offlinePartitionCount = offlinePartitionCount - 1
    }
  }
}

The method first checks whether the topic this partition belongs to is currently being deleted. If so, the counter is not updated and the method simply returns. Otherwise, the code checks whether the partition is transitioning into or out of the offline state. If the current state is not OfflinePartition and the target state is OfflinePartition, offlinePartitionCount is incremented, because there is now one more offline partition. Conversely, if the current state is OfflinePartition and the target state is not, offlinePartitionCount is decremented.

shuttingDownBrokerIds #

As the name suggests, this field stores the IDs of all Brokers that are currently shutting down. When managing the cluster's Brokers, the Controller relies on this field to determine whether a Broker is in the process of shutting down, because such a Broker is not suitable for certain operations, such as partition reassignment and topic deletion.

In addition, Kafka must perform many cleanup operations for these shutting down Brokers. The Controller defines an onBrokerFailure method for this purpose. The code is as follows:

private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {
  info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
  // deadBrokers: the list of Broker IDs that have terminated
  // Update the Controller metadata by removing the given Brokers from replicasOnOfflineDirs
  deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  // Remove the given Brokers from the list of shutting-down Brokers
  val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  if (deadBrokersThatWereShuttingDown.nonEmpty)
    info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")
  // Find all replica objects on these Brokers and execute the replica cleanup tasks
  val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
  onReplicasBecomeOffline(allReplicasOnDeadBrokers)
  // Unregister the ZooKeeper listeners registered for these Brokers
  unregisterBrokerModificationsHandler(deadBrokers)
}

This method receives a list of terminated Broker IDs. It first updates the Controller metadata by removing the given Brokers from replicasOnOfflineDirs and shuttingDownBrokerIds, and then performs the necessary replica cleanup for this group of Brokers, which is the job of the onReplicasBecomeOffline method.

This method mainly relies on the partition state machine and the replica state machine to complete the corresponding tasks. We will discuss the replica state machine and the partition state machine in detail in later courses, so just have a general understanding of what it does for now. After we finish learning about these two state machines, you can revisit the specific implementation details of this method.

The main purpose of this method is to mark the given replicas as offline. It consists of the following steps (a simplified sketch follows the list):

  1. Use the partition state machine to mark the partitions where the replicas are located as offline.
  2. Change the status of all new partitions and offline partitions on the cluster to online.
  3. Change the status of the corresponding replica objects to offline.
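
To give you a rough feel for these steps, here is a heavily simplified sketch. The state-machine calls mirror the ones the Controller uses, but partitionsWithLeaderOnBrokers is a hypothetical helper introduced only for illustration, and the real onReplicasBecomeOffline handles additional details such as topic deletion and metadata propagation.

// A simplified sketch of the three steps above; not the actual implementation.
private def onReplicasBecomeOfflineSketch(newOfflineReplicas: Set[PartitionAndReplica]): Unit = {
  // Step 1: partitions whose current leader sits on a dead Broker are marked Offline
  // (partitionsWithLeaderOnBrokers is a hypothetical helper, not real source code)
  val partitionsWithoutLeader = partitionsWithLeaderOnBrokers(newOfflineReplicas.map(_.replica))
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
  // Step 2: try to elect new leaders for all New/Offline partitions, bringing them back Online
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // Step 3: transition the affected replica objects themselves to OfflineReplica
  replicaStateMachine.handleStateChanges(newOfflineReplicas.toSeq, OfflineReplica)
}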

liveBrokers #

This field stores all currently running Broker objects. Each Broker object is essentially a triple of <Id, EndPoints, rack information>. ControllerContext defines many methods to manage this field, such as addLiveBrokersAndEpochs, removeLiveBrokers, and updateBrokerMetadata. Let's look at the updateBrokerMetadata method; here is its source code:

def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
  liveBrokers -= oldMetadata // remove the old Broker object
  liveBrokers += newMetadata // add the new Broker object
}

Whenever a new broker is added or an existing broker is removed, ZooKeeper updates its stored broker data, which triggers the Controller to modify the metadata, and the updateBrokerMetadata method is called to add or remove the object from the broker list. Super simple, right?!

liveBrokerEpochs #

This field stores the Epoch information of all currently running Brokers. Kafka uses this Epoch data to fence Zombie Brokers, that is, to ignore requests or registrations from an outdated Broker generation (for example, a Broker that crashed and was quickly restarted).

In addition, this field is mostly used to obtain the ID sequence of all currently running brokers, as defined by the following method:

def liveBrokerIds: Set[Int] = liveBrokerEpochs.keySet -- shuttingDownBrokerIds

The keySet method of liveBrokerEpochs returns the list of broker IDs, and then removes the IDs of brokers that are shutting down. The remaining IDs are the list of broker IDs that are running.

epoch & epochZkVersion #

Let’s talk about these two fields together because they both have the word “epoch” in them. Talking about them together will help you better understand the difference between the two. In fact, epoch is the value of the /controller_epoch node in ZooKeeper. You can think of it as the version number of the Controller for the entire Kafka cluster, while epochZkVersion is the dataVersion value of the /controller_epoch node.

Kafka uses epochZkVersion to detect and fence Zombie Controllers: operations initiated during the term of an old Controller can no longer succeed once a new Controller has been elected, because the new Controller's epochZkVersion is higher than the old one's.

Also, you may ask, “What’s the difference between these two Epochs and the liveBrokerEpochs mentioned above?” In fact, these two Epoch values are both Controller-side data, while liveBrokerEpochs is the Epoch value of each broker itself.
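
To make the fencing idea above concrete, here is a conceptual illustration built on ZooKeeper's conditional writes; it is not how the Kafka source actually performs the check, but it shows the principle: a setData that carries an expected version fails with BadVersionException once a newer Controller has bumped the node's dataVersion.

import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher, ZooKeeper}

// A conceptual illustration of epochZkVersion-based fencing, not Kafka's actual code path.
object EpochFencingDemo extends App {
  // Placeholder connection string for illustration only
  val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = ()
  })
  try {
    // Read /controller_epoch along with its Stat; stat.getVersion is the dataVersion (epochZkVersion)
    val stat = new Stat()
    val epochBytes = zk.getData("/controller_epoch", false, stat)
    val newEpoch = (new String(epochBytes, "UTF-8").toInt + 1).toString.getBytes("UTF-8")
    // Conditional write: succeeds only if the node's version still equals stat.getVersion.
    // If a newer Controller has already updated the node, ZooKeeper throws BadVersionException.
    zk.setData("/controller_epoch", newEpoch, stat.getVersion)
  } catch {
    case _: KeeperException.BadVersionException =>
      println("Fenced: a newer Controller has already bumped /controller_epoch")
  } finally {
    zk.close()
  }
}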

allTopics #

This field stores the names of all topics on the cluster. Whenever there are changes in the topics, the Controller updates the value of this field.

For example, the Controller has a method called processTopicChange which, as its name suggests, handles topic changes. Let's take a look at its implementation below; I have marked the main logic with comments:

private def processTopicChange(): Unit = {
  if (!isActive) return // If the Controller is already closed, return directly
  val topics = zkClient.getAllTopicsInCluster(true) // Get the current list of all topics from ZooKeeper
  val newTopics = topics -- controllerContext.allTopics // Find the topics that exist in ZooKeeper but not in the current metadata, considered as newly added topics
  val deletedTopics = controllerContext.allTopics -- topics // Find the topics that exist in the current metadata but not in ZooKeeper, considered as deleted topics
  controllerContext.allTopics = topics // Update the Controller metadata
  // Perform subsequent operations for newly added topics and deleted topics
  registerPartitionModificationsHandlers(newTopics.toSeq)
  val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
  deletedTopics.foreach(controllerContext.removeTopic)
  addedPartitionReplicaAssignment.foreach {
    case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  }
  info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
    s"[$addedPartitionReplicaAssignment]")
  if (addedPartitionReplicaAssignment.nonEmpty)
    onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}

partitionAssignments #

This field stores the replica assignments for all topic partitions. In my opinion, this is the most important metadata in the Controller. In fact, you can derive and define many useful methods from this field to help Kafka retrieve data from various perspectives.

For example, if Kafka wants to get all partitions on a specific broker, it can be defined as follows:

partitionAssignments.flatMap {
  // For every topic, keep the partitions whose replica list contains the given brokerId
  case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {
    case (_, partitionAssignment) => partitionAssignment.replicas.contains(brokerId)
  }.map {
    // Wrap each matching (topic, partition number) pair into a TopicPartition object
    case (partition, _) => new TopicPartition(topic, partition)
  }
}.toSet

Another example, if Kafka wants to get all partition objects for a specific topic, the code can be written as follows:

// Take all partition numbers of the given topic and wrap them into TopicPartition objects
partitionAssignments.getOrElse(topic, mutable.Map.empty).map {
  case (partition, _) => new TopicPartition(topic, partition)
}.toSet

In fact, these two code snippets are the main implementation code of the partitionsOnBroker method and the partitionsForTopic method in ControllerContext.scala.
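
Along the same lines, the replica list of a single partition can also be derived from this field. The snippet below is only a paraphrase of what partitionReplicaAssignment essentially does, not a verbatim copy of the source:

def replicasOf(topicPartition: TopicPartition): Seq[Int] =
  partitionAssignments.get(topicPartition.topic)     // all partition assignments of the topic, if any
    .flatMap(_.get(topicPartition.partition))        // the ReplicaAssignment of this partition, if any
    .map(_.replicas)                                 // its replica (Broker ID) list
    .getOrElse(Seq.empty)                            // empty list if the topic or partition is unknown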

That covers the 9 most important metadata fields. As mentioned before, ControllerContext defines 17 metadata fields in total; you can study the definitions of the remaining 8 on your own, building on these 9. The better you understand the Controller's metadata, the clearer the Controller's role in the cluster will become.

It is worth noting that when studying each metadata field, besides its definition, I recommend that you also look at how the related utility methods are implemented. If you ever want to add new methods for retrieving or updating the metadata, you will need a solid understanding of the code that operates on it.

Summary #

Today, we kicked off the learning journey of the important component of Kafka, the Controller. I provided a roadmap for learning the Controller module and introduced its important metadata.

  • Controller metadata: The Controller currently defines 17 types of metadata, covering all aspects of Kafka cluster data.
  • ControllerContext: Defines the metadata and the class that operates on them.
  • Key metadata fields: The most important metadata includes offlinePartitionCount, liveBrokers, partitionAssignments, etc.
  • ControllerContext utility methods: The ControllerContext class defines many useful methods to manage this metadata information.

In the next lesson, we will learn how the Controller sends requests to Brokers. The interaction and communication between the Controller and the Brokers is a key part of how the Controller asserts its leadership over the cluster, and I will explain in detail how it does this.

Post-class Discussion #

Today I did not provide all the metadata explanations. Please analyze the code and tell me what data is stored in partitionLeadershipInfo.

Feel free to write down your thoughts and answers in the comment section to discuss with me. You are also welcome to share today’s content with your friends.