19 How to Reduce Message Delay in a Message Queuing System

Hello, I am Tang Yang.

After completing the previous two lessons, I believe you have gained some understanding of how to use message queues to handle peak traffic during flash sales in a vertical e-commerce project. Of course, you should also know how to ensure that messages are not lost and avoid the impact caused by duplicate messages. Now, I’d like you to think about: Besides these aspects, what else do you need to consider when using message queues?

Let’s start with a scenario: In your vertical e-commerce project, after a user places an order and completes the payment, you will send a message to the message queue. The consumer program in the queue will then process the message, either by adding points to the user or sending them a coupon. It is acceptable for a user to wait a few minutes or even tens of minutes to receive their points and coupon after placing an order. However, if the message queue becomes heavily congested and the user receives the coupon several hours after completing the purchase, there may be complaints from users.

In this case, what you need to focus on is the latency of messages in the message queue. This is actually a performance issue. So how can you improve consumer performance and ensure shorter message latency? In my opinion, the first thing you need to do is learn how to monitor message latency, because only with data can you determine whether the current latency meets the requirements and evaluate the effect of your optimizations. Beyond that, you need to understand the correct way to use message queues and pay attention to how the message queue itself ensures that messages are stored and delivered as quickly as possible.

Next, let’s take a look at the first point: how to monitor message latency.

How to Monitor Message Delay #

In my opinion, there are two ways to monitor message delay:

  1. Use the tools provided by the message queue to monitor message accumulation.
  2. Monitor message delay by generating monitoring messages.

Let me explain these methods in detail.

Suppose a large number of messages have accumulated in a message queue of the e-commerce system. To monitor this accumulation, you first need to know how far each consumer has progressed through the queue, because that position is what lets you calculate the current consumption lag. For example, if producers have written a total of 1000 messages to the queue and a consumer has consumed 900 of them, the lag of this consumer is 100 messages.
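The arithmetic above can be sketched in a few lines of Java. This is only an illustration under my own assumptions: the class name LagCalculator and the idea of summing the lag over partition numbers are hypothetical and are not part of any message queue API.

```java
import java.util.HashMap;
import java.util.Map;

final class LagCalculator {
    // Lag for one queue (or partition): messages produced minus messages consumed.
    static long lag(long producedOffset, long consumedOffset) {
        return Math.max(0L, producedOffset - consumedOffset);
    }

    // Total lag across partitions; a partition with no recorded progress counts as fully unconsumed.
    static long totalLag(Map<Integer, Long> produced, Map<Integer, Long> consumed) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : produced.entrySet()) {
            long consumedOffset = consumed.getOrDefault(entry.getKey(), 0L);
            total += lag(entry.getValue(), consumedOffset);
        }
        return total;
    }

    public static void main(String[] args) {
        // The example from the text: 1000 produced, 900 consumed.
        System.out.println(lag(1000, 900)); // prints 100

        Map<Integer, Long> produced = new HashMap<>();
        Map<Integer, Long> consumed = new HashMap<>();
        produced.put(0, 600L); consumed.put(0, 550L);
        produced.put(1, 400L); consumed.put(1, 350L);
        System.out.println(totalLag(produced, consumed)); // prints 100
    }
}
```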

In Kafka, where the consumer’s consumption progress (its offset) is stored depends on the version.

In versions prior to Kafka 0.9, the consumption progress is stored in ZooKeeper. When consumers consume messages, they first obtain the latest consumption progress from ZooKeeper and then consume the subsequent messages based on this progress.

From Kafka 0.9 onward, the consumption progress is stored in a dedicated Kafka topic called “__consumer_offsets”. So, if you understand how Kafka works, you can read the consumption progress from the appropriate location for the version you run.

Of course, as a mature component, Kafka also provides tools to obtain this consumption progress information to help you implement your own monitoring. There are two main tools for this purpose:

  1. First, Kafka provides a tool called “kafka-consumer-groups.sh” (located in the bin directory of the Kafka installation package).

To make this concrete, I set up a single Kafka node, produced and consumed some messages, and then ran the following command to check message accumulation:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group

The result is as follows:

img

The first two columns in the image are basic information about the queue: the topic name and the partition number. The third column is the consumer’s current consumption progress (CURRENT-OFFSET). The fourth column is the offset of the latest message written to the partition, which you can read as the total number of messages produced (LOG-END-OFFSET). The fifth column is the number of accumulated messages that have not yet been consumed (LAG), i.e. the difference between the fourth and third columns.

With this command, you can easily understand the consumption situation of consumers.

  2. Second, there is JMX.

Kafka exposes message accumulation data through JMX. I started a console consumer locally and connected to it with jconsole, which shows the accumulation data of this consumer (highlighted in the red box in the image). You can write code to read this data and report it to your monitoring system. This is the method I recommend.

img
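Reading such a value from code could look roughly like the sketch below, which uses only the standard javax.management API. The helper name maxAttribute is hypothetical. The MBean pattern and the attribute name records-lag-max are what the Kafka Java consumer normally registers, but treat them as assumptions and check them in jconsole against your own Kafka version.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.AttributeNotFoundException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class JmxLagReader {
    // Returns the largest numeric value of `attribute` over all MBeans matching `pattern`,
    // or NaN when nothing matches or no MBean reports a usable number.
    static double maxAttribute(MBeanServerConnection conn, String pattern, String attribute) {
        double max = Double.NaN;
        try {
            Set<ObjectName> names = conn.queryNames(new ObjectName(pattern), null);
            for (ObjectName name : names) {
                Object value;
                try {
                    value = conn.getAttribute(name, attribute);
                } catch (AttributeNotFoundException e) {
                    continue; // this MBean does not expose the attribute
                }
                if (value instanceof Number) {
                    double v = ((Number) value).doubleValue();
                    if (!Double.isNaN(v) && (Double.isNaN(max) || v > max)) {
                        max = v;
                    }
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("JMX query failed", e);
        }
        return max;
    }

    public static void main(String[] args) {
        // Assumption: the consumer runs inside this JVM. For a remote consumer you would
        // obtain the MBeanServerConnection through javax.management.remote.JMXConnectorFactory.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        double lag = maxAttribute(conn,
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*",
                "records-lag-max");
        System.out.println("records-lag-max = " + lag); // NaN when no consumer is registered
    }
}
```

A monitoring program could call a helper like this periodically and push the result to the dashboard.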

In addition to using the tools provided by the message queue, you can also monitor message delay by generating monitoring messages. How can you do that?

First, define a special type of message, and then start a monitoring program that periodically writes this message into the message queue. The content of the message can be the timestamp at which it was generated. It travels through the queue like any other message, so both the business consumers and the monitoring program receive it. When a business processing program consumes this message, it simply discards it. When the monitoring program consumes it, it compares the consumption time with the generation time in the message, and if the difference exceeds a certain threshold, it triggers an alert.

img
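Here is a minimal sketch of the probe logic, assuming an in-memory BlockingQueue as a stand-in for the real message queue. The "PROBE:" prefix, the class name DelayProbe and the 60-second threshold are all hypothetical choices made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class DelayProbe {
    static final String PREFIX = "PROBE:"; // hypothetical marker for monitoring messages

    // The probe carries the time at which it was generated.
    static String newProbe(long sentAtMillis) {
        return PREFIX + sentAtMillis;
    }

    static boolean isProbe(String message) {
        return message != null && message.startsWith(PREFIX);
    }

    // Delay = consumption time minus generation time.
    static long delayMillis(String probe, long receivedAtMillis) {
        return receivedAtMillis - Long.parseLong(probe.substring(PREFIX.length()));
    }

    static boolean shouldAlert(String probe, long receivedAtMillis, long thresholdMillis) {
        return delayMillis(probe, receivedAtMillis) > thresholdMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for the real queue
        queue.put("order-paid:42");                                // a business message
        queue.put(newProbe(System.currentTimeMillis()));           // the monitoring message

        String message;
        while ((message = queue.poll()) != null) {
            if (!isProbe(message)) {
                continue; // the monitoring program ignores business messages
            }
            long now = System.currentTimeMillis();
            System.out.println("delay ms = " + delayMillis(message, now)
                    + ", alert = " + shouldAlert(message, now, 60_000));
        }
    }
}
```

A business consumer would do the opposite check: if isProbe returns true, it drops the message and moves on.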

Both methods can monitor the consumption delay of messages. In my experience, I recommend using a combination of these two methods. For example, in my actual projects, I prioritize obtaining the queue accumulation data from JMX in the monitoring program and display it in a dashboard report. At the same time, I also start a probing process to confirm the actual message delay.

In my opinion, monitoring message accumulation is the basic monitoring for a message queue, which is something you must do. However, knowing how many messages have accumulated does not directly tell you the consumption delay. You can only rely on experience to judge how many accumulated messages will start to affect user experience. The second method, on the other hand, measures consumption delay directly, and because the result is a time value, it is relatively easy to set alarm thresholds.

Now that we have understood how to monitor message delay, let’s take a look at how to improve the write and consumption performance of messages to ensure timely processing of asynchronous messages.

The Correct Approach to Reduce Message Delay #

To reduce the processing delay of messages, we need to address both the consumer side and the message queue side.

On the consumer side, our goal is to improve the message processing capability of consumers. Here are a few things you can do:

  • Optimize the consumer code to improve performance.
  • Increase the number of consumers (this is a relatively simple approach).

However, the second approach may be limited by the implementation of the message queue. For example, if Kafka is used as the message queue, increasing the number of consumers will not necessarily improve message processing capability.

In Kafka, a topic can be configured with multiple partitions. Data is written to the partitions either evenly or according to a partitioning rule specified by the producer. On the consuming side, Kafka stipulates that within a consumer group each partition can be consumed by only one consumer. Why is it designed this way? In my opinion, if multiple consumers could consume data from the same partition, updating the consumption progress would require locking, which would hurt performance.

Therefore, the number of partitions in a topic caps the parallelism of consumers. Once there are as many consumers as partitions, adding more consumers without adding partitions is useless, because the extra consumers receive no partition. You can increase the processing capability of consumers by increasing the number of partitions.

img
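To see why extra consumers do not help, consider the toy assignment below. It is a deliberately simplified round-robin rule that I made up for illustration, not Kafka’s actual assignor, and the class name PartitionAssignment is hypothetical. It only demonstrates the constraint that each partition has exactly one owner.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignment {
    // Returns, for each consumer index, the partitions it owns.
    // Every partition is given to exactly one consumer.
    static List<List<Integer>> assign(int partitions, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    // Consumers that received no partition do no work at all.
    static int idleConsumers(int partitions, int consumers) {
        int idle = 0;
        for (List<Integer> partitionsOfConsumer : assign(partitions, consumers)) {
            if (partitionsOfConsumer.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5));        // prints [[0], [1], [2], [], []]
        System.out.println(idleConsumers(3, 5)); // prints 2
    }
}
```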

So, how can you improve the processing capability of consumers without increasing partitions?

Since adding consumers no longer helps, you can increase the parallelism of message processing within each consumer. Consider using multiple threads to increase processing capability: you can pre-create one or more thread pools, and after receiving a message, process it asynchronously by handing it to a thread pool. This way, the originally serial process of consuming messages becomes parallel, which improves message consumption throughput. With parallel processing, you can also fetch several messages in one interaction with the message queue and distribute them to multiple threads for processing.

img
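The idea can be sketched with java.util.concurrent as follows. The class name ParallelBatchConsumer and the handler callback are hypothetical. Waiting for the whole batch to finish before returning is my own design choice, made so that the caller only acknowledges messages that have actually been processed; it is not something the queue requires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class ParallelBatchConsumer {
    // Hands every message of one fetched batch to the pool, then waits for all of them.
    // Returns the number of messages processed.
    static int processBatch(List<String> batch, ExecutorService pool, Consumer<String> handler) {
        List<Future<?>> futures = new ArrayList<>();
        for (String message : batch) {
            futures.add(pool.submit(() -> handler.accept(message)));
        }
        int done = 0;
        for (Future<?> future : futures) {
            try {
                future.get(); // blocks until this message has been handled
                done++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for batch", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("handler failed", e.getCause());
            }
        }
        return done;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pre-created thread pool
        try {
            List<String> batch = Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6");
            int done = processBatch(batch, pool,
                    m -> System.out.println(Thread.currentThread().getName() + " handled " + m));
            System.out.println("processed " + done + " messages");
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that messages in a batch are no longer handled in order, so this approach suits work such as granting points or coupons where ordering does not matter.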

In addition, you need to be aware of the issue of idle consumption threads when consuming messages from the queue.

I discovered this issue when testing my own messaging middleware. At the time, I found that the process running the consumer client would occasionally show very high CPU usage. I printed the JVM thread stack and identified the thread responsible. It turned out that there had been a period with no new messages in the queue, during which the consumer client polled for messages in a tight loop without pausing, and this saturated the CPU.

So, when writing a consumer client, you need to consider this scenario. Instead of immediately pulling messages, you can wait for a certain period of time before pulling again. However, the waiting time should not be too long, as it could increase message delay. I generally recommend a fixed 10ms to 100ms, or you can increase it gradually. For example, wait for 10ms if no messages are pulled on the first attempt, wait for 20ms on the second attempt, and increase up to 100ms. Once a message is pulled, go back to waiting for 10ms.
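The increasing wait described above can be sketched like this. The class name PollBackoff is hypothetical, and I assume a linear step of 10ms (10, 20, 30, ... up to 100), which is one reasonable reading of "increase it gradually". The BlockingQueue in main is only a stand-in for the real queue client.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PollBackoff {
    private final long minMillis;
    private final long maxMillis;
    private long current;

    PollBackoff(long minMillis, long maxMillis) {
        if (minMillis <= 0 || maxMillis < minMillis) {
            throw new IllegalArgumentException("require 0 < min <= max");
        }
        this.minMillis = minMillis;
        this.maxMillis = maxMillis;
        this.current = minMillis;
    }

    // Call after an empty poll: returns how long to sleep now and lengthens the next wait.
    long nextWaitAfterEmptyPoll() {
        long wait = current;
        current = Math.min(maxMillis, current + minMillis);
        return wait;
    }

    // Call after a message was pulled: go back to the shortest wait.
    void reset() {
        current = minMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for the queue client
        PollBackoff backoff = new PollBackoff(10, 100);
        for (int i = 0; i < 5; i++) {
            if (i == 3) {
                queue.put("order-paid:42"); // a message arrives on the fourth round
            }
            String message = queue.poll();
            if (message == null) {
                long wait = backoff.nextWaitAfterEmptyPoll();
                System.out.println("empty poll, sleeping " + wait + " ms");
                Thread.sleep(wait);
            } else {
                System.out.println("got " + message);
                backoff.reset();
            }
        }
    }
}
```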

After discussing the approaches on the consumer side, let’s talk about what the message queue itself has done to optimize reading performance.

When I designed a messaging middleware in the past, I primarily considered two aspects in terms of reading performance:

  1. Message storage
  2. Zero-copy technology

Regarding the first aspect, in the initial design, to keep it simple, I used a regular database for message storage. However, limited by the performance bottleneck of the database, the reading QPS (queries per second) could only reach 2000. Later, I refactored the storage module and used local disks as the storage medium. The presence of a page cache could improve the reading speed of messages. Even when reading data from the disk, since message reading is sequential and does not require cross-network data retrieval, the reading QPS of messages increased by an order of magnitude.

Another optimization point is zero-copy technology. Although we cannot completely eliminate data copying, we can try to minimize the number of copies. When reading data from the message queue, the process involves sending the data from the disk to the consumer client over the network. There are typically four copy steps involved:

  1. The data is copied from the disk to the kernel cache.
  2. A system call copies the data from the kernel cache to the user buffer.
  3. The data is written from the user buffer to the socket buffer.
  4. The operating system copies the data from the socket buffer to the network card’s buffer.

img

The operating system provides the sendfile function, which reduces the number of data copies. With sendfile, data in the kernel cache is copied directly to the socket buffer without passing through the user buffer, which eliminates one copy step and improves message sending performance. High-level languages wrap this function. For example, the java.nio.channels.FileChannel class in Java provides the transferTo method, which can use sendfile on operating systems that support it.

img
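A minimal sketch of sending a file with transferTo is shown below. The class name ZeroCopySend and the temporary file are hypothetical. In a real message queue the target would be a SocketChannel connected to the consumer; here standard output stands in for it, in which case the JVM most likely falls back to an ordinary copy. Whether sendfile is actually used depends on the operating system and the type of the target channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySend {
    // Sends the whole file to `target` (assumed to be a blocking channel) and returns the bytes sent.
    static long sendFile(Path file, WritableByteChannel target) {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until everything is sent.
                position += source.transferTo(position, size - position, target);
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("segment", ".log"); // stand-in for a message log file
        Files.write(file, "hello from the queue\n".getBytes(StandardCharsets.UTF_8));
        long sent = sendFile(file, Channels.newChannel(System.out));
        System.out.println("sent " + sent + " bytes");
        Files.delete(file);
    }
}
```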

Lesson Summary #

In this lesson, I have introduced you to how to improve the performance of message queues to reduce message consumption latency. Here, I want to emphasize the following points:

  • We can use tools provided by the message queue or send monitoring messages to monitor the message latency.
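  • Scaling out consumers is an important way to improve consumption processing capacity; when the queue limits how far you can scale out (as Kafka partitions do), process messages in parallel within each consumer.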
  • Choosing high-performance data storage methods and using zero-copy technology can improve message consumption performance.

In fact, queues are commonly used components, and task accumulation is an issue that cannot be ignored whenever queues are involved. Many failures I have encountered were caused by this.

For example, in a recent incident I dealt with, there were only a few slow requests due to database performance degradation initially. However, these slow requests filled up the Tomcat thread pool, resulting in the overall unavailability of the service. If we could monitor the task accumulation in the Tomcat thread pool in real-time, or have some protection strategies for the thread pool, such as discarding requests once all threads are used, maybe the incident could have been avoided. Therefore, I hope you can be mindful of this in your actual work. Whenever there is a queue, monitor its task accumulation and eliminate issues at the early stage.