10 Producers Compression Algorithms Overview

Hello, I’m Hu Xi. Today, I want to share with you the topic of producer compression algorithms.

Compression is probably familiar to you. It follows the classic trade-off of time for space: it spends CPU time to save disk space or network I/O, in the hope of using noticeably less disk or bandwidth at a small CPU cost. Kafka uses compression for exactly this purpose. Today, I will share with you how compression works in Kafka.

How to Compress? #

How does Kafka compress messages? To understand this question, we need to start with Kafka’s message format. Currently, Kafka has two major message formats, commonly referred to as V1 and V2 versions. The V2 version was officially introduced in Kafka 0.11.0.0.

Regardless of the version, Kafka’s message hierarchy consists of two layers: message sets and messages. A message set contains multiple log entries, and log entries are where the messages are actually encapsulated. Kafka’s underlying message log is composed of a series of message set log entries. Kafka typically operates at the level of message sets rather than individual messages.

So what was the purpose of introducing the V2 version? It mainly addresses some shortcomings of V1. One fix is relevant to today’s topic: the fields common to all messages are extracted and stored once in the outer message set, so they no longer need to be saved with each individual message.

Let me give an example. In the original V1 version, each message needed to undergo CRC (Cyclic Redundancy Check) validation. However, there are situations where the CRC value of the message can change. For example, the message timestamp field may be updated on the broker, which would result in a corresponding update of the CRC value. Additionally, performing message format conversions on the broker (mainly for compatibility with older version client programs) can also result in changes to the CRC value. Given these cases, performing CRC validation on each individual message becomes unnecessary, wasting both storage space and CPU time. Therefore, in the V2 version, the CRC validation is performed at the message set level.

The V2 version also includes an improvement directly related to compression. The method of storing compressed messages has changed. In the previous V1 version, multiple messages were compressed and stored in the message body field of the outer message. In contrast, the V2 version compresses the entire message set. Clearly, the latter approach should achieve better compression efficiency than the former.

I conducted a simple test on the two versions separately, and the results showed that, under the same conditions, the V2 version consistently saves more disk space than the V1 version, whether compression is enabled or not. When compression is enabled, the space savings are more apparent, as shown in the two figures below:

When to compress? #

In Kafka, compression can occur in two places: the producer side and the broker side.

Configuring the compression.type parameter in the producer program enables the specified compression algorithm. For example, the following code shows how to create a producer object with GZIP compression enabled:

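import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
// Address of the broker(s) used for the initial connection
props.put("bootstrap.servers", "localhost:9092");
// Wait for all in-sync replicas to acknowledge each write
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Enable GZIP compression
props.put("compression.type", "gzip");

Producer<String, String> producer = new KafkaProducer<>(props);
// ... send records here, then close the producer to flush any buffered messages
producer.close();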

The key line here is props.put("compression.type", "gzip"), which indicates that the producer uses GZIP compression. This means that every message set produced by this producer will be compressed using GZIP, thereby saving network bandwidth and disk space on the Kafka broker.

Enabling compression on the producer side is a natural idea, but why did I mention compression on the broker side? In most cases, the broker simply saves the messages it receives from the producer without modifying them. “Most cases” has conditions, though: there are two exceptions that may cause the broker to recompress the messages.

Scenario 1: The broker specifies a different compression algorithm than the producer.

Let’s consider the following conversation:

Producer: “I want to use GZIP compression.”

Broker: “Sorry, I can only use Snappy compression for the messages I receive.”

As you can see, in this scenario, after receiving the messages compressed with GZIP, the broker can only decompress them and then compress them again using Snappy. If you look at the Kafka documentation, you’ll find that the broker also has a parameter called compression.type, similar to the one in the previous example. However, the default value for this parameter is producer, which means that the broker will “respect” the compression algorithm used by the producer. But if you set a different value for compression.type on the broker side, you need to be careful because unexpected compression/decompression operations may occur, usually resulting in a significant increase in CPU usage on the broker.
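The rule in Scenario 1 can be sketched as a small decision function. This is only a toy model of the behavior described above, not Kafka's actual broker code; the class name `BrokerCompressionModel` and the method `needsRecompression` are hypothetical names introduced for illustration.

```java
import java.util.Locale;

/** Toy model of the broker-side decision described above; not Kafka's actual code. */
final class BrokerCompressionModel {

    /**
     * Returns true if a broker configured with brokerCompressionType would have to
     * decompress and recompress a message set that arrived compressed with producerCodec.
     * The special value "producer" means "keep whatever the producer used".
     */
    static boolean needsRecompression(String producerCodec, String brokerCompressionType) {
        String broker = brokerCompressionType.toLowerCase(Locale.ROOT);
        if (broker.equals("producer")) {
            return false; // default setting: the broker respects the producer's choice
        }
        // Any explicit broker-side codec that differs from the producer's forces a recompress
        return !broker.equals(producerCodec.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(needsRecompression("gzip", "producer")); // false
        System.out.println(needsRecompression("gzip", "snappy"));   // true
        System.out.println(needsRecompression("gzip", "gzip"));     // false
    }
}
```

Only the second call corresponds to the conversation above: the producer sends GZIP, the broker insists on Snappy, and the broker pays the CPU cost of doing both.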

Scenario 2: Message format conversion occurs on the broker side.

Message format conversion mainly exists for compatibility with older consumer programs. Remember the V1 and V2 versions mentioned earlier? In a production environment, it is very common for a Kafka cluster to store messages in multiple format versions. To stay compatible with the old format, the broker converts messages from the new version to the old one, and this process involves decompressing and recompressing them. Message format conversion generally has a significant impact on performance: besides the extra decompression and recompression, it also causes Kafka to lose its prized Zero Copy feature.

“Zero Copy” means that when data is transferred between disk and network, it is not copied back and forth between kernel space and user space, which avoids expensive data copies and makes transfers fast. If Kafka cannot use this feature, performance will inevitably suffer. So it is advisable to keep the message format uniform, which not only avoids unnecessary decompression and recompression but also helps performance in other respects. If you are interested, you can look further into the principles of Zero Copy.
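To make the idea more concrete, here is a minimal sketch of the JDK call usually associated with Zero Copy, `FileChannel.transferTo`. The class `ZeroCopySketch` and the method `sendFile` are hypothetical names for this illustration, and the sketch assumes a blocking target channel. Whether the operating system can really skip the user-space copy depends on the platform and on the type of the target channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative sketch only; the names here are not taken from Kafka. */
final class ZeroCopySketch {

    /**
     * Sends the whole file to the target channel without reading it into an
     * application-level buffer, and returns the number of bytes transferred.
     */
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done
                position += source.transferTo(position, size - position, target);
            }
            return position;
        }
    }
}
```

The point of the sketch is what is missing: there is no `byte[]` in the application through which the data passes. Once the broker has to decompress or convert messages, the data must be brought into the application's memory, and this shortcut no longer applies.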

When to Decompress? #

Where there is compression, there must be decompression! Generally speaking, decompression occurs in consumer programs. That means when the producer sends compressed messages to the broker, the broker receives and saves them as they are. When the consumer program requests these messages, the broker sends them as they are. When the messages reach the consumer, the consumer decompresses them to restore them to their original state.

Now the question is, how does the consumer know which compression algorithm was used for these messages? The answer is actually in the messages themselves. Kafka encapsulates the enabled compression algorithm into the message set. So when the consumer reads the message set, it naturally knows which compression algorithm was used. If we summarize compression and decompression in one sentence, I hope you remember this: compression at the producer, preservation at the broker, decompression at the consumer.
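As a rough illustration of how the algorithm travels with the data: in the V2 format, the message set header carries an attributes field whose lowest three bits hold a compression codec id. The sketch below decodes that id. The class `BatchAttributes` and its method are hypothetical names, and the id-to-name mapping reflects my understanding of the format, so check it against the Kafka protocol documentation before relying on it.

```java
/** Illustrative decoder for the compression bits of a V2 message set's attributes. */
final class BatchAttributes {

    // The compression codec id is stored in the lowest 3 bits of the attributes field
    private static final int COMPRESSION_CODEC_MASK = 0x07;

    static String compressionCodec(short attributes) {
        int id = attributes & COMPRESSION_CODEC_MASK;
        switch (id) {
            case 0: return "none";
            case 1: return "gzip";
            case 2: return "snappy";
            case 3: return "lz4";
            case 4: return "zstd";
            default: throw new IllegalArgumentException("Unknown compression codec id: " + id);
        }
    }

    public static void main(String[] args) {
        // Bit 3 (an unrelated flag) is set here; the low three bits hold codec id 1
        short attributes = 0b0000_1001;
        System.out.println(compressionCodec(attributes)); // prints "gzip"
    }
}
```

Because the codec is read from the message set itself, a consumer needs no extra configuration to decompress what it receives.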

In addition to decompression at the consumer, there is also decompression at the broker. Note that this is different from the decompression that occurs during message format conversion. Each compressed message set must be decompressed when written to the broker to perform various validations on the messages. We must acknowledge that this type of decompression has a certain impact on the performance of the broker, especially in terms of CPU usage.

In fact, recently some colleagues from JD.com in China have just proposed a bugfix to the community, suggesting removing the decompression introduced for message verification. According to them, after decompression was removed, the CPU usage of the broker decreased by at least 50%. However, it is regrettable that the community has not yet adopted this suggestion because message verification is very important and cannot be blindly discarded. After all, it is most important to do things right first, and then consider doing them well and quickly. Regarding this use case, you can also consider whether there is a win-win solution that can avoid message decompression while still performing message verification.

Comparison of various compression algorithms #

Now let’s talk about compression algorithms. This is a big deal! After discussing so much before, we still need to compare the advantages and disadvantages of various compression algorithms so that we can configure a compression strategy that suits our business needs.

Before Kafka version 2.1.0, Kafka supported three compression algorithms: GZIP, Snappy, and LZ4. Starting from version 2.1.0, Kafka officially supports the Zstandard algorithm (abbreviated as zstd). It is an open-source compression algorithm developed by Facebook, which can provide extremely high compression ratios.

By the way, there are two important indicators for evaluating the advantages and disadvantages of a compression algorithm: one is compression ratio. If something that originally occupied 100 units of space now only occupies 20 units of space after compression, then the compression ratio is 5. Obviously, the higher the compression ratio, the better. The other indicator is compression/decompression throughput, which refers to how many MB of data can be compressed or decompressed per second. Similarly, a higher throughput is better.
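Both indicators are easy to measure yourself. The following sketch uses the JDK's built-in GZIP support on some made-up, repetitive sample data; the class `CompressionRatioDemo` and its methods are hypothetical names. A single timed run like this is noisy, so treat the throughput figure as a rough indication rather than a benchmark.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

/** Measures compression ratio and rough throughput for GZIP on sample data. */
final class CompressionRatioDemo {

    /** Compresses the input with GZIP and returns the compressed bytes. */
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Compression ratio = original size / compressed size, e.g. 100 / 20 = 5. */
    static double compressionRatio(long originalBytes, long compressedBytes) {
        return (double) originalBytes / compressedBytes;
    }

    public static void main(String[] args) {
        // Made-up, repetitive sample messages; real ratios depend heavily on the data
        StringBuilder sample = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            sample.append("{\"user\":\"u").append(i % 10).append("\",\"action\":\"click\"}\n");
        }
        byte[] original = sample.toString().getBytes(StandardCharsets.UTF_8);

        long start = System.nanoTime();
        byte[] compressed = gzip(original);
        double seconds = (System.nanoTime() - start) / 1e9;

        System.out.printf("ratio = %.2f%n", compressionRatio(original.length, compressed.length));
        System.out.printf("compression throughput = %.1f MB/s%n", original.length / 1e6 / seconds);
    }
}
```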

The following table is a compression algorithm benchmark comparison provided by the Facebook Zstandard official website:

From the table, we can see that zstd has the highest compression ratio, while its throughput is only average. LZ4, on the other hand, is the clear leader in throughput. I won’t comment on how authoritative the numbers in the table are; I only use them to illustrate the general performance of the compression algorithms currently available.

In practice, GZIP, Snappy, LZ4, and zstd each have their own merits. For Kafka, however, their performance test results are remarkably consistent. In terms of throughput: LZ4 > Snappy > zstd and GZIP. In terms of compression ratio: zstd > LZ4 > GZIP > Snappy. As for physical resources, Snappy consumes the most network bandwidth and zstd the least, which is reasonable given that zstd is designed for very high compression ratios. CPU usage is similar across the algorithms, except that Snappy uses somewhat more CPU during compression, while GZIP may use more CPU during decompression.

Best Practices #

With an understanding of these algorithm comparisons, we can enable the appropriate compression algorithm based on our specific situation.

First, let’s talk about compression. When is it appropriate to enable compression?

Since compression is done by the producer, one condition for enabling it is that the machine running the producer program has sufficient CPU resources. If the CPU on the producer machine is already fully utilized, enabling message compression will only make things worse.

In addition to having sufficient CPU resources, if the bandwidth in your environment is limited, I also recommend enabling compression. In fact, I have seen many Kafka production environments where the bandwidth is completely used up. These days, bandwidth is a scarce resource, even more precious than CPU and memory, as 10-gigabit networks are not yet standard for ordinary companies. It is therefore particularly easy for a Kafka cluster to exhaust its bandwidth on a 1-gigabit network. If your client machines have plenty of unused CPU, I highly recommend enabling zstd compression, as it can greatly reduce network resource consumption.
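The advice above can be condensed into a small helper that picks a value for the producer's compression.type. This is a sketch under stated assumptions, not a rule from Kafka itself: the class `CompressionChoice`, its methods, and the two boolean inputs are hypothetical, and the fallback to LZ4 when bandwidth is not the bottleneck is my own assumption, based on the throughput ranking quoted earlier. Remember that zstd requires Kafka 2.1.0 or later.

```java
import java.util.Properties;

/** Hypothetical helper that encodes the rules of thumb discussed above. */
final class CompressionChoice {

    static String chooseCompressionType(boolean producerHasSpareCpu, boolean bandwidthIsLimited) {
        if (!producerHasSpareCpu) {
            return "none"; // a saturated producer CPU would only get worse with compression
        }
        if (bandwidthIsLimited) {
            return "zstd"; // highest compression ratio, so the least network traffic
        }
        return "lz4"; // assumption: favor throughput when bandwidth is not the bottleneck
    }

    /** Builds producer properties that contain only the compression setting. */
    static Properties producerProps(boolean producerHasSpareCpu, boolean bandwidthIsLimited) {
        Properties props = new Properties();
        props.put("compression.type", chooseCompressionType(producerHasSpareCpu, bandwidthIsLimited));
        return props;
    }

    public static void main(String[] args) {
        // Spare CPU on the client and a congested 1-gigabit network
        System.out.println(producerProps(true, true).getProperty("compression.type")); // prints "zstd"
    }
}
```

In a real program, these properties would be merged with the serializer and bootstrap settings shown earlier before creating the producer.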

Next, let’s talk about decompression. There isn’t much to say, really. Once compression is enabled, decompression is inevitable. I just want to emphasize one point here: we cannot do anything about unavoidable decompression, but we can at least avoid unexpected decompression. As I mentioned earlier, the introduction of decompression operations for compatibility with older versions falls into this category. If possible, try to ensure that message format conversion does not occur.

Summary #

To summarize the content shared today: we discussed various aspects of Kafka compression, including how Kafka compresses messages, when compression and decompression happen, and how the compression algorithms supported by Kafka compare. Finally, I offered some engineering best practices. I share all of this with one purpose: I hope that you can choose the appropriate Kafka compression algorithm according to your actual situation to achieve maximum resource utilization.

Open Discussion #

Finally, I would like to leave you with a question to ponder: earlier, we mentioned that the Broker needs to decompress each message set in order to validate the individual messages. Someone proposed a solution: move the message validation to the Producer side, and let the Broker read the validation results directly. This way, the Broker could skip the decompression. Do you agree with this approach?

Feel free to write down your thoughts and answers. Let’s discuss together. If you find it insightful, you are also welcome to share this article with your friends.