27 Consumer Group Metadata: What Metadata Exists in a Consumer Group?


Hello, I’m Hu Xi. Starting from today’s lesson, we will dive into the source code of the last module: Consumer Group Management.

In this module, I will walk you through the broker-side source code of Kafka consumer groups in detail, including the definition and management of consumer group metadata, the group metadata manager, the internal topic “__consumer_offsets”, and the important GroupCoordinator component.

First, let me briefly introduce what each of these four parts does, to give you a general picture of consumer group management.

  • Consumer Group Metadata: This part of the source code mainly includes GroupMetadata and MemberMetadata. These two classes together define the metadata of consumer groups and what it consists of.
  • Group Metadata Manager: Defined by the GroupMetadataManager class, it can be regarded as the management engine for consumer groups, providing functions for creating, deleting, updating, and querying consumer groups.
  • __consumer_offsets: Kafka’s internal topic. Besides its well-known role of storing consumer group offset commit records, it also stores consumer group registration messages.
  • GroupCoordinator: The component responsible for group coordination, providing common functionalities for group member management and offset management.

I have summarized the functionalities of these four parts into a mind map, which you can save for reference at any time:

Today, we will first learn about the source code implementation of consumer group metadata. This forms the foundation for understanding the working mechanism of consumer groups and delving into the consumer group management component. Apart from that, does mastering this part of the code have any practical significance for us?

Certainly. You are probably familiar with the following command, which uses a command-line tool to query the state of a consumer group. In its output, we can see data such as GROUP, COORDINATOR, ASSIGNMENT-STRATEGY, STATE, and #MEMBERS. All of this data is part of the consumer group metadata.

```
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup --verbose --state

GROUP     COORDINATOR (ID)      ASSIGNMENT-STRATEGY  STATE   #MEMBERS
mygroup   172.25.4.76:9092 (0)  range                Stable  2
```

As you can see, there is a close connection between the command-line tools we use in our daily work and our knowledge of the source code. Once you understand today’s content, you will have a deeper understanding when using some command-line tools in practice.

Alright, let’s officially begin today’s lesson now.

As I mentioned earlier, the metadata consists mainly of two classes: GroupMetadata and MemberMetadata. They are located in the source code files GroupMetadata.scala and MemberMetadata.scala respectively. From their names, you can already tell that the former is used to store the metadata of consumer groups, while the latter is used to store the metadata of group members.

Since a consumer group can have multiple members, one instance of GroupMetadata corresponds to multiple instances of MemberMetadata. Next, let’s start by learning about the MemberMetadata.scala source file.

Member metadata (MemberMetadata) #

The MemberMetadata.scala source file is located in the coordinator.group package. In fact, all the source code under the coordinator.group package is closely related to consumer group functionality. The following image shows the list of source code files in the coordinator.group package. You can see that MemberMetadata.scala and GroupMetadata.scala, which we will learn later, are included.

This package also contains the classes we will study in later sections, such as GroupCoordinator and GroupMetadataManager. You may also have noticed a transaction package under the coordinator package, which holds all the source code related to Kafka transactions. If you want to study the transaction mechanism in depth, you can read the code in that package.

Now, let’s focus on the MemberMetadata.scala file, which contains three parts: two classes and one companion object.

  • MemberSummary class: The summary data of a group member, which extracts the most essential metadata information. The result returned by the command line tool example above is provided by this class.
  • MemberMetadata companion object: It only defines a utility method for upper-layer components to invoke.
  • MemberMetadata class: The metadata of a consumer group member. Kafka defines a lot of data for consumer group members, which we will learn in detail later.

Going from easy to hard, let’s start with the simplest one, the MemberSummary class.

MemberSummary class #

The MemberSummary class holds summary data for a group member’s metadata. It is essentially a POJO-style case class: it only holds data and defines no logic. The code is as follows:

```scala
case class MemberSummary(
  memberId: String,                 // Member ID generated by Kafka
  groupInstanceId: Option[String],  // Value of the consumer-side 'group.instance.id' parameter
  clientId: String,                 // Value of the 'client.id' parameter
  clientHost: String,               // Hostname of the consumer program
  metadata: Array[Byte],            // Serialized metadata for the member's partition assignment strategy
  assignment: Array[Byte]           // Serialized partitions assigned to the member
)
```

As we can see, this class defines six fields, which I will explain in detail.

  • memberId: The ID that identifies the consumer group member. This ID is generated automatically by Kafka; it generally takes the form “consumer-<group ID>-<sequence number>-<UUID>”. Although the community has been discussing whether to let users set their own IDs, it is currently hard-coded and cannot be set by users.
  • groupInstanceId: The ID of a static member of the consumer group. The introduction of the static member mechanism can avoid unnecessary consumer group rebalances. It is a very new and advanced feature, and here you just need to have a basic understanding of its meaning. If you are interested, I suggest you read the explanation of the ‘group.instance.id’ parameter on the official website.
  • clientId: The ‘client.id’ parameter configured for the consumer group member. Since the memberId cannot be set, you can use this field to distinguish different members in the consumer group.
  • clientHost: The hostname of the machine running the consumer program. It records from which machine the consumer request is sent.
  • metadata: A byte array holding the serialized metadata for the partition assignment strategy the member uses. The strategy is set by the consumer-side parameter ‘partition.assignment.strategy’. The default RangeAssignor strategy assigns partitions topic by topic, splitting each topic’s partitions evenly among the members subscribed to it.
  • assignment: Saves the partitions assigned to the member. Each consumer group elects a leader member that is responsible for working out the assignment plan for all members. The finalized plan is then serialized into a byte array, stored in ‘assignment’, and distributed to each member.
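To make the “topic by topic” behavior of RangeAssignor concrete, here is a minimal Scala sketch of per-topic range assignment. The function name rangeAssign and its parameters are hypothetical. It assumes members are ordered by member ID and that the first members receive the leftover partitions, which is how I understand the real assignor behaves, but it is a simplification rather than the Kafka client code.

```scala
import scala.collection.mutable

// Hypothetical, simplified version of RangeAssignor's per-topic logic
// (not the Kafka client implementation itself)
def rangeAssign(
    partitionsPerTopic: Map[String, Int],   // topic -> number of partitions
    subscriptions: Map[String, Set[String]] // member ID -> subscribed topics
): Map[String, List[(String, Int)]] = {
  val assignment = mutable.Map[String, List[(String, Int)]]()
  subscriptions.keys.foreach(member => assignment(member) = Nil)

  // Each topic is split independently of every other topic
  for ((topic, numPartitions) <- partitionsPerTopic.toList.sortBy(_._1)) {
    // Members subscribed to this topic, sorted so the result is deterministic
    val consumers = subscriptions.collect { case (m, ts) if ts.contains(topic) => m }.toList.sorted
    if (consumers.nonEmpty) {
      val perConsumer = numPartitions / consumers.size
      val withExtra = numPartitions % consumers.size // the first `withExtra` members get one more
      consumers.zipWithIndex.foreach { case (member, i) =>
        val start = perConsumer * i + math.min(i, withExtra)
        val length = perConsumer + (if (i < withExtra) 1 else 0)
        assignment(member) = assignment(member) ++ (start until start + length).map(p => (topic, p))
      }
    }
  }
  assignment.toMap
}

// Two members, both subscribed to two 3-partition topics
val result = rangeAssign(
  Map("t0" -> 3, "t1" -> 3),
  Map("c1" -> Set("t0", "t1"), "c2" -> Set("t0", "t1")))
println(result("c1")) // List((t0,0), (t0,1), (t1,0), (t1,1))
println(result("c2")) // List((t0,2), (t1,2))
```

Because each topic is split on its own, the leftover partition of every topic lands on the same member, so c1 ends up with four partitions and c2 with two. This is why range assignment can become unbalanced when a group subscribes to many topics.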

In summary, the MemberSummary class is a container for member summary data, similar to a POJO class in Java, and does not involve any operational logic, so it is easy to understand.

MemberMetadata companion object #

Next, let’s look at the code of the MemberMetadata companion object. It defines only one method, plainProtocolSet, for upper-layer components to invoke. This method does exactly one thing: it extracts the strategy names from the given list of partition assignment strategies (each a name paired with its details), collects them into a set, and returns that set:

```scala
private object MemberMetadata {
  // Extract the set of partition assignment strategy names
  def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]) = supportedProtocols.map(_._1).toSet
}
```

Let me give an example. If there are three members in a consumer group, and their partition.assignment.strategy parameters are set to RangeAssignor, RangeAssignor, and RoundRobinAssignor, then applying plainProtocolSet to these (name, details) pairs yields the set [RangeAssignor, RoundRobinAssignor]: duplicate names collapse into one. In fact, it is often used to count how many different partition assignment strategies are configured for the members of a consumer group.
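Here is a standalone version of plainProtocolSet you can run to see the deduplication. The input list and its placeholder byte arrays are made up for illustration; range and roundrobin are the names I believe Kafka’s RangeAssignor and RoundRobinAssignor report, but any strings would behave the same way.

```scala
// Same logic as the companion-object method above, minus the access
// modifier so it can run standalone
def plainProtocolSet(supportedProtocols: List[(String, Array[Byte])]): Set[String] =
  supportedProtocols.map(_._1).toSet

// Hypothetical input: strategy names paired with placeholder metadata bytes
val input = List(
  "range" -> Array[Byte](1),
  "range" -> Array[Byte](2),
  "roundrobin" -> Array[Byte](3))

val names = plainProtocolSet(input)
println(names.size) // 2: the duplicate "range" collapses into one entry
```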

MemberMetadata class #

Now, let’s take a look at the source code of the MemberMetadata class. First, let’s check the constructor and field definitions of this class to understand what metadata a member has.

```scala
@nonthreadsafe
private[group] class MemberMetadata(
  var memberId: String,
  val groupId: String,
  val groupInstanceId: Option[String],
  val clientId: String,
  val clientHost: String,
  val rebalanceTimeoutMs: Int,
  val sessionTimeoutMs: Int,
  val protocolType: String,
  var supportedProtocols: List[(String, Array[Byte])]
) {
  var assignment: Array[Byte] = Array.empty[Byte]
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  var awaitingSyncCallback: SyncGroupResult => Unit = null
  var isLeaving: Boolean = false
  var isNew: Boolean = false
  val isStaticMember: Boolean = groupInstanceId.isDefined
  var heartbeatSatisfied: Boolean = false
  // ......
}
```

The MemberMetadata class holds a lot of data. Its constructor takes nine parameters. Four of them (memberId, groupInstanceId, clientId, and clientHost) also appear in MemberSummary, and groupId needs no explanation. The remaining four are new:

  • rebalanceTimeoutMs: The timeout for rebalance operations. A rebalance operation must be completed within this time limit, otherwise it is considered timed out. The value of this field is the same as the Consumer parameter max.poll.interval.ms.
  • sessionTimeoutMs: The session timeout. Consumer group members currently rely on heartbeats to show that they are alive. If no heartbeat from a member arrives within the session timeout, the member is considered “offline”, which triggers a new round of rebalancing. The value of this field is the same as the Consumer parameter session.timeout.ms.
  • protocolType: The type of protocol. It actually indicates the scenario for which the consumer group is used. There are two specific scenarios: the first is as a regular consumer group, with the value of this field being “consumer”; the second is for consumers in the Kafka Connect component, with the value of this field being “connect”. Of course, it is possible that the community will add new protocol types in the future. But for now, all you need to know is that it is a string value that identifies the application scenario. This field has no significant impact other than that.
  • supportedProtocols: Holds the partition assignment strategies configured for the member. The Consumer parameter partition.assignment.strategy is a List, which means you can configure multiple assignment strategies for a consumer group member. Accordingly, this field is also a List whose elements are tuples: the first element is the strategy name, and the second is the serialized strategy details.

In addition to the nine constructor parameters, the class body defines seven more fields for storing metadata and tracking state. Apart from isStaticMember, which is a val, they are all var variables, meaning their values can change. The MemberMetadata code relies on these fields to continuously adjust a member’s metadata and state.

I have selected 5 important additional fields to introduce to you.

  • assignment: Stores the partition assignment plan allocated to this member, as a serialized byte array.
  • awaitingJoinCallback: The callback to invoke when the member’s pending join request completes. A non-null value indicates that the member is waiting to join the group.
  • awaitingSyncCallback: The callback to invoke when the GroupCoordinator sends the member its assignment plan. A non-null value indicates that the member is waiting for that plan.
  • isLeaving: Indicates whether the member has initiated leaving the group.
  • isNew: Indicates whether the member is a new member of the consumer group.
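The two callback fields double as state flags: “waiting” simply means that a callback has been parked on the member. The sketch below illustrates that idea. MemberState, completeJoin, and the two result case classes are hypothetical stand-ins rather than Kafka’s real types. I believe the real MemberMetadata derives similar isAwaitingJoin/isAwaitingSync checks from these fields, but check the source for the exact form.

```scala
// Hypothetical stand-ins for Kafka's JoinGroupResult/SyncGroupResult
case class JoinGroupResult(memberId: String, generationId: Int)
case class SyncGroupResult(assignment: Array[Byte])

// Hypothetical, cut-down member keeping only the two callback fields
class MemberState(val memberId: String) {
  var awaitingJoinCallback: JoinGroupResult => Unit = null
  var awaitingSyncCallback: SyncGroupResult => Unit = null

  // "Waiting" is encoded as "a callback is parked on this member"
  def isAwaitingJoin: Boolean = awaitingJoinCallback != null
  def isAwaitingSync: Boolean = awaitingSyncCallback != null

  // Fire the parked join callback once, then clear it so the member stops waiting
  def completeJoin(result: JoinGroupResult): Unit =
    if (isAwaitingJoin) {
      awaitingJoinCallback(result)
      awaitingJoinCallback = null
    }
}

val member = new MemberState("consumer-mygroup-1-uuid")
var received: Option[JoinGroupResult] = None
member.awaitingJoinCallback = (result: JoinGroupResult) => { received = Some(result) }
println(member.isAwaitingJoin) // true
member.completeJoin(JoinGroupResult(member.memberId, 1))
println(member.isAwaitingJoin) // false
println(received)              // Some(JoinGroupResult(consumer-mygroup-1-uuid,1))
```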

That concludes the constructor and field definitions of the MemberMetadata class. The methods defined in this class are used to manipulate this metadata, and most of them are simple operations. Here, I have selected the metadata method to familiarize you with its coding style. You can read the code of other methods on your own after class to understand how they work.

Let’s take a look at the code of the metadata method:

```scala
def metadata(protocol: String): Array[Byte] = {
  supportedProtocols.find(_._1 == protocol) match {
    case Some((_, metadata)) => metadata
    case None =>
      throw new IllegalArgumentException("Member does not support protocol")
  }
}
```

This method looks up the details of the given protocol in the list of partition assignment strategies configured for this member. If it finds them, it simply returns the details as a byte array; otherwise, it throws an exception. Simple, isn’t it?
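To see both branches of the lookup, the following sketch copies the metadata method into a hypothetical ProtocolHolder class that keeps only supportedProtocols. The strategy names and byte values are illustrative assumptions.

```scala
// Hypothetical, cut-down member holding only supportedProtocols, with the
// same lookup logic as the metadata method above
class ProtocolHolder(var supportedProtocols: List[(String, Array[Byte])]) {
  def metadata(protocol: String): Array[Byte] =
    supportedProtocols.find(_._1 == protocol) match {
      case Some((_, metadata)) => metadata
      case None =>
        throw new IllegalArgumentException("Member does not support protocol")
    }
}

val holder = new ProtocolHolder(List("range" -> Array[Byte](1, 2), "sticky" -> Array[Byte](3)))

// Found: returns the stored bytes (compare contents with sameElements, not ==)
println(holder.metadata("range").sameElements(Array[Byte](1, 2))) // true

// Not configured for this member: the lookup throws IllegalArgumentException
val missing = scala.util.Try(holder.metadata("roundrobin"))
println(missing.isFailure) // true
```

The lookup matches on the strategy name only. Arrays in Scala use reference equality, so == on two byte arrays with the same contents returns false; that is why the usage above compares contents with sameElements.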

Group metadata (GroupMetadata class) #

Having covered the MemberMetadata class, let’s move on to the GroupMetadata class. It lives in the Scala file of the same name, also under the coordinator.group package.

GroupMetadata manages the metadata of a consumer group at the group level, rather than at the individual member level. Therefore, its code structure is more complex than the MemberMetadata class. First, let me draw a mind map to help you understand its code structure.

In general, the GroupMetadata.scala file consists of six parts.

  • GroupState class: Defines the state space of a consumer group. There are currently five states: Empty, PreparingRebalance, CompletingRebalance, Stable, and Dead. Empty means the group has no members. PreparingRebalance means the group is preparing a rebalance and members are (re)joining it. CompletingRebalance means the group is waiting for the leader member to produce the partition assignment plan. Stable means the group has completed a rebalance and is operating normally. Dead means the group has no members and its metadata has been removed.
  • GroupMetadata class: The group metadata class. This is the most important class in this Scala file and the focus of our study today.
  • GroupMetadata companion object: This object provides methods for creating instances of the GroupMetadata class.
  • GroupOverview class: Defines a very brief overview of a consumer group.
  • GroupSummary class: Similar to the MemberSummary class, it defines a summary of the consumer group.
  • CommitRecordMetadataAndOffset class: Saves the offset value of messages written to the offset topic, as well as other metadata. This class is mainly responsible for saving the offset value, so I won’t go into detail about its code.

Next, let’s see what metadata information is stored in each of these code structures. Let’s start with the simplest class, GroupState.

GroupState class and implementation objects #

The GroupState trait defines the state of a consumer group. Here is the code for the trait and one of its implementing objects, Stable:

```scala
// GroupState trait
private[group] sealed trait GroupState {
  // Valid previous states
  val validPreviousStates: Set[GroupState]
}
// Stable state
private[group] case object Stable extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}
// ...
```

Here I am only showing the code for the Stable state; the code for the other four states is similar. To help you understand how a consumer group moves between states, I have drawn a complete state transition diagram.

You need to remember that the state transitions for a consumer group from creation to normal operation are: Empty -> PreparingRebalance -> CompletingRebalance -> Stable.
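The transition rules can be sketched as a small state machine. The validPreviousStates sets below follow GroupMetadata.scala as I understand it for Kafka 2.x; please verify them against the version you are reading. GroupStateMachine and its transitionTo method are hypothetical simplifications (the real GroupMetadata also updates currentStateTimestamp on each transition), and the sketch uses def instead of the source’s val so that the mutually referencing objects can be initialized in any order.

```scala
// Five group states with their valid predecessor states
sealed trait GroupState { def validPreviousStates: Set[GroupState] }

case object PreparingRebalance extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)
}
case object CompletingRebalance extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}
case object Stable extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}
case object Dead extends GroupState {
  def validPreviousStates: Set[GroupState] =
    Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead)
}
case object Empty extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}

// Hypothetical holder that only enforces the transition rules
class GroupStateMachine(initialState: GroupState) {
  private var state: GroupState = initialState
  def currentState: GroupState = state

  // Reject any move whose source state is not a valid predecessor of the target
  def transitionTo(target: GroupState): Unit = {
    if (!target.validPreviousStates.contains(state))
      throw new IllegalStateException(s"Invalid transition from $state to $target")
    state = target
  }
}

val group = new GroupStateMachine(Empty)
List(PreparingRebalance, CompletingRebalance, Stable).foreach(group.transitionTo)
println(group.currentState) // Stable

// Stable -> Empty is not a valid transition, so it throws and the state is unchanged
println(scala.util.Try(group.transitionTo(Empty)).isFailure) // true
```

Running it walks the group through the normal startup path, Empty -> PreparingRebalance -> CompletingRebalance -> Stable, and then shows that jumping straight from Stable back to Empty is rejected.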

GroupOverview class #

Next, let’s look at the code for the GroupOverview class. As I mentioned earlier, this class provides a very brief overview of a group. When we run kafka-consumer-groups.sh --list in the command line, Kafka creates an instance of GroupOverview and returns it to the command line.

Here is the code for the GroupOverview class:

```scala
case class GroupOverview(
  groupId: String,      // Group ID, i.e., the group.id parameter value
  protocolType: String, // Protocol type of the consumer group
  state: String)        // State of the consumer group
```

Doesn’t it look simple? GroupOverview encapsulates the most basic group data: the group ID, protocol type, and state. If you are familiar with Java Web development, you can think of GroupOverview as a DTO (data transfer object) built from GroupMetadata: a lightweight object created only to hand group data over to the client.
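As a rough illustration of the DTO idea, here is a hypothetical SimpleGroup that projects itself down to a GroupOverview. I believe the real GroupMetadata has an overview method along these lines, but the class and its fields here are simplified assumptions.

```scala
// Same three fields as the GroupOverview shown above
case class GroupOverview(groupId: String, protocolType: String, state: String)

// Hypothetical, heavily simplified stand-in for GroupMetadata
class SimpleGroup(val groupId: String, var protocolType: Option[String], var state: String) {
  // Project the full object down to the lightweight view sent to the client.
  // A group may not have a protocol type yet, so fall back to an empty string.
  def overview: GroupOverview = GroupOverview(groupId, protocolType.getOrElse(""), state)
}

val group = new SimpleGroup("mygroup", Some("consumer"), "Stable")
println(group.overview) // GroupOverview(mygroup,consumer,Stable)
```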

GroupSummary class #

Its purpose is very similar to GroupOverview, except that it stores slightly more data. Let’s take a look at its code:

```scala
case class GroupSummary(
  state: String,                  // Consumer group state
  protocolType: String,           // Protocol type
  protocol: String,               // Partition assignment strategy for the consumer group
  members: List[MemberSummary])   // Member metadata
```


The GroupSummary class has four fields, and their meanings are clear just by looking at the field names. The field you should pay attention to is **members**, which is a list of MemberSummary types and contains metadata information for all members of the consumer group. Through this field, we can see that **there is a one-to-many relationship between the consumer group metadata and the member metadata**.
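The one-to-many relationship can be sketched as follows. Both case classes are trimmed, hypothetical versions of the real ones (most fields dropped), and the member IDs and hosts are made-up values.

```scala
import scala.collection.mutable

// Trimmed, hypothetical versions of the two summary classes (fields reduced)
case class MemberSummary(memberId: String, clientId: String, clientHost: String)
case class GroupSummary(state: String, protocolType: String, protocol: String,
                        members: List[MemberSummary])

// One group, many members: member ID -> member summary
val members = mutable.LinkedHashMap(
  "c1-1111" -> MemberSummary("c1-1111", "c1", "10.0.0.1"),
  "c2-2222" -> MemberSummary("c2-2222", "c2", "10.0.0.2"))

// Build the group-level summary by collecting every member's summary
val summary = GroupSummary("Stable", "consumer", "range", members.values.toList)
println(summary.members.map(_.clientId)) // List(c1, c2)
```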

GroupMetadata class #

Finally, let's look at the source code of the GroupMetadata class. First, let's see its constructor parameters and the additional metadata fields the class defines:

```scala
@nonthreadsafe
private[group] class GroupMetadata(
  val groupId: String,       // Group ID
  initialState: GroupState,  // Initial state of the consumer group
  time: Time) extends Logging {
  type JoinCallback = JoinGroupResult => Unit
  // Current group state
  private var state: GroupState = initialState
  // Timestamp of the most recent state change
  var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
  var protocolType: Option[String] = None
  var protocolName: Option[String] = None
  var generationId = 0
  // Member ID of the group's leader member; may not exist yet
  private var leaderId: Option[String] = None
  // Member metadata, keyed by member ID
  private val members = new mutable.HashMap[String, MemberMetadata]
  // Static members: group.instance.id -> member ID
  private val staticMembers = new mutable.HashMap[String, String]
  private var numMembersAwaitingJoin = 0
  // Votes for each supported partition assignment strategy
  private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
  // Committed offsets of the consumer group, keyed by topic partition
  private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
  // Topics subscribed to by the consumer group
  private var subscribedTopics: Option[Set[String]] = None
  // ......
}
```

The GroupMetadata class defines a large number of fields, and precisely because of this, it stores the most complete data and can be called the consumer group metadata class.

In addition to the fields mentioned above, the class defines many others. However, some of them are transaction-related metadata or temporary metadata for intermediate states; they are not core metadata and do not deserve much effort. What we need to focus on are the fields shown in the code above, which are the most important fields of the GroupMetadata class.

  • currentStateTimestamp: Records the timestamp of the most recent state change. It is used to determine whether the group’s messages in the offsets topic have expired. Messages in the offsets topic are also subject to Kafka’s retention policy: once the difference between the current time and this timestamp exceeds the retention threshold, the messages are considered “expired”.
  • generationId: The consumer group’s generation number. The generation is equivalent to the number of rebalances the consumer group has performed; it is incremented by 1 every time a rebalance is performed.
  • leaderId: The member ID of the Leader member of the consumer group. During a rebalance, one member must be elected Leader to formulate the partition assignment plan for all members. In the early stage of a rebalance, the Leader may not have been elected yet, which is why leaderId is of type Option.
  • members: Saves the metadata of all members in the consumer group. Member metadata is modeled by the MemberMetadata class, so members is a HashMap whose keys are member IDs and whose values are MemberMetadata objects.
  • offsets: A HashMap that saves the consumer group’s committed offsets, keyed by topic partition. The value is the CommitRecordMetadataAndOffset type mentioned earlier. When a member of the consumer group commits an offset to Kafka, the source code inserts the corresponding record into this field.
  • subscribedTopics: Saves the list of topics subscribed to by the consumer group; it helps filter the offsets of subscribed topic partitions out of the offsets field.
  • supportedProtocols: Records how many members vote for each partition assignment strategy. It is a HashMap whose key is the strategy name and whose value is the number of votes. As mentioned earlier, each member can configure multiple partition assignment strategies, so if member A chooses [“range”, “round-robin”], B chooses [“range”], and C chooses [“round-robin”, “sticky”], this field will contain three entries: <“range”, 2>, <“round-robin”, 2>, and <“sticky”, 1>.
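The vote counting described for supportedProtocols can be reproduced with the A/B/C example above. This is a minimal sketch: it uses Int instead of the java.lang.Integer that the source declares, and the strategy names are the labels from the example rather than the exact names Kafka’s assignors report.

```scala
import scala.collection.mutable

// Strategies configured by each member (the A/B/C example from the text)
val memberProtocols = Map(
  "A" -> List("range", "round-robin"),
  "B" -> List("range"),
  "C" -> List("round-robin", "sticky"))

// Strategy name -> number of members supporting it; unseen names read as 0
val votes = mutable.HashMap[String, Int]().withDefaultValue(0)
for (protocols <- memberProtocols.values; p <- protocols)
  votes(p) += 1 // each member casts one vote per strategy it lists

println(votes.toList.sortBy(_._1)) // List((range,2), (round-robin,2), (sticky,1))
println(votes("cooperative-sticky")) // 0
```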

These extended fields, together with the constructor fields, make up the complete consumer group metadata. Personally, I think the most important of them are members and offsets, which store the metadata of all members in the group and the offsets those members have committed, respectively. Together, these two fields answer the three questions a consumer group cares about most: how many members the group has, what each member is responsible for, and how far each member has progressed.

Summary #

Today, I took you deep into the source code files GroupMetadata.scala and MemberMetadata.scala, and we learned about the definition of consumer group metadata and group member metadata. They encapsulate all the data of a consumer group and its members. The subsequent GroupCoordinator and other consumer group components will heavily rely on this metadata to manage consumer groups.

In order for you to better grasp today’s content, let’s review the key points of this lesson.

  • Consumer group metadata: It includes group metadata and group member metadata, which are represented by the GroupMetadata and MemberMetadata classes respectively.
  • MemberMetadata class: It stores the metadata of group members, such as the group ID, consumer hostname, protocol type, and so on. In addition, the same source file defines the MemberSummary class, which encapsulates the summary information of a member’s metadata.
  • GroupMetadata class: It stores the group metadata, including the group state, the list of member metadata, and so on.
  • One-to-many relationship: There is a one-to-many relationship between group metadata and group member metadata. This is because there can be multiple group members under each consumer group.

The logic of this lesson is not particularly complex. We focused on understanding the composition of consumer group metadata, without delving into the operations on the metadata. In the next lesson, we will continue exploring these two Scala files to learn the implementation of methods for manipulating this metadata.

However, I want to emphasize once again that the metadata we studied today is the foundation on which upper-level components are built. If you want to thoroughly understand how consumer groups work, you must first “lay a solid foundation” with this part; only then can you reach the goal of “fully mastering the source code of the consumer group implementation”.

After-class Discussion #

Please think about it: which piece of metadata we learned about today does the ASSIGNMENT-STRATEGY column in the tool’s output correspond to?

Feel free to write down your thoughts and answers in the comments section to discuss with me. You are also welcome to share today’s content with your friends.