33 Group Coordinator How to Perform Group Synchronization During Rebalance

33 GroupCoordinator How to Perform Group Synchronization during Rebalance #

Hello, I am Hu Xi. Today, we will continue to study the consumer group Rebalance process. In this lesson, we will focus on the second major step of this process, which is group synchronization.

Group synchronization means that members send a SyncGroupRequest request to the Coordinator and wait for the Coordinator to send them the assignment plan. In the GroupCoordinator class, the entry method responsible for handling this request is handleSyncGroup. It further calls the doSyncGroup method to complete the logic of group synchronization. Besides distributing the assignment plan to the members, the latter also registers the group message in the metadata cache and changes the group status to Stable. Once the group synchronization operation is completed, the Rebalance is declared finished, and the consumer group starts working normally.

Next, let’s learn the implementation logic of the group synchronization process in detail. We will start with the top-level entry method handleSyncGroup, which is called by the handleSyncGroupRequest method of the KafkaApis class to handle the SyncGroupRequest request sent by consumer group members. Following this entry method, we will continuously delve deeper into the private method doSyncGroup that implements the logic of group synchronization.

handleSyncGroup Method #

Let’s start by examining the method signature of handleSyncGroup, the code is as follows:

def handleSyncGroup(
  groupId: String,  // Consumer group name
  generation: Int,  // Consumer group generation number
  memberId: String,  // Consumer group member ID
  protocolType: Option[String],  // Protocol type
  protocolName: Option[String],  // Partition consumption allocation strategy name
  groupInstanceId: Option[String],  // Static member instance ID
  groupAssignment: Map[String, Array[Byte]],  // Group assignment by member grouping
  responseCallback: SyncCallback  // Callback function
  ): Unit = {
  ......
}

This method defines a total of 8 parameters. Let me explain 6 of the more critical parameters.

  • groupId: Consumer group name, indicating which group this member belongs to.
  • generation: Consumer group generation number. Generation is similar to a term and indicates the number of rebalances that the Coordinator is responsible for processing for the consumer group. Every time a new rebalance is initiated, the Generation is automatically incremented by 1.
  • memberId: Consumer group member ID. This field is automatically generated by the Coordinator based on certain rules. The specific rules have been covered in a previous lesson, so I won’t go into them here. In general, the value of memberId is not directly specified by you, but you can indirectly influence its value through the client.id parameter.
  • protocolType: Field that identifies the protocol type. This field can have two possible values: “consumer” and “connect”. For ordinary consumer groups, the value of this field is “consumer”, which is of type Some("consumer"). The consumer group mechanism is also used in Kafka Connect. There, the value of the consumer group would be “connect”.
  • protocolName: Name of the partition consumption allocation strategy selected by the consumer group. The selection method here is the GroupMetadata.selectProtocol method that we learned previously.
  • groupAssignment: Assignment scheme grouped by member ID. It is important to note that only SyncGroupRequest requests sent by leader members include this scheme. Therefore, the groupAssignment field only has a value when the Coordinator is processing requests from leader members.

You might have noticed that protocolType and protocolName are both of type Option. This indicates that their values can be None, meaning they have no value. Why is that?

Currently, the values of these two fields are actually determined by the Coordinator, which is the step before the Rebalance process.

If a member successfully joins a group, the Coordinator assigns the correct values to these two fields and packages them into the response of the JoinGroupRequest to be sent to the consumer program. Once the consumer receives the data from the response, it extracts the values of these two fields and packages them into the SyncGroupRequest to be sent back to the Coordinator.

If a member fails to join the group, the Coordinator assigns None as the value of these two fields and includes them in the response. Therefore, in the handleSyncGroup method, the types of these two fields are Option.

Now that we have covered the method signature of handleSyncGroup, let’s look at its code:

// Validate consumer group status and legality
validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
  // If validation fails and the error is due to Coordinator loading
  // package REBALANCE_IN_PROGRESS exception and call the callback function to return
  case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
    responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
  // If it is another error, package the corresponding error and call the callback function to return
  case Some(error) => responseCallback(SyncGroupResult(error))
  case None =>
    // Get consumer group metadata
    groupManager.getGroup(groupId) match {
      // If not found, package UNKNOWN_MEMBER_ID exception and call the callback function to return
      case None => 
        responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
      // If found, call the doSyncGroup method to perform group synchronization task
      case Some(group) => doSyncGroup(
        group, generation, memberId, protocolType, protocolName,
        groupInstanceId, groupAssignment, responseCallback)
    }
}

To help you understand, I’ve created a flowchart to illustrate the main logic of this method.

The handleSyncGroup method first calls the validateGroupStatus method that we covered in a previous lesson to validate the status and legality of the consumer group. These checks include:

  1. The consumer group name must not be empty.
  2. The Coordinator component must be in a running state.
  3. The Coordinator component is not currently undergoing the loading process.
  4. The SyncGroupRequest is sent to the correct Coordinator component.

The first two checks are easy to understand, so I will focus on explaining the meaning of the last two checks.

When the Coordinator moves to another Broker, it needs to read message data from the internal offset topic and fill it into the consumer group’s metadata cache in memory, which is called loading.

  • If the Coordinator has changed, the request sent to the previous Coordinator’s Broker becomes invalid because it did not pass the fourth check, which is sending to the correct Coordinator.
  • If the request is sent to the correct Coordinator but the Coordinator is currently in the loading process, it means it did not pass the third check because the Coordinator cannot provide services externally until the loading is complete.

The code checks the consumer group against these four validation items one by one. If any of the checks fail, the validateGroupStatus method returns the reason for the failed check as the result. If it fails due to the Coordinator being in the loading process, it means all the statuses of this Rebalance are lost. By status, I mean the member information under the consumer group. In this case, the safest option is to restart the Rebalance from the join group stage. Therefore, the code packages the REBALANCE_IN_PROGRESS exception and calls the callback function to return. Once a member of the consumer group receives this exception, it knows that it has found the correct Coordinator and it only needs to restart the Rebalance, instead of going through the trouble of locating the Coordinator before starting the Rebalance again. However, if it fails due to another error, the error is packaged and the callback function is called to return.

If the consumer group passes all of the above validation checks, the code then retrieves the metadata information of the consumer group. If the metadata is not found, the code packages the UNKNOWN_MEMBER_ID exception and calls the callback function to return. If the metadata is found, it calls the doSyncGroup method to execute the actual group synchronization logic.

Clearly, we should now study the source code of the doSyncGroup method, which is where the group synchronization functionality is implemented.

doSyncGroup Method #

The doSyncGroup method has the same input parameters as its calling method handleSyncGroup, so I won’t go into the details here. Instead, let’s focus on its source code implementation.

Since the code is quite long, I’ve divided it into two parts and provided flowcharts for illustration.

  • Part 1: Mainly performs various checks on the consumer group. If the checks fail, it encapsulates the corresponding exception and sends it back to the callback function.
  • Part 2: Selects different execution logic based on the state of the consumer group. Pay special attention to how the code achieves group synchronization in the CompletingRebalance state.

Let’s start with the flowchart for the first part, to get an overall understanding:

Now, let’s take a look at the code for this part:

if (group.is(Dead)) {
 responseCallback(
   SyncGroupResult(Errors.COORDINATOR_NOT_AVAILABLE))
} else if (group.isStaticMemberFenced(memberId, groupInstanceId, "sync-group")) {
  responseCallback(SyncGroupResult(Errors.FENCED_INSTANCE_ID))
} else if (!group.has(memberId)) {
  responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
} else if (generationId != group.generationId) {
  responseCallback(SyncGroupResult(Errors.ILLEGAL_GENERATION))
} else if (protocolType.isDefined && !group.protocolType.contains(protocolType.get)) {
 responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else if (protocolName.isDefined && !group.protocolName.contains(protocolName.get)) {
 responseCallback(SyncGroupResult(Errors.INCONSISTENT_GROUP_PROTOCOL))
} else {
  // Code for part 2...
}

As you can see, the code is structured neatly, with a series of if-else statements.

First, this part of the code checks if the consumer group’s state is Dead. If so, it means that the group’s metadata has been removed from the Coordinator by another thread, possibly due to a Coordinator change. In this case, the best approach is to reject the group sync operation for the member, encapsulate the COORDINATOR_NOT_AVAILABLE exception, explicitly instruct it to find the latest Coordinator on the Broker node, and then try to rejoin the group.

The isStaticMemberFenced method that comes after this is related to static members, which we can ignore.

Next, the code checks if the member identified by the memberId field belongs to the consumer group. If it doesn’t, it encapsulates the UNKNOWN_MEMBER_ID exception and calls the callback function to return. If it does, it continues with the next check.

After that, the code checks if the member’s generation is the same as the consumer group’s generation. If they are different, it encapsulates the ILLEGAL_GENERATION exception and calls the callback function to return. If they are the same, it continues with the next check.

Next, the code checks if the member and the consumer group have the same protocol type. If they don’t, it encapsulates the INCONSISTENT_GROUP_PROTOCOL exception and calls the callback function to return. If they do, it proceeds to the next step.

Finally, it checks if the member and the consumer group have the same partition assignment strategy. If they don’t, it again encapsulates the INCONSISTENT_GROUP_PROTOCOL exception and returns it to the callback function.

If all of these checks pass, it smoothly moves on to the second part. Before we get into that, let’s first look at the implementation logic through another flowchart:

Once in this part, what the code does entirely depends on the current state of the consumer group. If the consumer group is in the CompletingRebalance state, the code has to do more complex tasks, which we’ll discuss later. For now, let’s see the logic code for other states excluding this state.

group.currentState match {
  case Empty =>
    // Encapsulate the UNKNOWN_MEMBER_ID exception and call the callback function to return
    responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
  case PreparingRebalance =>
    // Encapsulate the REBALANCE_IN_PROGRESS exception and call the callback function to return
    responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
  case CompletingRebalance =>
    // Detailed implementation below...
  case Stable =>
    // Get the member metadata of the consumer group
    val memberMetadata = group.get(memberId)
    // Encapsulate the group protocol type, assignment strategy, and member assignment scheme, and call the callback function to return
    responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
    // Set the next heartbeat time for the member
    completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
  case Dead =>
    // Throw an exception
    throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
}

}

If the current state of the consumer group is Empty or PreparingRebalance, the code will encapsulate the corresponding exception and provide it to the callback function for calling.

If the state is Stable, it means that the consumer group is already in a normal working state and no group synchronization is necessary. Therefore, in this case, simply return the current allocation scheme of the consumer group to the callback function for it to send to the members of the consumer group.

If the state is Dead, it means that this is an exceptional case, because theoretically, group synchronization should not be performed for a group in the Dead state. Therefore, the code can only choose to throw an IllegalStateException exception for the upper layer method to handle.

If none of these states apply, then the consumer group can only be in the CompletingRebalance state, which is the most likely state for the consumer group during group synchronization. Therefore, this part of the logic is more complex. Let’s take a look at the code:

// Set the group synchronization callback function for the consumer group member
group.get(memberId).awaitingSyncCallback = responseCallback
// SyncGroupRequest requests sent by the group leader member need special handling
if (group.isLeader(memberId)) {
  info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
  // If any member has not been assigned any consumption scheme, create an empty scheme for it
  val missing = group.allMembers.diff(groupAssignment.keySet)
  val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

  if (missing.nonEmpty) {
    warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
  }
  // Save the consumer group information in the consumer group metadata and write it to the internal offset topic
  groupManager.storeGroup(group, assignment, (error: Errors) => {
    group.inLock {
      // If the group state is CompletingRebalance and the member and group generationId are the same
      if (group.is(CompletingRebalance) && generationId == group.generationId) {
        // If there is an error
        if (error != Errors.NONE) {
          // Clear the assignment and send it to all members
          resetAndPropagateAssignmentError(group, error)
          // Prepare to start a new round of Rebalance
          maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
        // If there is no error
        } else {
          // Save the assignment in the consumer group metadata and send it to all members
          setAndPropagateAssignment(group, assignment)
          // Change the consumer group state to Stable
          group.transitionTo(Stable)
        }
      }
    }
  })
  groupCompletedRebalanceSensor.record()
}

In step 1, set the group synchronization callback function for the consumer group member. We always talk about callback functions, but actually, its meaning is very simple, which is to send the data passed to the callback function to the members of the consumer group through the Response.

In step 2, check if the current member is the leader member of the consumer group. If it is not a leader member, the method will end directly because only the leader member’s groupAssignment field carries the allocation scheme, and other members do not have an allocation scheme. If it is a leader member, proceed to the next step.

In step 3, create an empty allocation scheme for members who have not been assigned any partitions, and assign it to these members. The main purpose of this step is to construct a unified format allocation scheme field called assignment.

In step 4, call the storeGroup method to save the consumer group information to the consumer group metadata and write it to the internal offset topic. Once these actions are completed, proceed to the next step.

In step 5, if the group state is CompletingRebalance and the member and group Generation ID are the same, check whether there was an error during the storeGroup operation:

  • If there is an error, clear the assignment and send it to all members, and prepare to start a new round of Rebalance;
  • If there is no error, save the assignment in the consumer group metadata and send it to all members, and change the consumer group state to Stable.

If the group state is not CompletingRebalance, or the member and group Generation ID are not the same, it means that the consumer group may have started a new round of Rebalance, so the assignment should not be sent to the members at this time.

Thus, the group synchronization operation in the CompletingRebalance state is completed. To summarize, the group synchronization operation completes the following three things:

  1. Add the consumer group metadata containing the group member allocation scheme to the consumer group metadata cache and the internal offset topic.
  2. Send the assignment to all members of the group through the SyncGroupRequest response.
  3. Change the consumer group state to Stable.

I suggest you compare the code and find and read the source code that completes these three things on your own. This will not only help you review what you have learned today, but also help you deepen your understanding of the source code. When reading it, think about whether the meaning of this code is really as I said. If you have a different understanding, feel free to write it in the comments, and we can have an open discussion.

Summary #

Today, we focused on the second step of the Rebalance process, which is group synchronization. With this, we have completed the entire Rebalance process.

The Rebalance process is a critical functionality provided by Kafka for consumer groups. Due to its importance, the community continues to improve it, including introducing incremental Rebalance and static members. The Rebalance process we learned in these two lessons is the foundation for understanding these advanced functionalities. If you are unclear about what these steps in the Rebalance process are for, you cannot deeply grasp what the incremental Rebalance or static member mechanisms do.

Therefore, I recommend that you review the content of the previous lesson and thoroughly understand the Rebalance process of consumer groups, including how a group member participates and completes the Rebalance process.

Let’s review the key points of this lesson.

  • Group Synchronization: Members send SyncGroupRequest to the Coordinator to obtain the assignment plan.
  • handleSyncGroup Method: Receives the SyncGroupRequest data sent by KafkaApis and executes the group synchronization logic.
  • doSyncGroup Method: The actual method that executes the group synchronization logic, including group metadata storage, assignment plan distribution, and status changes.

With that, I have finished introducing the source code of the Coordinator component. In this module, we basically followed the learning method of “top-down” and “bottom-up”. We started with the lowest-level consumer group metadata class and gradually moved up to its manager class GroupMetadataManager and top-level class GroupCoordinator. Then, when learning the Rebalance process, we reversed the process, starting with the entry method of the GroupCoordinator class and gradually going down to the GroupMetadataManager, and even lower-level classes such as GroupMetadata and MemberMetadata.

If you have been following the course, you may have noticed that I often use this method to explain the source code. I hope that in your future source code study, you can also try to apply this method. As the saying goes, it is better to take action than to choose the right time. Today, I recommend you an excellent example to practice this method after class.

I suggest you read the source code of the implementation of messages, message batches, and message sets in the clients project, specifically the code of the interfaces and classes such as Record, RecordBatch, and Records. Practice “top-down” and “bottom-up” reading methods repeatedly.

In fact, this method is not only applicable to Kafka source code but can also be used when reading the source code of other frameworks. I hope you can continuously summarize your experiences and ultimately develop a learning method that suits you.

After-class Discussion #

The coordinator does not distribute the allocation scheme of all consumer groups to a single member. This means that member A cannot see the partition consumption allocation scheme of member B. So, can you find out which line of code in the source code achieves this?

Please write your thoughts and answers in the comment section to exchange and discuss with me. Feel free to share today’s content with your friends as well.