17 Can Consumer Group Rebalancing Be Prevented

17 Can Consumer Group Rebalancing be Prevented #

Hello, I’m Hu Xi. Today I want to share with you the topic: Can consumer group rebalancing be avoided?

In fact, we have discussed rebalancing, also known as Rebalance, in episode 15 of this column. Now, let’s review the principles and uses of this concept. Rebalance is the process of reaching a consensus among all consumer instances in a consumer group on how to consume all partitions of a subscribed topic. During the rebalance process, all consumer instances participate together and, with the help of the coordinator component, complete the allocation of topic partitions for consumption. However, during the entire process, no instance can consume any messages, so it has a significant impact on the consumer’s TPS (Transactions Per Second).

You may be unfamiliar with the term “coordinator” mentioned here, so let me briefly introduce it. The coordinator, known as the Coordinator in Kafka, is responsible for serving the consumer group and is responsible for performing rebalancing for the group, as well as providing offset management and group member management.

Specifically, when the consumer-side application submits an offset, it actually submits it to the broker where the coordinator is located. Similarly, when a consumer application starts, it sends various requests to the broker where the coordinator is located, and then the coordinator is responsible for performing metadata management operations such as group registration and member management records for the consumer group.

When a broker starts, it creates and enables the corresponding coordinator component. In other words, all brokers have their own coordinator components. So how does a consumer group determine which broker its coordinator is located on? The answer lies in the Kafka internal offset topic, __consumer_offsets, which I mentioned earlier.

Currently, Kafka has two steps to determine the algorithm for a consumer group to locate the coordinator.

Step 1: Determine which partition of the offset topic is used to save the group’s data: partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount).

Step 2: Find the broker where the leader replica of the partition is located, which is the corresponding coordinator.

Let me explain the algorithm briefly. First, Kafka calculates the hash value of the group’s group.id parameter. For example, if you have a group with group.id set to “test-group”, then its hashCode value should be 627841412. Next, Kafka calculates the number of partitions of __consumer_offsets, usually 50 partitions, and then calculates the absolute value of the hash value modulo the number of partitions, that is, abs(627841412 % 50) = 12. At this point, we know that partition 12 of the offset topic is responsible for saving the data for this group. With the partition number, the second step of the algorithm becomes simple. We just need to find out on which broker the leader replica of partition 12 of the offset topic is located. This broker is the coordinator we are looking for.

In actual usage, consumer applications, especially the Java Consumer API, can automatically discover and connect to the correct coordinator, so we don’t have to worry about this issue. The significance of knowing this algorithm is that it can help us solve the problem of locating. When there is a problem with the consumer group and we need to quickly check the broker logs, we can accurately locate the broker corresponding to the coordinator based on this algorithm, instead of blindly checking one broker after another.

Alright, let’s get back to rebalance. Since we are discussing how to avoid rebalancing today, it means that this thing called rebalance is not good, or at least it has some downsides that we need to avoid. So, what are the downsides of rebalancing? In summary, there are the following three points:

  1. Rebalance affects consumer-side TPS. This has been mentioned repeatedly before, so I won’t go into detail here. In short, during the rebalance period, the consumer will stop what it is doing and won’t be able to do anything.

  2. Rebalance is slow. If you have many members in your group, you will definitely have this pain point. Do you remember the example I mentioned before about a foreign user? He had hundreds of consumer instances in his group, and each rebalance took several hours. In that kind of scenario, the rebalancing of the consumer group has completely gotten out of control.

  3. Rebalance is not efficient. The current design mechanism of Kafka determines that every time a rebalance occurs, all members of the group must participate, and usually the locality principle is not considered. However, the locality principle is particularly important for improving system performance.

Regarding the third point, let’s give a simple example. Suppose there is a group with 10 members, and each member consumes an average of 5 partitions. Now, let’s say one member exits, and a new round of rebalance is triggered to “transfer” the 5 partitions previously handled by this member to other members. Obviously, the best approach is for the 9 remaining members to keep the current allocation of partitions unchanged, and then randomly assign the 5 partitions to these 9 members. This way, the impact of rebalance on the remaining consumer members is minimized.

Unfortunately, Kafka is not designed this way currently. By default, each time a rebalance starts, the group shuffles these 50 partitions (10 members * 5 partitions), and the surviving 9 members are then reassigned. Obviously, this is not a very efficient approach. For this reason, the community introduced the StickyAssignor in version 0.11.0.0, which is a partition assignment strategy with stickiness. The so-called stickiness means that this strategy tries to retain the previous partition assignment as much as possible during each rebalance, minimizing the changes in partition assignment. However, unfortunately, this strategy currently has some bugs, and it requires upgrading to version 0.11.0.0 to use it, so it is not widely used in actual production environments.

In summary, rebalance has the above three downsides. You might wonder if there are solutions to these problems. Especially for the issues of slow rebalancing and the impact on TPS, does the community have any solutions? For these two points, I can responsibly tell you: “No solution!” Especially for the problem of slow rebalancing, the Kafka community is powerless. Since we can’t solve various problems during the rebalancing process, it’s better to avoid unnecessary rebalancing, especially those that are not needed.

In my personal experience, in real business scenarios, many rebalances are unplanned or unnecessary. The TPS of our applications is mostly slowed down by these rebalances, so it is necessary to avoid these rebalances. Now let’s talk about how to avoid rebalancing. To avoid Rebalance, we need to start with the timing of Rebalance. As mentioned earlier, there are three timings for Rebalance:

  • Change in the number of group members
  • Change in the number of subscribed topics
  • Change in the number of partitions for subscribed topics

The latter two are usually actively operated by operation and maintenance personnel, so most of the Rebalances they trigger are inevitable. Next, let’s mainly talk about how to avoid Rebalance caused by changes in the number of group members.

If the number of Consumer instances under the Consumer Group changes, it will definitely trigger Rebalance. This is the most common reason for Rebalance to occur. I have encountered 99% of Rebalances caused by this reason.

It is easy to understand the case of adding Consumer instances. When we start a Consumer program with the same group.id configuration, we are actually adding a new Consumer instance to this group. At this time, the Coordinator will accept this new instance, add it to the group, and reassign partitions. Generally speaking, adding Consumer instances is a planned operation, which may be for the need to increase TPS or scalability. In short, it is not the type of “unnecessary Rebalance” that we need to avoid.

What we are more concerned about is when the number of instances under the group decreases. If you want to stop certain Consumer instances, that goes without saying. The key is that in some cases, the Coordinator may mistakenly consider a Consumer instance as “stopped” and “kick it out” of the group. If Rebalance is caused by this reason, we cannot ignore it.

Under what circumstances will the Coordinator consider a Consumer instance as dead and remove it from the group? This is definitely a topic that needs to be discussed in detail, so let’s talk about it in detail.

After the Consumer Group completes Rebalance, each Consumer instance will regularly send heartbeat requests to the Coordinator to indicate that it is still alive. If a Consumer instance fails to send these heartbeat requests in a timely manner, the Coordinator will consider the Consumer as “dead,” remove it from the group, and then start a new round of Rebalance. The Consumer side has a parameter called session.timeout.ms, which is used to indicate this. The default value of this parameter is 10 seconds, which means that if the Coordinator does not receive heartbeat requests from a Consumer instance under the group within 10 seconds, it will consider the Consumer instance as dead. It can be said that session.timeout.ms determines the interval for which the Consumer is considered alive.

In addition to this parameter, the Consumer also provides a parameter that allows you to control the frequency of sending heartbeat requests. This is heartbeat.interval.ms. The smaller the value set, the higher the frequency at which the Consumer instance sends heartbeat requests. Frequent heartbeat requests consume additional bandwidth resources, but the benefit is that it allows us to quickly know whether Rebalance is currently in progress. Currently, the method in which the Coordinator notifies each Consumer instance to start Rebalance is by encapsulating the REBALANCE_NEEDED flag in the response body of the heartbeat request.

In addition to the above two parameters, the Consumer side also has a parameter used to control the impact of Consumer’s actual consumption capability on Rebalance, which is max.poll.interval.ms. It limits the maximum time interval for the Consumer-side application to call the poll method twice. The default value is 5 minutes, which means if your Consumer program cannot consume the messages returned by the poll method within 5 minutes, the Consumer will actively request to “leave the group” and the Coordinator will start a new round of Rebalance.

Having understood the meanings of these parameters, let’s be clear about which Rebalances are “unnecessary.”

The first type of unnecessary Rebalance is caused by the failure to send heartbeats in a timely manner, resulting in the Consumer being “kicked out” of the group. Therefore, you need to carefully set the values of session.timeout.ms and heartbeat.interval.ms. Here are some recommended values that you can “blindly” apply to your production environment.

  • Set session.timeout.ms to 6s.
  • Set heartbeat.interval.ms to 2s.
  • Ensure that a Consumer instance can send at least 3 rounds of heartbeat requests before being identified as “dead,” that is, session.timeout.ms >= 3 * heartbeat.interval.ms.

Setting session.timeout.ms to 6s is mainly to allow the Coordinator to locate dead Consumers faster. After all, we still hope to quickly identify those “empty-seat diners” and kick them out of the group as soon as possible. I hope this configuration can help you avoid the first type of “unnecessary” Rebalance.

The second type of unnecessary Rebalance is caused by the Consumer taking too long to consume. I had a customer before whose Consumer needed to process messages and write them to MongoDB. Obviously, this is a heavy consumption logic. Even a little instability in MongoDB can increase the Consumer program’s consumption time. In this case, the setting of the max.poll.interval.ms parameter value becomes particularly critical. If you want to avoid unexpected Rebalances, it is best to set the parameter value to a slightly longer time than the maximum processing time downstream. Taking MongoDB as an example, if the longest time to write to MongoDB is 7 minutes, you can set this parameter to around 8 minutes.

In short, you need to leave enough time for your business processing logic. This way, the Consumer will not trigger a Rebalance due to taking too long to process these messages.

If you have appropriately set these parameters according to the recommended values mentioned above and still encounter Rebalance, then I suggest you investigate the GC performance of the Consumer side, such as frequent Full GC that causes long pauses, thereby triggering unexpected Rebalances. Why specifically mention GC? That’s because in practical scenarios, I have seen too many unexpected Rebalances caused by frequent Full GC due to inappropriate GC settings.

Summary #

In summary, we must avoid situations where group members unexpectedly leave or quit due to various parameters or illogical logic. The main parameters related to this are:

  • session.timeout.ms
  • heartbeat.interval.ms
  • max.poll.interval.ms
  • GC parameters

According to what we discussed today, by properly setting these parameters, you will be able to significantly reduce the number of rebalances in the production environment, thereby improving the overall TPS (Transactions Per Second) on the consumer side.

Rebalance

Open Discussion #

Let’s talk about the frequency and reasons for Rebalance in your business scenario, as well as how you handle it. We can discuss whether there are better solutions together.

Please share your thoughts and answers, and let’s have a discussion. If you find it helpful, feel free to share this article with your friends.