30 GroupMetadataManager: Does the Offset Topic Store Only Offsets?

Hello, I’m Hu Xi. Today, we will study the source code of GroupMetadataManager, which is responsible for managing the offset topic.

The offset topic, __consumer_offsets, is one of Kafka’s two internal topics (the other is __transaction_state, which stores Kafka transaction state information).

The purpose of creating the offset topic in Kafka is to store registration messages of consumer groups and the committed offset messages. The former stores identity information of consumer groups, while the latter stores the progress information of consumer groups. In the Kafka source code, the GroupMetadataManager class defines the message types and methods for operating on the offset topic. The message types under this topic are the focus of our study today.

Speaking of the offset topic, are you curious about the content of the messages inside? I have seen many people trying to use the kafka-console-consumer command to consume this topic in order to see the content, but the output is a bunch of binary gibberish. In fact, if you don’t read today’s source code, you won’t know how to query the content of this topic using the command-line tools. This knowledge is only included in the source code and is not covered in the official documentation.

Alright, I won’t keep you waiting. In short, when you run the kafka-console-consumer command, you must specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" to view the committed offset messages. Similarly, to read the registration messages of consumer groups, you must specify GroupMetadataMessageFormatter (kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter).

Today, we will learn about these two message types under the offset topic. In addition, I will also introduce how consumer groups find their Coordinator. After all, the prerequisite for reading and writing on the offset topic is to be able to find the correct Coordinator.

Message Types #

There are two types of messages in the offset topic: Group Metadata and Offset Commit. Many people think that only consumer group offsets are stored in the offset topic, but this is incorrect! It also stores the registration information of the consumer group, or the metadata of the consumer group. The metadata mainly refers to the consumer group name and the assignment of partition consumption by group members.

Before we look at the implementation code for these two message types, let’s first look at the supporting code that Kafka defines for both of them. This shared code consists mainly of the GroupTopicPartition class and the BaseKey interface.

Let’s first look at the POJO class GroupTopicPartition. Its function is to encapsulate the triple <consumer group name, topic, partition number>, and the code is as follows:

    case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
      def this(group: String, topic: String, partition: Int) =
        this(group, new TopicPartition(topic, partition))
      // toString method...
    }

Clearly, this class is a data container. We will see it later when we study the implementation of the offset commit message.

Next is the **BaseKey interface**, which represents the Key type of both kinds of messages in the offset topic. It is worth emphasizing that every message in this topic, whatever its type, must have a Key, and BaseKey is the common type of those Keys. Let’s look at its code:

    trait BaseKey {
      def version: Short  // message format version
      def key: Any        // message key
    }

Here, version is the message format version, of type Short. As the Kafka code evolves, the message format of the offset topic keeps changing, which is why a version number is needed. The key field stores the actual Key value. In Scala, the Any type is similar to Java’s Object class: the value can be of any type. When we look at the specific message types later, you will see that the Keys of the two message types are in fact different data types.
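The following minimal, self-contained sketch shows what `key: Any` means in practice. It re-declares simplified versions of the types above, leaving out the three-argument auxiliary constructor and toString. It also uses a stand-in case class in place of Kafka’s org.apache.kafka.common.TopicPartition. The describe helper is hypothetical. It shows that pattern matching on the concrete key class recovers the differently typed key field.

```scala
// Stand-in for org.apache.kafka.common.TopicPartition (assumption: only these two fields matter here)
final case class TopicPartition(topic: String, partition: Int)

// The <consumer group name, topic, partition number> triple
final case class GroupTopicPartition(group: String, topicPartition: TopicPartition)

trait BaseKey {
  def version: Short // message format version
  def key: Any       // actual key value; its runtime type depends on the message type
}

// Registration message key: just the consumer group name
final case class GroupMetadataKey(version: Short, key: String) extends BaseKey

// Committed offset message key: the full triple
final case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey

// Hypothetical helper: pattern matching on the concrete class recovers the typed key
def describe(k: BaseKey): String = k match {
  case GroupMetadataKey(_, group) =>
    s"registration key for group $group"
  case OffsetKey(_, GroupTopicPartition(group, TopicPartition(topic, partition))) =>
    s"offset key for group $group on $topic-$partition"
  case other =>
    s"unknown key type: ${other.getClass.getName}"
}
```

For example, describe(GroupMetadataKey(2.toShort, "testgroup")) returns "registration key for group testgroup". The same call on an OffsetKey unpacks all three parts of the triple.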

With GroupTopicPartition and BaseKey as a foundation, you can now understand how the Keys of the specific message types in the offset topic are constructed.

Next, let’s start learning the implementation code of specific message types, including the registration message, the offset commit message, and the Tombstone message. Since the consumer group must register with the Coordinator component before committing the offset, we will read the code of the registration message first.

Registration Message #

The so-called registration message is the message that records a consumer group’s registration in the offset topic. The Coordinator writes such messages at two points in time.

  • After all members have joined the group: the Coordinator writes a registration message to the offset topic, but this message does not yet contain the partition assignment.
  • After the Leader member sends the assignment to the Coordinator: once the Leader member sends the partition assignment plan to the Coordinator, the Coordinator writes a registration message that includes the assignment.

First, we need to know how the Key of the registration message is defined and how it is encapsulated into the message.

The definition of the Key is in the GroupMetadataKey class code:

    case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
      override def toString: String = key
    }

The key field of this class is a string type, which stores the name of the consumer group. Obviously, the Key of the registration message is the name of the consumer group.

The GroupMetadataManager object has a groupMetadataKey method, which is responsible for converting the Key of the registration message into a byte array for constructing the registration message later. The code of this method is as follows:

    def groupMetadataKey(group: String): Array[Byte] = {
      val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
      key.set(GROUP_KEY_GROUP_FIELD, group)
      // construct a ByteBuffer object to accommodate the version and key data
      val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
      byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
      key.writeTo(byteBuffer)
      byteBuffer.array()
    }

This method first takes the consumer group name, constructs a ByteBuffer object, then sequentially writes the Short type message format version and the consumer group name to the Buffer, and finally returns the underlying byte array of this Buffer.

You don’t need to pay attention to the version variable and how the Struct type is implemented here, because they are not the key to understanding the internal principles of the offset topic. What you need to master is how the Key and Value of the registration message are defined.
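The Struct internals are not essential, but seeing the resulting bytes can still make the Key layout concrete. The sketch below rebuilds the layout with a plain ByteBuffer. It relies on two assumptions that the code above does not show. First, the Struct serializes the STRING group field as an INT16 length followed by UTF-8 bytes, which is how the Kafka protocol encodes STRING. Second, CURRENT_GROUP_KEY_SCHEMA_VERSION equals 2. The names encodeGroupKey and decodeGroupKey are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Assumption: CURRENT_GROUP_KEY_SCHEMA_VERSION is 2
val GroupKeyVersion: Short = 2

// Hypothetical re-creation of the registration message Key layout:
// [version: INT16][group name length: INT16][group name: UTF-8 bytes]
def encodeGroupKey(group: String): Array[Byte] = {
  val groupBytes = group.getBytes(StandardCharsets.UTF_8)
  val buffer = ByteBuffer.allocate(2 /* version */ + 2 /* length */ + groupBytes.length)
  buffer.putShort(GroupKeyVersion)
  buffer.putShort(groupBytes.length.toShort)
  buffer.put(groupBytes)
  buffer.array()
}

// Reverse direction: read the version first, then the length-prefixed group name
def decodeGroupKey(bytes: Array[Byte]): (Short, String) = {
  val buffer = ByteBuffer.wrap(bytes)
  val version = buffer.getShort()
  val groupBytes = new Array[Byte](buffer.getShort().toInt)
  buffer.get(groupBytes)
  (version, new String(groupBytes, StandardCharsets.UTF_8))
}
```

Under these assumptions, encodeGroupKey("g1") produces the six bytes 0, 2, 0, 2, 'g', '1'. That is two bytes of version, two bytes of length, and then the group name.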

Next, let’s understand the code implementation of the Value of the message. Since there is a groupMetadataKey method, the source code also provides the corresponding groupMetadataValue method. Its purpose is to write the important metadata of the consumer group to a byte array. Let’s see its code implementation:

    def groupMetadataValue(
      groupMetadata: GroupMetadata,  // consumer group metadata object
      assignment: Map[String, Array[Byte]], // partition consumption assignment
      apiVersion: ApiVersion // Kafka API version
    ): Array[Byte] = {
      // determine the message format version and structure based on the Kafka API version
      val (version, value) = {
        if (apiVersion < KAFKA_0_10_1_IV0)
          (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
        else if (apiVersion < KAFKA_2_1_IV0)
          (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
        else if (apiVersion < KAFKA_2_3_IV0)
          (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
        else
          (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
      }
      // sequentially write the main metadata information of the consumer group
      // including protocol type, Generation ID, partition assignment strategy, and leader member ID
      value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
      value.set(GENERATION_KEY, groupMetadata.generationId)
      value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
      value.set(LEADER_KEY, groupMetadata.leaderOrNull)
      // write the timestamp of the most recent state change
      if (version >= 2)
        value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
      // write the metadata information of each member
      // including member ID, client.id, host name, and session timeout
      val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
        val memberStruct = value.instance(MEMBERS_KEY)
        memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
        memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
        memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
        memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
        // write the rebalance timeout
        if (version > 0)
          memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
        // write the Group Instance ID for managing static consumer groups
        if (version >= 3)
          memberStruct.set(GROUP_INSTANCE_ID_KEY, memberMetadata.groupInstanceId.orNull)
        // the partition assignment strategy must be defined, otherwise an exception is thrown
        val protocol = groupMetadata.protocolName.orNull
        if (protocol == null)
          throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol")
        // write the member consumer subscription information
        val metadata = memberMetadata.metadata(protocol)
        memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
        val memberAssignment = assignment(memberMetadata.memberId)
        assert(memberAssignment != null)
        // write the member consumer assignment information
        memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
        memberStruct
      }
      value.set(MEMBERS_KEY, memberArray.toArray)
      // write the version information and the metadata written above to the Buffer in turn
      val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
      byteBuffer.putShort(version)
      value.writeTo(byteBuffer)
      // return the underlying byte array of the Buffer
      byteBuffer.array()
    }

The code is quite long, so I’ll use a diagram to help you understand the logic of this method.

In step 1, the code uses the apiVersion argument to decide which format version to use, and creates a Struct of the matching version to hold the metadata. The value of apiVersion comes from the broker-side parameter inter.broker.protocol.version. As the official Kafka documentation shows, this parameter defaults to the version of the broker release itself, so unless you override it, the newest format is used.
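Step 1 boils down to comparing the version against three release thresholds. Kafka’s real ApiVersion also tracks internal versions (the IV0 suffixes in the code). The sketch below therefore simplifies it to a hypothetical Release(major, minor, patch) value. The thresholds mirror the if/else chain in groupMetadataValue.

```scala
// Hypothetical, simplified stand-in for kafka.api.ApiVersion (ignores the IVx internal versions)
final case class Release(major: Int, minor: Int, patch: Int = 0) extends Ordered[Release] {
  def compare(that: Release): Int =
    Ordering[(Int, Int, Int)].compare((major, minor, patch), (that.major, that.minor, that.patch))
}

// Mirrors the if/else chain in groupMetadataValue:
// < 0.10.1 -> V0, < 2.1 -> V1, < 2.3 -> V2, otherwise V3
def groupValueVersionFor(interBrokerVersion: Release): Short =
  if (interBrokerVersion < Release(0, 10, 1)) 0.toShort
  else if (interBrokerVersion < Release(2, 1)) 1.toShort
  else if (interBrokerVersion < Release(2, 3)) 2.toShort
  else 3.toShort
```

For example, a cluster whose inter.broker.protocol.version corresponds to 2.2 would write V2 registration values. Any release from 2.3 on gets V3.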

In step 2, the code writes the consumer group’s protocol type, generation ID, partition assignment strategy, and leader member ID to the struct in turn. As I mentioned when discussing GroupMetadata, for a regular consumer group the protocol type is the string “consumer”, and the partition assignment strategy may be “range”, “roundrobin”, and so on. For format versions ≥2, the code also writes the timestamp of the group’s most recent state change.

In step 3, the code iterates over all members of the consumer group, creates a dedicated struct object for each member, and writes the member’s ID, Client ID, hostname, and session timeout to it in turn. For format versions ≥1 (the code checks version > 0), it also writes the member’s configured rebalance timeout, and for format versions ≥3 it also writes the Group Instance ID used for static consumer group management. Next, the groupMetadataValue method makes sure the consumer group has selected a partition assignment strategy, throwing an IllegalStateException otherwise. Finally, it writes the member’s subscription information and assignment information to the struct in turn.

In step 4, the code writes the version information and the metadata information written above to a buffer in turn, and returns the underlying byte array of the buffer. This is the end of the method logic.
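In steps 2 and 3, the format version alone decides which fields are present. The following sketch restates those rules as data. The field names are lowercase labels chosen to resemble the schema constants in the code. They are assumptions, not names copied from Kafka’s schema definitions. Sets are used because the sketch makes no claim about the on-disk field order, and both helper functions are hypothetical.

```scala
// Hypothetical labels for the group-level fields of the registration message Value
def groupFieldsFor(version: Int): Set[String] = {
  val always = Set("protocol_type", "generation", "protocol", "leader")
  // The timestamp of the most recent state change exists only from V2 on
  if (version >= 2) always + "current_state_timestamp" else always
}

// Hypothetical labels for the fields written for each member
def memberFieldsFor(version: Int): Set[String] = {
  val always = Set("member_id", "client_id", "client_host", "session_timeout", "subscription", "assignment")
  val rebalance = if (version > 0) Set("rebalance_timeout") else Set.empty[String]   // V1 and later
  val instanceId = if (version >= 3) Set("group_instance_id") else Set.empty[String] // V3 and later
  always ++ rebalance ++ instanceId
}
```

Laid out this way, the diagram’s optional (gray) fields are exactly the ones guarded by a version check.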

That’s all for the Key and Value of the registration message. To help you see what data a registration message actually contains, I’ll show you another diagram of its composition.

This diagram summarizes the content of the registration message that the groupMetadataKey and groupMetadataValue methods generate. The gray rectangles represent optional fields that may be absent from the Value.

Committed Offset Message #

Next, let’s learn about the composition of the committed offset message Key and Value.

The OffsetKey class defines the Key value of the committed offset message, as shown in the following code:

    case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
      override def toString: String = key.toString
    }

As you can see, the Key of this type of message is a GroupTopicPartition, which is a triplet of <consumer group name, topic, partition number>.

The offsetCommitKey method is responsible for converting this triplet into a byte array for constructing the committed offset message.

    def offsetCommitKey(
      group: String,  // Consumer group name
      topicPartition: TopicPartition // Topic + partition number
    ): Array[Byte] = {
      // Create a struct and write the consumer group name, topic, and partition number in turn
      val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
      key.set(OFFSET_KEY_GROUP_FIELD, group)
      key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
      key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
      // Construct ByteBuffer, write format version and struct
      val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
      byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
      key.writeTo(byteBuffer)
      // Return the byte array
      byteBuffer.array()
    }

This method takes the triple, creates a Struct object, and writes the consumer group name, topic, and partition number into it in order. It then constructs a ByteBuffer, writes the format version followed by the Struct, and finally returns the underlying byte array.
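The same ByteBuffer approach reproduces the committed offset Key. This sketch makes three assumptions. STRING fields are INT16-length-prefixed UTF-8 bytes. The partition is an INT32. CURRENT_OFFSET_KEY_SCHEMA_VERSION equals 1. The names encodeOffsetKey, decodeOffsetKey, putString, and getString are hypothetical. Reading the fields back in the same order recovers the triple.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Assumption: CURRENT_OFFSET_KEY_SCHEMA_VERSION is 1
val OffsetKeyVersion: Short = 1

// Writes an INT16 length followed by the UTF-8 bytes (assumed STRING encoding)
def putString(buffer: ByteBuffer, bytes: Array[Byte]): Unit = {
  buffer.putShort(bytes.length.toShort)
  buffer.put(bytes)
}

// [version: INT16][group: STRING][topic: STRING][partition: INT32]
def encodeOffsetKey(group: String, topic: String, partition: Int): Array[Byte] = {
  val groupBytes = group.getBytes(StandardCharsets.UTF_8)
  val topicBytes = topic.getBytes(StandardCharsets.UTF_8)
  val buffer = ByteBuffer.allocate(2 + (2 + groupBytes.length) + (2 + topicBytes.length) + 4)
  buffer.putShort(OffsetKeyVersion)
  putString(buffer, groupBytes)
  putString(buffer, topicBytes)
  buffer.putInt(partition)
  buffer.array()
}

// Reads an INT16 length, then that many UTF-8 bytes
def getString(buffer: ByteBuffer): String = {
  val bytes = new Array[Byte](buffer.getShort().toInt)
  buffer.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

// Returns (version, group, topic, partition)
def decodeOffsetKey(bytes: Array[Byte]): (Short, String, String, Int) = {
  val buffer = ByteBuffer.wrap(bytes)
  val version = buffer.getShort()
  val group = getString(buffer)
  val topic = getString(buffer)
  (version, group, topic, buffer.getInt())
}
```

For example, encodeOffsetKey("g", "t", 0) is 12 bytes long. That is 2 bytes for the version, 3 each for the two length-prefixed one-character strings, and 4 for the partition.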

Now that we've covered Key, let's look at the definition of Value.

The offsetCommitValue method determines which elements are present in the Value. Let's take a look at its code together. Here, I only list the struct object for the latest version; the elements written for other versions are similar. You can read the structs for the other versions on your own; that is the code I omitted under the if branch.

```scala
    def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
                          apiVersion: ApiVersion): Array[Byte] = {
      // Determine the message format version and create the corresponding struct object
      val (version, value) = {
        if (......) {
          ......
        } else {
          val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
          // Write the offset value, leader epoch value, custom metadata, and timestamp in order
          value.set(OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
          value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
            offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
          value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
          value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, offsetAndMetadata.commitTimestamp)
          (3, value)
        }
      }
      // Construct the ByteBuffer, write the message format version and struct
      val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
      byteBuffer.putShort(version.toShort)
      value.writeTo(byteBuffer)
      // Return the underlying byte array of the ByteBuffer
      byteBuffer.array()
    }
```

The offsetCommitValue method first determines the message format version and creates the corresponding struct object. For the current latest version V3, the elements of the struct include the offset value, leader epoch value, custom metadata, and timestamp. If we use the Java Consumer API, then when committing offsets, this custom metadata is generally empty.

Next, it constructs a ByteBuffer and writes the message format version and struct.

Finally, it returns the underlying byte array of the ByteBuffer.
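The V3 Value can be sketched the same way. This assumes the V3 schema stores its fields in the order the code sets them: offset as INT64, leader epoch as INT32, metadata as STRING, and commit timestamp as INT64. It also assumes metadata is non-null. Option[Int] stands in for the Optional leader epoch. NoPartitionLeaderEpoch mirrors RecordBatch.NO_PARTITION_LEADER_EPOCH (-1). OffsetValueV3 and the two functions are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Mirrors RecordBatch.NO_PARTITION_LEADER_EPOCH
val NoPartitionLeaderEpoch: Int = -1

// Hypothetical decoded form of a V3 committed offset Value
final case class OffsetValueV3(offset: Long, leaderEpoch: Int, metadata: String, commitTimestamp: Long)

// Assumed layout: [version: INT16][offset: INT64][leader_epoch: INT32][metadata: STRING][commit_timestamp: INT64]
def encodeOffsetValueV3(offset: Long, leaderEpoch: Option[Int], metadata: String, commitTimestamp: Long): Array[Byte] = {
  val metadataBytes = metadata.getBytes(StandardCharsets.UTF_8)
  val buffer = ByteBuffer.allocate(2 + 8 + 4 + (2 + metadataBytes.length) + 8)
  buffer.putShort(3.toShort)
  buffer.putLong(offset)
  // A missing leader epoch is stored as the -1 sentinel, like leaderEpoch.orElse(...) in the source
  buffer.putInt(leaderEpoch.getOrElse(NoPartitionLeaderEpoch))
  buffer.putShort(metadataBytes.length.toShort)
  buffer.put(metadataBytes)
  buffer.putLong(commitTimestamp)
  buffer.array()
}

def decodeOffsetValueV3(bytes: Array[Byte]): OffsetValueV3 = {
  val buffer = ByteBuffer.wrap(bytes)
  require(buffer.getShort() == 3, "not a V3 offset commit value")
  val offset = buffer.getLong()
  val leaderEpoch = buffer.getInt()
  val metadata = new Array[Byte](buffer.getShort().toInt)
  buffer.get(metadata)
  OffsetValueV3(offset, leaderEpoch, new String(metadata, StandardCharsets.UTF_8), buffer.getLong())
}
```

With empty metadata, which is typical for the Java Consumer, a V3 Value under these assumptions takes 24 bytes: 2 + 8 + 4 + 2 + 8.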

Compared with the Value of registration messages, the Value of offset commit messages is much simpler. I’ll use another diagram to show the composition of the Key and Value of offset commit messages.

Tombstone Message #

There is one more kind of message related to the offset topic in the Kafka source code: the Tombstone message. There is nothing special about it; it is simply a message whose Value is null. Therefore, both registration messages and offset commit messages have corresponding Tombstone messages. Its main purpose is to tell Kafka that earlier messages with the same Key can be deleted. With it, Kafka can keep the disk usage of the internal offset topic from growing indefinitely.

You can see the following two lines of code, which represent the Tombstone messages corresponding to the two types of messages.

    // Tombstone message corresponding to the offset commit message
    tombstones += new SimpleRecord(timestamp, commitKey, null)
    // Tombstone message corresponding to the registration message
    tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)

Whichever message type they belong to, their Value field is null. A Tombstone for a registration message means Kafka can delete that consumer group’s metadata from the offset topic. A Tombstone for an offset commit message means Kafka can delete the group’s committed offset for that particular topic partition.
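A toy model of log compaction shows why a null Value is enough. The offset topic uses the compact cleanup policy, which retains only the latest record per Key. When that latest record is a Tombstone, the Key disappears entirely. The compact function below is a hypothetical simplification. Option models a nullable Value, and readable strings stand in for the byte-array Keys. Real compaction runs in the background per partition and keeps Tombstones for a while (delete.retention.ms) before discarding them.

```scala
// Hypothetical model: each record is (key, value), where None stands for a null Value (a Tombstone)
def compact(log: Seq[(String, Option[String])]): Map[String, String] =
  log.foldLeft(Map.empty[String, String]) {
    case (state, (key, Some(value))) => state.updated(key, value) // a newer value replaces the older one
    case (state, (key, None))        => state - key               // Tombstone: drop the key entirely
  }
```

Feeding it a registration record for "testgroup", an offset record, and then a Tombstone for "testgroup" leaves only the offset record.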

How to determine the Coordinator? #

Next, let’s take a look at the relationship between the offset topic and the consumer group Coordinator. The Coordinator component is the only component that operates on the offset topic internally.

When each Broker starts up, it also starts its Coordinator component. However, a consumer group can be managed by only one Coordinator. So how does Kafka decide which Broker’s Coordinator serves a given consumer group? The answer: Kafka picks a specific partition of the offset topic, and the Broker hosting that partition’s leader replica becomes the Coordinator for the consumer group.

So how is this specific partition calculated? Let’s take a look at the code of the partitionFor method in the GroupMetadataManager class:

    def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

As you can see, Kafka takes the absolute value of the consumer group name’s hash code, then takes it modulo the number of partitions of the offset topic. The result is the target partition that the consumer group writes to.

For example, suppose the offset topic has the default 50 partitions and our consumer group is named “testgroup”. Math.abs("testgroup".hashCode) % 50 evaluates to 27, so the target partition is 27. This means that the registration messages and offset commit messages of this consumer group are written to partition 27 of the offset topic, and the Broker hosting the leader replica of partition 27 becomes the Coordinator for this consumer group.
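The formula is easy to check for yourself. This sketch re-implements partitionFor with a safeAbs helper that, like Kafka’s Utils.abs, maps Int.MinValue to 0, because math.abs(Int.MinValue) is still negative. The coordinatorFor helper and its leaderOf map are hypothetical. On a real cluster, the partition-to-leader mapping comes from the cluster metadata for the offset topic.

```scala
// Like Kafka's Utils.abs: math.abs(Int.MinValue) stays negative, so map it to 0
def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

// Target partition of the offset topic for a consumer group
def partitionFor(groupId: String, offsetTopicPartitions: Int): Int =
  safeAbs(groupId.hashCode) % offsetTopicPartitions

// Hypothetical: leaderOf maps each offset topic partition to the broker ID hosting its leader replica
def coordinatorFor(groupId: String, offsetTopicPartitions: Int, leaderOf: Map[Int, Int]): Option[Int] =
  leaderOf.get(partitionFor(groupId, offsetTopicPartitions))
```

Here partitionFor("testgroup", 50) returns 27, matching the worked example above. If broker 3 leads partition 27, coordinatorFor("testgroup", 50, Map(27 -> 3)) returns Some(3).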

Summary #

The internal offset topic in Kafka is an important tool that the Coordinator uses to store consumer group information. Specifically, this information includes consumer group metadata and committed offsets, which correspond to the registration messages and committed offset messages we discussed today. The former records the metadata of the consumer group, including the group name, member list, and partition assignment plan; the latter records the offsets committed by each member of the consumer group. Together, these two kinds of information make up the message types of the offset topic.

In addition to the message types, I also walked through the code that determines which Coordinator serves a consumer group. Once you understand it, the next time a member of your consumer group runs into a problem, you will know which broker’s logs to check.

Let’s review the key points of this lesson.

  • Offset Topic: namely __consumer_offsets. This topic is an internal topic with 50 partitions by default, and Kafka is responsible for creating it, so you don’t need to manually create it.
  • Message Types: The offset topic is divided into registration messages and committed offset messages.
  • Tombstone Messages: Offset topic messages with null value used to clear the committed offsets and registration information of consumer groups.
  • Coordinator Determination Principle: The absolute value of the hash value of the consumer group name modulo the number of partitions of the offset topic is the target partition, and the broker where the leader replica of the target partition is located is the coordinator.

With the message formats defined and the Coordinator identified, the next step is for the Coordinator to read from and write to the offset topic. Specifically, it constructs the two message types we learned about today, serializes them into byte arrays, and writes them to the offset topic. Conversely, it reads byte arrays from the offset topic and deserializes them into the corresponding message types. We will study this together in the next lesson.

After-class discussion #

Based on today’s content, use the kafka-console-consumer script to read the committed offset messages from the offset topic in your own environment. Then, using the source code of the readOffsetMessageValue method, explain the meaning of each field in the output.

Feel free to write down your thoughts and answers in the comments section to discuss with me. You are also welcome to share today’s content with your friends.