30 How to Reset Consumer Group Offsets


Hello, I’m Hu Xi. Today I want to share with you how to reset consumer group offsets.

## Why Do We Need to Reset Consumer Group Offsets?

As we know, there are significant differences in design between Kafka and traditional message brokers. One of the most notable differences is that Kafka consumers can consume messages in a replayable manner.

Traditional message brokers such as RabbitMQ or ActiveMQ process messages destructively: once a message has been successfully processed, it is deleted from the broker.

Kafka, on the other hand, is a log-based message broker. When consumers consume messages, they simply read data from files on disk. This is a read-only operation, so consumers do not delete any message data. Moreover, because the consumer side controls the offset, it can easily change the offset value and thereby consume historical data again.

By the way, many readers have asked in the comments: in practice, how do I decide between a traditional message broker and Kafka? Let me answer everyone here. If your scenario involves complex, costly processing for each message and you don’t care about message order, a traditional message broker is more suitable. If instead your scenario requires high throughput, each message is processed quickly, and message order matters, Kafka is the better choice.

## Offset Reset Strategies

Regardless of which method you use, offsets can be reset along two dimensions.

  1. Offset dimension. The reset is based on offset values: the consumer’s offset is set directly to a given offset value.
  2. Time dimension. You can specify a point in time, and the consumer moves its offset to the earliest offset whose message timestamp is at or after that time; or you can specify a time interval, such as 30 minutes, and the consumer rolls its offset back to the position of 30 minutes ago.
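To make the time-dimension semantics concrete, here is a minimal sketch of the lookup over an in-memory list of messages. It is a hypothetical illustration: the class `TimeLookup` and its parallel arrays are not a Kafka API. Against a real cluster the same question is answered by `KafkaConsumer.offsetsForTimes`, which returns the earliest offset whose timestamp is greater than or equal to the requested time, or `null` when no such message exists. The scan is linear because message timestamps are not guaranteed to increase with offsets.

```java
/** Hypothetical illustration of a time-dimension offset lookup (not a Kafka API). */
final class TimeLookup {
    private TimeLookup() {}

    /**
     * Returns the earliest offset whose timestamp is greater than or equal to
     * targetMs, or -1 if no message is that recent.
     *
     * @param offsets    message offsets in ascending order
     * @param timestamps message timestamps in epoch milliseconds, parallel to offsets
     * @param targetMs   the point in time to search from, in epoch milliseconds
     */
    static long earliestOffsetAtOrAfter(long[] offsets, long[] timestamps, long targetMs) {
        // Timestamps may be out of order (e.g. producer-assigned CreateTime),
        // so we scan in offset order instead of binary-searching on time.
        for (int i = 0; i < offsets.length; i++) {
            if (timestamps[i] >= targetMs) {
                return offsets[i]; // first, and therefore lowest, qualifying offset
            }
        }
        return -1L; // offsetsForTimes would return null in this case
    }
}
```

For instance, with messages at offsets 10 to 13 stamped 1000, 2000, 3000, and 4000 ms, a target of 2500 ms yields offset 12.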

The table below lists 7 reset strategies. I will now explain these strategies in detail.

The Earliest strategy means adjusting the offset to the earliest offset of the topic. This earliest offset is not necessarily 0, because in a production environment, messages that are very old will be automatically deleted by Kafka, so the current earliest offset is likely to be a value greater than 0. If you want to re-consume all messages of a topic, you can use the Earliest strategy.

The Latest strategy resets the offset to the latest offset, that is, the offset at which the next message will be written. For example, if you have sent 15 messages to a single-partition topic (offsets 0 through 14), the latest offset is 15. If you want to skip all historical messages and consume only messages that arrive from now on, you can use the Latest strategy.

The Current strategy resets the offset to the latest offset committed by the consumer. Suppose you modify the consumer code, restart the consumer, and then find that the new code has problems. You need to roll back the code changes and also move the offset back to where it was when the consumer was restarted. The Current strategy lets you do exactly that.

The Specified-Offset strategy in the 4th row of the table is more general: it sets the consumer’s offset to a value you specify. A typical use case is manually “skipping” a message that the consumer cannot process. In practice, a corrupted message may be impossible to consume, causing the consumer to throw an exception and stop working. If you run into this problem, you can use the Specified-Offset strategy to move past the bad message.

Whereas the Specified-Offset strategy takes an absolute offset, the Shift-By-N strategy takes a relative one: you only specify how many messages to skip. The “skip” works in both directions, forward or backward. For example, to reset the offset to 100 positions before the current offset, specify N as -100.
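The arithmetic behind Shift-By-N is simple, but the result can land outside the range of offsets that still exist on the broker. The kafka-consumer-groups tool clamps such a result to the partition’s earliest or latest offset; the helper below is a minimal sketch of the same idea. The class and method names are hypothetical, and obtaining the earliest and latest offsets (for example with `KafkaConsumer.beginningOffsets` and `KafkaConsumer.endOffsets`) is left to the caller.

```java
/** Hypothetical helper: computes a Shift-By-N target offset for one partition. */
final class ShiftByN {
    private ShiftByN() {}

    /**
     * @param current  the partition's current (committed) offset
     * @param n        relative shift; negative moves backward, positive moves forward
     * @param earliest the partition's earliest available offset
     * @param latest   the partition's latest offset (log end offset)
     * @return current + n, clamped into the range [earliest, latest]
     */
    static long shift(long current, long n, long earliest, long latest) {
        long target = current + n;
        // Offsets below earliest have been deleted and offsets above latest do not exist yet
        return Math.max(earliest, Math.min(latest, target));
    }
}
```

For example, `ShiftByN.shift(200, -100, 0, 500)` returns 100, while shifting offset 50 back by 100 in a partition whose earliest offset is 10 returns 10.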

All the strategies so far work along the offset dimension. Now let’s look at the DateTime and Duration strategies, which reset the offset along the time dimension.

The DateTime strategy allows you to specify a time and then reset the offset to the earliest offset after that time. A common use case is when you want to re-consume yesterday’s data, you can use this strategy to reset the offset to 00:00 yesterday.
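Whether you go through the API or the command line, “00:00 yesterday” has to become a concrete point in time, and that point depends on the time zone you mean. A minimal sketch, assuming `java.time` and an explicitly chosen zone, of turning it into the epoch-millisecond timestamp that a call such as `KafkaConsumer.offsetsForTimes` expects; the class name is hypothetical.

```java
import java.time.LocalDate;
import java.time.ZoneId;

/** Hypothetical helper: computes the DateTime target "00:00 yesterday" in epoch milliseconds. */
final class DateTimeTarget {
    private DateTimeTarget() {}

    static long startOfYesterdayMillis(LocalDate today, ZoneId zone) {
        // Midnight differs between time zones, so the zone is passed in explicitly
        return today.minusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }
}
```

For example, `startOfYesterdayMillis(LocalDate.of(2019, 6, 21), ZoneId.of("UTC"))` returns 1560988800000, which is 2019-06-20T00:00:00Z.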

The Duration strategy takes a relative time interval and resets the offset to the position that lies that interval before the current time. The interval is written in the format PnDTnHnMnS. If you are familiar with the Duration class introduced in Java 8, you will recognize this format: it is the ISO-8601 duration format, starting with the letter P followed by four parts, D, H, M, and S, for days, hours, minutes, and seconds respectively, with the letter T separating the day part from the time part. For example, to roll the offset back to 15 minutes ago, specify PT0H15M0S.
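Because `java.time.Duration.parse` accepts exactly this format, it is a convenient way to check a duration string, or to compute the target timestamp yourself when resetting through the API. The sketch below assumes the target is “now minus the duration” in epoch milliseconds; the class name is hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper: the timestamp the Duration strategy rewinds to. */
final class DurationTarget {
    private DurationTarget() {}

    static long targetMillis(long nowMillis, String isoDuration) {
        // Duration.parse accepts PnDTnHnMnS strings such as "PT0H15M0S" or "P1DT2H"
        // and throws DateTimeParseException for malformed input
        Duration interval = Duration.parse(isoDuration);
        return nowMillis - interval.toMillis();
    }
}
```

For example, `targetMillis(System.currentTimeMillis(), "PT0H15M0S")` gives the timestamp of 15 minutes ago, which can then be looked up with `offsetsForTimes`.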

I will provide the implementation of these 7 reset strategies later. However, before that, let me explain the methods to reset the offset. Currently, there are two ways to reset the offset of a consumer group.

  • Implement it through the consumer API.
  • Implement it through the kafka-consumer-groups command line script.

## Resetting Offsets via the Consumer API

First, let’s take a look at how to reset offsets using the API. I will mainly use the Java API as an example. If you are using another programming language, the methods should be similar, but you need to refer to the specific API documentation.

To reset offsets using the Java API, you need to call the seek method of KafkaConsumer, or its variant methods seekToBeginning and seekToEnd. Let’s see their method signatures.

```java
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
```

The method definitions show that each call to seek resets the offset of only one partition. OffsetAndMetadata is a compound class that wraps a long offset together with custom metadata. In most cases the metadata is empty, so you can essentially treat this class as an offset. The variants seekToBeginning and seekToEnd can reset offsets for multiple partitions at once: you pass in all the topic partitions in a single call.

Now that we have these methods, we can implement the 7 strategies mentioned earlier one by one. Let’s start with the implementation of the Earliest strategy in Java, as shown in the code below:

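```java
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// groupID and brokerList are assumed to be defined elsewhere
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID); // the group whose offsets are reset
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = "test";  // The Kafka topic to reset offsets for
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    consumer.subscribe(Collections.singleton(topic));
    consumer.poll(0); // deprecated poll(long): blocks until the group join completes and partitions are assigned
    List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
            .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
            .collect(Collectors.toList());
    consumer.seekToBeginning(partitions);
    // seekToBeginning() is evaluated lazily and commits nothing: resolve the new
    // positions and commit them so the reset persists after the consumer closes
    Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
    for (TopicPartition tp : partitions) {
        newOffsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
    }
    consumer.commitSync(newOffsets);
}
```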

There are several key parts in this code that you need to pay attention to:

  1. The consumer program you create should disable auto committing offsets.
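  2. The group ID should be set to the ID of the consumer group whose offsets you want to reset. The group’s other consumers should be stopped first; otherwise this consumer may be assigned only some of the partitions, and seeking on a partition it has not been assigned throws an IllegalStateException.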
  3. When calling the seekToBeginning method, you need to construct all partition objects of the topic at once.
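  4. Most importantly, you must call the poll method that takes a long parameter, i.e. consumer.poll(0), rather than consumer.poll(Duration.ofSeconds(0)). The old poll(long) blocks until the consumer has joined the group and received its partition assignment, whereas poll(Duration) honors its timeout, so a zero timeout usually returns before any partition is assigned and the subsequent seek call fails.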

Although the community has deprecated poll(long), at the time of writing it was not expected to be removed in the short term, so you can still use it for this purpose as long as your client version provides it. Additionally, to avoid repetition, I will only show the most crucial code in the following examples.

The Latest strategy is similar to the Earliest strategy. We only need to use the seekToEnd method, as shown in the code below:

```java
consumer.seekToEnd(
        consumer.partitionsFor(topic).stream()
                .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                .collect(Collectors.toList()));
```

Implementing the Current strategy is simple. We need to use the `committed` method of `KafkaConsumer` to get the latest committed offset, as shown in the code below:
    
    
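```java
consumer.partitionsFor(topic).stream()
        .map(info -> new TopicPartition(topic, info.partition()))
        .forEach(tp -> {
            OffsetAndMetadata committed = consumer.committed(tp);
            // committed() returns null if the group has never committed an offset for this partition
            if (committed != null) {
                consumer.seek(tp, committed.offset());
            }
        });
```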

This code first calls the partitionsFor method to retrieve all partitions of the given topic, then iterates through each partition to get the corresponding committed offset, and finally uses the seek method to reset the offset to the committed offset.

To implement the Specified-Offset strategy, you can directly call the seek method, as shown below:

```java
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
    TopicPartition tp = new TopicPartition(topic, info.partition());
    consumer.seek(tp, targetOffset);
}
```

## Resetting Offsets via the Command Line

There is another important way to reset offsets: **using the kafka-consumer-groups script**. Please note that this feature was introduced in Kafka 0.11. This means that if you are using a version of Kafka before 0.11, you can only reset offsets using the API.

Compared to the API method, resetting offsets from the command line is much simpler. Each of the 7 strategies discussed above has a corresponding parameter, and I will give an example of each below.

To reset offsets using the Earliest strategy, simply specify **\--to-earliest**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest --execute
```


To reset offsets using the Latest strategy, simply specify **\--to-latest**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
```


To reset offsets using the Current strategy, simply specify **\--to-current**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
```


To reset offsets using the Specified-Offset strategy, simply specify **\--to-offset**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
```


To reset offsets using the Shift-By-N strategy, simply specify **\--shift-by N**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --shift-by <offset_N> --execute
```


To reset offsets using the DateTime strategy, simply specify **\--to-datetime**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-datetime 2019-06-20T20:00:00.000 --execute
```


Finally, to reset offsets using the Duration strategy, simply specify **\--by-duration**.

```bash
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S --execute
```

## Summary

So far, I have covered the two methods of resetting consumer group offsets. Let's summarize. Today, we mainly discussed why and how to reset offsets in Kafka. Resetting offsets is mainly used for message replay. Currently, Kafka supports 7 reset strategies and 2 reset methods. In practical usage, I recommend using the second method, which is resetting offsets using the command line. After all, executing commands is much easier than writing programs. However, it's worth noting that the command line method for adjusting offsets is only available in Kafka 0.11 and later versions. If you are using earlier versions, you will have to rely on the API method.

![](../images/98a7d3f9b0d3050947772d8cd2c4caf3.jpg)
## Open Discussion

Have you encountered any scenarios in practice where you needed to reset consumer group offsets? How did you do it?

Feel free to share your thoughts and answers, and let's discuss together. If you feel that you have gained something from this, you are also welcome to share the article with your friends.