11 DefaultLitePullConsumer Core Parameters and Hands-On Practice

As mentioned in the “Message Consumption API and Version Changes” section, the API of DefaultMQPullConsumer (PULL mode) is too low-level and inconvenient to use. RocketMQ's designers noticed this as well, and in version 4.6.0 they introduced another PULL-mode implementation class, DefaultLitePullConsumer. Starting from version 4.6.0, DefaultMQPullConsumer is marked as deprecated. The rest of this article therefore focuses on DefaultLitePullConsumer and how to use it to solve practical problems.

DefaultLitePullConsumer Class Diagram

First, let’s take a look at the class diagram of DefaultLitePullConsumer, as shown in the following figure:

[Figure: DefaultLitePullConsumer class diagram]

Core Methods

The core methods are explained as follows.

void start()

Start the consumer.

void shutdown()

Shut down the consumer.

void subscribe(String topic, String subExpression)

Subscribe to a topic with a message filter expression.

void subscribe(String topic, MessageSelector selector)

Subscribe to a topic using a MessageSelector. The selector can be created with MessageSelector.bySql or MessageSelector.byTag, just as in PUSH mode.

Note: subscribing to a topic with subscribe supports automatic rebalancing of message queues. If the number of consumers or the number of queues of the topic changes, the set of queues assigned to each consumer changes dynamically.
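To make the rebalancing behavior concrete, here is a small, self-contained sketch of average queue allocation, written in the spirit of RocketMQ's default AllocateMessageQueueAveragely strategy but not taken from it. Queues and consumer IDs are plain strings, and the class and method names are hypothetical. It shows how one consumer's share of queues shrinks when another consumer joins the group.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "average" queue allocation (not RocketMQ's source code).
// Queues and consumer IDs are plain strings. Every consumer must see both
// lists in the same order so that all of them compute consistent results.
class AverageAllocationSketch {

    // Returns the queues that consumer `me` should own.
    static List<String> allocate(List<String> queues, List<String> consumers, String me) {
        List<String> result = new ArrayList<>();
        int index = consumers.indexOf(me);
        if (index < 0 || queues.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int base = queues.size() / consumers.size();  // queues every consumer gets
        int extra = queues.size() % consumers.size(); // the first `extra` consumers get one more
        int count = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < count; i++) {
            result.add(queues.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = List.of("q0", "q1", "q2", "q3");
        // Two consumers share four queues: c2 owns two of them.
        System.out.println(allocate(queues, List.of("c1", "c2"), "c2"));       // [q2, q3]
        // A third consumer joins; after rebalancing c2 owns only one queue.
        System.out.println(allocate(queues, List.of("c1", "c2", "c3"), "c2")); // [q2]
    }
}
```

When there are more consumers than queues, the surplus consumers receive an empty list, which is why adding consumers beyond the queue count does not increase throughput.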

void unsubscribe(String topic)

Unsubscribe from a topic.

void assign(Collection< MessageQueue > messageQueues)

Assign specific queues to the consumer. This consumption mode does not support automatic rebalancing of message consumption queues.

List<MessageExt> poll()

Message pulling API with a default timeout of 5 seconds.

List<MessageExt> poll(long timeout)

Message pulling API with a specified timeout. For comparison, let’s also take a look at the pull method of DefaultMQPullConsumer.

[Figure: the pull method of DefaultMQPullConsumer]

As we can see, Lite Pull Consumer pulls messages differently. Users no longer have to specify which queue to pull from. Instead, it pulls messages automatically based on the subscribed topics (or the assigned queues) and the current consumption offsets, which is much more convenient. Personally, I think the API of DefaultLitePullConsumer is quite similar to Kafka's consumer API.

void seek(MessageQueue messageQueue, long offset)

Set the offset from which the next poll() call pulls messages from the given queue, which lets you rewind or skip messages. Note: if the offset is greater than the queue's current consumption offset, the messages in between are skipped without being consumed. Use with caution.

void seekToBegin(MessageQueue messageQueue)

Change the offset for the next message pull to the minimum offset of the queue. This is equivalent to starting over.

void seekToEnd(MessageQueue messageQueue)

Change the offset for the next message pull to the maximum offset of the queue, skipping all current messages and starting consumption from the latest offset.
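The effect of the three seek methods on a single queue can be pictured with the toy model below. It is a sketch under assumptions, not RocketMQ code: SeekSketch and its methods are hypothetical, offsets are plain longs, and rejecting out-of-range offsets is this sketch's own choice. It also counts how many messages a forward seek skips, which is exactly the risk noted for seek.

```java
// Toy model of the per-queue "next pull offset" that seek(), seekToBegin()
// and seekToEnd() change. Offsets are plain longs; names are hypothetical.
class SeekSketch {
    private final long minOffset; // smallest offset still stored in the queue
    private final long maxOffset; // offset the next new message will receive
    private long nextPullOffset;  // where the next poll() starts reading

    SeekSketch(long minOffset, long maxOffset, long consumeOffset) {
        this.minOffset = minOffset;
        this.maxOffset = maxOffset;
        this.nextPullOffset = consumeOffset;
    }

    void seek(long offset) {
        // This sketch rejects offsets outside the stored range.
        if (offset < minOffset || offset > maxOffset) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        nextPullOffset = offset;
    }

    void seekToBegin() { nextPullOffset = minOffset; } // replay everything still stored
    void seekToEnd()   { nextPullOffset = maxOffset; } // only messages arriving from now on

    long nextPullOffset() { return nextPullOffset; }

    // How many messages a seek from `from` to `to` skips without consuming them.
    static long skipped(long from, long to) { return Math.max(0, to - from); }

    public static void main(String[] args) {
        SeekSketch queue = new SeekSketch(100, 500, 300);
        queue.seek(350);
        System.out.println(skipped(300, queue.nextPullOffset())); // 50
        queue.seekToBegin();
        System.out.println(queue.nextPullOffset()); // 100
        queue.seekToEnd();
        System.out.println(queue.nextPullOffset()); // 500
    }
}
```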

void pause(Collection< MessageQueue > messageQueues)

Pause consumption of the given queues: subsequent poll() calls temporarily skip these queues and return no messages from them. This can be used for consumer-side flow control.

void resume(Collection< MessageQueue > messageQueues)

Resume consumption of queues previously paused with pause.
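A toy model of pause and resume follows, assuming the semantics described above: paused queues are simply skipped by poll(), and their messages are neither consumed nor lost. PauseResumeSketch and its methods are hypothetical, and queues are plain strings rather than MessageQueue objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of pause()/resume(): poll() skips paused queues, and their
// messages stay where they are (not consumed, not dropped) until the
// queues are resumed. Not RocketMQ code; queue names are plain strings.
class PauseResumeSketch {
    private final Map<String, Deque<String>> pending = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void append(String queue, String message) {
        pending.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(message);
    }

    void pause(Collection<String> queues)  { paused.addAll(queues); }
    void resume(Collection<String> queues) { paused.removeAll(queues); }

    List<String> poll() {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> entry : pending.entrySet()) {
            if (paused.contains(entry.getKey())) {
                continue; // flow control: leave this queue alone for now
            }
            batch.addAll(entry.getValue());
            entry.getValue().clear();
        }
        return batch;
    }

    public static void main(String[] args) {
        PauseResumeSketch consumer = new PauseResumeSketch();
        consumer.append("q0", "m1");
        consumer.append("q1", "m2");
        consumer.pause(List.of("q1"));
        System.out.println(consumer.poll()); // [m1]
        consumer.resume(List.of("q1"));
        System.out.println(consumer.poll()); // [m2]
    }
}
```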

boolean isAutoCommit()

Check if the consumption offset is automatically committed. In Lite Pull mode, you can set whether to automatically commit the offset.

void setAutoCommit(boolean autoCommit)

Set whether to automatically commit the offset.

Collection<MessageQueue> fetchMessageQueues(String topic)

Get the message queues of a topic from its routing information.

Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp)

Look up the message offset closest to a given timestamp.
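The lookup behind offsetForTimestamp can be illustrated with a binary search over one queue's store timestamps. This is a hypothetical sketch: the Broker performs the real search on its own storage, and its exact tie-breaking may differ. Here the result is the first offset whose store time is at or after the given timestamp.

```java
// Toy model of a timestamp-to-offset lookup on one queue. storeTimestamps[i]
// is the store time of the message at offset i, in non-decreasing order.
class OffsetForTimestampSketch {

    // Smallest offset whose store time is >= timestamp (binary search), or
    // storeTimestamps.length if every stored message is older.
    static long offsetForTimestamp(long[] storeTimestamps, long timestamp) {
        int lo = 0;
        int hi = storeTimestamps.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (storeTimestamps[mid] < timestamp) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        long[] times = {1000, 2000, 2000, 3000};
        System.out.println(offsetForTimestamp(times, 2000)); // 1
        System.out.println(offsetForTimestamp(times, 2500)); // 3
    }
}
```

Combined with seek, such an offset lets a consumer replay a queue from a point in time.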

void commitSync()

Manually commit the consumption offset. In cluster consumption mode, this method only commits the offset to the in-memory OffsetStore; it is not sent to the Broker immediately. Offsets are persisted to the Broker periodically by a scheduled task.
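The two-step path of a committed offset in cluster mode can be sketched as follows. The class is hypothetical and only mimics the behavior described above: commitSync updates an in-memory table, and a separate step (a scheduled task in the real client) later copies it to the Broker. The sketch assumes that a committed offset means "the next offset to consume".

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of cluster-mode offset handling: commitSync() only updates an
// in-memory table, and a periodic task later copies that table to the
// Broker. Names are hypothetical, not RocketMQ's OffsetStore API.
class OffsetStoreSketch {
    private final Map<String, Long> inMemory = new HashMap<>(); // client side
    private final Map<String, Long> onBroker = new HashMap<>(); // stands in for the Broker

    void commitSync(String queue, long offset) {
        inMemory.put(queue, offset); // nothing is sent over the network here
    }

    // In the real client a scheduled task does this; here it is called
    // explicitly so that the example stays deterministic.
    void persistAll() {
        onBroker.putAll(inMemory);
    }

    Long brokerOffset(String queue) { return onBroker.get(queue); }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commitSync("q0", 10);
        System.out.println(store.brokerOffset("q0")); // null: not persisted yet
        store.persistAll();
        System.out.println(store.brokerOffset("q0")); // 10
        store.commitSync("q0", 20);
        // If the process crashed now, the Broker would still report 10, and
        // messages 10..19 would be delivered again after a restart.
        System.out.println(store.brokerOffset("q0")); // 10
    }
}
```

The gap between the two steps is why a crash can lead to some messages being consumed again.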

Long committed(MessageQueue messageQueue)

Get the committed consumption offset of a message queue. It is read from the OffsetStore; in cluster mode, this means the consumption progress is read from the consumption progress file on the Broker.

void registerTopicMessageQueueChangeListener(String topic,TopicMessageQueueChangeListener listener)

Register a listener for changes to a topic's queues. Every 30 seconds the client checks the routing information (queue information) of the topic, and if it has changed, the registered listener is called. The TopicMessageQueueChangeListener interface is shown below:

[Figure: the TopicMessageQueueChangeListener interface]

The event listener parameters are explained as follows.

String topic

The name of the topic.

Set<MessageQueue> messageQueues

All queue information for the current topic.
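The periodic check behind registerTopicMessageQueueChangeListener can be sketched as a comparison between the last known queue set and the one from the latest route refresh. Everything here is hypothetical: queues are plain strings, a java.util.function.BiConsumer stands in for TopicMessageQueueChangeListener, and the caller drives the 30-second interval by calling onRouteRefresh.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Toy model of the periodic topic-route check: the listener fires only when
// the queue set of a topic differs from the last one seen. Hypothetical names.
class QueueChangeCheckSketch {
    private final Map<String, Set<String>> lastSeen = new HashMap<>();
    private final Map<String, BiConsumer<String, Set<String>>> listeners = new HashMap<>();

    void register(String topic, Set<String> currentQueues, BiConsumer<String, Set<String>> listener) {
        lastSeen.put(topic, new HashSet<>(currentQueues)); // baseline snapshot
        listeners.put(topic, listener);
    }

    // Called once per check interval with the queues from the latest route info.
    void onRouteRefresh(String topic, Set<String> latestQueues) {
        BiConsumer<String, Set<String>> listener = listeners.get(topic);
        if (listener == null) {
            return; // nobody registered for this topic
        }
        Set<String> snapshot = new HashSet<>(latestQueues);
        if (!snapshot.equals(lastSeen.get(topic))) {
            lastSeen.put(topic, snapshot);
            listener.accept(topic, snapshot); // all queues of the topic, not a diff
        }
    }

    public static void main(String[] args) {
        QueueChangeCheckSketch checker = new QueueChangeCheckSketch();
        checker.register("TopicTest", Set.of("q0", "q1"),
            (topic, queues) -> System.out.println(topic + " -> " + queues));
        checker.onRouteRefresh("TopicTest", Set.of("q0", "q1"));       // unchanged: no output
        checker.onRouteRefresh("TopicTest", Set.of("q0", "q1", "q2")); // prints the new set
    }
}
```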

void updateNameServerAddress(String nameServerAddress)

Update the address of the NameServer.

Core Attributes

Now that we have covered the core methods of DefaultLitePullConsumer, and given what we already know about DefaultMQPullConsumer and DefaultMQPushConsumer, how to use DefaultLitePullConsumer should be fairly clear. Before turning to practical use, let's briefly go through its core attributes.

String consumerGroup

The consumer group of the message consumer.

long brokerSuspendMaxTimeMillis

The maximum suspend time in long polling mode. If long polling is enabled and the Broker receives a pull request when no new messages are available, the Broker can suspend the request on its side. When new messages arrive, the suspended request is woken up, and the Broker pulls the messages and returns them to the client. This value is the maximum time the Broker waits. The default is 20 seconds, and keeping the default is recommended.

long consumerTimeoutMillisWhenSuspend

The maximum time the consumer waits for a pull response when the request may be suspended by the Broker. It must be greater than brokerSuspendMaxTimeMillis. The default is 30 seconds, and changing it is likewise not recommended.

long consumerPullTimeoutMillis

The network (socket) timeout for pull requests between the client and the Broker. The default is 10 seconds.

MessageModel messageModel

The message consumption model of the consumer group: cluster mode (CLUSTERING) or broadcast mode (BROADCASTING).

MessageQueueListener messageQueueListener

A listener that is notified when the queues allocated to this consumer change after rebalancing.

OffsetStore offsetStore

Storage for consumption progress (offsets); it works the same way as in PUSH mode.

AllocateMessageQueueStrategy allocateMessageQueueStrategy

The queue allocation (load-balancing) strategy; it works the same way as in PUSH mode.

boolean autoCommit

Whether to commit consumption progress automatically. The default is true.

int pullThreadNums

The number of pull threads. The default is 20. Note that each consumer uses 20 threads to pull messages from the Brokers, which is a major advantage of Lite PULL mode over PUSH mode.

long autoCommitIntervalMillis

The interval for automatically committing consumption offsets. The default is 5 seconds.

int pullBatchSize

The maximum number of messages to return in one message pull, with a default value of 10.

int pullThresholdForQueue

The flow control threshold for the number of messages cached locally (pulled but not yet consumed) for a single queue. The default is 1000. If more than 1000 messages from one queue are cached locally, pulling from that queue is suspended.

int pullThresholdSizeForQueue

The flow control threshold for the total size of messages cached locally for a single queue. The default is 100 MiB.

int consumeMaxSpan

The flow control threshold for the span between the largest and smallest offsets of the messages cached for a single queue. The default is 2000.

long pullThresholdForAll

The flow control threshold for the total number of messages cached across all queues (pending consume requests). The default is 10000.
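The flow-control thresholds above can be summarized as a single check performed before each pull. This is a hypothetical sketch that uses the default values quoted in this article. The order of the checks and the strict "greater than" comparisons are assumptions of the sketch.

```java
// Toy model of the client-side flow-control checks listed above. The
// thresholds are the defaults quoted in this article; names are hypothetical.
class FlowControlSketch {
    static final long PULL_THRESHOLD_FOR_ALL = 10_000;         // cached messages, all queues
    static final long PULL_THRESHOLD_FOR_QUEUE = 1_000;        // cached messages, one queue
    static final long PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB = 100; // cached size, one queue
    static final long CONSUME_MAX_SPAN = 2_000;                // max - min cached offset

    // Returns the name of the first threshold exceeded, or null if pulling
    // from this queue may continue.
    static String check(long cachedAllQueues, long cachedInQueue, long cachedBytesInQueue,
                        long minCachedOffset, long maxCachedOffset) {
        if (cachedAllQueues > PULL_THRESHOLD_FOR_ALL) return "pullThresholdForAll";
        if (cachedInQueue > PULL_THRESHOLD_FOR_QUEUE) return "pullThresholdForQueue";
        if (cachedBytesInQueue / (1024 * 1024) > PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB) return "pullThresholdSizeForQueue";
        if (maxCachedOffset - minCachedOffset > CONSUME_MAX_SPAN) return "consumeMaxSpan";
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check(500, 1_001, 0, 0, 0)); // pullThresholdForQueue
        System.out.println(check(500, 10, 0, 0, 50));   // null
    }
}
```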

long pollTimeoutMillis

The timeout of a poll() call that does not specify one. The default is 5 seconds.

long topicMetadataCheckIntervalMillis

The routing information refresh interval. By default, routing information is refreshed every 30 seconds.

ConsumeFromWhere consumeFromWhere

Specifies where to start consuming when the consumer group starts for the first time, same as PUSH mode.

String consumeTimestamp

If consumeFromWhere is set to the timestamp-based strategy (CONSUME_FROM_TIMESTAMP) for the first start, this attribute sets the point in time to start from, same as PUSH mode.
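RocketMQ documents consumeTimestamp as a string with second precision in the form yyyyMMddHHmmss, for example 20131223171201. The helper below, a hypothetical sketch that uses only java.time, builds such a string. It assumes the value is interpreted in the client's local time zone.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Builds a consumeTimestamp string in the yyyyMMddHHmmss form (for example
// "20131223171201"). Local time is used, which is assumed to match how the
// client interprets the value. Hypothetical helper, not part of RocketMQ.
class ConsumeTimestampSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static String format(LocalDateTime time) {
        return FORMAT.format(time);
    }

    // Start point N minutes in the past, e.g. to re-read the last hour of messages.
    static String minutesAgo(long minutes) {
        return format(LocalDateTime.now().minusMinutes(minutes));
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2013, 12, 23, 17, 12, 1))); // 20131223171201
        System.out.println(minutesAgo(60));
    }
}
```

The resulting string would be used as the consumeTimestamp attribute together with ConsumeFromWhere.CONSUME_FROM_TIMESTAMP. As noted above, it only matters the first time the consumer group starts.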

DefaultLitePullConsumer Usage Example

Having introduced the methods and core attributes of DefaultLitePullConsumer, let's use its API to write a simple demo program. In the next article, we will explore how to use DefaultLitePullConsumer in concrete application scenarios. The example code is as follows:

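```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class LitePullConsumerSubscribe02 {
    // Set to false from another thread to stop the poll loop.
    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer =
            new DefaultLitePullConsumer("dw_lite_pull_consumer_test");
        litePullConsumer.setNamesrvAddr("192.168.3.166:9876");
        // Only takes effect when the consumer group has no committed offset yet.
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Subscribe to all tags of TopicTest; queues are rebalanced automatically.
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setAutoCommit(true); // the default value is true
        litePullConsumer.start();
        try {
            while (running) {
                // Waits at most pollTimeoutMillis (5 s by default) and
                // returns an empty list if no messages arrived in time.
                List<MessageExt> messageExts = litePullConsumer.poll();
                if (!messageExts.isEmpty()) {
                    doConsumeSomething(messageExts);
                }
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }

    private static void doConsumeSomething(List<MessageExt> messageExts) {
        // actual business processing
        System.out.printf("%s%n", messageExts);
    }
}
```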

The example above relies on automatic commit of consumption offsets. With manual commit (setAutoCommit(false)), the application must call the consumer's commitSync() method itself. At first glance, with automatic offset commit, Lite Pull mode does not seem much different from PUSH mode. Is that really the case? Let's compare the similarities and differences between the two modes.

Comparison of Lite Pull and PUSH Modes

The example above shows that the Lite PULL API is much simpler to use than the DefaultMQPullConsumer API available before version 4.6.0, and its programming style is very similar to PUSH mode. Is the underlying implementation also the same? Clearly not; let me explain.

You may have noticed that in Lite PULL mode, poll() simply pulls a batch of messages and hands it to the application for processing. With automatic commit, committing offsets is not tied to the processing result: if processing a message fails, the consumption offset still moves forward, and there is no retry mechanism. To illustrate this, here is a flow chart of the poll() method of DefaultLitePullConsumer. Pay close attention to where the offset is committed.

[Figure: flow chart of DefaultLitePullConsumer.poll(), showing where the offset is committed]

An important property of automatic offset commit in Lite Pull mode is that once poll() returns, the batch is treated as successfully consumed. If processing fails, those messages are effectively lost. Is there a way around this? This is where the seek method comes in: if business processing fails, you can reset the consumption offset with seek. Concretely, after catching the processing failure, construct a MessageQueue object from the information in the first message (MessageExt) of the batch, and pass it to seek together with the offset to reset to.
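The rewind-on-failure idea can be modeled with a single in-memory queue. In this hypothetical sketch, poll() advances the consume offset as soon as it hands out a batch, as with automatic commit, and seek moves it back to the first offset of the failed batch. With the real API, the rewind would correspond to calling seek with new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId()) and first.getQueueOffset(), where first is the first MessageExt of the batch. The sketch assumes the whole batch comes from one queue.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one queue consumed with auto-commit semantics: the consume
// offset moves forward as soon as poll() hands out a batch, before the
// application has processed it. seek() is the only way back. Hypothetical names.
class SeekOnFailureSketch {

    static final class Batch {
        final long firstOffset;       // queue offset of the first message
        final List<String> messages;

        Batch(long firstOffset, List<String> messages) {
            this.firstOffset = firstOffset;
            this.messages = messages;
        }
    }

    private final List<String> queue; // the message at index i has offset i
    private final int batchSize;
    private long consumeOffset;       // next offset poll() will return

    SeekOnFailureSketch(List<String> queue, int batchSize) {
        this.queue = queue;
        this.batchSize = batchSize;
    }

    Batch poll() {
        int from = (int) consumeOffset;
        int to = Math.min(queue.size(), from + batchSize);
        consumeOffset = to; // "committed" before processing even starts
        return new Batch(from, new ArrayList<>(queue.subList(from, to)));
    }

    void seek(long offset) { consumeOffset = offset; }

    long consumeOffset() { return consumeOffset; }

    public static void main(String[] args) {
        SeekOnFailureSketch consumer = new SeekOnFailureSketch(List.of("m0", "m1", "m2", "m3"), 2);
        Batch batch = consumer.poll();
        boolean processed = false; // pretend the business logic failed
        if (!processed) {
            consumer.seek(batch.firstOffset); // rewind to the start of the failed batch
        }
        System.out.println(consumer.poll().messages); // [m0, m1] again
    }
}
```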

Another difference is that Lite Pull mode has no message consumption retry mechanism. In concurrent consumption mode, PUSH mode retries a failed message up to 16 times by default, with increasing intervals between retries, which greatly simplifies the programming model. In this respect, programming with Lite Pull mode is somewhat more complex.
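To see what the PUSH retry intervals look like, the sketch below tabulates them. It assumes the Broker's default messageDelayLevel setting (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h) and that retry n, counting from 0, uses delay level 3 + n. A Broker with a customized messageDelayLevel would produce different numbers. The class only computes the schedule and is not RocketMQ code.

```java
// Default PUSH-mode retry schedule for concurrent consumption, assuming the
// Broker's default messageDelayLevel table and that retry n (0-based) uses
// delay level 3 + n. Not RocketMQ code; it only tabulates those defaults.
class PushRetryScheduleSketch {
    // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" in seconds.
    static final long[] DELAY_LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200
    };
    static final int MAX_RETRIES = 16;

    // Delay before retry number `retry` (0 = first retry).
    static long delaySeconds(int retry) {
        if (retry < 0 || retry >= MAX_RETRIES) {
            throw new IllegalArgumentException("retry must be in [0, " + MAX_RETRIES + ")");
        }
        int level = 3 + retry; // delay levels are numbered from 1
        return DELAY_LEVEL_SECONDS[level - 1];
    }

    static long totalSeconds() {
        long total = 0;
        for (int retry = 0; retry < MAX_RETRIES; retry++) {
            total += delaySeconds(retry);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(delaySeconds(0));  // 10
        System.out.println(delaySeconds(15)); // 7200
        System.out.println(totalSeconds());   // 17140, about 4 h 46 min
    }
}
```

An application using Lite Pull mode that needs similar behavior has to implement its own retry and backoff, for example by combining seek with a delay.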

In Lite Pull mode, pull threads are allocated per consumer, and by default each consumer pulls with 20 threads. In terms of pulling efficiency, this gives it a clear advantage over PUSH mode, making it especially suitable for batch processing in big data scenarios, for example running a pull task at regular intervals.

How Long Polling Works

PULL mode is typically used for batch processing in big data scenarios, where real-time delivery matters less than pulling efficiency: each pull should fetch as many messages as possible so that large amounts of data can be processed at once. In particular, a pull should ideally return at least some messages, and invalid pull requests (requests that return no messages) should be avoided as much as possible.

First, let’s take a look at the following scenario:

[Figure: a pull request arriving at a Broker that has no new messages]

In other words, what should the Broker do when a pull request arrives but there are no new messages? There are basically two strategies:

  • Return immediately with a pull result that contains no messages.
  • Suspend the pull request on the Broker side and have the Broker poll for new messages, i.e., a polling mechanism.

The second strategy is known as polling. Depending on how the polling is done, it is divided into long polling and short polling.

  • Short polling: if the first attempt finds no messages, the Broker waits for a fixed interval and then tries once more. The interval defaults to 1 second and can be changed with shortPollingTimeMills in the Broker configuration file. Note that the Broker retries only once.
  • Long polling: the PULL client sets how long the request may be suspended on the Broker, 20 seconds by default. While no messages are available, the Broker re-checks every 5 seconds by default. When new messages arrive, the Broker wakes up the suspended pull request, ends the polling, and tries to pull a batch of messages and return it to the client. The sequence diagram of long polling is shown below:

[Figure: sequence diagram of long polling]

So long polling can wait much longer than short polling. Short polling checks only once more, after a default wait of 1 second, whereas long polling re-checks every 5 seconds by default, up to the suspend timeout, and can be woken up as soon as new messages arrive.
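Long and short polling can be contrasted in a small, self-contained model. In this hypothetical sketch, a ReentrantLock Condition plays the role of the suspended pull request: pullLong returns as soon as put stores a message, or null once the suspend timeout expires, while pullShort checks, sleeps for a fixed interval once, and checks again. The real Broker also re-checks suspended requests periodically (every 5 seconds by default). The sketch models only the wake-up path and the timeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy model of how a Broker can answer pull requests when a queue is empty.
// Long polling suspends the request (here: a thread waiting on a Condition)
// until a message arrives or the suspend timeout expires; short polling
// waits a fixed interval once and checks again. Not RocketMQ code.
class PollingSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition messageArrived = lock.newCondition();
    private final Deque<String> messages = new ArrayDeque<>();

    // Producer side: store a message and wake up any suspended pull requests.
    void put(String message) {
        lock.lock();
        try {
            messages.addLast(message);
            messageArrived.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Long polling: returns as soon as a message is available, or null after suspendMillis.
    String pullLong(long suspendMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(suspendMillis);
        lock.lock();
        try {
            while (messages.isEmpty()) {
                if (remainingNanos <= 0) {
                    return null; // timed out: empty pull result
                }
                remainingNanos = messageArrived.awaitNanos(remainingNanos);
            }
            return messages.pollFirst();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    // Short polling: one immediate check, one fixed wait, one more check.
    String pullShort(long intervalMillis) {
        String message = tryTake();
        if (message != null) {
            return message;
        }
        try {
            Thread.sleep(intervalMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return tryTake();
    }

    private String tryTake() {
        lock.lock();
        try {
            return messages.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PollingSketch broker = new PollingSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200); // the message arrives while the request is suspended
            } catch (InterruptedException e) {
                return;
            }
            broker.put("m1");
        });
        producer.start();
        System.out.println(broker.pullLong(20_000)); // m1, after about 200 ms rather than 20 s
        producer.join();
    }
}
```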

The following are the parameters related to long polling on the broker side:

  • longPollingEnable: Whether to enable long polling, the default is true.
  • shortPollingTimeMills: The waiting time for short polling, the default is 1000 (1 second).

Summary

This article described the core methods and attributes of DefaultLitePullConsumer, the new PULL-mode consumer class introduced in RocketMQ 4.6.0, gave a simple usage example, and summarized the differences between Lite Pull and PUSH modes and their application scenarios. Finally, it explained long polling, an important mechanism in PULL mode that lets each pull return as many messages as possible in order to maximize message processing efficiency.