26 MetadataCache: How Is the Broker's Metadata Cache Updated Asynchronously?

Hello, I’m Hu Xi. Today, we are going to learn about the metadata cache (MetadataCache) on the Broker.

You may be wondering: didn’t we already cover the metadata cache on the Controller side? What is this one? The MetadataCache discussed here lives on each Broker. Its data is sent to the Broker by the Controller through the UpdateMetadataRequest. In other words, the Controller implements an asynchronous update mechanism that broadcasts the latest cluster information to all Brokers.

So why does each Broker need to hold the same data? There are two reasons.

The first and most important reason is that by storing this data, the Broker can promptly respond to metadata requests sent by clients, i.e., process Metadata requests. Metadata requests are one of the few types of requests that can be processed by any Broker in the cluster, which means that client programs can freely send Metadata requests to any Broker to obtain metadata information about the cluster. This is made possible thanks to the existence of the MetadataCache.

The second reason is that some important components of Kafka use this data. For example, the replica manager uses it to obtain information about Broker nodes, and the transaction manager uses it to obtain information about the leader replicas of partitions, and so on.

In summary, the MetadataCache is data that is kept on each Broker. Kafka ensures eventual consistency of the metadata cache on all Brokers through an asynchronous update mechanism.

In practice, you may run into this scenario: a new topic has just been created in the cluster, but a consumer reports an error saying “topic information not found.” The error usually lasts only a short time. The reason is simple: metadata is propagated asynchronously. At any given moment, some Brokers may not yet have applied the latest update, so the metadata they hold is stale and does not include the newest topics.
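Because the gap closes on its own once every Broker has applied the update, a caller can usually just retry. The following is a minimal sketch of that idea, not something from the Kafka code base: `RetrySketch`, `retry`, and the `lookup` function are hypothetical names, and real Kafka clients refresh their metadata internally rather than through a helper like this.

```scala
object RetrySketch {
  // Retries `lookup` until it returns Some(...) or the attempts run out.
  // `lookup` is a hypothetical stand-in for "ask a broker whether it knows the topic".
  @annotation.tailrec
  def retry[A](attempts: Int, backoffMs: Long)(lookup: () => Option[A]): Option[A] =
    lookup() match {
      case found @ Some(_) => found
      case None if attempts > 1 =>
        // Give the asynchronous metadata propagation some time before asking again.
        Thread.sleep(backoffMs)
        retry(attempts - 1, backoffMs)(lookup)
      case None => None
    }
}
```

The back-off value is arbitrary here; the point is only that the "not found" answer is temporary and a later attempt sees the updated cache.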

Once you have finished studying the MetadataCache class today, especially the updating of metadata, you will completely understand this issue. Now, let’s learn about the code of the MetadataCache class.

MetadataCache Class #

The MetadataCache class is located in the Scala file of the same name under the server package. It is a small file of fewer than 400 lines with a very simple structure: the file defines only one class, MetadataCache.

The instantiation of MetadataCache is done when the Kafka Broker starts, and the specific invocation occurs in the startup method of the KafkaServer class.

// KafkaServer.scala
def startup(): Unit = {
  try {
    ......
    metadataCache = new MetadataCache(config.brokerId)
    ......
  }
  catch {
    case e: Throwable =>
      ......
  }
}

Once the instance is successfully created, it will be used by four components of Kafka. Let me explain the names of these four components and their main purposes for using this instance.

  • KafkaApis: This is the entry point for request handling in the source code, where the logic for every kind of Kafka request is executed. It makes extensive use of the topic partition and broker data in MetadataCache to perform topic-related checks and comparisons and to obtain broker information.
  • AdminManager: This is a specially-defined manager for topic management in Kafka, which defines many methods related to topics. Similar to the KafkaApis class, it will use the topic information and broker data from MetadataCache to obtain the list of topics and brokers.
  • ReplicaManager: This is the replica manager that we studied recently. It reads topic partition and broker data from MetadataCache, and it is also the component that updates the MetadataCache.
  • TransactionCoordinator: This is the coordinator component for managing Kafka transactions. It needs to use the broker data of the leader replicas of the topic partitions in MetadataCache to send transaction markers to specified brokers.

Class Definition and Fields #

Once we understand when the MetadataCache class is created and who its callers are, we have a better understanding of its typical use cases. As a cluster metadata repository, it stores all important data about topics and brokers in the cluster. Now, let’s take a closer look at what this data actually represents.

class MetadataCache(brokerId: Int) extends Logging {
  private val partitionMetadataLock = new ReentrantReadWriteLock()
  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
    controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
  ......
}

The MetadataCache class constructor only takes one parameter: brokerId, which is the ID number of the broker. In addition to this parameter, the class defines four fields.

The partitionMetadataLock field is a lock object that protects the write operations. The logIdent and stateChangeLogger fields are used for logging only. The metadataSnapshot field stores the actual metadata information and is the most important field in the MetadataCache class.

This field has a type of MetadataSnapshot, which is a nested class defined in MetadataCache. Here is the source code of the nested class:

case class MetadataSnapshot(partitionStates: mutable.AnyRefMap
  [String, mutable.LongMap[UpdateMetadataPartitionState]],
  controllerId: Option[Int],
  aliveBrokers: mutable.LongMap[Broker],
  aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]])

From the source code, we can see that it is a case class, which is roughly equivalent to a Java POJO with generated getter methods. Its fields are vals, so they cannot be reassigned, and MetadataCache treats a snapshot as read-only once it has been published. To change anything, the code builds a new MetadataSnapshot instance that holds the updated values.
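The replace-rather-than-mutate idea can be sketched in a few lines. This is a simplified illustration under stated assumptions, not Kafka's code: `Snapshot`, `TinyCache`, and `setController` are hypothetical stand-ins, and the sketch uses `synchronized` for the writer where MetadataCache uses a read-write lock.

```scala
// Hypothetical, simplified stand-ins for MetadataSnapshot and MetadataCache.
final case class Snapshot(topics: Map[String, Set[Int]], controllerId: Option[Int])

final class TinyCache {
  // @volatile makes a newly published snapshot visible to reader threads.
  @volatile private var snapshot: Snapshot = Snapshot(Map.empty, None)

  // Readers take the current reference without locking.
  def current: Snapshot = snapshot

  // A writer never edits the old snapshot; it builds a new one and swaps the reference.
  def setController(id: Int): Unit = synchronized {
    snapshot = snapshot.copy(controllerId = Some(id))
  }
}

object SnapshotSwapDemo {
  // Returns the snapshot seen before the update and the one seen after it.
  def run(): (Snapshot, Snapshot) = {
    val cache = new TinyCache
    val before = cache.current
    cache.setController(1)
    (before, cache.current)
  }
}
```

A reader that obtained `before` keeps seeing the old values even after the update, which is exactly the property the snapshot design relies on.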

Let’s take a look at the meaning of each field.

  • partitionStates: This is a Map. The key is the topic name, and the value is another Map. The key of the inner map is the partition number, and the value is a field of type UpdateMetadataPartitionState. UpdateMetadataPartitionState is a data structure needed by the UpdateMetadataRequest. We will discuss the details of this type shortly.
  • controllerId: The ID of the broker where the controller is located.
  • aliveBrokers: A Map of all live brokers in the current cluster. The key is the broker ID, and the value is the Broker object.
  • aliveNodes: This is also a Map of Maps. The key is the broker ID, and the value is a Map. The key is ListenerName, which represents the type of broker listener, and the value is the broker node object.

Now, let’s talk about the UpdateMetadataPartitionState type. Its source code is generated at build time. The fields required by the UpdateMetadataRequest are described in a JSON file, and Kafka’s generator module produces the corresponding Java classes from that description. The generated class is a POJO with the following definition:

static public class UpdateMetadataPartitionState implements Message {
    private String topicName;     // topic name
    private int partitionIndex;   // partition number
    private int controllerEpoch;  // Controller epoch value
    private int leader;           // Broker ID where the leader replica is located
    private int leaderEpoch;      // Leader epoch value
    private List<Integer> isr;    // ISR list
    private int zkVersion;        // ZNode version number in ZooKeeper node statistics
    private List<Integer> replicas;  // Replica list
    private List<Integer> offlineReplicas;  // Offline replica list
    private List<RawTaggedField> _unknownTaggedFields; // Unknown field list
    ......
}

As we can see, the UpdateMetadataPartitionState class contains extensive information about a topic partition, including the topic name, partition number, leader replica, ISR list, controller epoch, ZooKeeper version number, and more. From a high-level perspective, Kafka cluster metadata consists of two parts: topic data and broker data. It is therefore fair to say that the partitionStates field holds up “half of the sky” of the metadata cache.

Important Methods #

Next, let’s learn about the important methods of the MetadataCache class. What you need to remember is that the most important methods of this class are the ones that operate on the metadataSnapshot field. After all, the so-called metadata cache refers to what is carried by the MetadataSnapshot class.

I roughly classify the methods of the MetadataCache class into three categories:

  1. Methods for checking.
  2. Methods for fetching.
  3. Methods for updating.

These three categories of methods have a progressive relationship. We’ll start with the simpler methods for checking.

Methods for Checking #

The so-called methods for checking are the ones that determine whether a given topic or topic partition is included in the metadata cache. The MetadataCache class provides two methods for checking, both named contains, but with different input parameters.

// Determines whether the given topic is included in the metadata cache
def contains(topic: String): Boolean = {
  metadataSnapshot.partitionStates.contains(topic)
}

// Determines whether the given topic partition is included in the metadata cache
def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined

// Retrieves detailed data information for a given topic partition. Returns None if the corresponding record is not found
def getPartitionInfo(topic: String,
  partitionId: Int): Option[UpdateMetadataPartitionState] = {
  metadataSnapshot.partitionStates.get(topic)
    .flatMap(_.get(partitionId))
}

The first contains method determines whether the given topic is in the metadata cache. It is simple: it checks whether the keys of metadataSnapshot.partitionStates include the specified topic.

The second contains method is slightly more involved. It first looks up the data for the specified topic partition in metadataSnapshot through getPartitionInfo, and then reports whether that data exists.
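To try the two lookups outside of Kafka, here is a small self-contained sketch. The types are assumptions made for illustration: `PartitionState` is a hypothetical stand-in for UpdateMetadataPartitionState, and an immutable Map of Maps replaces the AnyRefMap and LongMap used by the real class.

```scala
object ContainsSketch {
  // Hypothetical stand-in for UpdateMetadataPartitionState.
  final case class PartitionState(leader: Int)

  // topic -> (partition number -> state), mirroring the shape of partitionStates.
  val partitionStates: Map[String, Map[Int, PartitionState]] =
    Map("orders" -> Map(0 -> PartitionState(leader = 1), 1 -> PartitionState(leader = 2)))

  // A missing topic or a missing partition both collapse to None.
  def getPartitionInfo(topic: String, partition: Int): Option[PartitionState] =
    partitionStates.get(topic).flatMap(_.get(partition))

  // Topic-level check: only the outer map's keys are consulted.
  def contains(topic: String): Boolean = partitionStates.contains(topic)

  // Partition-level check: defined only if both lookups succeed.
  def contains(topic: String, partition: Int): Boolean =
    getPartitionInfo(topic, partition).isDefined
}
```

Chaining `get` and `flatMap` avoids any explicit null or key-existence checks, which is why the real method fits on two lines.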

The implementation of the methods for checking is relatively simple, with not much code, and is easy to understand, so I won’t say much. Next, let’s look at the methods for fetching.

Methods for Fetching #

The MetadataCache class has many getXXX methods. Among them, the most representative ones are getAllTopics, getAllPartitions, and getPartitionReplicaEndpoints, which are methods for fetching topic, partition, and replica objects, respectively. In my opinion, these are the most basic methods for fetching metadata and are well worth learning.

First, let’s look at the beginner-level get method, which is the getAllTopics method. This method returns all the topics in the current cluster’s metadata cache. The code is as follows:

private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
  snapshot.partitionStates.keySet
}

It simply returns the key set of the partitionStates field of the given MetadataSnapshot. As mentioned earlier, partitionStates is a map whose keys are topic names. How simple is that?

If we want to fetch the partition objects in the metadata cache, how would we write it? Let’s take a look at the implementation of the getAllPartitions method:

def getAllPartitions(): Set[TopicPartition] = {
  metadataSnapshot.partitionStates.flatMap { case (topicName, partitionsAndStates) =>
    partitionsAndStates.keys.map(partitionId => new TopicPartition(topicName, partitionId.toInt))
  }.toSet
}

Similar to the getAllTopics method, its main idea is to iterate through partitionStates, extract the partition number, create a TopicPartition instance, and add it to the collection to be returned.
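The same flattening can be reproduced with plain collections. In this sketch, `TP` is a hypothetical stand-in for Kafka's TopicPartition, and the partition state is reduced to a String because only the keys matter here.

```scala
object AllPartitionsSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TP(topic: String, partition: Int)

  // states: topic -> (partition number as Long -> some state).
  def allPartitions(states: Map[String, Map[Long, String]]): Set[TP] =
    states.flatMap { case (topic, partitions) =>
      // One TP per inner key; the Long key is narrowed to Int as in the original code.
      partitions.keys.map(id => TP(topic, id.toInt))
    }.toSet
}
```

The Long-to-Int conversion exists because the inner map in MetadataCache is a LongMap, whose keys are always Long.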

Finally, let’s look at a slightly more complex get method: getPartitionReplicaEndpoints.

def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
  // Use a local variable to get the current metadata cache
  val snapshot = metadataSnapshot
  // Get the data for the given topic partition
  snapshot.partitionStates.get(tp.topic).flatMap(_.get(tp.partition))
    .map { partitionInfo =>
    // Get the replica ID list
    val replicaIds = partitionInfo.replicas
    replicaIds.asScala
      .map(replicaId => replicaId.intValue() -> {
        // Get the Broker ID where the replica is located
        snapshot.aliveBrokers.get(replicaId.longValue()) match {
          case Some(broker) =>
            // Get the corresponding Broker node object based on the Broker ID
            broker.getNode(listenerName).getOrElse(Node.noNode())
          case None => // If the node cannot be found
            Node.noNode()
        }}).toMap
      .filter(pair => pair match {
        case (_, node) => !node.isEmpty
      })
  }.getOrElse(Map.empty[Int, Node])
}

The getPartitionReplicaEndpoints method accepts a topic partition and a listener name, and returns the Broker node objects for all replicas of that partition under the specified listener, keyed by Broker ID.

First, the code copies the current metadata cache into a local variable. The advantage is that no lock is needed: every subsequent lookup reads from the same snapshot, even if another thread swaps in a newer one in the meantime. However, as I mentioned at the beginning, the data read this way may be stale. Fortunately, Kafka can cope with stale metadata. When a client sends a request to the wrong Broker because it acted on stale metadata, the Broker explicitly tells the client what went wrong. Upon receiving the error, the client fetches the latest metadata again. This process ensures that the client eventually obtains up-to-date metadata. Overall, stale metadata does have adverse effects, but they are not serious in practice.

After obtaining the topic partition data, the code retrieves the replica ID list, then iterates through the list to get the Broker ID where each replica is located, and based on this Broker ID, retrieves the corresponding Broker node object. Finally, it packages these node objects into the result to be returned.
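The pattern of "read the volatile field once, then work only on the local copy" can be sketched as follows. Everything here is simplified for illustration: `Node` and `Snapshot` are hypothetical types with far fewer fields than Kafka's, and listener names are plain strings.

```scala
object ReplicaEndpointsSketch {
  // Hypothetical, simplified types; Kafka's real Node and Broker classes carry more fields.
  final case class Node(id: Int, host: String, port: Int)
  final case class Snapshot(
    replicas: Map[(String, Int), List[Int]],   // (topic, partition) -> replica broker IDs
    aliveNodes: Map[Int, Map[String, Node]])   // broker ID -> (listener name -> node)

  @volatile var metadataSnapshot: Snapshot = Snapshot(Map.empty, Map.empty)

  def replicaEndpoints(topic: String, partition: Int, listener: String): Map[Int, Node] = {
    // Read the volatile field once; every lookup below sees the same snapshot,
    // even if another thread publishes a newer one in the meantime.
    val snapshot = metadataSnapshot
    snapshot.replicas.getOrElse((topic, partition), Nil).flatMap { replicaId =>
      // Replicas on brokers that are not alive, or that lack this listener, are dropped.
      snapshot.aliveNodes.get(replicaId).flatMap(_.get(listener)).map(replicaId -> _)
    }.toMap
  }
}
```

Dropping unknown replicas directly has the same effect as the original's approach of substituting Node.noNode() and filtering out empty nodes afterwards.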

Methods for Updating #

Next, we come to the “highlight” of today: the update methods for the Broker-side metadata cache. I call it the “highlight” for two reasons:

  1. Compared to the previous two categories of methods, the code implementation of these update methods is much more complex, so we need to spend more time learning them.
  2. The metadata cache is only worth reading once it has been populated. In that sense, updating is a prerequisite for all of the getXXX methods.

There is only one implementation method for updating in the source code: the updateMetadata method. The code for this method is quite long, so I will first draw a flowchart to help you understand what it does.

The main logic of the updateMetadata method is to read the partition data from the UpdateMetadataRequest request and then update the local metadata cache. Next, we’ll study its implementation logic in detail.

To help you master it, I will divide this method into several parts. The method begins by acquiring the write lock on partitionMetadataLock, which ensures that its execution is exclusive. Here is the code for the first part:

def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
  inWriteLock(partitionMetadataLock) {
    // Save alive Broker objects. The key is the Broker ID, and the value is the Broker object.
    val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
    // Save alive node objects. The key is the Broker ID, and the value is a listener -> node map.
    val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)
    // Get the Broker ID where the Controller is located from the UpdateMetadataRequest
    // If there is no Controller, assign it as None
    val controllerIdOpt = updateMetadataRequest.controllerId match {
      case id if id < 0 => None
      case id => Some(id)
    }
    // Loop through all the alive Broker objects in the UpdateMetadataRequest
    updateMetadataRequest.liveBrokers.forEach { broker =>
      val nodes = new java.util.HashMap[ListenerName, Node]
      val endPoints = new mutable.ArrayBuffer[EndPoint]
      // Loop through all EndPoint types, which are the listeners configured for the Broker
      broker.endpoints.forEach { ep =>
        val listenerName = new ListenerName(ep.listener)
        endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol))
        // Save the <listener, node object> pair
        nodes.put(listenerName, new Node(broker.id, ep.host, ep.port))
      }
      // Add the Broker to the alive Broker object collection
      aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
      // Add the Broker node to the alive node object collection
      aliveNodes(broker.id) = nodes.asScala
    }
......

The main purpose of this code is to prepare data for the subsequent steps, namely the aliveBrokers and aliveNodes collections.

First, the code creates these two local variables to store the alive Broker objects and the alive node objects, respectively. The key of aliveBrokers is the Broker ID, and the value is the Broker object. The key of aliveNodes is also the Broker ID, and the value is a map of <listener, node> pairs.

Then, the method obtains the ID of the Broker that hosts the Controller from the UpdateMetadataRequest and assigns it to the controllerIdOpt variable. If the request carries a negative ID, meaning the cluster currently has no Controller, the variable is set to None.

Next, the code loops through all the alive Broker objects in the UpdateMetadataRequest. It retrieves all the EndPoint types, which are the listeners configured for the Broker.

Finally, the code traverses the listeners and saves the <listener, Broker node object> pairs. It then adds the Broker to the collection of alive Broker objects and adds the Broker node to the collection of alive node objects. At this point, the logic of the first part of the code is completed.
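The first part boils down to two small transformations, which the following sketch isolates. The request types are assumptions: `EndpointData`, `BrokerData`, and `Node` are hypothetical, simplified versions of the generated request classes and Kafka's Node, and listener names are plain strings.

```scala
object LiveBrokersSketch {
  // Hypothetical, simplified request types; the real ones are generated Java classes.
  final case class EndpointData(listener: String, host: String, port: Int)
  final case class BrokerData(id: Int, endpoints: List[EndpointData])
  final case class Node(id: Int, host: String, port: Int)

  // A negative controller ID in the request means "no controller".
  def controllerIdOpt(rawId: Int): Option[Int] = if (rawId < 0) None else Some(rawId)

  // broker ID -> (listener name -> node), the same shape as aliveNodes.
  def buildAliveNodes(liveBrokers: List[BrokerData]): Map[Int, Map[String, Node]] =
    liveBrokers.map { broker =>
      val nodes = broker.endpoints.map(ep => ep.listener -> Node(broker.id, ep.host, ep.port)).toMap
      broker.id -> nodes
    }.toMap
}
```

Note that both collections are built from scratch on every request instead of being patched, so a Broker missing from the request simply does not appear in the new snapshot.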

Now let’s look at the second part of the code. This part’s main function is to ensure that the cluster Brokers have the same listeners configured and initialize the deleted partitions array for the subsequent part of the code to operate on. The code is as follows:

// Using the alive Broker node objects from the previous part,
// get all <listener, node> pairs for the current Broker
aliveNodes.get(brokerId).foreach { listenerMap =>
    val listeners = listenerMap.keySet
    // If the listeners configured for the current Broker are different from other Brokers, log an error
    if (!aliveNodes.values.forall(_.keySet == listeners))
        error(s"Listeners are not identical across brokers: $aliveNodes")
}
// Construct the deleted partition array and return it as the method result
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
// If the UpdateMetadataRequest does not carry any partition information
if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
    // Construct a new MetadataSnapshot object using the previous partition information and the new Broker list information
    metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerIdOpt, aliveBrokers, aliveNodes)
// Otherwise, go to the last part of the method
} else {
    ......
}

First, this method uses the alive Broker node objects from the previous part to retrieve all <listener, node> pairs for the current Broker.

Then, it gets all the listeners configured for the current Broker. If it finds that the configured listeners are different from other Brokers, it logs an error.

Next, the code creates the deleted-partitions buffer, which will eventually be returned as the method result. It then checks whether the UpdateMetadataRequest carries any partition information. If it does not, the code constructs a new MetadataSnapshot from the previous partition information and the new Broker information. If it does, the code proceeds to the last part of the method.
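The listener comparison described above can be tried in isolation. In this sketch, `listenersIdentical` is a hypothetical helper that returns a Boolean instead of logging, and node objects are reduced to "host:port" strings.

```scala
object ListenerCheckSketch {
  // aliveNodes: broker ID -> (listener name -> "host:port"), simplified to strings.
  def listenersIdentical(brokerId: Int, aliveNodes: Map[Int, Map[String, String]]): Boolean =
    // If this broker is not in the map there is nothing to compare, so the check passes.
    aliveNodes.get(brokerId).forall { listenerMap =>
      val listeners = listenerMap.keySet
      // Every alive broker must expose exactly the same set of listener names.
      aliveNodes.values.forall(_.keySet == listeners)
    }
}
```

Only the listener names are compared, not hosts or ports, which matches the keySet comparison in the original code. The original also only logs an error on a mismatch and continues with the update.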

The last part is in the else branch of the code above. The main task of this part is to extract the data from the UpdateMetadataRequest, and then fill the metadata cache. The code is as follows:

val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
// Backup the partition data in the existing metadata cache
metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
    val copy = new mutable.LongMap[UpdateMetadataPartitionState](oldPartitionStates.size)
    copy ++= oldPartitionStates
    partitionStates(topic) = copy
}
val traceEnabled = stateChangeLogger.isTraceEnabled
val controllerId = updateMetadataRequest.controllerId
val controllerEpoch = updateMetadataRequest.controllerEpoch
// Get all partition data carried in the UpdateMetadataRequest
val newStates = updateMetadataRequest.partitionStates.asScala
// Iterate over the partition data
newStates.foreach { state =>
    val tp = new TopicPartition(state.topicName, state.partitionIndex)
    // If the partition is being deleted
    if (state.leader == LeaderAndIsr.LeaderDuringDelete) {
        // Remove the partition from the metadata cache
        removePartitionInfo(partitionStates, tp.topic, tp.partition)
        if (traceEnabled)
            stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
                s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
        // Add the partition to the result data
        deletedPartitions += tp
    } else {
        // Add the partition to the metadata cache
        addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, state)
        if (traceEnabled)
            stateChangeLogger.trace(s"Cached leader info $state for partition $tp in response to " +
                s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
    }
}
val cachedPartitionsCount = newStates.size - deletedPartitions.size
stateChangeLogger.info(s"Add $cachedPartitionsCount partitions and deleted ${deletedPartitions.size} partitions from metadata cache " +
    s"in response to UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
// Build the latest metadata cache using the updated partition metadata, alive Broker list, and node list from the first part
metadataSnapshot = 
    MetadataSnapshot(partitionStates, controllerIdOpt, aliveBrokers, aliveNodes)
// Return the deleted partition list array
deletedPartitions

First, the method backs up the partition data in the existing metadata cache to the local variable partitionStates.

Then, it gets the trace log state and the Controller ID and epoch from the UpdateMetadataRequest.

Next, it gets all the partition data carried in the UpdateMetadataRequest and iterates over each partition data. If it finds that the partition is being deleted, it removes the partition from the metadata cache and adds it to the result data. Otherwise, it adds the partition to the metadata cache.

Finally, the method builds the latest metadata cache using the updated partition metadata, the alive Broker list, and the node list from the first part. It returns the deleted partition list array. Thus, the updateMetadata method ends here.
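The copy-then-apply logic of the last part can be condensed into a runnable sketch. The names and types are assumptions made for illustration: `State` is a hypothetical stand-in for UpdateMetadataPartitionState, the sentinel value -2 is assumed to match LeaderAndIsr.LeaderDuringDelete, and removing a topic once its last partition is gone is an assumption about what removePartitionInfo does.

```scala
import scala.collection.mutable

object UpdateSketch {
  // Hypothetical, simplified stand-in for UpdateMetadataPartitionState.
  final case class State(topic: String, partition: Int, leader: Int)

  // Assumed sentinel for "partition is being deleted".
  val LeaderDuringDelete: Int = -2

  type States = Map[String, Map[Int, State]]

  // Returns the new partition map plus the list of deleted partitions.
  def applyUpdate(old: States, updates: Seq[State]): (States, Seq[(String, Int)]) = {
    // Copy first, so readers holding the old snapshot never see a half-applied update.
    val working = mutable.Map.empty[String, mutable.Map[Int, State]]
    old.foreach { case (topic, parts) =>
      val partsCopy = mutable.Map.empty[Int, State]
      partsCopy ++= parts
      working(topic) = partsCopy
    }
    val deleted = mutable.ArrayBuffer.empty[(String, Int)]
    updates.foreach { s =>
      if (s.leader == LeaderDuringDelete) {
        // Remove the partition, and the topic too once it has no partitions left.
        working.get(s.topic).foreach { parts =>
          parts.remove(s.partition)
          if (parts.isEmpty) working.remove(s.topic)
        }
        deleted += ((s.topic, s.partition))
      } else {
        // Insert a new partition or overwrite the existing entry.
        val parts = working.getOrElseUpdate(s.topic, mutable.Map.empty[Int, State])
        parts(s.partition) = s
      }
    }
    (working.map { case (t, ps) => t -> ps.toMap }.toMap, deleted.toList)
  }
}
```

Because the old map is never touched, a reader that grabbed the previous snapshot before the swap continues to see a complete, consistent view.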

Summary #

Today, we learned about the MetadataCache class on the Broker side, which is the so-called metadata cache class. This class stores detailed data about topic partitions and Broker data on the current cluster. Each Broker maintains an instance of MetadataCache. The Controller updates this part of the cache data asynchronously by sending UpdateMetadataRequest requests to the Broker.

Let’s review the key points of this lesson.

  • MetadataCache class: Broker metadata cache class, which stores partition detailed data and Broker node data.
  • The four major callers: ReplicaManager, KafkaApis, TransactionCoordinator, and AdminManager.
  • updateMetadata method: Invoked when the Controller sends an UpdateMetadataRequest to the Broker. It rebuilds the snapshot from the request and swaps it in.

Finally, I’d like to discuss a topic with you.

Some people think that Kafka Brokers are stateless. After today’s lesson, you know that they are not: each Broker asynchronously receives the cluster’s metadata from the Controller and stores it. Because Kafka uses a Leader/Follower model here, consistency is easier to guarantee than in more complex multi-Leader or Leaderless architectures, which is why eventual consistency of metadata among Brokers can be ensured. However, as I mentioned earlier, you still need to handle Followers lagging behind and data becoming stale. Note that “Leader” here actually refers to the Controller, and “Follower” refers to an ordinary Broker.

You may have noticed by now that many problems and solutions in distributed architecture design are interconnected. For example, when it comes to keeping copies of data, the metadata cache and Kafka replication adopt the same design: a single-Leader architecture in which the Leader serves external requests and the Followers passively synchronize data from the Leader.

Each time you learn something new, I hope you don’t treat it as an isolated piece of knowledge, but instead reflect on it, summarize it, and connect it to what you already know. Studying source code is important, but it is even more valuable when it leads us to upgrade our architectural thinking!

Post-class Discussion #

Earlier, we mentioned that when the Controller sends an UpdateMetadataRequest to the Broker, it updates the MetadataCache. Can you find the complete call path for updating the metadata cache in the source code?

Feel free to share your thoughts and answers in the comments section and discuss with me. You are also welcome to share today’s content with your friends.