38 Have You Optimized Kafka #

Hello, I’m Hu Xi. Today, I want to share with you the topic of how to optimize Kafka.

Optimization Goals #

Before tuning, we must clarify the goals of optimizing Kafka. Generally speaking, tuning is done to meet the common non-functional requirements of a system. Among the numerous non-functional requirements, performance is always our top concern. Different systems have different expectations for performance. For example, for database users, performance means the response time of requests. Users always hope that query or update requests can be processed and returned faster.

For Kafka, performance generally refers to throughput and latency.

Throughput, also known as TPS (transactions per second), refers to the number of bytes or messages that the Broker-side process or Client-side application can process per second. Naturally, a higher value is better.

Latency, similar to the response time mentioned earlier, represents the time interval from sending a message from the Producer side to the persistence completion on the Broker side. This metric can also represent end-to-end latency, that is, the total time from the Producer sending a message to the successful consumption of that message by the Consumer. In contrast to TPS, we typically want latency to be as short as possible.

In summary, high throughput and low latency are the main goals of tuning a Kafka cluster. We will discuss in detail how to achieve these goals later. Before that, I would like to talk about the optimization funnel.

Funnel Optimization #

The optimization funnel is a layered view of the tuning process: at each layer, we can make the corresponding optimizations. Generally speaking, the higher the layer, the more obvious the effect, and the overall effect decreases from top to bottom, as shown in the figure below:

Layer 1: Application Layer. This refers to optimizing the Kafka client application code, for example by using appropriate data structures, caching the results of expensive computations, or reusing object instances that are costly to construct. Optimization at this layer has the most significant effect and is usually the simplest.

Layer 2: Framework Layer. This refers to properly setting various parameters of the Kafka cluster. After all, it is not easy to directly modify Kafka source code for optimization, but it is relatively easy to configure the values of key parameters appropriately according to the actual scenario.

Layer 3: JVM Layer. The Kafka broker process is a regular JVM process, and various optimizations for the JVM also apply here. Although the optimization effect at this layer is not as good as the first two layers, it can sometimes bring significant improvements.

Layer 4: Operating System Layer. Optimizing the operating system layer is important but often does not yield as good results as expected. There is a significant gap compared to the optimization effect at the application layer.

Basic Optimization #

Next, I will introduce the optimization of the 4-layered funnel separately.

Operating System Optimization #

Let’s start with optimizing at the operating system layer. At the operating system level, it is best to disable atime updates when mounting the file system. Atime stands for access time, which records the last time a file was accessed. Recording atime requires the operating system to access inode resources, and disabling atime can avoid the write operations of inode access time, reducing the number of write operations on the file system. You can execute the command mount -o noatime to set it.

As for the file system, I suggest choosing at least ext4 or XFS. XFS in particular offers high performance and high scalability, which makes it well suited to production servers. It is worth mentioning that at the Kafka San Francisco Summit last October, someone shared a case of using ZFS with Kafka. We mentioned this in Lesson 8 of the column, along with the related data report. The report claimed that the multi-level cache mechanism of ZFS can help Kafka improve I/O performance and reportedly achieved good results. If ZFS is already installed in your environment, you can try running Kafka on it.

Another setting to consider is swap space. I personally suggest setting swappiness to a small value, such as between 1 and 10, rather than disabling swap outright. Once swap is unavailable and physical memory runs out, Linux’s OOM Killer picks a process to kill, whereas a small non-zero value lets the Broker slow down visibly first, which gives you time to investigate. You can execute sudo sysctl vm.swappiness=N to set this value temporarily. To make it permanent, add vm.swappiness=N to the /etc/sysctl.conf file and then restart the machine.

There are also two important parameters at the operating system level: ulimit -n and vm.max_map_count. If the former is too small, you will encounter errors like Too many open files. If the latter is too small, a Broker machine hosting a large number of topics will hit severe errors like OutOfMemoryError: Map failed. Therefore, I suggest increasing vm.max_map_count appropriately in production, for example to 655360. To do so, add vm.max_map_count=655360 to the /etc/sysctl.conf file, save it, and execute the sysctl -p command to make it take effect.
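The two sysctl recommendations above can be checked mechanically. The following is a minimal sketch, assuming the settings live in a file with `key = value` lines such as /etc/sysctl.conf. The helper names `parse_sysctl` and `check_kafka_sysctl` are hypothetical, and the thresholds are simply the values suggested in the text.

```python
# Thresholds taken from the recommendations in the text; helper names are hypothetical.
MIN_MAX_MAP_COUNT = 655360
SWAPPINESS_RANGE = range(1, 11)  # 1..10 inclusive


def parse_sysctl(text):
    """Parse 'key = value' lines, skipping blank lines and comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", ";")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip()
    return settings


def check_kafka_sysctl(text):
    """Return warnings for the two vm.* settings discussed above.

    A missing key is also reported, because the file then does not
    pin the recommended value.
    """
    settings = parse_sysctl(text)
    warnings = []

    swappiness = settings.get("vm.swappiness")
    if swappiness is None or int(swappiness) not in SWAPPINESS_RANGE:
        warnings.append("vm.swappiness should be between 1 and 10")

    max_map_count = settings.get("vm.max_map_count")
    if max_map_count is None or int(max_map_count) < MIN_MAX_MAP_COUNT:
        warnings.append("vm.max_map_count should be at least 655360")

    return warnings


# Sample contents for illustration only, not taken from a real machine.
sample = """
# /etc/sysctl.conf
vm.swappiness = 60
vm.max_map_count=655360
"""
print(check_kafka_sysctl(sample))
```

In this sample only the swappiness line is flagged, since 60 falls outside the suggested 1 to 10 range.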

Finally, I have to mention the size of the operating system page cache, which is crucial for Kafka. To some extent, we can say that the larger the page cache reserved for Kafka, the better. The minimum value should be able to accommodate the size of a log segment, which is the value of the Broker parameter log.segment.bytes. The default value of this parameter is 1GB. Reserving the size of a log segment can ensure that Kafka can put the entire log segment into the page cache, so that the consumer program can directly hit the page cache when consuming, thereby avoiding expensive physical disk I/O operations.

JVM Layer Optimization #

After talking about optimizing at the operating system level, let’s discuss the optimization at the JVM layer. In fact, when optimizing at the JVM layer, we still need to focus on heap settings and GC performance.

  1. Setting heap size.

How to set the heap size for the Broker is a confusing question for many people. Let me give a simple answer: Set your JVM heap size to 6-8GB.

In the actual environments of many companies, this size has proven to be very suitable, so you can use it with confidence. If you want to size the heap more precisely, I suggest checking the GC log, paying particular attention to the total size of live objects on the heap after a Full GC, and then setting the heap size to 1.5-2 times that value. If you find that no Full GC has been executed, you can manually run jmap -histo:live to trigger one.

  2. Selection of GC collectors.

I strongly recommend using the G1 collector, primarily because it is convenient and much easier to tune than the CMS collector. Whichever collector you use, you should make every effort to avoid Full GC. In G1, Full GC runs single-threaded (this was the case until JDK 10 introduced a parallel Full GC for G1), which makes it really slow. If you encounter frequent Full GC in your Kafka environment, you can configure the JVM parameter -XX:+PrintAdaptiveSizePolicy to investigate what is causing it.

One common issue when using G1 is dealing with large (humongous) objects, which can lead to errors like “too many humongous allocations” during garbage collection. Large objects generally refer to objects that occupy at least half of a region’s size. For example, if your region size is 2MB, objects larger than 1MB are considered large objects. To address this issue, aside from increasing the heap size, you can also increase the region size by setting the JVM startup parameter -XX:G1HeapRegionSize=N. By default, if an object exceeds N/2, it is considered a large object and is allocated directly in the large object area. If the messages in your Kafka environment have particularly large bodies, you may run into this large object allocation issue.
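The N/2 rule can be turned into a quick calculation when choosing a region size. This is a rough sketch, not a description of G1 internals: it assumes region sizes are powers of two between 1MB and 32MB (the range documented for older JDKs), and it treats an object as large only when it is strictly bigger than half a region. The function names are hypothetical.

```python
MB = 1024 * 1024

# Assumption: candidate region sizes are powers of two from 1MB to 32MB.
CANDIDATE_REGION_SIZES = [n * MB for n in (1, 2, 4, 8, 16, 32)]


def is_humongous(object_bytes, region_bytes):
    """True if the object exceeds half a region (the N/2 rule in the text).

    Using strict '>' for the boundary case is an assumption of this sketch.
    """
    return object_bytes > region_bytes // 2


def smallest_safe_region(object_bytes):
    """Smallest candidate region size for which the object is not humongous.

    Returns None when even the largest candidate is too small.
    """
    for region in CANDIDATE_REGION_SIZES:
        if not is_humongous(object_bytes, region):
            return region
    return None


message = int(1.5 * MB)  # e.g. a 1.5MB message body
print(is_humongous(message, 2 * MB))       # True: larger than 1MB
print(smallest_safe_region(message) // MB)  # 4
```

For a 1.5MB message body, a 2MB region is too small, while a 4MB region keeps the object below the N/2 threshold.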

Tuning on the Broker side #

Continuing up the funnel, let’s take a look at tuning on the Broker side.

A crucial aspect of tuning on the Broker side is to appropriately set the parameter values to match your production environment. However, we will discuss this part in detail when we talk about specific tuning goals later. Here, I want to address another optimization technique, which is to strive to keep the client and Broker versions consistent. Do not underestimate the issue of inconsistency between versions, as it can cause Kafka to lose many performance benefits, such as Zero Copy. I will use a diagram to illustrate this below.

In the diagram, the Producer, Consumer, and Broker shown in blue run the same version, so they can communicate through the fast channel of Zero Copy. A lower-version Consumer, on the other hand, cannot use that channel: its data has to be relayed through the JVM heap, so it falls back to a slower path. Therefore, when optimizing at the Broker level, simply keeping the server and client versions consistent can bring many performance benefits.

Application-level tuning #

Now, we finally reach the top of the funnel. Actually, the methods for optimizing at this level vary, as each application is different. However, there are some common rules that are still worth following.

  • Avoid creating Producer and Consumer objects frequently. Constructing these objects is expensive, so try to reuse them whenever possible.
  • Close them promptly after use. These objects create many physical resources in the background, such as socket connections and ByteBuffer buffers. Failure to close them in a timely manner will inevitably result in resource leaks.
  • Make optimal use of multi-threading to improve performance. Kafka’s Java Producer is thread-safe, so you can safely share the same instance across multiple threads. Although Java Consumer is not thread-safe, we discussed multi-threading solutions in the 20th article of this column, so you can review it.
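The first three rules (create once, share across threads, always close) can be sketched as a pattern. The `FakeProducer` class below is a hypothetical stand-in written for this illustration. It is not a Kafka client API; it only mimics a thread-safe producer so that the pattern can run on its own.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeProducer:
    """Hypothetical stand-in for a thread-safe producer (not a real Kafka API)."""

    instances_created = 0

    def __init__(self):
        FakeProducer.instances_created += 1
        self._lock = threading.Lock()
        self.sent = []
        self.closed = False

    def send(self, topic, value):
        with self._lock:
            if self.closed:
                raise RuntimeError("producer already closed")
            self.sent.append((topic, value))

    def close(self):
        with self._lock:
            self.closed = True


def run_workers(num_workers, messages_per_worker):
    producer = FakeProducer()  # constructed once, shared by every worker thread
    try:
        def work(worker_id):
            for i in range(messages_per_worker):
                producer.send("demo-topic", f"worker-{worker_id}-msg-{i}")

        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            list(pool.map(work, range(num_workers)))  # wait for all workers
    finally:
        producer.close()  # release resources even if a worker failed
    return producer


producer = run_workers(num_workers=4, messages_per_worker=100)
print(len(producer.sent), producer.closed, FakeProducer.instances_created)
```

The point of the design is that construction and closing each happen exactly once, no matter how many threads send messages.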

Performance Tuning #

Next, I will provide the parameter configurations and specific reasons for tuning various performance targets, hoping that they can help you adjust your Kafka cluster more effectively.

Tuning Throughput #

First, let’s talk about tuning throughput. Many people seem to have some misunderstandings about the relationship between throughput and latency. For example, there is a popular saying: suppose it takes 2ms for Kafka to send a message, then the latency is 2ms. Obviously, the throughput should be 500 messages/second, because 1 second can send 1 / 0.002 = 500 messages. Therefore, the relationship between throughput and latency can be expressed by the formula: TPS = 1000 / Latency(ms). However, the relationship between throughput and latency is far from that simple in reality.

Let’s take the Kafka Producer as an example. Suppose it takes 2ms to send a message. If the Producer sends only one message at a time, the TPS is naturally 500 messages/second. Now suppose that, instead of sending each message immediately, the Producer waits 8ms before each send and accumulates a total of 1000 messages during that wait. The total latency then becomes 10ms (i.e., 2ms + 8ms), and the TPS equals 1000 / 0.01 = 100,000 messages/second. As you can see, although the latency has grown 5-fold (from 2ms to 10ms), the TPS has grown 200-fold. This is why batching or micro-batching is becoming popular.
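The arithmetic in this example can be reproduced with a few lines. This is only a worked calculation under the assumptions stated in the text (2ms per send, an 8ms wait, 1000 messages per batch); the function name `tps` is made up for the illustration.

```python
def tps(messages_per_send, send_ms, wait_ms=0):
    """Messages per second when every send carries `messages_per_send` messages
    and each send cycle takes `send_ms + wait_ms` milliseconds."""
    return messages_per_send * 1000 / (send_ms + wait_ms)


single = tps(1, send_ms=2)                   # one message per send
batched = tps(1000, send_ms=2, wait_ms=8)    # 1000 messages per 10ms cycle

print(single)            # 500.0
print(batched)           # 100000.0
print(batched / single)  # 200.0  (throughput gain)
print((2 + 8) / 2)       # 5.0    (latency grows from 2ms to 10ms)
```

The throughput gain far outweighs the added latency because the number of messages per send grows much faster than the time per send.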

In practice, users often seem willing to pay the cost of a slight increase in latency in exchange for a significant increase in TPS. After all, an increase in latency from 2ms to 10ms is usually acceptable. In fact, Kafka Producer adopts this design idea.

Of course, you might ask: if it takes 2ms to send a message, can waiting for 8ms accumulate 1000 messages? The answer is yes! When the Producer accumulates messages, it usually only sends the messages to the buffer in memory, while sending messages involves network I/O transmission. The time scale of memory operations and I/O operations is different, with the former usually in the order of several hundred nanoseconds and the latter ranging from milliseconds to seconds. Therefore, the number of messages accumulated by the Producer during the 8ms wait may be much greater than the number of messages the Producer can send in the same amount of time.

Okay, after saying so much, how do we tune the TPS? Let me share a parameter list with you.

Let me explain the contents of the table briefly.

The broker-side parameter “num.replica.fetchers” indicates how many threads the Follower replicas use to fetch messages. The default is 1 thread. If your broker-side CPU resources are abundant, you may consider increasing this parameter to speed up the synchronization of Follower replicas. Because in the actual production environment, the primary factor that limits the throughput of the Producer program configured with acks=all is the replica synchronization performance. Increasing this value can usually increase the throughput of the Producer-side program.

Another thing to note is to avoid frequent Full GC. Currently, whether it is the CMS collector or the G1 collector, Full GC adopts the Stop The World single-threaded collection strategy, which is very slow, so it must be avoided.

On the Producer side, if you want to improve throughput, the usual practice is to increase the batch size of messages and the batch cache time, that is, batch.size and linger.ms. Currently, their default values are small, especially the default 16KB batch size, which is generally not suitable for production environments. Assuming your message size is 1KB, the default batch size of about 16 messages is obviously too small. We still hope that the Producer can send more messages at one time.

Besides these two, it is best to also configure a compression algorithm to reduce network I/O and indirectly improve throughput. Currently, the two compression algorithms that work best with Kafka are LZ4 and zstd, so you can give them a try.

At the same time, since our optimization goal is throughput, it is best not to set acks=all and not to enable retries. The replica synchronization time introduced by the former is usually the throughput bottleneck, and the latter also reduces the throughput of the Producer application while retries are in progress.

Finally, if you share a Producer instance among multiple threads, you may encounter a situation where the buffer is not enough. If you frequently encounter exceptions like TimeoutException: Failed to allocate memory within the configured max blocking time, then you must explicitly increase the value of the buffer.memory parameter to ensure that there is always space available in the buffer.

After discussing the Producer-side, let’s talk about the Consumer-side. There are limited methods to improve the throughput on the Consumer-side. You can increase the overall throughput by using a multi-threading approach or by increasing the value of the fetch.min.bytes parameter. The default is 1 byte, which means that as long as the Kafka Broker-side accumulates 1 byte of data, it can return it to the Consumer-side. This is too small. Let’s have the Broker-side return more data at one time.
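The throughput-oriented settings discussed in this section can be collected in one place. The parameter names come from the text, but every concrete value below is an illustrative assumption rather than a recommendation from the source, and should be adjusted to your own workload. The helper `messages_per_batch` is hypothetical and only reproduces the 16KB versus 1KB arithmetic mentioned earlier.

```python
KB = 1024
MB = 1024 * KB

# Illustrative values only (assumptions); the parameter names come from the text.
throughput_settings = {
    "broker": {
        "num.replica.fetchers": 4,    # more fetcher threads if CPU is plentiful
    },
    "producer": {
        "batch.size": 512 * KB,       # larger than the 16KB default
        "linger.ms": 50,              # wait a little to fill batches
        "compression.type": "lz4",    # or "zstd"
        "acks": "1",                  # avoid acks=all when throughput is the goal
        "retries": 0,
        "buffer.memory": 64 * MB,     # raise when threads share one Producer
    },
    "consumer": {
        "fetch.min.bytes": 1 * KB,    # larger than the 1-byte default
    },
}


def messages_per_batch(batch_size_bytes, message_bytes):
    """How many whole messages of a given size fit into one batch."""
    return batch_size_bytes // message_bytes


print(messages_per_batch(16 * KB, 1 * KB))  # 16 with the default batch.size
print(messages_per_batch(throughput_settings["producer"]["batch.size"], 1 * KB))  # 512
```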

Tuning Latency #

After discussing tuning throughput, let’s talk about how to optimize latency. Below is a parameter list for tuning latency.

On the Broker-side, we still need to increase the value of num.replica.fetchers so that Follower replicas fetch messages faster, which reduces the overall message processing latency.

On the Producer-side, we want messages to be sent out as soon as possible, so linger.ms should be set to 0 and compression should not be enabled, because compression consumes CPU time and increases the sending latency. Also, it is best not to set acks=all. As mentioned earlier, the synchronization of Follower replicas is often the primary factor that reduces throughput and increases latency on the Producer-side.

On the Consumer-side, we keep fetch.min.bytes=1, which means that as long as the Broker-side has data that can be returned, it should be returned to the Consumer immediately, reducing the consumer consumption latency.
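The latency-oriented settings described above can be written down in the same way. As before, this is a sketch: linger.ms=0 and fetch.min.bytes=1 are taken from the text, while the num.replica.fetchers value and the choice of acks=1 are illustrative assumptions. The helper `to_properties` is hypothetical and simply renders `key=value` lines.

```python
# linger.ms and fetch.min.bytes follow the text; other values are assumptions.
latency_settings = {
    "broker": {
        "num.replica.fetchers": 4,     # illustrative value
    },
    "producer": {
        "linger.ms": 0,                # send immediately
        "compression.type": "none",    # skip the CPU cost of compression
        "acks": "1",                   # anything other than acks=all
    },
    "consumer": {
        "fetch.min.bytes": 1,          # return data as soon as any is available
    },
}


def to_properties(settings):
    """Render one section as sorted 'key=value' lines."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


print(to_properties(latency_settings["producer"]))
```

Compared with the throughput goal, the settings pull in the opposite direction for linger.ms, compression, and fetch.min.bytes, which is why the two goals usually have to be traded off against each other.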

Conclusion #

Alright, let’s summarize. Today, I shared with you some content about Kafka optimization. We started with the optimization goals, then I provided a funnel of optimization levels. Next, I discussed some foundational optimizations, including operating system level optimization, JVM level optimization, and application program optimization, among others. Finally, for the two performance metrics that Kafka is concerned with, throughput and latency, I gave some best practices for parameter value settings from three dimensions: Broker, Producer, and Consumer.

Lastly, I would like to share a real case study of performance optimization.

Once, I encountered a problem in a live environment: the Consumer program in that cluster was performing well, but one day, its performance suddenly dropped, with a significant decrease in throughput. When I checked the disk read I/O usage, I found a significant increase, but previously the Consumer Lag was very low, so the message reads should have been directly hitting the page cache. At this point, when the disk read suddenly spiked, I suspected that another program had written to the page cache. After further investigation, I discovered that indeed a test Console Consumer program had been started, “polluting” a portion of the page cache, causing the main business Consumer to read messages from the physical disk instead, resulting in decreased throughput. Once I found the real cause, solving the problem became much simpler.

In fact, the real purpose of sharing this case study is to say that for performance optimization, it’s best to narrow down and locate the problem step by step according to the steps provided today. Once the cause is identified, the subsequent optimization becomes much easier.

Open Discussion #

Please share a real case where you optimized Kafka and explain in detail how you encountered performance issues and how you resolved them.

Feel free to share your thoughts and answers, and let’s discuss together. If you find it helpful, please also share this article with your friends.