19 How to Handle CommitFailedException Exceptions #

Hello, I’m Hu Xi. Today, I’m here to talk to you about how to handle the CommitFailedException exception.

When it comes to this exception, I believe that those of you who have used the Kafka Java Consumer Client API will find it familiar. As the name suggests, CommitFailedException is thrown when the Consumer client fails to commit offsets, and it signals a severe, unrecoverable error. Recoverable transient errors are handled by the offset commit API itself: many commit methods, such as the commitSync method discussed in the previous issue, automatically retry on transient failures.

Every time CommitFailedException occurs, a very famous comment accompanies it. Why do I say it is "famous"? First, I can't think of any other exception class in Kafka's nearly 500,000 lines of source code that enjoys this treatment, with such a long comment explaining its meaning. Second, even with such a long explanation, many people are still confused about what this exception means.

Now, let’s appreciate the charm of this text together and see the latest explanation from the community for this exception:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

The first part of this paragraph means that the offset commit failed because the consumer group has already started the Rebalance process and assigned the partitions to another consumer instance. The reason for this is that the time interval between two consecutive calls to the poll method by your consumer instance exceeds the expected value of the max.poll.interval.ms parameter. This usually indicates that your consumer instance is spending too much time processing messages, delaying the invocation of the poll method.

In the second part, the community provides two corresponding solutions:

  1. Increase the expected time interval max.poll.interval.ms parameter value.
  2. Reduce the number of messages returned in one call to the poll method, that is, reduce the max.poll.records parameter value.
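In code, these two remedies are just consumer configuration changes. Below is a minimal sketch; the specific values (600000 ms and 200 records) are purely illustrative, not recommendations, and the class name is mine:

```java
import java.util.Properties;

public class CommitFailedRemedies {
    // Builds consumer properties that apply both remedies suggested by
    // the exception message. The concrete values here are illustrative.
    public static Properties remedies() {
        Properties props = new Properties();
        // Remedy 1: allow more time between consecutive poll() calls
        // (the default in recent Kafka versions is 300000 ms, i.e. 5 minutes).
        props.put("max.poll.interval.ms", 600000);
        // Remedy 2: cap how many records a single poll() may return
        // (the default is 500).
        props.put("max.poll.records", 200);
        return props;
    }
}
```

These Properties would then be passed to the KafkaConsumer constructor alongside the usual connection and deserializer settings.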

Before discussing this paragraph in detail, I would also like to mention that there are actually three versions of this text in total. In addition to the latest version above, there are two earlier versions, which are:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

These two earlier versions differ from the latest only in small details: the oldest refers to session.timeout.ms instead of max.poll.interval.ms, and both earlier versions suggest "increasing the session timeout" rather than increasing max.poll.interval.ms. The reason I list these versions is so that you know what they are talking about when you encounter them in the future.

In fact, no matter which version of the text, they all indicate that an exception occurred when committing offsets. Let’s discuss when this exception is thrown. From the perspective of source code, the CommitFailedException exception usually occurs when manually committing offsets, that is, when the user explicitly calls the KafkaConsumer.commitSync() method. From the perspective of usage scenarios, there are two typical scenarios that may encounter this exception.

Scenario One

Let’s start with the most common scenario. When the total message processing time exceeds the preset max.poll.interval.ms value, the Kafka consumer throws a CommitFailedException. This is the most “authentic” way for this exception to appear. To reproduce it, write a consumer program that uses the KafkaConsumer.subscribe method to subscribe to any topic, set the consumer-side parameter max.poll.interval.ms to 5 seconds, and then, inside the poll loop, sleep for 6 seconds (Thread.sleep(6000)) before manually committing offsets. Here, I will show the main code logic.

Properties props = new Properties();
// Illustrative connection settings; adjust for your environment
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", false);
props.put("max.poll.interval.ms", 5000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(Duration.ofSeconds(1));
    // Simulate real message processing by sleeping longer than
    // max.poll.interval.ms (the enclosing method must declare
    // throws InterruptedException)
    Thread.sleep(6000L);
    consumer.commitSync(); // throws CommitFailedException once the group rebalances
}

To prevent this exception in this scenario, you need to keep the total time spent processing a batch of messages below max.poll.interval.ms. There are four methods you can use:

  1. Shorten the time it takes to process a single message. For example, if the downstream system previously took 100 milliseconds to process each message, optimizing it down to 50 milliseconds would double the Consumer’s TPS.

  2. Increase the maximum duration allowed for the downstream system to consume a batch of messages. This depends on the value of the Consumer parameter max.poll.interval.ms. In the latest version of Kafka, the default value of this parameter is 5 minutes. If you can’t simplify your consumption logic, increasing this parameter’s value is a good option. It’s worth noting that in versions prior to Kafka 0.10.1.0, this parameter didn’t exist. So if you’re still using a client API version prior to 0.10.1.0, you would need to increase the value of the session.timeout.ms parameter. Unfortunately, the session.timeout.ms parameter has other meanings, so increasing its value may have other “adverse effects.” This is one of the reasons why the community introduced the max.poll.interval.ms parameter in version 0.10.1.0, to separate this part of the meaning from session.timeout.ms.

  3. Reduce the total number of messages consumed by the downstream system at one time. This depends on the value of the Consumer parameter max.poll.records. The current default value is 500, which means that calling the KafkaConsumer.poll method will return a maximum of 500 messages. This parameter essentially sets an upper limit on the total number of messages returned by a single call to the poll method. If the previous two methods don’t work for you, reducing this parameter’s value is the simplest way to avoid a CommitFailedException exception.

  4. Use multi-threading in the downstream system to speed up consumption. This is the most advanced and also the hardest solution to implement. The idea is to have the downstream system create multiple consumer threads to process the batch of messages returned by the poll method. In the past, Kafka Consumer data was mostly consumed single-threaded; if consumption couldn’t keep up with the rate at which the Consumer returned messages, a CommitFailedException would be thrown. With multi-threading, you can flexibly control the number of threads and adjust consumption capacity at any time. Combined with today’s multi-core hardware, this method can be considered the most powerful way to prevent CommitFailedException. In fact, many mainstream big data stream processing frameworks use this approach. For example, when Apache Flink integrates with Kafka, it creates multiple KafkaConsumerThread threads to consume data in parallel. However, everything has its pros and cons: this method is not easy to implement, especially when it comes to committing offsets across multiple threads, which is error-prone. In later sections, I will focus on implementation solutions for multi-threaded consumption.

Based on the above four handling methods, I personally recommend that you first try Method 1 to prevent this exception. Optimizing the consumption logic of the downstream system is beneficial and harmless, unlike Methods 2 and 3, which involve a trade-off between Consumer TPS and consumption latency. If Method 1 is difficult to implement, you can follow the rules below to apply Methods 2 and 3.
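The core discipline of Method 4 is that offsets must only be committed after the whole batch has been processed by the worker threads. Here is a minimal sketch of that idea using plain strings in place of Kafka records; the class and method names are mine, and a real implementation would operate on the ConsumerRecords returned by poll():

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadedBatchSketch {
    // Processes one batch on a worker pool and returns how many messages
    // were handled. The strings stand in for Kafka records.
    public static int processBatch(List<String> batch, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger processed = new AtomicInteger();
        for (String msg : batch) {
            // Stand-in for real per-message work done on a worker thread
            pool.submit(processed::incrementAndGet);
        }
        pool.shutdown(); // accept no new tasks
        try {
            // Wait until every message in the batch has been processed
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Only here, once the whole batch is done, would it be safe to
        // call consumer.commitSync() in a real Kafka consumer.
        return processed.get();
    }
}
```

Note that creating a fresh pool per batch is only for illustration; a real consumer would reuse one pool and must also keep calling poll() frequently enough, which is exactly the coordination that makes this method hard to get right.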

First, you need to figure out the average latency of consuming each message in your downstream system. For example, if your consumption logic is to get a message from Kafka and write it to MongoDB downstream, assuming the average latency of accessing MongoDB does not exceed 2 seconds, then you can consider that message processing will take 2 seconds. If you calculate it based on max.poll.records equal to 500, the total consumption time for a batch of messages is approximately 1000 seconds. Therefore, the value of the max.poll.interval.ms parameter of your Consumer should not be lower than 1000 seconds. If you use the default configuration, the default value of 5 minutes is clearly not enough, and you will most likely encounter the CommitFailedException. Increasing max.poll.interval.ms to more than 1000 seconds belongs to the second method mentioned above.
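The back-of-the-envelope arithmetic above can be written down as a tiny helper; the method name is mine, not a Kafka API:

```java
public class PollIntervalSizing {
    // Lower bound for max.poll.interval.ms: the average per-message
    // latency multiplied by how many records one poll() may return.
    public static long requiredPollIntervalMs(long perMessageLatencyMs,
                                              int maxPollRecords) {
        return perMessageLatencyMs * maxPollRecords;
    }
}
```

With 2000 ms per message and the default max.poll.records of 500, this gives 1,000,000 ms, i.e. about 1000 seconds, far above the 5-minute default of max.poll.interval.ms.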

In addition to adjusting max.poll.interval.ms, you can also choose to adjust the value of max.poll.records to reduce the number of messages returned by each poll method. Taking the previous example, you can set max.poll.records to 150, or even less, so that the total consumption time for each batch of messages does not exceed 300 seconds (150*2=300), which is the default value of max.poll.interval.ms, 5 minutes. This approach of reducing max.poll.records belongs to Method 3 mentioned above.

Scenario Two

Okay, now we have finished talking about the classic occurrence scenarios and countermeasures of the CommitFailedException. The knowledge you have gained about this exception is enough to help you deal with the “pitfalls” caused by this exception during the application development process. However, in fact, this exception also has a less well-known occurrence scenario. Understanding this obscure scenario can help you broaden your knowledge of Kafka Consumers and prevent some weird problems in advance. Now let’s talk about this scenario.

Previously, we spent a lot of time learning about Kafka consumers, but most of the focus was on the consumer group (Consumer Group). In fact, the Kafka Java Consumer also provides a standalone consumer (Standalone Consumer). It has no concept of a consumer group: each instance works independently, with no connection to any other. Note, however, that the standalone consumer’s offset commit mechanism is the same as the consumer group’s, so its offset commits must also follow the same rules, such as specifying the group.id parameter before committing offsets. You may find it strange that a standalone consumer still needs to specify group.id, but that is simply how the community designed it. In any case, both consumer groups and standalone consumers must specify group.id before use.

Now the problem arises. If your application contains both a consumer-group program and a standalone consumer program that share the same group.id value, then when the standalone consumer manually commits offsets, Kafka will immediately throw a CommitFailedException: the broker does not recognize this instance as a legitimate member of the consumer group with that group.id, so it rejects the commit with an error.
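To make the collision concrete, here is a configuration sketch of the two programs. The broker address, topic, and group name are made up, and deserializer settings are omitted for brevity; this fragment cannot actually trigger the exception without a running cluster, so treat it purely as an illustration:

```java
// Program A: a regular consumer-group member
Properties groupProps = new Properties();
groupProps.put("bootstrap.servers", "localhost:9092");
groupProps.put("group.id", "orders-app");           // shared, by accident
KafkaConsumer<String, String> member = new KafkaConsumer<>(groupProps);
member.subscribe(Arrays.asList("orders"));          // joins the group via subscribe()

// Program B: a standalone consumer that happens to reuse the same group.id
Properties standaloneProps = new Properties();
standaloneProps.put("bootstrap.servers", "localhost:9092");
standaloneProps.put("group.id", "orders-app");      // same value!
KafkaConsumer<String, String> standalone = new KafkaConsumer<>(standaloneProps);
standalone.assign(Collections.singletonList(
    new TopicPartition("orders", 0)));              // assign(): no group membership

// When Program B later calls standalone.commitSync(), the broker rejects
// the commit because this instance is not a member of group "orders-app",
// and the client surfaces the rejection as a CommitFailedException.
```

The key difference is subscribe() versus assign(): only subscribe() makes the instance a group member, so the assign()-based consumer’s commits clash with the active group that shares its group.id.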

Although this scenario is rare, it is not impossible. In a large company, especially one that uses Kafka as a company-wide message engine, each department or team may run its own consumer applications. Who can guarantee that the group.id values configured by all of these programs never overlap? If such an unlucky duplication occurs and this scenario arises, none of the methods above can avoid the exception. Frustratingly, neither any version of the exception message nor the community website or online articles mention this scenario at all. Personally, I think this should be considered a Kafka bug: instead of simply returning a CommitFailedException to indicate a failed offset commit, it would be friendlier to report the cause of the error somewhere in the Consumer application, for example through logs, so that you could handle or even prevent this exception correctly.

Summary #

In summary, today we discussed in detail the CommitFailedException exception often encountered in Kafka Consumer. We started with its meaning, then discussed when and under what circumstances it occurs, as well as the responses in each scenario. Of course, I also left a cliffhanger. In the content following this column, I will discuss in detail the implementation of multi-threaded consumption. I hope that through today’s sharing, you can have a clear understanding of all aspects of the CommitFailedException exception, and be able to effectively handle this exception in the future.

Open Discussion #

Please compare the four methods we discussed today for preventing this exception and share your understanding of them.

Feel free to write down your thoughts and answers, and let’s discuss together. If you feel you have gained something from this, please also share the article with your friends.