
20 RocketMQ Cluster Monitoring II #

Introduction #

Senders and consumers usually care about the resources they use: senders focus on topics, while consumers focus on consumer groups. Administrators care more about the health of the cluster as a whole. This article covers hands-on monitoring of topics and consumer groups, from the design of the monitoring items to the implementation of each one.

Design of Monitoring Items #

First, let’s refer to both topic monitoring and consumer monitoring as resource monitoring. The following diagram lists the monitoring items included in topics and consumer groups.

[Figure: monitoring items for topics and consumer groups]

Topic Monitoring #

Topic monitoring covers the sending rate, sending rate change, sending latency, message size, and daily message volume. The importance of each item is explained below.

Sending Rate

Collecting the real-time sending rate of each topic shows how much traffic the topic carries. In some business scenarios, for example, a topic's sending rate must never drop to zero; collecting the rate in real time lays the groundwork for alarms on such conditions.

Sending Rate Change

Sending rate change is how much a topic's sending rate changes within a given period, for example doubling within 5 minutes. It serves two purposes. The first is protecting the cluster: a sudden surge in a topic's sending rate can put the cluster at risk. If a topic's TPS jumps fivefold from 5,000 to 25,000 within 3 minutes, that traffic threatens the stability of the cluster. The second is checking that the business is running normally: if a topic's TPS falls from 8,000 to 80 within 3 minutes, such a cliff-like drop signals that the business may be unhealthy.
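
A minimal sketch of the surge and cliff checks described above, assuming the collector keeps the TPS sampled at the start of the observation window (for example 3 minutes ago) alongside the current TPS. The `SendRateChange` class, its method, and the factor parameters are hypothetical; the values in the usage note mirror the examples in the text.

```java
/**
 * Hypothetical helper: compares a topic's current TPS with the TPS observed
 * at the start of the observation window (e.g. 3 minutes ago).
 */
final class SendRateChange {

    enum Verdict { NORMAL, SURGE, CLIFF }

    /**
     * @param tpsBefore   TPS at the start of the window
     * @param tpsNow      TPS now
     * @param surgeFactor e.g. 5.0 flags a fivefold increase (5000 -> 25000)
     * @param cliffFactor e.g. 100.0 flags a hundredfold drop (8000 -> 80)
     */
    static Verdict classify(double tpsBefore, double tpsNow,
                            double surgeFactor, double cliffFactor) {
        if (tpsBefore <= 0) {
            // No baseline traffic; a "rate dropped to zero" alarm is a separate check.
            return Verdict.NORMAL;
        }
        double ratio = tpsNow / tpsBefore;
        if (ratio >= surgeFactor) {
            return Verdict.SURGE;
        }
        if (ratio <= 1.0 / cliffFactor) {
            return Verdict.CLIFF;
        }
        return Verdict.NORMAL;
    }
}
```

With a surge factor of 5 and a cliff factor of 100, `classify(5000, 25000, 5.0, 100.0)` returns `SURGE` and `classify(8000, 80, 5.0, 100.0)` returns `CLIFF`. Comparing two point samples is the simplest option; the (max - min) / median formula computed in the time-series database is a smoother alternative.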

Sending Latency

Collecting the distribution of send latency shows how clients are sending. Latency is bucketed into the following intervals, in milliseconds: [0, 1), [1, 5), [5, 10), [10, 50), [50, 100), [100, 500), [500, 1000), [1000, ∞). For example, if most latencies fall in the 500 ms to 1000 ms range, the cause of such high latency should be investigated.
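
As an alternative to a chain of `if` statements, the bucketing can be sketched with the interval boundaries kept in one sorted array and looked up with `Arrays.binarySearch`. The `LatencyBuckets` class and its labels are hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: maps a latency in milliseconds to the index of its
 * half-open interval [0,1), [1,5), [5,10), [10,50), [50,100), [100,500),
 * [500,1000), [1000,+inf).
 */
final class LatencyBuckets {

    // Lower bounds of buckets 1..7; bucket 0 is [0, 1).
    private static final long[] LOWER_BOUNDS = {1, 5, 10, 50, 100, 500, 1000};

    static final String[] LABELS = {
        "[0,1)", "[1,5)", "[5,10)", "[10,50)", "[50,100)",
        "[100,500)", "[500,1000)", "[1000,+inf)"
    };

    static int bucketOf(long costInMs) {
        int pos = Arrays.binarySearch(LOWER_BOUNDS, costInMs);
        // Exact hit on a lower bound: the value opens bucket pos + 1.
        // Otherwise binarySearch returns -(insertionPoint) - 1, and the
        // insertion point is the number of bounds smaller than costInMs.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }
}
```

For instance, `bucketOf(750)` returns 6, the `[500,1000)` bucket that the text suggests investigating. Keeping the boundaries in one array makes it easy to change them without touching the lookup logic.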

Message Size

Collecting the distribution of message sizes identifies clients that send large messages. Large messages sent at a high rate also put the cluster at risk. For example, knowing which topics send messages larger than 5 KB provides the data for targeted governance or real-time alarms. The message size intervals are as follows, in KB: [0, 1), [1, 5), [5, 10), [10, 50), [50, 100), [100, 500), [500, 1000), [1000, ∞).
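
To support the 5 KB governance example above, here is a sketch that counts messages of at least 5 KB from the eight size-bucket counters, assuming they are read in the interval order listed above. The `LargeMessageCheck` class is hypothetical, and 5 KB is simply the example threshold from the text; it happens to line up with the start of the [5, 10) bucket.

```java
/**
 * Hypothetical helper: works on the eight size-bucket counters
 * [0,1), [1,5), [5,10), [10,50), [50,100), [100,500), [500,1000), [1000,+inf) KB.
 */
final class LargeMessageCheck {

    // Index of the [5, 10) KB bucket; every bucket from here on holds messages >= 5 KB.
    private static final int FIRST_LARGE_BUCKET = 2;

    static long countAtLeast5KB(long[] sizeBucketCounts) {
        if (sizeBucketCounts.length != 8) {
            throw new IllegalArgumentException("expected 8 size buckets");
        }
        long total = 0;
        for (int i = FIRST_LARGE_BUCKET; i < sizeBucketCounts.length; i++) {
            total += sizeBucketCounts[i];
        }
        return total;
    }

    /** Share of messages >= 5 KB, or 0 when nothing was sent. */
    static double largeMessageRatio(long[] sizeBucketCounts) {
        long all = 0;
        for (long c : sizeBucketCounts) {
            all += c;
        }
        return all == 0 ? 0.0 : (double) countAtLeast5KB(sizeBucketCounts) / all;
    }
}
```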

Daily Message Volume

Daily message volume is the number of messages sent per day. Plotted as a time curve, it shows how the total message volume changes over a week or a month.
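
Daily volume is usually computed by the time-series database itself, but as a hedged sketch, per-interval counts can be rolled up into calendar days as follows. The sample layout `{epochMillis, count}` and the `DailyVolume` class are assumptions; the time zone matters because it decides where one day ends and the next begins.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper: rolls per-interval message counts up into per-day
 * totals. Each sample is {epochMillis, messagesSentInInterval}.
 */
final class DailyVolume {

    static Map<LocalDate, Long> byDay(long[][] samples, ZoneId zone) {
        // TreeMap keeps the days sorted, which suits drawing a time curve.
        Map<LocalDate, Long> perDay = new TreeMap<>();
        for (long[] sample : samples) {
            LocalDate day = Instant.ofEpochMilli(sample[0]).atZone(zone).toLocalDate();
            perDay.merge(day, sample[1], Long::sum);
        }
        return perDay;
    }
}
```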

Consumer Monitoring #

Consumer Speed

Collecting the real-time consumption speed shows the health of a consumer group. As with sending, some scenarios watch the consumption speed closely, and collecting it in real time provides the data needed for alarms.

Message Backlog

Message backlog is the number of messages that have not yet been consumed at a given moment: message backlog = total number of messages sent - total number of messages consumed. Backlog is the most important monitoring item for a consumer group, and some near-real-time scenarios have strict backlog requirements, so collecting the backlog and alarming on it is especially important.
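
Applied per message queue, the formula becomes the broker offset (messages written) minus the consumer offset (messages consumed), summed over all of the group's queues. A minimal model of that, with a hypothetical `QueueOffsets` holder in place of RocketMQ's own types; clamping negative differences to zero is a choice of this sketch, not something the text prescribes.

```java
import java.util.List;

/**
 * Hypothetical model of the backlog formula, applied per message queue and
 * summed over all queues of a consumer group.
 */
final class Backlog {

    static final class QueueOffsets {
        final long brokerOffset;   // messages written to the queue
        final long consumerOffset; // messages the group has consumed from it

        QueueOffsets(long brokerOffset, long consumerOffset) {
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
        }
    }

    static long total(List<QueueOffsets> queues) {
        long sum = 0;
        for (QueueOffsets q : queues) {
            // Clamp at zero so a stale or reset offset cannot yield a negative backlog.
            sum += Math.max(0, q.brokerOffset - q.consumerOffset);
        }
        return sum;
    }
}
```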

Consumption Latency

Consumption latency is collected on the client. Its distribution shows how the client is consuming: by watching it, we can tell whether the client is blocked and help colleagues troubleshoot and locate problems.

Monitoring Development Practice #

Of the topic and consumer metrics listed above, some are collected from the RocketMQ cluster, such as the sending rate, daily message volume, consumption speed, and backlog. Others must be reported by the client, such as the sending latency, message size, and consumption latency.

[Figure: metric collection architecture, including the “Topics for Metric Collection”]

Practical Explanation #

The task scheduling and the `getMqAdmin` utility used in the following code are described in “RocketMQ Cluster Monitoring (1)”. The collection task can run every 1 second or every 5 seconds.
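
As a sketch only, assuming the plain JDK `ScheduledExecutorService` rather than whatever scheduler part I uses, a collection task such as `collectTopicTps()` could be run periodically like this. `MetricCollectScheduler` and `CollectTask` are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler wrapper for the metric collection tasks. */
final class MetricCollectScheduler {

    /** Collection methods in this article declare "throws Exception". */
    @FunctionalInterface
    interface CollectTask {
        void run() throws Exception;
    }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    void schedule(CollectTask task, long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Exception e) {
                // Without this catch, one failed run would cancel all later runs.
                e.printStackTrace();
            }
        }, 0, period, unit);
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Usage would look like `scheduler.schedule(this::collectTopicTps, 5, TimeUnit.SECONDS)`. The try/catch matters: `scheduleAtFixedRate` suppresses every later execution once a run throws, so an unhandled broker timeout would otherwise stop collection silently.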

For the “Topics for Metric Collection” in the figure above, Kafka can be used instead, since some companies run thousands or even tens of thousands of applications that report metrics.

The examples below focus on collecting RocketMQ metrics and reporting them to Kafka. Code for writing the metrics to a time-series database or another database is not included; that logic is straightforward and left to the reader.
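
As one hedged illustration of the storage step, assume InfluxDB (the change-rate query at the end of this article is written in InfluxQL) and the schema that query implies: measurement `mq_topic_info`, tags `clusterName`, `topicName` and `name`, and field `value`. Under those assumptions, the `topicTps` map could be turned into line-protocol records as follows. `TopicTpsLineProtocol` is a hypothetical class, and escaping covers only the characters line protocol reserves in tag values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical converter from collected TPS values to InfluxDB line protocol. */
final class TopicTpsLineProtocol {

    /** Escapes commas, equals signs and spaces, which line protocol reserves in tag values. */
    static String escapeTag(String value) {
        return value.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    static List<String> toLines(String clusterName, Map<String, Double> topicTps, long timestampNanos) {
        List<String> lines = new ArrayList<>();
        // Sort by topic name so the output order is deterministic.
        for (Map.Entry<String, Double> e : new TreeMap<>(topicTps).entrySet()) {
            lines.add("mq_topic_info"
                    + ",clusterName=" + escapeTag(clusterName)
                    + ",name=tps"
                    + ",topicName=" + escapeTag(e.getKey())
                    + " value=" + e.getValue()
                    + " " + timestampNanos);
        }
        return lines;
    }
}
```

Tags are written in key order (`clusterName`, `name`, `topicName`), which InfluxDB recommends for write performance.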

In practice, it is recommended to provide an SDK that wraps sending and consuming together with the collection of monitoring metrics, so that metric collection is transparent to users.
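
A minimal sketch of how such an SDK might hide metric collection: the caller passes the send (or consume) action, and the wrapper times it and hands the duration to a recorder. `TimedCall` is hypothetical; in this article's setup the recorder could be something like `distributionMetric::markTime`.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

/** Hypothetical SDK helper that times an operation and reports the duration. */
final class TimedCall {

    static <T> T call(Callable<T> action, LongConsumer recordMillis) throws Exception {
        long begin = System.currentTimeMillis();
        try {
            return action.call();
        } finally {
            // Recorded for both successful and failed calls; filter here
            // if only successful sends should be counted.
            recordMillis.accept(System.currentTimeMillis() - begin);
        }
    }
}
```

Note that the `ClientMetricCollect` example later in this article records latency only for `SEND_OK` results; a wrapper like this one would need the same check if that behavior is wanted.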

Collecting Topic Sending Rate #

First, fetch the list of topics in the cluster, then compute each topic's rate on every master broker and sum the per-broker values. Finally, report the results to the metric topic or write them directly to the time-series database.

Built-in RocketMQ topics that should not be counted, such as retry topics (%RETRY%) and dead-letter topics (%DLQ%), are filtered out during statistics.
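
A sketch of this filter as a standalone predicate. It matches the prefixes with `startsWith` rather than `contains`, assuming, as in RocketMQ's naming, that retry and dead-letter topics are named `%RETRY%` or `%DLQ%` followed by the consumer group name; `TopicFilter` is a hypothetical name.

```java
/**
 * Hypothetical filter for the statistics loop: skips per-group retry topics
 * (%RETRY%<group>) and dead-letter topics (%DLQ%<group>).
 */
final class TopicFilter {

    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    static boolean shouldCount(String topic) {
        return !topic.startsWith(RETRY_PREFIX) && !topic.startsWith(DLQ_PREFIX);
    }
}
```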

public void collectTopicTps() throws Exception {
    DefaultMQAdminExt mqAdmin = getMqAdmin();
    Set<String> topicList = mqAdmin.fetchTopicsByCLuster("demo_mq").getTopicList();
    ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
    Map<String/*Topic Name*/, Double/*Tps*/> topicTps = Maps.newHashMap();
    // Collect the speed of each topic on each Master
    for (Map.Entry<String, BrokerData> stringBrokerDataEntry : clusterInfo.getBrokerAddrTable().entrySet()) {
        BrokerData brokerData = stringBrokerDataEntry.getValue();
        // Get the Master node
        String brokerAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        for (String topic : topicList) {
            try {
                // Skip built-in MQ topics such as %DLQ% and %RETRY%
                if (topic.contains("%DLQ%") || topic.contains("%RETRY%")) {
                    continue;
                }
                BrokerStatsData topicPutNums = mqAdmin.viewBrokerStatsData(brokerAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
                double topicTpsOnBroker = topicPutNums.getStatsMinute().getTps();
                if (topicTps.containsKey(topic)) {
                    topicTps.put(topic, topicTps.get(topic) + topicTpsOnBroker);
                } else {
                    topicTps.put(topic, topicTpsOnBroker);
                }
            } catch (MQClientException ex) {
                ex.printStackTrace();
            }
        }
    }
    // Report the collected topic speed, topicTps, to the metric topic or write it directly to the time-series database
}
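
Collecting Consumer Speed #

The consumption speed is collected in the same way: fetch the consumption statistics from each master broker and sum the TPS of each consumer group.

public void collectConsumerTps() throws Exception {
    DefaultMQAdminExt mqAdmin = getMqAdmin();
    ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
    Map<String/*Consumer Name*/, Double/*Tps*/> consumerTps = Maps.newHashMap();
    // Collect the consumption speed of each consumer group on each Master
    for (Map.Entry<String, BrokerData> stringBrokerDataEntry : clusterInfo.getBrokerAddrTable().entrySet()) {
        BrokerData brokerData = stringBrokerDataEntry.getValue();
        // Get the Master node
        String brokerAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        ConsumeStatsList consumeStatsList = mqAdmin.fetchConsumeStatsInBroker(brokerAddr, false, 5000);
        for (Map<String, List<ConsumeStats>> consumerStats : consumeStatsList.getConsumeStatsList()) {
            for (Map.Entry<String, List<ConsumeStats>> stringListEntry : consumerStats.entrySet()) {
                String consumer = stringListEntry.getKey();
                List<ConsumeStats> consumeStats = stringListEntry.getValue();
                // Sum the TPS over all topics the group consumes on this broker
                double tps = 0d;
                for (ConsumeStats consumeStat : consumeStats) {
                    tps += consumeStat.getConsumeTps();
                }
                if (consumerTps.containsKey(consumer)) {
                    consumerTps.put(consumer, consumerTps.get(consumer) + tps);
                } else {
                    consumerTps.put(consumer, tps);
                }
            }
        }
    }
    // Report the collected consumption rate, consumerTps, to the metrics topic or write it directly to the time-series database.
}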

Collecting Consumer Lag #

The backlog of a consumer group is the sum of the backlogs of all its message queues across all master brokers.

public void collectConsumerLag() throws Exception {
    DefaultMQAdminExt mqAdmin = getMqAdmin();
    ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
    Map<String/*consumer name*/, Long/*consumer lag*/> consumerLags = Maps.newHashMap();
  
    // Calculate the backlog of each consumer group on each Master node
    for (Map.Entry<String, BrokerData> stringBrokerDataEntry : clusterInfo.getBrokerAddrTable().entrySet()) {
        BrokerData brokerData = stringBrokerDataEntry.getValue();
     
        // Get the Master node
        String brokerAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        ConsumeStatsList consumeStatsList = mqAdmin.fetchConsumeStatsInBroker(brokerAddr, false, 5000);
        
        for (Map<String, List<ConsumeStats>> consumerStats : consumeStatsList.getConsumeStatsList()) {
            for (Map.Entry<String, List<ConsumeStats>> stringListEntry : consumerStats.entrySet()) {
                String consumer = stringListEntry.getKey();
                List<ConsumeStats> consumeStats = stringListEntry.getValue();
                Long lag = 0L;
                
                for (ConsumeStats consumeStat : consumeStats) {
                    lag += computeTotalDiff(consumeStat.getOffsetTable());
                }
                
                if (consumerLags.containsKey(consumer)) {
                    consumerLags.put(consumer, consumerLags.get(consumer) + lag);
                } else {
                    consumerLags.put(consumer, lag);
                }
            }
        }
    }
  
    // Report the collected consumer lag, consumerLags, to the metrics topic or write it directly to the time-series database.
}

public long computeTotalDiff(Map<MessageQueue, OffsetWrapper> offsetTable) {
    long diffTotal = 0L;
    for (Map.Entry<MessageQueue, OffsetWrapper> entry : offsetTable.entrySet()) {
        OffsetWrapper offsetWrapper = entry.getValue();
        long consumerOffset = offsetWrapper.getConsumerOffset();
        // Only count queues for which the group has committed an offset.
        // The difference is computed per queue, so a skipped queue adds nothing
        // (the previous version re-added the last queue's diff in that case).
        if (consumerOffset > 0) {
            diffTotal += offsetWrapper.getBrokerOffset() - consumerOffset;
        }
    }
    return diffTotal;
}

Collecting Send Latency and Message Size #

The `DistributionMetric` class provides two methods: `markTime()` records send latency and `markSize()` records message size. The latency intervals are [0, 1), [1, 5), [5, 10), [10, 50), [50, 100), [100, 500), [500, 1000), [1000, ∞) in milliseconds. The message size intervals are [0, 1), [1, 5), [5, 10), [10, 50), [50, 100), [100, 500), [500, 1000), [1000, ∞) in KB, where 1 KB = 1024 bytes.

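import java.util.concurrent.atomic.LongAdder;

public class DistributionMetric {

    private String name;

    // Latency buckets, in milliseconds
    private final LongAdder lessThan1Ms = new LongAdder();
    private final LongAdder lessThan5Ms = new LongAdder();
    private final LongAdder lessThan10Ms = new LongAdder();
    private final LongAdder lessThan50Ms = new LongAdder();
    private final LongAdder lessThan100Ms = new LongAdder();
    private final LongAdder lessThan500Ms = new LongAdder();
    private final LongAdder lessThan1000Ms = new LongAdder();
    private final LongAdder moreThan1000Ms = new LongAdder();

    // Message size buckets, in KB (1 KB = 1024 bytes)
    private final LongAdder lessThan1KB = new LongAdder();
    private final LongAdder lessThan5KB = new LongAdder();
    private final LongAdder lessThan10KB = new LongAdder();
    private final LongAdder lessThan50KB = new LongAdder();
    private final LongAdder lessThan100KB = new LongAdder();
    private final LongAdder lessThan500KB = new LongAdder();
    private final LongAdder lessThan1000KB = new LongAdder();
    private final LongAdder moreThan1000KB = new LongAdder();

    public static DistributionMetric newDistributionMetric(String name) {
        DistributionMetric distributionMetric = new DistributionMetric();
        distributionMetric.setName(name);
        return distributionMetric;
    }

    public void markTime(long costInMs) {
        if (costInMs < 1) {
            lessThan1Ms.increment();
        } else if (costInMs < 5) {
            lessThan5Ms.increment();
        } else if (costInMs < 10) {
            lessThan10Ms.increment();
        } else if (costInMs < 50) {
            lessThan50Ms.increment();
        } else if (costInMs < 100) {
            lessThan100Ms.increment();
        } else if (costInMs < 500) {
            lessThan500Ms.increment();
        } else if (costInMs < 1000) {
            lessThan1000Ms.increment();
        } else {
            moreThan1000Ms.increment();
        }
    }

    public void markSize(long sizeInBytes) {
        if (sizeInBytes < 1024) {
            lessThan1KB.increment();
        } else if (sizeInBytes < 5 * 1024) {
            lessThan5KB.increment();
        } else if (sizeInBytes < 10 * 1024) {
            lessThan10KB.increment();
        } else if (sizeInBytes < 50 * 1024) {
            lessThan50KB.increment();
        } else if (sizeInBytes < 100 * 1024) {
            lessThan100KB.increment();
        } else if (sizeInBytes < 500 * 1024) {
            lessThan500KB.increment();
        } else if (sizeInBytes < 1000 * 1024) {
            lessThan1000KB.increment();
        } else {
            moreThan1000KB.increment();
        }
    }

    // Getters used by recordMetricInfo() to read the current counts
    public LongAdder getLessThan1Ms() { return lessThan1Ms; }
    public LongAdder getLessThan5Ms() { return lessThan5Ms; }
    public LongAdder getLessThan10Ms() { return lessThan10Ms; }
    public LongAdder getLessThan50Ms() { return lessThan50Ms; }
    public LongAdder getLessThan100Ms() { return lessThan100Ms; }
    public LongAdder getLessThan500Ms() { return lessThan500Ms; }
    public LongAdder getLessThan1000Ms() { return lessThan1000Ms; }
    public LongAdder getMoreThan1000Ms() { return moreThan1000Ms; }

    public LongAdder getLessThan1KB() { return lessThan1KB; }
    public LongAdder getLessThan5KB() { return lessThan5KB; }
    public LongAdder getLessThan10KB() { return lessThan10KB; }
    public LongAdder getLessThan50KB() { return lessThan50KB; }
    public LongAdder getLessThan100KB() { return lessThan100KB; }
    public LongAdder getLessThan500KB() { return lessThan500KB; }
    public LongAdder getLessThan1000KB() { return lessThan1000KB; }
    public LongAdder getMoreThan1000KB() { return moreThan1000KB; }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}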

public class MetricInfo {

    private String name;

    // Latency distribution, in milliseconds
    private long lessThan1Ms;
    private long lessThan5Ms;
    private long lessThan10Ms;
    private long lessThan50Ms;
    private long lessThan100Ms;
    private long lessThan500Ms;
    private long lessThan1000Ms;
    private long moreThan1000Ms;

    // Message size distribution, in KB
    private long lessThan1KB;
    private long lessThan5KB;
    private long lessThan10KB;
    private long lessThan50KB;
    private long lessThan100KB;
    private long lessThan500KB;
    private long lessThan1000KB;
    private long moreThan1000KB;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public long getLessThan1Ms() { return lessThan1Ms; }
    public void setLessThan1Ms(long lessThan1Ms) { this.lessThan1Ms = lessThan1Ms; }
    public long getLessThan5Ms() { return lessThan5Ms; }
    public void setLessThan5Ms(long lessThan5Ms) { this.lessThan5Ms = lessThan5Ms; }
    public long getLessThan10Ms() { return lessThan10Ms; }
    public void setLessThan10Ms(long lessThan10Ms) { this.lessThan10Ms = lessThan10Ms; }
    public long getLessThan50Ms() { return lessThan50Ms; }
    public void setLessThan50Ms(long lessThan50Ms) { this.lessThan50Ms = lessThan50Ms; }
    public long getLessThan100Ms() { return lessThan100Ms; }
    public void setLessThan100Ms(long lessThan100Ms) { this.lessThan100Ms = lessThan100Ms; }
    public long getLessThan500Ms() { return lessThan500Ms; }
    public void setLessThan500Ms(long lessThan500Ms) { this.lessThan500Ms = lessThan500Ms; }
    public long getLessThan1000Ms() { return lessThan1000Ms; }
    public void setLessThan1000Ms(long lessThan1000Ms) { this.lessThan1000Ms = lessThan1000Ms; }
    public long getMoreThan1000Ms() { return moreThan1000Ms; }
    public void setMoreThan1000Ms(long moreThan1000Ms) { this.moreThan1000Ms = moreThan1000Ms; }

    public long getLessThan1KB() { return lessThan1KB; }
    public void setLessThan1KB(long lessThan1KB) { this.lessThan1KB = lessThan1KB; }
    public long getLessThan5KB() { return lessThan5KB; }
    public void setLessThan5KB(long lessThan5KB) { this.lessThan5KB = lessThan5KB; }
    public long getLessThan10KB() { return lessThan10KB; }
    public void setLessThan10KB(long lessThan10KB) { this.lessThan10KB = lessThan10KB; }
    public long getLessThan50KB() { return lessThan50KB; }
    public void setLessThan50KB(long lessThan50KB) { this.lessThan50KB = lessThan50KB; }
    public long getLessThan100KB() { return lessThan100KB; }
    public void setLessThan100KB(long lessThan100KB) { this.lessThan100KB = lessThan100KB; }
    public long getLessThan500KB() { return lessThan500KB; }
    public void setLessThan500KB(long lessThan500KB) { this.lessThan500KB = lessThan500KB; }
    public long getLessThan1000KB() { return lessThan1000KB; }
    public void setLessThan1000KB(long lessThan1000KB) { this.lessThan1000KB = lessThan1000KB; }
    public long getMoreThan1000KB() { return moreThan1000KB; }
    public void setMoreThan1000KB(long moreThan1000KB) { this.moreThan1000KB = moreThan1000KB; }
}

The `ClientMetricCollect` class simulates a client that collects send-latency and message-size statistics. A scheduled task calls `recordMetricInfo()` periodically to report the collected data to a metric topic, from which it is stored in a time-series database. This completes the collection of send latency and message size.
    
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    import org.junit.Test;

    public class ClientMetricCollect {

        // One producer and one distribution metric per topic
        private final Map<String, DefaultMQProducer> producerMap = new ConcurrentHashMap<>();
        private final Map<String, DistributionMetric> metricMap = new ConcurrentHashMap<>();

        public synchronized DefaultMQProducer getTopicProducer(String topic) throws MQClientException {
            DefaultMQProducer existing = producerMap.get(topic);
            if (existing != null) {
                return existing;
            }
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup".concat("_").concat(topic));
            producer.setNamesrvAddr("dev-mq3.ttbike.com.cn:9876");
            producer.setVipChannelEnabled(false);
            producer.setClientIP("mq producer-client-id-1");
            producer.start();
            // Register a separate metric per topic instead of overwriting a shared one
            metricMap.put(topic, DistributionMetric.newDistributionMetric(topic));
            producerMap.put(topic, producer);
            return producer;
        }

        public void send(Message message) throws Exception {
            DefaultMQProducer producer = getTopicProducer(message.getTopic());
            // Start timing after the producer lookup so startup cost is not counted as send latency
            long begin = System.currentTimeMillis();
            SendResult sendResult = producer.send(message);
            // Only successful sends are recorded
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                DistributionMetric distributionMetric = metricMap.get(message.getTopic());
                distributionMetric.markTime(System.currentTimeMillis() - begin);
                distributionMetric.markSize(message.getBody().length);
            }
        }

        // Called periodically by a scheduled task
        public void recordMetricInfo() {
            for (DistributionMetric distributionMetric : metricMap.values()) {
                MetricInfo metricInfo = new MetricInfo();
                metricInfo.setName(distributionMetric.getName());

                metricInfo.setLessThan1Ms(distributionMetric.getLessThan1Ms().longValue());
                metricInfo.setLessThan5Ms(distributionMetric.getLessThan5Ms().longValue());
                metricInfo.setLessThan10Ms(distributionMetric.getLessThan10Ms().longValue());
                metricInfo.setLessThan50Ms(distributionMetric.getLessThan50Ms().longValue());
                metricInfo.setLessThan100Ms(distributionMetric.getLessThan100Ms().longValue());
                metricInfo.setLessThan500Ms(distributionMetric.getLessThan500Ms().longValue());
                metricInfo.setLessThan1000Ms(distributionMetric.getLessThan1000Ms().longValue());
                metricInfo.setMoreThan1000Ms(distributionMetric.getMoreThan1000Ms().longValue());

                metricInfo.setLessThan1KB(distributionMetric.getLessThan1KB().longValue());
                metricInfo.setLessThan5KB(distributionMetric.getLessThan5KB().longValue());
                metricInfo.setLessThan10KB(distributionMetric.getLessThan10KB().longValue());
                metricInfo.setLessThan50KB(distributionMetric.getLessThan50KB().longValue());
                metricInfo.setLessThan100KB(distributionMetric.getLessThan100KB().longValue());
                metricInfo.setLessThan500KB(distributionMetric.getLessThan500KB().longValue());
                metricInfo.setLessThan1000KB(distributionMetric.getLessThan1000KB().longValue());
                metricInfo.setMoreThan1000KB(distributionMetric.getMoreThan1000KB().longValue());

                // Report the send-latency and message-size distribution, metricInfo, to the metric topic or write it directly to the time-series database
                // System.out.println(JSON.toJSONString(metricInfo));
            }
        }

        @Test
        public void test() throws Exception {
            for (int i = 0; i < 100; i++) {
                byte[] body = "helloworld".getBytes();
                Message message = new Message("melon_online_test", body);
                send(message);
            }
        }

    }
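
Because the `LongAdder` counters in `DistributionMetric` only ever grow, each `recordMetricInfo()` call reports cumulative totals since the client started. If a dashboard should show per-interval counts instead, one option is to keep the previous snapshot and report the difference. The sketch below assumes the bucket values are read into a `long[]` in a fixed order; `BucketDelta` is hypothetical. `LongAdder.sumThenReset()` is another option, at the cost of losing the running totals.

```java
/** Hypothetical helper: turns cumulative bucket counters into per-interval deltas. */
final class BucketDelta {

    private long[] previous;

    /** Returns current - previous for each bucket; the first call returns the snapshot itself. */
    synchronized long[] next(long[] current) {
        long[] delta = new long[current.length];
        for (int i = 0; i < current.length; i++) {
            delta[i] = previous == null ? current[i] : current[i] - previous[i];
        }
        // Copy so later changes to the caller's array do not affect the next delta.
        previous = current.clone();
        return delta;
    }
}
```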
    
    

Reporting Consumption Latency #

On the consumer side, the `markTime()` method of the `DistributionMetric` class introduced earlier records how long the business logic takes to process messages, which gives the distribution of consumption latency. The intervals are [0, 1), [1, 5), [5, 10), [10, 50), [50, 100), [100, 500), [500, 1000), [1000, ∞) in milliseconds.
    
    
    import java.util.List;

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    public class ConsumerMetric {

        private DistributionMetric distributionMetric;

        public static void main(String[] args) throws Exception {
            String consumerName = "demo_consumer";
            ConsumerMetric consumerMetric = new ConsumerMetric();
            consumerMetric.startConsume(consumerName);
        }

        public void startConsume(String consumerName) throws Exception {
            this.distributionMetric = DistributionMetric.newDistributionMetric(consumerName);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerName);
            consumer.setNamesrvAddr("dev-mq3.ttbike.com.cn:9876");
            consumer.subscribe("melon_online_test", "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // The format is yyyyMMddHHmmss; a value such as 2017_0422_221800 is invalid.
            // The timestamp is only used with CONSUME_FROM_TIMESTAMP, so it has no effect here.
            consumer.setConsumeTimestamp("20181109221800");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                    long begin = System.currentTimeMillis();

                    // Process business logic
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                    // Record the time the business logic took for this batch of messages
                    distributionMetric.markTime(System.currentTimeMillis() - begin);

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }

        // Called periodically by a scheduled task
        public void recordMetricInfo() {
            MetricInfo metricInfo = new MetricInfo();
            metricInfo.setName(distributionMetric.getName());

            metricInfo.setLessThan1Ms(distributionMetric.getLessThan1Ms().longValue());
            metricInfo.setLessThan5Ms(distributionMetric.getLessThan5Ms().longValue());
            metricInfo.setLessThan10Ms(distributionMetric.getLessThan10Ms().longValue());
            metricInfo.setLessThan50Ms(distributionMetric.getLessThan50Ms().longValue());
            metricInfo.setLessThan100Ms(distributionMetric.getLessThan100Ms().longValue());
            metricInfo.setLessThan500Ms(distributionMetric.getLessThan500Ms().longValue());
            metricInfo.setLessThan1000Ms(distributionMetric.getLessThan1000Ms().longValue());
            metricInfo.setMoreThan1000Ms(distributionMetric.getMoreThan1000Ms().longValue());

            // Report the consumption-latency distribution, metricInfo, to the metric topic or write it directly to the time-series database
            // System.out.println(JSON.toJSONString(metricInfo));
        }
    }
    
    

Calculating Send Rate Change #

Calculating the send rate change relies on the aggregate functions of the time-series database. The send TPS change rate is defined as (maximum - minimum) / median over a time window. In the example below, the TPS change rate over the last 5 minutes is about 3% (0.0334). A scheduled task can compute this indicator periodically and send an alarm when it exceeds a threshold, for example 100%.
    
    
    > select SPREAD(value)/MEDIAN(value) from mq_topic_info where clusterName='demo_mq' and topicName='max_bonus_send_topic' and "name"='tps' and "time" > now()-5m ;
    name: mq_topic_info
    time                spread_median
    ----                -------------
    1598796048448226482 0.03338460146566541
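
The query relies on the database's `SPREAD` (maximum minus minimum) and `MEDIAN` functions. Where the storage lacks them, the same formula can be sketched over a window of TPS samples in Java. `TpsChangeRate` is hypothetical; like InfluxQL's `MEDIAN`, it averages the two middle values when the sample count is even, and treating an idle window with any spread as an unbounded change is a choice of this sketch.

```java
import java.util.Arrays;

/** Hypothetical helper: (max - min) / median over a window of TPS samples. */
final class TpsChangeRate {

    static double spreadOverMedian(double[] samples) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        // Even count: average the two middle values
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        double spread = sorted[n - 1] - sorted[0];
        if (median == 0) {
            // Mostly idle window: any spread counts as an unbounded change.
            return spread == 0 ? 0.0 : Double.POSITIVE_INFINITY;
        }
        return spread / median;
    }
}
```

With the 100% threshold from the text, the alarm condition becomes `spreadOverMedian(window) > 1.0`.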