23 Message Trace ACL and Multi-Replica Construction #

Message Tracing #

Meaning of Message Tracing #

Message tracing refers to the information about when and which machine generated a message, the time it took to send, the message size, the send status, which broker it is stored on, when and where it was stored, when it was consumed, and the consumption status. This information is used to trace the entire lifecycle of a message from its creation to its consumption.

This information is crucial for business users to troubleshoot and locate issues, as sending and consumption often involve different business departments. With message tracing, it becomes clear whether a message was sent, if it was successfully sent, and if it was consumed, reducing the cost of communication between departments.

Using Message Tracing #

1. Broker Configuration

Message tracing is disabled in RocketMQ by default. To enable it, set the Broker's traceTopicEnable attribute to true. Trace data is stored in the topic RMQ_SYS_TRACE_TOPIC by default; the topic name can be changed with msgTraceTopicName.

Attribute | Default Value
---|---
traceTopicEnable | false
msgTraceTopicName | RMQ_SYS_TRACE_TOPIC

2. Sender Usage

Sender Tracing API

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic){
   this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
}

Explanation: enableMsgTrace determines whether to enable message tracing, default is false; customizedTraceTopic sets the custom topic for collecting message tracing, default is RMQ_SYS_TRACE_TOPIC.

Sender Code Example

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1; i++)
            try {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID111",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

Explanation: When creating DefaultMQProducer, set enableMsgTrace to true to enable sending message tracing.

3. Consumer Usage

Consumer Tracing API

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}

Explanation: enableMsgTrace determines whether to enable consumption tracing, default is false; customizedTraceTopic sets the custom topic for collecting message tracing, default is RMQ_SYS_TRACE_TOPIC.

Consumer Code Example

public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
}

Explanation: When creating DefaultMQPushConsumer, set enableMsgTrace to true to enable consuming message tracing.

4. Message Tracing Result

Send and consume a message and check the message tracing effect in RocketMQ-Console.

Message Content Sent

SendResult [sendStatus=SEND_OK, msgId=A9FE1075810A18B4AAC24A40738B0000, offsetMsgId=A9FE107500002A9F0000000000002147, messageQueue=MessageQueue [topic=TopicTest, brokerName=liangyong, queueId=1], queueOffset=2]

**Message Content**

Receive New Messages: [MessageExt [brokerName=liangyong, queueId=1, storeSize=189, queueOffset=2, sysFlag=0, bornTimestamp=1600135337872, bornHost=/169.254.16.117:65532, storeTimestamp=1600135337883, storeHost=/169.254.16.117:10911, msgId=A9FE107500002A9F0000000000002147, commitLogOffset=8519, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=OrderID111, CONSUME_START_TIME=1600135337915, UNIQ_KEY=A9FE1075810A18B4AAC24A40738B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]

**Message Trace Display**

In the RocketMQ console, you can retrieve the message content by message key or message ID, as shown below:

![img](../images/20200915100845698.jpg)

Clicking on MESSAGE TRACE DETAIL allows you to view the message trace, as shown below:

![img](../images/20200915101143460.jpg)

#### **Message Trace Principle**

**Sending Trace Principle**: Collect metric information before and after sending a message and asynchronously send it to the trace topic.

![img](../images/20200917223954432.png)

**Consuming Trace Principle**: The consumption trace has two parts. The first is collected after a message is pulled but before it is processed; the second is collected after processing completes. Both are sent to the trace topic asynchronously.

![img](../images/20200918101244746.png)
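
The sending flow described above can be sketched in plain Java: measure around the send, then hand the record to a background thread so that tracing never blocks the business call. This is only an illustration of the idea. The class and method names (AsyncTraceSketch, sendWithTrace) and the record layout are hypothetical, and an in-memory list stands in for the trace topic.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative only: hypothetical names, not RocketMQ classes.
class AsyncTraceSketch {
    // Records waiting to be written to the trace topic.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(1024);
    // Stand-in for the trace topic; a real client would send a message here.
    final List<String> traceTopic = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;
    private final Thread dispatcher;

    AsyncTraceSketch() {
        dispatcher = new Thread(() -> {
            try {
                // Keep draining until shutdown is requested and the queue is empty.
                while (running || !pending.isEmpty()) {
                    String record = pending.poll(50, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        traceTopic.add(record);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "trace-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    // Wraps a send: collect metrics before and after, then enqueue the record.
    boolean sendWithTrace(String msgId, BooleanSupplier doSend) {
        long begin = System.currentTimeMillis();          // "before send" point
        boolean ok = doSend.getAsBoolean();               // the actual send
        long cost = System.currentTimeMillis() - begin;   // "after send" point
        // offer() drops the record when the queue is full instead of blocking the sender.
        pending.offer("Pub|" + msgId + "|costTime=" + cost + "|isSuccess=" + ok);
        return ok;
    }

    // Stops the dispatcher after it has drained the queue.
    void shutdown() {
        running = false;
        try {
            dispatcher.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncTraceSketch tracer = new AsyncTraceSketch();
        tracer.sendWithTrace("MSG-1", () -> true);
        tracer.shutdown();
        System.out.println(tracer.traceTopic);
    }
}
```

The consuming side would follow the same pattern, enqueuing one record before the listener runs and a second one after it returns.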

#### **Trace Format Explanation**

There are three types of message traces: Pub for sending trace, SubBefore for pre-consumption trace, and SubAfter for post-consumption trace.

**Sending Trace Pub**

Name | Explanation
---|---
TraceType | Pub indicates sending trace
timeStamp | Storage time
regionId | Data center availability zone, default to DefaultRegion (currently unused)
groupName | Producer group name
topic | Topic name
msgId | Message ID generated by the client
tags | Message tag
keys | Message key
storeHost | Message storage Broker IP
bodyLength | Size of the message body
costTime | Time taken to send the message
msgType | Message type: Normal_Msg, Trans_Msg_Half, Trans_msg_Commit, Delay_Msg
offsetMsgId | Message ID generated by the Broker
isSuccess | Whether the send was successful, true for success and false for failure

**Pre-consumption Trace SubBefore**

Name | Explanation
---|---
traceType | SubBefore indicates pre-consumption trace
timeStamp | Message storage time
regionId | Data center availability zone (currently unused)
groupName | Consumer group name
requestId | Request identifier
msgId | Message ID
retryTimes | Retry times
keys | Message key

**Post-consumption Trace SubAfter**

Name | Explanation
---|---
traceType | SubAfter indicates post-consumption trace
requestId | Request identifier
msgId | Message ID
costTime | Time taken to consume the message
isSuccess | Consumption result, true for success and false for failure
keys | Message key
contextCode | Consumption status returned by the Broker: 0 for SUCCESS, 1 for TIME_OUT, 2 for EXCEPTION, 3 for RETURNNULL, 4 for FAILED

#### **Conclusion**

  * When using message trace in a production environment, one Broker in the MQ cluster can be dedicated to collecting trace data so that tracing does not affect the performance of the rest of the cluster.
  * The open-source version of message trace does not record the consumer's IP address, so it cannot tell us which machine consumed a message.
  * The open-source version of message trace builds each record by concatenating fields with separator characters and parses it by splitting it into an array, which makes the format hard to extend and to keep compatible.
  * The two authors of this article have not enabled the trace feature in the RocketMQ clusters they manage.
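
To see why a concatenate-and-split format is hard to extend, here is a minimal sketch of positional encoding and parsing for a Pub record. The separator character and the field order are assumptions made for this illustration (the order simply follows the Pub table above); they are not claimed to be the exact format written to the trace topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Illustrative only: separator and field order are assumptions for this sketch.
class TraceRecordSketch {
    static final String SEP = String.valueOf((char) 1); // non-printable separator
    static final String[] PUB_FIELDS = {
        "traceType", "timeStamp", "regionId", "groupName", "topic", "msgId", "tags",
        "keys", "storeHost", "bodyLength", "costTime", "msgType", "offsetMsgId", "isSuccess"
    };

    // Joins the values in a fixed order; the field names are not stored.
    static String encode(String... values) {
        if (values.length != PUB_FIELDS.length) {
            throw new IllegalArgumentException("expected " + PUB_FIELDS.length + " values");
        }
        return String.join(SEP, values);
    }

    // Splits into an array and assigns meaning purely by position.
    static Map<String, String> decode(String record) {
        String[] parts = record.split(Pattern.quote(SEP), -1);
        if (parts.length != PUB_FIELDS.length) {
            throw new IllegalArgumentException("unexpected field count: " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(PUB_FIELDS[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Sample values only; costTime here is made up for the demonstration.
        String record = encode("Pub", "1600135337883", "DefaultRegion", "ProducerGroupName",
            "TopicTest", "A9FE1075810A18B4AAC24A40738B0000", "TagA", "OrderID111",
            "169.254.16.117:10911", "11", "12", "Normal_Msg",
            "A9FE107500002A9F0000000000002147", "true");
        System.out.println(decode(record).get("msgId"));
    }
}
```

Because meaning is tied to position, adding a field such as a consumer IP changes the field count, and a parser written for the old layout rejects or misreads the new records.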

### ACL

#### **Explanation of ACL**

The Access Control List (ACL) describes the access control permissions of users or roles to resources. The ACL in RocketMQ is explained in the table below.

**Explanation of ACL in RocketMQ:**

Meaning | Explanation
---|---
User | Represented by accessKey in the plain_acl.yml configuration file
Role | Admin and other roles
Resource | Topic and consumer group
Permission | DENY indicates no permission, ANY indicates having PUB or SUB permission, PUB indicates having permission to send to the topic, SUB indicates having permission to subscribe to the consumer group
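
The four permission values in the table can be modelled as bit flags. The sketch below is an assumption-laden illustration of the semantics described above (DENY always wins, ANY is satisfied by either PUB or SUB); the class name and the concrete bit values are hypothetical.

```java
// Illustrative only: hypothetical class and bit values.
class AclPermSketch {
    static final int DENY = 1;
    static final int ANY = 1 << 1;
    static final int PUB = 1 << 2;
    static final int SUB = 1 << 3;

    // Parses a permission string as written in plain_acl.yml, e.g. "PUB|SUB".
    static int parse(String perm) {
        if (perm == null) {
            return DENY;
        }
        switch (perm.trim()) {
            case "PUB":
                return PUB;
            case "SUB":
                return SUB;
            case "PUB|SUB":
            case "SUB|PUB":
                return PUB | SUB;
            default:
                return DENY; // "DENY" and anything unrecognised
        }
    }

    // True when the owned permission satisfies the needed one.
    static boolean allows(int needed, int owned) {
        if ((owned & DENY) != 0) {
            return false;                        // DENY overrides everything
        }
        if ((needed & ANY) != 0) {
            return (owned & (PUB | SUB)) != 0;   // ANY: either PUB or SUB is enough
        }
        return (needed & owned) != 0;
    }

    public static void main(String[] args) {
        System.out.println(allows(PUB, parse("PUB|SUB")));
        System.out.println(allows(SUB, parse("PUB")));
    }
}
```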

#### **Example of Using ACL**

Add aclEnable = true to the Broker configuration file. In addition, add the ${ROCKETMQ_HOME}/conf/plain_acl.yml file for ACL control.

1. Broker Configuration

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable = true
aclEnable = true

Explanation

Parameter | Meaning
---|---
aclEnable | Defaults to false. Set it to true to enable ACL.
ROCKETMQ_HOME | The MQ root directory; it can be specified with -Drocketmq.home.dir.
ACL fileName | Defaults to /conf/plain_acl.yml; it can be specified with -Drocketmq.acl.plain.file.

2. plain_acl.yml Configuration

The ACL configuration file consists of two parts: global whitelist configuration (globalWhiteRemoteAddresses) and account configuration (accounts).

globalWhiteRemoteAddresses:

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTest1=DENY
  - TopicTest2=PUB|SUB
  groupPerms:
  - consumerTest=DENY

- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  admin: true

Explanation

Parameter | Meaning
---|---
globalWhiteRemoteAddresses | Global whitelist. Empty: the whitelist is ignored and validation continues with the account rules. Match-all, e.g. `*`, `*.*.*.*` or `*:*:*:*:*:*:*:*`: every address is allowed and no further validation is performed. Multiple addresses or ranges, e.g. `192.168.0.{1,2}`, `192.168.1.1,192.168.1.2`, `192.168.*.*` or `192.168.1-10.5-50`: addresses within the listed set or range are allowed.
accessKey | Unique user identifier.
secretKey | Access password.
whiteRemoteAddress | User-level whitelist, same format as globalWhiteRemoteAddresses.
admin | Whether the user is an admin. Admins have access to all resources.
defaultTopicPerm | Default topic permission. Default value is DENY.
defaultGroupPerm | Default consumer group permission. Default value is DENY.
topicPerms | Detailed topic permissions.
groupPerms | Detailed consumer group permissions.
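
The whitelist formats can be hard to picture from examples alone, so here is a minimal sketch of how an IPv4 address could be matched against them. It is our own illustration, not the Broker's implementation: the class name is hypothetical, IPv6 is left out, and a comma list is only recognised when the pattern contains no braces.

```java
// Illustrative only: a simplified IPv4 matcher for the whitelist formats above.
class WhiteListSketch {
    static boolean matches(String pattern, String ip) {
        if (pattern == null || pattern.trim().isEmpty()) {
            return false; // empty whitelist: no match, later checks decide
        }
        String p = pattern.trim();
        if (p.equals("*")) {
            return true;  // match-all
        }
        // Comma-separated addresses such as "192.168.1.1,192.168.1.2".
        if (p.indexOf('{') < 0 && p.indexOf(',') >= 0) {
            for (String single : p.split(",")) {
                if (matches(single, ip)) {
                    return true;
                }
            }
            return false;
        }
        String[] want = p.split("\\.");
        String[] have = ip.trim().split("\\.");
        if (want.length != 4 || have.length != 4) {
            return false;
        }
        for (int i = 0; i < 4; i++) {
            if (!segmentMatches(want[i], have[i])) {
                return false;
            }
        }
        return true;
    }

    // One dotted segment: "*", a set "{1,2}", a range "1-10", or a literal number.
    private static boolean segmentMatches(String want, String have) {
        try {
            int value = Integer.parseInt(have);
            if (want.equals("*")) {
                return true;
            }
            if (want.startsWith("{") && want.endsWith("}")) {
                for (String option : want.substring(1, want.length() - 1).split(",")) {
                    if (Integer.parseInt(option.trim()) == value) {
                        return true;
                    }
                }
                return false;
            }
            int dash = want.indexOf('-');
            if (dash > 0) {
                int low = Integer.parseInt(want.substring(0, dash));
                int high = Integer.parseInt(want.substring(dash + 1));
                return low <= value && value <= high;
            }
            return Integer.parseInt(want) == value;
        } catch (NumberFormatException e) {
            return false; // malformed segment never matches
        }
    }

    public static void main(String[] args) {
        System.out.println(matches("192.168.1.*", "192.168.1.77"));
        System.out.println(matches("192.168.1-10.5-50", "192.168.11.20"));
    }
}
```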

3. ACL Sending Example

In the configuration file above, TopicTest1 is set to DENY, so sending to and consuming from it are prohibited. TopicTest2 is set to PUB|SUB, so both sending and subscribing are allowed. The following example attempts to send a message to TopicTest1 to see whether it succeeds.

Sending Example (Prohibited)

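import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AclSendTest {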
    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "12345678";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        producer();
    }
    public static void producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1; i++)
            try {
                {
                    Message msg = new Message("TopicTest1",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }
}

Prohibited Sending Screenshot

img

Explanation for Prohibited Sending

When user RocketMQ tries to send a message to TopicTest1, an AclException is thrown and access is rejected. If the topic in the code is changed to TopicTest2, the message is sent successfully.
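
The outcome above follows from how an account's topic permission is resolved. The sketch below shows one plausible resolution order, assuming that admin accounts bypass the check, that an explicit topicPerms entry takes precedence, and that every other topic falls back to defaultTopicPerm. The class and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: hypothetical model of one account in plain_acl.yml.
class TopicPermSketch {
    private final boolean admin;
    private final String defaultTopicPerm;
    private final Map<String, String> topicPerms = new HashMap<>();

    // entries use the same "topic=PERM" form as the topicPerms list.
    TopicPermSketch(boolean admin, String defaultTopicPerm, String... entries) {
        this.admin = admin;
        this.defaultTopicPerm = defaultTopicPerm;
        for (String entry : entries) {
            int eq = entry.indexOf('=');
            topicPerms.put(entry.substring(0, eq).trim(), entry.substring(eq + 1).trim());
        }
    }

    // Explicit entry first, otherwise the account-wide default.
    String effectivePerm(String topic) {
        return topicPerms.getOrDefault(topic, defaultTopicPerm);
    }

    boolean canPublish(String topic) {
        if (admin) {
            return true; // admins have access to all resources
        }
        String perm = effectivePerm(topic);
        return !perm.contains("DENY") && perm.contains("PUB");
    }

    public static void main(String[] args) {
        TopicPermSketch account =
            new TopicPermSketch(false, "DENY", "TopicTest1=DENY", "TopicTest2=PUB|SUB");
        System.out.println(account.canPublish("TopicTest1"));
        System.out.println(account.canPublish("TopicTest2"));
    }
}
```

Note that with defaultTopicPerm set to DENY, a topic that has no entry at all is rejected as well.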

4. ACL Consumption Example

In the configuration file above, consumerTest is set to DENY, so consuming with this group is prohibited. Since sending to TopicTest2 is allowed, we send a message to TopicTest2 and observe whether consumerTest can consume it.

Sending Example (Allowed)

public class AclSendTest {
    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "12345678";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        producer();
    }
    public static void producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1; i++)
            try {
                {
                    Message msg = new Message("TopicTest2",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }
}

Sending Result (Allowed)

SendResult [sendStatus=SEND_OK, msgId=C0A800667FB218B4AAC2663AB66F0000, offsetMsgId=C0A8006600002A9F00000000000085EA, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=2]

Consumption Example (Prohibited)

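import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class AclConsumeTest {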
    private static final String ACL_ACCESS_KEY = "RocketMQ";
    private static final String ACL_SECRET_KEY = "12345678";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        pushConsumer();
    }

    public static void pushConsumer() throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest2", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20180422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                printBody(msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }

    private static void printBody(List<MessageExt> msg) {
        if (msg == null || msg.size() == 0)
            return;
        for (MessageExt m : msg) {
            if (m != null) {
                System.out.printf("msgId : %s  body : %s%n", m.getMsgId(), new String(m.getBody()));
            }
        }
    }
}

Prohibition of Consumption Screenshots

img

Prohibition of Consumption Explanation

We successfully sent a message to TopicTest2, but because the consumer group consumerTest has been set to prohibit consumption, the message was not received.

ACL Command Summary #

RocketMQ provides a set of commands for updating the ACL configuration file dynamically, so that permission changes take effect immediately.

1. Get ACL Configuration Version

Use the clusterAclConfigVersion command to view the version information.

Parameter Description

Parameter | Description
---|---
-b | Broker address, to query a specific Broker
-c | Cluster name, to query all Brokers in the cluster
-n | Namesrv address

Command Example

$ bin/mqadmin clusterAclConfigVersion -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #Broker Addr            #AclConfigVersionNum  #AclLastUpdateTime
DefaultCluster    broker-a                x.x.x.x:10911      0                     2020-09-20 22:42:59
get cluster's plain access config version success.

2. Get Acl Permission Configuration

Use the getAccessConfigSubCommand command to view the ACL configuration.

Parameter Description

Parameter | Description
---|---
-b | Broker address, to query a specific Broker
-c | Cluster name, to query all Brokers in the cluster
-n | Namesrv address

Command Example

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.103.*, 192.168.0.*]

accounts:
  accessKey         : RocketMQ
  secretKey         : 12345678
  whiteRemoteAddress:
  admin             : false
  defaultTopicPerm  : DENY
  defaultGroupPerm  : SUB
  topicPerms        : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
  groupPerms        : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]

  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

3. Update Global White List

Use the updateGlobalWhiteAddr command to modify the global white list globalWhiteRemoteAddresses of ACL.

Parameter Description

Parameter | Description
---|---
-b | Broker address, to update a specific Broker
-c | Cluster name, to update all Brokers in the cluster
-n | Namesrv address
-g | The value of the global white list, for example: `10.10.103.*,192.168.0.*`

Command Example

$ bin/mqadmin updateGlobalWhiteAddr -n x.x.x.x:9876 -c DefaultCluster -g 10.10.113.*,192.168.20.*
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update global white remote addresses to x.x.x.x:10911 success.

Check Effectiveness

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]

accounts:
  accessKey         : RocketMQ
  secretKey         : 12345678
  whiteRemoteAddress:
  admin             : false
  defaultTopicPerm  : DENY
  defaultGroupPerm  : SUB
  topicPerms        : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
  groupPerms        : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]

  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

Explanation: The global white list has been updated.

4. Update User Configuration

Use the updateAclConfig command to update the configuration of a user account.

Parameter Description

Parameter | Description
---|---
-a | The accessKey of the user whose configuration is to be changed
-b | Broker address, to update a specific Broker
-c | Cluster name, to update all Brokers in the cluster
-n | Namesrv address
-g | Set the groupPerms consumer group permission, format: groupD=DENY,groupD=SUB
-i | Set the defaultTopicPerm permission in the ACL file
-m | Set the admin permission in the ACL file
-s | Set the secretKey value in the ACL file
-t | Set the topicPerms topic permission, format: topicA=DENY,topicD=SUB
-u | Set the defaultGroupPerm permission in the ACL file
-w | Set the whiteRemoteAddress value for this user in the ACL file

Command Example

$ bin/mqadmin updateAclConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ -s 87654321 -t testTopicA=DENY,testTopicb=SUB
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create or update plain access config to x.x.x.x:10911 success.

Check Effectiveness

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]

accounts:
  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

  accessKey         : RocketMQ
  secretKey         : 87654321
  whiteRemoteAddress:
  admin             : false
  defaultTopicPerm  : DENY
  defaultGroupPerm  : SUB
  topicPerms        : [testTopicA=DENY, testTopicb=SUB]
  groupPerms        : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]

Explanation: The secretKey and topicPerms permissions for the user RocketMQ have been successfully updated and take effect.

5. Delete User Configuration

Use the deleteAccessConfig command to delete the ACL configuration information of a specified user.

Parameter Description

Parameter | Description
---|---
-b | Broker address, to delete from a specific Broker
-c | Cluster name, to delete from all Brokers in the cluster
-n | Namesrv address
-a | The accessKey of the user to delete

Command Example

$ bin/mqadmin deleteAccessConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
delete plain access config account to x.x.x.x:10911 success.

Check Effectiveness

$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.

globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]

accounts:
  accessKey         : rocketmq2
  secretKey         : 12345678
  whiteRemoteAddress: 192.168.1.*
  admin             : true
  defaultTopicPerm  :
  defaultGroupPerm  :
  topicPerms        :
  groupPerms        :

Explanation: The ACL configuration for the user RocketMQ has been deleted.

ACL Principle #

1. Rule Loading

The rules are configured in the plain_acl.yml file and must be loaded into the Broker's cache to take effect. The file is loaded at startup and reloaded dynamically whenever its content changes. The process is as follows.

img
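
Dynamic reloading needs some way to notice that the file has changed. A common approach, sketched here, is to compare a content hash on a timer and re-read the file only when the hash differs. This is an illustration under our own assumptions (MD5 over the whole file, a hypothetical AclFileWatchSketch class); the reload itself is reduced to a counter.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Illustrative only: detects content changes by hashing the file.
class AclFileWatchSketch {
    private final Path file;
    private byte[] lastDigest;
    int reloadCount = 0; // stands in for "re-parse the YAML and swap the cached rules"

    AclFileWatchSketch(Path file) {
        this.file = file;
        this.lastDigest = digest(file); // initial load at startup
    }

    static byte[] digest(Path file) {
        try {
            return MessageDigest.getInstance("MD5").digest(Files.readAllBytes(file));
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException("cannot hash " + file, e);
        }
    }

    // Call periodically, e.g. from a scheduled task. Returns true if a reload happened.
    boolean checkOnce() {
        byte[] current = digest(file);
        if (Arrays.equals(current, lastDigest)) {
            return false; // unchanged, keep the cached rules
        }
        lastDigest = current;
        reloadCount++;
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("plain_acl", ".yml");
        Files.write(file, "accounts: []".getBytes());
        AclFileWatchSketch watcher = new AclFileWatchSketch(file);
        Files.write(file, "accounts: [changed]".getBytes());
        System.out.println(watcher.checkOnce());
        Files.delete(file);
    }
}
```

Hashing the content rather than checking the modification time avoids reloading when the file is rewritten with identical rules.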

2. Permission Validation

Permission validation is implemented as a registered hook that runs inside the Broker's Netty request handling. When the Broker receives a request from a client on a channel, the validation logic is executed before the request is processed. The entry points are NettyServerHandler#channelRead0() and processRequestCommand#doBeforeRpcHooks; the entry point for rule validation is the PlainAccessValidator#validate method. The validation process is shown below.

img
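
The hook mechanism can be reduced to a small pattern: every registered hook runs before the request handler, and a hook that throws stops the request. The sketch below illustrates only that pattern. All types in it (RpcHookSketch, BeforeHook, AclDeniedException) are hypothetical stand-ins, and the request is simplified to three strings.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a stripped-down "run hooks before processing" pattern.
class RpcHookSketch {
    // A hook inspects the request and throws to reject it.
    interface BeforeHook {
        void doBeforeRequest(String remoteAddr, String accessKey, String topic);
    }

    static class AclDeniedException extends RuntimeException {
        AclDeniedException(String message) {
            super(message);
        }
    }

    private final List<BeforeHook> hooks = new ArrayList<>();

    void registerHook(BeforeHook hook) {
        hooks.add(hook);
    }

    // Runs all hooks first; the real work happens only if none of them throws.
    String processRequest(String remoteAddr, String accessKey, String topic) {
        try {
            for (BeforeHook hook : hooks) {
                hook.doBeforeRequest(remoteAddr, accessKey, topic);
            }
        } catch (AclDeniedException e) {
            return "REJECTED: " + e.getMessage();
        }
        return "OK";
    }

    public static void main(String[] args) {
        RpcHookSketch broker = new RpcHookSketch();
        // A toy validator: deny one topic for every caller.
        broker.registerHook((addr, key, topic) -> {
            if ("TopicTest1".equals(topic)) {
                throw new AclDeniedException("No permission for " + topic);
            }
        });
        System.out.println(broker.processRequest("127.0.0.1", "RocketMQ", "TopicTest1"));
        System.out.println(broker.processRequest("127.0.0.1", "RocketMQ", "TopicTest2"));
    }
}
```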

Conclusion of ACL #

  • Enabling ACL is usually unnecessary. For sensitive data, encrypting the message content is an alternative.
  • If ACL is enabled for thousands of resources (topics and consumer groups), the configuration file becomes very large.
  • If ACL is needed, apply it selectively, for example only to topics and consumer groups that deal with funds.
  • The two authors have not enabled ACL in the RocketMQ clusters they manage.

Multi-Replica Setup #

Significance of Multi-Replica #

Starting from version 4.5.0, the open-source version of RocketMQ supports multi-replica (DLedger). In previous versions, only master-slave mode was supported.

Problems with master-slave mode:

  • If the master node goes down, it cannot dynamically switch to the slave node, and this group of Broker nodes cannot provide write services.
  • When the master-slave asynchronous replication mode is set and the master node goes down unexpectedly, not all data may be replicated to the slave node, resulting in the risk of data loss.

By using the Raft protocol, multi-replica can automatically elect a leader when a node goes down, improving cluster availability and ensuring data consistency.

Multi-Replica Setup #

Since DLedger is developed based on the Raft protocol, it requires a majority vote. At least 3 nodes are needed to form a Raft group.
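
The majority rule explains the minimum of 3 nodes. The arithmetic is standard for Raft-style quorums; the class below is just a small helper of our own for checking it.

```java
// Illustrative only: quorum arithmetic for a majority-vote group.
class QuorumSketch {
    // Smallest number of nodes that forms a majority.
    static int majority(int nodes) {
        return nodes / 2 + 1;
    }

    // How many nodes may fail while a majority is still reachable.
    static int tolerableFailures(int nodes) {
        return nodes - majority(nodes);
    }

    // A leader can be elected only while a majority is alive.
    static boolean canElectLeader(int nodes, int alive) {
        return alive >= majority(nodes);
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.printf("nodes=%d majority=%d tolerableFailures=%d%n",
                n, majority(n), tolerableFailures(n));
        }
    }
}
```

A 2-node group needs both nodes to agree and therefore tolerates no failure, while a 3-node group keeps working with one node down. A 4-node group also tolerates only one failure, which is why odd group sizes are the usual choice.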

broker-n0.conf

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16

broker-n1.conf

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16

broker-n2.conf

brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16

Start three nodes:

nohup bin/mqbroker -c conf/dledger/broker-n0.conf &
nohup bin/mqbroker -c conf/dledger/broker-n1.conf &
nohup bin/mqbroker -c conf/dledger/broker-n2.conf &

Check if the start is successful:

$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster       RaftNode00              0     x.x.x.x:30921     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444663.49 -1.0000
RaftCluster       RaftNode00              1     x.x.x.x:30911     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444663.49 -1.0000
RaftCluster       RaftNode00              3     x.x.x.x:30931     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444663.49 -1.0000

Note: BID 0 indicates the Master; the other two nodes are Followers.

Console screenshot:

img

Check message transmission:

img

Note: We have completed the process of setting up multiple replicas using the above steps.

Re-elect the leader #

We verify DLedger's leader election by killing the Master. In the clusterList output above, the Master is x.x.x.x:30921. After killing that process, we check the cluster again.

$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster       RaftNode00              0     x.x.x.x:30931     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444664.03 -1.0000
RaftCluster       RaftNode00              1     x.x.x.x:30911     V4_7_0                   0.00(0,0ms)         0.00(0,0ms)          0 444664.03 -1.0000

Note: After the original Master is killed, leader election completes automatically, and the new Master is x.x.x.x:30931.

Parameter Description #

The parameter description of multiple replicas in the configuration file is shown in the following table.

Parameter | Description
---|---
enableDLegerCommitLog | Whether to enable DLedger; default is false
dLegerGroup | The Raft group to which the node belongs; recommended to be the same as brokerName
dLegerPeers | Addresses of all nodes in the group, for example: n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId | ID of the current node, taken from the prefix of its entry in dLegerPeers (n0 in the example above). Only the first character may be a letter; the remaining characters must be digits.
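
A mistyped peer list or self ID is an easy way to end up with a group that never elects a leader, so it can help to check the values before starting the Brokers. The sketch below is a hypothetical pre-flight check written for this article. It parses the dLegerPeers format shown above and applies the ID rule from the table (one letter followed by digits).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a pre-flight check for dLegerPeers and dLegerSelfId values.
class DLedgerPeersSketch {
    // Parses "n0-host:port;n1-host:port;..." into id -> address, keeping the order.
    static Map<String, String> parsePeers(String peers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : peers.split(";")) {
            int dash = entry.indexOf('-');
            if (dash <= 0) {
                throw new IllegalArgumentException("bad peer entry: " + entry);
            }
            String id = entry.substring(0, dash).trim();
            String address = entry.substring(dash + 1).trim();
            // ID rule from the table: one letter, then digits only.
            if (!id.matches("[A-Za-z][0-9]+")) {
                throw new IllegalArgumentException("bad peer id: " + id);
            }
            if (result.put(id, address) != null) {
                throw new IllegalArgumentException("duplicate peer id: " + id);
            }
        }
        return result;
    }

    // The self ID must be one of the IDs listed in dLegerPeers.
    static void checkSelfId(String selfId, Map<String, String> peers) {
        if (!peers.containsKey(selfId)) {
            throw new IllegalArgumentException("selfId " + selfId + " is not in dLegerPeers");
        }
    }

    public static void main(String[] args) {
        Map<String, String> peers =
            parsePeers("n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913");
        checkSelfId("n0", peers);
        System.out.println(peers);
    }
}
```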

Conclusion #

When using multiple replicas, load-test the cluster to confirm that its TPS meets the requirements of the business. In the author's load tests, TPS with multiple replicas differed considerably from TPS with asynchronous master-slave replication.

If the TPS meets the requirements, it is recommended to use a multi-replica architecture, especially for payment-related scenarios.

If a master-slave architecture is already running in production, how can it be upgraded to DLedger mode?

  1. You can refer to the smooth expansion method mentioned earlier to add the Raft group composed of multiple replicas to the original cluster.
  2. Disable the writing permission of the original master-slave architecture nodes.
  3. After the log storage time has passed, take the master-slave architecture nodes offline.