29 GroupMetadataManager: What Is the Group Metadata Manager? #

Hello, I’m Hu Xi. Today, we are studying the source code of the GroupMetadataManager class. Judging by its name, it is a group metadata manager, but judging by the functionality it provides, I prefer to call it a consumer group manager, because the methods it defines provide basic group-level functions such as adding, removing, and querying groups.

However, this class is not as well-known as KafkaController or GroupCoordinator, and you may not have heard of it before. But it is actually a very important consumer group management class.

The GroupMetadataManager class is instantiated when the consumer group coordinator component is created. This means that each broker, during the startup process, creates and maintains an instance of GroupMetadataManager to manage the consumer groups that it is responsible for. More importantly, most of the information related to consumer groups in the log output of the production environment is closely related to it.

Let me give you a simple example. You may have seen this log output:

Removed ××× expired offsets in ××× milliseconds.

This log is printed every 10 minutes. Have you ever wondered why? It is produced by a scheduled task that the GroupMetadataManager class creates. If you are not familiar with how GroupMetadataManager works, your day-to-day usage will not be affected, but once error logs related to consumer groups appear in production, you will not be able to pinpoint the cause from the log output alone. The only reliable remedy is to read the source code and understand the underlying implementation, so that you can handle whatever comes up.

The key to this class is understanding how it manages consumer groups and how it reads and writes the internal offset topic. Both are heavyweight functions whose principles we must fully understand, and they are the focus of these three lessons. Today, let’s start with the class definition and the methods for managing consumer groups.

Class definition and fields #

The GroupMetadataManager class is defined in the same-named Scala file under the coordinator.group package. This class’s code is nearly 1000 lines long, so analyzing it line by line is obviously inefficient and unnecessary. Therefore, I will provide an analysis of the main logic from two dimensions: the class definition and fields, and important methods. The code below shows the class definition and the important field information I selected.

// brokerId: ID of the broker it belongs to
// interBrokerProtocolVersion: inter.broker.protocol.version value on the broker side
// config: OffsetConfig class for the internal offset topic
// replicaManager: ReplicaManager class
// zkClient: ZooKeeper client
class GroupMetadataManager(
  brokerId: Int, 
  interBrokerProtocolVersion: ApiVersion,
  config: OffsetConfig, 
  replicaManager: ReplicaManager, 
  zkClient: KafkaZkClient, 
  time: Time,
  metrics: Metrics) extends Logging with KafkaMetricsGroup {
  // Compression type. Performs compression when writing messages to the offset topic.
  private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
  // Consumer group metadata container that stores the data of all consumer groups managed by the broker.
  private val groupMetadataCache = new Pool[String, GroupMetadata]
  // Partitions in the offset topic that are currently being loaded.
  private val loadingPartitions: mutable.Set[Int] = mutable.Set()
  // Partitions in the offset topic that have completed loading.
  private val ownedPartitions: mutable.Set[Int] = mutable.Set()
  // Total number of partitions in the offset topic.
  private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
  ...
}

The constructor of this class requires 7 parameters, with the time and metrics being auxiliary parameters, so I will focus on explaining the meanings of the first 5 parameters.

  • brokerId: We are already very familiar with this parameter. It is the ID of the broker it belongs to, which is the value of the broker.id parameter.
  • interBrokerProtocolVersion: Stores the request version used for inter-broker communication. It is the value of the inter.broker.protocol.version parameter on the broker side. The main purpose of this parameter is to determine the version of the message format for the offset topic.
  • config: This is an OffsetConfig type. This type defines important parameters related to offset management, such as offset topic log segment size, offset topic replication factor, offset topic partition count configuration, etc.
  • replicaManager: ReplicaManager class. The GroupMetadataManager class uses this field to obtain partition objects, log objects, and write partition messages.
  • zkClient: ZooKeeper client. In this class, this field has only one purpose: to get the number of partitions for the offset topic from ZooKeeper.

In addition to the required fields for the constructor, this class also defines other important fields. Let me introduce a few of them that are very important.

1. compressionType: The compression type. Kafka can compress messages before writing them to the offset topic. Whether compression is performed depends on the value of the offsets.topic.compression.codec parameter on the broker side. Messages are not compressed by default. If your offset topic occupies a large amount of disk space, you can consider enabling compression to save resources.

2. groupMetadataCache: This field is the most important attribute of the GroupMetadataManager class. It stores the metadata of all consumer groups managed by the GroupCoordinator component on this broker. Its key is the consumer group name, and its value is the consumer group metadata, that is, a GroupMetadata object. The source code uses this field to add, remove, and traverse consumer groups.

3. loadingPartitions: The set of offset topic partition numbers that are currently being loaded. Two things to note here: first, these are all partitions of the offset topic, that is, partitions of the __consumer_offsets topic; second, loading means reading the message data of the offset topic and populating the groupMetadataCache field.

4. ownedPartitions: The set of offset topic partition numbers that have completed loading. Like loadingPartitions, this field holds partitions of the offset topic. The difference is that it holds the partitions whose loading has already finished.

5. groupMetadataTopicPartitionCount: The number of partitions in the offset topic. It is the value of the offsets.topic.num.partitions parameter on the broker side, with a default of 50 partitions. If you want a different number of partitions, you can change this parameter value or manually create the offset topic with the partition count you need.
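The partition count matters because it decides which offset topic partition a consumer group lands on. The listings later in this lesson call a partitionFor method without showing it, so here is a minimal sketch of the idea, assuming the mapping is the non-negative hash of the group ID modulo the partition count. PartitionMapping, safeAbs, and the two-argument partitionFor are names chosen for this illustration, not members of the actual class.

```scala
object PartitionMapping {
  // Non-negative absolute value. Int.MinValue has no positive counterpart,
  // so it is mapped to 0 (an assumption of this sketch).
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Map a consumer group ID to one partition of the offset topic.
  def partitionFor(groupId: String, partitionCount: Int): Int = {
    require(partitionCount > 0, "partitionCount must be positive")
    safeAbs(groupId.hashCode) % partitionCount
  }
}
```

Under this scheme the result depends on the partition count, so changing the number of partitions of an existing offset topic would map groups to different partitions than before.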

Among these fields, groupMetadataCache is the most important one, and the GroupMetadataManager class extensively uses this field to manage consumer groups. Next, let’s focus on how this class manages consumer groups.

Important methods #

Managing consumer groups includes two aspects: managing the consumer group metadata and managing the consumer group offsets. Group metadata and group offsets are both important consumer group management objects on the coordinator side.

Consumer Group Metadata Management #

Consumer group metadata management involves querying and retrieving group information, adding groups, removing groups, and loading group information. In terms of code complexity, the logic for querying, removing, and adding groups is relatively simple, while the loading process is a bit more involved. Let’s start with querying and retrieving.

Querying and Retrieving Consumer Group Metadata #

The GroupMetadataManager class has many methods for querying and retrieving group data. Most of these methods have simple logic that is easy to understand, such as the getGroup and getOrMaybeCreateGroup methods:

// getGroup method: returns the metadata information for the given consumer group.
// If the group information does not exist, returns None.
def getGroup(groupId: String): Option[GroupMetadata] = {
  Option(groupMetadataCache.get(groupId))
}

// getOrMaybeCreateGroup method: returns the metadata information for the given consumer group.
// If it doesn't exist, creates a new consumer group metadata object with a state of Empty, depending on the value of the createIfNotExist parameter.
def getOrMaybeCreateGroup(groupId: String, createIfNotExist: Boolean): Option[GroupMetadata] = {
  if (createIfNotExist)
    Option(groupMetadataCache.getAndMaybePut(groupId, new GroupMetadata(groupId, Empty, time)))
  else
    Option(groupMetadataCache.get(groupId))
}

The higher-level component GroupCoordinator heavily uses these two methods to retrieve data for a given consumer group. Both methods return the metadata information for the given consumer group, but there are differences between them.

For the getGroup method, if the group information does not exist, it returns None. This typically indicates that the consumer group does not exist or that the Coordinator component for this group has changed to another Broker.

On the other hand, for the getOrMaybeCreateGroup method, if the group information does not exist, it determines whether to add the consumer group based on the value of the createIfNotExist parameter. This method is called when the first member joins the consumer group and is used to create the group.

Group query and retrieval logic is also scattered across other places in the GroupMetadataManager class. It is similar to the code in these two methods and easy to understand, so you can read it on your own after class.
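To make the difference between the two lookups concrete, here is a minimal, self-contained sketch. It assumes a plain ConcurrentHashMap in place of Kafka's Pool class and a stripped-down Group class in place of GroupMetadata; GroupCacheSketch and its members are hypothetical names used only for illustration.

```scala
import java.util.concurrent.ConcurrentHashMap

object GroupCacheSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object Stable extends GroupState

  // Simplified stand-in for GroupMetadata: only an ID and a state.
  final class Group(val groupId: String, val state: GroupState)

  private val cache = new ConcurrentHashMap[String, Group]()

  // Returns None when the group is not cached on this broker.
  def getGroup(groupId: String): Option[Group] = Option(cache.get(groupId))

  // Creates an Empty group only when asked to; otherwise behaves like getGroup.
  def getOrMaybeCreateGroup(groupId: String, createIfNotExist: Boolean): Option[Group] =
    if (createIfNotExist)
      Option(cache.computeIfAbsent(groupId, _ => new Group(groupId, Empty)))
    else
      Option(cache.get(groupId))
}
```

Calling getOrMaybeCreateGroup twice with createIfNotExist set to true returns the same object both times, which is the behavior a coordinator needs when several members join a new group concurrently.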

Removing Consumer Group Metadata #

Next, let’s look at how to remove consumer group information. When a Broker resigns as the Coordinator for certain consumer groups, it needs to remove these consumer groups from the groupMetadataCache. This is what the removeGroupsForPartition method does. Let’s take a look at its source code:

def removeGroupsForPartition(offsetsPartition: Int,
                             onGroupUnloaded: GroupMetadata => Unit): Unit = {
  // Locate the offset topic partition
  val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
  info(s"Scheduling unloading of offsets and group metadata from $topicPartition")
  // Create an asynchronous task that removes the group and offset information
  scheduler.schedule(topicPartition.toString, () => removeGroupsAndOffsets())

  // Inner method that performs the actual removal
  def removeGroupsAndOffsets(): Unit = {
    var numOffsetsRemoved = 0
    var numGroupsRemoved = 0
    inLock(partitionLock) {
      // Remove the partition from ownedPartitions
      ownedPartitions.remove(offsetsPartition)
      // Iterate over all consumer groups in the cache
      for (group <- groupMetadataCache.values) {
        // Only handle groups stored in the given offset topic partition
        if (partitionFor(group.groupId) == offsetsPartition) {
          // Run the group unloading logic
          onGroupUnloaded(group)
          // Remove the group from the cache
          groupMetadataCache.remove(group.groupId, group)
          // Remove the group from the group sets of all producers
          removeGroupFromAllProducers(group.groupId)
          numGroupsRemoved += 1
          numOffsetsRemoved += group.numOffsets
        }
      }
    }
    info(s"Finished unloading $topicPartition. Removed $numOffsetsRemoved cached offsets " +
      s"and $numGroupsRemoved cached groups.")
  }
}

In short, when a Broker resigns as the Coordinator for an offset topic partition, this method removes every consumer group stored in that partition from groupMetadataCache and logs how many cached groups and offsets were removed.


The method first defines an inner method, removeGroupsAndOffsets, and then creates an asynchronous task that calls it to remove the consumer group information and offset information.

So how do we determine which consumer groups to remove? The criterion is the given offset topic partition. The metadata and offsets of each consumer group are stored in exactly one partition of the offset topic. Once an offset topic partition is given, every consumer group whose data is stored in that partition must be removed from this Broker’s metadata cache.

What is the specific execution logic? Let me explain.

First, the asynchronous task removes the given offset topic partition from ownedPartitions.

Next, it iterates through all consumer group objects in the consumer group metadata cache. For each consumer group that is stored in the given offset topic partition, the following steps are executed in order.

  • Step 1: Call the onGroupUnloaded method to execute the group unloading logic. The logic of this method is passed in by the upper-level component GroupCoordinator. It mainly does two things: it changes the consumer group state to Dead, and it wraps an error indicating that the Coordinator has changed and passes it to the callback function.
  • Step 2: Remove the consumer group information from groupMetadataCache. This step is very important because it completely erases the trace of this group.
  • Step 3: Remove the consumer group from the sets of groups associated with producers. The producers here are the ones used for Kafka transactions.
  • Step 4: Increment the counter for the number of removed groups.
  • Step 5: Update the counter for the number of removed offsets.

With that, the method ends.
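The removal steps can be sketched in a few lines of standalone Scala. This is only an illustration under simplifying assumptions: a mutable map replaces groupMetadataCache, the Group case class replaces GroupMetadata, the scheduler, lock, and producer bookkeeping are left out, and the hash-modulo partitionFor is an assumed mapping. UnloadSketch and its members are hypothetical names.

```scala
import scala.collection.mutable

object UnloadSketch {
  // Simplified stand-in for GroupMetadata.
  final case class Group(groupId: String, numOffsets: Int)

  // Assumed mapping from group ID to offset topic partition.
  def partitionFor(groupId: String, partitionCount: Int): Int = {
    val h = groupId.hashCode
    (if (h == Int.MinValue) 0 else math.abs(h)) % partitionCount
  }

  // Removes every cached group stored in offsetsPartition.
  // Returns (number of groups removed, number of offsets removed).
  def removeGroupsForPartition(
      cache: mutable.Map[String, Group],
      ownedPartitions: mutable.Set[Int],
      offsetsPartition: Int,
      partitionCount: Int,
      onGroupUnloaded: Group => Unit): (Int, Int) = {
    var numGroupsRemoved = 0
    var numOffsetsRemoved = 0
    ownedPartitions.remove(offsetsPartition)
    // Iterate over a snapshot so that removal does not disturb the traversal.
    for (group <- cache.values.toList
         if partitionFor(group.groupId, partitionCount) == offsetsPartition) {
      onGroupUnloaded(group)      // Step 1: run the unloading logic
      cache.remove(group.groupId) // Step 2: drop the group from the cache
      numGroupsRemoved += 1       // Step 4: count the removed group
      numOffsetsRemoved += group.numOffsets // Step 5: count its offsets
    }
    (numGroupsRemoved, numOffsetsRemoved)
  }
}
```

Step 3 (removing the group from producer group sets) is omitted because this sketch does not model transactions.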

Adding Consumer Group Metadata #

Next, let’s look at the addGroup method, which adds a consumer group. It is very simple: it calls putIfNotExists to add the given group to groupMetadataCache, and if a group with the same ID is already cached, it returns that existing group instead. The code is as follows:

```scala
def addGroup(group: GroupMetadata): GroupMetadata = {
  val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
  if (currentGroup != null) {
    currentGroup
  } else {
    group
  }
}
```

Loading Consumer Group Metadata #

Now let’s move on to the relatively complex process of loading consumer groups. The GroupMetadataManager class defines a loadGroup method to execute the corresponding loading process.

private def loadGroup(
  group: GroupMetadata, offsets: Map[TopicPartition, CommitRecordMetadataAndOffset],
  pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
  trace(s"Initialized offsets $offsets for group ${group.groupId}")
  // Initialize the offset information of the consumer group
  group.initializeOffsets(offsets, pendingTransactionalOffsets.toMap)
  // Call the `addGroup` method to add the consumer group
  val currentGroup = addGroup(group)
  if (group != currentGroup)
    debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
      s"because there is already a cached group with generation ${currentGroup.generationId}")
}

There are two steps in this method.

Step 1: Call the initializeOffsets method to fill the loaded offsets into the offsets field of the consumer group metadata, which holds the group’s committed offsets. This loads the committed offsets of the partitions the group consumes.

Step 2: Call the addGroup method to add the consumer group metadata object into the consumer group metadata cache, achieving the purpose of loading the consumer group metadata.
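The two steps, together with the rule that an already cached group wins, can be sketched as follows. This is a simplified illustration: ConcurrentHashMap.putIfAbsent stands in for the putIfNotExists call, offsets are keyed by a plain string, and pending transactional offsets are ignored. LoadSketch, its Group class, and the Boolean return value of loadGroup are assumptions of this sketch.

```scala
import java.util.concurrent.ConcurrentHashMap

object LoadSketch {
  // Simplified stand-in for GroupMetadata: offsets keyed by "topic-partition".
  final class Group(val groupId: String, val generationId: Int) {
    @volatile var offsets: Map[String, Long] = Map.empty
    def initializeOffsets(loaded: Map[String, Long]): Unit = offsets = loaded
  }

  private val cache = new ConcurrentHashMap[String, Group]()

  // Returns the group that ends up in the cache:
  // the existing one if present, otherwise the new one.
  def addGroup(group: Group): Group = {
    val current = cache.putIfAbsent(group.groupId, group)
    if (current != null) current else group
  }

  // Returns true when the loaded group was actually installed in the cache.
  def loadGroup(group: Group, offsets: Map[String, Long]): Boolean = {
    // Offsets are filled in before the group becomes visible in the cache.
    group.initializeOffsets(offsets)
    addGroup(group) eq group
  }
}
```

Initializing the offsets before publishing the group means that anyone who finds the group in the cache also sees its offsets.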

Consumer group offset management #

In addition to managing consumer group metadata, the other major feature of the GroupMetadataManager class is managing consumer group offsets, which mainly means saving and querying offset data. We often say that the offset topic is where consumer group offsets are stored. In practice, however, when a consumer group queries its offsets, Kafka always reads them from the offset cache in memory rather than from the underlying offset topic.

Saving consumer group offsets #

The storeOffsets method is responsible for saving consumer group offsets. The code for this method is quite long, so let me first draw a diagram to show the complete process of this method, which will help you get a general understanding of it. Then, we will go through the method signature and the specific code to understand the execution logic in detail.

Let me explain the entire process of saving consumer group offsets.

First, the storeOffsets method filters the offsets to be saved, keeping only those that meet a specific condition. Whether the condition is met is determined by the return value of the validateOffsetMetadataLength method: the custom metadata in the offset commit record must not exceed the value of the broker-side parameter offset.metadata.max.bytes, which defaults to 4KB.

If none of the partitions meet the condition, an OFFSET_METADATA_TOO_LARGE error is constructed for every partition and passed to the callback function, which sends the offset commit response.

If at least one partition meets the condition, the method next checks whether the current Broker is the Coordinator for the consumer group. If it is not, a NOT_COORDINATOR error is constructed and passed to the callback function; if it is, offset topic messages are constructed and written into the offset topic.

After the write, an internal method named putCacheCallback fills the offset values into the consumer group’s metadata in the cache. Finally, the method calls the callback function to return.
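Before reading the real code, it may help to see the decision flow above as a small standalone sketch. It is deliberately simplified: the isCoordinator flag and the appendToLog function are hypothetical hooks standing in for the coordinator check and the write to the offset topic, partitions are plain strings, and the error names are local to the sketch rather than Kafka's Errors enum.

```scala
object StoreOffsetsSketch {
  sealed trait CommitError
  case object NoError extends CommitError
  case object OffsetMetadataTooLarge extends CommitError
  case object NotCoordinator extends CommitError

  final case class OffsetAndMetadata(offset: Long, metadata: String)

  // Assumed limit, mirroring the 4KB default mentioned above.
  val MaxMetadataSize = 4096

  def validateOffsetMetadataLength(metadata: String): Boolean =
    metadata == null || metadata.length <= MaxMetadataSize

  def storeOffsets(
      offsets: Map[String, OffsetAndMetadata],
      isCoordinator: Boolean,
      appendToLog: Map[String, OffsetAndMetadata] => CommitError,
      responseCallback: Map[String, CommitError] => Unit): Unit = {
    // Keep only the partitions whose metadata fits within the limit.
    val filtered = offsets.filter { case (_, om) => validateOffsetMetadataLength(om.metadata) }
    if (filtered.isEmpty) {
      responseCallback(offsets.map { case (tp, _) => tp -> OffsetMetadataTooLarge })
    } else if (!isCoordinator) {
      responseCallback(offsets.map { case (tp, _) => tp -> NotCoordinator })
    } else {
      val writeResult = appendToLog(filtered)
      // Partitions that failed validation keep their own error code.
      responseCallback(offsets.map { case (tp, om) =>
        tp -> (if (validateOffsetMetadataLength(om.metadata)) writeResult else OffsetMetadataTooLarge)
      })
    }
  }
}
```

Note that the response always covers every partition in the request, including those that were filtered out before the write.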

Next, let’s look at the implementation logic of the storeOffsets method with the help of the code.

First, let’s take a look at the method signature. Since this method is used to save consumer group committed offsets, we need to know what parameters are passed to this method by the higher-level caller.

// group: Consumer group metadata
// consumerId: Consumer group member ID
// offsetMetadata: The offsets to be saved, grouped by partition
// responseCallback: Callback function to handle completion
// producerId: Transactional Producer ID
// producerEpoch: Transactional Producer Epoch value
def storeOffsets(
  group: GroupMetadata,
  consumerId: String,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
  producerId: Long = RecordBatch.NO_PRODUCER_ID,
  producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit = {
  ......
}

This method accepts 6 parameters, and I have annotated their meanings in the comments. producerId and producerEpoch are related to Kafka transactions, so a brief understanding of them is enough. We need to focus on the first 4 parameters.

  • group: Consumer group metadata information. The type of this field is the GroupMetadata class we learned before.
  • consumerId: Consumer group member ID, used only for DEBUG-level logging.
  • offsetMetadata: The offsets to be saved, grouped by partition.
  • responseCallback: The callback function to be executed after the offsets are saved.

Next, let’s look at the code of storeOffsets. To help you understand, I have removed the parts related to Kafka transaction operations.

// Filter out offset data to be saved that meets specific conditions
val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
  validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
......
val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
// If there is no offset metadata of any partition that meets the specific conditions
if (filteredOffsetMetadata.isEmpty) {
  // Construct OFFSET_METADATA_TOO_LARGE exception and call responseCallback to return
  val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
  responseCallback(commitStatus)
  None
} else {
  // Check if the current broker is the Coordinator for the given consumer group
  getMagic(partitionFor(group.groupId)) match {
    // If it is the Coordinator
    case Some(magicValue) =>
      val timestampType = TimestampType.CREATE_TIME
      val timestamp = time.milliseconds()
      // Construct offset commit messages for the offset topic
      val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
        val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
        val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
        new SimpleRecord(timestamp, key, value)
      }
      val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
      // Allocate an in-memory buffer for the messages to be written
      val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
      val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
        producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
      records.foreach(builder.append)
      val entries = Map(offsetTopicPartition -> builder.build())
      // Definition of the putCacheCallback method
      ......
      // Write the messages to the offset topic; putCacheCallback then updates the cached group metadata
      appendForGroup(group, entries, putCacheCallback)
    // If it is not the Coordinator
    case None =>
      // Construct the NOT_COORDINATOR error and pass it to responseCallback
      val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
        (topicPartition, Errors.NOT_COORDINATOR)
      }
      responseCallback(commitStatus)
      None
  }
}

The putCacheCallback method runs after the write to the offset topic completes. If the write succeeded and the consumer group is not in the Dead state, it fills the committed offsets into the group’s offset cache; if the write failed, it converts the write error into the corresponding offset commit error. In both cases, it finally calls responseCallback to return the result.

Querying consumer group offsets #

Now let’s look at how consumer group offsets are queried. The getOffsets method of the GroupMetadataManager class is responsible for this. Here is its code:

def getOffsets(
  groupId: String,
  requireStable: Boolean,
  topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
  ......
  // Get the metadata of the specified consumer group from the groupMetadataCache field
  val group = groupMetadataCache.get(groupId)
  // If there is no group data, return empty data
  if (group == null) {
    topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
      val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
        Optional.empty(), "", Errors.NONE)
      topicPartition -> partitionData
    }.toMap
  // If group data exists
  } else {
    group.inLock {
      // If the group is in Dead state, return empty data
      if (group.is(Dead)) {
        topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
          val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
            Optional.empty(), "", Errors.NONE)
          topicPartition -> partitionData
        }.toMap
      } else {
        val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)
        topicPartitions.map { topicPartition =>
          if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
            topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
              Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
          } else {
            val partitionData = group.offset(topicPartition) match {
              // If there is no offset data for the partition, return empty data
              case None =>
                new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                  Optional.empty(), "", Errors.NONE)
              // Return the offset data for the specified partition from the consumer group metadata
              case Some(offsetAndMetadata) =>
                new PartitionData(offsetAndMetadata.offset,
                  offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
            }
            topicPartition -> partitionData
          }
        }.toMap
      }
    }
  }
}

The getOffsets method first reads the metadata of the specified consumer group from the groupMetadataCache field. If there is no corresponding record, it returns empty data: for each requested partition, an entry with an invalid offset and no error. If there is a record, it then checks whether the group is in the Dead state.

If the group is in the Dead state, the consumer group has already been destroyed and its offset data is considered unavailable, so the method again returns empty data. Otherwise, it determines the partitions to query, either the ones passed in or all partitions for which the group has committed offsets, and retrieves the offset data for each partition from the group metadata. If requireStable is set and a partition still has pending offset commits, the method returns the UNSTABLE_OFFSET_COMMIT error for that partition instead. The method ends here.
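The lookup rules can be condensed into a small standalone sketch. It assumes simplified types: partitions are plain strings, the Group case class replaces GroupMetadata, and -1 is used as the invalid-offset placeholder. GetOffsetsSketch and its members are hypothetical names for illustration only.

```scala
object GetOffsetsSketch {
  // Placeholder offset for partitions without usable data (assumed value).
  val InvalidOffset: Long = -1L

  sealed trait FetchError
  case object NoError extends FetchError
  case object UnstableOffsetCommit extends FetchError

  final case class PartitionData(offset: Long, error: FetchError)

  // Simplified stand-in for GroupMetadata.
  final case class Group(
      dead: Boolean,
      offsets: Map[String, Long],
      pendingCommits: Set[String])

  private val Missing = PartitionData(InvalidOffset, NoError)

  def getOffsets(
      group: Option[Group],
      requireStable: Boolean,
      partitionsOpt: Option[Seq[String]]): Map[String, PartitionData] =
    group match {
      // Unknown or dead group: every requested partition gets a placeholder entry.
      case None =>
        partitionsOpt.getOrElse(Seq.empty[String]).map(_ -> Missing).toMap
      case Some(g) if g.dead =>
        partitionsOpt.getOrElse(Seq.empty[String]).map(_ -> Missing).toMap
      case Some(g) =>
        // No explicit partition list means "all partitions with committed offsets".
        val partitions = partitionsOpt.getOrElse(g.offsets.keys.toSeq)
        partitions.map { tp =>
          if (requireStable && g.pendingCommits.contains(tp))
            tp -> PartitionData(InvalidOffset, UnstableOffsetCommit)
          else
            tp -> g.offsets.get(tp).map(PartitionData(_, NoError)).getOrElse(Missing)
        }.toMap
    }
}
```

Everything here is answered from memory, which matches the point made earlier that offset queries never read the offset topic directly.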

Summary #

Today, we studied the source code of the GroupMetadataManager class. As a consumer group manager, it is responsible for managing all aspects of consumer groups. Its two most important functions are consumer group metadata management and consumer group offset management: querying, removing, adding, and loading consumer group metadata, and saving and querying consumer group offsets. These methods are the workhorses that the upper-level component GroupCoordinator relies on, so you must grasp them thoroughly.

I have created a mind map to help you review the key points of today’s content.

In fact, the GroupMetadataManager class plays a significant role. Although it keeps a low profile within the Coordinator component, it is at the root of some production issues.

Let me share a small case with you.

In the past, I encountered a problem: when there were too many consumer group members, it was impossible to complete the offset loading, which resulted in the Consumer always receiving the error “Marking the coordinator dead”.

At that time, I searched through various materials but could not locate the problem. In the end, it was through reading the source code that I found that the buffer created in the doLoadGroupsAndOffsets method was too small. Later, by increasing the value of the offsets.load.buffer.size parameter, we successfully solved the problem.

Just imagine, if we didn’t read this part of the source code at that time and relied solely on logs, we would definitely not be able to solve this problem. Therefore, it is very worthwhile to spend three lessons reading the source code of the GroupMetadataManager class. In the next lesson, I will continue to explore the source code of the GroupMetadataManager to find those code snippets related to offset topics.

Discussion #

Please consider this question: In what scenarios do you need to remove the consumer group records saved in the GroupMetadataManager?

Feel free to share your thoughts and answers in the comments and discuss with me. You are also welcome to share today’s content with your friends.