28 Consumer Group Metadata How Kafka Manages These Metadata #

Hello, I’m Hu Xi. Today, we will continue learning about consumer group metadata.

In the previous lesson, we saw that Kafka defines a large amount of consumer group metadata, which naturally raises the question of how this metadata is managed.

Since these kinds of metadata differ, so do the strategies for managing them. In this lesson, I will break the metadata down along four dimensions: consumer group state, members, offsets, and partition assignment strategies, and walk you through the methods Kafka uses to manage each of them.

These methods are defined in the MemberMetadata and GroupMetadata classes. Among them, the methods in the GroupMetadata class are the most important and will be the focus of our study. In the upcoming lessons, you will see that these methods are frequently called by the higher-level component, GroupCoordinator. Therefore, understanding them is a prerequisite for studying the code of the Coordinator component. Be sure to spend some extra effort to comprehend them.

Consumer Group State Management Methods #

Consumer group state is an important type of metadata. The methods for managing this state involve setting and querying. Since these methods are mostly simple, I have summarized them together and will introduce them to you directly.

// GroupMetadata.scala
// Method for setting/updating state
def transitionTo(groupState: GroupState): Unit = {
  assertValidTransition(groupState) // Ensure a valid state transition
  state = groupState  // Set the state to the given state
  currentStateTimestamp = Some(time.milliseconds()) // Update the state change timestamp
}
// Method for querying state
def currentState = state
// Method for checking if the consumer group state is a specified state
def is(groupState: GroupState) = state == groupState
// Method for checking if the consumer group state is not a specified state
def not(groupState: GroupState) = state != groupState
// Method for checking if the consumer group is eligible for rebalance, based on the current state being a valid previous state for PreparingRebalance
def canRebalance = PreparingRebalance.validPreviousStates.contains(state)

1. The transitionTo method

The purpose of the transitionTo method is to change the consumer group state to the given state. Before the transition, the code needs to ensure that this change is a valid state transition. This is achieved by the validPreviousStates collection defined in each implementation class of GroupState. Only the states in this collection are valid previous states for that state. In other words, the group can move to a given state only if its current state appears in that state’s validPreviousStates.

In addition, this method will also update the timestamp field for state changes. Kafka has a scheduled task that periodically cleans up expired consumer group offset data, and it relies on this timestamp field to determine expiration.

2. The canRebalance method

This method is used to determine if the consumer group is eligible for a rebalance operation. The criterion is whether the current state is a valid previous state for PreparingRebalance. Only consumer groups with states categorized as Stable, CompletingRebalance, and Empty are eligible to initiate a rebalance.

3. The is and not methods

As for the is and not methods, they respectively check if the consumer group state matches or does not match the given state. They are mainly used for performing state validation. In particular, the is method is extensively used in the upper-level calling code to perform pre-state validation for various consumer group management tasks.
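To make the state checks concrete, here is a minimal sketch of the same idea outside of Kafka. The class name GroupStateHolder is hypothetical, and the transition table is a simplified assumption for illustration (only the previous states of PreparingRebalance are taken from this lesson), so verify it against the Kafka version you are reading.

```scala
// Simplified model of the group state machine; not Kafka's actual classes.
sealed trait GroupState { def validPreviousStates: Set[GroupState] }

case object PreparingRebalance extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)
}
case object CompletingRebalance extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}
case object Stable extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}
case object Empty extends GroupState {
  def validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}

// Hypothetical holder that mimics transitionTo, is, and canRebalance.
final class GroupStateHolder(initial: GroupState) {
  private var state: GroupState = initial

  def currentState: GroupState = state
  def is(groupState: GroupState): Boolean = state == groupState

  // Eligible for rebalance only if the current state may precede PreparingRebalance.
  def canRebalance: Boolean = PreparingRebalance.validPreviousStates.contains(state)

  // Reject any transition whose source state is not listed by the target state.
  def transitionTo(target: GroupState): Unit = {
    if (!target.validPreviousStates.contains(state))
      throw new IllegalStateException(s"Invalid transition from $state to $target")
    state = target
  }
}
```

In this sketch, a holder that starts in Empty can move to PreparingRebalance, then CompletingRebalance, then Stable, while a direct jump from Stable to Empty is rejected.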

Overall, these are the methods used for managing consumer group state data, and they are quite simple, right?

Member Management Methods #

Before introducing the methods for managing consumer group members, let me first remind you of the fields that store member information in GroupMetadata. The members field in GroupMetadata is a HashMap where the key is the member ID and the value is of type MemberMetadata, which contains metadata information about the member.

Managing members involves adding members (add method), removing members (remove method), and querying members (has, get, size methods, etc.). Let’s learn about each of them one by one.

Adding Members #

First, let’s talk about the method for adding members: add. The main logic of the add method is to add the member object to the members field and update the other related metadata, such as the Leader member field and the vote counts of the partition assignment strategies that the members support. The following is the source code of the add method:

def add(member: MemberMetadata, callback: JoinCallback = null): Unit = {
  // If it's the first member to be added
  if (members.isEmpty)
    // Set the consumer group's protocolType to the member's protocolType
    this.protocolType = Some(member.protocolType)
  // Ensure that the groupId in the member metadata is the same as the group's groupId
  assert(groupId == member.groupId)
  // Ensure that the protocolType in the member metadata is the same as the group's protocolType
  assert(this.protocolType.orNull == member.protocolType)
  // Ensure that the partition assignment strategy selected by the member matches the one selected by the group
  assert(supportsProtocols(member.protocolType, MemberMetadata.plainProtocolSet(member.supportedProtocols)))
  // If the Leader member has not been selected yet
  if (leaderId.isEmpty)
    // Set this member as the Leader member
    leaderId = Some(member.memberId)
  // Add the member to members
  members.put(member.memberId, member)
  // Update the supported vote count for partition assignment strategy
  member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1 }
  // Set the callback logic after the member joins the group
  member.awaitingJoinCallback = callback
  // Update the number of members that are awaiting a response to their join request
  if (member.isAwaitingJoin)
    numMembersAwaitingJoin += 1
}

Let me draw a flowchart to help you understand the function of this method more intuitively.

Now let me explain the execution logic of this method in detail.

First, the add method checks whether the members field contains existing members. If not, it means that the member to be added is the first member of the consumer group. In this case, the group’s protocolType is set to the member’s protocolType. As I mentioned in the previous lesson, for ordinary consumers, protocolType is simply set to the string “consumer”. If it is not the first member, it moves on to the next step.

Second, the add method performs three consecutive validations to ensure that the group ID, protocolType, and partition assignment strategy selected by the member match the group configuration. If any of these validations fail, an exception will be thrown immediately.

Third, it checks whether the Leader member of the consumer group has been selected or not. If not, it sets this member as the Leader member. However, if the Leader has already been selected, this step is naturally skipped. It’s important to note that the Leader here is not the same concept as the Leader replica we learned about when studying the replica manager. The Leader member here refers to a member within the consumer group. This member is responsible for creating a partition assignment plan for all members based on the partition assignment strategy selected by the consumer group.

Fourth, it updates the supported vote count for the consumer group’s partition assignment strategy. I explained the meaning of the supportedProtocols field at the end of the previous lesson using an example, so I won’t repeat it here. If you don’t remember, you can review it again.

Finally, it sets the callback to run after the member joins the group and, if the member is awaiting a join response, increments the count of such members. With that, the method is complete.

As one of the key member management methods, add is crucial to the consumer group’s Rebalance process. The first major step of a Rebalance, joining the group, essentially relies on this add method to add new members to the group.

Removing Members #

Since there is an add method, there is naturally a remove method as well. The following is the complete source code of the remove method:

def remove(memberId: String): Unit = {
  // Remove the given member from members
  members.remove(memberId).foreach { member =>
    // Update the supported vote count for the partition assignment strategy
    member.supportedProtocols.foreach{ case (protocol, _) => supportedProtocols(protocol) -= 1 }
    // Update the number of members that are awaiting a response to their join request
    if (member.isAwaitingJoin)
      numMembersAwaitingJoin -= 1
  }
  // If the member is the Leader, select the first member from the remaining list as the new Leader member
  if (isLeader(memberId))
    leaderId = members.keys.headOption
}

The remove method is simpler than the add method. First, the code removes the given member from members. Then, it decrements the vote counts of the partition assignment strategies that member supported, as well as the count of members awaiting a join response if the member was one of them. Finally, the code checks whether the removed member was the Leader member. If so, it selects the first member from the remaining list as the new Leader member.
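The bookkeeping done by add and remove can be tried out in isolation with a minimal sketch. Member and GroupMembers are hypothetical, stripped-down stand-ins for MemberMetadata and GroupMetadata, and the callback and validation logic is left out. I also use a LinkedHashMap here, which is an assumption made only so that "the first remaining member" is deterministic in the example.

```scala
import scala.collection.mutable

// Hypothetical simplified member: an ID plus an ordered list of supported strategies.
final case class Member(memberId: String, supportedProtocols: List[String])

final class GroupMembers {
  // Insertion-ordered map, chosen for a predictable example; not Kafka's field type.
  private val members = mutable.LinkedHashMap.empty[String, Member]
  // Vote count per strategy; a missing strategy counts as zero.
  private val supportedProtocols = mutable.HashMap.empty[String, Int].withDefaultValue(0)
  private var leaderId: Option[String] = None

  def leader: Option[String] = leaderId
  def votesFor(protocol: String): Int = supportedProtocols(protocol)

  def add(member: Member): Unit = {
    // The first member to join becomes the Leader member.
    if (leaderId.isEmpty) leaderId = Some(member.memberId)
    members.put(member.memberId, member)
    // One vote for every strategy this member supports.
    member.supportedProtocols.foreach(p => supportedProtocols(p) += 1)
  }

  def remove(memberId: String): Unit = {
    // Take the member out and give back its votes.
    members.remove(memberId).foreach { m =>
      m.supportedProtocols.foreach(p => supportedProtocols(p) -= 1)
    }
    // If the Leader left, promote the first remaining member (if any).
    if (leaderId.contains(memberId)) leaderId = members.keys.headOption
  }
}
```

Adding two members and then removing the first one moves the Leader role to the second member and lowers the vote count of every strategy the departed member supported.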

Querying Members #

There are many methods for querying members, and most of them are simple. Here, I’ll introduce the three most common ones.

def has(memberId: String) = members.contains(memberId)
def get(memberId: String) = members(memberId)
def size = members.size
  • The has method checks if the consumer group contains a specific member.
  • The get method retrieves the specified member object.
  • The size method counts the total number of members.
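One detail is worth a quick sketch: because get indexes the map directly, asking for a member that does not exist raises an exception, which is why callers typically check has first. The map contents and the find helper below are hypothetical and only illustrate standard Scala map behavior.

```scala
import scala.collection.mutable

// Hypothetical member table: member ID -> some metadata (a plain String here).
val memberTable = mutable.HashMap("m1" -> "metadata-of-m1")

def has(memberId: String): Boolean = memberTable.contains(memberId)

// Direct indexing: throws NoSuchElementException when the ID is unknown.
def get(memberId: String): String = memberTable(memberId)

// Hypothetical safe variant that returns an Option instead of throwing.
def find(memberId: String): Option[String] = memberTable.get(memberId)
```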

Other query methods have simple logic as well, such as allMemberMetadata, rebalanceTimeoutMs, and so on. I won’t explain them in detail here. You can read them on your own after the class, with the focus on understanding what these methods do using the members field.

Offset Management Methods #

In addition to group status and member management, GroupMetadata has another type of management function, which is to manage the committed offsets of consumer groups, mainly including adding and removing offset values.

However, before learning about the functionality of offset management, let’s review the definition of the offsets field, where the offset is saved. After all, the methods we are going to learn mainly operate on this field.

private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]

It is of HashMap type, where the Key is of TopicPartition type, representing a topic partition, and the Value is of CommitRecordMetadataAndOffset type. This class encapsulates the offset value of the offset commit message.

Before delving into the details of the offset management methods, let me first explain the terms “offset” and “offset commit message” used here.

Consumer groups need to report how far they have consumed to the Coordinator. In Kafka, this progress has a dedicated term: the “committed offset”. Kafka uses the committed offset to locate the next message to be consumed by the consumer group.

So, how is the committed offset saved on the Coordinator side? It is saved in the internal offsets topic. Offsets are committed by having the consumer group members write messages in a specific format to this internal topic. These messages are called offset commit messages. I will introduce their format in detail in Lesson 30, so you don’t need to pay attention to it for now.

The CommitRecordMetadataAndOffset class mentioned here is what represents an offset commit message. Let’s take a look at its code:

case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {
  def olderThan(that: CommitRecordMetadataAndOffset): Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get
}

This class has two constructor parameters.

  • appendedBatchOffset: the offset of the offset commit message itself within the offsets topic.
  • offsetAndMetadata: the consumer group’s committed offset carried in the offset commit message.

Adding Offset Values #

In GroupMetadata, there are three methods for adding consumed offset values to the offsets map: initializeOffsets, onOffsetCommitAppend, and completePendingTxnOffsetCommit.

The code for the initializeOffsets method is very simple, as shown below:

def initializeOffsets(
  offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
  pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
  this.offsets ++= offsets
  this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
}

It simply adds the given set of consumed offset values for subscribed partitions to the offsets. Of course, it also updates the pendingTransactionalOffsetCommits field at the same time.

However, since this field is used by Kafka’s transaction mechanism, you only need to pay attention to the first line of this method. When the coordinator component of the consumer group starts, it creates an asynchronous task to periodically read the committed offset data of the corresponding consumer group from the offset topic and load them into the offsets field.

Let’s take a look at the code for the second method, onOffsetCommitAppend:

def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset): Unit = {
  if (pendingOffsetCommits.contains(topicPartition)) {
    if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
      throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
        "in the log.")
    // If the offset commit data for the partition is not in the offsets,
    // or the appended offset value of the offset commit message for the partition in the offsets 
    // is less than the offset value to be written.
    if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
      // Add the offset commit message for the partition to the offsets.
      offsets.put(topicPartition, offsetWithCommitRecordMetadata)
  }
  pendingOffsetCommits.get(topicPartition) match {
    case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>
      pendingOffsetCommits.remove(topicPartition)
    case _ =>
  }
}

This method is called after the offset commit message has been successfully written. The key check is whether offsets has no entry for the partition yet, or whether the existing entry was appended to the offsets topic earlier than the message being handled (that is, its appendedBatchOffset is smaller). If either condition holds, the new offset commit message for the partition is put into offsets. Afterwards, if the committed value matches the one staged in pendingOffsetCommits, the staged entry is removed.
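The "only replace an older entry" rule is easier to see with a small, self-contained sketch. CommitRecord and OffsetStore are hypothetical simplifications: a partition is just a (topic, partition) pair and the committed offset is a plain Long rather than an OffsetAndMetadata object.

```scala
import scala.collection.mutable

// Hypothetical simplified commit record: log position of the commit message plus the committed offset.
final case class CommitRecord(appendedBatchOffset: Option[Long], committedOffset: Long) {
  def olderThan(that: CommitRecord): Boolean =
    appendedBatchOffset.get < that.appendedBatchOffset.get
}

final class OffsetStore {
  private val offsets = mutable.HashMap.empty[(String, Int), CommitRecord]
  private val pendingOffsetCommits = mutable.HashMap.empty[(String, Int), Long]

  // Stage a commit before the commit message has been written.
  def prepare(tp: (String, Int), committedOffset: Long): Unit =
    pendingOffsetCommits.put(tp, committedOffset)

  // Called once the commit message has been appended successfully.
  def onAppend(tp: (String, Int), record: CommitRecord): Unit = {
    if (pendingOffsetCommits.contains(tp)) {
      if (record.appendedBatchOffset.isEmpty)
        throw new IllegalStateException("Log position of the commit record is required")
      // Store the record only if nothing is stored yet or the stored entry is older.
      if (!offsets.contains(tp) || offsets(tp).olderThan(record))
        offsets.put(tp, record)
    }
    // Clear the staged value if it is the one that was just written.
    if (pendingOffsetCommits.get(tp).contains(record.committedOffset))
      pendingOffsetCommits.remove(tp)
  }

  def committed(tp: (String, Int)): Option[Long] = offsets.get(tp).map(_.committedOffset)
}
```

With this sketch, a record appended at log position 5 is kept even if a record from log position 3 is processed afterwards, because the stored entry is not older than the late arrival.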

The purpose of the third method, completePendingTxnOffsetCommit, is to complete a pending transaction. A pending transaction refers to an ongoing transaction that has not been completed yet. During the process of handling pending transactions, there may be cases where the offset values of partitions involved in the pending transactions are added to the offsets. However, since this method is closely related to Kafka’s transaction, you don’t need to pay much attention to it, so I won’t go into the details here.

Removing Offset Values #

The consumed offset values of subscribed partitions in the offsets map can also be removed. Do you remember that Kafka messages have default retention time settings? Since the offset topic is just a typical Kafka topic, it also follows the corresponding rules. If the difference between the current time and the timestamp of the committed offset message exceeds the offsets.retention.minutes parameter on the broker side, Kafka will remove this record from the offsets map. This is what the removeExpiredOffsets method does.
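The retention rule described above can be expressed in a few lines. This is only a sketch under simplifying assumptions: StoredOffset, isExpired, and removeExpired are hypothetical names, partitions are identified by plain strings, and an entry is treated as expired once its age reaches the retention period.

```scala
// Hypothetical simplified entry: the committed offset and the time it was committed.
final case class StoredOffset(offset: Long, commitTimestamp: Long)

// An entry expires once its age reaches the retention period.
def isExpired(stored: StoredOffset, nowMs: Long, retentionMs: Long): Boolean =
  nowMs - stored.commitTimestamp >= retentionMs

// Split the map into surviving entries and the keys of the expired ones.
def removeExpired(
    offsets: Map[String, StoredOffset],
    nowMs: Long,
    retentionMs: Long): (Map[String, StoredOffset], Set[String]) = {
  val (expired, live) = offsets.partition { case (_, stored) => isExpired(stored, nowMs, retentionMs) }
  (live, expired.keySet)
}
```

For example, with a current time of 1000 ms and a retention period of 100 ms, an entry committed at 850 ms is dropped while one committed at 950 ms is kept.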

The code of removeExpiredOffsets is a bit long, so to help you understand it more easily, I will introduce it in two parts: first its nested method getExpiredOffsets, and then the logic that calls it.

removeExpiredOffsets takes two parameters, currentTimestamp and offsetRetentionMs. Inside it, a nested method called getExpiredOffsets is defined, which is specifically used to collect the expired offset values from the offsets map. Let’s read the source code to see how it works:

def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long,
                      subscribedTopics: Set[String] = Set.empty): Map[TopicPartition, OffsetAndMetadata] = {
  // Loop through all partitions in the offsets map and keep only the expired entries
  offsets.filter {
    case (topicPartition, commitRecordMetadataAndOffset) =>
      // Skip partitions whose topic is still subscribed by the group
      !subscribedTopics.contains(topicPartition.topic()) &&
      // Skip partitions that still have a pending offset commit
      !pendingOffsetCommits.contains(topicPartition) && {
        commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
          // No explicit expiration time: expired once the retention period
          // has elapsed since the base timestamp
          case None =>
            currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
          // Explicit expiration time (older commit format): expired once that time has passed
          case Some(expireTimestamp) =>
            currentTimestamp >= expireTimestamp
        }
      }
  }.map {
    // Return only the committed offset data for each expired partition
    case (topicPartition, commitRecordOffsetAndMetadata) =>
      (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
  }.toMap
}

This nested method takes two parameters.

  • baseTimestamp: a function that returns the timestamp from which the retention period of an entry is measured.
  • subscribedTopics: the topics the group currently subscribes to. Offsets of these topics are never treated as expired. It defaults to an empty set.

An entry is considered expired only if its topic is not subscribed, its partition has no pending offset commit, and its retention period has elapsed. If the entry carries an explicit expireTimestamp, that timestamp is compared with currentTimestamp directly. Otherwise, the difference between currentTimestamp and the base timestamp is compared with offsetRetentionMs. The method returns an immutable Map from each expired partition to its OffsetAndMetadata.

With getExpiredOffsets defined, the rest of removeExpiredOffsets chooses the base timestamp according to the group’s protocol type and state, and then removes whatever getExpiredOffsets returns:

// Method definition
def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long): Map[TopicPartition, OffsetAndMetadata] = {
  // (the nested getExpiredOffsets method is defined here)

  // Call the getExpiredOffsets method to get expired offsets for topic partitions
  val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {
    case Some(_) if is(Empty) =>
      getExpiredOffsets(commitRecordMetadataAndOffset =>
        currentStateTimestamp.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp))
    case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined =>
      getExpiredOffsets(_.offsetAndMetadata.commitTimestamp, subscribedTopics.get)
    case None =>
      getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)
    case _ =>
      Map()
  }

  // Print debug message if there are expired offsets
  if (expiredOffsets.nonEmpty)
    debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")

  // Remove the topic partitions corresponding to the expired offsets from offsets
  offsets --= expiredOffsets.keySet

  // Return the expired offsets for topic partitions
  expiredOffsets
}

That’s all for the explanation of the offset management methods. I hope you now have a good understanding of how offset values are added and removed in Kafka.

Partition Allocation Strategy Management Method #

Finally, let’s discuss how a consumer group manages its partition allocation strategies, that is, the management of the supportedProtocols field. This field records the number of votes for each partition allocation strategy, and it is updated whenever members are added or removed.

Every time a consumer group performs rebalancing, it needs to determine which partition allocation strategy to use after the rebalance. Therefore, specific methods are needed to calculate the votes for each strategy and select the strategy with the most votes.

The GroupMetadata class defines two methods to accomplish this: candidateProtocols and selectProtocol.

Determine the Supported Partition Allocation Strategy Set for a Consumer Group #

Let’s first take a look at the candidateProtocols method. Its purpose is to find the set of partition allocation strategies that are supported by all members of the group. Here is the code:

private def candidateProtocols: Set[String] = {
  val numMembers = members.size // Get the total number of members in the group
  // Find the strategies with votes equals to the total number of members and return their names
  supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
}

This method first retrieves the total number of members in the group. Then it keeps only the strategies in supportedProtocols whose vote count equals the total number of members, and returns their names. A vote count equal to the number of members means that every member supports that strategy.
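As a quick illustration of this filter, the following sketch applies the same rule to a hand-written vote table. The standalone function signature is an assumption made for the example; in GroupMetadata the method reads the members and supportedProtocols fields instead of taking parameters.

```scala
// Keep only the strategies whose vote count equals the number of members.
def candidateProtocols(voteCounts: Map[String, Int], numMembers: Int): Set[String] =
  voteCounts.collect { case (protocol, count) if count == numMembers => protocol }.toSet

// Hypothetical vote table for a group of three members.
val voteCounts = Map("range" -> 3, "roundrobin" -> 3, "sticky" -> 1)
val commonlySupported = candidateProtocols(voteCounts, numMembers = 3)
```

Here only the strategies with three votes survive, because "sticky" is supported by a single member.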

Select the Partition Consumption Allocation Strategy for a Consumer Group #

Next, let’s look at the selectProtocol method. Its purpose is to select the partition consumption allocation strategy for a consumer group.

def selectProtocol: String = {
  // If there are no members, it is not possible to determine which strategy to select.
  if (members.isEmpty)
    throw new IllegalStateException("Cannot select protocol for empty group")
  
  // Get the set of strategies supported by all members
  val candidates = candidateProtocols
  
  // Let each member vote, and the strategy with the most votes is selected
  val (protocol, _) = allMemberMetadata
    .map(_.vote(candidates))
    .groupBy(identity)
    .maxBy { case (_, votes) => votes.size }
  
  protocol
}

This method first checks if there are any members in the group. If there are no members, it is not possible to determine which strategy to select, so the method throws an exception and exits. Otherwise, the code calls the candidateProtocols method to get the set of strategies supported by all members. Then, it lets each member vote, and selects the strategy with the most votes.

You may wonder how the vote method is implemented. It is actually a simple lookup operation. Let me give you a simple example to help you understand.

For example, if the value of the candidates field is [“Strategy A”, “Strategy B”], and Member 1 supports [“Strategy B”, “Strategy A”], Member 2 supports [“Strategy A”, “Strategy B”, “Strategy C”], and Member 3 supports [“Strategy D”, “Strategy B”, “Strategy A”], then the vote method will compare the candidates with the support list of each member and find the first strategy in the support list that is also in the candidates. So, for this example, Member 1 votes for Strategy B, Member 2 votes for Strategy A, and Member 3 votes for Strategy B. As you can see, the voting result is 2 votes for Strategy B and 1 vote for Strategy A. Therefore, the selectProtocol method returns Strategy B as the new strategy.

One thing to note is that the order of strategies in a member’s support list matters. [“Strategy B”, “Strategy A”] and [“Strategy A”, “Strategy B”] are different, because each member votes for the first strategy in its list that is also a candidate.
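The voting example above can be reproduced with a short sketch. Voter is a hypothetical stand-in for MemberMetadata that keeps only the ordered support list, and selectProtocol takes the members and candidates as parameters purely for the sake of a self-contained example.

```scala
// Hypothetical simplified member: votes for the first supported strategy that is a candidate.
final case class Voter(memberId: String, supportedProtocols: List[String]) {
  def vote(candidates: Set[String]): String =
    supportedProtocols.find(candidates.contains).getOrElse(
      throw new IllegalArgumentException("Member does not support any of the candidate protocols"))
}

// Collect one vote per member and return the strategy with the most votes.
def selectProtocol(members: Seq[Voter], candidates: Set[String]): String = {
  if (members.isEmpty)
    throw new IllegalStateException("Cannot select protocol for empty group")
  val (protocol, _) = members
    .map(_.vote(candidates))
    .groupBy(identity)
    .maxBy { case (_, votes) => votes.size }
  protocol
}

val candidates = Set("Strategy A", "Strategy B")
val voters = Seq(
  Voter("member-1", List("Strategy B", "Strategy A")),
  Voter("member-2", List("Strategy A", "Strategy B", "Strategy C")),
  Voter("member-3", List("Strategy D", "Strategy B", "Strategy A")))

// Two votes for Strategy B against one for Strategy A.
val chosen = selectProtocol(voters, candidates)
```

Swapping the first two entries in member-1’s list would turn its vote into Strategy A and change the outcome, which is exactly why the order of the support list matters.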

Summary #

Today, we combined the source code of GroupMetadata and learned about Kafka’s management of consumer group metadata, mainly including group status, members, offsets, and partition assignment strategies. I suggest that you carefully read these methods for managing metadata again after class, and follow the source code and comments to go through the complete operation process.

In addition, in these two lessons I did not cover the management of “Pending Members” and “Pending Offsets”, because these two kinds of metadata are intermediate, temporary states. Skipping this part of the code will not affect your understanding of consumer group metadata or of how the Coordinator uses it. Still, I suggest that you read the code related to them, because Kafka often relies on intermediate state variables like these to manage all kinds of metadata and state.

Now, let’s briefly review the key points of this lesson.

  • Management of consumer group metadata: mainly includes management of group status, members, offsets, and partition assignment strategies.
  • Group status management: the transitionTo method is responsible for setting the status, and the currentState, is, and not methods are used to query the status.
  • Member management: the add and remove methods are used to increase or decrease members, and the has and get methods are used to query specific members.
  • Partition assignment strategy management: defines the dedicated method selectProtocol, which is used to elect the partition assignment strategy during each round of rebalance.

So far, we have spent two lessons’ time learning the source code of consumer group metadata and its management methods in detail. These methods of manipulating metadata are extensively used by the upper-level caller GroupCoordinator. As I mentioned at the beginning, if we don’t thoroughly grasp the methods of manipulating this metadata now, when we learn the GroupCoordinator code, we will find it a bit difficult. Therefore, you must study these two lessons well. With these foundations, when you learn the source code of GroupCoordinator, you will have a deeper understanding of its underlying implementation principles.

Post-discussion #

In the discussion about MemberMetadata, I mentioned that each member in a consumer group has their own setting for Rebalance timeout. So, how does Kafka determine which member’s timeout to use as the overall group timeout?

Please feel free to share your thoughts and answer in the comments section. Let’s discuss and exchange ideas. Also, don’t hesitate to share today’s content with your friends.