09 DefaultMQPushConsumer Core Parameters and Working Principles

The PUSH mode is a wrapper around the PULL mode, similar to a high-level API. It handles most of the problems involved in message consumption, which makes it simple to use. To use it well, however, you need to understand how it works internally, which parameters it supports, how those parameters take effect, and what to watch out for when setting them.

Overview of DefaultMQPushConsumer Core Parameters and Internal Principles

The core parameters of DefaultMQPushConsumer are as follows.

InternalLogger log

This is a final attribute of the consumer, used to record logs while the RocketMQ Consumer is running. By default, the log file is stored at ${user.home}/logs/rocketmqlogs/rocketmq_client.log.

String consumerGroup

The name of the consumer group. In RocketMQ, a consumer group is an independent isolation unit for consumers. For example, if multiple consumer groups subscribe to the same topic, each group tracks its own consumption progress, and the groups do not interfere with one another.

MessageModel messageModel

The message consumption mode for a consumer group. RocketMQ supports two modes: clustering mode and broadcasting mode. Clustering mode means that multiple consumers within a consumer group consume messages from a topic together, and each message is consumed by only one consumer in the cluster. Broadcasting mode means that each consumer within a consumer group consumes all messages from the topic.

ConsumeFromWhere consumeFromWhere

The strategy for where to start consuming messages when a consumer is started for the first time (i.e., the consumer group’s progress cannot be queried in the progress manager). The possible values are as follows:

  • CONSUME_FROM_LAST_OFFSET: Start consuming from the latest message.
  • CONSUME_FROM_FIRST_OFFSET: Start consuming from the earliest position.
  • CONSUME_FROM_TIMESTAMP: Start consuming from the specified timestamp. The implementation approach here is to find the message with the maximum message offset whose storage time is less than or equal to the specified timestamp on the Broker server, and start consuming from this message.

String consumeTimestamp

The timestamp from which to start consuming messages. The format is yyyyMMddHHmmss, with the default value being 30 minutes ago. This parameter only takes effect when consumeFromWhere is set to CONSUME_FROM_TIMESTAMP.
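As a minimal sketch of the format described above, the following standalone Java snippet builds a yyyyMMddHHmmss string for a point 30 minutes in the past. The class and method names are hypothetical and not part of RocketMQ; the resulting string is the kind of value you would pass to setConsumeTimestamp.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the RocketMQ client.
class ConsumeTimestampExample {
    // Pattern stated in the text: yyyyMMddHHmmss.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Formats a point in time as a consumeTimestamp string.
    static String format(LocalDateTime time) {
        return time.format(FORMAT);
    }

    // Mirrors the documented default: 30 minutes before "now".
    static String thirtyMinutesBefore(LocalDateTime now) {
        return format(now.minusMinutes(30));
    }

    public static void main(String[] args) {
        System.out.println(thirtyMinutesBefore(LocalDateTime.now()));
    }
}
```

Remember that the value is only used when consumeFromWhere is CONSUME_FROM_TIMESTAMP and the consumer group has no stored progress yet.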

AllocateMessageQueueStrategy allocateMessageQueueStrategy

The message queue load balancing algorithm. It mainly solves the problem of load balancing among message consumption queues across multiple consumers. For example, if a topic has 8 queues and a consumer group has 3 consumers, each consumer needs to consume specific queues.

RocketMQ provides the following load balancing algorithms by default:

  • AllocateMessageQueueAveragely: Average continuous allocation algorithm.
  • AllocateMessageQueueAveragelyByCircle: Average round-robin allocation algorithm.
  • AllocateMachineRoomNearby: Allocate queues near the machine room.
  • AllocateMessageQueueByConfig: Manually specify the allocation of queues. This usually requires coordination with the configuration center. When a consumer starts, an AllocateMessageQueueByConfig object is created, and then the allocation is performed based on the configuration from the configuration center and the current queue information. This method does not have automatic load balancing for queues. It cannot automatically sense queue expansion on the Broker side and requires manual configuration changes.
  • AllocateMessageQueueByMachineRoom: Consume queues in the specified machine room. This allocation algorithm first needs to call the setConsumeridcs(Set<String> consumerIdCs) method to set the machine room(s) to be consumed, and then performs queue load balancing on the filtered queues using the average continuous allocation algorithm.
  • AllocateMessageQueueConsistentHash: Consistent hash algorithm.

OffsetStore offsetStore

The message progress storage manager. This attribute is private and is normally created automatically based on the consumption mode rather than set by the application. RocketMQ stores message consumption progress differently in clustering mode and broadcasting mode.

  • Clustering mode: RocketMQ stores the message consumption progress on the Broker server in the ${ROCKET_HOME}/store/config/consumerOffset.json file.
  • Broadcasting mode: RocketMQ stores the message consumption progress on the machine where the consumer is located, in the ${user.home}/.rocketmq_offsets directory.

To facilitate your understanding of message consumption progress, below is a screenshot of the message consumption progress file in the Broker cluster during my local testing:

[Figure 1: message consumption progress file on the Broker]

The message consumption progress is stored with topic@consumerGroup as the key. The value is a map whose key is the queue ID within the topic and whose value is the current consumption offset of that queue.
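The nested structure described above can be sketched as a map of maps. This is an illustrative in-memory model only, not the Broker's actual implementation; the class name and the group names used in the usage note are assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of the progress table; not RocketMQ's own class.
class OffsetTableSketch {
    // Outer key: "topic@consumerGroup"; inner key: queue ID; value: consumption offset.
    private final Map<String, Map<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    static String key(String topic, String group) {
        return topic + "@" + group;
    }

    // Records the progress of one queue for one consumer group.
    void commit(String topic, String group, int queueId, long offset) {
        offsetTable.computeIfAbsent(key(topic, group), k -> new ConcurrentHashMap<>())
                   .put(queueId, offset);
    }

    // Returns the stored offset, or -1 if this group has no progress for the queue.
    long query(String topic, String group, int queueId) {
        Map<Integer, Long> queues = offsetTable.get(key(topic, group));
        if (queues == null) {
            return -1L;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1L : offset;
    }
}
```

Because the consumer group is part of the key, two groups (say, hypothetical "group_a" and "group_b") subscribed to the same topic keep separate offsets, which is why their progress does not interfere.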

int consumeThreadMin

The minimum number of threads in the consumer’s thread pool. The default value is 20. RocketMQ creates a separate thread pool for each consumer.

int consumeThreadMax

The maximum number of threads in the consumer’s thread pool. In the current version of RocketMQ, this parameter is usually kept equal to consumeThreadMin. A larger value has no effect, because the thread pool that RocketMQ creates internally uses an unbounded queue, so the pool never grows beyond its minimum size.
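The effect of an unbounded queue can be checked with the JDK alone. The sketch below is not RocketMQ code: it builds a plain ThreadPoolExecutor with a LinkedBlockingQueue, submits more blocking tasks than the core size, and reports how many worker threads were created. The class and method names are made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only demonstration; names are hypothetical.
class UnboundedQueuePoolDemo {
    // Submits `tasks` blocking tasks and returns the number of worker threads created.
    static int poolSizeUnderLoad(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Extra tasks wait in the unbounded queue instead of spawning new threads.
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeUnderLoad(2, 8, 10));
    }
}
```

A ThreadPoolExecutor only creates threads beyond the core size when its queue rejects a task, and an unbounded queue never does, which is why the maximum is never reached.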

int consumeConcurrentlyMaxSpan

The threshold for the difference between the maximum and minimum offsets in the processing queue when consuming messages concurrently. If the difference exceeds this value, flow control is triggered on the consumer side: the consumer temporarily stops pulling messages from that message queue. The default value is 2000.

int pullThresholdForQueue

The maximum number of messages that a consumer allows a single queue to accumulate. If the processing queue exceeds this value, it triggers flow control on the message consumer. The default value is 1000, and it is not recommended to modify this value.

int pullThresholdSizeForQueue

The maximum total size of message bodies that a consumer allows a single queue to accumulate. The default value is 100 MB.

int pullThresholdForTopic

Limits the number of messages at the topic level. By default, it is not enabled (-1). If this value is set, it is divided by the number of queues allocated to the current consumer to obtain the message threshold for each individual queue, which then overrides pullThresholdForQueue.

int pullThresholdSizeForTopic

Limits the total size of message bodies at the topic level. By default, it is not enabled. It takes effect by changing pullThresholdSizeForQueue.
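The flow-control checks described by these parameters can be sketched as two small functions. This is a simplified model under stated assumptions, not RocketMQ's implementation: the class and method names are hypothetical, and the floor of 1 for the derived per-queue threshold is an assumption of this sketch.

```java
// Simplified model of consumer-side flow control; names are hypothetical.
class FlowControlSketch {
    // Derives the per-queue message threshold. A topic-level value of -1 means
    // "not enabled", so the per-queue setting is used unchanged.
    static int perQueueThreshold(int pullThresholdForQueue, int pullThresholdForTopic, int assignedQueues) {
        if (pullThresholdForTopic == -1 || assignedQueues <= 0) {
            return pullThresholdForQueue;
        }
        // Assumption: never let the derived threshold drop below 1.
        return Math.max(1, pullThresholdForTopic / assignedQueues);
    }

    // Returns true when pulling from a queue should be paused.
    static boolean shouldPausePull(long cachedCount, long cachedSizeMiB, long offsetSpan,
                                   int countThreshold, int sizeThresholdMiB, int maxSpan) {
        return cachedCount > countThreshold      // pullThresholdForQueue
            || cachedSizeMiB > sizeThresholdMiB  // pullThresholdSizeForQueue
            || offsetSpan > maxSpan;             // consumeConcurrentlyMaxSpan
    }
}
```

For example, with a topic-level limit of 2000 and 4 assigned queues, each queue is limited to 500 cached messages in this model.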

long pullInterval = 0

The interval for message pulling. A value of 0 means that the message client immediately pulls the next batch of messages from the server after submitting a batch to the thread pool. It is not recommended to modify this value in PUSH mode.

int pullBatchSize = 32

The maximum number of messages returned by a single message pull request from the Broker. The default value is 32.

int consumeMessageBatchMaxSize

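The maximum number of messages consumed at one time. This value is the maximum number of messages in the List<MessageExt> msgs parameter passed to the message listener, as shown in the following figure.

[Figure 2: message listener with the List<MessageExt> msgs parameter]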

int maxReconsumeTimes

The maximum number of times a message can be retried. In concurrent consumption mode, the message is sent to the dead letter queue after 16 retries. In ordered consumption mode, the maximum number of retries is Integer.MAX_VALUE.

long suspendCurrentQueueTimeMillis

When consuming messages in ordered consumption mode, this parameter sets the interval between each retry to improve the success rate of retries.

long consumeTimeout = 15

The timeout period for consuming messages, with a default value of 15 minutes.
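To tie the parameters together, here is a configuration sketch that sets several of them on a DefaultMQPushConsumer. It requires the rocketmq-client dependency and a reachable name server, so treat it as a configuration fragment rather than a standalone program. The consumer group name and the name server address are placeholders chosen for this example, and the values shown simply repeat the defaults mentioned above.

```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

class PushConsumerConfigExample {
    public static void main(String[] args) throws Exception {
        // Placeholder group name and name server address.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // Only applies when the group has no stored progress yet.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // Keep min and max equal, as recommended in the text.
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(20);

        consumer.setPullBatchSize(32);             // messages per pull request
        consumer.setConsumeMessageBatchMaxSize(1); // max size of the msgs list per callback
        consumer.setConsumeTimeout(15);            // minutes

        consumer.subscribe("topic_test", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getMsgId());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
```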

Working Principle of Core Parameters

Message Consumption Queue Load Balancing Algorithms

In this section, we will use diagrams to explain the queue load balancing mechanisms provided by RocketMQ by default.

AllocateMessageQueueAveragely

This algorithm evenly distributes message queues to consumers in a continuous manner.

[Figure 3: AllocateMessageQueueAveragely allocation]
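The continuous average allocation can be sketched as follows. This is a simplified model that uses integer queue IDs and a consumer index instead of RocketMQ's MessageQueue objects and client IDs, and the class name is hypothetical. Each consumer gets one contiguous block of queues, and the first few consumers take one extra queue when the division is not even.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of continuous average allocation; not RocketMQ's class.
class AverageAllocationSketch {
    // Returns the queue IDs assigned to the consumer at position `index`.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        if (queueCount <= 0 || consumerCount <= 0 || index < 0 || index >= consumerCount) {
            return result;
        }
        int base = queueCount / consumerCount;
        int remainder = queueCount % consumerCount;
        // The first `remainder` consumers each take one extra queue.
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }
}
```

With the 8 queues and 3 consumers used earlier in this article, the consumers receive queues 0 to 2, 3 to 5, and 6 to 7 respectively.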

AllocateMessageQueueAveragelyByCircle

This algorithm evenly distributes message queues to consumers in a round-robin manner. The diagram below illustrates the allocation process.

[Figure 4: AllocateMessageQueueAveragelyByCircle allocation]
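The round-robin variant can be sketched in the same simplified way, again with integer queue IDs and a consumer index in place of RocketMQ's own types. The class name is hypothetical. Each consumer starts at its own index and then takes every N-th queue, where N is the number of consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of round-robin average allocation; not RocketMQ's class.
class RoundRobinAllocationSketch {
    // Returns the queue IDs assigned to the consumer at position `index`.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        if (queueCount <= 0 || consumerCount <= 0 || index < 0 || index >= consumerCount) {
            return result;
        }
        // Step through the queues with a stride equal to the number of consumers.
        for (int queueId = index; queueId < queueCount; queueId += consumerCount) {
            result.add(queueId);
        }
        return result;
    }
}
```

With 8 queues and 3 consumers, the consumers receive queues (0, 3, 6), (1, 4, 7), and (2, 5). The per-consumer counts match the continuous algorithm; only the grouping differs.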

AllocateMachineRoomNearby

This algorithm prioritizes assigning message queues to consumers in the same data center. The diagram below illustrates the allocation process.

[Figure 5: AllocateMachineRoomNearby allocation across two data centers]

In this example, the two brokers of an MQ cluster are deployed in two different data centers, and each data center also hosts some consumers. Queues are assigned first to consumers in the same data center. The algorithm used within a data center is configurable; the example uses average allocation. If the consumers in data center B crash and none survive there, the queues in that data center are consumed by consumers in other data centers.

AllocateMessageQueueByConfig

This algorithm allocates queues according to manual configuration and usually needs to be used with a configuration center. When the consumer starts, it first creates the AllocateMessageQueueByConfig object and then performs the allocation based on the configuration in the configuration center and the current queue information. This method provides no automatic load balancing: when queues are added on the broker, the change is not detected automatically, and the configuration must be updated manually.

AllocateMessageQueueByMachineRoom

This algorithm consumes queues in the specified data center. You need to call the setConsumeridcs(Set<String> consumerIdCs) method to set the data centers to consume from. The queues that pass this filter are then allocated using the average continuous allocation algorithm. The following figure shows an example:

[Figure 6: AllocateMessageQueueByMachineRoom allocation with consumerIdCs set to data center A]

Since consumerIdCs is set to data center A, the queues in data center B are not allocated to any consumer, so their messages are not consumed.

AllocateMessageQueueConsistentHash

Consistent hashing algorithm. To be honest, using consistent hashing for message queue load balancing does not provide any practical benefit. Consistent hashing is better suited to distributed caching, for example with Redis.

PUSH Model Message Pull Mechanism

Before discussing the message consumer flow control mechanism, let’s first use the following diagram to briefly introduce the RocketMQ message pull execution model.

[Figure 7: RocketMQ message pull execution model]

The core points are as follows:

  1. After queue load balancing, the current consumer is assigned some queues. Note that one consumer group can subscribe to multiple topics. As shown in the pullRequestQueue above, the topics “topic_test” and “topic_test1” are assigned one queue each.
  2. The pull thread takes PullRequest objects from the pullRequestQueue one at a time and sends a pull request to the broker based on the pull offset stored in the object. By default, up to 32 messages are pulled per request, which can be changed with the pullBatchSize parameter mentioned earlier. The pull not only returns the message list but also updates the next pull offset in the PullRequest object.
  3. The messages returned by the broker are first put into the ProcessQueue, which is implemented as a TreeMap. The key is the message offset in the consume queue, and the value is the message object itself.
  4. The pulled messages are then submitted to the consumer’s thread pool, and the call returns immediately. The PullRequest object is placed back into the pullRequestQueue, and the next PullRequest object is taken out to continue pulling. This shows that message pulling and message consumption are performed by different threads.
  5. After the consumption thread pool finishes processing a message, the message is removed from the ProcessQueue and the consumption progress is reported to the broker, so that consumption can resume from the saved progress after a restart.
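The five steps above can be condensed into a small simulation. This is an illustrative model only: the PullRequest class here is a hypothetical stand-in, the broker is faked by a simple offset range, and a java.util.concurrent.Executor plays the role of the consumption thread pool. It is not RocketMQ's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.Executor;

// Illustrative simulation of the pull loop; all names are hypothetical.
class PullLoopSketch {
    // Stand-in for a PullRequest: one queue, its next pull offset, and its ProcessQueue.
    static final class PullRequest {
        final String queueName;
        long nextOffset;
        final TreeMap<Long, String> processQueue = new TreeMap<>();

        PullRequest(String queueName, long nextOffset) {
            this.queueName = queueName;
            this.nextOffset = nextOffset;
        }
    }

    // Runs one iteration of the pull loop against a fake broker that holds
    // messages with offsets in [0, brokerMaxOffset).
    static void pullOnce(Queue<PullRequest> pullRequestQueue, long brokerMaxOffset, int pullBatchSize,
                         Executor consumePool, List<Long> consumedOffsets) {
        PullRequest request = pullRequestQueue.poll(); // step 2: take the next request
        if (request == null) {
            return;
        }
        List<Long> batch = new ArrayList<>();
        for (long o = request.nextOffset; o < brokerMaxOffset && batch.size() < pullBatchSize; o++) {
            batch.add(o);
        }
        request.nextOffset += batch.size();            // step 2: advance the pull offset
        synchronized (request.processQueue) {          // step 3: cache in the ProcessQueue
            for (long o : batch) {
                request.processQueue.put(o, "msg" + o);
            }
        }
        if (!batch.isEmpty()) {
            consumePool.execute(() -> {                // step 4: hand over to the consume pool
                synchronized (consumedOffsets) {
                    consumedOffsets.addAll(batch);
                }
                synchronized (request.processQueue) {  // step 5: remove after consumption
                    for (Long o : batch) {
                        request.processQueue.remove(o);
                    }
                }
            });
        }
        pullRequestQueue.offer(request);               // step 4: requeue for the next pull
    }
}
```

Passing a real thread pool as the Executor makes pulling and consuming run on different threads, as in the text; passing Runnable::run runs consumption inline, which is convenient for stepping through the flow.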

Message Consumption Progress Reporting

Through the introduction above, readers should have a relatively intuitive understanding of the message consumption progress. Next, let’s introduce the message consumption progress reporting mechanism of the RocketMQ PUSH mode.

As can be seen from the message consumer pull model in the previous section, after the message consumer thread pool completes processing a message, it removes the message from the ProcessQueue and reports the message consumption progress to the broker. Now, let’s think about the following question:

[Figure 8: processing queue holding five messages]

For example, there are 5 messages in the processing queue, and concurrent consumption is performed by the thread pool. How does the consumer report the message consumption progress if the message with offset 3 (3:msg3) is processed before the messages with offsets of 0, 1, and 2?

Some readers may say that after msg3 is processed, the consumer simply reports the offset of msg3 as the consumption progress. On closer inspection, this does not work. Suppose the offset of msg3 is reported, and the consumer then exits abnormally, for example because of an out-of-memory error in the JVM, before msg1 has been processed. When the consumer restarts, it continues consuming from the offset of msg3, so msg1 is never consumed and is effectively lost.

The approach adopted by RocketMQ is that after msg3 is processed, it is removed from the processing queue, but the minimum offset in the ProcessQueue is reported to the broker as the consumption progress. In this example, the reported progress is 0.

[Figure 9: processing queue whose minimum offset is 2]

That is, if the processing queue is as shown in the figure above, the submitted progress is 2. However, this method is not perfect and may cause messages to be consumed repeatedly. For example, if the consumer restarts after an exception such as an out-of-memory error, it continues consuming from offset 2, and message 3 is consumed again. Therefore, RocketMQ does not guarantee that a message is consumed only once, and consumers need to tolerate duplicates.
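The minimum-offset rule can be sketched with a TreeMap, the same data structure the text names for the ProcessQueue. This is a simplified model with hypothetical names. The behavior when the queue becomes empty (reporting the largest offset seen plus one) is an assumption of this sketch, added so that progress can move past the last message.

```java
import java.util.TreeMap;

// Simplified model of progress calculation; not RocketMQ's ProcessQueue.
class ProcessQueueSketch {
    private final TreeMap<Long, String> messages = new TreeMap<>();
    private long maxOffset = -1;

    // Caches a pulled message under its queue offset.
    synchronized void put(long offset, String body) {
        messages.put(offset, body);
        maxOffset = Math.max(maxOffset, offset);
    }

    // Removes a consumed message and returns the offset to report as progress.
    synchronized long remove(long offset) {
        messages.remove(offset);
        if (messages.isEmpty()) {
            // Assumption: nothing is pending, so progress moves past the last message.
            return maxOffset + 1;
        }
        // Otherwise report the smallest offset that is still being processed.
        return messages.firstKey();
    }
}
```

With offsets 0 to 4 cached, removing offset 3 first yields a reported progress of 0, and after offsets 0 and 1 are also removed the reported progress is 2, matching the two cases described above.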

The specific process of submitting the message consumption progress is shown in the following figure:

[Figure 10: process of submitting the message consumption progress]

From this, it can also be seen that in order to reduce network interaction between consumers and brokers and improve performance, the submitted message consumption progress is first stored in the local cache table, and then reported to the broker periodically. Similarly, the broker also stores the message consumption progress in the local cache table first and then flushes it to disk periodically.
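The "cache locally, report periodically" pattern can be sketched with a map and a scheduled task. This is a generic illustration with hypothetical names: the second map stands in for the remote broker, and the flush period is a parameter of the sketch rather than a RocketMQ setting.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Generic illustration of batched progress reporting; names are hypothetical.
class OffsetReporterSketch {
    // Local cache table: queue ID -> latest consumption offset.
    private final Map<Integer, Long> localCache = new ConcurrentHashMap<>();
    // Stand-in for the broker; a real client would send a network request instead.
    private final Map<Integer, Long> broker = new ConcurrentHashMap<>();
    private int reportCalls = 0;

    // Called after each consumed batch; touches memory only, no network.
    void update(int queueId, long offset) {
        localCache.put(queueId, offset);
    }

    // Reports every cached offset in one pass.
    synchronized void flush() {
        broker.putAll(localCache);
        reportCalls++;
    }

    // Starts a background task that flushes at a fixed period.
    ScheduledExecutorService startPeriodicFlush(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    Long brokerOffset(int queueId) {
        return broker.get(queueId);
    }

    synchronized int reportCalls() {
        return reportCalls;
    }
}
```

Many local updates collapse into a single report per period, which is the performance benefit described above. The cost is that offsets updated since the last flush are lost if the process crashes, which is another source of repeated consumption.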

Summary

This article detailed DefaultMQPushConsumer’s configurable parameters and three important aspects of message consumption: message queue load balancing mechanism, message pull mechanism, and message consumption progress reporting. This lays a solid foundation for subsequent practices and problem-solving.