
18 All About Offset Commits in Kafka #

Hello, I’m Hu Xi. Today, let’s talk about offset commits in Kafka.

As we mentioned before, the consumer side has its own notion of an offset, and it is different from the offset of a message within a partition, even though both are called “offset” in English. Today we will talk about this consumer offset, which records the offset of the next message the consumer needs to consume. This may differ from what you previously understood, so please remember: it refers to the offset of the next message, not the offset of the most recently consumed message.

Let me give you an example to illustrate. Let’s say there are 10 messages in a partition with offsets ranging from 0 to 9. A consumer application has already consumed 5 messages, which means the consumer has consumed messages with offsets ranging from 0 to 4. At this point, the consumer’s offset is 5, pointing to the offset of the next message.

The consumer needs to report its offset data to Kafka, and this reporting process is called an offset commit. Because a consumer can consume data from multiple partitions at the same time, offset commits are actually performed at the partition level: the consumer commits offset data for each partition assigned to it.

The purpose of offset commits is mainly to represent the consumer’s consumption progress. So, when a consumer restarts after a failure, it can read the previously committed offset values from Kafka and continue consuming from the corresponding offset, thus avoiding starting the entire consumption process from scratch. In other words, offset commits are a tool or semantic guarantee provided by Kafka. You are responsible for maintaining this semantic guarantee. If you commit an offset X, Kafka will assume that you have successfully consumed all messages with offset values less than X.

This point is particularly crucial. Offset commits are very flexible: you can commit any offset value you want, but you also bear the consequences. For example, suppose your consumer has consumed 10 messages (offsets 0 to 9) but you commit an offset of 20. In theory, the messages with offsets 10 to 19 could be lost, because Kafka believes they have already been consumed. Conversely, if you commit an offset of 5, the messages with offsets 5 to 9 may be consumed again. So let me stress it once more: the semantic guarantee behind offset commits is your responsibility, and Kafka blindly accepts whatever offset values you commit. How you manage offset commits directly determines the message delivery semantics your consumer provides.
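
To make this flexibility (and its danger) tangible, here is a minimal sketch of committing an arbitrary offset for a single partition, using the Map-based commit method discussed later in this article; the topic name "foo" is carried over from the examples below, and the offset value 20 is purely illustrative.

// Explicitly commit offset 20 for partition 0 of topic "foo", even though only
// offsets 0-9 have actually been processed. Kafka accepts it and from then on
// assumes that every message below offset 20 has been consumed.
consumer.commitSync(Collections.singletonMap(
        new TopicPartition("foo", 0),
        new OffsetAndMetadata(20)));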

Considering the significant impact of offset commits on the consumer side, Kafka, especially the KafkaConsumer API, provides multiple methods for committing offsets. From a user’s perspective, offset commits can be divided into automatic commits and manual commits, while from the consumer side perspective, offset commits can be divided into synchronous commits and asynchronous commits.

Let’s first talk about automatic and manual commits. Automatic commits mean that the Kafka Consumer silently commits offsets for you in the background, so as a user you don’t have to worry about it. Manual commits mean that you commit offsets yourself; the Kafka Consumer will not do it for you.

Enabling automatic offset commits is straightforward. On the consumer side, there is a parameter called enable.auto.commit, which can be set to true or left unset. Its default value is true, which means that Java Consumer automatically commits offsets by default. If automatic commits are enabled, another parameter on the consumer side comes into play: auto.commit.interval.ms. Its default value is 5 seconds, indicating that Kafka will automatically commit offsets every 5 seconds for you.

To make this concrete, here is a complete Java example that enables automatic offset commits. With this code as a baseline, I won’t show the complete code for the rest of today’s explanation, only the relevant fragments.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");       // enable automatic offset commits
props.put("auto.commit.interval.ms", "2000");  // commit automatically every 2 seconds
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

The two lines that set enable.auto.commit and auto.commit.interval.ms are all it takes to enable automatic offset commits. Overall, it is quite simple, right?

Manual offset commits are, naturally, the opposite of automatic commits. You enable them by setting enable.auto.commit to false. However, setting the parameter to false is not enough by itself, because you have only told the Kafka Consumer not to commit offsets automatically; you still have to call the corresponding API to commit the offsets yourself, as in the configuration sketch below.
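
For reference, here is a minimal configuration sketch for the manual-commit setup, reusing the placeholder bootstrap server, group id, and deserializer settings from the automatic-commit example above.

// Manual-commit configuration: only enable.auto.commit changes.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // turn off automatic commits
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);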

The simplest API is KafkaConsumer#commitSync(). This method will commit the latest offset returned by KafkaConsumer#poll(). From the name itself, it is a synchronous operation, meaning that the method will wait until the offset is successfully committed before returning. If an exception occurs during the commit process, the method will throw an exception. The following code snippet demonstrates the usage of commitSync():

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // process messages
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        handle(e); // handle commit failed exception
    }
}

As you can see, consumer.commitSync() is called only after all the messages returned by poll() have been processed. If you commit the offset before processing is complete, you risk losing data; the sketch below contrasts the two orderings. At this point you might ask: can automatic offset commits avoid data loss by committing at the right moment? To answer that, we need to look more closely at the order in which automatic commits happen.
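
Here is a minimal sketch of that ordering trade-off before we turn to automatic commits; process() is a hypothetical stand-in for your business logic, as in the earlier snippets.

// Commit after processing: a crash between the two steps means the batch is
// consumed again on restart (re-consumption), but nothing is lost.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
process(records); // hypothetical business logic
consumer.commitSync();

// Commit before processing: a crash between the two steps means the batch is
// never processed, i.e. data loss from the application's point of view.
ConsumerRecords<String, String> next = consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();
process(next);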

Once enable.auto.commit is set to true, Kafka guarantees that when poll() is called, it first commits the offsets of the last batch of messages returned by the previous poll(). In other words, poll() commits the offsets of the previous batch before handing you the next batch, so it never commits offsets for messages you have not yet received, and data loss does not occur. However, automatic commits have a problem of their own: they can cause duplicate consumption.

By default, the consumer automatically commits the offset every 5 seconds. Now, let’s assume that a rebalance operation occurs 3 seconds after the offset is committed. After the rebalance, all consumers continue to consume from the offset of the last commit, but this offset is 3 seconds old. Therefore, all data consumed in the past 3 seconds before the rebalance will be consumed again. Although you can reduce the value of auto.commit.interval.ms to increase the commit frequency, doing so will only shorten the time window for duplicate consumption and cannot completely eliminate it. This is a flaw in the automatic commit mechanism.

On the other hand, manual commits are more flexible: you are in full control of when and how often offsets are committed. But they have a drawback of their own. When commitSync() is called, the consumer program blocks until the remote Broker returns the commit result. In any system, blocking caused by the program itself rather than by resource limits can become a bottleneck and hurt the TPS (transactions per second) of the entire application. You can, of course, commit less frequently, but the consequence is that the consumer commits offsets less often, and more messages will be re-consumed when the consumer restarts.

Considering this issue, the Kafka community provides another API method for manual offset commit: KafkaConsumer#commitAsync(). From the name itself, it is not synchronous but an asynchronous operation. Once commitAsync() is called, it will return immediately without blocking, thus not affecting the TPS of the consumer application. Since it is asynchronous, Kafka provides a callback function for you to implement the logic after the commit, such as logging or handling exceptions. The following code snippet demonstrates how to call commitAsync():

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    process(records); // Process messages
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null)
            handle(exception); // Handle commit failures, e.g. by logging them
    });
}

Can commitAsync replace commitSync? The answer is no. The problem with commitAsync is that it does not automatically retry when an error occurs. Because it is an asynchronous operation, if it were to retry after a failed commit, the offset value it retries with may already be “expired” or not the latest value. Therefore, the retry of asynchronous commit is actually meaningless, so commitAsync does not retry.

Obviously, if we manually commit, we need to use a combination of commitSync and commitAsync to achieve the best effect, for two reasons:

  1. We can rely on the automatic retries of commitSync to ride out transient problems such as network jitter or a Broker-side GC pause. Because these issues are short-lived, automatic retries usually succeed, so we would rather not implement retries ourselves and instead let the Kafka Consumer do it for us.
  2. We don’t want the program to be in a blocking state all the time, affecting TPS.

Let’s take a look at the following code, which demonstrates how to use both API methods to perform manual commits.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records);          // Process messages
        consumer.commitAsync();    // Use asynchronous commits to avoid blocking
    }
} catch (Exception e) {
    handle(e); // Handle exceptions
} finally {
    try {
        consumer.commitSync();     // Use a final synchronous, blocking commit
    } finally {
        consumer.close();
    }
}

This code uses both commitSync() and commitAsync(). For regular and phased manual commits, we call commitAsync() to avoid blocking the program. Before closing the Consumer, we call commitSync() to perform a synchronous blocking offset commit to ensure that the Consumer can save the correct offset data before closing. By combining the two, we achieve asynchronous non-blocking offset management and ensure the correctness of the Consumer’s offset. So if you need to develop a Kafka Consumer application by yourself, I recommend using the code example above to implement manual offset commits.

We have talked about automatic commits and manual commits, as well as synchronous commits and asynchronous commits. Is that all for Kafka offset commits? In fact, we are missing one part.

In fact, the Kafka Consumer API also provides a set of more convenient methods that can help you achieve more fine-grained offset management. All the offset commits we mentioned earlier are for committing the offsets of all the messages returned by the poll method at once, that is, directly committing the offset of the latest message. But what if I want to commit the offset more finely?

Imagine this scenario: instead of 500 messages, the poll method returns 5000 messages. In this case, you definitely don’t want to process all 5000 messages and then commit the offset, because if an error occurs in the middle, you would have to start all over again. This is similar to transaction processing in databases. Often, we want to split a large transaction into several smaller transactions and commit them separately, which can effectively reduce the time required for error recovery.

The same principle applies to Kafka. A Consumer that has to process a large number of messages at once will naturally want a way to commit offsets partway through the batch. In the 5,000-message example above, for instance, you may want to commit the offset after every 100 messages processed, so that an error in the middle does not force you to re-consume a large number of messages.

Fortunately, the Kafka Consumer API provides such methods for manual commits: commitSync(Map) and commitAsync(Map). Their parameter is a Map object, with the key being the TopicPartition, which is the consumed partition, and the value being an OffsetAndMetadata object that mainly stores the offset data.

Let’s use commitAsync to commit the offset after every 100 messages as an example. The calling convention for commitSync(Map) is exactly the same.

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // Process a single message
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)); // +1: commit the next offset to consume
        if (count % 100 == 0) {
            consumer.commitAsync(offsets, null); // The callback handling logic is null
        }
        count++;
    }
}

Let me explain this code briefly. The program first creates a Map object to store the partition offsets that the Consumer needs to commit during the consumption process. After that, it starts processing messages one by one and constructs the offset value to be committed. Remember when I mentioned committing the offset of the next message? That’s the reason why we add 1 to the current message offset when constructing the OffsetAndMetadata object. The last part of the code is the offset commit. I set a counter here to commit the offset every 100 messages accumulated. Unlike the commitAsync without parameters, here we call commitAsync with a Map object parameter for fine-grained offset commit. In this way, this code can commit the offset every 100 messages processed, without being limited by the total number of messages returned by the poll method.
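
For completeness, here is a minimal sketch of the synchronous counterpart, plus a version that passes a real callback instead of null; both assume the same offsets map built in the loop above, and handle() is the same hypothetical error-handling helper used in the earlier examples.

// Synchronous fine-grained commit: blocks until the Broker acknowledges these offsets.
consumer.commitSync(offsets);

// Asynchronous fine-grained commit with a callback instead of null,
// so that commit failures can at least be logged or handled.
consumer.commitAsync(offsets, (committedOffsets, exception) -> {
    if (exception != null)
        handle(exception); // hypothetical error-handling helper
});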

Summary #

Now, let’s summarize today’s content. The Kafka Consumer’s offset commit is an important means of achieving consumer-side semantic guarantees. Offset commits can be automatic or manual, and manual commits can be synchronous or asynchronous. In practice, I recommend the manual commit mechanism because it is more controllable and more flexible. In addition, I suggest combining synchronous and asynchronous commits: this keeps TPS (transactions per second) high without sacrificing automatic retries, and improves the availability of your consumer application. In short, the Kafka Consumer API provides multiple flexible commit methods, allowing you to tailor your commit strategy to your own business scenario.

Open Discussion #

In fact, even manual commits cannot completely prevent duplicate message consumption. Suppose a Consumer fails after processing a batch of messages but before committing their offsets; when it restarts, it will consume those messages again. Think about how you would implement deduplication logic in your own business scenario; one possible approach is sketched below.
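
As one illustration only, here is a minimal sketch that assumes each message carries a unique business key and keeps a set of already-processed keys; in practice that set would usually live in a durable external store (a database, Redis, and so on) rather than in memory, and process() is again a hypothetical stand-in for your business logic.

// Keys of messages that have already been processed. In production this would
// be a durable external store rather than an in-memory set.
Set<String> processedKeys = new HashSet<>();

for (ConsumerRecord<String, String> record : records) {
    String key = record.key();       // assumes the key uniquely identifies the message
    if (processedKeys.contains(key)) {
        continue;                    // duplicate delivery; skip it
    }
    process(record);                 // hypothetical business logic
    processedKeys.add(key);
}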

Feel free to share your thoughts and answers, and let’s discuss together. If you find it helpful, you are welcome to share this article with your friends.