32 GroupCoordinator How the Coordinator Handles Member Joining during Rebalance #

Hello, I’m Hu Xi. Before we knew it, the course has almost reached its end. In these last two sessions, let’s look at how the Rebalance process of a consumer group is carried out.

When it comes to Rebalance, your first reaction is probably one of mixed feelings. Used properly, it automatically provides load balancing and fault tolerance among consumers; configured improperly, it exposes its long-standing weaknesses: a Rebalance takes a long time, and consumption is interrupted while it runs.

When working with consumer groups, you will certainly want to know how to avoid unnecessary Rebalances. If you don’t understand the source code behind Rebalance, it’s easy to fall into the traps it sets.

Let’s take a small example. Some people believe that the consumer-side parameter “session.timeout.ms” determines the maximum time allowed for completing a Rebalance. This is incorrect. In fact, this parameter is used to detect the liveness of consumer group members. If the Coordinator receives no heartbeat request from a member within this period, the member is marked as Dead and explicitly removed from the consumer group, which triggers a new round of Rebalance. The parameter that actually determines the maximum duration of a single Rebalance is the consumer-side “max.poll.interval.ms”. Clearly, without understanding this part of the source code, you cannot set reasonable values for these parameters.
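To make the distinction concrete, here is a minimal sketch that sets both parameters side by side using java.util.Properties. The property names are real consumer settings; the broker address, the group name demo-group, and all numeric values are placeholders I picked for illustration, not recommendations.

```scala
import java.util.Properties

object ConsumerTimeouts {
  // Builds a consumer configuration; every value below is an illustrative placeholder.
  def build(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder address
    props.put("group.id", "demo-group")              // hypothetical group name
    // Liveness detection: no heartbeat within this window means the member is considered dead.
    props.put("session.timeout.ms", "10000")
    // Upper bound between poll() calls; also passed to the Coordinator as the Rebalance timeout.
    props.put("max.poll.interval.ms", "300000")
    props
  }
}
```

The point of the sketch is only that the two settings are independent: shortening session.timeout.ms makes failure detection faster, while max.poll.interval.ms bounds how long a Rebalance may take.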

In general, the Rebalance process can be roughly divided into two steps: JoinGroup and SyncGroup.

JoinGroup refers to the process in which members of a consumer group send JoinGroupRequest to the Coordinator to join the group. This process has a timeout period, and if a member fails to complete the join group operation within the timeout period, it will be excluded from this round of Rebalance.

SyncGroup is the process that follows once all members have joined the group. The Coordinator designates one member as the Leader and sends it the subscription information of all members. Then, all members (including the Leader) send a SyncGroupRequest to the Coordinator. Note that only the request sent by the Leader carries the partition assignment plan; this field is empty in the requests sent by the other members. When the Coordinator receives the plan, it tells each member which partitions to consume through the response it sends back.

Once group synchronization is completed, Rebalance is declared finished. At this time, the consumer group is in normal working state.
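The two steps can be pictured as a small state machine. The sketch below is a simplified model of the happy path only; the state names match the group states that appear later in this lesson, while the Event type and the next function are hypothetical names invented for this example and are not part of Kafka.

```scala
object RebalancePhases {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState

  // Hypothetical events used only in this sketch.
  sealed trait Event
  case object MemberJoinRequested extends Event // a JoinGroupRequest arrives
  case object AllMembersJoined extends Event    // the JoinGroup step is finished
  case object AssignmentReceived extends Event  // the Leader's SyncGroupRequest arrives

  // Happy-path transitions; any other combination leaves the state unchanged.
  def next(state: GroupState, event: Event): GroupState = (state, event) match {
    case (Empty | Stable, MemberJoinRequested)     => PreparingRebalance
    case (PreparingRebalance, AllMembersJoined)    => CompletingRebalance
    case (CompletingRebalance, AssignmentReceived) => Stable
    case (other, _)                                => other
  }
}
```

Feeding the three events in order to an Empty group walks it through PreparingRebalance and CompletingRebalance and ends in Stable, which corresponds to the "Rebalance finished" state described above.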

Today, let’s learn about the first step: the source code implementation of joining the group, which is located in the GroupCoordinator.scala file. In the next session, we will delve into the source code implementation of group synchronization.

To understand how joining the group works in the source code, we must learn four methods: handleJoinGroup, doUnknownJoinGroup, doJoinGroup, and addMemberAndRebalance. handleJoinGroup is the top-level method for joining a group and is called by the KafkaApis class. It decides whether to call doUnknownJoinGroup or doJoinGroup based on whether the given consumer group member has set a member ID. The former handles the case where the member ID is not set, and the latter handles the case where it is set. Both of these methods call addMemberAndRebalance to perform the actual join group logic. To help you understand the interaction between them, I’ve created a diagram that shows the calling sequence of these four methods.

handleJoinGroup method #

If you open the API entry file KafkaApis.scala, you can see that the method for handling the JoinGroupRequest request is handleJoinGroupRequest. And its main logic is to call the handleJoinGroup method of GroupCoordinator to handle the join group requests sent by consumer group members. So, we need to learn more about the handleJoinGroup method. Let’s first look at its method signature:

def handleJoinGroup(
  groupId: String, // consumer group name
  memberId: String, // consumer group member ID
  groupInstanceId: Option[String], // group instance ID used to identify static members
  requireKnownMemberId: Boolean, // whether member ID is required to be non-empty
  clientId: String, // client.id value
  clientHost: String, // consumer program hostname
  rebalanceTimeoutMs: Int, // Rebalance timeout in milliseconds, default is max.poll.interval.ms value
  sessionTimeoutMs: Int, // session timeout in milliseconds
  protocolType: String, // protocol type
  protocols: List[(String, Array[Byte])], // subscribed partitions grouped according to allocation strategy
  responseCallback: JoinCallback // callback function
): Unit = {
  ......
}

This method has many parameters, so I will only introduce the key ones. You will see these parameters again when you read the source code of the other methods, so make sure you understand their meanings in advance.

  • groupId: Consumer group name.
  • memberId: Consumer group member ID. If the member is newly joined, this field is an empty string.
  • groupInstanceId: This is an optional field introduced by the community in version 2.4 for static members. The introduction of static members can effectively avoid Rebalance scenarios caused by system upgrades or program updates. It belongs to a more advanced usage, and it is not widely used at present. Therefore, you only need to have a basic understanding of its purpose for now. In addition, when discussing other methods later, I will directly skip the code related to static members, and we only focus on the core logic.
  • requireKnownMemberId: Whether to require a non-empty member ID, i.e., whether members must set an ID. If this field is True, Kafka requires consumer group members to set an ID. Members that do not set an ID will be rejected from joining the group until they set an ID and can rejoin the group.
  • clientId: Consumer-side parameter client.id value. The Coordinator uses it to generate the memberId. The format of the memberId is clientId value-UUID.
  • clientHost: Consumer program hostname.
  • rebalanceTimeoutMs: Rebalance timeout in milliseconds. If a consumer group member does not complete the join group operation within this time period, it will be forbidden to join the group.
  • sessionTimeoutMs: Session timeout in milliseconds. If a consumer group member fails to send heartbeat to the Coordinator within this time period, it will be considered “expired”, triggering a new round of Rebalance.
  • responseCallback: Callback method to be executed after joining the group successfully.
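As a small illustration of the memberId format mentioned in the clientId bullet, the sketch below joins a client.id value and a random UUID with a hyphen. MemberIds.generate is a hypothetical helper written for this example; it is not the Coordinator's own method.

```scala
import java.util.UUID

object MemberIds {
  // Hypothetical helper: "<client.id value>-<random UUID>".
  def generate(clientId: String): String = s"$clientId-${UUID.randomUUID()}"
}
```

Because the UUID part is random, two calls with the same clientId yield different member IDs, which is why a restarted consumer normally shows up as a brand-new member.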

After explaining the method signature, let’s look at its body code:

// Validate the legality of the consumer group status
validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
  responseCallback(JoinGroupResult(memberId, error))
  return
}
// Ensure that sessionTimeoutMs is between group.min.session.timeout.ms and group.max.session.timeout.ms; otherwise return an INVALID_SESSION_TIMEOUT error through the callback
if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
  sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
  responseCallback(JoinGroupResult(memberId, Errors.INVALID_SESSION_TIMEOUT))
} else {
  // Check if the consumer group member ID is unknown
  val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
  // Get the consumer group information, if the group does not exist, create a new consumer group
  groupManager.getOrMaybeCreateGroup(groupId, isUnknownMember) match {
    case None =>
      responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
    case Some(group) =>
      group.inLock {
        // If the consumer group is already full
        if (!acceptJoiningMember(group, memberId)) {
          // Remove the member from the consumer group
          group.remove(memberId)
          group.removeStaticMember(groupInstanceId)
          // Return an error indicating that the group has reached its maximum size
          responseCallback(JoinGroupResult(
            JoinGroupRequest.UNKNOWN_MEMBER_ID,
            Errors.GROUP_MAX_SIZE_REACHED))
        // If the member ID is empty
        } else if (isUnknownMember) {
          // Perform the join group operation for a member without an ID
          doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        } else {
          // Perform the join group operation for a member with an ID
          doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
        // If the consumer group is in the PreparingRebalance state
        if (group.is(PreparingRebalance)) {
          // Check whether the delayed join operation waiting in the Purgatory can now be completed
          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
        }
      }
  }
}

This method performs the following steps in order.

First, it validates the group status and the session timeout. If sessionTimeoutMs falls outside the range defined by group.min.session.timeout.ms and group.max.session.timeout.ms, an INVALID_SESSION_TIMEOUT error is returned through the callback.

Next, it fetches the consumer group metadata, creating the group only when the member ID is empty. If the group does not exist and the member already carries an ID, the request is rejected with UNKNOWN_MEMBER_ID.

Then, while holding the group lock, it checks whether the group is already full. If so, it removes the member from the group and returns a GROUP_MAX_SIZE_REACHED error. Otherwise, a member with an empty ID is handled by the doUnknownJoinGroup method, and a member that already has an ID is handled by the doJoinGroup method.

Finally, if the consumer group is in the PreparingRebalance state, the method calls checkAndComplete on joinPurgatory to check whether the delayed join operation for this group can now be completed.
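The branching in handleJoinGroup can be condensed into a single decision function. The following is a minimal sketch, not the Coordinator's code: Request, Limits, groupExists and groupHasRoom are hypothetical stand-ins for the request fields, the broker-side timeout limits, and the group lookups that the real method performs.

```scala
object JoinGroupDispatch {
  sealed trait Decision
  case object InvalidSessionTimeout extends Decision
  case object UnknownMemberId extends Decision
  case object GroupMaxSizeReached extends Decision
  case object DoUnknownJoinGroup extends Decision
  case object DoJoinGroup extends Decision

  // Hypothetical inputs standing in for the request and the broker configuration.
  final case class Request(memberId: String, sessionTimeoutMs: Int)
  final case class Limits(minSessionTimeoutMs: Int, maxSessionTimeoutMs: Int)

  def decide(req: Request, limits: Limits, groupExists: Boolean, groupHasRoom: Boolean): Decision = {
    // A new member sends an empty string as its member ID.
    val isUnknownMember = req.memberId.isEmpty
    if (req.sessionTimeoutMs < limits.minSessionTimeoutMs ||
        req.sessionTimeoutMs > limits.maxSessionTimeoutMs) {
      InvalidSessionTimeout
    } else if (!groupExists && !isUnknownMember) {
      // A missing group is only created on demand for members without an ID.
      UnknownMemberId
    } else if (!groupHasRoom) {
      GroupMaxSizeReached
    } else if (isUnknownMember) {
      DoUnknownJoinGroup
    } else {
      DoJoinGroup
    }
  }
}
```

Writing the branches as a pure function makes the order of the checks explicit: the timeout check comes first, then the group lookup, then the capacity check, and only then the split on the member ID.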

The doUnknownJoinGroup Method #

If a new member joins a group, it needs to execute the doUnknownJoinGroup method because its member ID has not been generated yet.

With the exception of memberId, the input parameters of this method are almost the same as those of the handleJoinGroup method, so I won’t go into detail about each one. Let’s take a look at the source code directly. To make it easier for you to understand, I have omitted the code related to static members and the DEBUG/INFO debugging parts.

group.inLock {
  // Dead state
  if (group.is(Dead)) {
    // Wrap the exception and return it through the callback function
    responseCallback(JoinGroupResult(
      JoinGroupRequest.UNKNOWN_MEMBER_ID,         
      Errors.COORDINATOR_NOT_AVAILABLE))
  // Protocol type or partition consumption allocation strategy configured by the member does not match that of the consumer group
  } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
    responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.INCONSISTENT_GROUP_PROTOCOL))
  } else {
    // Generate a member ID for the member according to the rule
    val newMemberId = group.generateMemberId(clientId, groupInstanceId)
    // If static members are configured
    if (group.hasStaticMember(groupInstanceId)) {
      ......
    // If a non-empty member ID is required
    } else if (requireKnownMemberId) {
      ......
      group.addPendingMember(newMemberId)
      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
    } else {
      ......
      // Add the member
      addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
        clientId, clientHost, protocolType, protocols, group, responseCallback)
    }
  }
}

To help you understand, I have created a diagram to visualize the process of this method.

First, the code checks the state of the consumer group.

If it is in the Dead state, it wraps the exception and returns it through the callback function. You may wonder why the group state can be Dead when adding a member. In fact, this is possible because at the same time a member joins the group, there may be another thread that has already removed the group’s metadata information from the coordinator. For example, if the coordinator corresponding to the group changes and moves to another broker, the code wraps an exception and returns it to the consumer program, which will then look for the latest coordinator and initiate the join group operation again.

If the state is not Dead, it checks whether the protocol type and partition consumption allocation strategy of the member match the current supported schemes of the consumer group. If they don’t match, it still wraps an exception and returns it through the callback function. The matching here refers to whether the member’s protocol type is consistent with that of the consumer group, and whether the partition consumption allocation strategy set by the member is supported by other members of the consumer group.

If these checks pass, the code generates a member ID for the member using the rule clientId-UUID. This is what the generateMemberId method does. Then, based on the value of requireKnownMemberId, the doUnknownJoinGroup method takes one of the following paths:

  • If it is True, the member is added to the pending member list, and a MEMBER_ID_REQUIRED error is returned together with the generated member ID, so that the member can apply again now that it has been assigned a member ID.
  • If it is False, no such requirement applies, and the addMemberAndRebalance method is called directly to add the member to the group. With that, the doUnknownJoinGroup method ends.

Typically, if you have not enabled the static member mechanism, the value of requireKnownMemberId is True. This is determined by the following line of code in the handleJoinGroupRequest method in KafkaApis:

val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty

As you can see, if you are using a newer version of the Kafka client and have not configured the group.instance.id consumer-side parameter, then the value of this field will be True. This indicates that Kafka requires a consumer member to be allocated a member ID when joining a group.
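The resulting two-round handshake can be sketched with a toy model. Everything here (ToyGroup, JoinResult, join) is hypothetical and deliberately simplified; only the requireKnownMemberId expression mirrors the line quoted above. Member IDs that are neither empty nor pending follow the rest of the doJoinGroup logic, which this sketch does not model.

```scala
import java.util.UUID
import scala.collection.mutable

object TwoStepJoin {
  sealed trait JoinResult
  final case class MemberIdRequired(assignedId: String) extends JoinResult
  final case class Joined(memberId: String) extends JoinResult
  case object HandledElsewhere extends JoinResult // not modeled in this sketch

  // Toy stand-in for the Coordinator's per-group bookkeeping (not Kafka's GroupMetadata).
  final class ToyGroup {
    val pending: mutable.Set[String] = mutable.Set.empty[String]
    val members: mutable.Set[String] = mutable.Set.empty[String]
  }

  // Mirrors the expression quoted from KafkaApis.
  def requireKnownMemberId(requestVersion: Int, groupInstanceId: Option[String]): Boolean =
    requestVersion >= 4 && groupInstanceId.isEmpty

  def join(group: ToyGroup, memberId: String, clientId: String, requireKnown: Boolean): JoinResult = {
    if (memberId.isEmpty) {
      val newId = s"$clientId-${UUID.randomUUID()}"
      if (requireKnown) {
        group.pending += newId // remember the ID until the member comes back with it
        MemberIdRequired(newId)
      } else {
        group.members += newId // no second round needed
        Joined(newId)
      }
    } else if (group.pending.remove(memberId)) {
      group.members += memberId // pending member returns with its assigned ID
      Joined(memberId)
    } else {
      HandledElsewhere
    }
  }
}
```

In this model, a first call with an empty member ID returns MemberIdRequired and parks the generated ID in the pending set; a second call carrying that ID moves it from pending to members, which is the round trip a newer client goes through.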

I will explain the source code of the addMemberAndRebalance method in more detail when we learn about the doJoinGroup method later.

doJoinGroup Method #

Next, let’s take a look at the doJoinGroup method. This method is used to execute the logic for joining a group for members who have set their member ID. All input parameters for this method are inherited from the handleJoinGroup method, which you should already be familiar with. Therefore, let’s directly look at its source code implementation. Since the code is quite long, I will divide it into two parts to explain. At the same time, I will also draw a diagram to help you understand the logic of the entire method.

Diagram

Part 1 #

This part mainly performs some validation and condition checks.

// If the group is in Dead state, wrap a COORDINATOR_NOT_AVAILABLE exception and invoke the callback function to return
if (group.is(Dead)) {
  responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
// If the protocol type or partition consumption allocation strategy does not match that of the consumer group
// Wrap an INCONSISTENT_GROUP_PROTOCOL exception and invoke the callback function to return
} else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
  responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
// If it is a pending member, it has already been assigned a member ID,
// so allow it to join the group
} else if (group.isPendingMember(memberId)) {
  if (groupInstanceId.isDefined) {
    ......
  } else {
    ......
    // Add it to the group
    addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
      clientId, clientHost, protocolType, protocols, group, responseCallback)
  }
} else {
  // Code for Part 2...
}

The beginning of the doJoinGroup method is similar to the doUnknownJoinGroup method. It checks whether the group is in the Dead state and verifies if the protocol type and partition consumption allocation strategy match that of the consumer group.

The difference is that doJoinGroup checks whether the member applying to join the group is a pending member. If it is, this member has already been assigned a member ID, so it directly calls the addMemberAndRebalance method to add it to the group. If it is not, the method then proceeds to Part 2, which handles the join request of a non-pending member.

Part 2 #

The code is as follows:

// Get the metadata information for the member
val member = group.get(memberId)
group.currentState match {
  // If the group is in PreparingRebalance state
  case PreparingRebalance =>
    // Update the member information and start preparing for rebalance
    updateMemberAndRebalance(group, member, protocols, responseCallback)
  // If the group is in CompletingRebalance state
  case CompletingRebalance =>
    // If the member has previously applied to join the group
    if (member.matches(protocols)) {
      // Return the current group information directly
      responseCallback(JoinGroupResult(
        members = if (group.isLeader(memberId)) {
          group.currentMemberMetadata
        } else {
          List.empty
        },
        memberId = memberId,
        generationId = group.generationId,
        protocolType = group.protocolType,
        protocolName = group.protocolName,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    // Otherwise, update the member information and start preparing for rebalance
    } else {
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    }
  // If the group is in Stable state
  case Stable =>
    val member = group.get(memberId)
    // If the member is the leader member or if the member has changed its partition allocation strategy
    if (group.isLeader(memberId) || !member.matches(protocols)) {
      // Update the member information and start preparing for rebalance
      updateMemberAndRebalance(group, member, protocols, responseCallback)
    } else {
      responseCallback(JoinGroupResult(
        members = List.empty,
        memberId = memberId,
        generationId = group.generationId,
        protocolType = group.protocolType,
        protocolName = group.protocolName,
        leaderId = group.leaderOrNull,
        error = Errors.NONE))
    }
  // If it is in any other state, wrap an exception and invoke the callback function to return
  case Empty | Dead =>
    warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
      s"unexpected group state ${group.currentState}")
    responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
}

In Step 1 of this code, the metadata information for the member wanting to join the group is obtained.

In Step 2, the current state of the consumer group is checked. There are 4 possible cases.

  1. If the group is in the PreparingRebalance state, it means that the consumer group is about to start the rebalance process. In this case, the updateMemberAndRebalance method is called to update the member information and start preparing for rebalance.

  2. If the group is in the CompletingRebalance state, it checks whether the member’s partition consumption allocation strategy and subscribed partition list match the existing records. If they match, it means that the member has previously applied to join the group and the Coordinator has approved it. However, the member did not receive the response. In this case, the code constructs a JoinGroupResult object and returns the current group information directly to the member. But if the protocols do not match, it means that the member has changed its subscription information or allocation strategy, so the updateMemberAndRebalance method is called to update the member information and start preparing for a new round of rebalance.

  3. If the group is in the Stable state, it checks whether the member is the leader member or if the member has changed its subscription information or allocation strategy. If it meets these conditions, the updateMemberAndRebalance method is called to force a new rebalance. Otherwise, the current group information is returned to the member, allowing them to proceed to the next step of the rebalance process.

  4. If the group is in any other state, such as Empty or Dead, an UNKNOWN_MEMBER_ID exception is wrapped and the callback function is invoked to return.
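The four cases can be summarized as a decision table in code. This is a minimal sketch of the branching only: the Action type and the decide function are hypothetical names for this example, and isLeader and protocolsMatch stand in for group.isLeader(memberId) and member.matches(protocols).

```scala
object RejoinDecision {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // Hypothetical result type describing what the Coordinator does next.
  sealed trait Action
  case object UpdateMemberAndRebalance extends Action
  final case class ReturnCurrentGroupInfo(includeMemberList: Boolean) extends Action
  case object RejectUnknownMemberId extends Action

  def decide(state: GroupState, isLeader: Boolean, protocolsMatch: Boolean): Action = state match {
    case PreparingRebalance => UpdateMemberAndRebalance
    case CompletingRebalance =>
      // Only the Leader receives the member list in the response.
      if (protocolsMatch) ReturnCurrentGroupInfo(includeMemberList = isLeader)
      else UpdateMemberAndRebalance
    case Stable =>
      if (isLeader || !protocolsMatch) UpdateMemberAndRebalance
      else ReturnCurrentGroupInfo(includeMemberList = false)
    case Empty | Dead => RejectUnknownMemberId
  }
}
```

One detail the table makes visible: in the Stable state, a rejoining Leader always forces a new Rebalance, even if its protocols are unchanged.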

As you can see, this code frequently calls the updateMemberAndRebalance method. By looking at its code, you will find that it does two things:

  • Update the group member information: the updateMember method of GroupMetadata is called to update the consumer group member.
  • Prepare for rebalance: The core idea of this step is to change the consumer group state to PreparingRebalance and create a DelayedJoin object to be handled by the Purgatory, waiting for delayed processing of the join group operation.

This method has a small number of lines and a simple logic. It only changes the consumer group state and handles the delayed requests by adding them to the Purgatory. Therefore, I will not go into further detail, and you can read the code in this part on your own.

addMemberAndRebalance Method #

Now, let’s learn about the addMemberAndRebalance method which is used in both the doUnknownJoinGroup and doJoinGroup methods. From its name, this method has two purposes:

  • Adding a member to the consumer group;
  • Preparing for rebalance.

private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                  sessionTimeoutMs: Int,
                                  memberId: String,
                                  groupInstanceId: Option[String],
                                  clientId: String,
                                  clientHost: String,
                                  protocolType: String,
                                  protocols: List[(String, Array[Byte])],
                                  group: GroupMetadata,
                                  callback: JoinCallback): Unit = {
  // Create an instance of MemberMetadata
  val member = new MemberMetadata(
    memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)

  // Identify the member as a new member
  member.isNew = true

  // If preparing for the first rebalance, set newMemberAdded to true
  if (group.is(PreparingRebalance) && group.generationId == 0)
    group.newMemberAdded = true

  // Add the member to the consumer group
  group.add(member, callback)

  // Set the next heartbeat expiration time
  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)

  if (member.isStaticMember) {
    info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    // Remove from pending member list
    group.removePendingMember(memberId)
  }

  // Prepare for rebalance
  maybePrepareRebalance(group, s"Adding new member ${memberId} with group instance id ${groupInstanceId}")
}

Although the parameter list of this method is quite long, I believe you are already familiar with them as they are inherited from the calling method.

Let me explain the execution logic of this method.

Step 1: This method creates an instance of MemberMetadata based on the passed parameters and sets the isNew field to true, indicating that it is a new member. The isNew field is associated with heartbeat settings. You can read the code of the hasSatisfiedHeartbeat method in MemberMetadata to understand how this field helps the Coordinator confirm the heartbeat of consumer group members.

Step 2: The code checks if the consumer group is starting a rebalance for the first time. If so, it sets the newMemberAdded field to true; otherwise, it does not need to perform this assignment. The purpose of this field is to optimize the performance of the Kafka consumer group rebalance process. The general idea is to make the Coordinator wait for a longer period of time during the first rebalance of the consumer group, allowing more group members to join the group in order to avoid repeated rebalances caused by latecomers applying to join the group. The duration of the additional waiting time is determined by the broker-side configuration parameter group.initial.rebalance.delay.ms. The newMemberAdded field is used to determine whether the additional waiting time is needed.
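To get a feel for how such an extra wait could behave, here is a simplified model. It is a sketch under my own assumptions, not the Coordinator's implementation: it assumes the wait is extended by one more window of group.initial.rebalance.delay.ms each time a new member arrived during the previous window, and that the total extension is capped by the Rebalance timeout. The function name totalWaitMs and its parameters are hypothetical.

```scala
import scala.annotation.tailrec

object InitialRebalanceDelay {
  // joinedDuringWindow(i) says whether a new member arrived during the i-th waiting window
  // (the role played by newMemberAdded in this simplified model).
  def totalWaitMs(initialDelayMs: Int, rebalanceTimeoutMs: Int, joinedDuringWindow: List[Boolean]): Int = {
    @tailrec
    def loop(waited: Int, remaining: Int, windows: List[Boolean]): Int = windows match {
      // A newcomer arrived and there is still budget left: wait one more window.
      case true :: rest if remaining > 0 =>
        val delay = math.min(initialDelayMs, remaining)
        loop(waited + delay, remaining - delay, rest)
      // No newcomer, or the budget is used up: stop waiting.
      case _ => waited
    }
    // The first window is always waited in full (assumption of this sketch).
    loop(initialDelayMs, math.max(rebalanceTimeoutMs - initialDelayMs, 0), joinedDuringWindow)
  }
}
```

With a 3000 ms delay and a 10000 ms Rebalance timeout, for example, the model waits 3000 ms if nobody else shows up, and keeps extending in 3000 ms steps while latecomers keep arriving, but never beyond 10000 ms in total.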

Now let’s continue with the addMemberAndRebalance method. Step 3 calls the add method of GroupMetadata to add the new member information to the consumer group metadata and sets the next heartbeat expiration time for the member.

Step 4: The code removes the member from the pending member list. After all, the member has formally joined the group and no longer needs to stay in the pending list.

Step 5: The maybePrepareRebalance method is called to prepare for the rebalance.

Summary #

So far, we have finished studying the code for the first major step of the Rebalance process: joining a group. In this step, pay special attention to whether the consumer group member ID is set. For the branch where the member ID is not set, the code calls the doUnknownJoinGroup method to generate an ID for the member; for the branch where the member ID is set, the doJoinGroup method is called. Both methods ultimately call the addMemberAndRebalance method, which adds the consumer group member to the group.

Let’s briefly review the key points of this lesson.

  • Rebalance process: consists of two major steps, JoinGroup and SyncGroup.
  • handleJoinGroup method: the method used by the Coordinator to handle member joining group requests.
  • Member ID: the ID of a member. Depending on whether the member ID is set or not, Kafka’s source code determines which join group logic method to call, such as doUnknownJoinGroup or doJoinGroup.
  • addMemberAndRebalance method: the actual method that implements the join group functionality, used to complete the “join group + start Rebalance” operations.

Once all members have successfully joined the group, they will proceed to the second major step of the Rebalance process: group synchronization. In this step, members will send SyncGroupRequest requests to the Coordinator. So how does the Coordinator respond to this? We will find out in the next lesson.

After-Class Discussion #

Today, we mentioned the maybePrepareRebalance method multiple times. From its name, it seems that it may not always trigger a rebalance. So, can you explain based on the source code under what circumstances a rebalance is actually triggered?

Feel free to share your thoughts and answer in the comments section to discuss with me. You are also welcome to share today’s content with your friends.