
32 KafkaAdminClient - Kafka’s Operations Tool #

Hello, I’m Hu Xi. Today, I’d like to discuss Kafka’s operations and maintenance tool: KafkaAdminClient.

Introduction #

In the previous lesson, I introduced you to various command-line scripts that come with Kafka. Although these scripts are convenient to use, they have some drawbacks.

Firstly, whether on the Windows or Linux platform, command-line scripts can only be run on the console. It can be very difficult to integrate them into applications, operations frameworks, or monitoring platforms.

Secondly, many of these command-line scripts provide services by connecting to ZooKeeper. Currently, the community no longer recommends direct connections to ZooKeeper using any tools, as this can lead to potential issues, such as bypassing Kafka’s security settings. In a previous column, I mentioned that when the kafka-topics script connects to ZooKeeper, it does not consider Kafka’s authentication mechanism. In other words, any user who uses this script, regardless of whether they have the permission to create topics, can successfully bypass the permission check and forcefully create topics. This obviously goes against the original intention of Kafka administrators configuring permissions.

Finally, running these scripts requires the Kafka server-side code. The community would rather users manage the cluster using only the Kafka client-side code, through the existing request mechanism. That way, all administrative operations flow through a unified processing path, which makes future feature evolution easier.

Based on these reasons, the community officially introduced the Java client version of AdminClient in version 0.11 and continues to improve it in subsequent versions. I roughly calculated that there are more than a dozen community proposals for optimization and updates related to AdminClient, and they span across major versions, which shows the importance the community attaches to AdminClient.

It is worth noting that there is also an AdminClient on the server-side, with the package path being kafka.admin. This is an older maintenance tool class that provides limited functionality, and the community no longer recommends its use. Therefore, it is best to use the client-side AdminClient uniformly.

How to use? #

Next, let’s take a look at how to use AdminClient in your application. As we mentioned before, it is a tool provided by the Java client. To use it, you need to explicitly add the dependency to your project. I will use the latest version 2.3 as an example.

If you are using Maven, you need to add the following dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

If you are using Gradle, add the dependency as follows:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'

Features #

Given that the community is continuously improving the functionality of AdminClient, you need to constantly monitor the release notes of different versions to see if any new operations have been added. In the latest 2.3 version, AdminClient provides the following nine categories of features:

  1. Topic management: creating, deleting, and listing topics.
  2. Permission (ACL) management: creating, deleting, and describing specific permissions.
  3. Configuration management: setting and describing the configuration of various Kafka resources, mainly brokers, topics, users, and client IDs.
  4. Replica log directory management: changing the underlying log directory of a replica and describing it.
  5. Partition management: creating additional partitions for a topic.
  6. Message deletion: deleting messages in a partition up to a specified offset.
  7. Delegation token management: creating, renewing, expiring, and describing delegation tokens.
  8. Consumer group management: listing and describing consumer groups, and querying and deleting their offsets.
  9. Preferred leader election: electing the preferred replica of specified topic partitions as the leader.

Working Principle #

Before diving into the main functionalities of AdminClient, let’s briefly understand its working principle. From a design perspective, AdminClient is a dual-threaded design: the front-end main thread and the back-end I/O thread. The front-end thread is responsible for converting user operations into corresponding requests and sending them to the queue of the back-end I/O thread. The back-end I/O thread reads the requests from the queue, sends them to the corresponding Broker nodes, and saves the execution results to be retrieved by the front-end thread.

It’s worth mentioning that AdminClient extensively utilizes the producer-consumer pattern internally to decouple request generation from request processing. I have roughly described its working principle in the following diagram.

As shown in the diagram, the front-end main thread creates a request object instance called Call. This instance has two main tasks.

  1. Construct the corresponding request object. For example, if creating a topic, it creates a CreateTopicsRequest; if querying consumer group offsets, it creates an OffsetFetchRequest.
  2. Specify the callback logic for the response, e.g., the action to execute after receiving a CreateTopicsResponse from the Broker.

Once the Call instance is created, the front-end main thread puts it into the new request queue. At that point, the front-end main thread’s work is done; it only needs to wait for the result to come back.

The remaining work is all handled by the back-end I/O thread. As shown in the diagram, this thread uses three queues to handle requests at different stages: the new request queue, the pending request queue, and the in-flight request queue. Why three queues? The reason is that the thread safety of the new request queue is ensured by the monitor lock of Java. To ensure that the front-end main thread is not blocked by the monitor lock, the back-end I/O thread periodically moves all Call instances in the new request queue to the pending request queue for processing. The pending request queue and the in-flight request queue shown in the diagram are only operated by the back-end I/O thread, so no lock mechanisms are needed to ensure thread safety.

When the I/O thread is processing a request, it explicitly saves the request in the in-flight request queue. Once the processing is completed, the I/O thread automatically calls the callback logic in the Call object to perform the final processing. After completing all these tasks, the I/O thread notifies the front-end main thread that the results are ready, so that the front-end main thread can promptly retrieve the execution results. The notification mechanism in AdminClient is implemented using the wait and notify methods of Java Object.
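To make the handoff concrete, here is a deliberately simplified, broker-free sketch of the same pattern in plain Java. It is illustrative only, not AdminClient’s actual implementation: a string stands in for the request and response objects, and CompletableFuture stands in for Kafka’s KafkaFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified sketch of AdminClient's two-thread handoff (NOT its real code).
// The front-end thread appends Call objects to a monitor-locked "new request"
// list; the back-end I/O thread drains them into a private pending list,
// "sends" each one, and completes the Call's future with the response.
public class MiniAdminClient implements AutoCloseable {

    static final class Call {
        final String request;
        final CompletableFuture<String> future = new CompletableFuture<>();
        Call(String request) { this.request = request; }
    }

    private final List<Call> newCalls = new ArrayList<>(); // guarded by its monitor
    private volatile boolean running = true;
    private final Thread ioThread = new Thread(this::ioLoop, "mini-admin-client-thread");

    public MiniAdminClient() {
        ioThread.start();
    }

    // Front-end: enqueue under the monitor lock and return at once; the caller
    // then waits on the future, mirroring result.all().get() in AdminClient.
    public CompletableFuture<String> send(String request) {
        Call call = new Call(request);
        synchronized (newCalls) {
            newCalls.add(call);
        }
        return call.future;
    }

    private void ioLoop() {
        List<Call> pending = new ArrayList<>(); // only the I/O thread touches this
        while (running) {
            drainAndProcess(pending);
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                return;
            }
        }
        drainAndProcess(pending); // final drain on shutdown
    }

    private void drainAndProcess(List<Call> pending) {
        synchronized (newCalls) { // hold the lock only long enough to drain
            pending.addAll(newCalls);
            newCalls.clear();
        }
        for (Call call : pending) {
            call.future.complete("handled:" + call.request); // stand-in for a broker response
        }
        pending.clear();
    }

    @Override
    public void close() throws InterruptedException {
        running = false;
        ioThread.join();
    }
}
```

The key point the sketch shows is that the front-end thread holds the monitor lock only for the brief append, while all actual request processing happens on the I/O thread.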

Strictly speaking, AdminClient does not use Java’s existing queues to implement the aforementioned request queues. It uses simple container classes like ArrayList and HashMap, along with monitor locks to ensure thread safety. However, considering that they play the role of request queues, I still use the term “queue” to refer to them.

Why bother understanding AdminClient’s working principle? One concrete benefit: it helps you debug programs that call AdminClient.

The aforementioned back-end I/O thread actually has a name, with the prefix “kafka-admin-client-thread”. Sometimes we may find that the AdminClient program seems to be working normally but the executed operation does not return results or hangs. Now you should know that this may be due to problems with the I/O thread. If you encounter similar issues, you can use the jstack command to check your AdminClient program and confirm whether the I/O thread is working normally.

This is not a contrived benefit; the scenario corresponds to a real bug in the community. The root cause was that the I/O thread failed to catch certain exceptions and died unexpectedly. Because of the two-thread design, the front-end main thread was unaffected and kept accepting user commands normally, even though the program could no longer actually do any work.

Constructing and Destroying AdminClient Instance #

If you have correctly imported the kafka-clients dependency, you should be able to see the AdminClient object when writing your Java program. Remember, its full class path is org.apache.kafka.clients.admin.AdminClient, not kafka.admin.AdminClient. The latter is the AdminClient on the server side, which is not recommended for use anymore.

The method of creating an AdminClient instance is similar to creating a KafkaProducer or KafkaConsumer instance. You need to manually construct a Properties object or a Map object, and then pass it to the corresponding method. The community has provided dozens of exclusive parameters for AdminClient. The most common and necessary parameter that must be specified is the bootstrap.servers parameter that we are familiar with. If you want to know the complete list of parameters, you can check it on the official website. To destroy an AdminClient instance, you need to explicitly call the close method of AdminClient.

You can simply use the following code to create and destroy an AdminClient instance at the same time.

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);

try (AdminClient client = AdminClient.create(props)) {
    // Your operations here...
}

This code uses the try-with-resource syntax feature in Java 7 to create an AdminClient instance and automatically close it after use. You can add your desired operation logic within the try block.

Common Examples of AdminClient Applications #

After explaining the working principle and constructor method of AdminClient, let me give you a few practical code examples to demonstrate how to use it. These examples are the most common ones.

Creating a Topic #

First, let’s see how to create a topic. The code is as follows:

String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {
    // 10 partitions, replication factor 3
    NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
    CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
    result.all().get(10, TimeUnit.SECONDS);
}

This code calls the createTopics method of AdminClient to create the topic. The NewTopic class describes the topic to be created; its constructor takes three parameters: topic name, number of partitions, and replication factor.

Note how the result is obtained in the second-to-last line. Currently, AdminClient methods return Result objects (here, CreateTopicsResult), which expose their outcome as futures. To obtain the execution result, call the appropriate method on the Result object to get the future, then call its get method.

Of course, for creating a topic, once the topic is successfully created, the task is considered complete, and the result returned is not important as long as no exceptions are thrown.
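One wrinkle worth knowing about that get call: when the operation fails, Future.get wraps the real cause (for example, Kafka’s TopicExistsException when the topic is already there) inside an ExecutionException, so you must inspect getCause() to react to it. Here is a broker-free sketch of that unwrapping pattern, using a plain CompletableFuture and a stand-in exception class in place of Kafka’s types:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureUnwrapDemo {
    // Stand-in for Kafka's TopicExistsException (hypothetical local class).
    static class TopicExistsException extends RuntimeException {
        TopicExistsException(String msg) { super(msg); }
    }

    // Future.get() wraps the real failure in an ExecutionException,
    // so we inspect getCause() to decide how to react.
    static String createTopic(CompletableFuture<Void> resultFuture) throws InterruptedException {
        try {
            resultFuture.get();
            return "created";
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                return "exists"; // benign: someone created the topic first
            }
            throw new RuntimeException(e.getCause());
        }
    }
}
```

The same getCause() inspection applies to any AdminClient operation whose future completes exceptionally.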

Querying Consumer Group Offsets #

Next, I’ll demonstrate how to query the offset information of a specified consumer group. The code is as follows:

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {
    ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
    Map<TopicPartition, OffsetAndMetadata> offsets =
        result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
    System.out.println(offsets);
}

Similar to creating a topic, we use the listConsumerGroupOffsets method of AdminClient to obtain the offset data of the specified consumer group.

This time, however, the result matters: the returned Map contains the committed offset for each partition. Call the offset() method of each OffsetAndMetadata value to get the actual offset.
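A common follow-up is computing per-partition consumer lag by comparing these committed offsets against the log-end offsets. As a broker-free sketch (plain strings stand in for TopicPartition keys, and longs stand in for the values you would read via OffsetAndMetadata.offset()):

```java
import java.util.HashMap;
import java.util.Map;

public class LagCalculator {
    // lag = log-end offset minus committed offset, per partition.
    // Partitions missing from endOffsets are treated as having zero lag.
    static Map<String, Long> lag(Map<String, Long> committed, Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        committed.forEach((tp, offset) ->
            result.put(tp, endOffsets.getOrDefault(tp, offset) - offset));
        return result;
    }
}
```

In a real program you would feed in the map from listConsumerGroupOffsets on one side and end offsets obtained from a consumer on the other; the arithmetic itself stays the same.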

Obtaining the Disk Usage of a Broker #

Now, let’s use AdminClient to implement a slightly more advanced feature: obtaining the amount of disk space occupied by Kafka topics on a particular broker. Unfortunately, the JMX monitoring metrics of Kafka currently do not provide this feature, and disk usage is something that many Kafka operators need to monitor in real-time and attach great importance to.

Fortunately, we can use AdminClient to implement this feature. The code is as follows:

try (AdminClient client = AdminClient.create(props)) {
    // Specify the target Broker id
    DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId));
    long size = 0L;
    for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {
        size += logDirInfoMap.values().stream()
                .map(logDirInfo -> logDirInfo.replicaInfos)
                .flatMap(replicaInfos -> replicaInfos.values().stream().map(replicaInfo -> replicaInfo.size))
                .mapToLong(Long::longValue).sum();
    }
    System.out.println(size);
}

The main idea of this code is to use the describeLogDirs method of AdminClient to obtain the log directory information of every topic partition on the specified broker, then sum the replica sizes to get the total disk usage.
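If the triple-nested stream above is hard to follow, the same aggregation can be written with plain loops. Here is a broker-free sketch in which nested maps of broker → log directory → partition → size stand in for the DescribeLogDirs response types:

```java
import java.util.HashMap;
import java.util.Map;

public class DiskUsage {
    // Sum replica sizes (bytes) across broker -> log directory -> partition.
    static long totalBytes(Map<Integer, Map<String, Map<String, Long>>> byBroker) {
        long total = 0L;
        for (Map<String, Map<String, Long>> logDirs : byBroker.values()) {
            for (Map<String, Long> replicaSizes : logDirs.values()) {
                for (long replicaSize : replicaSizes.values()) {
                    total += replicaSize;
                }
            }
        }
        return total;
    }
}
```

The structure mirrors the real response: one entry per broker, one per log directory on that broker, and one size per partition replica stored in that directory.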

Summary #

Alright, let’s summarize. The community officially released the Java client version of the AdminClient tool in version 0.11. This tool provides dozens of operations for operation and maintenance, and it is constantly evolving. If possible, it is best to use AdminClient to perform various Kafka cluster management operations and abandon those tools that connect to ZooKeeper. Additionally, I suggest that you always keep an eye on the improvement of this tool’s functionality, as the community is currently making frequent changes to AdminClient.

Open Discussion #

Please consider how we should write the code using AdminClient to add partitions to a topic. Please provide the main code.

Feel free to share your thoughts and answers, and let’s discuss together. If you find it helpful, feel free to share this article with your friends.