13 Sequential Message Consumption and Message Filtering in Practice: Real-World Scenarios

After working through the previous sections, you should be familiar with the common techniques for message consumption. This section walks through two practical message consumption scenarios and explains the reasoning behind each.

Sequential Consumption #

Business Scenario Description #

Suppose we are developing a banking project in which every balance change must trigger a notification message to the user. Imagine that a user places an order on an e-commerce platform and makes a transfer at almost the same time, so the balance changes twice in quick succession. The user must receive the two notifications in the correct order. For example, if the user first makes a purchase of 128, reducing the balance to 1000, and then transfers 200 to a friend, leaving a balance of 800, the purchase notification must arrive before the transfer notification. If the two messages arrive in reverse order, the user sees a balance of 800 followed by 1000, which is confusing. "Order" here means order within the same account; no order needs to be maintained between different accounts. If the balances of user A and user B both change, the two notifications can be sent in any order because they do not interfere with each other.

Code Implementation #

In this section, we will mainly use screenshots to showcase the key code and provide some simple explanations.

[Image 1]

The core business operation here is changing the account balance. After the balance changes, a notification message must be sent to the user. Because changing the balance and sending the notification are relatively independent but closely related operations, we can introduce message middleware to decouple them. However, since notifications must be delivered in the same order as the balance changes, sequential consumption is required.

RocketMQ guarantees message order only within a single queue (partition-level ordering), so messages that must be processed in order have to be sent to the same queue. To achieve this, a MessageQueueSelector is used when sending, with the user account as the key for choosing the queue. Messages for the same account then enter the same queue in the order of the balance changes and can be consumed from that queue sequentially.
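The queue-selection idea can be sketched in plain Java without the RocketMQ client. The class and method names below (`AccountQueueSelectorSketch`, `selectQueueIndex`) and the account numbers are hypothetical. In a real producer, equivalent logic would sit inside `MessageQueueSelector.select(List<MessageQueue> mqs, Message msg, Object arg)`, with the account passed as `arg` to `producer.send(msg, selector, arg)`.

```java
class AccountQueueSelectorSketch {

    /**
     * Maps an account number to a queue index.
     * The same account always yields the same index, so all of its
     * balance-change messages land in one queue and keep their order.
     */
    static int selectQueueIndex(String accountNo, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result in [0, queueCount) even when hashCode() is negative.
        return Math.floorMod(accountNo.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        String[] accounts = {"acct-1001", "acct-1002", "acct-1001"};
        for (String account : accounts) {
            System.out.println(account + " -> queue " + selectQueueIndex(account, queueCount));
        }
    }
}
```

Note that the mapping depends on the queue count: if the topic's queue count changes, an account may move to a different queue, so ordering is only guaranteed while the count stays stable.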

[Image 2]

On the consumer side, sequential consumption is enabled by registering a MessageListenerOrderly as the event listener.

Sequential consumption is simple to use, but how does RocketMQ implement it? Is order preserved when queues are rebalanced? Can sequential consumption result in duplicate consumption?

Brief Introduction to RocketMQ Sequential Consumption Principle #

In RocketMQ, the PUSH-based message pulling model is illustrated in the following diagram:

[Image 3: RocketMQ PUSH-mode message pulling model]

The above process has been described in detail in previous chapters, so we won’t repeat it here. Instead, we want to emphasize the thread pool.

In RocketMQ, consumption thread pools are isolated by consumer group: each consumer group has its own thread pool, and that one pool processes the messages from all queues assigned to the consumer.

To process the messages of a single queue in order, the worker threads must take a lock on the message queue before consuming from it. Consequently, the concurrency of sequential consumption on the consumer side depends not on the size of the consumer's thread pool but on the number of queues assigned to the consumer. If a topic is used for sequential consumption, it is recommended to give the topic 2-3 times as many queues as you would use for non-sequential consumption. This improves consumer concurrency and makes horizontal scaling easier.
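The effect of per-queue locking can be illustrated with a small, self-contained simulation. This is a sketch of the idea only, not RocketMQ's actual implementation; `PerQueueLockSketch`, `QueueState`, and `consumeAll` are hypothetical names. A shared thread pool handles all queues, but each task must hold its queue's lock while it takes and processes the next message, so messages within one queue are always handled in order and at most one thread works on a queue at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerQueueLockSketch {

    /** Stand-in for one message queue: pending messages plus a log of consumed ones. */
    static final class QueueState {
        final Queue<Integer> pending = new ConcurrentLinkedQueue<>();
        final List<Integer> consumed = new ArrayList<>(); // guarded by lock
        final Object lock = new Object();
    }

    /** Consumes every message of every queue with a shared pool; returns the consumed order per queue. */
    static List<List<Integer>> consumeAll(int queueCount, int messagesPerQueue, int threads) {
        List<QueueState> queues = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            QueueState state = new QueueState();
            for (int m = 0; m < messagesPerQueue; m++) {
                state.pending.add(m);
            }
            queues.add(state);
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // One task per message; any pool thread may end up serving any queue.
        for (int m = 0; m < messagesPerQueue; m++) {
            for (QueueState state : queues) {
                pool.submit(() -> {
                    // Holding the queue lock makes "take next + process" atomic per queue.
                    synchronized (state.lock) {
                        Integer next = state.pending.poll();
                        if (next != null) {
                            state.consumed.add(next);
                        }
                    }
                });
            }
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumption did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        List<List<Integer>> result = new ArrayList<>();
        for (QueueState state : queues) {
            synchronized (state.lock) {
                result.add(new ArrayList<>(state.consumed));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 queues and 8 threads: at most 3 threads can make progress at once.
        System.out.println(consumeAll(3, 5, 8));
    }
}
```

With 3 queues and 8 threads, no more than 3 tasks can be inside the locked section simultaneously, which is why adding queues, rather than threads, raises the concurrency of sequential consumption.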

Scaling consumers horizontally or changing the number of queues on the broker triggers a rebalance of the message queues. In concurrent consumption mode, a queue may briefly be consumed by more than one consumer during a rebalance. This does not happen in sequential consumption: in addition to locking the queue locally while consuming, a consumer must also acquire a lock for the queue from the broker when the queue is assigned to it. As a result, only one consumer can pull messages from a given queue at any time, which preserves the semantics of sequential consumption.

Previous articles also mentioned that concurrent consumption mode has a retry mechanism for failed consumption, with a default maximum of 16 retries. That mechanism sends the message back to the broker and pulls it again later, which would break the order of consumption. In sequential consumption mode, therefore, a failed message is retried on the consumer side, and the maximum number of retries is Integer.MAX_VALUE. As a result, if a message can never be consumed successfully, the consumption progress of its queue never moves forward and messages back up behind it.

Friendly reminder: handle exceptions carefully in sequential consumption. Distinguish system exceptions from business exceptions, and more precisely, distinguish exceptions that a retry can resolve from those it cannot. Messages that would trigger an unresolvable exception should be intercepted before they are sent to the message queue, and alerting should be strengthened so that a blocked queue is noticed quickly.
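As a complementary consumer-side safeguard, the distinction between retryable and non-retryable failures can be sketched as follows. This is one possible policy under stated assumptions, not something the original project prescribes: `BusinessException` is a hypothetical exception type for failures that retries cannot fix, and the `Status` enum is a local stand-in for RocketMQ's `ConsumeOrderlyStatus` (whose `SUCCESS` and `SUSPEND_CURRENT_QUEUE_A_MOMENT` values it mirrors). Whether a non-retryable message may be skipped at all is a business decision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OrderlyExceptionHandlingSketch {

    /** Local stand-in for RocketMQ's ConsumeOrderlyStatus. */
    enum Status { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }

    /** Hypothetical marker for failures that a retry cannot resolve (for example, invalid data). */
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    /**
     * Runs the handler for one message and decides what to report back.
     * Non-retryable failures are recorded in alerts and acknowledged so the queue is not blocked;
     * any other runtime failure is treated as transient and asks for a later retry.
     */
    static Status consume(String message, Consumer<String> handler, List<String> alerts) {
        try {
            handler.accept(message);
            return Status.SUCCESS;
        } catch (BusinessException e) {
            // A retry would fail the same way: raise an alert and move on.
            alerts.add(message + ": " + e.getMessage());
            return Status.SUCCESS;
        } catch (RuntimeException e) {
            // Possibly transient (network, database): the queue stays on this message and retries.
            return Status.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }

    public static void main(String[] args) {
        List<String> alerts = new ArrayList<>();
        System.out.println(consume("balance-change-1", m -> { }, alerts));
        System.out.println(consume("balance-change-2",
                m -> { throw new BusinessException("unknown account"); }, alerts));
        System.out.println(consume("balance-change-3",
                m -> { throw new IllegalStateException("database timeout"); }, alerts));
        System.out.println("alerts: " + alerts);
    }
}
```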

Message Filtering in Action #

Business Scenario Description #

Consider a company that adopts a microservices architecture made up of several subsystems, such as basic data, the order module, and the merchant module, each with its own independent database. Microservices clearly improve architectural scalability, but they make it inconvenient to join data that lives in separate databases. If the order module needs basic data, for example, it has to call an interface exposed as a Dubbo service. To avoid these interface calls, and because the volume of basic data is not particularly large, the project team prefers to synchronize the basic data module's data into the databases of the other business modules. Joins against basic data can then be performed within a single database.

Technical Solution #

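[Image 4: data synchronization from the basic data module to downstream modules through base_data_topic]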

The key ideas of the above solution are as follows:

  1. Once the basic data undergoes changes, a message is sent to the base_data_topic in the MQ.
  2. Downstream systems, such as the order module and merchant module, subscribe to the base_data_topic to complete data synchronization.

Question: suppose the order module hits an unexpected error and data synchronization fails, and by the time the problem is discovered the messages stored in the MQ have already been deleted. The upstream system (basic data) then has to resend the data. If the resent messages go directly to base_data_topic, every consumer of that topic receives them, which is clearly inappropriate. How can this be resolved?

There are usually two ways to handle this:

  • Create another topic for each submodule, such as retry_ods_base_data_topic, and send messages to the specific submodule’s topic when needed.
  • Introduce the Tag mechanism.

This section will mainly introduce the Tag approach.

Under normal circumstances, the basic data module sends data changes to base_data_topic with the message Tag set to "all". In addition, a separate resending Tag is defined for each submodule, such as ods and shop.

Each consumer subscribes to both "all" and its own resending Tag, so a resend aimed at one submodule is received only by that submodule.

Code Implementation #

When sending messages, specify the Tag of the message according to the requirement. Here is an example code:

[Image 5: example code for sending a message with a Tag]

Then, when consuming messages, subscribe to the specific tags that the module is interested in. Here is an example code:

[Image 6: example code for subscribing to Tags]

When subscribing, a consumer group can subscribe to multiple tags, separated by a double vertical bar (||).
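To make the subscription rule concrete, here is a minimal plain-Java sketch of how an expression such as `all || ods` selects messages. It only models the matching rule described above and is not RocketMQ code; `TagSubscriptionSketch`, `parseTags`, and `accepts` are hypothetical names, and the `*` wildcard is deliberately not modeled. With the real client, the expression would be passed as the second argument of `consumer.subscribe("base_data_topic", "all || ods")`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

class TagSubscriptionSketch {

    /** Splits a subscription expression such as "all || ods" into its individual tags. */
    static Set<String> parseTags(String subExpression) {
        Set<String> tags = new LinkedHashSet<>();
        for (String part : subExpression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    /** Returns true when a message carrying messageTag matches the subscription expression. */
    static boolean accepts(String subExpression, String messageTag) {
        return messageTag != null && parseTags(subExpression).contains(messageTag);
    }

    public static void main(String[] args) {
        String orderModuleSubscription = "all || ods"; // normal changes plus resends for the order module
        for (String tag : new String[] {"all", "ods", "shop"}) {
            System.out.println(tag + " accepted by order module: " + accepts(orderModuleSubscription, tag));
        }
    }
}
```

Under this rule, a resend tagged `shop` is ignored by the order module, while a normal change tagged `all` reaches every subscriber.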

The Debate between Topic and Tag #

Using Tags to differentiate messages within one topic has a side effect: when a consumer group needs to reset its consumption offset, it has to "process" the data for all tags. Although the Broker and the message consumer filter out messages whose tags do not match, those messages must still be read into the PageCache during message pulling before they can be filtered, which costs some performance. In practice, this is not a big problem.
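The two filtering stages mentioned above can be pictured with a short sketch. It rests on an assumption about RocketMQ internals that the text does not spell out: that the Broker compares tag hash codes, while the consumer compares the tag strings themselves. `TwoStageTagFilterSketch` and its methods are hypothetical names. The example uses "Aa" and "BB", two strings that share the Java hash code 2112, to show why a second, exact check on the consumer side is useful.

```java
import java.util.HashSet;
import java.util.Set;

class TwoStageTagFilterSketch {

    /** Coarse broker-side check (assumed): compares only the hash codes of the tags. */
    static boolean brokerSideMatch(Set<String> subscribedTags, String messageTag) {
        for (String tag : subscribedTags) {
            if (tag.hashCode() == messageTag.hashCode()) {
                return true;
            }
        }
        return false;
    }

    /** Exact consumer-side check: compares the tag strings themselves. */
    static boolean consumerSideMatch(Set<String> subscribedTags, String messageTag) {
        return subscribedTags.contains(messageTag);
    }

    public static void main(String[] args) {
        Set<String> subscribed = new HashSet<>();
        subscribed.add("Aa");
        // "BB" collides with "Aa" on hashCode(), so only the exact check rejects it.
        System.out.println("broker passes BB:   " + brokerSideMatch(subscribed, "BB"));
        System.out.println("consumer passes BB: " + consumerSideMatch(subscribed, "BB"));
    }
}
```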

In the data push scenario, besides using Tags to distinguish resent data, it is also possible to request an additional topic for the resent data, that is, to use topics to separate the different kinds of data. This approach is workable, but it means requesting and managing many topics at the operations level. Moreover, these topics all store the same type of data, so spreading it across several topics makes the design look fragmented. For genuinely different business scenarios, on the other hand, using separate topics for isolation is recommended.

Summary #

This article showed how to use sequential messages and message filtering in two practical scenarios. All of the example code is integrated into a project built with Spring Boot, Dubbo, RocketMQ, and MyBatis.