15 What Exactly Are Consumer Groups #

Hello, I’m Hu Xi. Today, I want to share with you the topic of Kafka consumer groups.

The consumer group, or Consumer Group, is one of the highlights of Kafka’s design. So what is a Consumer Group? In a nutshell, a Consumer Group is the scalable, fault-tolerant consumer mechanism that Kafka provides. Since it is a group, it can contain multiple consumers, or consumer instances, which share a common ID called the Group ID. All consumers in the group cooperate to consume all partitions of the topics they subscribe to. However, each partition can be consumed by only one consumer instance within the same consumer group. In my opinion, a Consumer Group can be summarized by the following three characteristics:

  1. There can be one or more consumer instances under a Consumer Group. These instances can be separate processes or threads within the same process. In practical scenarios, using processes is more common.
  2. Group ID is a string that uniquely identifies a Consumer Group within a Kafka cluster.
  3. Each partition of the topics subscribed by the Consumer Group can only be assigned to a specific consumer instance within the group for consumption. This partition can also be consumed by other groups.
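To make the Group ID concrete, here is a minimal sketch of the configuration that places a consumer instance into a group. The broker address and group name are placeholders of my own; the property keys ("bootstrap.servers", "group.id", and the deserializer settings) are standard Kafka consumer configuration names.

```java
import java.util.Properties;

public class GroupConfig {
    // Builds the minimal configuration that makes a consumer instance a
    // member of the Consumer Group identified by groupId.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", groupId);                   // all group members share this ID
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // In a real application you would pass these props to
        // new KafkaConsumer<>(props) and then call subscribe(...);
        // that step is omitted here because it needs a running broker.
        System.out.println(consumerProps("my-group").getProperty("group.id"));
    }
}
```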

You should remember the two message engine models I mentioned in the first issue of this column, right? They are the point-to-point model and the publish/subscribe model; the former is also known as the message queue model. Of course, you should be aware of the distinction between the message queues mentioned in many architecture articles and the one discussed here. Many articles use the term “message queue” loosely to refer to messaging middleware frameworks, and I am not judging whether that usage is accurate. I just want to remind you that the message queue discussed here refers specifically to the classic message engine model.

Okay, these are the two main types of traditional message engine models, each with its own advantages and disadvantages. Let’s briefly review them. The shortcoming of the traditional message queue model is that once a message is consumed, it is removed from the queue, so it can only be consumed by one downstream consumer. Strictly speaking, this is not a flaw but a feature. Even so, this model obviously scales poorly, because multiple downstream consumers have to “fight” over messages in the shared queue. The publish/subscribe model does allow a message to be consumed by multiple consumers, but it has its own scaling problem: every subscriber must subscribe to all partitions of the topic. This all-or-nothing subscription is inflexible and affects how effectively messages are actually delivered.

If there were a mechanism that avoided the shortcomings of both models while combining their advantages, that would be ideal. Luckily, Kafka’s Consumer Group is exactly such a mechanism. When a Consumer Group subscribes to multiple topics, each instance within the group is not required to subscribe to all of a topic’s partitions; it only consumes messages from a subset of the partitions. Meanwhile, Consumer Groups are independent of one another: they can subscribe to the same set of topics without interfering with each other. Combined with the message retention mechanism on the Broker side, Kafka’s Consumer Groups neatly avoid the scalability problems mentioned above. It is fair to say that with the Consumer Group mechanism alone, Kafka implements both traditional message engine models: if all instances belong to the same group, you get the message queue model; if each instance belongs to its own group, you get the publish/subscribe model.

After understanding Consumer Groups and their design highlights, you may have the question: how do I know how many consumer instances should be in a group in practical usage scenarios? Ideally, the number of consumer instances should be equal to the total number of partitions subscribed by that group.

For example, let’s assume a consumer group subscribes to three topics, A, B, and C, with 1, 2, and 3 partitions respectively (a total of 6 partitions). In general, setting 6 consumer instances for this group is the most ideal situation because it maximizes scalability.

You may ask, can I set fewer or more than 6 instances? Yes, you can! If you have 3 instances, each instance will consume about 2 partitions on average (6 / 3 = 2). If you set 8 instances, unfortunately, 2 instances (8 - 6 = 2) will not be assigned any partitions and will be idle forever. Therefore, it is generally not recommended to set consumer instances greater than the total number of partitions in actual usage. Setting extra instances will only waste resources without any benefits.
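The arithmetic above can be sketched with a tiny round-robin assignment helper. This is my own illustration, not Kafka’s actual assignor; it simply shows how 6 partitions spread over 3 versus 8 instances.

```java
import java.util.Arrays;

public class GroupSizing {
    // Spreads totalPartitions partitions round-robin across `instances`
    // consumer instances and returns how many partitions each instance gets.
    static int[] assignCounts(int totalPartitions, int instances) {
        int[] counts = new int[instances];
        for (int p = 0; p < totalPartitions; p++) {
            counts[p % instances]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 6 partitions over 3 instances: each instance gets 2 partitions.
        System.out.println(Arrays.toString(assignCounts(6, 3))); // [2, 2, 2]
        // 6 partitions over 8 instances: two instances are left idle (count 0).
        System.out.println(Arrays.toString(assignCounts(6, 8)));
    }
}
```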

Now, after discussing the design features of Consumer Groups, let’s discuss a question: how does Kafka manage offsets for Consumer Groups? Remember, consumers need to record how much data they have consumed, which is the consumption position information. In Kafka, this position information has a dedicated term called “offset”.

Although an offset looks like a single numeric value, for a Consumer Group it is really a set of key-value pairs: the key is the partition, and the value is the latest offset the group has consumed for that partition. In Java terms, you can roughly think of it as a data structure like Map&lt;TopicPartition, Long&gt;, where TopicPartition identifies a partition and Long is the offset type. Of course, I must admit that the actual data structure in the Kafka source code is more complex than this, but that does not hinder our understanding of group offsets.
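Here is that idea as compilable Java. The TopicPartition record below is a minimal stand-in I define for illustration; the real class lives in org.apache.kafka.common and carries the same (topic, partition) pair.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupOffsets {
    // Minimal stand-in for Kafka's org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    public static void main(String[] args) {
        // A group's offsets are conceptually a Map<TopicPartition, Long>:
        // one entry per partition, valued at the latest consumed offset.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("A", 0), 42L); // partition 0 of topic A: offset 42
        offsets.put(new TopicPartition("B", 1), 7L);  // partition 1 of topic B: offset 7
        System.out.println(offsets.get(new TopicPartition("A", 0)));
    }
}
```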

As mentioned in my column Issue 4, Kafka has both new and old client APIs, which naturally leads to new and old consumers. The old version of the consumer also has the concept of consumer groups, and it is not much different from the Consumer Groups we are currently discussing in terms of user experience. The only difference is the way it manages offsets compared to the new version.

In the old version, Consumer Groups stored offsets in ZooKeeper. Apache ZooKeeper is a distributed coordination service framework on which Kafka relied heavily to implement various coordination and management functions. Storing offsets in an external system like ZooKeeper has an obvious advantage: it reduces the state-storage overhead on the Kafka brokers. A popular design principle is to keep server nodes stateless, which allows them to scale freely. Kafka initially followed this thinking and therefore stored Consumer Group offsets in a framework outside the Kafka cluster. However, people gradually discovered a problem: frameworks like ZooKeeper are not well suited to frequent writes, yet offset updates are a very frequent operation for Consumer Groups. This high-throughput write traffic can significantly degrade ZooKeeper cluster performance, so the Kafka community gradually reached a consensus that ZooKeeper is not an appropriate place to store consumer offsets.

Therefore, for the new version of Consumer Groups, the Kafka community redesigned offset management, saving offsets in an internal Kafka topic instead. This internal topic is the one people love to hate: __consumer_offsets. I will introduce this mysterious topic in detail later in the column. For now, just remember that the new version of Consumer Groups saves offsets in an internal topic on the Broker side.

Finally, let’s talk about the famous rebalance process on the Consumer Group side. I call it “famous”, and to some extent it is also “infamous”, because rebalance-related bugs keep resurfacing. I’ll leave the reasons it is so disliked for later; first, let’s understand what rebalance is.

Rebalance is essentially a protocol that specifies how all consumers in a consumer group reach a consensus to distribute each partition of the subscribed topic. For example, if a group has 20 consumer instances and subscribes to a topic with 100 partitions, Kafka normally assigns 5 partitions to each consumer. This process of assignment is called rebalance.

So when does a Consumer Group perform a rebalance? There are three trigger conditions for rebalance:

  1. The number of group members changes. For example, when new consumer instances join or leave the group, or when a consumer instance crashes and is “kicked out” of the group.
  2. The number of subscribed topics changes. A Consumer Group can subscribe to topics using a regular expression. For example, consumer.subscribe(Pattern.compile("t.*c")) means the group subscribes to all topics whose names start with the letter "t" and end with the letter "c". If a new topic matching this pattern is created while the Consumer Group is running, a rebalance will occur.
  3. The number of partitions for the subscribed topic changes. Kafka currently only allows increasing the number of partitions for a topic. When the number of partitions increases, it triggers a rebalance for all groups that subscribe to that topic.
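The regular expression in trigger 2 is ordinary java.util.regex; here is a quick check of which topic names the pattern "t.*c" would match. The topic names are my own examples, and no Kafka API is involved.

```java
import java.util.regex.Pattern;

public class TopicPattern {
    // The same pattern the text passes to consumer.subscribe(...).
    static final Pattern SUBSCRIPTION = Pattern.compile("t.*c");

    // True if the whole topic name matches the subscription pattern.
    static boolean matches(String topicName) {
        return SUBSCRIPTION.matcher(topicName).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("topic"));   // true: starts with "t", ends with "c"
        System.out.println(matches("traffic")); // true
        System.out.println(matches("orders"));  // false
    }
}
```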

When rebalance occurs, all consumer instances under the group will coordinate together. You might ask, how does each consumer instance know which partitions of the subscribed topic it should consume? This requires the assistance of an assignment strategy.

Currently, Kafka provides three default assignment strategies, each with its own advantages and disadvantages. We won’t go into detail on these strategies today. Just remember that the community keeps improving them with the goal of the fairest possible distribution, so that each consumer instance gets an approximately equal number of partitions. For example, if a group has 10 consumer instances and 100 partitions to consume, the ideal outcome is for each instance to be assigned 10 partitions. This is what we call a fair assignment strategy. If the assignment is severely skewed, some instances will sit “idle” while others are “overworked”.
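For reference, the strategy is selected per consumer through the standard "partition.assignment.strategy" setting; the built-in assignors ship as classes under org.apache.kafka.clients.consumer (RangeAssignor, RoundRobinAssignor, StickyAssignor). Below is a sketch using a plain Properties object so it compiles without a broker; the group name is a placeholder.

```java
import java.util.Properties;

public class AssignorConfig {
    // Returns consumer configuration selecting a partition assignor;
    // assignorClass is the fully qualified class name of the strategy.
    static Properties withAssignor(String assignorClass) {
        Properties props = new Properties();
        props.put("group.id", "my-group"); // placeholder group
        props.put("partition.assignment.strategy", assignorClass);
        return props;
    }

    public static void main(String[] args) {
        // E.g. switch from the default RangeAssignor to RoundRobinAssignor:
        Properties props = withAssignor(
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```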

Let’s illustrate the process of rebalance occurring in a Consumer Group with a simple example. Suppose a Consumer Group currently has two consumers, A and B. When a third member, C, joins, Kafka will trigger a rebalance and reassign partitions to A, B, and C according to the default assignment strategy, as shown in the following diagram:

Clearly, the allocation after rebalance is still fair, with each consumer instance gaining the right to consume 2 partitions. This is the situation we hope to achieve.

After talking about rebalance, now let’s discuss why it is “hated”.

Firstly, the rebalance process has a significant impact on the consumption process of the Consumer Group. If you are familiar with the garbage collection mechanism of the JVM, you must have heard of the stop-the-world collection method, also known as STW. During STW, all application threads stop working, resulting in the entire application being frozen in place. The rebalance process is similar to this. During the rebalance process, all consumer instances stop consuming and wait for the rebalance to complete. This is one aspect of rebalance that is criticized.

Secondly, the current design of rebalance requires the participation of all consumer instances in the group and a complete reallocation of all partitions. However, a more efficient approach would be to minimize changes in the allocation plan. For example, if instance A was previously responsible for consuming partitions 1, 2, and 3, after rebalance, if possible, it would be better to let instance A continue consuming partitions 1, 2, and 3 rather than reallocating other partitions to it. In this way, the TCP connections between instance A and the brokers where these partitions are located can continue to be used, without the need to create new socket connections to other brokers.

Finally, rebalance is simply too slow. There was once a user abroad whose group contained several hundred consumer instances, and a single rebalance took hours to complete! That is completely unacceptable. Most unfortunately, the community can do little about this at the moment; there is no particularly good solution, at least for now. Perhaps the best approach, then, is to avoid triggering rebalance in the first place.

Summary #

In conclusion, today I have shared with you various aspects of Kafka Consumer Group, including how it is defined, the problems it solves, and its features. We have also discussed the management of consumer group offsets and the famous Rebalance process. I hope that they will be of great help to you when developing consumer applications.

Open Discussion #

Today, it seems like I spoke a lot about the benefits of Consumer Groups (except for Rebalance). What do you think are the drawbacks of this consumer group design?

Feel free to share your thoughts and answers, and let’s discuss together. If you find it enlightening, you’re welcome to share the article with your friends.