14 Message Consumption Backlog Problems Troubleshooting

Problem Description #

One of the most common issues in message consumption with RocketMQ is message backlog, as illustrated in the following diagram:

1

Message backlog refers to the difference between the current maximum offset of valid data in the broker queue (brokerOffset) and the current processing progress of the message consumer (consumerOffset). It represents the messages that need to be consumed but have not been consumed yet.
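The definition above comes down to a subtraction per queue. The following is a minimal sketch of that arithmetic, not RocketMQ code: the class name, the method names, and the sample offsets are made up for illustration, and summing over queues to get a group-level figure is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BacklogSketch {
    /** Backlog of one queue: messages written to the broker but not yet consumed. */
    static long backlog(long brokerOffset, long consumerOffset) {
        return Math.max(0L, brokerOffset - consumerOffset);
    }

    /** Sums the backlog over all queues; each value is {brokerOffset, consumerOffset}. */
    static long totalBacklog(Map<Integer, long[]> offsetsByQueue) {
        long total = 0L;
        for (long[] offsets : offsetsByQueue.values()) {
            total += backlog(offsets[0], offsets[1]);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for two queues of one topic.
        Map<Integer, long[]> offsets = new LinkedHashMap<>();
        offsets.put(0, new long[] {5_000L, 4_990L}); // healthy queue, backlog 10
        offsets.put(1, new long[] {5_200L, 1_200L}); // lagging queue, backlog 4000
        System.out.println("total backlog = " + totalBacklog(offsets)); // prints 4010
    }
}
```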

Problem Analysis and Solution #

When a team encounters a message backlog, they usually suspect the RocketMQ broker and contact the person responsible for the message middleware, who first checks the broker side for anomalies. In my experience, however, a backlog is usually caused by the consumer side, whereas problems encountered while sending messages are more likely to be broker-related. There is also a way to back this up with evidence, and I will give a diagnostic method for the server later. A quick check works by comparison: a topic is usually subscribed to by several consumer groups, so we only need to check whether the other consumer groups also have a backlog, as shown in the following diagram:

2

From the diagram above, we can see that two different consumer groups have subscribed to the same topic. One group has a message backlog, while the other is consuming messages normally. This lets us focus the analysis on the project team that owns the lagging consumer group. But how do we analyze the problem in detail?

To find a good starting point for the analysis, let me first recap the RocketMQ message pulling model and the mechanism for submitting message consumption progress. The message pulling model in RocketMQ is shown in the following diagram:

3

In RocketMQ, each client creates a dedicated PullMessageService thread that continuously pulls batches of messages from the broker and submits them to the consumer’s thread pool for consumption. When a thread in the pool finishes consuming a message, it reports the consumer’s current consumption progress to the server. The progress it submits is the smallest offset still in the processing queue. For example, if the message at offset 100 has not been consumed successfully for some reason, the progress of that consumer group cannot advance past 100, no matter how many later messages are consumed. Over time, the maximum offset in the broker grows far beyond the consumer group’s progress, and a message backlog appears.
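The "minimum offset" rule is easier to see in code. The following is a simplified sketch of the idea, not the RocketMQ implementation: the class and method names are hypothetical, and returning "highest pulled offset + 1" once the queue is empty is an assumption of the sketch.

```java
import java.util.TreeMap;

final class ProgressSketch {
    // Messages pulled but not yet consumed, keyed by offset (the value stands in for the message).
    private final TreeMap<Long, String> inFlight = new TreeMap<>();
    private long maxPulledOffset = -1L;

    void pulled(long offset, String body) {
        inFlight.put(offset, body);
        maxPulledOffset = Math.max(maxPulledOffset, offset);
    }

    /** Marks one message as consumed and returns the offset that would be reported as progress. */
    long consumed(long offset) {
        inFlight.remove(offset);
        // Progress is the smallest offset still being processed, not the offset just finished.
        return inFlight.isEmpty() ? maxPulledOffset + 1 : inFlight.firstKey();
    }

    public static void main(String[] args) {
        ProgressSketch queue = new ProgressSketch();
        for (long offset = 100; offset <= 104; offset++) {
            queue.pulled(offset, "msg-" + offset);
        }
        // The message at offset 100 is stuck; the others finish first.
        for (long offset = 101; offset <= 104; offset++) {
            System.out.println("consumed " + offset + ", progress = " + queue.consumed(offset));
        }
        System.out.println("consumed 100, progress = " + queue.consumed(100));
    }
}
```

In this run the reported progress stays at 100 while offsets 101 to 104 complete, and only moves to 105 once the message at offset 100 is finally consumed.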

Therefore, when encountering this situation, you should usually check the status of the threads in the consumer’s thread pool. You can use the following command to obtain the thread stack of the application:

4

You can list the running Java processes with jps -m or ps -ef | grep java and identify the application by its main class to get its process ID. Then capture the thread stack with jstack pid > j.log. I recommend running this command 5 times in a row, writing to a different file each time, so that you have 5 thread stack files to compare and can see whether the threads are making progress.

After obtaining the thread stack information using jstack, you can search for threads starting with ConsumeMessageThread_ and check their statuses, as shown in the following diagram:

5

The consumer thread shown above has a status of RUNNABLE and is waiting on a network read. Check the same thread in the other dump files. If it is still RUNNABLE at the same place in every file, the thread is effectively “blocked” on that network read. While it is blocked, the message it is processing remains in the consuming state, so the consumption progress is stuck at that message and cannot move forward. This eventually leads to message backlog.
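Comparing several dump files by eye is tedious, so a small helper can list the state of each consumer thread per file. The sketch below assumes the usual jstack text layout (a quoted thread name on a header line, followed by a java.lang.Thread.State: line); the class name and the file names passed on the command line are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

final class ConsumerThreadStates {
    private static final String STATE_MARKER = "java.lang.Thread.State: ";

    /** Maps each thread whose name starts with namePrefix to the state recorded in the dump. */
    static Map<String, String> parse(String dump, String namePrefix) {
        Map<String, String> states = new LinkedHashMap<>();
        String current = null; // name of the matching thread whose state line we are waiting for
        for (String line : dump.split("\\R")) {
            if (line.startsWith("\"")) {
                int end = line.indexOf('"', 1);
                String name = end > 0 ? line.substring(1, end) : "";
                current = name.startsWith(namePrefix) ? name : null;
            } else if (current != null) {
                int i = line.indexOf(STATE_MARKER);
                if (i >= 0) {
                    states.put(current, line.substring(i + STATE_MARKER.length()).trim());
                    current = null;
                }
            }
        }
        return states;
    }

    public static void main(String[] args) throws Exception {
        // Pass the dump files as arguments, for example: j1.log j2.log j3.log j4.log j5.log
        for (String file : args) {
            String dump = new String(Files.readAllBytes(Paths.get(file)), StandardCharsets.UTF_8);
            System.out.println(file + " -> " + parse(dump, "ConsumeMessageThread_"));
        }
    }
}
```

A thread that reports the same state in every file is a candidate for closer inspection; its stack frames still have to be read in the dump itself to see where it is waiting.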

By examining the thread stack, you can find the specific method where the thread is blocked. In this case, the thread is making an HTTP request. The code trace is as follows:

6

Once you have located the code, the problem is relatively easy to fix. A network call should normally have a timeout. In this case no timeout was set, so the thread waited indefinitely for a response from the other end and the consumption progress could not advance. The solution is to set a timeout.
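As an illustration of the fix, here is a minimal sketch using the JDK's java.net.http client (Java 11 or later). The original code may well use a different HTTP library; the class name, the timeout values, and any URL passed in are assumptions made for the example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class TimeoutHttpSketch {
    // Illustrative values; choose them from the latency you expect of the downstream service.
    static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(2);
    static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(3);

    static HttpClient newClient() {
        // Bounds the time spent establishing the connection.
        return HttpClient.newBuilder().connectTimeout(CONNECT_TIMEOUT).build();
    }

    static HttpRequest newRequest(String url) {
        // Bounds the time spent waiting for the response.
        return HttpRequest.newBuilder(URI.create(url)).timeout(REQUEST_TIMEOUT).GET().build();
    }

    /** Returns true on an HTTP 2xx response, false on another status, a timeout, or an I/O error. */
    static boolean call(HttpClient client, String url) throws InterruptedException {
        try {
            HttpResponse<String> response =
                    client.send(newRequest(url), HttpResponse.BodyHandlers.ofString());
            return response.statusCode() / 100 == 2;
        } catch (IOException e) {
            // HttpTimeoutException is an IOException, so timeouts end up here too.
            return false;
        }
    }
}
```

With a bound like this, a slow downstream service costs the consumer thread a few seconds at most. In a message listener, a false result could then be mapped to ConsumeConcurrentlyStatus.RECONSUME_LATER so that the message is retried instead of pinning the consumption progress.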

Common scenarios that can cause thread blocking include:

  • HTTP requests with no timeout set
  • Slow database queries that make each message take a long time to consume

Experience in Thread Stack Analysis #

Some online sources suggest focusing on the WAITING, BLOCKED, and TIMED_WAITING states when analyzing thread stacks. That is not the whole picture. Threads in the RUNNABLE state should not be ignored either: during MySQL reads and writes and other network I/O, a thread waiting for a response from the other end is reported as RUNNABLE, not BLOCKED.

If many threads in the thread stack are in the state shown in the following diagram, the message consumer has plenty of processing capacity, and the bottleneck is that message pulling cannot keep up with message consumption.

7

RocketMQ Consumer Flow Control Mechanism #

The RocketMQ consumer throttles message pulling along three dimensions. Pulling from a queue is paused in any of these cases:

  1. When the number of messages in the processing queue exceeds 1000.
  2. When the difference between the maximum and minimum offset in the processing queue exceeds 2000, even if the number of messages in the processing queue has not exceeded 1000.
  3. When the total size of messages in the processing queue exceeds 100MB.

To understand the design concept of the three rules above, let’s first take a look at the data structure of the consumer, as shown in the diagram below:

8

The PullMessageService thread fetches batches of messages from the broker queue by queue and stores them in the ProcessQueue, which is the processing queue. The messages are then submitted to the consumer thread pool for consumption. After the messages are consumed, they are removed from the ProcessQueue and the consumption progress is submitted to the broker. The submitted offset is the minimum offset in the ProcessQueue.

Rule 1: When the number of messages in the processing queue exceeds 1000, flow control is triggered. Flow control pauses message fetching from the broker for that queue only; fetching from other queues is not affected. For example, if queue q0 holds more than 1000 messages but queue q1 holds fewer than 1000, messages from q1 continue to be pulled and consumed. The purpose is to keep too many messages from piling up on the consumer, which could cause memory overflow if fetching continued.

Rule 2: The messages in the ProcessQueue are maintained in a TreeMap, where the key is the message offset and the value is the message object. Because a TreeMap is sorted by key, the difference between the maximum and minimum offsets is easy to compute. Note that the processing queue may hold only 3 messages while the offset difference already exceeds 2000, as shown in the diagram below:

9

This situation is quite likely to occur. Typically, the thread consuming the message at offset 100 is stuck (“blocked”) for some reason, while other messages are consumed normally. This does not cause memory overflow, but it is highly likely to cause a large number of messages to be consumed repeatedly, because of how consumption progress is submitted. When the message at offset 2001 is consumed successfully, the consumer does not report 2001 as its progress; it reports the minimum offset in the processing queue, which is 100. So although messages keep being processed, the consumption progress cannot move forward. Imagine that the project team now notices the backlog and restarts the consumer: consumption resumes from offset 100, and every message after 100 that had already been processed is consumed again. To bound this duplicate consumption, RocketMQ pauses pulling once the offset difference exceeds 2000.

Rule 3: When the total size of messages in the processing queue exceeds 100MB.

This is more direct. Flow control is performed not only based on the number of messages, but also based on the message size. Flow control is triggered if the total size of messages in the processing queue exceeds 100MB, which obviously aims to prevent memory overflow.
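The three rules can be condensed into a single check on the processing queue. The following is a sketch under stated assumptions, not the RocketMQ source: the class, method, and constant names are hypothetical, the thresholds are the ones quoted above, and 100MB is taken here to mean 100 × 1024 × 1024 bytes.

```java
import java.util.TreeMap;

final class FlowControlSketch {
    static final int MAX_COUNT = 1000;                 // rule 1: messages held for the queue
    static final long MAX_SPAN = 2000L;                // rule 2: max offset minus min offset
    static final long MAX_BYTES = 100L * 1024 * 1024;  // rule 3: total body size

    // Offset -> message body, sorted by offset like the ProcessQueue described above.
    private final TreeMap<Long, byte[]> processQueue = new TreeMap<>();
    private long totalBytes;

    void add(long offset, byte[] body) {
        byte[] previous = processQueue.put(offset, body);
        if (previous != null) {
            totalBytes -= previous.length;
        }
        totalBytes += body.length;
    }

    void remove(long offset) {
        byte[] body = processQueue.remove(offset);
        if (body != null) {
            totalBytes -= body.length;
        }
    }

    /** Returns which rule pauses pulling for this queue, or null if pulling may continue. */
    String flowControlReason() {
        if (processQueue.size() > MAX_COUNT) {
            return "count";
        }
        if (!processQueue.isEmpty() && processQueue.lastKey() - processQueue.firstKey() > MAX_SPAN) {
            return "span";
        }
        if (totalBytes > MAX_BYTES) {
            return "size";
        }
        return null;
    }

    public static void main(String[] args) {
        FlowControlSketch queue = new FlowControlSketch();
        // Only 3 messages, but offset 100 is stuck while the others are far ahead.
        queue.add(100L, new byte[16]);
        queue.add(2101L, new byte[16]);
        queue.add(2102L, new byte[16]);
        System.out.println(queue.flowControlReason()); // span: 2102 - 100 exceeds 2000
        queue.remove(100L);
        System.out.println(queue.flowControlReason()); // no rule applies once offset 100 is done
    }
}
```

The example in main mirrors the Rule 2 scenario: three messages are enough to pause pulling when the oldest one is stuck.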

When flow control is triggered, the client writes a corresponding log entry to rocketmq_client.log. You can search for “so do flow control” to find these entries. The details are shown in the following diagram:

10

RocketMQ Server Performance Self-inspection Tips #

How can you prove that the RocketMQ cluster itself has no issues? It’s actually quite simple. A common technique is to check the message write performance of RocketMQ by executing the following commands:

cd ~/logs/rocketmqlogs/
grep 'PAGECACHERT' store.log  | more

The output results are shown in the following diagram:

11

The RocketMQ Broker prints, once per minute, the latency distribution of message writes over the previous minute. For example, [<=0ms] is the number of messages whose write to the Broker completed within 0ms during that minute, and the other buckets follow the same pattern. Normally, even among tens of thousands of messages sent, there is not a single write in the 100~200ms range. This distribution gives a good picture of the write pressure on the Broker.

Conclusion #

In this section, we started from the symptom of message backlog, analyzed and solved the problem, and then filled in the underlying theory, so that you understand not only what to do but also why it works.