25 Consumer Group Rebalance Full Process Analysis

Hello, I am Hu Xi. Today, I will walk you through the full process of consumer group rebalancing.

Previously, we discussed the rebalancing process of consumer groups, which aims to achieve consensus among all consumer instances in the group on which topic partitions to consume. Rebalancing relies on the Coordinator component on the Kafka Broker side to achieve the partition reassignment of the entire consumer group. Today, we will discuss this process in detail.

First, a reminder: I will be using the source code of Kafka 2.3 for today’s discussion, and I will explicitly point out design differences in older versions along the way. So even if you are still on an older version, it does not matter much, since the overall design principles have not changed.

Triggering and Notification #

Let’s start by briefly reviewing the three triggering conditions for rebalancing:

  1. Changes in the number of group members.
  2. Changes in the number of subscribed topics.
  3. Changes in the number of partitions of subscribed topics.

In my personal experience, in actual production environments, rebalancing triggered by the first condition is the most common. Furthermore, the sequential startup of consumer instances in a consumer group also falls under the first condition. In other words, the rebalancing process is inevitably triggered every time a consumer group starts.

I have already explained this in detail in Chapter 15 of this column, so I won’t repeat it here. If you don’t remember, you can review it first.

Today, what I really want to bring up is another topic: how is the rebalancing process notified to other consumer instances? The answer is through the consumer’s heartbeat thread.

The Kafka Java consumer needs to regularly send heartbeat requests to the broker’s coordinator to indicate that it is still alive. Before version 0.10.1.0, heartbeat requests were sent by the consumer’s main thread, i.e. the thread in which you call the KafkaConsumer.poll method.

This approach has many disadvantages, the biggest problem being that message processing logic is also performed in this thread. Therefore, if message processing takes too long, the heartbeat request will not be sent to the coordinator in a timely manner, causing the coordinator to “incorrectly” assume that the consumer has “died”. Starting from version 0.10.1.0, the community introduced a separate heartbeat thread to handle heartbeat request sending, which avoids this problem.

But what does this have to do with rebalancing? Actually, the rebalancing notification mechanism is achieved through the heartbeat thread. When the coordinator decides to start a new round of rebalancing, it encapsulates “REBALANCE_IN_PROGRESS” in the response to the heartbeat request and sends it back to the consumer instance. When the consumer instance detects that the heartbeat response contains “REBALANCE_IN_PROGRESS”, it can immediately know that rebalancing has started again. This is the notification mechanism for rebalancing.
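To make the notification path concrete, here is a minimal Python simulation. It is only a sketch, not Kafka's actual code: the `ToyCoordinator` and `ToyConsumer` classes and their methods are hypothetical names I made up for illustration; only the `REBALANCE_IN_PROGRESS` error name comes from the text above.

```python
# Toy model of rebalance notification via heartbeat responses.
# ToyCoordinator and ToyConsumer are hypothetical, not Kafka classes.

NONE = "NONE"
REBALANCE_IN_PROGRESS = "REBALANCE_IN_PROGRESS"


class ToyCoordinator:
    def __init__(self):
        self.rebalancing = False

    def start_rebalance(self):
        # For example, triggered by a new member's JoinGroup request.
        self.rebalancing = True

    def handle_heartbeat(self, member_id):
        # The heartbeat response doubles as the notification channel.
        return REBALANCE_IN_PROGRESS if self.rebalancing else NONE


class ToyConsumer:
    def __init__(self, member_id, coordinator):
        self.member_id = member_id
        self.coordinator = coordinator
        self.needs_rejoin = False

    def heartbeat_once(self):
        # In the real client this work is done by the dedicated heartbeat thread.
        error = self.coordinator.handle_heartbeat(self.member_id)
        if error == REBALANCE_IN_PROGRESS:
            self.needs_rejoin = True
        return error


coordinator = ToyCoordinator()
consumer = ToyConsumer("consumer-1", coordinator)

first = consumer.heartbeat_once()    # group is stable, nothing to report
coordinator.start_rebalance()
second = consumer.heartbeat_once()   # consumer learns about the rebalance
print(first, second, consumer.needs_rejoin)
```

The point of the sketch is that the consumer never receives a separate "rebalance started" message; it finds out only when its next heartbeat response comes back.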

By the way, many people still don’t understand the real purpose of the consumer-side parameter heartbeat.interval.ms. Let me explain. Literally, it sets the interval time for heartbeats, but its actual function is to control the frequency of rebalancing notifications. If you want consumer instances to be notified more quickly, you can set a very small value for this parameter, so that consumers can perceive that rebalancing has been initiated faster.
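The trade-off can be sketched in a few lines of Python. The two function names below are hypothetical helpers, and the delay estimate rests on a simplifying assumption (network latency is ignored). The rule that heartbeat.interval.ms must be lower than session.timeout.ms, and is typically kept at no more than one third of it, comes from Kafka's consumer configuration documentation.

```python
def check_heartbeat_config(heartbeat_interval_ms, session_timeout_ms):
    """Return a list of problems with the two settings (empty if none)."""
    problems = []
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append(
            "heartbeat.interval.ms must be lower than session.timeout.ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append(
            "heartbeat.interval.ms is usually kept at or below "
            "1/3 of session.timeout.ms")
    return problems


def worst_case_notification_delay_ms(heartbeat_interval_ms):
    # Simplifying assumption: network latency is ignored, so a rebalance that
    # starts right after one heartbeat is noticed at the next heartbeat.
    return heartbeat_interval_ms


print(check_heartbeat_config(3000, 10000))
print(check_heartbeat_config(5000, 10000))
print(worst_case_notification_delay_ms(3000))
```

A smaller interval shortens the worst-case notification delay, at the cost of sending more heartbeat requests to the coordinator.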

Consumer Group State Machine #

Once rebalancing starts, the coordinator component on the broker side gets busy, mainly with controlling the state transitions of the consumer group. Kafka has designed a consumer group state machine to help the coordinator carry out the entire rebalance. Strictly speaking, this state machine is a low-level design detail that the Kafka official website does not mention at all, but it is still worth learning because it helps you understand the design principles of consumer groups, such as how expired offsets are deleted.

Currently, Kafka defines five states for consumer groups, which are: Empty, Dead, PreparingRebalance, CompletingRebalance, and Stable. So what do these five states mean? Let’s take a look at the table below.

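| State | Meaning |
| --- | --- |
| Empty | The group has no members, but it may still have committed offsets that have not yet expired. |
| Dead | The group has no members, and its metadata has been removed from the coordinator. |
| PreparingRebalance | The group is preparing to rebalance; all members must rejoin the group. |
| CompletingRebalance | All members have joined the group and are waiting for the assignment plan. This state was called AwaitingSync in older versions. |
| Stable | The rebalance is complete, and the members in the group can consume normally. |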

Having understood the meanings of these states, let’s take a look at an image that shows the state transitions of the state machine.

[Figure: consumer group state machine]

Let me explain the state transition process when a consumer group starts. A consumer group starts with the Empty state, and when the rebalance process is initiated, it is placed in the PreparingRebalance state, waiting for members to join. After that, it transitions to the CompletingRebalance state, waiting for the assignment plan, and finally transitions to the Stable state to complete rebalancing.

When new members join or existing members exit, the consumer group’s state jumps directly from Stable to PreparingRebalance. At this time, all existing members must reapply to join the group. When all members have exited the group, the consumer group’s state changes to Empty. The condition for Kafka to automatically delete expired offsets is that the group must be in the Empty state. Therefore, if your consumer group has been stopped for a long time (more than 7 days), Kafka is likely to delete the offset data of that group. I believe that you often see the following output in Kafka logs:

Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.

This is Kafka periodically trying to delete expired offsets. Now you know that expired offsets are deleted only for groups in the Empty state.
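The transitions described in this section can be sketched as a small Python model. This is a deliberately simplified illustration, not the coordinator's real implementation: `ToyGroup` and its method names are hypothetical, it covers only the transitions mentioned in this section, the Dead state is listed but not modeled, and the group is moved to Empty as soon as its last member leaves.

```python
from enum import Enum


class GroupState(Enum):
    EMPTY = "Empty"
    PREPARING_REBALANCE = "PreparingRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    STABLE = "Stable"
    DEAD = "Dead"  # listed for completeness, not modeled below


class ToyGroup:
    """Simplified group model covering only the transitions in this section."""

    def __init__(self):
        self.state = GroupState.EMPTY
        self.members = set()

    def member_joins(self, member_id):
        # Any membership change forces a new round of rebalancing.
        self.members.add(member_id)
        self.state = GroupState.PREPARING_REBALANCE

    def member_leaves(self, member_id):
        self.members.discard(member_id)
        # No members left: Empty. Otherwise the remaining members rebalance.
        if self.members:
            self.state = GroupState.PREPARING_REBALANCE
        else:
            self.state = GroupState.EMPTY

    def all_members_joined(self):
        if self.state is not GroupState.PREPARING_REBALANCE:
            raise ValueError("group is not preparing a rebalance")
        self.state = GroupState.COMPLETING_REBALANCE

    def assignment_received(self):
        if self.state is not GroupState.COMPLETING_REBALANCE:
            raise ValueError("group is not completing a rebalance")
        self.state = GroupState.STABLE

    def offsets_may_expire(self):
        # Per the text: expired offsets are removed only for Empty groups.
        return self.state is GroupState.EMPTY


group = ToyGroup()
history = [group.state.value]
steps = [
    lambda: group.member_joins("c1"),
    group.all_members_joined,
    group.assignment_received,
    lambda: group.member_joins("c2"),
    group.all_members_joined,
    group.assignment_received,
    lambda: group.member_leaves("c1"),
    lambda: group.member_leaves("c2"),
]
for step in steps:
    step()
    history.append(group.state.value)

print(" -> ".join(history))
print(group.offsets_may_expire())
```

The run shows the start-up path from Empty to Stable, the jump from Stable back to PreparingRebalance when membership changes, and the return to Empty once every member has left.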

Consumer-side Rebalance Process #

With the content above as a preface, we can now begin to introduce the rebalance process. The complete rebalance process requires the participation of both the consumer-side and the coordinator component. Let’s first look at the process from the consumer’s perspective.

From the consumer side, the rebalance consists of two steps: joining the group, and waiting for the leader consumer to hand out the assignment. These two steps correspond to two specific types of requests: the JoinGroup request and the SyncGroup request.

When a member joins the group, it sends a JoinGroup request to the coordinator. In this request, each member needs to report the topics it subscribes to, so that the coordinator can collect the subscription information of all members. Once all JoinGroup requests from members are received, the coordinator will elect a leader consumer to lead this consumer group.

Usually, the member that sends the first JoinGroup request automatically becomes the leader. You must pay attention to the distinction between this leader and the leader replica we introduced earlier. They are not the same concept. The leader here refers to a specific consumer instance, which is neither a replica nor a coordinator. The task of the leader consumer is to collect the subscription information of all members and then create a specific partition consumption assignment scheme based on this information.

After the leader is selected, the coordinator packs the group’s subscription information into the JoinGroup response and sends it to the leader. The leader then draws up the distribution plan and proceeds to the next step: sending a SyncGroup request.

In this step, the leader sends a SyncGroup request to the coordinator, sending the just-made distribution plan to the coordinator. Note that other members also send SyncGroup requests to the coordinator, but the request body does not contain any actual content. The main purpose of this step is for the coordinator to receive the distribution plan, and then distribute it to all members in the form of SyncGroup responses, so that all members in the group will know which partitions they should consume.
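The two-step exchange can be simulated in Python as follows. Treat it as a sketch under stated assumptions rather than Kafka's implementation: `JoinSyncCoordinator`, `make_assignment`, and the `partitions_per_topic` input are hypothetical, and the round-robin strategy is a stand-in I chose for brevity, not one of Kafka's built-in assignors.

```python
class JoinSyncCoordinator:
    """Toy coordinator that handles the JoinGroup and SyncGroup steps."""

    def __init__(self):
        self.subscriptions = {}  # member_id -> topics, in join order
        self.leader = None
        self.assignment = {}

    def join_group(self, member_id, topics):
        # JoinGroup: each member reports the topics it subscribes to.
        if self.leader is None:
            self.leader = member_id  # the first joiner usually becomes leader
        self.subscriptions[member_id] = list(topics)

    def join_response(self, member_id):
        # Only the leader receives the whole group's subscription information.
        members = dict(self.subscriptions) if member_id == self.leader else {}
        return {"leader": self.leader, "members": members}

    def sync_group(self, member_id, assignment=None):
        # SyncGroup: the leader's request carries the plan; others send none.
        if member_id == self.leader and assignment is not None:
            self.assignment = assignment

    def sync_response(self, member_id):
        # Every member gets back only its own share of the plan.
        return self.assignment.get(member_id, [])


def make_assignment(members, partitions_per_topic):
    """Leader-side plan: spread each topic's partitions round-robin over
    the members subscribed to it (an illustrative stand-in strategy)."""
    plan = {member: [] for member in members}
    for topic in sorted(partitions_per_topic):
        subscribers = sorted(m for m, topics in members.items() if topic in topics)
        if not subscribers:
            continue
        for partition in range(partitions_per_topic[topic]):
            owner = subscribers[partition % len(subscribers)]
            plan[owner].append((topic, partition))
    return plan


coordinator = JoinSyncCoordinator()
coordinator.join_group("c1", ["orders"])
coordinator.join_group("c2", ["orders"])

leader_view = coordinator.join_response("c1")
plan = make_assignment(leader_view["members"], {"orders": 3})

coordinator.sync_group("c1", plan)  # leader sends the plan
coordinator.sync_group("c2")        # follower sends an empty request

print(coordinator.sync_response("c1"))
print(coordinator.sync_response("c2"))
```

Notice that the coordinator never computes the plan itself. It only collects subscriptions, relays them to the leader, and then relays the leader's plan back to every member.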

Next, I will use a diagram to illustrate how JoinGroup requests are processed.

[Figure: JoinGroup request processing]

As mentioned earlier, the main function of the JoinGroup request is to send the member subscription information to the leader consumer. After the leader has formulated the distribution plan, the rebalance process enters the SyncGroup request stage.

The following diagram describes the processing flow of SyncGroup requests.

[Figure: SyncGroup request processing]

The main purpose of the SyncGroup request is to let the coordinator pass the leader’s distribution plan on to each member of the group. Once all members have received the plan, the consumer group enters the Stable state and normal consumption begins.

Now, I have finished introducing the consumer-side rebalance process. Next, let’s take a look at how the rebalance is executed from the coordinator-side.

Analysis of Rebalancing Scenarios from the Broker’s Perspective #

To analyze how the coordinator handles rebalancing, we need to look at several scenarios: a new member joining the group, a member voluntarily leaving the group, a member leaving the group because of a crash, and members committing offsets during a rebalance. Let’s discuss them one by one.

Scenario 1: New member joining the group.

A new member joining the group refers to a situation where a new member joins the group after the group reaches the Stable state. If it is a new start of a consumer group, Kafka has some optimizations of its own, so the process will be slightly different. Here, we are discussing the case where a new member joins the group after it has stabilized.

When the coordinator receives a new JoinGroup request, it notifies all existing members of the group through their heartbeat responses, forcing them to start a new round of rebalancing. The specific process is the same as the consumer-side rebalance process described earlier. Now, let me explain how the coordinator handles the new member joining the group using a sequence diagram.

Scenario 2: Group member voluntarily leaving the group.

What does voluntarily leaving the group mean? It means that the consumer instance’s thread or process calls the close() method to actively notify the coordinator that it wants to exit. This scenario involves the third type of request: the LeaveGroup request. After receiving the LeaveGroup request, the coordinator will still notify other members via heartbeat response, so I won’t go into detail here. Let’s directly use a diagram to illustrate.

Scenario 3: Group member leaving the group due to a crash.

Leaving the group due to a crash means that a consumer instance suffers a severe failure and goes down suddenly, without telling the coordinator. It differs from voluntarily leaving the group, which the member initiates itself and which the coordinator can detect and handle immediately. A crash is passive, so the coordinator usually has to wait for a while before it notices. This waiting period is generally controlled by the consumer-side parameter session.timeout.ms: once no heartbeat has arrived from a member for that long, the coordinator considers it dead, so Kafka generally detects the crash within about session.timeout.ms. Of course, the subsequent handling of the departure is the same as before. Let’s take a look at the following diagram.
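The difference between a voluntary exit and a crash can be sketched with a toy session tracker in Python. `SessionTracker` and its methods are hypothetical, and time is passed in explicitly as milliseconds so that the example stays deterministic; the real coordinator's bookkeeping is more involved.

```python
class SessionTracker:
    """Hypothetical helper that records the last heartbeat time per member."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.last_heartbeat_ms = {}

    def heartbeat(self, member_id, now_ms):
        self.last_heartbeat_ms[member_id] = now_ms

    def leave_group(self, member_id):
        # Voluntary exit (LeaveGroup): the member is removed immediately.
        self.last_heartbeat_ms.pop(member_id, None)

    def expire(self, now_ms):
        # Crash: a member is removed only after its session has timed out.
        expired = sorted(
            member for member, last in self.last_heartbeat_ms.items()
            if now_ms - last > self.session_timeout_ms
        )
        for member in expired:
            del self.last_heartbeat_ms[member]
        return expired


tracker = SessionTracker(session_timeout_ms=10_000)
tracker.heartbeat("c1", now_ms=0)
tracker.heartbeat("c2", now_ms=0)

# c1 crashes right after its first heartbeat; c2 keeps sending heartbeats.
for now in (3_000, 6_000, 9_000):
    tracker.heartbeat("c2", now_ms=now)

too_early = tracker.expire(now_ms=9_000)   # c1's session has not expired yet
tracker.heartbeat("c2", now_ms=12_000)
detected = tracker.expire(now_ms=12_000)   # now c1 is considered dead
print(too_early, detected)
```

Until the session expires, the coordinator cannot tell a crashed member from a slow one, which is why a crash is noticed later than a LeaveGroup request.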

Scenario 4: Coordinator’s handling of members’ offset commits during rebalancing.

Under normal circumstances, each member of the group periodically commits its offsets to the coordinator. When rebalancing starts, the coordinator gives members a buffer period in which each member must quickly commit its offsets, and only then send the usual JoinGroup/SyncGroup requests. Here is the diagram showing the process.

Summary #

Alright, I have completed explaining the consumer rebalancing process. Although I used two members as examples throughout, you can easily extend it to consumer groups with multiple members because the principles are the same. I hope you can go through today’s content several times to thoroughly understand Kafka’s consumer rebalancing process. The community is making significant changes to the current rebalancing process, and if you are not familiar with these basic design principles, it will be very difficult to delve deeper into this topic in the future.

Open Discussion #

During the entire rebalance, all consumer instances in the group pause consumption. To borrow a term from JVM GC, this is a “stop the world” operation. Please think about how we can improve this process. Is it possible to allow some consumers to continue consuming during the rebalance in order to improve consumer-side availability and throughput?

Feel free to share your thoughts and answers. Let’s discuss together. If you find this article helpful, please feel free to share it with your friends.