28 How Much Do You Know About Topic Management #

Hello, I am Hu Xi. Today, I would like to discuss topic management in Kafka with you, including everyday topic management, management and operation of special topics, as well as common topic error handling.

Topic Management in Daily Operations #

The so-called daily management mainly involves the addition, deletion, modification, and query of topics. You may think, what is there to discuss? Aren’t there commands on the official website? This part of the content is indeed relatively simple, but it forms the basis for discussing the later content. Moreover, during the discussion, I will also share some tips with you. In addition, the management methods we discuss today are all based on the built-in commands of Kafka. In fact, in the later part of this column, we will specifically discuss how to use the Java API to operate the Kafka cluster.

Let’s start by learning how to create a Kafka topic using commands. Kafka provides a built-in kafka-topics.sh script to help users create topics. This script is located in the bin subdirectory of the Kafka installation directory. If you are using Kafka on Windows, then this script is located in the windows subdirectory of the bin path. A typical create command is as follows:

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1

The --create option indicates that we want to create a topic, and the --partitions and --replication-factor options specify the number of partitions for the topic and the number of replicas per partition, respectively. If you have used this command before, you may find it strange: wasn’t this specified with the --zookeeper parameter? Why is it now --bootstrap-server? Let me explain: starting from Kafka version 2.2, the community recommends using the --bootstrap-server parameter instead of --zookeeper, and has explicitly marked the latter as “deprecated”. Therefore, if you are already on version 2.2 or later, please specify the --bootstrap-server parameter when creating topics.

There are two main reasons why using --bootstrap-server is recommended instead of --zookeeper.

  1. Using --zookeeper bypasses Kafka’s security system. This means that even if you have set up security authentication for the Kafka cluster and restricted topic creation, if you use the --zookeeper command, you can still successfully create any topic without being constrained by the authentication system. This is obviously not what the Kafka cluster operators want to see.
  2. Using --bootstrap-server to interact with the cluster is becoming the standard way to use Kafka. In other words, there will be fewer and fewer commands and APIs that need to connect to ZooKeeper in the future. In this way, we only need one set of connection information to interact with Kafka in all aspects, without having to maintain both ZooKeeper and Broker connection information as before.

After creating a topic, Kafka allows us to query topics using the same script. You can use the following command to list all topics:

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

If you want to query detailed information of a specific topic, you can use the following command:

bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

If the describe command does not specify a specific topic name, Kafka will return detailed information of all “visible” topics to you by default.

Here, “visible” means the Kafka topics that the user initiating this command can see. This is the same as the difference between using --zookeeper and --bootstrap-server when creating topics. If you specify --bootstrap-server, then this command will be constrained by the security authentication system, that is, it will authenticate the command initiator and then return the topics they can see. Otherwise, if you specify the --zookeeper parameter, it will return all topic details in the cluster by default. For these reasons, I recommend that you use the --bootstrap-server connection parameter consistently.

After discussing topic “creation” and “querying”, let’s talk about “modification”. There are five places in Kafka that involve topic changes.

1. Modify the number of partitions.

In practice, this means adding partitions: currently, Kafka does not allow reducing the number of partitions of a topic. You can use the kafka-topics script with the --alter option to increase the number of partitions of a specific topic. The command is as follows:

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <new_partition_number>
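
Since --alter can only increase the partition count, a quick sanity check before running the command can save a failed round-trip. Here is a minimal sketch with hypothetical hard-coded counts (in practice you would parse the current count from the --describe output):

```shell
# Hypothetical pre-check: the new partition count must be strictly greater
# than the current one, or the broker rejects the request.
current=1   # assumption: normally parsed from --describe output
new=3       # the desired partition count

if [ "$new" -le "$current" ]; then
  echo "refusing: new count ${new} must be greater than current ${current}" >&2
  exit 1
fi
echo "ok: increasing partitions from ${current} to ${new}"
```

Only after this check passes would you run the actual kafka-topics.sh --alter command shown above.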

Note that the number of partitions you specify must be greater than the current number of partitions; otherwise, Kafka will throw an InvalidPartitionsException.

2. Modify topic-level parameters.

After creating a topic, we can use the kafka-configs script to modify the corresponding parameters.

We discussed this usage in Lesson 8 of this column; let’s review it now. Suppose we want to set the topic-level parameter max.message.bytes. The command is as follows:

        bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

You may wonder why this script specifies --zookeeper instead of --bootstrap-server. In fact, this script can also take the --bootstrap-server parameter, but there it is used to set dynamic parameters. I will explain what dynamic parameters are later in the column. For now, you only need to know that to set regular topic-level parameters, you still use --zookeeper.

3. Change the number of replicas.

Use the built-in kafka-reassign-partitions script to increase the number of replicas of a topic. I will leave this as a teaser for now: later, I will demonstrate how to increase the number of replicas of Kafka’s internal topic __consumer_offsets.

4. Modify topic throttling.

Here we mainly refer to limiting the bandwidth used by leader replicas and follower replicas. Sometimes we want to cap the bandwidth that the replicas of a certain topic consume during replication, and Kafka provides this functionality. Let me give an example. Suppose I have a topic named “test”, and I want the leader and follower replicas of each of its partitions to consume no more than 100MBps during replication. Note the uppercase B: no more than 100 megabytes per second. How should we set this?

To achieve this goal, we must first set the broker-side parameters leader.replication.throttled.rate and follower.replication.throttled.rate, the command is as follows:

        bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

The --entity-name at the end of this command specifies the broker ID. If the replicas of this topic sit on brokers 0, 1, 2, and 3, then you need to run this command for brokers 1, 2, and 3 in turn as well.
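
Rather than typing the command once per broker, you can generate it for each broker ID. Below is a sketch that only prints the commands for review; the broker IDs 0–3 and the zookeeper_host:port placeholder are assumptions for illustration:

```shell
# Emit the throttle-setting command for each broker hosting replicas.
# 100 MB/s = 100 * 1024 * 1024 = 104857600 bytes per second.
rate=$((100 * 1024 * 1024))
for id in 0 1 2 3; do
  echo "bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter" \
       "--add-config 'leader.replication.throttled.rate=${rate},follower.replication.throttled.rate=${rate}'" \
       "--entity-type brokers --entity-name ${id}"
done
```

Review the printed commands before executing them against a real cluster.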

After setting this parameter, we also need to specify which replicas of the topic should be throttled. In this example we want to throttle all replicas, so we use the wildcard *. The command is as follows:

        bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

5. Topic partition migration.

Similarly, using the kafka-reassign-partitions script, we can adjust the replicas of each partition of the topic in a surgical-like manner, such as migrating some partitions in batches to other brokers. This type of change is more complicated, and I will specifically share with you how to migrate topic partitions in a later column.

Finally, let’s talk about how to delete a topic. The command is very simple; I will show it directly.

        bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>

The command to delete a topic is not complicated, but the key is that the delete operation is asynchronous. After executing this command, it does not mean that the topic is immediately deleted. It is only marked as “deleted”. Kafka silently starts the topic deletion operation in the background. Therefore, in most cases, you need to wait patiently for a while.

Management and Operation of Special Topics #

After discussing regular topic management operations, let’s talk about Kafka’s internal topics, __consumer_offsets and __transaction_state. You may be familiar with the former; the latter was introduced to support Kafka transactions. If you see many subdirectories with the prefixes __consumer_offsets and __transaction_state in your production environment, don’t panic: this is normal. Both of these internal topics have 50 partitions by default, so there will be a large number of partition subdirectories.

Regarding these two internal topics, my suggestion is not to manually create or modify them; let Kafka handle their creation automatically. However, there is a subtle issue regarding the replication factor of the __consumer_offsets topic.

Prior to Kafka 0.11, when Kafka automatically created this topic, it would consider both the number of brokers running at that moment and the offsets.topic.replication.factor broker-side parameter, and set the topic’s replication factor to the smaller of the two, contradicting the user’s intent in setting offsets.topic.replication.factor. This is what many users found confusing: if my cluster has 100 brokers and offsets.topic.replication.factor is set to 3, why does my __consumer_offsets topic have only 1 replica? The answer is that the topic was created at a moment when only one broker was up.
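
The pre-0.11 rule can be illustrated with a toy calculation (the variable names here are mine, not Kafka’s):

```shell
# Illustration of the old behavior: the effective replication factor was
# the smaller of the live broker count and offsets.topic.replication.factor.
live_brokers=1     # only one broker was up when the topic was auto-created
configured_rf=3    # offsets.topic.replication.factor
if [ "$live_brokers" -lt "$configured_rf" ]; then
  effective=$live_brokers
else
  effective=$configured_rf
fi
echo "effective replication factor: ${effective}"
```

With one live broker and a configured factor of 3, the old rule yields 1, which is exactly the confusing single-replica case described above.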

In versions after 0.11, this issue was fixed by the community. In other words, starting from 0.11, Kafka strictly adheres to the offsets.topic.replication.factor value. If the number of running brokers is less than the offsets.topic.replication.factor, Kafka will fail to create the topic and explicitly throw an exception.

Now, what if the replication factor of this topic is already 1, can we increase it to 3? Absolutely. Let’s take a look at the specific steps.

The first step is to create a JSON file that explicitly provides the replica assignments for all 50 partitions. Note that the order of the 3 brokers in the “replicas” field varies across partitions, so that leader replicas are distributed evenly across the brokers. Here is the format of the file:

{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}
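
Typing out 50 entries by hand is error-prone, so you may prefer to generate the file. Here is a sketch that writes reassign.json, rotating the broker order per partition; the broker IDs 0, 1, 2 are assumptions, so adjust them for your cluster:

```shell
# Generate reassign.json for all 50 __consumer_offsets partitions,
# rotating the "replicas" order so leaders spread evenly across brokers.
{
  echo '{"version":1, "partitions":['
  for p in $(seq 0 49); do
    a=$(( p % 3 ))
    b=$(( (p + 1) % 3 ))
    c=$(( (p + 2) % 3 ))
    sep=","
    [ "$p" -eq 49 ] && sep=""
    echo "  {\"topic\":\"__consumer_offsets\",\"partition\":${p},\"replicas\":[${a},${b},${c}]}${sep}"
  done
  echo ']}'
} > reassign.json
```

Inspect the generated file before feeding it to the reassignment tool.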

The second step is to execute the kafka-reassign-partitions script, with the following command:

bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

In addition to modifying internal topics, we may also want to view the messages in these internal topics. Especially for the __consumer_offsets topic, since it stores the consumer group’s offset data, directly viewing the messages in this topic can be very convenient. The following command can help us directly view the offset data committed by consumer groups:

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

In addition to viewing the offset commit data, we can also directly read the messages in this topic to view the status information of consumer groups:

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

The same method can be applied to the __transaction_state internal topic; you just need to specify kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter.

Common Topic Error Handling #

Finally, let’s talk about common topic-related errors and their respective handling methods.

Common Error 1: Topic deletion fails.

After running the delete command above, many people find that the partition data of the deleted topic is still “lying” on the disk, not cleared. What should you do in this case?

In fact, there are many reasons why topic deletion can fail, but the two most common are: a broker hosting some of the replicas has crashed, or some partitions of the topic to be deleted are still being migrated.

If it is the former, the deletion usually resumes automatically once you restart the corresponding broker. If it is the latter, it is more troublesome, as the two operations may interfere with each other.

Regardless of the reason, once you encounter a topic that cannot be deleted, you can follow these steps:

Step 1: Manually delete the znode named after the topic to be deleted under the ZooKeeper path /admin/delete_topics.

Step 2: Manually delete the topic’s partition directories on disk.

Step 3: Execute rmr /controller in ZooKeeper to trigger Controller re-election and refresh the Controller cache.

When performing the last step, you must be cautious, as it may trigger large-scale partition leader re-elections. In fact, performing only the first two steps is also fine; the only downside is that the deleted topic is not cleared from the Controller’s cache, which does not affect normal use.

Common Error 2: __consumer_offsets consumes too much disk space.

Once you find that this topic is consuming too much disk space, use the jstack command to check the state of the thread whose name starts with kafka-log-cleaner-thread. In most cases, the cause is that this thread has crashed and can no longer compact this internal topic in time. If this is indeed the cause, the only fix is to restart the corresponding broker. Also, please keep the error logs: this is usually caused by a bug, and it is best to submit them to the community for further investigation.

Summary #

Let’s summarize. Today we focused on the topic management of Kafka, including daily operational tasks and how to manage internal topics of Kafka. Finally, I provided solutions for two common problems. There are many commands involved, so I hope you can try them out in your own environment. Additionally, I encourage you to explore other uses of these commands, as it will greatly enrich your Kafka toolset.

Open Discussion #

Please take a moment to consider why Kafka does not allow a decrease in the number of partitions. What kind of problems might occur if the number of partitions is reduced?

Feel free to write down your thoughts and answers, and let’s discuss together. If you find it helpful, please also share this article with your friends.