20 Multi-threaded Development of Consumer Instances #

Hello, I’m Hu Xi. Today, let’s talk about how to implement multi-threaded consumption with the Kafka Java Consumer.

Computer hardware has improved greatly. Even ordinary laptops now come with multiple cores as standard, not to mention professional servers. If an application running on a powerful server is still designed around a single thread, much of that hardware is wasted. However, the Kafka Java Consumer is designed as a single-threaded consumer, which may surprise you. It is therefore well worth exploring how to consume with multiple threads.

Design Principles of Kafka Java Consumer #

Before delving into the details, let me briefly explain why Kafka Java Consumer employs a single-threaded design. Understanding this point will be beneficial for us when we later develop a multi-threaded approach.

When it comes to the Java Consumer API, the most important class is KafkaConsumer. It is not entirely accurate to say that KafkaConsumer is designed to be single-threaded. Starting from version 0.10.1.0, KafkaConsumer became a dual-threaded design, consisting of the user main thread and the heartbeat thread.

The user main thread is the thread that runs your Consumer application’s main method. The newly introduced heartbeat thread is responsible only for periodically sending heartbeat requests to the corresponding Broker to signal that the consumer application is alive. Introducing this heartbeat thread also decouples the heartbeat frequency from the frequency at which the main thread calls KafkaConsumer.poll, which in turn decouples the actual message processing logic from the liveness management of consumer group members.

However, despite the presence of the heartbeat thread, the actual message retrieval logic is still performed in the user main thread. Therefore, in terms of consuming messages, we can still safely consider KafkaConsumer as being single-threaded in design.
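To make this decoupling concrete, here is a minimal sketch of the consumer settings involved. The three timeout keys are real consumer configuration names, but the broker address, the group id, and every value below are placeholders chosen for illustration, not recommendations. heartbeat.interval.ms and session.timeout.ms concern the heartbeat thread, while max.poll.interval.ms bounds how long the main thread may take between two calls to poll.

```java
import java.util.Properties;

class ConsumerLivenessConfig {
    // Builds an illustrative consumer configuration; all values are assumptions
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "demo-group");              // hypothetical group name

        // Heartbeat thread: how often heartbeats are sent, and how long the
        // group waits without one before treating the member as dead
        props.setProperty("heartbeat.interval.ms", "3000");
        props.setProperty("session.timeout.ms", "10000");

        // Main thread: maximum allowed gap between two poll() calls,
        // which is where slow message processing shows up
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Because the two kinds of timeout are configured separately, slow processing in the main thread can be given a generous budget without slowing down failure detection through heartbeats.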

In fact, before the Java Consumer API was introduced by the community, Kafka had a set of APIs known as the Scala Consumer API. This set of APIs or consumers, referred to as the old version Consumer, has been completely removed from the new version of the Kafka codebase.

The reason I mention this is to tell you that the old version Consumer had a multi-threaded architecture. Each Consumer instance internally created a corresponding message retrieval thread, also known as a Fetcher thread, for all subscribed topic partitions. The old version Consumer was also blocking in nature. After a Consumer instance was started, it would create many blocking message retrieval iterators internally. However, in many scenarios, the Consumer side requires non-blocking behavior. For example, in stream processing applications, operations such as filtering, joining, and grouping cannot be blocking. For this reason, the community designed a single-threaded + polling mechanism for the new version Consumer. This design allows for non-blocking message retrieval.

In addition, the single-threaded design simplifies the Consumer side’s design. After the Consumer retrieves messages, whether the message processing logic is implemented in multiple threads is entirely up to you. This way, you have the freedom to separate the multi-threaded management strategy of message processing from the Consumer code.

Furthermore, a single-threaded design is relatively easy to implement in any programming language, whereas not every language supports multi-threading well. From this perspective, a single-threaded Consumer is easier to port to other languages. After all, if the Kafka community wants to build a rich upstream and downstream ecosystem, it will certainly want to see more and more clients emerge.

Multi-threading Solutions #

After understanding the design principles of single-threading, let’s analyze in detail the usage of the KafkaConsumer class, as well as how to deduce corresponding multi-threading solutions.

Firstly, we need to be clear that the KafkaConsumer class is not thread-safe. All network I/O operations occur in the user’s main thread, so you must ensure thread safety when using it. In simple terms, you cannot share the same KafkaConsumer instance among multiple threads, otherwise the program will throw a ConcurrentModificationException.

However, there is one method in KafkaConsumer that is an exception to this. It is called wakeup() and you can safely call KafkaConsumer.wakeup() in other threads to wake up the consumer.
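To see where such an exception can come from, here is a simplified sketch of a single-thread ownership guard. It is my own illustration under stated assumptions, not Kafka’s actual source: the class name SingleThreadGuard and its methods are hypothetical, and the sketch only shows the general idea that every guarded operation checks which thread currently owns the instance.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class SingleThreadGuard {
    // The thread currently inside a guarded operation, or null if none
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    // Called at the start of every guarded operation
    void acquire() {
        Thread me = Thread.currentThread();
        if (me != owner.get() && !owner.compareAndSet(null, me)) {
            throw new ConcurrentModificationException(
                "instance is not safe for multi-threaded access");
        }
    }

    // Called when the guarded operation finishes
    void release() {
        owner.set(null);
    }

    // Returns true if a second thread is rejected while the first holds the guard
    static boolean secondThreadRejected() {
        SingleThreadGuard guard = new SingleThreadGuard();
        AtomicBoolean rejected = new AtomicBoolean(false);
        guard.acquire(); // the calling thread is now "inside" an operation
        Thread intruder = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        intruder.start();
        try {
            intruder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(secondThreadRejected()); // prints true
    }
}
```

A guard like this does not make the object thread-safe. It only turns silent data corruption into an immediate, visible failure.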

Given the fact that KafkaConsumer is not thread-safe, we can come up with two multi-threading solutions.

  1. The consumer program starts multiple threads, each thread maintaining its own KafkaConsumer instance, and being responsible for the complete process of message retrieval and handling, as shown in the following diagram:

  2. The consumer program uses one or more threads to retrieve messages and hands the retrieved messages to a separate pool of worker threads that execute the processing logic. Each retrieval thread maintains its own KafkaConsumer instance, and message processing is delegated to a dedicated thread pool, which truly decouples message retrieval from message processing. The specific architecture is shown in the following diagram:

In general, both solutions involve creating multiple threads, all participating in the message consumption process. However, their respective approaches are different.

Let’s take an analogy. For example, if a complete consumer application needs to perform tasks 1, 2, 3, 4, and 5, Solution 1 uses a coarse-grained division of work, meaning it creates multiple threads, each executing tasks 1, 2, 3, 4, and 5 completely, in order to achieve parallel processing. It does not further divide the specific subtasks. On the other hand, Solution 2 is more fine-grained, where it separates tasks 1 and 2, handled by a single thread (or multiple threads), while tasks 3, 4, and 5 are handled by additional threads.

Which solution is better? It can be said that each has its own strengths. I have summarized the pros and cons of these two solutions in the table below.

Next, I will explain the content in the table in detail. Let’s start with discussing the advantages of Solution 1.

  1. It is simple to implement because it aligns with our current practice of using the Consumer API. When writing code, we can create dedicated KafkaConsumer instances in multiple threads.
  2. There is no interaction among multiple threads, which eliminates the overhead of ensuring thread safety.
  3. Each thread uses a dedicated KafkaConsumer instance to fetch and process messages. This ensures that each partition in the Kafka topic is handled by only one thread, making it easy to achieve message consumption order within partitions. This is a significant advantage for applications that require strict event ordering.

Having discussed the advantages of Solution 1, let’s now look at its drawbacks.

  1. Each thread maintaining its own KafkaConsumer instance will inevitably consume more system resources, such as memory and TCP connections. In resource-constrained environments, this disadvantage of Solution 1 becomes more apparent.
  2. The number of threads that can be used in this solution is limited by the total number of partitions the Consumer subscribes to. Within a consumer group, each subscribed partition can be consumed by only one consumer instance. If a consumer group subscribes to 100 partitions, Solution 1 can scale to at most 100 threads. Any additional threads will not be assigned a partition and will only waste system resources. Of course, this scalability limit can be mitigated by a multi-machine deployment. Besides running 100 threads on one machine, we can also run one thread on each of 100 machines and achieve the same effect. Therefore, if you have abundant machine resources, this drawback is not a cause for concern.
  3. Each thread fully executes message fetching and processing logic. If the message processing logic is heavy and causes slow message processing, unnecessary rebalancing is likely to occur, resulting in the consumption halt of the entire consumer group. You need to pay attention to this drawback. We previously discussed how to avoid rebalancing. If you don’t remember, you can review it in Lecture 17 of the column.
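The partition limit in the second drawback is easy to check with a little arithmetic. The sketch below is a toy model only: it spreads partition ids over threads round-robin, which is my stand-in for the group’s real assignment strategy, and counts how many threads end up with nothing to do.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionLimitDemo {
    // Spreads partition ids over consumer threads round-robin
    // (a simplified stand-in for a real partition assignor)
    static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % threads).add(p);
        }
        return assignment;
    }

    // Counts threads that received no partition at all
    static long idleThreads(int partitions, int threads) {
        return assign(partitions, threads).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // 100 partitions shared by 120 threads: 20 threads sit idle
        System.out.println(idleThreads(100, 120)); // prints 20
    }
}
```

With 100 partitions, going from 100 to 120 threads adds no parallelism; it only adds 20 idle consumers.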

Now let’s talk about Solution 2.

Different from the coarse-grained approach of Solution 1, Solution 2 divides the task into two parts: message fetching and message processing, which are handled by separate threads. The biggest advantage of Solution 2 compared to Solution 1 lies in its high scalability, meaning that we can independently adjust the number of threads for message fetching and message processing without considering if they affect each other. If your message fetching speed is slow, you can increase the number of threads for message fetching. If the message processing speed is slow, you can increase the number of worker threads in the thread pool.

However, this architecture also has its shortcomings.

  1. It is much more difficult to implement compared to Solution 1, as it involves managing two groups of threads separately.
  2. Because Solution 2 separates message fetching and message processing, i.e., the thread that fetches a certain message is not necessarily the thread that processes it, it cannot guarantee the consumption order within partitions. For example, in a partition, if message 1 is stored before message 2, the order in which the Consumer fetches the messages will be message 1 first, then message 2. However, it is possible for the subsequent Worker threads to process message 2 before processing message 1, which breaks the order of messages within the partition. As I mentioned before, if you care about the ordering of messages in Kafka, this drawback of Solution 2 is fatal.
  3. Solution 2 introduces multiple groups of threads, which lengthens the entire message consumption pipeline and makes it extremely difficult to correctly commit offsets. As a result, duplicate message consumption may occur. If you care about this, I do not recommend using Solution 2.
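The ordering problem in the second shortcoming can be reproduced without Kafka at all. In the sketch below, which uses only the JDK, two integers stand in for message 1 and message 2 of the same partition. Message 1 is submitted first, but I deliberately force its processing to be slow with a latch, so the worker pool finishes message 2 first. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OutOfOrderDemo {
    // Returns the order in which the two "messages" finished processing
    static List<Integer> completionOrder() {
        List<Integer> done = new CopyOnWriteArrayList<>();
        CountDownLatch message2Done = new CountDownLatch(1);
        ExecutorService workers = Executors.newFixedThreadPool(2);

        // Message 1 is fetched and submitted first, but is slow to process:
        // it waits until message 2 has been fully handled
        workers.execute(() -> {
            try {
                message2Done.await();
                done.add(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Message 2 is submitted second and processed immediately
        workers.execute(() -> {
            done.add(2);
            message2Done.countDown();
        });

        workers.shutdown();
        try {
            workers.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [2, 1]
    }
}
```

The same effect explains the third shortcoming: if the offset of message 2 were committed as soon as it finished, a crash before message 1 completes would leave message 1 unprocessed yet already marked as consumed.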

Implementation Code Examples #

After so much pure theory, let’s look at what the actual implementation code might look like. After all, as Linus said, “Talk is cheap. Show me the code.”

First, I’ll share with you the main code for solution 1:

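import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    // Assumption: the original listing omits construction. Here each runner
    // builds its own KafkaConsumer, so no instance is shared across threads.
    public KafkaConsumerRunner(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(10000));
                // Execute message processing logic
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}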

This code defines a Runnable class that holds the logic for fetching and processing messages. Each KafkaConsumerRunner instance creates its own exclusive KafkaConsumer instance. In a real application, you can create multiple KafkaConsumerRunner instances and start each one on its own thread to implement the multi-threaded architecture of solution 1.
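Starting the runners could look roughly like the sketch below. To keep it free of Kafka dependencies, it works with plain Runnable objects: the factory passed in is assumed to return a fresh runner, such as a new KafkaConsumerRunner, on every call, so that no two threads share one. RunnerLauncher and its methods are hypothetical names of my own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class RunnerLauncher {
    // Starts threadCount threads, each with its own runner from the factory
    static List<Thread> startAll(int threadCount, Supplier<Runnable> runnerFactory) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            // One runner per thread: nothing is shared between threads
            Thread thread = new Thread(runnerFactory.get(), "consumer-runner-" + i);
            thread.start();
            threads.add(thread);
        }
        return threads;
    }

    // Waits for all runner threads to finish
    static void joinAll(List<Thread> threads) {
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in runner that only reports which thread it runs on
        Supplier<Runnable> factory =
            () -> () -> System.out.println("running on " + Thread.currentThread().getName());
        joinAll(startAll(3, factory));
    }
}
```

In a real application you would also keep references to the runners themselves so that their shutdown method can be called from another thread when the program exits.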

For solution 2, the core code looks like this:

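private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...

private int workerNum = ...;
// Fixed-size pool with a bounded queue; when the queue is full,
// CallerRunsPolicy makes the submitting (polling) thread run the task itself
executors = new ThreadPoolExecutor(
	workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
	new ArrayBlockingQueue<>(1000),
	new ThreadPoolExecutor.CallerRunsPolicy());

...
while (true) {
	ConsumerRecords<String, String> records =
		consumer.poll(Duration.ofSeconds(1));
	// Hand each record to the worker pool instead of processing it here;
	// Worker is a Runnable holding the processing logic (not shown)
	for (final ConsumerRecord<String, String> record : records) {
		executors.submit(new Worker(record));
	}
}
...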

The most important part of this code is the executors.submit call inside the loop: after the Consumer’s poll method returns messages, a dedicated thread pool handles each message. The main thread that calls poll is not responsible for the message processing logic, which is how this code implements the multi-threaded architecture of solution 2.
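One detail of the pool configuration deserves a closer look: the bounded queue combined with CallerRunsPolicy. The JDK-only sketch below shrinks the pool to one worker and a queue of one slot so the effect is easy to observe. Once both are occupied, the next task is executed by the submitting thread itself. The class and method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns the name of the thread that ran the task the pool could not accept
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(blocked); // occupies the only worker thread
        pool.execute(blocked); // fills the single queue slot
        // Worker busy and queue full: the policy runs this task in the caller
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(caller.equals(threadThatRanOverflowTask())); // prints true
    }
}
```

In the solution 2 loop, this means that when the workers fall behind, the polling thread starts processing records itself. No record is dropped, but poll is called less often, so the limit on the interval between polls still needs watching.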

Summary #

To summarize, today I shared with you two ways to implement multi-threaded consumption with the Kafka Java Consumer. I introduced two commonly used approaches along with their respective advantages, disadvantages, and code examples. I hope that, based on this content and your actual business scenarios, you will be able to implement a multi-threaded architecture that suits your needs. Once you truly understand the essence of multi-threaded consumption and can apply it to different situations, you will be able to build more sophisticated systems.

Open Discussion #

Today we are discussing solutions for multithreading. Some may argue, why go through the trouble and not just start multiple consumer processes directly? So, please compare the advantages and disadvantages of the multithreading solution and the multiprocessing solution, and think about their respective merits.

Please feel free to write down your thoughts and answers, and let’s discuss together. If you find it helpful, feel free to share this article with your friends.