08 Message Consumption API and Version Migration Explanation


Starting with this article, we introduce RocketMQ's message consumption API in detail.

Message Consumption Class Diagram #

The consumer-side API of RocketMQ is shown in the following class diagram:

[Figure 1: class diagram of the RocketMQ consumer-side API]

The core classes in the diagram are described below.

MQAdmin

This interface provides basic MQ management functions, such as creating a topic. Its presence here is a little odd, because a consumer should not need to inherit this interface. It was described in detail in the message sending API article and is not repeated here.

MQConsumer

This is the base interface for MQ consumers. Its definition is very simple. If needed, some methods shared by its sub-interfaces could be pulled up into it.

Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)

Fetches all read queues of the specified topic.

MQPullConsumer

RocketMQ supports both push and pull modes. This interface defines the pull-mode consumer.

void start()

Starts the consumer.

void shutdown()

Shuts down the consumer.

void registerMessageQueueListener(String topic, MessageQueueListener listener)

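Registers a callback for message queue changes. It is triggered when the set of queues assigned to the consumer changes, for example when the consumer is assigned a new queue. MessageQueueListener declares a single callback method, messageQueueChanged:

[Figure 2: declaration of the MessageQueueListener interface]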

The parameters are explained as follows:

  • String topic: The topic.

  • Set<MessageQueue> mqAll: All the queues of the topic.

  • Set<MessageQueue> mqDivided: The queues assigned to the current consumer.
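To make the relationship between mqAll and mqDivided concrete, the sketch below shows one way a rebalance could derive mqDivided for a single consumer. It is modeled on the idea behind RocketMQ's default averaging strategy (AllocateMessageQueueAveragely), but it is a simplified, hypothetical version. Queues and consumer IDs are plain strings, and AverageAllocationSketch and allocate are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a topic's queues (mqAll) among the consumers of a group
// and return the subset (mqDivided) owned by one consumer.
final class AverageAllocationSketch {

    static List<String> allocate(List<String> mqAll, List<String> consumerIds, String currentId) {
        List<String> result = new ArrayList<>();
        int index = consumerIds.indexOf(currentId);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or no queues: nothing assigned
        }
        int queues = mqAll.size();
        int consumers = consumerIds.size();
        int mod = queues % consumers;
        // When the split is uneven, the first `mod` consumers get one extra queue.
        int averageSize = queues <= consumers ? 1
                : (mod > 0 && index < mod ? queues / consumers + 1 : queues / consumers);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queues - startIndex); // negative range => no queues
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % queues));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = List.of("q0", "q1", "q2", "q3", "q4");
        List<String> consumers = List.of("c0", "c1");
        for (String c : consumers) {
            System.out.println(c + " -> " + allocate(mqAll, consumers, c));
        }
    }
}
```

With five queues and two consumers, main prints `c0 -> [q0, q1, q2]` and `c1 -> [q3, q4]`. Each consumer gets a contiguous block of queues, and when there are more consumers than queues, the extra consumers receive an empty mqDivided.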

PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)

Synchronously pulls messages from the RocketMQ broker. The application calls this method explicitly whenever it wants to fetch messages. The parameters are explained as follows:

  • MessageQueue mq: The message queue.

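  • String subExpression: The message filter expression. In this String form it is a tag expression such as "TagA || TagB", where "*" matches all tags. SQL92 filtering goes through the MessageSelector overload below.

  • long offset: The logical offset in the ConsumeQueue from which to start pulling.

  • int maxNums: The maximum number of messages to pull in this call.

  • long timeout: The timeout for this pull, in milliseconds.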
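The interplay of offset, maxNums and the returned status can be illustrated with a toy, in-memory ConsumeQueue. Everything in this sketch is a hypothetical simplification, not RocketMQ code. That includes PullModelSketch, its Status and Result types, the single-tag filter and the "tag:body" entry format. The status names mirror RocketMQ's PullStatus values, and nextBeginOffset plays the role of PullResult.getNextBeginOffset().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one ConsumeQueue, illustrating pull(mq, subExpression, offset, maxNums).
final class PullModelSketch {

    enum Status { FOUND, NO_MATCHED_MSG, NO_NEW_MSG, OFFSET_ILLEGAL }

    static final class Result {
        final Status status;
        final List<String> messages;
        final long nextBeginOffset;

        Result(Status status, List<String> messages, long nextBeginOffset) {
            this.status = status;
            this.messages = messages;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    // Each entry is "tag:body"; the list index plays the role of the ConsumeQueue offset.
    private final List<String> entries = new ArrayList<>();

    void append(String tag, String body) {
        entries.add(tag + ":" + body);
    }

    // tagFilter is "*" (everything) or a single tag in this simplified model.
    Result pull(String tagFilter, long offset, int maxNums) {
        if (offset < 0 || offset > entries.size()) {
            return new Result(Status.OFFSET_ILLEGAL, List.of(), Math.max(0, Math.min(offset, entries.size())));
        }
        if (offset == entries.size()) {
            return new Result(Status.NO_NEW_MSG, List.of(), offset);
        }
        List<String> found = new ArrayList<>();
        long next = offset;
        // Scan at most maxNums entries; entries that do not match are still skipped past.
        while (next < entries.size() && next - offset < maxNums) {
            String entry = entries.get((int) next);
            String tag = entry.substring(0, entry.indexOf(':'));
            if ("*".equals(tagFilter) || tag.equals(tagFilter)) {
                found.add(entry);
            }
            next++;
        }
        Status status = found.isEmpty() ? Status.NO_MATCHED_MSG : Status.FOUND;
        return new Result(status, found, next);
    }

    public static void main(String[] args) {
        PullModelSketch queue = new PullModelSketch();
        queue.append("TagA", "order-1");
        queue.append("TagB", "order-2");
        queue.append("TagA", "order-3");

        Result r = queue.pull("TagA", 0, 2);
        System.out.println(r.status + " " + r.messages + " next=" + r.nextBeginOffset); // FOUND [TagA:order-1] next=2
        r = queue.pull("TagA", r.nextBeginOffset, 2);
        System.out.println(r.status + " " + r.messages + " next=" + r.nextBeginOffset); // FOUND [TagA:order-3] next=3
        r = queue.pull("TagA", r.nextBeginOffset, 2);
        System.out.println(r.status + " " + r.messages + " next=" + r.nextBeginOffset); // NO_NEW_MSG [] next=3
    }
}
```

In this model, entries filtered out by the tag still advance nextBeginOffset. A pull can therefore return NO_MATCHED_MSG while still making progress, so the caller should commit nextBeginOffset in that case as well.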

PullResult pull(MessageQueue mq, MessageSelector selector, long offset, int maxNums, long timeout)

An overload of "pull" that takes a MessageSelector instead of a string expression. The selector is built with the static factory methods MessageSelector.bySql and MessageSelector.byTag.

void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)

Pulls messages asynchronously. The call returns immediately, and the result is delivered to the PullCallback.
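RocketMQ's PullCallback has two methods, onSuccess(PullResult) and onException(Throwable). The sketch below imitates that contract with a locally defined Callback interface and a CompletableFuture running on a single I/O thread. AsyncPullSketch, Callback and fetchFromBroker are hypothetical names, the "broker" is simulated, and messages are plain strings instead of a PullResult.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of an asynchronous pull with a success/failure callback.
final class AsyncPullSketch {

    // Stand-in for RocketMQ's PullCallback, with List<String> instead of PullResult.
    interface Callback {
        void onSuccess(List<String> messages);

        void onException(Throwable e);
    }

    private final ExecutorService ioThreads = Executors.newSingleThreadExecutor();

    // Returns immediately; the simulated broker call runs on ioThreads and its outcome reaches the callback.
    void pullAsync(long offset, Callback callback) {
        CompletableFuture
                .supplyAsync(() -> fetchFromBroker(offset), ioThreads)
                .whenComplete((msgs, error) -> {
                    if (error == null) {
                        callback.onSuccess(msgs);
                    } else {
                        // supplyAsync wraps the supplier's exception in a CompletionException.
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        callback.onException(cause);
                    }
                });
    }

    // Simulated broker call: rejects negative offsets, otherwise returns two messages.
    private static List<String> fetchFromBroker(long offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("illegal offset " + offset);
        }
        return List.of("msg-" + offset, "msg-" + (offset + 1));
    }

    void shutdown() {
        ioThreads.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncPullSketch client = new AsyncPullSketch();
        CountDownLatch done = new CountDownLatch(1);
        client.pullAsync(10, new Callback() {
            public void onSuccess(List<String> messages) {
                System.out.println("pulled " + messages); // pulled [msg-10, msg-11]
                done.countDown();
            }

            public void onException(Throwable e) {
                System.out.println("pull failed: " + e);
                done.countDown();
            }
        });
        System.out.println("pullAsync returned, waiting for the callback");
        done.await(5, TimeUnit.SECONDS);
        client.shutdown();
    }
}
```

The caller's thread is free as soon as pullAsync returns. The two messages printed by main may therefore appear in either order.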

PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)

Pulls messages. If the broker has no new messages, the call blocks until a message becomes available. This method also has an overload that supports asynchronous pulling.
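In effect this is long polling: the request waits until new data arrives instead of returning empty at once. In RocketMQ the waiting is done by the broker holding the pull request. The sketch below models only the waiting part, with a LinkedBlockingQueue standing in for the broker's queue. BlockingPullSketch and its explicit timeoutMillis parameter are assumptions made for illustration; the RocketMQ method above takes no timeout argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "block if not found": wait for the first message, then take a batch.
final class BlockingPullSketch {

    // Stand-in for the messages a broker would hand out for one queue.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    void publish(String msg) {
        pending.add(msg);
    }

    // Waits up to timeoutMillis for the first message, then drains up to maxNums in total.
    // An empty list means the wait timed out (or was interrupted) with no new message.
    List<String> pullBlockIfNotFound(int maxNums, long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        try {
            String first = pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first == null) {
                return batch;
            }
            batch.add(first);
            pending.drainTo(batch, maxNums - 1); // take whatever else is already available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingPullSketch broker = new BlockingPullSketch();
        // A producer publishes a message after 200 ms while the consumer is blocked.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
                broker.publish("order-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long start = System.nanoTime();
        List<String> msgs = broker.pullBlockIfNotFound(32, 5_000);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("got " + msgs + " after ~" + waitedMs + " ms");
        producer.join();
    }
}
```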

void updateConsumeOffset(MessageQueue mq, long offset)

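Updates the consumption progress (offset) of the specified message queue. The new value is kept in the client's memory and persisted periodically.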

long fetchConsumeOffset(MessageQueue mq, boolean fromStore)

Gets the consumption progress of the specified message queue. If "fromStore" is true, the progress is read from the offset store: the broker for clustering consumption, or a local file for broadcasting consumption. If it is false, the progress is read from the client's in-memory cache.
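A simplified way to picture updateConsumeOffset and fetchConsumeOffset is a two-level store. Updates go to an in-memory table, a periodic task persists that table, and fromStore chooses which level is read. The sketch below is hypothetical: OffsetStoreSketch, persistAll and the "TOPIC_TEST-0" key are illustrative, and the persisted level is just a second map. Like the real client's in-memory read, it returns -1 when no offset has been recorded.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-level offset store: an in-memory cache plus a "persisted" copy.
final class OffsetStoreSketch {

    private final Map<String, Long> memory = new HashMap<>();
    private final Map<String, Long> persisted = new HashMap<>();

    // Like updateConsumeOffset: only the in-memory value changes.
    void updateConsumeOffset(String queue, long offset) {
        memory.put(queue, offset);
    }

    // Like fetchConsumeOffset; -1 means "no offset recorded yet".
    long fetchConsumeOffset(String queue, boolean fromStore) {
        Map<String, Long> source = fromStore ? persisted : memory;
        return source.getOrDefault(queue, -1L);
    }

    // Stand-in for the periodic task that copies in-memory offsets to the store.
    void persistAll() {
        persisted.putAll(memory);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.updateConsumeOffset("TOPIC_TEST-0", 42);
        System.out.println(store.fetchConsumeOffset("TOPIC_TEST-0", false)); // 42
        System.out.println(store.fetchConsumeOffset("TOPIC_TEST-0", true));  // -1, not persisted yet
        store.persistAll();
        System.out.println(store.fetchConsumeOffset("TOPIC_TEST-0", true));  // 42
    }
}
```

The gap between the two levels is why an offset can be "lost" after a crash in such a design: whatever was updated in memory since the last persist is not in the store yet.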

Set<MessageQueue> fetchMessageQueuesInBalance(String topic)

Gets the message queues currently assigned to this consumer by the queue load-balancing (rebalance) mechanism.

void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)

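Sends a message whose consumption failed back to the broker so that it is redelivered later for retry. This is how a consumer acknowledges a failed message. The delayLevel parameter controls how long the broker waits before redelivery.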
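With sendMessageBack, the failed message is not lost: the broker stores it again under a retry topic for the consumer group and redelivers it after a delay. The sketch below computes where a failed message would go. It assumes the broker defaults as we understand them for RocketMQ 4.x:

- delay levels "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
- at most 16 retries
- delay level 3 + reconsumeTimes when the caller passes delayLevel 0
- "%RETRY%" + group as the retry topic and "%DLQ%" + group as the dead-letter topic

RetryRoutingSketch and route are hypothetical names.

```java
import java.util.List;

// Hypothetical sketch of where a message goes after sendMessageBack, assuming RocketMQ 4.x
// defaults: 18 delay levels, at most 16 retries, level = 3 + reconsumeTimes when delayLevel is 0.
final class RetryRoutingSketch {

    static final List<String> DELAY_LEVELS = List.of(
            "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
            "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");
    static final int MAX_RECONSUME_TIMES = 16;

    static String route(String consumerGroup, int reconsumeTimes, int requestedDelayLevel) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES || requestedDelayLevel < 0) {
            return "%DLQ%" + consumerGroup; // retries exhausted: dead-letter queue
        }
        int level = requestedDelayLevel == 0 ? 3 + reconsumeTimes : requestedDelayLevel;
        level = Math.min(level, DELAY_LEVELS.size()); // clamp to the highest configured level
        return "%RETRY%" + consumerGroup + " after " + DELAY_LEVELS.get(level - 1); // levels are 1-based
    }

    public static void main(String[] args) {
        for (int times : new int[] {0, 1, 15, 16}) {
            System.out.println("reconsumeTimes=" + times + " -> " + route("dw_pull_consumer", times, 0));
        }
    }
}
```

Under these assumptions, main shows the first retry after 10s, the second after 30s, the sixteenth after 2h, and the seventeenth failure going to %DLQ%dw_pull_consumer.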

MQPushConsumer

The interface for RocketMQ push mode consumers.

void start()

Starts the consumer.

void shutdown()

Shuts down the consumer.

void registerMessageListener(MessageListenerConcurrently messageListener)

Registers the listener for concurrent consumption mode.

void registerMessageListener(MessageListenerOrderly messageListener)

Registers the listener for orderly consumption mode.
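The two listener types differ mainly in how messages reach user code.

- With MessageListenerConcurrently, messages from the same queue may be processed by several threads at once, and a failure is reported by returning ConsumeConcurrentlyStatus.RECONSUME_LATER.
- With MessageListenerOrderly, each queue is consumed by one thread at a time, and a failure is reported with ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT, so later messages of that queue are not processed first.

The sketch below models only the ordering guarantee, using a hypothetical per-queue lock around "take the next message and process it". It does not use RocketMQ classes; OrderlySketch and its methods are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of orderly consumption: several threads share the work,
// but each queue's messages are taken in order while holding that queue's lock.
final class OrderlySketch {

    private final Map<String, ArrayDeque<String>> buffers = new ConcurrentHashMap<>();
    private final Map<String, List<String>> processed = new ConcurrentHashMap<>();

    void enqueue(String queue, String msg) {
        ArrayDeque<String> buffer = buffers.computeIfAbsent(queue, q -> new ArrayDeque<>());
        synchronized (buffer) {
            buffer.add(msg);
            processed.computeIfAbsent(queue, q -> new ArrayList<>());
        }
    }

    // One consume task: lock the queue, take its next message, "process" it.
    private void consumeOne(String queue) {
        ArrayDeque<String> buffer = buffers.get(queue);
        synchronized (buffer) {
            String msg = buffer.poll();
            if (msg != null) {
                processed.get(queue).add(msg); // business logic would run here
            }
        }
    }

    // Submits one task per buffered message to a shared pool and waits for them to finish.
    void drainWith(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Map.Entry<String, ArrayDeque<String>> e : buffers.entrySet()) {
            int pending;
            synchronized (e.getValue()) {
                pending = e.getValue().size();
            }
            for (int i = 0; i < pending; i++) {
                pool.execute(() -> consumeOne(e.getKey()));
            }
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns a copy of what was processed for a queue, read under the same lock.
    List<String> processedOf(String queue) {
        synchronized (buffers.get(queue)) {
            return new ArrayList<>(processed.get(queue));
        }
    }

    public static void main(String[] args) {
        OrderlySketch sketch = new OrderlySketch();
        for (int i = 0; i < 5; i++) {
            sketch.enqueue("queue-0", "a" + i);
            sketch.enqueue("queue-1", "b" + i);
        }
        sketch.drainWith(4);
        System.out.println(sketch.processedOf("queue-0")); // [a0, a1, a2, a3, a4]
        System.out.println(sketch.processedOf("queue-1")); // [b0, b1, b2, b3, b4]
    }
}
```

Taking the next message and processing it happen under the same lock. Per-queue order therefore holds no matter which pool thread runs each task, while different queues are still processed in parallel.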

void subscribe(String topic, String subExpression)

Subscribes to a topic. The parameters are explained as follows:

  • String topic: The topic to subscribe to. RocketMQ allows a consumer to subscribe to multiple topics by calling this method multiple times.

  • String subExpression: The tag expression to subscribe to, such as "TagA || TagB". "*" subscribes to all tags. SQL92 filtering uses the MessageSelector overload.
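For the string form of subscribe, the expression is a list of tags separated by "||", such as "TagA || TagB", and "*" subscribes to all tags. The sketch below shows how such an expression could be turned into a tag set and matched against a message's tag. TagExpressionSketch is a hypothetical helper; only the "||" separator and the "*" wildcard come from RocketMQ's documented syntax.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: parse a tag subscription expression and match message tags against it.
final class TagExpressionSketch {

    static final String SUB_ALL = "*";

    // "TagA || TagB" -> {TagA, TagB}; "*", null or blank -> all tags.
    static Set<String> parse(String subExpression) {
        if (subExpression == null || subExpression.trim().isEmpty() || SUB_ALL.equals(subExpression.trim())) {
            return Set.of(SUB_ALL);
        }
        return Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    static boolean matches(Set<String> tags, String messageTag) {
        return tags.contains(SUB_ALL) || (messageTag != null && tags.contains(messageTag));
    }

    public static void main(String[] args) {
        Set<String> tags = parse("TagA || TagB");
        System.out.println(matches(tags, "TagA"));       // true
        System.out.println(matches(tags, "TagC"));       // false
        System.out.println(matches(parse("*"), "TagC")); // true
    }
}
```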

void subscribe(String topic, MessageSelector selector)

Subscribes to a topic. This overload takes a MessageSelector, created with MessageSelector.bySql or MessageSelector.byTag.

void unsubscribe(String topic)

Cancels a subscription.

void suspend()

Suspends message consumption.

void resume()

Resumes message consumption.

DefaultMQPushConsumer

The default implementation class for RocketMQ push mode consumers.

DefaultMQPullConsumer

The default implementation class for RocketMQ pull-mode consumers. Internally, RocketMQ consumption is built on PULL mode: PUSH mode is a pseudo-push, implemented as a wrapper around PULL mode. The implementation principle of PUSH mode is shown in the following diagram:

[Figure 3: how PUSH mode is implemented on top of PULL mode]

In PUSH mode, each batch of pulled messages is submitted asynchronously to the consumer's thread pool. The client then immediately issues the next pull request to the broker, so messages appear to be pushed.
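The mechanism just described is a pull loop that hands each batch to a consumer thread pool and immediately issues the next pull. It can be sketched in a few lines. This is a hypothetical model, not RocketMQ's PullMessageService or ConsumeMessageService code. The "broker" is an in-memory list, the batch size and pool size are arbitrary, and the listener is a plain Consumer<List<String>>. Unlike the real client, the loop stops once the list is exhausted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of "push on top of pull": one pull loop feeds a consume thread pool.
final class PseudoPushSketch {

    // Pulls `broker` in batches, hands each batch to the pool, and returns the number of batches.
    static int run(List<String> broker, int batchSize, Consumer<List<String>> listener) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ExecutorService consumeThreads = Executors.newFixedThreadPool(4);
        int offset = 0;
        int batches = 0;
        // Pull loop: fetch a batch, submit it asynchronously, then pull again without waiting.
        while (offset < broker.size()) {
            int end = Math.min(offset + batchSize, broker.size());
            List<String> batch = new ArrayList<>(broker.subList(offset, end));
            consumeThreads.execute(() -> listener.accept(batch));
            offset = end;
            batches++;
        }
        consumeThreads.shutdown();
        try {
            consumeThreads.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> broker = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            broker.add("msg-" + i);
        }
        AtomicInteger consumed = new AtomicInteger();
        int batches = run(broker, 3, batch -> {
            consumed.addAndGet(batch.size());
            System.out.println(Thread.currentThread().getName() + " consumed " + batch);
        });
        System.out.println(batches + " batches, " + consumed.get() + " messages"); // 4 batches, 10 messages
    }
}
```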

When PULL mode is used directly, the application itself must handle the following aspects of message consumption:

  • Message pulling: the application uses the PULL API to pull a batch of messages from a specified consume queue on the broker to the consumer client. When several consumers share a topic, queues must be allocated among them manually.
  • Progress reporting: after processing a batch, the consumer must report its consumption progress (offset) to the broker and then continue pulling the next batch.
  • Failure retry: if consumption fails, the consumer must tell the broker that the message failed and needs to be retried, by calling sendMessageBack manually.

In PUSH mode, the user does not need to handle any of this. It is enough to register the listener that RocketMQ should call after messages are pulled. The RocketMQ client takes care of queue allocation, storing consumption progress and retrying failed messages.

Simple Examples of Using the Message Consumption API #

As the previous section shows, the difference in usage between the push and pull APIs is like the difference between an automatic and a manual transmission in a car. Business applications usually use the push API, which suits reacting to messages in real time. In the big data field, by contrast, messages are usually processed in batches, which suits pull mode better.

Next, we write a few examples that demonstrate the pull and push APIs.

RocketMQ PULL Mode Core API Usage Example #

Usage scenario: suppose a company's big data team needs to analyze orders. To improve computing efficiency, a task is scheduled every 2 hours, and each run processes, as one batch, all messages that arrived before the run started.

The following example is based on the RocketMQ PULL API and is written to be close to production use:

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PullConsumerTest {
    public static void main(String[] args) throws Exception {
        Semaphore semaphore = new Semaphore();
        Thread t = new Thread(new Task(semaphore));
        t.start();
        CountDownLatch cdh = new CountDownLatch(1);
        try {
            // Let the program run for 120 seconds, then ask the pull thread to stop
            cdh.await(120 * 1000, TimeUnit.MILLISECONDS);
        } finally {
            semaphore.running = false;
        }
    }

    /**
     * Core implementation logic for message pulling
     */
    static class Task implements Runnable {
        private final Semaphore s;

        public Task(Semaphore s) {
            this.s = s;
        }

        public void run() {
            try {
                DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("dw_pull_consumer");
                consumer.setNamesrvAddr("127.0.0.1:9876");
                consumer.start();

                Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
                // Get all queues of this topic
                Set<MessageQueue> msgQueueList = consumer.fetchSubscribeMessageQueues("TOPIC_TEST");

                if (msgQueueList != null && !msgQueueList.isEmpty()) {
                    boolean noFoundFlag = false;
                    while (this.s.running) {
                        if (noFoundFlag) { // No new messages in the previous round, pause for a while
                            Thread.sleep(1000);
                            noFoundFlag = false;
                        }

                        for (MessageQueue q : msgQueueList) {
                            // Pull at most 3000 messages (maxNums) starting from the current offset
                            PullResult pullResult = consumer.pull(q, "*", decivedPulloffset(offsetTable, q, consumer), 3000);
                            System.out.println("pullStatus:" + pullResult.getPullStatus());
                            switch (pullResult.getPullStatus()) {
                                case FOUND:
                                    doSomething(pullResult.getMsgFoundList());
                                    break;
                                case NO_MATCHED_MSG:
                                    break;
                                case NO_NEW_MSG:
                                case OFFSET_ILLEGAL:
                                    noFoundFlag = true;
                                    break;
                                default:
                                    continue;
                            }
                            // Commit the offset (kept in memory, persisted periodically by the client)
                            consumer.updateConsumeOffset(q, pullResult.getNextBeginOffset());
                        }

                        System.out.println("balance queue is empty: " + consumer.fetchMessageQueuesInBalance("TOPIC_TEST").isEmpty());
                    }
                } else {
                    System.out.println("end, because queue is empty");
                }
                // Gracefully shut down the consumer once the loop exits
                consumer.shutdown();
                System.out.println("consumer shutdown");
            } catch (Throwable e) {
                // Print any exception raised while pulling or processing messages
                e.printStackTrace();
            }
        }
    }

    /** Processing logic for pulled messages; here it only prints the batch size */
    private static void doSomething(List<MessageExt> msgs) {
        System.out.println("messages pulled this time: " + msgs.size());
    }

    /**
     * Determines the pull offset of a queue. offsetTable is not used by this implementation.
     */
    public static long decivedPulloffset(Map<MessageQueue, Long> offsetTable,
            MessageQueue queue, DefaultMQPullConsumer consumer) throws Exception {
        // Read the offset cached in the client's memory first
        long offset = consumer.fetchConsumeOffset(queue, false);
        if (offset < 0) {
            // Nothing cached yet (e.g. right after startup): read the persisted offset
            offset = consumer.fetchConsumeOffset(queue, true);
        }
        if (offset < 0) {
            // No offset recorded at all: start from the beginning of the queue
            offset = 0;
        }
        System.out.println("offset:" + offset);
        return offset;
    }

    /** Stop flag; volatile so the pull thread immediately sees the update made by the main thread */
    static class Semaphore {
        public volatile boolean running = true;
    }
}
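The example above keeps pulling until the program is stopped. The scheduled-batch scenario described earlier needs each run to process only what existed when it started. One approach is to snapshot each queue's maximum offset at the start of the run and stop once the consumed offset reaches it; DefaultMQPullConsumer inherits maxOffset(MessageQueue) from MQAdmin, which could provide that snapshot. The sketch below shows only the bookkeeping, with plain maps standing in for queues and committed offsets. BoundedBatchSketch and runOnce are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of one scheduled batch run: each queue is consumed only up to
// the end offset captured when the run starts; later messages wait for the next run.
final class BoundedBatchSketch {

    static int runOnce(Map<String, List<String>> queues, Map<String, Long> committed,
                       Consumer<String> handler) {
        // 1. Snapshot the current end offset of every queue (maxOffset in the real client).
        Map<String, Long> stopAt = new HashMap<>();
        queues.forEach((queue, msgs) -> stopAt.put(queue, (long) msgs.size()));

        int consumed = 0;
        for (Map.Entry<String, Long> entry : stopAt.entrySet()) {
            String queue = entry.getKey();
            long offset = committed.getOrDefault(queue, 0L);
            // 2. Consume only up to the snapshot, even if new messages arrive meanwhile.
            while (offset < entry.getValue()) {
                handler.accept(queues.get(queue).get((int) offset));
                offset++;
                consumed++;
            }
            // 3. Commit this queue's progress so the next run starts where this one stopped.
            committed.put(queue, offset);
        }
        return consumed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> queues = new HashMap<>();
        queues.put("q0", new ArrayList<>(List.of("a", "b")));
        Map<String, Long> committed = new HashMap<>();

        // While the first run is processing "a", a producer appends one more message.
        int first = runOnce(queues, committed, msg -> {
            if (msg.equals("a")) {
                queues.get("q0").add("late");
            }
        });
        int second = runOnce(queues, committed, msg -> { });
        System.out.println("first run: " + first + ", second run: " + second); // first run: 2, second run: 1
    }
}
```

The message appended during the first run is left for the second run, which is the "everything before the task started" boundary the scenario asks for.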