
22 How Are Consumer Group Consumption Progress Monitors Implemented #

Hello, I’m Hu Xi. Today I’m going to share with you the topic of how to monitor consumer group progress.

For Kafka consumers, the most important thing to monitor is their consumption progress, that is, how far their consumption lags behind the producer. This lag has a dedicated name: Consumer Lag.

The lag refers to the extent to which the consumer is behind the producer. For example, if a Kafka producer successfully produces 1 million messages to a topic, and your consumer has consumed 800,000 messages, then we say that your consumer lags behind by 200,000 messages, which means the lag is 200,000.

Generally, lag is measured in message count, and we usually discuss it at the topic level. In reality, however, Kafka monitors lag at the partition level. If you want a topic-level figure, you need to manually sum the lag values of all the topic's partitions.
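Since Kafka reports lag per partition, the topic-level lag is simply the sum of the per-partition values. A minimal sketch of that aggregation (the partition-to-lag map and its numbers here are purely hypothetical; in practice they would come from one of the monitoring methods discussed below):

```java
import java.util.Map;

public class TopicLag {
    // Sum per-partition lag values into a single topic-level lag.
    static long topicLag(Map<Integer, Long> partitionLags) {
        return partitionLags.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        // Hypothetical per-partition lag values for a three-partition topic.
        Map<Integer, Long> lags = Map.of(0, 120_000L, 1, 50_000L, 2, 30_000L);
        System.out.println(topicLag(lags)); // prints 200000
    }
}
```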

As mentioned earlier, lag is the most important monitoring metric for consumers, because it directly reflects a consumer's state. A properly functioning consumer should have a small lag, ideally close to zero, indicating that it consumes messages as fast as the producer writes them. Conversely, a large lag usually means the consumer cannot keep up with the producer; the lag will keep growing and slow down downstream message processing.

What's even worse, a consumer that cannot match the producer's speed may find that the data it wants to read is no longer in the operating system's page cache. It then has to read from disk, which widens the gap with the producer even further. This produces a Matthew effect: consumers whose lag is already large become slower still, and their lag grows even larger.

Considering these reasons, you must constantly monitor the consumer’s consumption progress in actual business scenarios. Once there is a trend of increasing lag, you must identify and address the problem in a timely manner to avoid business losses.

Since consumption progress is so important, how do we monitor it? Simply put, there are three methods.

  1. Use Kafka’s built-in command line tool kafka-consumer-groups script.
  2. Use Kafka Java Consumer API programming.
  3. Use Kafka’s built-in JMX monitoring metrics.

Next, we will discuss each of these three methods.

Built-in Kafka Commands #

First, let’s understand the first method: using the built-in command line tool bin/kafka-consumer-groups.sh (or bat). The kafka-consumer-groups script is the most direct tool provided by Kafka for monitoring consumer progress. Besides monitoring lag, it also has other functionalities. Today, we will mainly discuss how to use it to monitor lag.

If you only look at the name, you may think it is used solely for operating and managing consumer groups. In fact, it can also monitor the lag of standalone consumers. As mentioned before, standalone consumers are consumer programs that do not use the consumer group mechanism. Like group members, they must configure the group.id parameter, but instead of calling KafkaConsumer.subscribe(), they call KafkaConsumer.assign() to consume specific partitions directly. Standalone consumers are not today's focus; just keep in mind that everything discussed below applies to them as well.

Using the kafka-consumer-groups script is very simple. The script is located in the bin subdirectory of the Kafka installation directory. You can use the following command to view the lag of a given consumer:

$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker connection info> --describe --group <group name>

The Kafka broker connection info is in the format of <hostname:port>, and the group name is the group.id value set in your consumer program. Let me give an example to illustrate the specific usage. Please see the output in the screenshot below:

When running the command, I specified the connection info for the Kafka cluster, i.e. localhost:9092. Additionally, I set the consumer group name to be queried as testgroup. The output of the kafka-consumer-groups script provides abundant information. Firstly, it displays each partition subscribed by the consumer group, with each partition appearing on one line. Secondly, besides topic and partition information, it reports the offset value of the latest produced message in each partition (column LOG-END-OFFSET), the offset value of the latest consumed message by the consumer group (column CURRENT-OFFSET), lag (the difference between the previous two), consumer instance ID, hostname of the consumer’s connected broker, and the consumer’s CLIENT-ID information.
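The output looks roughly like the following (the offsets, lag values, and consumer-id here are illustrative placeholders, not the actual measurements from the screenshot):

```
GROUP      TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID          HOST        CLIENT-ID
testgroup  test   0          1048268         1652687         604419  consumer-1-a3b2c1d4  /127.0.0.1  consumer-1
testgroup  test   1          1048329         1652689         604360  consumer-1-a3b2c1d4  /127.0.0.1  consumer-1
```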

Undoubtedly, the value in the LAG column is the most crucial among this data. In the screenshot, the lag value for each partition is around 600,000, which indicates that in my test, the consumer group is far behind the producer’s progress. Ideally, we hope that all values in this column are 0, as it would mean that the consumer has no lag at all.

Sometimes, when you run this script, you may encounter a situation like this:

By comparing it with the previous screenshot, we can easily spot the difference: the CONSUMER-ID, HOST, and CLIENT-ID columns are empty! If you come across this situation, don't panic; it simply means you ran the kafka-consumer-groups script without having started the consumer program. Please note that the text highlighted in orange explicitly tells us that the consumer group currently has no active members, i.e. no consumer instance has been started. Even though these columns are empty, the LAG column is still valid and correctly shows the group's lag value.

In addition to the scenario where the aforementioned three columns have no values, another possible situation is that the command does not return any results at all. In this case, there is no need to panic either; it is because the Kafka version you are using is relatively old, and the kafka-consumer-groups script does not yet support querying non-active consumer groups. Once you encounter this problem, you can choose to upgrade your Kafka version or use the other methods I will mention next to make queries.

Kafka Java Consumer API #

Many times, you may not be satisfied with querying lag using command-line tools and instead prefer to automate monitoring using a programmatic approach. Luckily, the community has provided us with such a method. This is what we will discuss as the second approach today.

In simple terms, the Java Consumer API provided by the community offers two sets of methods: one for querying the latest message offset for a given partition and the other for querying the latest consumed message offset for a consumer group. By using these methods, we can calculate the corresponding lag.

The following code demonstrates how to use the Consumer-side API to monitor the lag value of a given consumer group:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    try (AdminClient client = AdminClient.create(props)) {
        // Step 1: fetch the latest committed (consumed) offsets of the given group
        ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
        try {
            Map<TopicPartition, OffsetAndMetadata> consumedOffsets =
                    result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto offset commit
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Step 2: fetch the latest produced offsets of the same partitions
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
                // Step 3: lag = log end offset - consumed offset, per partition
                return endOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                        entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // handle the interruption as appropriate for your application
            return Collections.emptyMap();
        } catch (ExecutionException e) {
            // handle the underlying request failure as appropriate
            return Collections.emptyMap();
        } catch (TimeoutException e) {
            throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
        }
    }
}

You don’t need to fully understand the meaning of each line in the above code. Just remember three things: the first part calls the AdminClient.listConsumerGroupOffsets method to get the latest consumed message offset for the given consumer group; the second part obtains the latest message offset for the subscribed partitions; and finally, the third part performs the necessary subtraction operation to get the lag value and encapsulates it into a Map object.

I’m giving you this code so that you can directly apply the lagOf method to your production environment and achieve the goal of programmatic monitoring of consumer lag. However, please note that this code is only applicable to Kafka version 2.0.0 and above, as the AdminClient.listConsumerGroupOffsets method is not available in versions prior to 2.0.0.

Kafka JMX Monitoring Metrics #

The first two methods mentioned above can conveniently query the Lag information of a given consumer group. However, in many practical monitoring scenarios, we often rely on existing monitoring frameworks. If this is the case, the above two methods are not very useful because they cannot be integrated into existing monitoring frameworks such as Zabbix or Grafana. Now let’s look at the third method, which is to use the JMX monitoring metrics provided by Kafka to monitor the Lag value of consumers.

Currently, the Kafka consumer exposes a JMX metric MBean named kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}", which has many attributes. Two of them are relevant to today's topic: records-lag-max and records-lead-min, which report the maximum Lag value and the minimum Lead value this consumer reached within the measurement window.

We have repeatedly explained the meaning of Lag value, so I won’t repeat it. The Lead value here refers to the difference between the offset of the consumer’s latest consumed message and the offset of the first message in the partition. Obviously, Lag and Lead are two aspects of the same thing: the larger the Lag value, the smaller the Lead value, and vice versa.
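To make the relationship concrete, here is a small sketch of how the two values are derived from three offsets of a partition; the numbers are made up purely for illustration:

```java
public class LagAndLead {
    // Lag: how far the consumer trails the newest message in the partition.
    static long lag(long logEndOffset, long consumedOffset) {
        return logEndOffset - consumedOffset;
    }

    // Lead: how far the consumer is ahead of the oldest retained message.
    static long lead(long consumedOffset, long logStartOffset) {
        return consumedOffset - logStartOffset;
    }

    public static void main(String[] args) {
        // Hypothetical partition state: first retained offset 100,
        // latest offset 1,000,000, consumer currently at offset 800.
        long logStart = 100, logEnd = 1_000_000, consumed = 800;
        System.out.println(lag(logEnd, consumed));    // prints 999200: a large lag
        System.out.println(lead(consumed, logStart)); // prints 700: a small lead, danger!
    }
}
```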

You may ask, why introduce Lead? Can’t I just monitor Lag? The reason for introducing Lead is because this part of the functionality was implemented by me. Just kidding, the reason the community introduced Lead is that if we only look at Lag, we may not be aware of potential serious problems in a timely manner.

Imagine this: if the Lag keeps increasing, it may only give you the feeling that the consumer program is slowing down, or at least cannot catch up with the producer, and beyond that you may do nothing else; after all, sometimes that is acceptable. But once you observe the Lead getting smaller, even approaching 0, you must be alert, because this may indicate that the consumer side is about to start losing messages.

Why? We know that Kafka retains messages only for a certain period, one week by default, meaning Kafka deletes data older than one week. If your consumer program is so slow that it is about to consume data Kafka is about to delete, you must act immediately; otherwise those messages will certainly be deleted, forcing the consumer to readjust its offset. That has one of two consequences: either the consumer re-consumes all the data from the earliest retained offset, or it jumps to the latest offset, skipping all the messages it never consumed and giving the appearance of message loss.
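Which of the two consequences you get is governed by the consumer's auto.offset.reset setting, a standard Kafka consumer configuration. A hedged sketch of the relevant fragment:

```properties
# When the committed offset no longer exists (e.g. the data was removed
# by retention), the consumer must pick a new starting point:
#   earliest -> re-consume from the beginning of what is still retained
#   latest   -> jump to the newest offset, silently skipping the rest
auto.offset.reset=earliest
```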

Neither situation is tolerable, so there has to be a JMX metric that clearly signals it, and that is why Lead was introduced. A Lag growing from 1 million to 2 million is far less alarming than a Lead shrinking from 200 to 100. In actual production environments, please be sure to monitor both the Lag and the Lead values. And yes, I really did contribute this Lead JMX metric to Kafka.

Next, I will provide a screenshot of monitoring this JMX metric using the JConsole tool. From this picture, we can see that the consumer with client-id “consumer-1” has a maximum Lag value of 714202 and a minimum Lead value of 83 within the given measurement period, indicating that this consumer has a significant consumption delay.

Kafka consumers also provide additional JMX metrics at the partition level for monitoring Lag and Lead values separately. The JMX name is: kafka.consumer:type=consumer-fetch-manager-metrics,partition="{partition}",topic="{topic}",client-id="{client-id}".

In our example, the client-id is still “consumer-1”, the topic and partition are “test” and “0” respectively. The following figure shows the JMX metrics at the partition level:

The partition-level JMX metrics have two additional attributes, records-lag-avg and records-lead-avg, which report the average Lag and Lead values. In practice, these two attributes are used even more often.
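If you want to read these metrics programmatically rather than through JConsole, the standard javax.management API can query them from the consumer's JVM (or over a remote JMX connector). A sketch under the assumption that a consumer with client-id "consumer-1" runs in the same JVM; with no consumer running, the query simply returns an empty set:

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConsumerLagJmx {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Pattern matching the fetch-manager metric MBeans of all consumers in this JVM.
        ObjectName pattern = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*");
        Set<ObjectName> names = server.queryNames(pattern, null);
        for (ObjectName name : names) {
            // Read the two attributes discussed above from each matching MBean.
            Object lagMax = server.getAttribute(name, "records-lag-max");
            Object leadMin = server.getAttribute(name, "records-lead-min");
            System.out.printf("%s lag-max=%s lead-min=%s%n", name, lagMax, leadMin);
        }
    }
}
```

The same pattern works for the partition-level MBean by adding the topic and partition key properties to the ObjectName.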

Summary #

Today I presented three methods for monitoring the consumption progress of consumer groups and standalone consumers. In terms of convenience, Method 1 is the simplest: we can directly run Kafka's built-in command-line tool. Method 2, using the Consumer API to calculate Lag, is also effective, especially because it can be integrated into many enterprise-level automated monitoring tools. However, Method 3 is the most versatile, as JMX monitoring metrics can be configured directly in mainstream monitoring frameworks.

In a real production environment, I recommend prioritizing Method 3, while keeping Method 1 and Method 2 as alternatives in your toolbox, ready to be used in various scenarios.

Open Discussion #

Please share your views on these three methods. In addition, in real business scenarios, how do you monitor consumer progress?

Feel free to write down your thoughts and answers, and let’s discuss together. If you feel like you have gained something, please feel free to share the article with your friends.