12 What Are Some Uncommon but Highly Advanced Client Features #

Hello, I’m Hu Xi. Today, I want to share with you the topic: What are some uncommon but advanced features in client applications?

Since these features are uncommon, they do not see much use in practice, but they are still quite advanced and practical. Now, let’s welcome today’s protagonist: Kafka interceptors.

What is an Interceptor? #

If you have used Spring Interceptor or Apache Flume, you should be familiar with the concept of interceptors. The basic idea is to allow applications to dynamically plug in a chain of event-processing logic without modifying the application logic itself. Interceptors can insert corresponding “interception” logic at multiple points before and after the main business operation. The following diagram shows the working principle of the Spring MVC interceptor:

(Figure: Spring MVC interceptor workflow)

Interceptor 1 and Interceptor 2 insert their processing logic at three different points: before the request is handled, after it is handled, and after the request completes. The interceptors in Flume work similarly: the inserted logic can modify the message to be sent, create a new message, or even discard the message. These functions are dynamically plugged into the application by configuring interceptor classes, allowing quick switching between different interceptors without touching the main program logic.

Kafka interceptors draw inspiration from this design concept. You can dynamically implant different processing logic at multiple points before and after message processing, such as before message sending or after message consumption.

As a very niche feature, Kafka interceptors have not been widely used since their introduction in version 0.10.0.0. I have never seen any company share a successful case of using interceptors at any Kafka technology summit. Nevertheless, it is still worthwhile to have such a useful tool in your Kafka toolbox. Today, let’s unleash its power and showcase some cool features.

Kafka Interceptors #

Kafka interceptors are divided into producer interceptors and consumer interceptors. Producer interceptors allow you to implement your interceptor logic before sending a message and after a message is successfully committed, while consumer interceptors support writing specific logic before consuming a message and after committing offsets. It is worth noting that both types of interceptors support chaining, meaning you can chain a group of interceptors together into a larger interceptor, and Kafka will execute the interceptors in the order they were added.
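
To make the chaining behavior concrete, here is a toy sketch in plain Java (deliberately not Kafka’s actual interfaces): interceptors added to a list run in the order they were added, each one seeing the output of the previous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy sketch (NOT Kafka's actual API): a chain of interceptors runs
// in the order they were added, each transforming the message in turn.
public class InterceptorChainDemo {

    static String runChain(List<UnaryOperator<String>> chain, String message) {
        for (UnaryOperator<String> interceptor : chain) {
            message = interceptor.apply(message); // order of addition = order of execution
        }
        return message;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = new ArrayList<>();
        chain.add(msg -> msg + "|sendTime=12345"); // hypothetical step 1: attach a timestamp
        chain.add(msg -> msg + "|count=1");        // hypothetical step 2: bookkeeping tag
        System.out.println(runChain(chain, "payload"));
        // -> payload|sendTime=12345|count=1
    }
}
```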

For example, suppose you want to perform two “pre-action” steps before producing a message: the first step is to add a header information to the message, encapsulating the time the message was sent, and the second step is to update the number of messages sent. When you chain these two interceptors together and specify them as interceptors for the producer, the producer will execute the above actions in order before sending the message.

The configuration method for Kafka interceptors is done through parameters. Both the producer and consumer have a common parameter called interceptor.classes, which specifies a list of classes, where each class is an implementation class for a specific interceptor logic. Using the example above, suppose the full class path for the first interceptor is com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor, and the second class is com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor. To specify the interceptors on the producer side, you would need to do the following:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // Interceptor 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // Interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
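
On the consumer side, the parameter name is identical. Below is a minimal sketch using the literal key that `ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG` resolves to, together with the consumer interceptor class built later in this article:

```java
import java.util.Properties;

// Sketch: the consumer side uses the same "interceptor.classes" parameter.
// The class name below refers to the consumer interceptor implemented
// later in this article.
public class ConsumerInterceptorConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("interceptor.classes",
                "com.yourcompany.kafkaproject.interceptors.AvgLatencyConsumerInterceptor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("interceptor.classes"));
    }
}
```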

Now the question arises: how should you implement the AddTimestampInterceptor and UpdateCounterInterceptor classes? It is actually quite simple. These two classes, like every producer interceptor implementation you write, need to implement the org.apache.kafka.clients.producer.ProducerInterceptor interface. This interface is provided by Kafka and contains two core methods.

  1. onSend: This method is called before sending the message. If you want to make any modifications to the message before sending, this is your only chance.

  2. onAcknowledgement: This method is called after a message is successfully committed or when sending fails. Do you remember the callback notification I mentioned in the previous article? The call to onAcknowledgement comes before the call to the callback. It is important to note that this method is not called in the same thread as onSend, so if you access a shared mutable object in these two methods, make sure it is thread-safe. Furthermore, this method is on the main path of the producer sending process, so it is best not to include heavy logic in it, otherwise, you may find that your producer’s TPS drops sharply.
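
Because onSend and onAcknowledgement run on different threads, any state they share must be thread-safe. Here is a minimal self-contained sketch (independent of the Kafka classes, with hypothetical method names) using AtomicLong counters:

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch: counters shared between the sending thread (onSend)
// and the I/O thread (onAcknowledgement) must be thread-safe.
public class SafeCounterDemo {

    static final AtomicLong sent = new AtomicLong();
    static final AtomicLong failed = new AtomicLong();

    // What onSend might do on the caller thread
    static void recordSend() {
        sent.incrementAndGet();
    }

    // What onAcknowledgement might do on the I/O thread
    static void recordAck(Exception exception) {
        if (exception != null) {
            failed.incrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread producerThread = new Thread(() -> { for (int i = 0; i < 1000; i++) recordSend(); });
        Thread ioThread = new Thread(() -> { for (int i = 0; i < 1000; i++) recordAck(null); });
        producerThread.start(); ioThread.start();
        producerThread.join(); ioThread.join();
        System.out.println(sent.get() + " sent, " + failed.get() + " failed");
    }
}
```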

Similarly, specifying consumer interceptors follows the same method, except the specific implementation classes need to implement the org.apache.kafka.clients.consumer.ConsumerInterceptor interface, which also has two core methods.

  1. onConsume: This method is called before the messages are returned to the consumer application. In other words, the interceptor gets to intercept the messages and do its work before you officially process them.

  2. onCommit: This method is called after the consumer commits offsets. Typically, you can perform some accounting actions in this method, such as logging.

One important thing to note is that when specifying interceptor classes, you need to provide their fully qualified names, i.e. the full package names must be included. Just having the class name is not enough, and you also need to ensure that your producer program can properly load your interceptor classes.
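
You can see the effect of omitting the package name with any JDK class; a Kafka client fails analogously at startup when it cannot load an interceptor class:

```java
// Demonstrates why a fully qualified class name is required: loading by
// simple name fails with ClassNotFoundException, just as a Kafka client
// would fail at startup if it cannot load the configured interceptor class.
public class FqcnDemo {
    public static void main(String[] args) {
        try {
            Class<?> ok = Class.forName("java.util.ArrayList"); // fully qualified: loads fine
            System.out.println("loaded " + ok.getName());
            Class.forName("ArrayList"); // simple name only: fails
        } catch (ClassNotFoundException e) {
            System.out.println("ClassNotFoundException for the bare class name");
        }
    }
}
```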

Typical Use Cases #

Where can Kafka interceptors be used? In fact, like many other interceptors, Kafka interceptors can be used in various scenarios including client monitoring, end-to-end system performance detection, message auditing, and more.

Let me explain in detail using the scenarios of end-to-end system performance detection and message auditing.

By default, the monitoring metrics provided by Kafka are targeted at individual clients or brokers. It is difficult to track the flow path of messages between clusters at the message level. Meanwhile, how to monitor the end-to-end latency of a message, from production to final consumption, is an urgent issue for many Kafka users.

Technically, we can add this kind of statistical logic in the client program. However, for companies that use Kafka as enterprise-level infrastructure, it is hard to bake unified monitoring logic into application code: application development is highly flexible, and you cannot determine all the metric-calculation logic in advance. Moreover, coupling monitoring logic with the main business logic is poor practice from a software-engineering standpoint.

Now, with interceptors and their pluggable mechanism, we can quickly observe, verify, and monitor client performance metrics across clusters, especially by collecting data at the individual-message level. This is a very typical use case for Kafka interceptors.

Let’s take a look at the scenario of message auditing. Imagine that your company uses Kafka as a private cloud message engine platform to provide services to the entire company. This would involve multiple tenants and the functionality of message auditing.

As a provider of private cloud PaaS, you definitely need to be able to view which business unit published each message at what time and which business units consumed them at what moment. One feasible approach is to write an interceptor class to implement the corresponding message auditing logic and enforce all client programs accessing your Kafka service to configure this interceptor.

Case Study #

Below I will use a specific case study to illustrate the use of interceptors. In this case, we use the implementation of an interceptor class to measure the end-to-end processing latency of messages. This is very practical, and I recommend that you directly integrate it into your own production environment.

I once conducted Kafka training for a company, and during the training, the company raised a requirement. Their scenario was simple: there was only one producer and one consumer for a certain business, and they wanted to know the average total time it took for a message in that business to be produced and consumed. However, at the time, Kafka did not provide such end-to-end latency statistics.

After learning about interceptors, we now know that interceptors can be used to meet this requirement. Since we need to calculate the total latency, there must be a common place to store it, and this common place must be accessible by both the producer and consumer programs. In this example, we assume that the data is stored in Redis.

Okay, it is obvious that we need to implement both a producer interceptor and a consumer interceptor to fulfill this requirement. Let’s start by implementing the producer interceptor:

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import redis.clients.jedis.Jedis;

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {

    private Jedis jedis; // Jedis initialization omitted

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Count every message that is about to be sent
        jedis.incr("totalSentMessage");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // No-op; a production version could decrement the counter here on failure
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

The key part of the above code is updating the total number of sent messages before each message goes out. For brevity, I did not handle send failures, which can make the sent-message count inaccurate. Fortunately, the approach is the same (for example, you could decrement the counter in onAcknowledgement when the exception argument is non-null), so you can adjust the code logic accordingly.

Here is the implementation of the consumer interceptor:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import redis.clients.jedis.Jedis;

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private Jedis jedis; // Jedis initialization omitted

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Accumulate the end-to-end latency of this batch of messages
        long latency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            latency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", latency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        if (totalSentMsgs > 0) { // guard against division by zero
            jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        }
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

In the above consumer interceptor, before consuming a batch of messages, we first update their total latency. We do this by subtracting the creation time stored in the message from the current clock time, and then accumulating the overall end-to-end processing latency of this batch of messages and updating it in Redis. The subsequent logic is simple; we read the updated total latency and total message count from Redis, and divide the two to get the average processing latency of the end-to-end messages.
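
The averaging step boils down to simple counter arithmetic. Here is a self-contained sketch, with a plain map standing in for the Redis counters, plus a guard (my addition) for the case where no message has been counted yet:

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of the averaging step; a plain map stands in
// for the Redis counters totalLatency and totalSentMessage.
public class AvgLatencyDemo {

    static long avgLatency(Map<String, Long> store) {
        long totalLatency = store.getOrDefault("totalLatency", 0L);
        long totalSent = store.getOrDefault("totalSentMessage", 0L);
        // Guard against division by zero before any message has been counted
        return totalSent == 0 ? 0L : totalLatency / totalSent;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        store.put("totalLatency", 900L);    // e.g. 900 ms accumulated
        store.put("totalSentMessage", 30L); // across 30 messages
        System.out.println(avgLatency(store)); // 30
    }
}
```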

After creating the producer and consumer interceptors, we configure them in the corresponding producer and consumer programs according to the specified methods above. This way, we can calculate the average processing latency of messages from the producer side to the consumer side. This end-to-end metric monitoring enables a global view of the business operation and timely evaluation of whether the business meets the end-to-end SLA goals.

Summary #

Today we spent some time discussing an obscure feature provided by Kafka: interceptors. As mentioned before, interceptors have a very low adoption rate, to the point where I have never seen a report of a major company actually using Kafka interceptors in production. However, obscurity does not mean uselessness. In fact, we can leverage interceptors to meet practical needs such as end-to-end performance monitoring and message auditing.

Starting from this article, we will gradually dig into more actual code. After reading today’s article, I hope you will get your hands dirty and write an interceptor yourself to experience what Kafka interceptors can do. As the saying goes, “knowledge gained from reading is shallow; true understanding comes from practice.” Perhaps while coding, you will come up with a brilliant idea for using interceptors. Let’s wait and see.

Open Discussion #

Consider the following question: The signature of the onSend method of the Producer interceptor in Apache Kafka is as follows:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)

If my implemented logic is simply to return null, do you think Kafka will discard the message or send it as it is? Please try it out and see if the result matches your expectations.

Feel free to share your thoughts and answers. Let’s discuss together. If you find it valuable, you can also share this article with your friends.