10 Default Mqpush Consumer Usage Examples and Precautions

10 DefaultMQPushConsumer Usage Examples and Precautions #

The previous article provided a detailed introduction to the core attributes of DefaultMQPushConsumer and the theory of message consumption. This article will focus on common issues that may arise during usage and provide solutions.

Precautions for ConsumeFromWhere #

First, let’s take a look at the common usage of a RokcetMQ PUSH mode consumer:

1

To specify where to consume from, you need to set the consumeFromWhere(…) method. As mentioned in the previous article, RocketMQ supports three ways of consumption: from the latest message, the earliest message, and a specified timestamp. Now, let’s consider this scenario: if a consumer has been running for a period of time and needs to be stopped first due to a version release or other reasons, and then restarted after the code update, can the consumer still consume from the new message using the above three strategies? If so, all the messages sent during the release period will be lost, which is obviously unacceptable. To ensure that messages are not lost, you need to consume from the time when you last consumed.

Therefore, in actual usage, if a consumer has been running for a long time with the setting CONSUME_FROM_FIRST_OFFSET, and the current version’s business logic undergoes significant refactoring, and the business wants to consume from the latest message, you might think that you can achieve this by using the following code, but it won’t work:

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

The above practice is incorrect. To achieve the business goal, you need to use the reset offset provided by RocketMQ, and the command is as follows:

sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876  -g CID_CONSUMER_TEST -t TopicTest -s now

The parameters are explained as follows:

  • -n: NameServer address
  • -g: Consumer group name
  • -t: Topic name
  • -s: Timestamp, with options of ’now’, timestamp in milliseconds, or yyyy-MM-dd#HH:mm:ss:SSS

You can also reset the offset using RocketMQ Console. The operation is shown in the following image:

2

Load Balancing Algorithm Based on Multiple Data Centers #

In practice, we usually use the average allocation algorithm, AllocateMessageQueueAveragely or AllocateMessageQueueAveragelyByCircle, because these two solutions are simple to implement. Here, I would like to emphasize again that the Consistent Hash algorithm doesn’t have much advantage in terms of service load balancing and is more complex. This section mainly discusses the support of RocketMQ for multiple data centers.

In the company where the author works, multiple data centers are established in two nearby locations to avoid network failures in the gateway that would result in the unavailability of all business systems, causing serious impacts on couriers and transit centers. The network architecture adopted is as shown in the following image:

3

The two data centers can be accessed through a dedicated line, with a network delay of 1-2ms. In this scenario, only one data center has outbound traffic at a time.

In the deployment plan of a RocketMQ cluster in a multi-data-center scenario, each data center deploys one broker, so there are two brokers in total, which handle message writing and consumption together. Two consumers are also deployed in each data center.

From the perspective of consumers, if we adopt average distribution, especially using the AllocateMessageQueueAveragelyByCircle solution, there will be cross-consumption by consumers. If we can prioritize consumers in the local data center to consume messages in the same data center, we can effectively avoid cross-data-center consumption. Fortunately, the designers of RocketMQ have provided a solution for us - AllocateMachineRoomNearby.

Next, let’s introduce how to use the AllocateMachineRoomNearby queue load balancing algorithm.

First of all, since it is a multi-data-center scenario, we need to be able to identify which broker belongs to which data center for several main entities (brokers) and consumers during consumption. Therefore, we need to do the following two things.

  1. Rename brokers by adding data center information to their names. This mainly involves modifying the broker.conf configuration file, for example:

    brokerName = MachineRoom1-broker-a

That is, the names of brokers are unified according to the pattern (data center name - brokerName).

  1. Rewrite the clientId of message consumers, also using the data center name as the prefix. You can change the clientId with the following code:

    consumer.setClientIP(“MachineRoom1-” + RemotingUtil.getLocalAddress());

The default `clientIP` of the `consumer` is set to `RemotingUtil.getLocalAddress()`, which is the IP address of the local machine. The `cid` of the client is shown in the following image:

![4](../images/20200815222805880.png)

Next, let's take a look at the core properties of `AllocateMachineRoomNearby`, as shown in the following image:

![5](../images/20200815222812913.png)

Their meanings are as follows:

**1. AllocateMessageQueueStrategy allocateMessageQueueStrategy**

Internal allocation algorithm, which can be considered as a proximity allocation algorithm for machine rooms. It is actually a proxy that needs to hold an allocation algorithm internally, such as the average allocation algorithm.

**2. MachineRoomResolver machineRoomResolver**

Multi-machine room resolver, which identifies the machine room based on `brokerName` and `clientId` of the client.

The cluster for the test scenario in this article is as follows:

![6](../images/20200815222819501.png)

The test code is as follows:

public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“dw_test_consumer_6”); consumer.setNamesrvAddr(“127.0.0.1:9876”); consumer.setClientIP(“MachineRoom1-” + RemotingUtil.getLocalAddress()); // consumer.setClientIP(“MachineRoom2-” + RemotingUtil.getLocalAddress()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(“machine_topic_test”, “*”); AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely(); AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() { @Override public String brokerDeployIn(MessageQueue messageQueue) { return messageQueue.getBrokerName().split("-")[0]; } @Override public String consumerDeployIn(String clientID) { return clientID.split("-")[0]; } };

consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Throwable e) {
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");

}


Note: The above code needs to be packaged and run on different machines as much as possible, and the `clientIp` needs to be modified.

After running, the load status of the queues is as follows:

![7](../images/2020081522282834.png)

If the consumer located in MachineRoom2 is stopped, can the messages in Machine Room 2 continue to be consumed? Now we stop the consumer in Machine Room 2 and take a look at the queue load of the room, as shown in the figure below:

8

We found that consumers in Machine Room 1 can still consume messages from Machine Room 2. From here, we can see that the AllocateMachineRoomNearby queue load only prioritizes the same room. If there are no active consumers in a particular room, the queues in that room will still be consumed by consumers in other rooms.

In RocketMQ, there is another way to handle queue load in multi-machine rooms, which is AllocateMessageQueueByMachineRoom. It allows each consumer to specify the machine rooms it can consume from. This can be done by calling the setConsumeridcs(…) method to indicate which machine rooms a specific consumer can consume messages from. This creates a logical concept of a large machine room. To save space, this demonstration will not be repeated in this article. The usage is similar to what has been shown.

Considerations for setting consumer group thread numbers #

In RocketMQ, each consumer group starts a thread pool to achieve isolation among consumer groups. RocketMQ provides two parameters, consumeThreadMin and consumeThreadMax, to set the number of threads in the thread pool. However, since the internal queue held by the thread pool is an unbounded queue, setting consumeThreadMax to a value greater than consumeThreadMin will result in a maximum number of threads equal to consumeThreadMin. Therefore, in practice, these two values are often set to the same value to avoid any confusion. In scenarios where there are many messages, more threads will be created to improve the message processing speed.

Tip: The names of consumer group threads in RocketMQ start with ConsumeMessageThread_, as shown in the example below.

9

Considerations for batch consumption #

RocketMQ supports batch message consumption. There are two parameters related to batch consumption on the consumer side:

  • pullBatchSize: The maximum number of messages returned per batch when the client requests messages from the broker. The default value is 32.
  • consumeMessageBatchMaxSize: The maximum number of messages that can be submitted to the message consumption listener. The default value is 1.

consumeMessageBatchMaxSize #

By default, 32 messages are pulled at a time, but the business listener receives one message by default. To have a more intuitive understanding, the following example code is provided:

10

If consumeMessageBatchMaxSize is set to 10, the result will be as shown in the figure below:

11

It can be seen that this parameter is effective. consumeMessageBatchMaxSize is very suitable for batch processing, such as combining it with batch processing of a database, which can significantly improve performance.

pullBatchSize #

You may have noticed an issue. If the processing time for individual messages is fast, increasing the number of consumer group threads cannot significantly increase the message consumption TPS. By using the jstack command, it can be seen that almost all threads are in a waiting state for tasks, as shown in the following screenshot:

12

This situation indicates that the threads are “idle” and should be given more tasks. This naturally requires increasing the number of messages pulled each time. Attempt to pull 100 messages each time and consume 50 messages per batch. This can be set by the following code:

consumer.setPullBatchSize(100);
consumer.setConsumeMessageBatchMaxSize(200);

The main purpose of setting consumeMessageBatchMaxSize to a value greater than pullBatchSize is to verify each batch of messages fetched. If consumeMessageBatchMaxSize is greater than pullBatchSize, each batch of messages will be equal to pullBatchSize. If consumeMessageBatchMaxSize is smaller than pullBatchSize, the client will handle pagination and try to pass as many as consumeMessageBatchMaxSize messages at a time.

To ensure that there are enough messages, it is recommended to push a large number of messages using the producer before message fetching.

13

It can be seen that the maximum number of pulled messages per batch will not exceed 32, which indicates that the server has enough messages to be fetched.

This is because the broker also provides a protection mechanism for message fetching. There are parameters that can control the maximum number of messages returned per fetch, as shown below:

int maxTransferCountOnMessageInMemory;

If all messages can be fetched in this fetch, the maximum number of messages that can be fetched in one fetch is determined by the value of maxTransferCountOnMessageInMemory, which is set to a default of 32.

int maxTransferBytesOnMessageInMemory;

If all the messages in this message pull request can be hit, the maximum message size allowed for a single message pull in memory is 256K by default.

int maxTransferCountOnMessageInDisk

If the messages in this message pull request cannot be hit and need to be read from disk, the maximum number of messages allowed for each pull request is 8 by default.

int maxTransferBytesOnMessageInDisk

If the messages in this message pull request cannot be hit and need to be read from disk, the maximum total size of the messages allowed for each pull request is 64K by default.

Therefore, if you need to pull 100 messages in one pull request, you also need to modify the related configuration information on the broker side. It is usually recommended to only modify the memory-related settings. If you need to pull from disk, in order to include the Broker, maxTransferCountOnMessageInDisk and maxTransferBytesOnMessageInDisk should remain their default values.

If the use case is in the big data field, the recommended configuration is as follows:

maxTransferCountOnMessageInMemory=5000
maxTransferBytesOnMessageInMemory=5000*1024

If the use case is in the business field, the recommended configuration is as follows:

maxTransferCountOnMessageInMemory=2000
maxTransferBytesOnMessageInMemory=2000*1024

After modifying the related configuration on the broker side, run the above program again, and the result will be as follows:

14

Message Loss due to Inconsistent Subscription Relationships #

In RocketMQ, a consumer group can subscribe to multiple topics and multiple tags. Multiple tags are separated by ||. However, the subscription relationships of all consumers in the same consumer group must be consistent. It is not allowed for one consumer to subscribe to TAGA and another consumer in the same group to subscribe to TAGB. The incorrect usage is shown in the following diagram:

15

The key point in the above error is that the consumer group names created by the two JVM processes are both “dw_tag_test”, but one consumer group subscribes to TAGA and the other consumer group subscribes to TAGB. This will cause message loss, i.e., some messages are not consumed, as shown in the following diagram:

16

The tag of a message is TAGA, and one consumer in the dw_tag_test consumer group has subscribed to TAGA. So why does it show CONSUMED_BUT_FILTERED? This status means that the message does not meet the message filtering rule and is filtered. The principle is illustrated in the following diagram:

17

The root cause is that a queue is only assigned to one consumer at a time. Messages that do not meet the subscription filter are consumed and filtered, and the message consumption progress is moved forward, resulting in message loss.

Consumer clientId Not Unique Leads to No Consumption #

RocketMQ’s clientId generation rules are the same as those of the Producer. If there are duplicates, it will also cause problems. Please refer to the following code:

18

In this example, two consumers with the same clientId are artificially created. In actual production, it may be caused by exceptions such as Docker containers obtaining the id of the host machine or getting the process ID, resulting in the same clientId for all consumers on the host machine. This will result in the following effect:

19

Why are there only half of the queues not allocated to consumers?

This is because of the same clientId. Let’s consider the average allocation algorithm as an example. When the queue load algorithm is applied, it first queries the routing information of the Topic from the NameServer. It returns a queue count of 4 and queries the number of active consumers from the Broker, which returns 2. Then it starts to allocate. When the queue load algorithm is used for allocation, it first sorts the queues and consumer cids. The first consumer is assigned the first two queues, and the second consumer is assigned the last two queues. However, due to the same cid, both consumers consider themselves the first consumer when allocating queues. Therefore, both of them are assigned to the first 2 queues, which means the first two queues are assigned to both consumers, causing duplicate consumption of messages and some queues cannot be consumed.

Best Practice: It is recommended to customize the clientIP, preferably the client IP + timestamp, or the client IP + uuid.

Conclusion #

This article provides a detailed introduction to the queue load balancing mechanism of RocketMQ, especially demonstrating the queue load balancing mechanism in multi-data center scenarios. It also demonstrates and analyzes common usage misconceptions in RocketMQ, such as ConsumeFromWhere, thread pool size, inconsistent subscription relationships, the same consumer clientId, and batch pull, and provides solutions to these problems.