16 Unveil the Mystery of Log Compaction Topic

16 Unveil the Mystery of Log Compaction Topic #

Hello, I’m Hu Xi. Today I want to share with you the mysterious internal topic in Kafka, __consumer_offsets.

**consumer_offsets has a more formal name in Kafka source code, called “Offsets Topic”. For the convenience of today’s discussion, I will use the term “Offsets Topic” to refer to consumer_offsets. Note that it has two underscores.

Now let’s delve into the background and reasons behind the introduction of the Offsets Topic, that is, the history of the Offsets Topic.

In the previous issue, I mentioned that the offset management in old versions of Consumer relied on Apache ZooKeeper. It automatically or manually submitted offset data to ZooKeeper for storage. When the Consumer restarts, it can automatically read the offset data from ZooKeeper to continue consumption from where it left off. This design means that Kafka Broker does not need to store offset data, reducing the state space it needs to hold and thus facilitating high scalability.

However, ZooKeeper is not suitable for such high-frequency write operations. Therefore, starting from version 0.8.2.x, the Kafka community began to plan to modify this design and finally introduced a brand new offset management mechanism in the new version of Consumer, which naturally includes this new Offsets Topic.

The offset management mechanism in the new version of Consumer is actually quite simple. It submits the Consumer’s offset data as ordinary Kafka messages to the consumer_offsets topic. It can be said that the main function of consumer_offsets is to store the offset information of Kafka consumers. This submission process not only requires high persistence, but also supports high-frequency write operations. Obviously, Kafka’s topic design naturally satisfies these two conditions. Therefore, the idea of using a Kafka topic to store offsets is a perfect match.

Here I want to re-emphasize that, like other topics you create, the Offsets Topic is just a regular Kafka topic. You can manually create it, modify it, or even delete it. However, it is also an internal topic. In most cases, you don’t really need to “interact” with it or spend much effort managing it. Just leave it to Kafka.

Although the Offsets Topic is a regular Kafka topic, its message format is defined by Kafka itself and cannot be modified by users. In other words, you can’t randomly write messages to this topic because if the messages you write do not meet Kafka’s specified format, Kafka will not be able to parse them successfully, resulting in broker crashes. In fact, Kafka Consumer has an API to help you submit offsets, which means writing messages to the Offsets Topic. So, do not create your own Producer to send messages to this topic.

You may wonder, what format of messages does this topic store? The so-called message format can be simply understood as a key-value pair. The Key and Value represent the key and message body of the message, and in Kafka, they are just byte arrays. Imagine if you were to design this topic, what would you think the message format should look like? Before I reveal the community’s design solution, let’s design our own.

Let’s start with the Key. There are many consumers in a Kafka cluster, so since this topic is supposed to store consumer offset data, there must be a field in the message format to identify which consumer the offset data belongs to. Which field is more suitable for this data? Obviously, it should be placed in the Key.

Now that we know that the Key should contain a field to identify the consumer, which field in the current Kafka can identify a consumer? Remember when we talked about Consumer Group before? Yes, it’s the Group ID field, which can identify a unique Consumer Group.

Speaking of which, let me say a few more words. In addition to Consumer Group, Kafka also supports Standalone Consumers. Its operating mechanism is completely different from Consumer Groups, but the offset management mechanism is the same. Therefore, even with Standalone Consumers, they also have their own Group ID to identify themselves, so this message format is applicable to them as well.

Now we know that the Key saves the Group ID, but is it enough to just save the Group ID? Don’t forget, Consumer submits offsets at the partition level, which means it submits offsets for a certain or certain partitions. So obviously, the Key should also include the partitions for which the Consumer wants to submit offsets.

Now, let’s summarize our conclusion. The Key of the Offsets Topic should contain 3 parts:. If you agree with this conclusion, congratulations, that’s how the community designed it!

Next, let’s talk about the message body design. You might think that the message body should be simple, just saving an offset value. In reality, the community’s solution is much more complex. For example, the message body also stores some other metadata of the offset submission, such as timestamps and user-defined data. Storing this metadata helps Kafka perform various operations subsequently, such as deleting expired offset messages. But overall, we can still simply think of the message body as storing the offset value.

Of course, the message format of the Offsets Topic is not limited to just this one. In fact, it has 3 message formats. In addition to the format we just mentioned, there are 2 other formats:

  1. Messages used to store Consumer Group information.
  2. Messages used to delete expired offset messages or even delete Consumer Groups. Format 1 is very mysterious, to the point where you can hardly find it in search engines. However, you just need to remember that it is used to register Consumer Groups.

Format 2 is relatively more well-known. It has an exclusive name: tombstone message, also known as delete mark. The next time you see these words on Google or Baidu, don’t be surprised, they are referring to the same thing. These messages only appear in the source code and are not exposed to you. Its main feature is that its message body is null, or empty.

So, when will these types of messages be written? Once all Consumer instances under a Consumer Group have stopped, and their offset data has been deleted, Kafka will write tombstone messages to the corresponding partition of the offset topic, indicating that it wants to completely delete the information of this group.

Anyway, we have covered enough about message formats. Now let’s talk about how the offset topic is created. Typically, when the first Consumer program in the Kafka cluster starts, Kafka will automatically create the offset topic. As we mentioned before, the offset topic is just a regular Kafka topic, so it naturally has a corresponding number of partitions. But if it is automatically created by Kafka, how is the number of partitions set? This depends on the value of the broker-side parameter offsets.topic.num.partitions. Its default value is 50, so Kafka will automatically create an offset topic with 50 partitions. If you have ever been surprised by the appearance of many directories named __consumer_offsets-xxx under the Kafka log path, now you should understand that these are the offset topics automatically created by Kafka.

You may ask, besides the number of partitions, how is the replication factor controlled? The answer is simple, it is the responsibility of another broker-side parameter offsets.topic.replication.factor. Its default value is 3.

In summary, if the offset topic is automatically created by Kafka, then the topic will have 50 partitions and a replication factor of 3.

Of course, you can also choose to manually create the offset topic. The specific method is to use the Kafka API to create it before any Consumer is started in the Kafka cluster. The advantage of manual creation is that you can create an offset topic that meets your actual scenario needs. For example, many people say that 50 partitions are too many for them, and they don’t want that many partitions. In that case, you can create it yourself, without worrying about the value of offsets.topic.num.partitions.

However, my recommendation is still to let Kafka automatically create it. Currently, there are some places in the Kafka source code where 50 partitions are hard-coded. Therefore, if you manually create an offset topic with a different number of partitions than the default, you may encounter various strange issues. This is a bug in the community, and although the code has been fixed, it is still under review.

Creating the offset topic is of course for a purpose, so where exactly is the offset topic used? We have been talking about how Kafka Consumers commit offsets to this topic. So how do Consumers commit offsets? Currently, there are two ways for Kafka Consumers to commit offsets: automatic offset commit and manual offset commit.

On the Consumer side, there is a parameter called enable.auto.commit. If its value is true, then the Consumer silently commits offsets for you in the background at regular intervals controlled by a dedicated parameter auto.commit.interval.ms. Automatic offset commit has a significant advantage in that it is convenient, you don’t have to worry about offset commit, and it guarantees that message consumption will not be lost. But at the same time, this is also a disadvantage. Because it is too convenient, to the point where it loses a lot of flexibility and control. You have no control over Consumer-side offset management.

In fact, many big data frameworks integrated with Kafka, such as Spark and Flink, disable automatic offset commit. This leads to another way of committing offsets: manual offset commit, which means setting enable.auto.commit = false. Once set to false, as a Consumer application developer, you have to take responsibility for offset commit. The Kafka Consumer API provides methods for offset commit, such as consumer.commitSync. When these methods are called, Kafka writes the corresponding message to the offset topic.

If you choose automatic offset commit, then there may be a problem: as long as the Consumer is continuously running, it will indefinitely write messages to the offset topic.

Let’s take an extreme example. Suppose a Consumer is currently consuming the latest message of a topic, with an offset of 100, and no new messages are generated in this topic afterward, so the Consumer has no messages to consume, and the offset remains at 100. Since it is automatically committing offsets, the offset topic will be continuously written with messages where the offset = 100. Obviously, Kafka only needs to keep the latest message in this type of message, the previous messages can be deleted. This requires Kafka to have a message deletion strategy specific to the characteristics of the offset topic messages, otherwise these messages will accumulate and eventually fill up the entire disk.

How does Kafka delete expired messages in the offset topic? The answer is Compaction. Many Chinese documents translate it as compression, but I personally have some reservations about it. In English, the specific term for compression is Compression, and its principle is very different from Compaction. I prefer to translate it as compaction or simply use the term used in JVM garbage collection: compaction.

Regardless of the translation, Kafka uses the Compact strategy to delete expired messages in the offset topic, avoiding unlimited expansion of the topic. So how do we define expiration in the Compact strategy? For two messages M1 and M2 with the same key, if the sending time of M1 is earlier than M2, then M1 is an expired message. The Compact process is to scan all messages in the log, remove those expired messages, and then rearrange the remaining messages together. Here is a figure from the official website to illustrate the Compact process.

In the picture, the keys of the messages with offsets 0, 2, and 3 are all K1. After compaction, the partition only needs to keep the message with offset 3 because it was the latest one sent.

Kafka provides dedicated background threads that periodically inspect topics waiting for compaction, to see if there is eligible data to be deleted. This background thread is called Log Cleaner. In many actual production environments, there have been issues of the offset topic expanding indefinitely and occupying too much disk space. If you have this problem in your environment, I suggest you check the status of the Log Cleaner thread, as it is usually caused by the thread being hung.

Summary #

To sum up, today I shared with you the mysterious offset topic __consumer_offsets in Kafka. This includes the reasons and motivations for its introduction, its purpose, message format, timing of writes, and management strategies. This is very helpful in understanding how Kafka, especially Kafka Consumer, manages offsets. In fact, the practice of storing many types of metadata as messages in internal Kafka topics is becoming increasingly popular. In addition to consumer offset management, Kafka transactions also utilize this method, although that is another internal topic.

The idea in the community is simple: since Kafka naturally provides high durability and high throughput, any sub-service with these requirements does not need to rely on external systems and can be implemented using Kafka itself.

Open Discussion #

Today, we talked about many benefits of the displacement theme. Please consider the possible disadvantages compared to the ZooKeeper solution.

Feel free to write down your thoughts and answers, and let’s discuss together. If you feel that you have gained something from this, you’re welcome to share the article with your friends.