07 the Most Most Important Cluster Parameter Configuration Part 1

07 The Most Most Important Cluster Parameter Configuration Part 1 #

Hello, I’m Hu Xi. Today, I want to talk to you about the most important Kafka cluster configuration. I have used three “most” in the title not to grab attention, but because the significance of some configurations is not reflected in the official documentation. Moreover, from practical experience, many parameters have a more noticeable impact on the system than what is mentioned in the documentation. Therefore, it is necessary to discuss them in detail.

I hope to explain these important configurations in two parts. Strictly speaking, these configurations are not limited to just the server-side configurations of Kafka. They include broker-side parameters, topic-level parameters (which I will refer to as “Topic” for simplicity), JVM parameters, and operating system-level parameters.

Please note that the term “broker-side parameters” here is also known as static configurations. Later on in this column, I will introduce the corresponding dynamic configurations. Static configurations refer to the parameters that you must set in the Kafka configuration file, server.properties, regardless of whether you are adding, modifying, or deleting them. Additionally, you must restart the broker process for these changes to take effect. On the other hand, topic-level parameters can be modified using the dedicated kafka-configs command provided by Kafka. As for JVM and operating system-level parameters, their configuration methods are relatively standardized, and I will be discussing the standard configuration parameters, so you should find it easy to set them.

Now, let’s begin with the broker-side parameters.

Broker-side parameters #

Currently, Kafka Broker provides nearly 200 parameters, most of which you don’t need to worry about. When it comes to the usage of these parameters, most articles online list some common parameters and then define each of them one by one. In fact, I used to write articles this way as well. However, today I’m going to try a different method and introduce them in groups according to their major use categories. I hope this approach will be more targeted and more convenient for you to remember.

Firstly, the Broker needs to be configured with storage information, which refers to the disks used by the Broker. The important parameters related to storage information are as follows:

  • log.dirs: This is a very important parameter that specifies the file directory paths to be used by the Broker. It’s worth noting that this parameter doesn’t have a default value, which means what? It means that you have to specify it yourself.
  • log.dir: Pay attention to the “dir” here, without an “s” at the end, which means it can only represent a single path. It is used as a supplementary parameter to the previous one.

How should these two parameters be set? It’s simple, you only need to set log.dirs, the first parameter, and don’t set log.dir. More importantly, in the online production environment, you must configure multiple paths for log.dirs, with a specific format of CSV (comma-separated values), such as /home/kafka1,/home/kafka2,/home/kafka3. If possible, it’s best to ensure that these directories are mounted on different physical disks. Doing so has two benefits:

  • Enhances read and write performance: Multiple physical disks reading and writing data simultaneously have higher throughput compared to a single disk.
  • Enables failover: This is a powerful feature introduced in Kafka version 1.1. Previously, if any disk used by the Kafka Broker failed, the entire Broker process would be shut down. However, since version 1.1, this situation has been corrected. The data on the failed disk will be automatically transferred to other healthy disks, and the Broker will continue to work. Remember the discussion we had in the previous article about whether Kafka needs to use RAID? This improvement is the basis for abandoning the RAID scheme: without this failover, we can only rely on RAID for protection.

Next, let’s talk about the settings related to ZooKeeper. Firstly, what is ZooKeeper? It is a distributed coordination framework responsible for coordinating, managing, and saving all metadata information of the Kafka cluster, such as which Brokers are running in the cluster, which Topics have been created, the number of partitions for each Topic, and the machines on which these partition leader replicas reside.

The most important parameter related to Kafka and ZooKeeper is zookeeper.connect. This is also a CSV parameter. For example, you can specify its value as zk1:2181,zk2:2181,zk3:2181. 2181 is the default port for ZooKeeper.

Now the question arises, how should this parameter be set if I want multiple Kafka clusters to use the same ZooKeeper cluster? This is when chroot comes into play. This chroot is a concept in ZooKeeper, similar to an alias.

If you have two sets of Kafka clusters, let’s call them kafka1 and kafka2, then the zookeeper.connect parameter for the two sets of clusters can be specified as zk1:2181,zk2:2181,zk3:2181/kafka1 and zk1:2181,zk2:2181,zk3:2181/kafka2, respectively. Remember that chroot only needs to be written once and added at the end. I often encounter people specifying it like this: zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3, but this format is incorrect.

The third group of parameters is related to Broker connections, i.e., how client programs or other Brokers communicate with this Broker. There are three parameters:

  • listeners: This is the formal name for listeners. In fact, it tells external connections how to access the Kafka service opened by the specified hostname and port number using what protocol.
  • advertised.listeners: Compared to listeners, this one has an additional “advertised”. The meaning of “advertised” is that this group of listeners is published by the Broker for external access.
  • host.name/port: I mentioned these two parameters just to say that you should forget about them altogether, as they are deprecated parameters.

Let’s talk specifically about the concept of listeners. In terms of composition, it is a list of comma-separated triplets, with each triplet in the format <protocol name, hostname, port>. Here, the protocol name can be a standard name, such as PLAINTEXT for plaintext transmission, or SSL for SSL or TLS encrypted transmission, etc. It can also be a protocol name you define yourself, such as CONTROLLER: //localhost:9092.

Once you define your own protocol name, you must also specify the listener.security.protocol.map parameter to indicate which security protocol the underlying layer of this protocol uses. For example, specifying listener.security.protocol.map=CONTROLLER:PLAINTEXT means that the underlying layer of this custom protocol uses plaintext and unencrypted transmission.

As for the hostname and port number in the triplets, they are relatively straightforward and don’t need much explanation. However, there’s one thing you should pay attention to. People often ask whether to use an IP address or a hostname in this hostname setting. Here’s my unified suggestion: it’s best to use hostnames for all settings, both on the Broker-side and in the Client-side application configurations. The Broker source code also uses hostnames. If you use IP addresses for connections in some places, you may encounter connection issues.

The fourth group of parameters is about Topic management. Let me explain the following three parameters:

  • auto.create.topics.enable: Whether automatic topic creation is enabled.
  • unclean.leader.election.enable: Whether unclean leader election is enabled.
  • auto.leader.rebalance.enable: Whether periodic leader election is enabled.

Let me go one by one.

I recommend setting the auto.create.topics.enable parameter to false, which means automatic topic creation is not allowed. In our production environment, we have encountered many topics with strange names, which I believe is caused by this parameter being set to true.

You might have experienced a situation where you intended to send events to a topic named “test”, but accidentally misspelled it as “tst” and started the producer program. Congratulations, a topic named “tst” will be automatically created.

Therefore, I believe good operations should prevent such situations, especially for large companies where every department’s assigned topics should be strictly controlled by operations and no self-created topics should be allowed.

The second parameter, unclean.leader.election.enable, is about disabling unclean leader election. What does “unclean” mean? Do you remember that Kafka has multiple replicas for each partition to provide high availability? Only one replica can serve requests externally, which is the leader replica.

Now, the question is: Do all replicas have the qualification to compete for leader election? Obviously not. Only those replicas with more data are eligible to compete. Those replicas with significantly behind progress are not qualified to do so.

Now, imagine this scenario: What if all the replicas with more data are down? Should we still perform leader election? This is where this parameter comes into play.

If set to false, we stick to the previous principle and do not allow replicas with significantly behind progress to compete for leader. The consequence is that this partition becomes unavailable because there is no leader. Conversely, if set to true, Kafka allows you to choose a leader from those “slower” replicas. The consequence is that data may be lost because those replicas already have incomplete data, and when they become leader, they believe their data is authoritative.

This parameter is set to false by default in the latest version of Kafka. It may not be necessary for me to mention it explicitly, but it’s funny that the community has changed the default value of this parameter several times. Since I don’t know which version of Kafka you are using, I suggest you explicitly set it to false.

The impact of the third parameter, auto.leader.rebalance.enable, seems to be underestimated, but it is actually of great importance in production environments. Setting it to true means that Kafka periodically re-elects leaders for some topic partitions. However, this re-election is not blind and has certain conditions. Strictly speaking, the biggest difference between this and the previous leader election parameter is that it is not about selecting a leader, but about changing the leader! For example, if Leader A has been performing well, but if auto.leader.rebalance.enable is set to true, it is possible that after some time, Leader A will be forcefully replaced by Leader B.

You should know that changing the leader is very costly. All clients that were originally sending requests to A need to switch to sending requests to B. Moreover, this change of leader does not bring any performance benefits. Therefore, I suggest you set this parameter to false in a production environment.

The last set of parameters is about data retention, let me introduce them separately.

  • log.retention.{hours|minutes|ms}: This is a “trio” that controls how long a message data should be kept. In terms of priority, ms has the highest priority, followed by minutes, and hours has the lowest priority.
  • log.retention.bytes: This specifies the total disk capacity that Broker can use for storing messages.
  • message.max.bytes: This controls the maximum size of messages that Broker can receive.

Let’s start with the “trio”. Although ms has the highest priority, in most cases, we prefer to set a longer duration in hours. For example, log.retention.hours=168 means preserving data for 7 days and automatically deleting data older than 7 days. Many companies use Kafka as a storage system, so this value needs to be adjusted accordingly.

Next is log.retention.bytes. The default value is -1, which means that Broker allows you to store as much data as you want on this machine at least in terms of capacity. This parameter really comes into play in scenarios of building multi-tenant Kafka clusters in the cloud. Imagine you want to create a Kafka service in the cloud, where each tenant can only use 100GB of disk space. To avoid a “malicious” tenant using too much disk space, setting this parameter becomes crucial.

Finally, let’s talk about message.max.bytes. In fact, today I have been talking about important parameters that are not meant to use default values, and this parameter is no exception. The default value of 1000012 is too small, not even 1MB. In real scenarios, messages exceeding 1MB are very common, so it is safer to set a relatively large value in a production environment. After all, it is just a benchmark, only measuring the maximum message size that Broker can handle, and even setting it to a larger value will not consume much disk space.

Summary #

To reiterate, all the parameters I have shared with you today are those that need to be modified from their default values, as these default values are not suitable for general production environments. However, this is not to say that the other 100+ parameters are not important. In fact, in future columns, we will continue to discuss some other parameters, especially those related to performance. Therefore, I hope that all the parameters I mentioned today can serve as best practices for you, helping you plan and adjust your Kafka production environment more effectively.

Open Discussion #

In addition to the parameters I shared today, are there any parameters that you think are important but not mentioned in the document? Have you encountered any “pitfalls” regarding parameter configuration? Feel free to bring them up for discussion with me and everyone else.

You are welcome to share your thoughts or questions, and let’s discuss together. If you feel that you have gained something, feel free to share the article with your friends.