05 Message Sending Core Parameters and Working Principles Detailed Explanation

05 Message Sending Core Parameters and Working Principles - Detailed Explanation #

After the explanations in the previous articles, I believe that everyone has a fairly detailed understanding of RocketMQ’s message sending and can use the API related to DefaultMQProducer very smoothly.

This article will focus on the relevant properties of DefaultMQProducer in order to gain insights into the underlying principles of RocketMQ message sending.

1

From the class diagram of DefaultMQProducer, we can see that its properties mainly come from ClientConfig and DefaultMQProducer. Therefore, the following will be divided into two parts.

Overview of DefaultMQProducer Parameters #

The parameters of DefaultMQProducer are as follows:

InternalLogger log = ClientLogger.getLog()

The implementation class of the client’s log, the log path of the RocketMQ client is ${user.home}/logs/rocketmqlogs/rocketmq_client.log. When troubleshooting problems, you can start from the log file to find error logs and provide necessary information to solve the problem. Where user.home is the user’s home directory.

producerGroup

The group to which the sender belongs. In the open-source version of RocketMQ, the group to which the sender belongs is mainly used for transactional messages, and the broker needs to check the transaction status with the message sender. You can check the client of a specific consumer group for a topic through related commands or RocketMQ-Console, as shown in the following figure:

2

defaultTopicQueueNums = 4

The default number of queues when creating a topic with the producer.

sendMsgTimeout = 3000

The default timeout for message sending, in milliseconds. It is worth noting that before RocketMQ version 4.3.0, due to the existence of the retry mechanism, the design of the timeout was for a single retry. For example, if the retry times were set to 3, the DefaultMQProducer#send method might take more than 9 seconds to return. This issue was optimized in RocketMQ version 4.3.0, and the timeout setting is the total timeout time. For example, if the timeout is set to 3 seconds and the retry times are set to 10, it may not retry 10 times. For example, when retrying for the 5th time, it has already exceeded 3 seconds. When attempting to retry for the 6th time, it will exit and throw a timeout exception, stopping the retry.

compressMsgBodyOverHowmuch

The compression threshold, defaults to 4k. If the message body exceeds 4k, it will be compressed using zip, which will increase the CPU consumption at the broker side but reduce the network overhead.

retryTimesWhenSendFailed

The number of retries for synchronous message sending. The RocketMQ client internally defaults to 2 retries when message sending fails. Please note that this parameter works in conjunction with sendMsgTimeout, please refer to the above text for details.

retryTimesWhenSendAsyncFailed

The number of retries for asynchronous message sending, defaults to 2, which means there are three chances in general.

retryAnotherBrokerWhenNotStoreOK

The original meaning of this parameter is that if the client receives a result other than SEND_OK, it should continue to retry with another broker without asking for the cause. However, according to code analysis, currently this parameter does not work as expected, it should be a bug.

int maxMessageSize

The maximum message body size allowed to be sent, defaults to 4M. The broker also has a maxMessageSize setting, so the client’s setting cannot exceed the server’s configuration. The best practice is to set the client’s configuration smaller than the server’s configuration.

sendLatencyFaultEnable

Whether to enable the failure latency avoidance mechanism. When retrying, the RocketMQ client internally avoids the last failed broker. If failure latency avoidance is enabled, messages will not be sent to the broker for a certain period of time in the future. The specific mechanism will be discussed in detail in the third part of this article. The default is false, which means it is not enabled.

notAvailableDuration

The default value of the unavailable delay array is {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}, that is, the delay time triggered for the broker is in steps, and it will choose how long in the future not to send messages to the broker based on the delay time of each message sent.

latencyMax

Sets the maximum delay level for message sending. The default value is {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}, which corresponds to the number of elements in notAvailableDuration. The specific mechanism of the broker’s delay shutdown will be discussed in detail in the third section of this article.

Overview of ClientConfig Parameters #

As the name suggests, ClientConfig is the configuration of the client. In RocketMQ, the producer and consumer (mentioned in the previous configuration) share the same configuration.

namesrvAddr

The address list of the NameServer.

clientIP

The client IP is obtained through the RemotingUtil#getLocalAddress method. Starting from version 4.7.0, it will preferentially return the last IPV4 address that is not 127.0.0.1 or does not start with 192.168. The client IP is mainly used to locate the consumer, and the clientIP will be used as part of the client id.

As shown in the following figure: Click on a consumer group in the [Consumer] list and click the [client] button to view its clients (consumers).

3

instanceName

The instance name of the client, which is a part of the client identifier CID. It will be explained in detail in the third part.

unitName

Defines a unit. The main purpose is to be a part of the client CID. If the address of the NameServer is dynamically updated through the URL, this value will be included, so that different services for obtaining the NameServer address can be distinguished.

clientCallbackExecutorThreads

The number of threads in the client’s public callback thread pool, defaults to the CPU core count. It is not recommended to change this value.

namespace

The client namespace, introduced in version 4.5.1, has been detailed in the third part.

pollNameServerInterval

The interval at which the client updates the topic from the NameServer, with a default value of 30s. The Producer and Consumer will update the routing information of the topic to the NameServer every 30s. It is not recommended to modify this value.

heartbeatBrokerInterval

The interval at which the client sends heartbeat packets to the broker, with a default value of 30s. It is not recommended to modify this value.

persistConsumerOffsetInterval

The interval at which the client persists the consumption progress of the messages, with a default value of 5s. It is not recommended to modify this value.

Working Mechanism and Usage Suggestions for Core Parameters #

High Availability Design and Failure Avoidance Mechanism for Message Sending #

Those who are familiar with RocketMQ should know that RocketMQ’s Topic routing registration center NameServer uses the eventual consistency model, and the client regularly updates the routing information of the Topic from the NameServer. In other words, the client (Producer, Consumer) is unable to perceive the broker’s crash in real time, so the message sender continues to send messages to the crashed broker, resulting in message sending exceptions. How does RocketMQ ensure the high availability of message sending?

RocketMQ Message Retry and Load Balancing #

RocketMQ introduces a retry mechanism to ensure high availability of message sending, with a default of 2 retries. The queue load balancing strategy used by the message sender in RocketMQ is round-robin by default.

In RocketMQ, the message sender is thread-safe, which means that a message sender can be used safely in a multi-threaded environment. Each message sender maintains the last selected queue for a topic and then increments it in a round-robin fashion. It uses the ThreadLocal mechanism, which means that each sending thread holds the last selected queue using the sendWhichQueue variable.

Let’s take an example of message queue load balancing. The routing information for topicA is shown in the following diagram:

4

As shown in the diagram, topicA has 4 queues created on broker-a and broker-b. When a thread uses the Producer to send a message, it obtains the next queue through the getAndIncrement() method on sendWhichQueue.

For example, before sending, if the value of sendWhichQueue is q1 on broker-a, and due to a sudden surge in traffic on broker-a, the message sending fails, it triggers a retry. According to the round-robin mechanism, the next selected queue would be q2 on broker-a. This time, the message sending is likely to fail as well. This means that even though there are 2 retries, they are all sent to the same broker, which makes the retry process unreliable and the probability of failure is high.

To solve this problem, RocketMQ introduces a fault avoidance mechanism. When retrying messages, it tries to avoid the last sent broker. Referring to the previous example, when a message is sent to the queue q1 on broker-a and returns a failure, during the retry, it will first exclude all queues in broker-a. This means that it will select the q1 queue on broker-b instead, increasing the success rate of message sending.

This avoidance mechanism is enabled by default and does not require any intervention.

However, RocketMQ provides two avoidance strategies, which are controlled by the sendLatencyFaultEnable parameter and can be adjusted by the user. It indicates whether to enable the latency avoidance mechanism, with the default value set to false.

  • Setting sendLatencyFaultEnable to false (default): The latency avoidance strategy takes effect only during retries. For example, during a failed message sending in one process, when avoiding broker-a, the next time DefaultMQProducer is called to send a message, it will still select the messages on broker-a. Only when it continues to fail and retries, broker-a will be avoided again.
  • Setting sendLatencyFaultEnable to true: Enables the latency avoidance mechanism. Once a message sending fails, broker-a is “pessimistically” regarded as unavailable for a certain period of time, and no clients will send messages to broker-a during this period. This delay is calculated based on notAvailableDuration and latencyMax. It first calculates the delay caused by the failed message sending, then determines the corresponding index in latencyMax, and returns the corresponding delay value in notAvailableDuration using the same index.

Important: If all brokers trigger the fault avoidance mechanism and the brokers are only under heavy pressure at that particular moment, wouldn’t it be worse to avoid all brokers and not have any available brokers? To address this issue, RocketMQ will degrade to the queue round-robin mechanism, ignoring the fault avoidance factor, and select the next queue in natural order as a fallback option.

Author’s Practical Experience

Based on my practical experience, the busyness of RocketMQ brokers is mostly instantaneous and usually related to the management of the system’s PageCache kernel. They recover quickly, so I do not recommend enabling the latency mechanism. Once the latency mechanism is enabled, for example, if no messages are sent to a broker for 5 minutes, it will cause a surge of messages on other brokers, resulting in some consumers not being able to consume the messages, increasing the processing pressure on other consumers, and decreasing the overall consumption performance.

Client ID and Traps #

Introducing the client ID is mainly to correctly use message sending and consumption in the following scenarios:

  • Can the same code be deployed multiple times on the same machine?
  • Can the same code send and consume messages to different NameServer clusters?

The deployment architecture for the experimental environment of this article is as follows:

5

Two RocketMQ clusters are deployed, and the topic “dw_test_01” is created on the DefaultCluster, while the topic “dw_test_02” is created on the DefaultClusterb. The current requirement is that the order-service-app should send messages to dw_test_01 and dw_test_02. The example code is as follows:

public static void main(String[] args) throws Exception {
    // Create the first producer
    DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group1");
    producer.setNamesrvAddr("192.168.3.10:9876");
    producer.start();
    
    // Create the second producer
    DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2");
    producer2.setNamesrvAddr("192.168.3.19:9876");
    producer2.start();
    
    try {
        // Send message to the first RocketMQ cluster
        SendResult result1 = producer.send(new Message("dw_test_01", "hello 192.168.3.10 nameserver".getBytes()));
        System.out.printf("%s%n", result1);
    } catch (Throwable e) {
        System.out.println("-----first------------");
        e.printStackTrace();
        System.out.println("-----first------------");
    }
    
    try {
        // Send message to the second RocketMQ cluster
        SendResult result2 = producer2.send(new Message("dw_test_02", "hello 192.168.3.19 nameserver".getBytes()));
        System.out.printf("%s%n", result2);
    } catch (Throwable e) {
        System.out.println("-----secornd------------");
        e.printStackTrace();
        System.out.println("-----secornd------------");
    }
}

Note: The above content is a direct translation of the provided Markdown file. The translation may not be perfect and may require further editing. } // Sleep for 10s to simply delay the end of this task Thread.sleep(10000); }

The running result is shown in the following figure:

6

When attempting to send a message to cluster 2, the topic does not exist, even though dw_test_02 was clearly created. If a message is sent separately to dw_test_02 in cluster 2, it is successful, so the preliminary investigation indicates that this is caused by creating two producers that connect to different clusters. Why is this happening? How can it be resolved?

1. Problem Analysis

To solve this problem, it is necessary to first understand the core components of the RocketMQ Client, as shown in the following diagram:

7

The key points mentioned above are as follows:

  • MQClientInstance: An important object in the RocketMQ client that represents a client and is uniquely identified by the clientId. This object holds many message senders (producers), with the producer group as the key; it can also create multiple consumer groups, with the consumer group as the key stored in the consumerTable.
  • Can multiple MQClientInstances be created in a single JVM process, that is, a single application? Yes, it is possible. The MQClientManager object holds an MQClientInstance container, with the clientId as the key.

Since multiple producers can be created in a single JVM, why doesn’t it work when two producers are created with different producer groups in the above example?

This is because the clientId for the above two Producers is the same, so they correspond to the same MQClientInstance object. As a result, both producers will be registered to the same MQClientInstance, with the configuration of the first producer used for both producers. In other words, the nameserver address is configured as 192.168.3.10:9876, and the topic dw_test_02 is not created in cluster 1, resulting in an error being thrown.

We can view the clientId generated by calling the buildMQClientId() method of DefaultMQProducer; the result is shown in the following figure:

8

The solution is therefore clear: the clientId for both producers needs to be changed. Let’s take a look at how clientId is generated in RocketMQ.

ClientConfig#buildMQClientId

Note: This method is defined in ClientConfig, which is the parent class of both RocketMQ producer and consumer configurations.

The clientId generation strategy is as follows:

  • clientIp: The IP address of the client.
  • instanceName: The instance name. The default value is DEFAULT, but if the instance name is DEFAULT when the getInstanceName() method of the real clientConfig is called, it will automatically be replaced with the process ID (PID).
  • unitName: The unit name. If it is not empty, it will be appended to the clientId.

With an understanding of the clientId generation rules, it is straightforward to propose a solution.

2. Solution

Considering the three components of clientId, I do not recommend changing instanceName. It is best to keep it as the default value DEFAULT, which will automatically be changed to the process PID during runtime. This way, the same code can be deployed to multiple processes on the same machine without clientId duplication. Therefore, I recommend changing unitName. It can be changed to the name of the cluster. The modified code is as follows:

public static void main(String[] args) throws Exception {
        // Code omitted
        DefaultMQProducer producer2 = new DefaultMQProducer("dw_test_producer_group2");
        producer2.setNamesrvAddr("192.168.3.19:9876");
        producer2.setUnitName("DefaultClusterb");
        producer2.start();
      // Code omitted
}

The running result is shown in the following figure:

10

Problem solved.

Summary #

This article first introduced all the configuration parameters of the message sender and their basic meanings, and then provided a detailed introduction to the fault avoidance mechanism for sending messages in RocketMQ, the generation strategy for message client IDs, and their practical use. It also provided guidance on how to avoid pitfalls.