04 Discussing Message Sending Practical Application Scenarios

04 Discussing Message Sending - Practical Application Scenarios #

In this section, we will combine various usage scenarios and use appropriate APIs to solve specific real-world problems.

Message Sending Modes #

RocketMQ supports three types of message sending modes: synchronous, asynchronous, and oneway.

  • Synchronous: The client initiates a message sending request and waits synchronously for the server’s response.
  • Asynchronous: The client initiates a message sending request, but does not wait for the server’s response. Instead, it immediately returns, without blocking the client’s sub-thread. The client automatically calls the callback function when it receives the response from the server (Broker).
  • Oneway: The client initiates a message sending request without waiting for the server’s response or calling the callback function. It does not care about the final result of message sending.

First, let’s demonstrate the asynchronous message sending mode using a demo.

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("testProducerGroup");
    producer.setNamesrvAddr("192.168.3.10:9876");
    try {
        producer.start();
        // Sending a single message
        Message msg = new Message("TOPIC_TEST", "hello rocketmq".getBytes());
        producer.send(msg, new SendCallback() {
            // Callback function for successful message sending
            public void onSuccess(SendResult sendResult) {
                System.out.printf("%s%n", sendResult);
            }
            // Callback function for failed message sending
            public void onException(Throwable e) {
                e.printStackTrace();
                // Message sending failed, can compensate here, such as storing the message in the database and retrying at a scheduled time.
            }
        });
    } catch (Throwable e) {
        e.printStackTrace();
        // Message sending failed, can compensate here, such as storing the message in the database and retrying at a scheduled time.
    }
    Thread.sleep(3000);
    // After use, close the message producer
    // In applications based on Spring Boot, the shutdown method is not called when sending messages, but waits for the spring container to stop
    producer.shutdown();
}

The oneway mode is usually used to send less important messages, such as operation logs, occasional message loss does not affect the business. So, how to choose between synchronous sending and asynchronous sending in actual production?

When answering the question of whether to choose synchronous sending or asynchronous sending, let’s first briefly introduce the implementation principle of asynchronous sending:

  • Each message sender instance (DefaultMQProducer) internally creates an asynchronous message sending thread pool with a default thread count equal to the number of CPU cores. The thread pool holds a bounded queue with a default length of 50,000 and controls the maximum concurrency of asynchronous calls, which is 65536 by default and can be configured through the parameter clientAsyncSemaphoreValue.
  • The client uses the thread pool to send messages to the server. After the server finishes processing, it returns the result and calls the SendCallback callback function based on whether an exception occurred.

Based on my practical experience, I recommend the following:

If the MQ and application servers are in the same internal network, the network traffic can usually be ignored, and the design goal of MQ is low latency and high concurrency. Therefore, there is usually no need to use asynchronous sending, especially when it comes to improving RocketMQ Broker-related parameters, especially the flush strategy and replication strategy. However, if in a scenario where multiple messages need to be sent in a database transaction, using asynchronous sending can also provide some performance improvements.

If asynchronous sending is used, the programming model may be slightly more complex, and the compensation mechanism and fault tolerance mechanism will become more complex.

As mentioned in the above example, the compensation code should be called in two places:

  • Exception handling in the producer#send method, common exception information: MQClientException("executor rejected ", e).
  • Compensate in the onException of SendCallback, common exceptions include timeout and RemotingTooMuchRequestException.

Queue Selection Mechanism #

Imagine this scenario: the order management system allows users to update order information, and orders have their own lifecycle, such as pending payment, paid, seller shipped, buyer received, and so on. The current system architecture is designed as shown in the diagram below:

1

An order corresponds to multiple messages (such as creation, order modification, order status change). If left unmanaged, messages with the same order ID will be stored in multiple queues of the order_topic. From the RocketMQ queue load balancing mechanism, different queues will be consumed by different consumers. However, this business has its own uniqueness. When order-service-app consumes messages, it wants to process them in the order of order changes. So how should we handle this?

From previous articles, we learned that RocketMQ supports queue-level sequential consumption. Therefore, all we need to do is to send different messages with the same order number to the same queue. This way, when we consume the messages, we can process them in order.

To solve this problem, RocketMQ provides a custom queue load balancing mechanism during message sending. The default queue load balancing mechanism for message sending is round-robin. How can we choose the queue? RocketMQ provides the following API:

2

Here is an example of how to use it:

public static void main(String[] args) throws Exception {
    
    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    // Order entity
    Order order = new Order();
    order.setId(1001L);
    order.setOrderNo("2020072823270500001");
    order.setBuyerId(1L);
    order.setSellerId(1L);
    order.setTotalPrice(10000L);
    order.setStatus(0);
    System.out.printf("%s%n", sendMsg(producer, order));
    // Order status change
    order.setStatus(1);
    // Resend message
    System.out.printf("%s%n", sendMsg(producer, order));
    producer.shutdown();
}

public static SendResult sendMsg(DefaultMQProducer producer, Order order) throws Exception {
    // To facilitate message retrieval, the order number is used as the key in message construction
    Message msg = new Message("order_topic", null, order.getOrderNo(), JSON.toJSONString(order).getBytes());
    return producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            // Select the queue based on the hashcode of the order number
            if (mqs == null || mqs.isEmpty()) {
                return null;
            }
            int index = Math.abs(arg.hashCode()) % mqs.size();
            return mqs.get(index < 0 ? 0 : index);
        }
    }, order.getOrderNo());
}

After running the above code, you can use the queryMsgByKey command to query messages based on the set key. The result is shown in the following image:

3

From here, we can see that both messages are written to queue 1. The sequential consumption of queues will be detailed in the message consumption section. This article only focuses on message sending.

Friendly reminder: If you use the MessageQueueSelector during message sending, the message’s retry mechanism will be invalid. In other words, RocketMQ client does not retry. The high availability of message sending needs to be ensured by the business, such as saving the failed message in the database and then scheduling it regularly until it is sent to the MQ.

Use Cases of RocketMQ Key #

RocketMQ has a very attractive feature compared to Kafka, especially for business-related scenarios. RocketMQ provides a rich message query mechanism, such as using message offset, globally unique msgId, and message key.

When sending a message with RocketMQ, you can set an index key for the message. For example, in the example above, the order number is used as the message key, so we can query messages using this index key.

To specify a key for a message, you only need to pass the key parameter when constructing the message. For example, the following API:

4

If you want to specify multiple keys for a message, you can separate them with spaces. The example code is as follows:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    
    // Order entity
    Order order = new Order();
    order.setId(1001L);
    order.setOrderNo("2020072823270500002");
    order.setBuyerId(1L);
    order.setSellerId(2L);
    order.setTotalPrice(10000L);
    order.setStatus(0);
    
    Message msg = new Message("dw_test", null, "2020072823270500002 ODS0002", JSON.toJSONString(order).getBytes());
    
    System.out.printf("%s%n", producer.send(msg));
    producer.shutdown();
}

In addition to the queryMsgByKey method mentioned above, you can also perform message queries through RocketMQ-Console. The screenshot is as follows:

5

RocketMQ Tag usage scenarios #

RocketMQ allows you to set tags for a topic, so consumers can filter messages in the topic based on tags, selectively processing messages.

For example, in the life cycle of an order, there are different stages such as order creation, pending payment, payment completed, merchant review, merchant delivery, and buyer delivery. Each change in the order status sends a message to the order_topic. However, different downstream systems are only interested in certain stages of the order flow and do not need to process all messages.

For example, we have the following two scenarios:

  • In the activity module, a coupon is issued whenever a user places an order and successfully pays.
  • In the logistics module, once an order is approved, logistics information needs to be created, and a supplier needs to be selected.

To handle these scenarios, two consumer groups, order_topic_activity_consumer and order_topic_logistics_consumer, are created. However, these consumer groups do not need to process all messages. This is where the Tag mechanism comes in.

When sending messages, for example, when creating an order, set the tag to c, and for messages created when payment is successful, set the tag to w. Then, consumers in different scenarios subscribe to the topic and specify the tag they are interested in. The example code is as follows:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    
    // Order entity
    Order order = new Order();
    order.setId(1001L);
    order.setOrderNo("2020072823270500003");
    order.setBuyerId(1L);
    order.setSellerId(2L);
    order.setTotalPrice(10000L);
    order.setStatus(0);
    
    Message msg = new Message("dw_test", "c", "2020072823270500003", JSON.toJSONString(order).getBytes());
    
    System.out.printf("%s%n", producer.send(msg));
    
    order.setStatus(1);
    msg = new Message("dw_test", "w", "2020072823270500003", JSON.toJSONString(order).getBytes());
    
    System.out.printf("%s%n", producer.send(msg));
    producer.shutdown();
}

// Consumer example
public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_topic_activity_consumer");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("dw_test", "c");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

The key point here is to subscribe to the topic and specify the tag when subscribing. After running the consumer, you can check the consumption status of these messages, as shown in the following figure:

6

Messages that do not match the subscription tag have a consumption status of “CONSUMED_BUT_FILTERED”.

RocketMQ msgId explained #

The result of sending a message is shown in the following figure:

8

The returned fields include msgId and offsetMsgId.

msgId

This ID is generated by the message sender when sending the message, and it is globally unique. In RocketMQ, this ID is also called “uniqId” to emphasize its uniqueness. The composition is as follows:

  • Client’s IP, supports IPV4 and IPV6
  • Process PID (2 bytes)
  • The hash code of the class loader (4 bytes)
  • The difference between the current system timestamp and the startup timestamp (4 bytes)
  • Incremental sequence (2 bytes)

offsetMsgId

The physical offset of the message in the broker, which is the offset in the commitlog file. It consists of the following two parts:

  • Broker’s IP and port number
  • Physical offset in the commitlog

Note: You can locate a specific message based on the offsetMsgId without knowing other information such as the topic of the message.

Sometimes when troubleshooting a problem, especially when the msgId is provided but cannot be queried in the message cluster, you can decode this message ID to find out the sender’s IP or the IP of the broker storing the message.

The msgId can be obtained the IP from the MessageClientIDSetter’s getIPStrFromID method, and the offsetMsgId can be decoded using the MessageDecoder’s decodeMessageId method.

Summary #

This article mainly explains the advantages, disadvantages, and usage scenarios of three message sending methods, as well as the custom load balancing mechanism, and the usage of Key and Tag. It also provides specific scenarios, solutions, and example programs.