19 Rocket Mq Cluster Monitoring I

19 RocketMQ Cluster Monitoring I #

Introduction #

In the RocketMQ system, there are clusters, topics, and consumer groups, with clusters consisting of NameSrv and Broker. This article mainly introduces the aspects that should be considered in the design of RocketMQ cluster monitoring and how to implement it. The next article will focus on monitoring topics and consumer groups. This article is based on the architecture pattern of 4 masters and 4 slaves with asynchronous replication.

Design of Monitoring Items #

The purpose of cluster monitoring is to record the health status of the cluster. The specific monitoring items are shown in the following figure:

img

Node Count #

If there is a 4-master-4-slave architecture in the cluster, there will be 8 Broker nodes in the cluster. The number of nodes can be seen by using the clusterList command. When the number of nodes in the cluster is less than 8, it indicates that some nodes are offline.

$ bin/mqadmin clusterList -n x.x.x.x:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name  #Broker Name         #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
demo_mq        demo_mq_a            0     10.111.89.111:10911    V4_7_0                 380.96(0,0ms)       383.16(0,0ms)          0 557.15 0.2298
demo_mq        demo_mq_a            1     10.111.89.110:10915    V4_7_0                 380.76(0,0ms)         0.00(0,0ms)          0 557.15 0.4734
demo_mq        demo_mq_b            0     10.111.89.112:10911    V4_7_0                 391.86(0,0ms)       381.66(0,0ms)          0 557.22 0.2437
demo_mq        demo_mq_b            1     10.111.89.110:10925    V4_7_0                 391.26(0,0ms)         0.00(0,0ms)          0 557.22 0.4734
demo_mq        demo_mq_c            0     10.111.26.96:10911     V4_7_0                 348.37(0,0ms)       342.77(0,0ms)          0 557.22 0.2428
demo_mq        demo_mq_c            1     10.111.26.91:10925     V4_7_0                 357.66(0,0ms)         0.00(0,0ms)          0 557.22 0.4852
demo_mq        demo_mq_d            0     10.111.26.81:10911     V4_7_0                 421.16(0,0ms)       409.86(0,0ms)          0 557.18 0.2424
demo_mq        demo_mq_d            1     10.111.26.91:10915     V4_7_0                 423.30(0,0ms)         0.00(0,0ms)          0 557.18 0.4852

Node Availability #

Checking the availability of nodes in the cluster is also important. The number of Broker nodes or the detection of processes cannot guarantee the availability of nodes. It is easy to understand that a Broker process may be running, but it may not provide normal services or be in a stuck state. We can detect this by periodically sending heartbeats to each Broker node in the cluster. Additionally, recording the response time is crucial. If the response time is too long, such as exceeding 5 seconds, it often indicates cluster fluctuations, which are manifested as client timeouts.

Availability heartbeat detection:

  • Successful sending: indicates that the node is running normally
  • Failed sending: indicates that the node is running abnormally

Response time detection:

  • Normal response: response time in the range of a few milliseconds to tens of milliseconds, which is a reasonable range
  • Excessive response time: response time greater than 1 second, or even more than 5 seconds, which is abnormal and requires investigation

Cluster Write TPS #

In the previous article, the performance of the RocketMQ cluster was discussed, with the highest tested TPS being over 120,000. Therefore, we expect the cluster to handle a range of 40,000 to 60,000 TPS, with some room for growth. Continuous monitoring of the cluster’s write TPS ensures that the cluster remains within the expected capacity. From the clusterList command, the InTPS of each node can be seen, and the sum of the InTPS of each Master node is the TPS of the cluster.

Cluster Write TPS Change Rate #

Considering that excessively high instantaneous traffic will cause flow control in the cluster, monitoring the change rate of the cluster’s write TPS becomes important. Based on the monitoring data of the cluster’s write TPS, we can use time series database functions to calculate the change rate of the cluster TPS over a certain period of time.

Monitoring Development Practice #

This section provides an architecture diagram and example code for the monitoring design, which collects RocketMQ monitoring metrics using a collection service and stores them in a time series database such as InfluxDB.

img

Preparation #

\1. Task scheduling, with an example of 10 seconds:

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
      return new Thread(r, "rocketMQ metrics collector");
  }
});
executorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
      // Method 1 to collect metrics: collectClusterNum()
      // Method 2 to collect metrics: collectMetric2()
  }
}, 60, 10, TimeUnit.SECONDS);


2. MQAdmin is used to obtain Broker TPS, and here is the initialization code:

public DefaultMQAdminExt getMqAdmin() throws MQClientException {
  DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
  defaultMQAdminExt.setNamesrvAddr("x.x.x.x:9876");
  defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
  defaultMQAdminExt.setVipChannelEnabled(false);
  defaultMQAdminExt.start();
  return defaultMQAdminExt;
}


3. Code to start the Producer:

public DefaultMQProducer getMqProducer(){
    DefaultMQProducer producer = new DefaultMQProducer("rt_collect_producer");
    producer.setNamesrvAddr("");
    producer.setVipChannelEnabled(false);
    producer.setClientIP("mq producer-client-id-1");
    try {
        producer.start();
    } catch (MQClientException e) {
        e.getErrorMessage();
    }
    return producer;
}


#### **Collecting the Number of Cluster Nodes**

The following code counts the total number of primary and secondary nodes in the cluster. The collection method is called periodically and the result is recorded in the time series data.

public void collectClusterNum() throws Exception {
  DefaultMQAdminExt mqAdmin = getMqAdmin();
  ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
  int brokers = 0;
  Set<Map.Entry<String, BrokerData>> entries = clusterInfo.getBrokerAddrTable().entrySet();
  for (Map.Entry<String, BrokerData> entry : entries) {
  brokers += entry.getValue().getBrokerAddrs().entrySet().size();
  }
  // Storing brokers in the time series database
  System.out.println(brokers);
}


#### **Collecting Node Availability**

The availability of each Broker in the cluster can be achieved by periodically sending messages to the specific topic of that Broker. For example, if the cluster has broker-a, broker-b, broker-c, and broker-d, then each broker has a topic named "broker-a", and the same applies to other nodes. Availability is achieved by sending heartbeats to the topic periodically.

The following two classes, ClusterRtTime and RtTime, are used to populate the data collected from the cluster and the Broker, respectively.

public class ClusterRtTime {
    private String cluster;

    private List<RtTime> times;

    private long timestamp = System.currentTimeMillis();
public long getTimestamp() {
    return timestamp;
}

public void setTimestamp(long timestamp) {
    this.timestamp = timestamp;
}

public String getCluster() {
    return cluster;
}

public void setCluster(String cluster) {
    this.cluster = cluster;
}

public List<RtTime> getTimes() {
    return times;
}

public void setTimes(List<RtTime> times) {
    this.times = times;
}
}

public class RtTime {
    private long rt;

    private String brokerName;

    private String status;

    private int result;

    public int getResult() {
        return result;
    }

    public void setResult(int result) {
        this.result = result;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
    public long getRt() {
        return rt;
    }
    public void setRt(long rt) {
        this.rt = rt;
    }
    public String getBrokerName() {
        return brokerName;
    }
    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }
}

The following method is the implementation of synchronous heartbeat detection. Taking ‘broker-a’ as an example, ’time.setRt’ represents the time it takes to send each heartbeat, and ’time.setResult’ represents the result of sending each heartbeat, whether it was successful or failed.

public void collectRtTime() throws Exception {
    DefaultMQAdminExt mqAdmin = getMqAdmin();
    ClusterRtTime clusterRtTime = new ClusterRtTime();
    ClusterInfo clusterInfo = null;
    try {
        clusterInfo = mqAdmin.examineBrokerClusterInfo();
    } catch (Exception e) {
        e.printStackTrace();
        return;
    }
    clusterRtTime.setCluster("demo_mq");
    List<RtTime> times = Lists.newArrayList();
    for (Map.Entry<String, BrokerData> stringBrokerDataEntry : clusterInfo.getBrokerAddrTable().entrySet()) {
        BrokerData brokerData = stringBrokerDataEntry.getValue();
        String brokerName = brokerData.getBrokerName();
        long begin = System.currentTimeMillis();
        SendResult sendResult = null;
        RtTime time = new RtTime();
        time.setBrokerName(brokerName);
        try {
            byte[] TEST_MSG = "helloworld".getBytes();
            sendResult = getMqProducer().send(new Message(brokerName, TEST_MSG));
            long end = System.currentTimeMillis() - begin;
            SendStatus sendStatus = sendResult.getSendStatus();
            // Record the sending time consumption
            time.setRt(end);
            // Record the sending success or failure
            time.setStatus(sendStatus.name());
            time.setResult(sendStatus.ordinal());
        } catch (Exception e) {
            time.setRt(-1);
            time.setStatus("FAILED");
            time.setResult(5);
        }
        times.add(time);
    }
    clusterRtTime.setTimes(times);
    // Store clusterRtTime information into time series database
}

Collecting Cluster TPS #

Combining with the scheduled task below, the method for collecting cluster TPS is to store it in the time series database. If it is collected every 10 seconds, it can be collected 6 times in 1 minute.

public void collectClusterTps() throws Exception {
    DefaultMQAdminExt mqAdmin = getMqAdmin();
    ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
    double totalTps = 0d;
    for (Map.Entry<String, BrokerData> stringBrokerDataEntry : clusterInfo.getBrokerAddrTable().entrySet()) {
        BrokerData brokerData = stringBrokerDataEntry.getValue();
        // Select the Master node
        String brokerAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
        if (StringUtils.isBlank(brokerAddr)) continue;
        KVTable runtimeStatsTable = mqAdmin.fetchBrokerRuntimeStats(brokerAddr);
        HashMap<String, String> runtimeStatus = runtimeStatsTable.getTable();
        Double putTps = Math.ceil(Double.valueOf(runtimeStatus.get("putTps").split(" ")[0]));
        totalTps = totalTps + putTps;
    }
    // Store totalTps into time series database
    System.out.println(totalTps);
}

Calculating the Change Rate of Cluster TPS #

We can use time series database functions to calculate the change rate of cluster TPS. Assuming that the collected cluster TPS is written to the ‘cluster_number_info’ table in InfluxDB, the following statement represents the change rate of the cluster TPS within 5 minutes. In the example, the cluster TPS changed by 12% within 5 minutes. If the change exceeds 50%, or even 200% or 300%, it requires our attention to prevent the cluster from experiencing flow control due to high instantaneous traffic, affecting business timeouts.

> select SPREAD(value)/MEDIAN(value) from cluster_number_info where clusterName='demo_mq' and "name"='totalTps' and "time" > now()-5m ;
name: cluster_number_info
time                spread_median
----                -------------
1572941783075915928 0.12213740458015267