12 Discussing the Usage of DefaultLitePullConsumer in Practical Application Scenarios Again #

After the previous section, readers should have a fairly complete picture of DefaultLitePullConsumer. The earlier example, however, was only demo-level and may have left you wanting more. This article extends those usage techniques with a batch message-pulling scenario from the big data field.

Scenario Description #

Currently, the order system sends messages to the ORDER_TOPIC, and the big data team needs to import order data into their own computing platform to analyze the behavior of users and merchants.

Choosing between PUSH and PULL Modes #

For the big data team, subscribing to the ORDER_TOPIC is sufficient to complete data synchronization. So, should they use the PUSH mode or the PULL mode?

In the big data field, PULL mode is usually the better fit, because big data computation is typically built on batch processing frameworks such as Spark, which often run a batch task every 5 or 10 minutes. The more data a single batch processes, the better distributed computing performs. In PUSH mode you can specify how many messages to pull per request, but because PUSH mode is almost real-time, the server rarely accumulates a large backlog, so each pull request retrieves only a few messages. In addition, a PUSH consumer opens only one thread per JVM to pull messages from a RocketMQ cluster, whereas PULL mode lets each consumer configure multiple pulling threads (20 by default). PULL mode therefore has better message pulling performance. This scenario also has modest real-time requirements, so PULL mode is the preferred choice.

Solution Design #

The rough implementation idea is shown in the following diagram:

[Figure 1: solution design diagram]

Code Implementation and Explanation #

// BigDataPullConsumer.java
package org.apache.rocketmq.example.simple.litepull;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class BigDataPullConsumer {

    private final ExecutorService executorService = new ThreadPoolExecutor(30, 30, 0L,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000),
            new DefaultThreadFactory("business-executer-"));

    private final ExecutorService pullTaskExecutor = new ThreadPoolExecutor(1, 1, 0L,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new DefaultThreadFactory("pull-batch-"));

    private String consumerGroup;
    private String nameserverAddr;
    private String topic;
    private String filter;
    private MessageListener messageListener;
    private DefaultMQProducer rertyMQProducer;
    private PullBatchTask pullBatchTask;

    public BigDataPullConsumer(String consumerGroup, String nameserverAddr, String topic, String filter) {
        this.consumerGroup = consumerGroup;
        this.nameserverAddr = nameserverAddr;
        this.topic = topic;
        this.filter = filter;
        initRetryMQProducer();
    }

    private void initRetryMQProducer() {
        this.rertyMQProducer = new DefaultMQProducer(consumerGroup + "-retry");
        this.rertyMQProducer.setNamesrvAddr(this.nameserverAddr);
        try {
            this.rertyMQProducer.start();
        } catch (Throwable e) {
            throw new RuntimeException("Failed to start", e);
        }

    }

    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    public void start() {
        // Not considering the case where start() is called repeatedly
        this.pullBatchTask = new PullBatchTask(consumerGroup, nameserverAddr, topic, filter, messageListener);
        pullTaskExecutor.submit(this.pullBatchTask);
    }

    public void stop() {
        while(this.pullBatchTask.isRunning()) {
            try {
                Thread.sleep(1 * 1000);
            } catch (Throwable e) {
                //ignore
            }
        }
        this.pullBatchTask.stop();
        pullTaskExecutor.shutdown();
        executorService.shutdown();
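        try {
            // Wait for the business tasks (and any retries they send) to finish.
            // awaitTermination returns false while tasks are still running, so keep waiting.
            while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                // still draining
            }
            // Only now is it safe to shut down the producer used for retries
            this.rertyMQProducer.shutdown();
        } catch (Throwable e) {
            //ignore
        }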
    }

    /**
     * Task listener
     */
    static interface MessageListener {
        boolean consumer(List<MessageExt> msgs);
    }

    /**
     * Scheduled task that will be invoked, for example, every 10 minutes
     */
    class PullBatchTask implements Runnable {
        DefaultLitePullConsumer consumer;
        String consumerGroup;
        String nameserverAddr;
        String topic;
        String filter;
        private volatile boolean running = true;
        private MessageListener messageListener;

        public PullBatchTask(String consumerGroup, String nameserverAddr,String topic, String filter,
                             MessageListener messageListener) {
            this.consumerGroup = consumerGroup;
            this.nameserverAddr = nameserverAddr;
            this.topic = topic;
            this.filter = filter;
            this.messageListener = messageListener;
            init();
        }

        private void init() {
            System.out.println("The init method is called");
            consumer = new DefaultLitePullConsumer(this.consumerGroup);
            consumer.setNamesrvAddr(this.nameserverAddr);
            consumer.setAutoCommit(true);
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            try {
                consumer.subscribe(topic, filter);
                consumer.start();
            } catch (Throwable e) {
                e.printStackTrace();
            }

        }

        public void stop() {
            this.running = false;
            this.consumer.shutdown();
        }

        public boolean isRunning() {
            return this.running;
        }

        @Override
        public void run() {
            this.running = true;
            long startTime = System.currentTimeMillis() - 5 * 1000;
            System.out.println("The run method is called");
            int notFoundMsgCount = 0;

            while(running) {
                try {
                    // Poll a batch of messages
                    List<MessageExt> messageExts = consumer.poll();
                    if(messageExts != null && !messageExts.isEmpty()) {
                        notFoundMsgCount = 0; // Reset to 0 if data is found
                        // Consume messages using a dedicated business thread pool
                        try {
                            executorService.submit(new ExecuteTask(messageExts, messageListener));
                        } catch (RejectedExecutionException e) {
                            // The business thread pool is full, so stop submitting for now. The business code
                            // does not need to keep pulling: RocketMQ will eventually trigger flow control and
                            // stop pulling more messages, which prevents memory overflow.
                            //
                            // Recap of the code so far: executorService (30 core / 30 max threads) consumes
                            // messages; pullTaskExecutor (1 core / 1 max thread) runs PullBatchTask, whose
                            // init() configures the DefaultLitePullConsumer, subscribes to the topic and starts
                            // it, and whose run() pulls batches and hands them to executorService.
                            boolean retry = true;
                            while (retry) {
                                try {
                                    Thread.sleep(5 * 1000); // Simple rate limiting
                                    executorService.submit(new ExecuteTask(messageExts, messageListener));
                                    retry = false;
                                } catch (RejectedExecutionException e2) {
                                    retry = true;
                                }
                            }
                        }

                          MessageExt last = messageExts.get(messageExts.size() - 1);
                          /*
                           * If the last message in this batch was stored after the task's start time, the data
                           * that existed when the task began has been pulled, so the current batch can wind down.
                           * Pause pulling before stopping the consumer so that no more messages are fetched from the broker.
                           */
                          if(last.getStoreTimestamp() > startTime) {
                              System.out.println("The consumer.pause method will be called.");
                              consumer.pause(buildMessageQueues(last));
                          }
    
                      } else {
                          notFoundMsgCount ++;
                      }
    
                      // If more than 5 consecutive polls return nothing, every message in the local cache has been processed and the pull thread has stopped pulling. This batch can end here and wait for the next scheduled task.
                      if(notFoundMsgCount > 5) {
                          System.out.println("Failed to pull messages for more than 5 consecutive times and will exit the current task.");
                          break;
                      }
                  } catch (Throwable e) {
                      e.printStackTrace();
                  }
              }
              this.running = false;
          }
    
          /**
           * Build MessageQueues
           * @param msg
           * @return
           */
          private Set<MessageQueue> buildMessageQueues(MessageExt msg) {
              Set<MessageQueue> queues = new HashSet<>();
              MessageQueue queue = new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId());
              queues.add(queue);
              return queues;
          }
      }
    
      /**
       * Task execution
       */
      class ExecuteTask implements Runnable {
          private List<MessageExt> msgs;
          private MessageListener messageListener;
          public ExecuteTask(List<MessageExt> allMsgs, MessageListener messageListener) {
              this.msgs = allMsgs.stream()
                      .filter((MessageExt msg) -> msg.getReconsumeTimes() <= 16)
                      .collect(Collectors.toList());
              this.messageListener = messageListener;
          }
          @Override
          public void run() {
              try {
                   this.messageListener.consumer(this.msgs);
              } catch (Throwable e) {
                  //Message consumption failed, need to trigger retry
                  //Here you can refer to the PUSH mode and send the message to the server again.
                  try {
                      for(MessageExt msg : this.msgs) {
                          msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                          rertyMQProducer.send(msg);
                      }
                  } catch (Throwable e2) {
                      e2.printStackTrace();
                      // todo Retry
                  }
              }
          }
      }
    

    }

    // DefaultThreadFactory.java
    package org.apache.rocketmq.example.simple.litepull;

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class DefaultThreadFactory implements ThreadFactory {
      private AtomicInteger num = new AtomicInteger(0);
      private String prefix;

      public DefaultThreadFactory(String prefix) {
          this.prefix = prefix;
      }
    
      @Override
      public Thread newThread(Runnable r) {
          Thread t = new Thread(r);
          t.setName(prefix + num.incrementAndGet());
          return t;
      }
    

    }

    // LitePullMain.java
    package org.apache.rocketmq.example.simple.litepull;

    import org.apache.rocketmq.common.message.MessageExt;

    import java.util.List;
    import java.util.concurrent.*;

    public class LitePullMain {
      public static void main(String[] args) {

          String consumerGroup = "dw_test_consumer_group";
          String nameserverAddr = "192.168.3.166:9876";
          String topic = "dw_test";
          String filter = "*";
          /** Create a scheduled task thread pool */
          ScheduledExecutorService schedule = new ScheduledThreadPoolExecutor(1,
                  new DefaultThreadFactory("main-schdule-"));
          schedule.scheduleWithFixedDelay(new Runnable() {
              @Override
              public void run() {
                  BigDataPullConsumer demoMain = new BigDataPullConsumer(consumerGroup, nameserverAddr,
                          topic, filter);
                  demoMain.registerMessageListener(new BigDataPullConsumer.MessageListener() {
                      /**
                       * Business processing
                       * @param msgs
                       * @return
                       */
                      @Override
                      public boolean consumer(List<MessageExt> msgs) {
                          System.out.println("Number of messages processed in this batch: " + msgs.size());
                          return true;
                      }
                  });
                  demoMain.start();
                  demoMain.stop();
              }
          }, 1000, 30 * 1000, TimeUnit.MILLISECONDS);
    
          try {
              CountDownLatch cdh = new CountDownLatch(1);
              cdh.await(10 , TimeUnit.MINUTES);
              schedule.shutdown();
          } catch (Throwable e) {
              //ignore
          }
    
      }
    

    }

The running result of the program is as follows:

[Figure 2: console output of the program run]

This matches the expected result: two scheduled tasks run, and each one ends properly.
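A task can only end properly if shutdown happens in the right order: stop accepting work, wait for the business thread pool to drain, and only then close whatever the business tasks depend on (in the listing, the producer used for retries). The following is a minimal JDK-only sketch of that ordering. The class and method names (`ShutdownOrderSketch`, `drainThenClose`, `demo`) are hypothetical, and the "producer" is just a `Runnable` standing in for the real one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the shutdown ordering; it does not use the RocketMQ client. */
final class ShutdownOrderSketch {

    /** Stops accepting work, waits for submitted tasks, then runs the cleanup step. */
    static void drainThenClose(ExecutorService pool, Runnable closeDownstream) {
        pool.shutdown(); // tasks that were already submitted still run
        try {
            // awaitTermination returns false on timeout, so loop until it returns true
            while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                // still draining
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // interrupted: skip the cleanup and leave it to the caller
        }
        closeDownstream.run(); // stands in for shutting down the retry producer
    }

    /** Runs four short tasks and records the order in which things finish. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(30); // simulated business work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("task-done");
            });
        }
        drainThenClose(pool, () -> events.add("producer-closed"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the cleanup step runs only after `awaitTermination` returns true, `producer-closed` is always the last recorded event, after all four `task-done` entries.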

First, a brief summary of the responsibilities of each class:

  • MessageListener: Defines the user’s message processing logic.
  • PullBatchTask: The core message-pulling logic, built on DefaultLitePullConsumer.
  • ExecuteTask: The message processing task; it calls the business listener and performs the retry logic.
  • BigDataPullConsumer: The class that wires the pull task, the business thread pool, and the retry producer together.
  • LitePullMain: The entry class for this test.

Next, let’s take a brief look at how PullBatchTask and ExecuteTask are implemented, to understand some key points of using the PULL mode.

The run method of PullBatchTask is built around a while loop. Unlike PUSH mode, the loop is not meant for continuous real-time listening but for batch processing: messages stored before the current task started are consumed by this task, and newer messages accumulate until a later task picks them up. To make sure the consumption progress is persisted before the consumer stops, the task does not end immediately. It first pauses the message queue once the pulled messages are newer than the task's start time, then leaves the loop only after several consecutive polls return nothing. Only after that is the shutdown method of DefaultLitePullConsumer called, so that the progress is committed to the broker correctly and messages are not consumed twice.
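The control flow of that loop can be modeled without a broker. The sketch below is a simplified, JDK-only model, not the RocketMQ API: `FakeSource` is a hypothetical stand-in for the consumer, each "message" is reduced to its store timestamp, and setting `paused` plays the role of `consumer.pause(...)`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical, JDK-only model of the batch loop; it does not use the RocketMQ client. */
final class BatchLoopSketch {

    /** Stand-in for a pull consumer: hands out prepared batches of store timestamps. */
    static final class FakeSource {
        private final Deque<List<Long>> batches = new ArrayDeque<>();
        boolean paused = false;

        FakeSource(List<List<Long>> prepared) {
            batches.addAll(prepared);
        }

        /** Returns the next batch, or an empty list when paused or drained. */
        List<Long> poll() {
            if (paused || batches.isEmpty()) {
                return Collections.emptyList();
            }
            return batches.poll();
        }
    }

    /** Runs one batch task and returns how many messages were handed to the business side. */
    static int runBatch(FakeSource source, long startTime, int maxEmptyPolls) {
        int handled = 0;
        int emptyPolls = 0;
        while (true) {
            List<Long> storeTimestamps = source.poll();
            if (!storeTimestamps.isEmpty()) {
                emptyPolls = 0; // any data resets the counter
                handled += storeTimestamps.size();
                long last = storeTimestamps.get(storeTimestamps.size() - 1);
                if (last > startTime) {
                    source.paused = true; // newer data is left for the next task
                }
            } else {
                emptyPolls++;
            }
            if (emptyPolls > maxEmptyPolls) {
                return handled; // the caller would now shut the consumer down
            }
        }
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource(Arrays.asList(
                Arrays.asList(10L, 20L),   // stored before the task started
                Arrays.asList(30L, 120L),  // crosses the start time of 100
                Arrays.asList(130L)));     // left for the next task
        System.out.println(runBatch(source, 100L, 5)); // prints 4
    }
}
```

As in the listing, the batch that crosses the start time is still processed in full; the pause only prevents further batches from being fetched.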

On the consumer side, business processing runs in a dedicated business thread pool. When that pool is backlogged, submissions are rejected and the pulling side stops handing over new batches, which in turn triggers flow control on message pulling and avoids memory overflow.
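The back-pressure idea can be shown with the JDK alone. This is a minimal sketch under assumptions: the pool is deliberately tiny (one thread, queue capacity one) so that rejections actually occur, the back-off is a few milliseconds rather than the 5 seconds used in the listing, and the names `BackpressureSketch`, `submitWithRetry`, and `demo` are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of back-pressure through a bounded business thread pool. */
final class BackpressureSketch {

    /**
     * Submits the task, backing off and retrying while the bounded pool rejects it.
     * Returns how many times the submission was rejected. Assumes the pool is running;
     * a pool that has been shut down would reject forever.
     */
    static int submitWithRetry(ExecutorService pool, Runnable task, long backoffMillis) {
        int rejections = 0;
        while (true) {
            try {
                pool.submit(task);
                return rejections;
            } catch (RejectedExecutionException e) {
                rejections++;
                sleepQuietly(backoffMillis); // the pulling thread is blocked here
            }
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Pushes slow tasks through a tiny pool; returns {completed, rejections}. */
    static int[] demo(int batches) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        AtomicInteger completed = new AtomicInteger();
        int rejections = 0;
        for (int i = 0; i < batches; i++) {
            rejections += submitWithRetry(pool, () -> {
                sleepQuietly(20); // simulated slow business processing
                completed.incrementAndGet();
            }, 5);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), rejections};
    }

    public static void main(String[] args) {
        int[] result = demo(6);
        System.out.println("completed=" + result[0] + ", rejections=" + result[1]);
    }
}
```

No batch is dropped: every rejected submission is retried until the pool has room, so the producer of work slows down to the pace of the consumer. The exact number of rejections depends on timing.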

When business processing fails, the message must be retried. In this example the failed message is first sent back to the broker; the main purpose is to let the message consumption progress keep moving forward.
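The retry bookkeeping can be illustrated with an in-memory queue standing in for the topic. This is a simplified sketch, not the RocketMQ API: `Msg`, `drain`, and `MAX_RECONSUME_TIMES` are hypothetical names, and the limit of 16 mirrors the filter in ExecuteTask. One deliberate difference from the listing: the sketch treats both a `false` return and an exception as a failure, whereas the listing re-sends only when the listener throws.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

/** Hypothetical sketch of retry by re-publishing; an in-memory queue stands in for the topic. */
final class RepublishRetrySketch {
    static final int MAX_RECONSUME_TIMES = 16;

    /** Minimal message model: a body plus the number of times it has been re-sent. */
    static final class Msg {
        final String body;
        final int reconsumeTimes;

        Msg(String body, int reconsumeTimes) {
            this.body = body;
            this.reconsumeTimes = reconsumeTimes;
        }
    }

    /**
     * Drains the queue and returns the number of delivery attempts. A failed message is
     * appended again with its counter incremented, so the head of the queue keeps moving;
     * a message whose counter exceeds the limit is dropped.
     */
    static int drain(Queue<Msg> topic, Predicate<Msg> handler) {
        int attempts = 0;
        Msg msg;
        while ((msg = topic.poll()) != null) {
            if (msg.reconsumeTimes > MAX_RECONSUME_TIMES) {
                continue; // too many retries: give up on this message
            }
            attempts++;
            boolean ok;
            try {
                ok = handler.test(msg);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                topic.add(new Msg(msg.body, msg.reconsumeTimes + 1));
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Queue<Msg> alwaysFailing = new ArrayDeque<>();
        alwaysFailing.add(new Msg("order-1", 0));
        System.out.println(drain(alwaysFailing, m -> false)); // prints 17

        Queue<Msg> recovers = new ArrayDeque<>();
        recovers.add(new Msg("order-2", 0));
        System.out.println(drain(recovers, m -> m.reconsumeTimes >= 2)); // prints 3
    }
}
```

A message that always fails is attempted once for each counter value from 0 to 16, which gives 17 attempts before it is dropped.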