07 Transactional Message Usage and Solution Selection Thinking

Use Cases for Transactional Messages #

First, note that transactional messages and using RocketMQ to solve distributed transactions are not the same thing.

Why did RocketMQ introduce transactional messages? Let's explain with an example: an e-commerce system that rewards users with points for logging in.

In the early stages of e-commerce development, in order to increase user engagement, it is common to have a daily login activity where users can earn points for their first login each day.

Before the introduction of the points reward activity, the code for user login would be as follows:

public Map<String, Object> login(String userName, String password) {
    Map<String, Object> result = new HashMap<>();
    // Step 1: validate the request parameters
    if (StringUtils.isEmpty(userName) || StringUtils.isEmpty(password)) {
        result.put("code", 1);
        result.put("msg", "Username and password cannot be empty");
        return result;
    }
    try {
        // Step 2: validate the username and password
        User user = userMapper.findByUserName(userName);
        if (user == null || !password.equals(user.getPassword())) {
            result.put("code", 1);
            result.put("msg", "Incorrect username or password");
            return result;
        }
        // Step 3: successful login, record the login log
        UserLoginLogger userLoginLogger = new UserLoginLogger(user.getId(), System.currentTimeMillis());
        userLoginLoggerMapper.insert(userLoginLogger);
        result.put("code", 0);
        result.put("data", user);
    } catch (Throwable e) {
        e.printStackTrace();
        result.put("code", 1);
        result.put("msg", e.getMessage());
    }
    return result;
}

Tip: All the code examples in this article are for illustrative purposes only, and there may be some differences between the code shown and the actual production code, especially in terms of business completeness.

The above code example for user login can be divided into 3 steps:

  1. Validate user parameters.
  2. Validate the username and password.
  3. Record the login log.

Now, in order to increase daily user activity, a new activity is introduced where users can earn 200 points for their first login each day from September 1st to September 15th, 2020.

After receiving this requirement, the development team quickly implemented the code as follows:

[Image: login code with the points reward RPC call added]

The red section in the code is the new code added for the points reward requirement. However, this approach has a clear drawback - it increases the login time because we need to call a points RPC service. After analysis, we can see that the user login operation is the main process and the points reward is just an auxiliary process that can be decoupled. At this point, it is natural to think of using a message middleware to achieve decoupling. That’s right, after the code is modified, it looks like the following:

[Image: login code that sends a message instead of calling the points RPC service]

But now another problem arises. Suppose the message is sent successfully, but the application process is then killed (for example with the kill -9 command) before the database transaction commits. The login record is never saved in the database, yet the message has already been sent, so the points are awarded anyway, and they will be awarded again when the user logs in again. In other words, the key issue is that the MySQL database transaction and the message sending are independent operations, and we cannot guarantee that they either both succeed or both fail. What we need is the ability to handle distributed transactions.
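The failure window can be illustrated with a minimal sketch. It uses in-memory lists as hypothetical stand-ins for the login-log table and the broker, and a boolean flag to simulate the process being killed; none of these names come from the example project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-ins for the login-log table and the message broker.
class NaiveLoginDemo {
    final List<String> loginLogTable = new ArrayList<>();
    final List<String> sentMessages = new ArrayList<>();

    // Sends the points message first, then writes the login log.
    // crashBeforeCommit simulates the process being killed between the two steps.
    void login(String userId, boolean crashBeforeCommit) {
        sentMessages.add("LOGIN_POINTS:" + userId); // the message is already on the broker
        if (crashBeforeCommit) {
            throw new IllegalStateException("process killed before DB commit");
        }
        loginLogTable.add(userId); // never reached when the process dies
    }

    public static void main(String[] args) {
        NaiveLoginDemo demo = new NaiveLoginDemo();
        try {
            demo.login("u1", true);
        } catch (IllegalStateException e) {
            // the simulated crash
        }
        // The two stores now disagree: no login log, but one points message.
        System.out.println("login logs: " + demo.loginLogTable.size()
                + ", messages: " + demo.sentMessages.size());
    }
}
```

Reordering the two steps does not close the window; it only changes which side is left behind.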

Therefore, to resolve the inconsistency between message sending and database transactions, RocketMQ introduced transactional messages in version 4.3.0, which address the problem described above.

RocketMQ Transactional Message Principle #

The implementation principle of transactional messages is shown in the following diagram:

[Image: diagram of the RocketMQ transactional message implementation principle]

After the application completes the relevant business data persistence in the transaction, it needs to call the RocketMQ message sending interface synchronously to send a message with the status “prepare”. After the message is sent successfully, RocketMQ calls back the transaction listener registered on the message sender to record the local transaction status of the message. This status is written in the same transaction as the local business operation, which ensures the atomicity of message sending and the local transaction.

When RocketMQ receives a message of the “prepare” type, it first backs up the message's original topic and consume queue, and then stores the message in a consume queue under the topic “RMQ_SYS_TRANS_HALF_TOPIC”. Consumers do not subscribe to this topic, which is why the message is not consumed immediately.

The RocketMQ message server starts a scheduled task that consumes messages from “RMQ_SYS_TRANS_HALF_TOPIC” and sends a transaction status query to the message sender (the application). The application reports the transaction status back to the message server based on the status it saved. If the status is “commit” or “rollback”, the message server commits or rolls back the message accordingly. If it is “unknown”, the message server waits for the next query. RocketMQ allows the query interval and the maximum number of queries per message to be configured. If a message's transaction status is still unknown after the query limit is exceeded, the message is rolled back by default.
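The check-back loop described above can be sketched as a small in-memory model. This is an illustration of the idea only, not the broker's implementation: the class, its fields, and the way the limit is applied are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of half-message handling on the broker side.
class HalfMessageBrokerSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    static final class HalfMessage {
        final String realTopic; // the original topic, backed up while the message is "half"
        final String body;
        int checkCount = 0;

        HalfMessage(String realTopic, String body) {
            this.realTopic = realTopic;
            this.body = body;
        }
    }

    final List<HalfMessage> halfQueue = new ArrayList<>(); // stand-in for RMQ_SYS_TRANS_HALF_TOPIC
    final List<String> delivered = new ArrayList<>();      // messages visible to consumers
    final int maxChecks;

    HalfMessageBrokerSketch(int maxChecks) {
        this.maxChecks = maxChecks;
    }

    // A "prepare" message is parked in the half queue, invisible to consumers.
    void sendHalf(String topic, String body) {
        halfQueue.add(new HalfMessage(topic, body));
    }

    // One run of the scheduled task: ask the producer for the state of every half message.
    void checkOnce(Function<String, TxState> producerCheck) {
        Iterator<HalfMessage> it = halfQueue.iterator();
        while (it.hasNext()) {
            HalfMessage m = it.next();
            TxState state = producerCheck.apply(m.body);
            m.checkCount++;
            if (state == TxState.COMMIT) {
                delivered.add(m.realTopic + ":" + m.body); // restore the original topic
                it.remove();
            } else if (state == TxState.ROLLBACK || m.checkCount >= maxChecks) {
                it.remove(); // rolled back explicitly, or still unknown after the limit
            }
        }
    }
}
```

A message whose status stays unknown simply remains in the half queue until a later run resolves it or the limit is reached.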

Case Study of Transactional Messages #

From the above process, the implementation principle of RocketMQ transactional messages can be seen as two-phase commit plus periodic polling. The approach is similar to what we used before transactional messages were available, namely a combination of a database table and scheduled tasks. The only difference is that RocketMQ's transactional messages provide the scheduled check-back out of the box.

Next, we will use the user login + reward points scenario mentioned above to demonstrate the use of transactional messages in a real application based on Spring Boot. This will help us solve the problems we encountered and achieve consistency between user login and message sending, which are both distributed operations.

The overall sequence diagram of this transactional message is as follows:

[Image: overall sequence diagram of the transactional message flow]

Next, I will provide screenshots of the key code. In order to better align the demonstration with real-world scenarios, starting from this article, I have provided a project based on Spring Boot, Dubbo, MyBatis, and RocketMQ. You can download the project by clicking the link below:

https://pan.baidu.com/s/1ccSMN_dGMaUrFr-57UTn_A

Extraction code: srif

The overall directory structure of the project is as follows:

[Image: overall directory structure of the project]

The code for implementing transactional messages is located in the rocketmq-example-user-service module.

RocketMQ Producer Initialization #

[Images: RocketMQ producer initialization code, including TransactionMQProducerContainer]

Code Explanation: The code above introduces TransactionMQProducerContainer, whose main purpose is to associate a transactional callback listener with the transactional messages it handles. For that reason, a separate TransactionMQProducer is used for each Topic, and the producer group is a fixed prefix combined with the Topic name.
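The one-producer-per-Topic idea can be sketched with a concurrent map. The classes below are hypothetical stand-ins that do not use the RocketMQ client, and the prefix value is an assumption; the project's actual container may be organized differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a per-Topic producer registry.
class ProducerContainerSketch {
    // Stand-in for a transactional producer that is bound to exactly one listener.
    static final class TxProducer {
        final String producerGroup;
        final String listenerName;

        TxProducer(String producerGroup, String listenerName) {
            this.producerGroup = producerGroup;
            this.listenerName = listenerName;
        }
    }

    static final String GROUP_PREFIX = "tx_producer_group_"; // assumed fixed prefix

    private final Map<String, TxProducer> producersByTopic = new ConcurrentHashMap<>();

    // Creates the producer for a Topic on first use and returns the same instance afterwards.
    TxProducer getOrCreate(String topic, String listenerName) {
        return producersByTopic.computeIfAbsent(
                topic, t -> new TxProducer(GROUP_PREFIX + t, listenerName));
    }
}
```

Keying the registry by Topic means each Topic's check-back requests reach the listener that knows how to look up that Topic's local transaction records.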

UserServiceImpl Business Method Implementation Overview #

[Image: login method in UserServiceImpl]

Code Explanation: The login method in UserServiceImpl implements the login processing logic. It first executes the regular business logic, verifying the username and password of the user logging in. If they do not match, it returns the corresponding business error message. If the login succeeds, it builds a login log and generates a globally unique business serial number for it. This serial number is carried through the whole business flow, including the transactional callbacks and message consumption.
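One common way to produce such a serial number is a random UUID, as in the sketch below. This is an assumption for illustration; the project may generate its serial numbers with a different scheme, and the class name here is hypothetical.

```java
import java.util.UUID;

// Hypothetical helper: generates a business serial number for a login log.
class SerialNumberSketch {
    // Returns a 32-character hexadecimal string derived from a random UUID.
    static String newSerialNumber() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        // The same value would be stored with the login log and carried in the message,
        // so the check-back and the consumer can both look it up.
        System.out.println(newSerialNumber());
    }
}
```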

Note: There is no database write operation here. The database write operation is placed in the transactional message listener.

Event Callback Listener #

[Image: transaction listener code with executeLocalTransaction and checkLocalTransaction]

Code Explanation:

executeLocalTransaction

Here, we introduce a local transaction table. Within a single database transaction, we write both the business data and the local transaction log, so both writes take effect only when that transaction commits successfully. The business write has to be placed here mainly because RocketMQ catches the exceptions thrown in this method. If the business operation were placed in the UserServiceImpl class and only the local transaction log write were placed in the callback, the two could become inconsistent.

checkLocalTransaction

This method is invoked when the RocketMQ Broker checks back on the transaction status. If we can find a record in the local transaction table by the unique serial number, the transaction has been committed successfully, so we return COMMIT_MESSAGE. RocketMQ then commits the message, moving it from the PREPARE state to its actual Topic so that consumers can consume it. If we cannot find the record in the local transaction table, we return UNKNOW. We must not return ROLLBACK_MESSAGE directly, because the local transaction may simply not have committed yet. Only when the record is still not found after the configured number of check-backs does RocketMQ roll back the message, in which case consumers never receive it. In this way, we achieve consistency between the business operation and message sending.
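The two callbacks can be sketched without the RocketMQ client as follows. The enum mirrors the names of RocketMQ's LocalTransactionState values, but the class, the in-memory "tables", and the failWrite flag are hypothetical. Returning COMMIT_MESSAGE right after a successful write is also an assumption of this sketch; the project's listener may defer entirely to the check-back.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a transaction listener backed by a local transaction table.
class LoginTxListenerSketch {
    enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // In-memory stand-ins for two tables that a real service writes in ONE DB transaction.
    final Map<String, String> loginLogTable = new HashMap<>(); // serialNo -> userId
    final Set<String> localTxLogTable = new HashSet<>();       // serialNo

    // Runs after the half message is stored. failWrite simulates a failed DB transaction.
    TxState executeLocalTransaction(String serialNo, String userId, boolean failWrite) {
        try {
            if (failWrite) {
                throw new IllegalStateException("simulated DB failure");
            }
            // Both writes succeed or fail together.
            loginLogTable.put(serialNo, userId);
            localTxLogTable.add(serialNo);
            return TxState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            // Nothing was written; leave the final decision to the check-back.
            return TxState.UNKNOW;
        }
    }

    // Runs on a broker check-back: a local transaction record means the transaction committed.
    TxState checkLocalTransaction(String serialNo) {
        return localTxLogTable.contains(serialNo) ? TxState.COMMIT_MESSAGE : TxState.UNKNOW;
    }
}
```

Because the check-back consults only the local transaction table, it gives the right answer even if the process restarted between sending the half message and committing the local transaction.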

The code above also contains extra test code for exercising both the success and the failure of the transaction, so you can check whether the database and MQ stay consistent.

To test the example, start the rocketmq-example-gateway and rocketmq-example-user-service modules, and make sure the RocketMQ server and ZooKeeper server are running. Then you can send requests using a browser or Postman to test. An example is shown below:

[Image: example login request and response]

You can also use the RocketMQ-Console to check if the message has been successfully sent. The screenshot is as follows:

[Image: RocketMQ-Console showing the sent message]

Architectural Considerations for Transactional Messages #

Transactional messages can ensure strong consistency between business operations and message sending. Before transactional messages were available, there were usually two solutions:

  • Strict transactional consistency: Implement a local transaction table and a callback function, and use a scheduled task to scan the local transaction table and send the messages.
  • Compensation: Rely on the message retry mechanism when sending, so that the message is sent successfully in almost all cases, and use the business status as a safety net. Take an order flow as an example: when an order is paid successfully, a message is sent to the RocketMQ cluster; the merchant logistics system subscribes to this Topic, consumes the message, and after processing changes the order status to “shipped.” If the message is lost, nothing drives the order forward. To cover that case, a timer scans the order table for orders that have been paid but not shipped for longer than a certain time and re-sends the message. This also achieves eventual consistency.

Transactional messages do provide strong consistency. However, they require a local transaction table, and every business operation pays for an additional database write. The compensation approach, by contrast, is optimistic: the probability of failure is already very low, so it is usually more efficient.

Therefore, choose the technical solution that fits your actual situation, and do not feel compelled to use transactional messages just because they are a powerful technology.
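The compensation scan from the second option can be sketched as a pure function that a scheduled job would call. The Order class, its fields, and the timeout parameter are hypothetical and chosen only for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the compensation scan over the order table.
class OrderCompensationSketch {
    enum Status { PAID, SHIPPED }

    static final class Order {
        final String id;
        final Status status;
        final long paidAtMillis;

        Order(String id, Status status, long paidAtMillis) {
            this.id = id;
            this.status = status;
            this.paidAtMillis = paidAtMillis;
        }
    }

    // Returns the ids of orders that are paid, not yet shipped, and older than the timeout.
    // A scheduled job would re-send the "order paid" message for each returned id.
    static List<String> findOrdersToResend(List<Order> orders, long nowMillis, long timeoutMillis) {
        List<String> ids = new ArrayList<>();
        for (Order order : orders) {
            boolean stuck = order.status == Status.PAID
                    && nowMillis - order.paidAtMillis > timeoutMillis;
            if (stuck) {
                ids.add(order.id);
            }
        }
        return ids;
    }
}
```

Because a re-sent message may reach a consumer that already processed the original, the consumer has to handle duplicates safely, for example by checking the order status before acting.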

Summary #

This article first introduced the use cases of transactional messages, then explained the implementation principle of RocketMQ transactional messages in detail, and then provided a runnable example project based on Spring Boot, MyBatis, RocketMQ, and Dubbo. It closed with some brief thoughts on the architectural trade-offs of this approach.