31 Collection of Common Tools and Scripts #

Hello, I am Hu Xi. Today I want to share with you the topic: A summary of common Kafka scripts.

Overview of Command Line Scripts #

Kafka provides a number of command line scripts by default, which are used for various functionalities and operational management. Today, I will take version 2.2 as an example to list these command line tools in detail. The following image shows all the command line scripts provided in version 2.2.

Command Line Scripts

From the image, we can see that version 2.2 provides a total of 30 shell scripts. The “windows” in the image is actually a subdirectory that contains BAT batch files for the Windows platform. The other .sh files are standard shell scripts for the Linux platform.

By default, running a shell script without any parameters, or with the --help parameter, displays the usage instructions for that script. The image below shows the invocation method of the kafka-log-dirs script.

kafka-log-dirs script

With this basic understanding, let me explain the purpose of these scripts one by one, and then provide you with detailed explanations of some common scripts.

First, let’s talk about the connect-standalone and connect-distributed scripts. These two scripts are startup scripts for the Kafka Connect component. In Lesson 4 of the column, when discussing the Kafka ecosystem, I mentioned that the community provides the Kafka Connect component for data transfer between Kafka and external systems. Kafka Connect supports both single-node Standalone mode and multi-node Distributed mode. These two scripts are the startup scripts for these two modes. Since Kafka Connect is not within the scope of our discussion, I won’t go into further detail.

Next is the kafka-acls script. It is used to set Kafka permissions, such as setting permissions for users to access Kafka topics. I will discuss Kafka security settings later in the column, and we can talk about this script in more detail at that time.

Next is the kafka-broker-api-versions script. The main purpose of this script is to verify the compatibility between different versions of Kafka servers and clients. Let me give you an example. The two images below show the compatibility between a 2.2 version server and a 2.2 version client, and between a 2.2 version server and a 1.1.1 version client.

Compatibility between 2.2 server and 2.2 client

Compatibility between 2.2 server and 1.1.1 client

I have extracted some of the output content. Now let me explain the meaning of these outputs. Let’s take the first line as an example:

Produce(0): 0 to 7 [usable: 7]

“Produce” represents the Produce request. When a producer produces messages, it is essentially sending Produce requests to the broker. Produce is the very first request type among all Kafka request types, so its sequence number (API key) is 0. The “0 to 7” that follows means that Kafka 2.2 supports 8 versions of the Produce request, numbered 0 through 7. “usable: 7” means the highest Produce request version this client can use against the connected broker is 7, which is the latest version.

Please pay attention to the differences in the red-lined parts between these two images. In the first image, we use a 2.2 version script to connect to a 2.2 version broker, so the usable number is 7, which means the latest version can be used. In the second image, we use a 1.1 version script to connect to a 2.2 version broker, so the usable number is 5, which means the 1.1 version client API can only send Produce requests with a version number of 5.

If you want to know the compatibility between your client version and server version, it is best to use this command to check. Note that before 0.10.2.0, Kafka was only unidirectionally compatible, meaning that a higher version broker could handle requests from a lower version client, but not vice versa. Since version 0.10.2.0, Kafka officially supports bidirectional compatibility, which means that a lower version broker can also handle requests from a higher version client.
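
For reference, the invocation itself is simple: you only pass the broker address (the host and port below are placeholders, as in the other commands in this lecture), and the script prints a line like the one above for every request type.

$ bin/kafka-broker-api-versions.sh --bootstrap-server kafka-host:port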

Next is the kafka-configs script. I think you should already be familiar with this script. We mentioned its usage when discussing parameter configuration and dynamic broker parameters. I won’t go into detail here. The following two scripts are heavyweight utility scripts: kafka-console-consumer and kafka-console-producer. To some extent, it is not an exaggeration to say that they are the most commonly used scripts. Let’s skip them for now, and I will focus on them later.

In addition to the producer and consumer, there is another set of scripts: kafka-producer-perf-test and kafka-consumer-perf-test. They are performance testing tools for producers and consumers, which are very practical. I will focus on them later.

The kafka-consumer-groups command, which I briefly touched on when discussing resetting consumer group offsets, will be discussed in more detail later.

The kafka-delegation-tokens script may not be well known, but it is used to manage Delegation Tokens. Delegation Token-based authentication is a lightweight authentication mechanism that complements the existing SASL authentication mechanism.

The kafka-delete-records script is used to delete Kafka partition messages. Considering that Kafka itself has its own automatic message deletion policy, the actual usage of this script is not high.
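
If you ever do need it, the script takes a JSON file describing which partitions to truncate and up to which offset. A minimal sketch follows; the file name and the offset value are made up for illustration.

$ cat delete-records.json
{"version": 1, "partitions": [{"topic": "test-topic", "partition": 0, "offset": 100}]}
$ bin/kafka-delete-records.sh --bootstrap-server kafka-host:port --offset-json-file delete-records.json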

The kafka-dump-log script is a very practical script. It can view the contents of Kafka message files, including various metadata information of messages, and even the message body itself.

The kafka-log-dirs script is a relatively new script that helps query the disk usage of various log paths on each Broker.
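
A typical invocation looks roughly like this; the --topic-list filter is optional, and omitting it reports all topics.

$ bin/kafka-log-dirs.sh --bootstrap-server kafka-host:port --describe --topic-list test-topic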

The kafka-mirror-maker script helps you synchronize messages between Kafka clusters. I will discuss its usage separately in a later lecture.

The kafka-preferred-replica-election script is used to perform a Preferred Leader election. It lets you trigger a leader change for specified topic partitions so that each partition’s preferred replica becomes the leader again.
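
A sketch of its usage in the 2.2 era, assuming the ZooKeeper-based form: the JSON file simply lists the partitions for which the preferred replica should be elected leader, and omitting --path-to-json-file acts on all partitions.

$ cat preferred.json
{"partitions": [{"topic": "test-topic", "partition": 0}]}
$ bin/kafka-preferred-replica-election.sh --zookeeper zookeeper-host:2181 --path-to-json-file preferred.json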

The kafka-reassign-partitions script is used to perform partition replica migration and replica file path migration.
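
Its typical workflow in 2.2 is to prepare a JSON file describing the target replica assignment and then run the script with --execute (and later --verify to check progress). A minimal sketch, with a made-up assignment:

$ cat reassign.json
{"version": 1, "partitions": [{"topic": "test-topic", "partition": 0, "replicas": [1, 2]}]}
$ bin/kafka-reassign-partitions.sh --zookeeper zookeeper-host:2181 --reassignment-json-file reassign.json --execute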

You should be familiar with the kafka-topics script. It implements all topic management operations.
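
For instance, creating, listing, and describing topics look roughly like this (the broker address is a placeholder; in 2.2 the older --zookeeper form also still works):

$ bin/kafka-topics.sh --bootstrap-server kafka-host:port --create --topic test-topic --partitions 2 --replication-factor 1
$ bin/kafka-topics.sh --bootstrap-server kafka-host:port --list
$ bin/kafka-topics.sh --bootstrap-server kafka-host:port --describe --topic test-topic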

The kafka-run-class script is quite mysterious. You can use it to run any Kafka class that has a main method. In the early days of Kafka, many utility classes did not have their own dedicated shell scripts. For example, the functionality of the previously mentioned kafka-dump-log could only be reached indirectly by running kafka-run-class kafka.tools.DumpLogSegments. If you open kafka-dump-log.sh in a text editor, you will find that it actually calls exactly this command. Later, the community gradually added dedicated command-line scripts for these important utility classes, so kafka-run-class is used far less often. In day-to-day work, you will hardly ever need to use this script directly.

You should not be unfamiliar with the kafka-server-start and kafka-server-stop scripts, as they are used to start and stop Kafka Broker processes.
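
Their usage is straightforward. For example, to start a broker in the background with a given configuration file and then stop it again (the paths are the defaults in the Kafka distribution):

$ bin/kafka-server-start.sh -daemon config/server.properties
$ bin/kafka-server-stop.sh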

The kafka-streams-application-reset script is used to reset the offset of Kafka Streams applications in order to reprocess data. If you are not using the Kafka Streams component, this script is not useful to you.

The kafka-verifiable-producer and kafka-verifiable-consumer scripts are used to test producer and consumer functionality. They are quite “old” scripts that you hardly ever need. In addition, the previously mentioned Console Producer and Console Consumer can completely replace them.

The remaining scripts starting with zookeeper are used to manage and operate ZooKeeper. I won’t go into too much detail about them here.

Finally, let’s talk about the trogdor script. This is a very mysterious guy that has never appeared on the official website. According to internal community information, it is Kafka’s testing framework used to perform various benchmark tests and load tests. Ordinary Kafka users should not need this script.

Alright, I have gone through all the built-in Kafka scripts. Although the description may seem a bit disorganized, having this basic understanding will allow us to better utilize these scripts. Now, I will provide a detailed introduction to the key script operations.

Key Script Operations #

Producing Messages #

To produce messages, you can use the kafka-console-producer script. A typical command looks like the following:

$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
>

In this command, we specify the producer parameter acks as -1 and enable the LZ4 compression algorithm. This script allows us to conveniently send messages to the specified topic in Kafka using the console.

Consuming Messages #

Now let’s talk about data consumption. If you want to quickly consume data from a topic to verify the existence of messages, running the kafka-console-consumer script is the most convenient method. The commonly used command usage is as follows:

$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false

Note that in this command, we specify the group. If it is not specified, each run of the Console Consumer generates a new consumer group, and over time you may find many consumer groups whose names start with console-consumer in your cluster. In most cases, it is better to specify the group explicitly.

In addition, --from-beginning is equivalent to setting the auto.offset.reset parameter on the consumer side to earliest, indicating that we want to consume from the beginning of the topic. If not specified, it will default to reading messages from the latest offset. If there are no new messages at that time, the output of this command will be empty.

Finally, I disabled automatic offset commit in the command. In general, it is meaningless to let the Console Consumer commit offsets, as we are only using it for simple testing.

Testing Producer Performance #

If you want to do some simple performance testing on Kafka, you can try the following set of tools. They are used to test the performance of the producer and consumer, respectively.

Let’s start with the script for testing the producer: kafka-producer-perf-test. It has many parameters, but a typical command invocation looks like this:

$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4

2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

The above command sends 10 million messages to the specified topic, with each message being 1 KB in size. The --producer-props option lets you set arbitrary producer parameters, such as the compression algorithm and the linger time (linger.ms) in this example.

The output of this command is worth a closer look. It prints the producer’s throughput (MB/s), the average and maximum send latency, and the latency at various percentiles. Message latency is not a single number but a distribution; in other words, we should care about the probability distribution of the latency, because knowing only the average is of little use. That is why percentiles are calculated here. Usually, we only care about the 99th percentile. In the output above, the 99th percentile value is 604 ms, which means that 99% of the messages produced by the test producer had a latency within 604 ms. You can treat this figure as the Service Level Agreement (SLA) this producer can promise.

Testing Consumer Performance #

Testing the consumer follows a similar principle, but we use the kafka-consumer-perf-test script instead. The command looks like this:

$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

Although the output format is different, this script also prints the consumer’s throughput; in this example it is about 1723 MB/s. Unfortunately, it does not report the distribution at different percentiles, so in practice this script is used less often than the producer performance testing script.

Viewing the Total Message Count in a Topic #

Many times, we want to view the current total number of messages in a particular topic. Surprisingly, the built-in Kafka commands do not provide this functionality, so we have to take a “detour” to obtain it. By “detour”, I mean that we must call a command that is not documented on the official website. The command is as follows:

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic

test-topic:0:0
test-topic:1:0

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic

test-topic:0:5500000
test-topic:1:5500000

We use the Kafka tool class GetOffsetShell to query the current earliest and latest offsets of each partition of the given topic. Subtracting each partition’s earliest offset from its latest offset and summing the differences gives the total number of messages currently in the topic. In this example, since the earliest offsets are all 0, the total for test-topic is 11 million (5500000 + 5500000).
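
If you do not want to add the per-partition numbers by hand, you can let awk sum one side for you: run the one-liner below once with --time -1 and once with --time -2, then subtract the two sums. This is just a convenience sketch on top of the same command.

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic | awk -F ':' '{sum += $3} END {print sum}'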

View Message File Data #

As a Kafka user, you must be interested in the content saved in Kafka’s underlying files. If so, you can use the kafka-dump-log script to view the specific content.

$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......

If you only specify --files, the command will display metadata information for message batches or message sets, such as creation time, compression algorithm used, CRC checksum value, etc.

If we want to see each specific message in detail, we need to explicitly specify the --deep-iteration parameter like the following:

$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
| offset: 0 CreateTime: 1561597044911 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 2 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 3 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 4 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 5 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 6 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 7 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 8 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 9 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 10 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 11 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 12 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 13 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 14 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......

In the above output, messages under a batch are marked with a vertical bar at the beginning. If you want to see the actual data inside the messages, you also need to specify the --print-data-log parameter, as shown below:

$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log

Query Consumer Group Offsets #

Next, let’s see how to use the kafka-consumer-groups script to view consumer group offsets. In the previous section, when we discussed resetting consumer group offsets, we used this command as well. At that time we used the --reset-offsets parameter, but today we are using --describe. If we want to query the offsets of consumers in a consumer group with Group ID “test-group”, the command would be as follows:
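
$ bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --describe --group test-group

The output is a table with one row per partition; the columns you will typically see include TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST, and CLIENT-ID.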

In this output, CURRENT-OFFSET is the offset the consumer has currently consumed up to, LOG-END-OFFSET is the offset of the latest message produced to the corresponding partition, and LAG is the difference between the two. CONSUMER-ID is an ID automatically generated by the Kafka consumer program; up to version 2.2, you cannot influence how this ID is generated. If the consumer program has already terminated when you run this command, the value in this column will be empty.

Summary #

Alright, let’s summarize. Today, we went through all the scripts that come with Kafka 2.2 version, and I provided the commonly used operational tool commands. I hope these commands will be helpful for you to operate and manage your Kafka cluster. Furthermore, I want to emphasize that as Kafka is continuously evolving, the usage of the commands mentioned today may change with the versions. When using these commands, it’s best to carefully read their Usage instructions.

Open Discussion #

During your use of Kafka commands, what “pitfalls” have you encountered or what painful experiences have you had?

Feel free to write down your thoughts and answers so that we can discuss them together. If you think you have gained something valuable, feel free to share this article with your friends.