31 Group Metadata Manager Querying Offsets Without Reading the Offset Topic

Hello, I’m Hu Xi.

In the previous lesson, we learned about two types of messages in the offset topic: consumer group registration messages and consumer group committed offset messages. Today, we will continue to learn about the offset topic, with a focus on understanding how to write to and read from the offset topic.

We always say that the offset topic is a mysterious topic. Besides the fact that we don’t create it ourselves, its mystery also lies in the fact that we have no control over reads from and writes to it. By default, we cannot write messages to this topic, and when we read from it directly, all we see is a stream of garbled characters. So today we will learn how the offset topic is read and written, which is an important step toward demystifying it.

Writing to the Offset Topic #

Let’s first learn how to write to the offset topic. In [Lesson 29], when we learned the storeOffsets method, we also learned the appendForGroup method. Both types of messages defined by Kafka are written by this method. In the source code, the storeGroup method calls it to write consumer group registration messages, and the storeOffsets method calls it to write committed offset messages.

First, we need to understand the storeGroup method, which is used to register consumer groups with the coordinator. Let’s take a look at its code implementation:

def storeGroup(group: GroupMetadata,
               groupAssignment: Map[String, Array[Byte]],
               responseCallback: Errors => Unit): Unit = {
  // Check whether the current Broker is the coordinator for this consumer group
  getMagic(partitionFor(group.groupId)) match {
    // If the current Broker is the coordinator
    case Some(magicValue) =>
      val timestampType = TimestampType.CREATE_TIME
      val timestamp = time.milliseconds()
      // Build the key of the registration message
      val key = GroupMetadataManager.groupMetadataKey(group.groupId)
      // Build the value of the registration message
      val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
      // Build the message collection to be written using the key and value
      val records = {
        val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
          Seq(new SimpleRecord(timestamp, key, value)).asJava))
        val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
        builder.append(timestamp, key, value)
        builder.build()
      }
      // Calculate the target partition to be written
      val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
      val groupMetadataRecords = Map(groupMetadataPartition -> records)
      val generationId = group.generationId
      // Call putCacheCallback method to populate the cache
      ......
      // Write message to the offset topic
      appendForGroup(group, groupMetadataRecords, putCacheCallback)
    // If the current Broker is not the coordinator
    case None =>
      // Return NOT_COORDINATOR exception
      responseCallback(Errors.NOT_COORDINATOR)
      None
  }
}

To help you understand, I will draw a diagram to illustrate the logic of the storeGroup method.

The first step of the storeGroup method is to call the getMagic method to determine whether the current broker is the coordinator for this consumer group. The check is based on whether the underlying log object of the offset topic’s target partition can be obtained. If it can, the current broker is the coordinator, and the program proceeds to the next step. Otherwise, the current broker is not the coordinator, and the method returns the NOT_COORDINATOR error through responseCallback.
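
The shape of this check can be sketched in a few lines. This is only an illustration: the localLogs map and the tryStore method are hypothetical stand-ins that I made up, and the real getMagic looks up the log through the replica manager rather than a map.

```scala
object CoordinatorCheckSketch {
  // Hypothetical stand-in for the logs hosted on this broker:
  // offset topic partition -> message format version (magic value).
  val localLogs: Map[Int, Byte] = Map(3 -> 2.toByte, 7 -> 2.toByte)

  // Same idea as getMagic: Some(...) if the log is available locally, None otherwise.
  def getMagic(partition: Int): Option[Byte] = localLogs.get(partition)

  // Returns an error name instead of throwing, similar to how storeGroup
  // reports the outcome through responseCallback.
  def tryStore(partition: Int): String = getMagic(partition) match {
    case Some(_) => "NONE"            // log found: this broker is the coordinator
    case None    => "NOT_COORDINATOR" // log not found: reject the request
  }
}
```

The point of the design is that "am I the coordinator?" is never stored as a separate flag. It is derived from whether the partition's log can be found locally.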

In the second step, we call the groupMetadataKey and groupMetadataValue methods we learned in the previous lesson to construct the key and value fields of the registration message.

In the third step, we use the key and value to build a collection of messages to be written. The class that represents the message collection here is MemoryRecords.

Currently, there are two classes for modeling Kafka message collections:

  • MemoryRecords: Represents a message collection in memory.
  • FileRecords: Represents a message collection in a disk file.

The source code of these two classes is not the focus of our learning. You only need to know their meanings. However, I recommend that you read their source code after class. They are in the clients project, which can help you further understand how Kafka saves messages in memory and on disks.

In the fourth step, we call the partitionFor method to calculate the target partition to be written to in the offset topic.
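
As a rough sketch of how such a partition can be computed, assume that the target partition is the non-negative hash of the group ID modulo the number of partitions of the offset topic. The default of 50 partitions and the helper abs are assumptions of this sketch, not something shown in the code above.

```scala
object PartitionForSketch {
  // Assumed partition count of the offset topic (offsets.topic.num.partitions).
  val offsetsTopicPartitionCount = 50

  // Non-negative value of an Int hash. Int.MinValue has no positive
  // counterpart, so it is mapped to 0.
  def abs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Every broker computes the same partition for the same group ID.
  def partitionFor(groupId: String, numPartitions: Int = offsetsTopicPartitionCount): Int =
    abs(groupId.hashCode) % numPartitions
}
```

Because the result depends only on the group ID and the partition count, all messages of one consumer group land in the same partition of the offset topic, and the leader of that partition is the group's coordinator.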

In the fifth step, we call the appendForGroup method to insert the messages to be written into the target partition of the offset topic. At this point, the method returns.

One thing to mention is that, in the above code, I omit the source code of the putCacheCallback method. We have learned it in detail in Lesson 29. Its purpose is to populate the cache after the message is written to the offset topic.
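
The "write first, fill the cache in the callback" pattern can be sketched as follows. Everything here is hypothetical: the cache only maps a group ID to a generation ID, and append merely pretends to write and reports success or failure.

```scala
import scala.collection.mutable

object PutCacheSketch {
  // Hypothetical in-memory cache: group id -> generation id.
  val cache = mutable.Map[String, Int]()

  // Hypothetical append: pretends to write to the offset topic and then
  // reports the outcome to the callback.
  def append(succeed: Boolean)(callback: Boolean => Unit): Unit = callback(succeed)

  def storeGroup(groupId: String, generationId: Int, appendSucceeds: Boolean): Unit =
    append(appendSucceeds) { ok =>
      // The cache is only touched once the outcome of the write is known.
      if (ok) cache.update(groupId, generationId)
    }
}
```

With this ordering, the cache never gets ahead of what has actually been written to the offset topic.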

As you can see, there is no difference between writing to the offset topic and writing to other regular topics. The coordinator constructs messages in the specified format and passes them to the storeOffsets and storeGroup methods for writing. Therefore, we can consider the coordinator as the message producer for the offset topic.

Reading from the Offset Topic #

In addition to playing the role of a producer, the Coordinator also acts as a consumer, responsible for reading the offset topic. Compared to writing, the logic for reading is more complex, not only in terms of code length but also in how the messages are processed after they are read.

First, we need to know when it is necessary to read the offset topic.

You might think that the offset topic is read when a consumer group queries for offsets. However, that is not the case. When querying for offsets, the Coordinator only looks up the corresponding offset value from the GroupMetadata cache, without actually reading the offset topic. The actual reading of the offset topic happens when the current Broker is elected as the Coordinator, or in other words, when the Broker becomes the leader replica for a partition of the offset topic.
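
To make this concrete, here is a minimal sketch of an offset query served purely from memory. The TopicPartition case class, the nested map, and the use of -1 for "no committed offset" are assumptions of this sketch. The real cache stores GroupMetadata objects.

```scala
import scala.collection.mutable

object OffsetFetchSketch {
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical cache: group id -> (partition -> committed offset).
  val groupOffsets = mutable.Map[String, mutable.Map[TopicPartition, Long]]()

  // Called when a commit has been written, or while loading the offset topic.
  def commit(group: String, tp: TopicPartition, offset: Long): Unit =
    groupOffsets.getOrElseUpdate(group, mutable.Map.empty[TopicPartition, Long]).put(tp, offset)

  // A pure in-memory lookup: no log is read here.
  // -1 stands for "no committed offset" (an assumption of this sketch).
  def fetch(group: String, tp: TopicPartition): Long =
    groupOffsets.get(group).flatMap(_.get(tp)).getOrElse(-1L)
}
```

The query path is a couple of map lookups, which is why the cache has to be fully populated before the broker can serve as coordinator.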

Once the current Broker is elected as the leader replica for a partition of the offset topic, it needs to populate its in-memory metadata cache, which requires reading the offset topic. This is done by the scheduleLoadGroupAndOffsets method. This method creates an asynchronous task to read the offset topic messages and populate the cache. The logic executed by this asynchronous task is the loadGroupsAndOffsets method.

If you open the source code of the loadGroupsAndOffsets method, you can see that it essentially calls the doLoadGroupsAndOffsets method to read the offset topic. Now, let’s focus on understanding this method.

The code for this method is quite long. To help you better understand it, let’s first take a look at its method signature and an internal method called logEndOffset.

private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
  // Get the log end offset of the specified partition of the offset topic
  // If the current Broker is not the leader replica for this partition, return -1
  def logEndOffset: Long = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
  ......
}

As the name suggests, the doLoadGroupsAndOffsets method does two things: loading the consumer groups and loading the offsets of the consumer groups. Again, when we say “loading”, it means reading the messages from the offset topic and populating the information into the cache.

This method takes two parameters: the first parameter, topicPartition, is the target partition of the offset topic; the second parameter, onGroupLoaded, is the logic to be executed after the loading is complete. This logic is specified in the upper-level component and we don’t need to understand its implementation, as it doesn’t affect our learning of reading the offset topic.

The doLoadGroupsAndOffsets method also defines an internal method, logEndOffset. Its purpose is simple: to get the log end offset of the specified partition of the offset topic, and return -1 if the current Broker is not the leader replica for this partition.

This is a particularly important fact because Kafka relies on it to determine if there is a change in the leader replica of a partition. Once a change occurs, the return value of the logEndOffset method executed by the current Broker will be -1, indicating that the Broker is no longer the leader replica.
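
Here is a small sketch of why re-evaluating logEndOffset matters. I assume a read loop whose condition compares the current read position with logEndOffset. The load method and its batchSize parameter are hypothetical and only model the advancing read position.

```scala
object LoadLoopSketch {
  // `logEndOffset` is evaluated again on every iteration, the way a
  // parameterless `def` would be in the real method.
  def load(logEndOffset: () => Long, batchSize: Long): Long = {
    var currOffset = 0L
    // Once leadership is lost, the supplier yields -1 and the condition fails.
    while (currOffset < logEndOffset()) {
      currOffset += batchSize // stands in for "read one batch and advance"
    }
    currOffset
  }
}
```

If the broker stays leader, the loop runs until the read position reaches the log end offset. If leadership moves away in the middle of loading, the next evaluation returns -1 and the loop stops by itself, without any explicit "leader changed" notification.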

The doLoadGroupsAndOffsets method reads the log object of the target partition of the offset topic and performs the core logic, as shown in the following code:

......
replicaManager.getLog(topicPartition) match {
  // If the log object cannot be obtained
  case None =>
    warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
  case Some(log) =>
     // Core logic...

I have divided the core logic into three parts for further explanation.

  • Part 1: Initializing four lists + reading the offset topic;
  • Part 2: Processing the read data and populating the four lists;
  • Part 3: Handling these four lists separately.

Before I explain what this method does, let me first show you a flowchart that gives a macro-level view of the process.

Flowchart

Part 1 #

First, let’s learn about the code in the first part, which performs the reading operation on the offset topic.

// List of partitions for which offset loading is complete
val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
// List of partitions for which offset loading is in progress, used only for Kafka transactions
val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
// List of consumer groups for which group metadata loading is complete
val loadedGroups = mutable.Map[String, GroupMetadata]()
// List of consumer groups to be removed
val removedGroups = mutable.Set[String]()
      ......
      // Create the message collection
      val memRecords = fetchDataInfo.records match {
        case records: MemoryRecords => records
        case fileRecords: FileRecords =>
          val sizeInBytes = fileRecords.sizeInBytes
          val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)
          if (buffer.capacity < bytesNeeded) {
            if (config.loadBufferSize < bytesNeeded)
              warn(s"Loaded offsets and group metadata from $topicPartition with buffer larger ($bytesNeeded bytes) than " +
                s"configured offsets.load.buffer.size (${config.loadBufferSize} bytes)")
            buffer = ByteBuffer.allocate(bytesNeeded)
          } else {
            buffer.clear()
          }
          fileRecords.readInto(buffer, 0)
          MemoryRecords.readableRecords(buffer)
      }
      ......

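The first step of this code is to check the type of the message collection based on fetchDataInfo.records. If it is a MemoryRecords, it is assigned to memRecords directly; if it is a FileRecords, it has to be converted to MemoryRecords.

In the FileRecords case, the code first obtains the number of bytes to read, sizeInBytes, and the number of bytes required, bytesNeeded.

It then checks whether the capacity of buffer is smaller than bytesNeeded. If so, it allocates a new ByteBuffer of the required size; if not, it simply clears the existing buffer.

Next, it calls the readInto method of fileRecords to read the message data into buffer.

Finally, it calls the readableRecords method of MemoryRecords to wrap the readable messages in buffer and assigns the result to memRecords.

After this processing, whether the message collection is a MemoryRecords or a FileRecords, it ends up as the MemoryRecords object memRecords.

At this point, the logic of Part 1 is complete. Its product is the log object of the offset topic’s target partition, converted into a message collection of type MemoryRecords.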
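
The buffer handling in the FileRecords branch can be looked at in isolation. The ensureCapacity helper below is a hypothetical extraction that I wrote for illustration. It follows the same sizing rule as the code above, but it is not a method in the Kafka source.

```scala
import java.nio.ByteBuffer

object LoadBufferSketch {
  // Returns a buffer able to hold `sizeInBytes`, reusing `current` when it is large enough.
  def ensureCapacity(current: ByteBuffer, loadBufferSize: Int, sizeInBytes: Int): ByteBuffer = {
    // Never go below the configured load buffer size, but grow past it if needed.
    val bytesNeeded = math.max(loadBufferSize, sizeInBytes)
    if (current.capacity < bytesNeeded) {
      ByteBuffer.allocate(bytesNeeded)
    } else {
      current.clear() // reset position and limit so the buffer can be refilled
      current
    }
  }
}
```

The buffer only ever grows, and it is reused across iterations whenever it is large enough, which avoids allocating a new buffer for every read.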

Part 2 #

Now let’s look at the second part, which processes the messages that were just read.

// Traverse each RecordBatch in the message collection
memRecords.batches.forEach { batch =>
  val isTxnOffsetCommit = batch.isTransactional
  // If it is a control batch
  // Control batches belong to the Kafka transaction category, which is not discussed here
  if (batch.isControlBatch) {
    ......
  } else {
    // Save the offset value of the first message in the batch
    var batchBaseOffset: Option[Long] = None
    // Traverse all the messages in the batch
    for (record <- batch.asScala) {
      // Make sure the message has a key, otherwise throw an exception
      require(record.hasKey, "Group metadata/offset entry key should not be null")
      // Save the offset value of the first message in the batch
      if (batchBaseOffset.isEmpty)
        batchBaseOffset = Some(record.offset)
      // Read the message key
      GroupMetadataManager.readMessageKey(record.key) match {
        // If it is an OffsetKey, it means a commit offset message
        case offsetKey: OffsetKey =>
          ......
          val groupTopicPartition = offsetKey.key
          // If this message has no value
          if (!record.hasValue) {
            if (isTxnOffsetCommit)
              pendingOffsets(batch.producerId)
              .remove(groupTopicPartition)
            else
              // Remove the target partition from the list of partitions loaded with completed offset values
              loadedOffsets.remove(groupTopicPartition)
          } else {
            val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
            if (isTxnOffsetCommit)
              pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
            else
              // Add the target partition to the list of partitions loaded with completed offset values
              loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
          }
        // If it is a GroupMetadataKey, it means a register message
        case groupMetadataKey: GroupMetadataKey =>
          val groupId = groupMetadataKey.key
          val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
          // If the message value is not empty
          if (groupMetadata != null) {
            // Remove the consumer group from the list of consumer groups to be removed
            removedGroups.remove(groupId)
            // Add the consumer group to the list of loaded consumer groups
            loadedGroups.put(groupId, groupMetadata)
          // If the message value is empty, it means a Tombstone message
          } else {
            // Remove the consumer group from the list of loaded groups
            loadedGroups.remove(groupId)
            // Add the consumer group to the list of consumer groups to be removed
            removedGroups.add(groupId)
          }
        // If it is a key of unknown type, throw an exception
        case unknownKey =>
          throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
      }
    }
  }
  // Update the read position to the offset value of the last message in the batch + 1, waiting for the next while loop
  currOffset = batch.nextOffset
}

The main purpose of this part is to process the message collection obtained in the previous step and add the corresponding data to the four collections initialized earlier. To do so, the code traverses each message batch (RecordBatch) in the message collection. It first determines whether the batch is a control batch. If it is, it executes logic specific to Kafka transactions, which we are not discussing here, so I won’t go into details. If it is not a control batch, it proceeds to the next step.

Next, iterate through all the messages in this batch and perform the following steps:

Step 1: Record the offset value of the first message in the batch.

Step 2: Read the message key and determine its type based on the following criteria:

  • If it is a commit offset message, check if there is a value. If there is no value, remove the target partition from the list of partitions that have completed offset loading. If there is a value, add the target partition to the list of partitions that have completed offset loading.
  • If it is a registration message, check if there is a value. If there is a value, remove the consumer group from the list of consumer groups to be removed and add it to the list of consumer groups that have completed loading. If there is no value, it means that this is a Tombstone message. In this case, remove the consumer group from the list of consumer groups that have completed loading and add it to the list of consumer groups to be removed.
  • If the key type is unknown, throw an exception directly.

Finally, update the read position and wait for the next while loop. The position is the offset value of the last message in the batch plus 1.
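
The replay logic described above can be condensed into a small sketch. The record types are hypothetical simplifications: a value of None models a message without a value (a tombstone), group metadata is reduced to a plain string, and transactional commits are left out.

```scala
import scala.collection.mutable

object ReplaySketch {
  // Hypothetical, simplified records: a value of None models a tombstone.
  sealed trait Record
  final case class OffsetRecord(group: String, partition: Int, offset: Option[Long]) extends Record
  final case class GroupRecord(group: String, metadata: Option[String]) extends Record

  final case class Loaded(
    offsets: Map[(String, Int), Long],
    groups: Map[String, String],
    removed: Set[String])

  def replay(records: Seq[Record]): Loaded = {
    val loadedOffsets = mutable.Map[(String, Int), Long]()
    val loadedGroups = mutable.Map[String, String]()
    val removedGroups = mutable.Set[String]()
    records.foreach {
      // Commit message with a value: the latest offset wins.
      case OffsetRecord(g, p, Some(offset)) => loadedOffsets.put((g, p), offset)
      // Commit message without a value: forget the offset.
      case OffsetRecord(g, p, None) => loadedOffsets.remove((g, p))
      // Registration message with a value: the group is (again) loaded.
      case GroupRecord(g, Some(meta)) =>
        removedGroups.remove(g)
        loadedGroups.put(g, meta)
      // Tombstone: the group moves from "loaded" to "removed".
      case GroupRecord(g, None) =>
        loadedGroups.remove(g)
        removedGroups.add(g)
    }
    Loaded(loadedOffsets.toMap, loadedGroups.toMap, removedGroups.toSet)
  }
}
```

Since the messages are replayed in log order, later messages simply overwrite or cancel earlier ones, and the collections end up reflecting the latest state of each group and partition.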

At this point, this part of the code is completed. Its main output is the four filled lists. Now, in the third part, we will start processing these four lists.

Part 3 #

The complete code for the last part is as follows:

// Process loadedOffsets
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
  .groupBy(_._1.group)
  .map { case (k, v) =>
    // Extract <group name, topic name, partition number> and offset value pair
    k -> v.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }
  }.partition { case (group, _) => loadedGroups.contains(group) }
......
// Process loadedGroups
loadedGroups.values.foreach { group =>
  // Extract the committed offsets for the consumer group
  val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
  val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
  debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
  // Perform the load group operation for the loaded group
  loadGroup(group, offsets, pendingOffsets)
  // Perform post-load group operation for the loaded group
  onGroupLoaded(group)
}
(emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
  val group = new GroupMetadata(groupId, Empty, time)
  val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
  val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
  debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
  // Perform the load group operation for the empty consumer group
  loadGroup(group, offsets, pendingOffsets)
  // Perform post-load group operation for the empty consumer group
  onGroupLoaded(group)
}
// Process removedGroups
removedGroups.foreach { groupId =>
  if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
    throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
      s"loading partition $topicPartition")
}

First, the code groups the loadedOffsets, separating the offset values of consumer groups that have completed loading into the groupOffsets field, and separating those with offset values but no corresponding group information into the emptyGroupOffsets field.
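
The groupBy, map, and partition chain is easier to follow on a small example. In this sketch, GroupTopicPartition is a simplified stand-in, offsets are plain Long values, and the split method is a hypothetical wrapper I added. Only the shape of the transformation matches the code above.

```scala
object GroupOffsetsSketch {
  final case class GroupTopicPartition(group: String, topic: String, partition: Int)

  type OffsetsByGroup = Map[String, Map[(String, Int), Long]]

  // Splits offsets into those whose group metadata was loaded (first element)
  // and those that have offsets but no group metadata (second element).
  def split(
      loadedOffsets: Map[GroupTopicPartition, Long],
      loadedGroups: Set[String]): (OffsetsByGroup, OffsetsByGroup) =
    loadedOffsets
      .groupBy { case (gtp, _) => gtp.group }
      .map { case (group, offsets) =>
        // Drop the group name from the key: it is already the outer key.
        group -> offsets.map { case (gtp, offset) => ((gtp.topic, gtp.partition), offset) }
      }
      .partition { case (group, _) => loadedGroups.contains(group) }
}
```

Groups in the second element still need a GroupMetadata object before their offsets can be cached, which is why an empty one is created for them later.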

Next, the code performs the load group operation and the post-load group operation (onGroupLoaded) for all consumer groups in loadedGroups. Remember, loadedGroups contains consumer groups that have completed loading. The onGroupLoaded function is passed in by the higher-level component Coordinator. Its main purpose is to handle heartbeat timeouts for all members of the consumer group and specify the next heartbeat timeout.

Then, the code creates empty consumer group metadata for all consumer groups in emptyGroupOffsets, performs the same load group operation and post-load group operation as in the previous step.

Finally, the code checks every consumer group in removedGroups to ensure that it is no longer present in the consumer group metadata cache, unless it also appears in emptyGroupOffsets. Otherwise, an IllegalStateException is thrown.

After calling the doLoadGroupsAndOffsets method, the Coordinator successfully reads the data from the offset topic’s target partition and fills it into the consumer group metadata cache.

Summary #

Today, we focused on the code for reading and writing the offset topic in the GroupMetadataManager class. The Coordinator uses these methods to operate on the offset topic and manage consumer groups. The write path is relatively simple and similar to writing regular messages, while the read path is more complex. More importantly, contrary to our intuition, Kafka does not read the offset topic when querying the committed offsets of a consumer group. Instead, it queries the in-memory consumer group metadata cache directly. This is something you must pay special attention to.

Let’s briefly review the key points of this lesson.

  • Reading and writing methods: the appendForGroup method is responsible for writing to the offset topic, and the doLoadGroupsAndOffsets method is responsible for reading from the offset topic and loading group information and offset values.
  • Querying consumer group offsets: when querying offsets, the offset topic is not read. Instead, the consumer group metadata cache is read.

With this, we have completed the important source code of the GroupMetadataManager class. As a large file with nearly 1000 lines of code and so many functionalities packed into it, it is definitely worth reading multiple times.

In addition to the functionalities we have covered, the GroupMetadataManager class is actually a crucial link between GroupMetadata and the Coordinator. The Coordinator uses the GroupMetadataManager class to manipulate GroupMetadata.

When I first started learning this part of the source code, I was confused about the difference between GroupMetadata and GroupMetadataManager. Now, after going through these three lessons, I believe you already know that GroupMetadata models the metadata itself, while the GroupMetadataManager class models the methods for managing that metadata and is the only component responsible for managing the internal offset topic. In the future, whenever you encounter an issue related to the offset topic, you can go straight to this class for answers.

After-class Discussion #

In fact, besides reading from and writing to the offset topic, the GroupMetadataManager also provides a method for cleaning up data in the offset topic. The cleanGroupMetadata method in the code is responsible for this task. Please analyze how the cleanGroupMetadata method works based on the source code.

Feel free to write down your thoughts and answers in the comments section to discuss with me. You can also share today’s content with your friends.