23 Message Trace, ACL, and Multi-Replica Setup #
Message Tracing #
Meaning of Message Tracing #
Message tracing refers to the information about when and which machine generated a message, the time it took to send, the message size, the send status, which broker it is stored on, when and where it was stored, when it was consumed, and the consumption status. This information is used to trace the entire lifecycle of a message from its creation to its consumption.
This information is crucial for business users to troubleshoot and locate issues, as sending and consumption often involve different business departments. With message tracing, it becomes clear whether a message was sent, if it was successfully sent, and if it was consumed, reducing the cost of communication between departments.
Using Message Tracing #
1. Broker Configuration
Message tracing is disabled by default in RocketMQ. Enable it by setting the Broker attribute traceTopicEnable to true. Trace data is stored in the topic RMQ_SYS_TRACE_TOPIC by default; the topic name can be changed with msgTraceTopicName.
Attribute | Default Value |
---|---|
traceTopicEnable | false |
msgTraceTopicName | RMQ_SYS_TRACE_TOPIC |
2. Sender Usage
Sender Tracing API
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic){
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
}
Explanation: enableMsgTrace determines whether message tracing is enabled (default false); customizedTraceTopic sets a custom topic for collecting trace data (default RMQ_SYS_TRACE_TOPIC).
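The defaulting rule for the trace topic can be pictured with a small sketch. TraceTopicResolver and resolve are hypothetical names introduced for illustration only; they are not part of the RocketMQ client, and the real client may apply additional checks.

```java
// Hypothetical helper (not a RocketMQ API): models "use the custom trace
// topic when one is given, otherwise fall back to the default topic".
final class TraceTopicResolver {
    static final String DEFAULT_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    // Returns the trimmed custom topic, or the default when it is null or blank.
    static String resolve(String customizedTraceTopic) {
        if (customizedTraceTopic == null || customizedTraceTopic.trim().isEmpty()) {
            return DEFAULT_TRACE_TOPIC;
        }
        return customizedTraceTopic.trim();
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("MY_TRACE_TOPIC"));
    }
}
```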
Sender Code Example
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID111",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
Explanation: When creating the DefaultMQProducer, set enableMsgTrace to true to enable tracing of sent messages.
3. Consumer Usage
Consumer Tracing API
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
Explanation: enableMsgTrace determines whether consumption tracing is enabled (default false); customizedTraceTopic sets a custom topic for collecting trace data (default RMQ_SYS_TRACE_TOPIC).
Consumer Code Example
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
Explanation: When creating the DefaultMQPushConsumer, set enableMsgTrace to true to enable tracing of consumed messages.
4. Message Tracing Result
Send and consume a message and check the message tracing effect in RocketMQ-Console.
Message Content Sent
SendResult [sendStatus=SEND_OK, msgId=A9FE1075810A18B4AAC24A40738B0000, offsetMsgId=A9FE107500002A9F0000000000002147, messageQueue=MessageQueue [topic=TopicTest, brokerName=liangyong, queueId=1], queueOffset=2]
**Message Content Consumed**
Receive New Messages: [MessageExt [brokerName=liangyong, queueId=1, storeSize=189, queueOffset=2, sysFlag=0, bornTimestamp=1600135337872, bornHost=/169.254.16.117:65532, storeTimestamp=1600135337883, storeHost=/169.254.16.117:10911, msgId=A9FE107500002A9F0000000000002147, commitLogOffset=8519, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, KEYS=OrderID111, CONSUME_START_TIME=1600135337915, UNIQ_KEY=A9FE1075810A18B4AAC24A40738B0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
**Message Trace Display**
In the RocketMQ console, you can retrieve the message content by message key or message ID, as shown below:
![img](../images/20200915100845698.jpg)
Clicking on MESSAGE TRACE DETAIL allows you to view the message trace, as shown below:
![img](../images/20200915101143460.jpg)
#### **Message Trace Principle**
**Sending Trace Principle**: Collect metric information before and after sending a message and asynchronously send it to the trace topic.
![img](../images/20200917223954432.png)
**Consuming Trace Principle**: The consumption trace has two parts. The first part is collected after a message is pulled and before it is processed; the second part is collected after the message has been processed. Both parts are sent to the trace topic asynchronously.
![img](../images/20200918101244746.png)
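The "collect around the operation, report asynchronously" idea can be sketched as follows. This is a simplified model under stated assumptions, not the RocketMQ implementation: AsyncTraceSketch, TraceRecord, and the in-memory traceTopic list (standing in for the real trace topic) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical model: time the send, then hand the record to a background thread.
final class AsyncTraceSketch {
    static final class TraceRecord {
        final String msgId;
        final long costTimeMs;
        final boolean success;
        TraceRecord(String msgId, long costTimeMs, boolean success) {
            this.msgId = msgId;
            this.costTimeMs = costTimeMs;
            this.success = success;
        }
    }

    private final BlockingQueue<TraceRecord> pending = new LinkedBlockingQueue<>(1024);
    // Stands in for the trace topic; a real client would send a message instead.
    final List<TraceRecord> traceTopic = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;
    private final Thread dispatcher;

    AsyncTraceSketch() {
        dispatcher = new Thread(() -> {
            try {
                // Keep draining until shutdown is requested and the queue is empty.
                while (running || !pending.isEmpty()) {
                    TraceRecord r = pending.poll(20, TimeUnit.MILLISECONDS);
                    if (r != null) {
                        traceTopic.add(r);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "trace-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    // Wraps the business send; the trace hand-off never blocks the caller.
    boolean sendWithTrace(String msgId, BooleanSupplier send) {
        long begin = System.nanoTime();
        boolean ok = false;
        try {
            ok = send.getAsBoolean();
        } finally {
            long costMs = (System.nanoTime() - begin) / 1_000_000;
            pending.offer(new TraceRecord(msgId, costMs, ok)); // dropped if the queue is full
        }
        return ok;
    }

    void shutdown() {
        running = false;
        try {
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the record is only queued on the calling thread, a slow or unavailable trace topic does not slow down the business send; the trade-off in this sketch is that records are dropped when the queue is full.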
#### **Trace Format Explanation**
There are three types of message traces: Pub for sending trace, SubBefore for pre-consumption trace, and SubAfter for post-consumption trace.
**Sending Trace Pub**
Name | Explanation
---|---
TraceType | Pub indicates sending trace
timeStamp | Storage time
regionId | Data center availability zone; defaults to DefaultRegion (currently unused)
groupName | Producer group name
topic | Topic name
msgId | Message ID generated by the client
tags | Message tag
keys | Message key
storeHost | Message storage Broker IP
bodyLength | Size of the message body
costTime | Time taken to send the message
msgType | Message type: Normal_Msg, Trans_Msg_Half, Trans_msg_Commit, Delay_Msg
offsetMsgId | Message ID generated by the Broker
isSuccess | Whether the send was successful, true for success and false for failure
**Pre-consumption Trace SubBefore**
Name | Explanation
---|---
traceType | SubBefore indicates pre-consumption trace
timeStamp | Message storage time
regionId | Data center availability zone (currently unused)
groupName | Consumer group name
requestId | Request identifier
msgId | Message ID
retryTimes | Retry times
keys | Message key
**Post-consumption Trace SubAfter**
Name | Explanation
---|---
traceType | SubAfter indicates post-consumption trace
requestId | Request identifier
msgId | Message ID
costTime | Time taken to consume the message
isSuccess | Consumption result, true for success and false for failure
keys | Message key
contextCode | Consumption status returned by the Broker: 0 for SUCCESS, 1 for TIME_OUT, 2 for EXCEPTION, 3 for RETURNNULL, 4 for FAILED
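When reading SubAfter records, the numeric contextCode has to be mapped back to a name. A minimal sketch, assuming only the five values listed in the table; ConsumeContextCode and fromCode are hypothetical names, not RocketMQ classes.

```java
// Hypothetical enum mirroring the contextCode values from the table above.
enum ConsumeContextCode {
    SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED;

    // Maps the numeric code (0..4) to its name; rejects anything else.
    static ConsumeContextCode fromCode(int code) {
        ConsumeContextCode[] all = values();
        if (code < 0 || code >= all.length) {
            throw new IllegalArgumentException("unknown contextCode: " + code);
        }
        return all[code];
    }
}
```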
#### **Conclusion**
* When using message trace in production, a dedicated Broker in the MQ cluster can be used to collect message traces, so that tracing does not affect the performance of the rest of the cluster.
* The open-source version of message trace does not include the IP information of the consumer, so we cannot query which machine consumed the message.
* The open-source version of message trace encodes records by concatenating fields with separator characters and parses them by array index, which limits extensibility and compatibility.
* The two authors of this article do not enable the trace feature in the RocketMQ clusters they manage.
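The extensibility concern about separator-based encoding can be illustrated with a small sketch. The separator value, the field order, and the names DelimitedTraceSketch, encodeV1, encodeV2, and topicOfV1 are assumptions made for illustration; they do not reproduce the actual trace format.

```java
import java.util.regex.Pattern;

// Hypothetical illustration of positional, separator-based trace encoding.
final class DelimitedTraceSketch {
    static final String SEP = String.valueOf((char) 1); // assumed separator

    // Version 1: the field order is the only "schema".
    static String encodeV1(String traceType, long timeStamp, String groupName, String topic) {
        return String.join(SEP, traceType, Long.toString(timeStamp), groupName, topic);
    }

    // Version 2 inserts regionId in the middle of the record.
    static String encodeV2(String traceType, long timeStamp, String regionId,
                           String groupName, String topic) {
        return String.join(SEP, traceType, Long.toString(timeStamp), regionId, groupName, topic);
    }

    // Parser written against version 1: it expects the topic at index 3.
    static String topicOfV1(String encoded) {
        return encoded.split(Pattern.quote(SEP), -1)[3];
    }
}
```

A parser written for version 1 silently returns the group name when handed a version 2 record, because every index after the inserted field has shifted. A self-describing format (key-value pairs, for example) would avoid this, at the cost of larger records.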
### ACL
#### **Explanation of ACL**
The Access Control List (ACL) describes the access control permissions of users or roles to resources. The ACL in RocketMQ is explained in the table below.
**Explanation of ACL in RocketMQ:**
Meaning | Explanation
---|---
User | Represented by accessKey in the plain_acl.yml configuration file
Role | Admin and other roles
Resource | Topic and consumer group
Permission | DENY indicates no permission, ANY indicates having PUB or SUB permission, PUB indicates having permission to send to the topic, SUB indicates having permission to subscribe to the consumer group
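The four permission values can be modelled as bit flags. The following is a minimal sketch of that idea, following the table's wording that ANY grants both PUB and SUB; the class name AclPermSketch, the constant values, and the method names are hypothetical and are not taken from the RocketMQ source.

```java
// Hypothetical permission model; names and bit values are illustrative.
final class AclPermSketch {
    static final int DENY = 1;
    static final int ANY = 1 << 1;
    static final int PUB = 1 << 2;
    static final int SUB = 1 << 3;

    // Parses "DENY", "ANY", "PUB", "SUB" or a combination such as "PUB|SUB".
    static int parse(String perm) {
        int mask = 0;
        for (String p : perm.trim().split("\\|")) {
            switch (p.trim()) {
                case "DENY": mask |= DENY; break;
                case "ANY":  mask |= ANY;  break;
                case "PUB":  mask |= PUB;  break;
                case "SUB":  mask |= SUB;  break;
                default: throw new IllegalArgumentException("unknown permission: " + p);
            }
        }
        return mask;
    }

    // True when the owned permission allows the needed operation (PUB or SUB).
    static boolean allows(int owned, int needed) {
        if ((owned & DENY) != 0) {
            return false; // DENY always wins
        }
        if ((owned & ANY) != 0) {
            return true;  // ANY grants both PUB and SUB
        }
        return (owned & needed) == needed;
    }
}
```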
#### **Example of Using ACL**
Add aclEnable = true to the Broker configuration file, and add the ${ROCKETMQ_HOME}/conf/plain_acl.yml file for ACL control.
1. Broker Configuration
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
traceTopicEnable = true
aclEnable = true
Explanation
Parameter | Meaning |
---|---|
aclEnable | By default, set to false. It needs to be set to true to enable ACL. |
ROCKETMQ_HOME | The MQ root directory. It can be specified with -Drocketmq.home.dir. |
ACL fileName | By default, set to /conf/plain_acl.yml. It can be specified with -Drocketmq.acl.plain.file. |
2. plain_acl.yml Configuration
The ACL configuration file consists of two parts: global whitelist configuration (globalWhiteRemoteAddresses
) and account configuration (accounts
).
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTest1=DENY
  - TopicTest2=PUB|SUB
  groupPerms:
  - consumerTest=DENY
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  admin: true
Explanation
Parameter | Meaning |
---|---|
globalWhiteRemoteAddresses | Global whitelist configuration. The strategy is as follows. Empty: ignore the whitelist and continue with the following validation. Full match mode: allow all and do not perform further validation, e.g. `*`, `*.*.*.*`, or `*:*:*:*:*:*:*:*`. Multiple IP mode: allow IPs within the specified range, e.g. `192.168.0.{1,2}`, `192.168.1.1,192.168.1.2`, `192.168.*.*`, or `192.168.1-10.5-50`. |
accessKey | Unique user identifier. |
secretKey | Access password. |
whiteRemoteAddress | User-level whitelist, same format as globalWhiteRemoteAddresses. |
admin | Whether the user is an admin. Admins have access to all resources. |
defaultTopicPerm | Default topic permission. Default value is DENY. |
defaultGroupPerm | Default consumer group permission. Default value is DENY. |
topicPerms | Detailed topic permissions. |
groupPerms | Detailed consumer group permissions. |
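To make the whitelist formats concrete, here is a minimal sketch of a matcher for IPv4 patterns using a wildcard segment, a brace list, a numeric range, or a comma-separated list of addresses. WhiteListSketch and its methods are hypothetical; the Broker's own matching logic may differ in edge cases, and IPv6 is not handled here.

```java
// Hypothetical IPv4 whitelist matcher; a simplified model, not the Broker's code.
final class WhiteListSketch {
    // An empty pattern makes no decision here (the caller continues validating).
    static boolean matches(String pattern, String ip) {
        if (pattern == null || pattern.trim().isEmpty()) {
            return false;
        }
        String p = pattern.trim();
        if (p.equals("*") || p.equals("*.*.*.*")) {
            return true; // full match mode
        }
        if (p.contains("{")) {
            return matchSegments(p, ip); // brace lists contain commas, so do not split
        }
        for (String entry : p.split(",")) {
            if (matchSegments(entry.trim(), ip)) {
                return true;
            }
        }
        return false;
    }

    private static boolean matchSegments(String entry, String ip) {
        String[] want = entry.split("\\.");
        String[] have = ip.split("\\.");
        if (want.length != 4 || have.length != 4) {
            return false;
        }
        for (int i = 0; i < 4; i++) {
            if (!matchSegment(want[i], have[i])) {
                return false;
            }
        }
        return true;
    }

    private static boolean matchSegment(String want, String have) {
        if (want.equals("*")) {
            return true;
        }
        if (want.startsWith("{") && want.endsWith("}")) {
            for (String alt : want.substring(1, want.length() - 1).split(",")) {
                if (alt.trim().equals(have)) {
                    return true;
                }
            }
            return false;
        }
        int dash = want.indexOf('-');
        if (dash > 0) {
            try {
                int lo = Integer.parseInt(want.substring(0, dash));
                int hi = Integer.parseInt(want.substring(dash + 1));
                int v = Integer.parseInt(have);
                return lo <= v && v <= hi;
            } catch (NumberFormatException e) {
                return false; // malformed range or address segment
            }
        }
        return want.equals(have);
    }
}
```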
3. ACL Sending Example
In the configuration file above, TopicTest1 is set to DENY, which prohibits both sending and consuming, while TopicTest2 is set to PUB|SUB, which allows both sending and subscribing. The following example attempts to send a message to TopicTest1 to see whether it succeeds.
Sending Example (Prohibited)
public class AclSendTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest1",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
}
Prohibited Sending Screenshot
Explanation for Prohibited Sending
When user RocketMQ tries to send a message to TopicTest1, an AclException is thrown and access is rejected. If the topic in the code is changed to TopicTest2, the message is sent successfully.
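The outcome above follows from how a topic's effective permission is resolved: an explicit topicPerms entry is used if present, otherwise defaultTopicPerm applies, and admin accounts bypass the check. A minimal sketch of that resolution, assuming this simplified rule; TopicPermResolverSketch and its members are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one account's topic permissions.
final class TopicPermResolverSketch {
    final boolean admin;
    final String defaultTopicPerm;
    final Map<String, String> topicPerms = new HashMap<>();

    TopicPermResolverSketch(boolean admin, String defaultTopicPerm) {
        this.admin = admin;
        this.defaultTopicPerm = defaultTopicPerm;
    }

    // An explicit entry wins; otherwise the account default applies.
    String effectivePerm(String topic) {
        return topicPerms.getOrDefault(topic, defaultTopicPerm);
    }

    // True when this account may send to the topic.
    boolean canPublish(String topic) {
        if (admin) {
            return true;
        }
        List<String> perms = Arrays.asList(effectivePerm(topic).split("\\|"));
        if (perms.contains("DENY")) {
            return false;
        }
        return perms.contains("PUB") || perms.contains("ANY");
    }
}
```

With the example account (defaultTopicPerm DENY, TopicTest1=DENY, TopicTest2=PUB|SUB), sending to TopicTest1 or to any unlisted topic is rejected, while sending to TopicTest2 is allowed.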
4. ACL Consumption Example
In the configuration file above, the consumer group consumerTest is set to DENY, which prohibits it from consuming messages. Since sending to TopicTest2 is allowed, we send a message to TopicTest2 and observe whether consumerTest can consume it.
Sending Example (Allowed)
public class AclSendTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException, InterruptedException {
producer();
}
public static void producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest2",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
}
Sending Result (Allowed)
SendResult [sendStatus=SEND_OK, msgId=C0A800667FB218B4AAC2663AB66F0000, offsetMsgId=C0A8006600002A9F00000000000085EA, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=2]
Consumption Example (Prohibited)
public class AclConsumeTest {
private static final String ACL_ACCESS_KEY = "RocketMQ";
private static final String ACL_SECRET_KEY = "12345678";
public static void main(String[] args) throws MQClientException, InterruptedException {
pushConsumer();
}
public static void pushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest", getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest2", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}
private static void printBody(List<MessageExt> msg) {
if (msg == null || msg.size() == 0)
return;
for (MessageExt m : msg) {
if (m != null) {
System.out.printf("msgId : %s body : %s \n\r", m.getMsgId(), new String(m.getBody()));
}
}
}
}
Prohibition of Consumption Screenshots
Prohibition of Consumption Explanation
We successfully sent a message to TopicTest2, but because the consumer group consumerTest has been set to prohibit consumption, the message was not received.
ACL Command Summary #
RocketMQ provides a set of commands for dynamically updating the ACL configuration file, so that permission changes take effect immediately.
1. Get ACL Configuration Version
Use the clusterAclConfigVersion command to view the version information.
Parameter Description
Parameter | Description |
---|---|
-b | Broker address, to query a specific Broker |
-c | Cluster name, to query all Brokers in the cluster |
-n | Namesrv address |
Command Example
$ bin/mqadmin clusterAclConfigVersion -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #Broker Addr #AclConfigVersionNum #AclLastUpdateTime
DefaultCluster broker-a x.x.x.x:10911 0 2020-09-20 22:42:59
get cluster's plain access config version success.
2. Get ACL Permission Configuration
Use the getAccessConfigSubCommand command to get the ACL configuration information.
Parameter Description
Parameter | Description |
---|---|
-b | Broker address, to query a specific Broker |
-c | Cluster name, to query all Brokers in the cluster |
-n | Namesrv address |
Command Example
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.103.*, 192.168.0.*]
accounts:
accessKey : RocketMQ
secretKey : 12345678
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
groupPerms : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
3. Update Global White List
Use the updateGlobalWhiteAddr command to modify the global white list globalWhiteRemoteAddresses of ACL.
Parameter Description
Parameter | Description |
---|---|
-b | Broker address, to update a specific Broker |
-c | Cluster name, to update all Brokers in the cluster |
-n | Namesrv address |
-g | The value of the global white list, for example: 10.10.103.*,192.168.0.* |
Command Example
$ bin/mqadmin updateGlobalWhiteAddr -n x.x.x.x:9876 -c DefaultCluster -g 10.10.113.*,192.168.20.*
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
update global white remote addresses to x.x.x.x:10911 success.
Check Effectiveness
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]
accounts:
accessKey : RocketMQ
secretKey : 12345678
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [topicA=DENY, topicB=PUB|SUB, topicC=SUB]
groupPerms : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
Explanation: The global white list has been updated.
4. Update User Configuration
Use the updateAclConfig command to update the configuration of a user account.
Parameter Description
Parameter | Description |
---|---|
-a | Specify the accessKey, which user’s configuration to change |
-b | Broker address, to update a specific Broker |
-c | Cluster name, to update all Brokers in the cluster |
-n | Namesrv address |
-g | Set the groupPerms consumer group permission, format: groupD=DENY,groupD=SUB |
-i | Set the defaultTopicPerm permission in the ACL file |
-m | Set the admin permission in the ACL file |
-s | Set the secretKey value in the ACL file |
-t | Set the topicPerms topic permission, format: topicA=DENY,topicD=SUB |
-u | Set the defaultGroupPerm permission in the ACL file |
-w | Set the whiteRemoteAddress permission in the ACL file for this user |
Command Example
$ bin/mqadmin updateAclConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ -s 87654321 -t testTopicA=DENY,testTopicb=SUB
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create or update plain access config to x.x.x.x:10911 success.
Check Effectiveness
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]
accounts:
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
accessKey : RocketMQ
secretKey : 87654321
whiteRemoteAddress:
admin : false
defaultTopicPerm : DENY
defaultGroupPerm : SUB
topicPerms : [testTopicA=DENY, testTopicb=SUB]
groupPerms : [groupA=DENY, groupB=PUB|SUB, groupC=SUB]
Explanation: The secretKey and topicPerms settings for the user RocketMQ have been updated and have taken effect.
5. Delete User Configuration
Use the deleteAccessConfig command to delete the ACL configuration information of a specified user.
Parameter Description
Parameter | Description |
---|---|
-b | Broker address, to update a specific Broker |
-c | Cluster name, to update all Brokers in the cluster |
-n | Namesrv address |
-a | Specify the accessKey of a specific user |
Command Example
$ bin/mqadmin deleteAccessConfig -n x.x.x.x:9876 -c DefaultCluster -a RocketMQ
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
delete plain access config account to x.x.x.x:10911 success.
Check Effectiveness
$ bin/mqadmin getAccessConfigSubCommand -n x.x.x.x:9876 -c DefaultCluster
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
globalWhiteRemoteAddresses: [10.10.113.*, 192.168.20.*]
accounts:
accessKey : rocketmq2
secretKey : 12345678
whiteRemoteAddress: 192.168.1.*
admin : true
defaultTopicPerm :
defaultGroupPerm :
topicPerms :
groupPerms :
Explanation: The ACL configuration for the user RocketMQ has been deleted.
ACL Principle #
1. Rule Loading
The rules are configured in the plain_acl.yml file and must be loaded into the Broker's cache to take effect. The contents of the ACL file are loaded at startup and reloaded dynamically whenever the file changes. The process is as follows.
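The "load at startup, reload on change" behaviour can be sketched as a small cache. This is a simplified model under stated assumptions: AclRuleCacheSketch is a hypothetical class, the Supplier stands in for reading plain_acl.yml, and change detection is done by comparing content rather than by watching the file system.

```java
import java.util.function.Supplier;

// Hypothetical rule cache: loads once at construction, reloads when content changes.
final class AclRuleCacheSketch {
    private final Supplier<String> source; // stands in for reading plain_acl.yml
    private String cached;
    private int version;

    AclRuleCacheSketch(Supplier<String> source) {
        this.source = source;
        this.cached = source.get(); // initial load at startup
    }

    // Intended to be called periodically or from a file watcher.
    // Returns true only when the rules were actually reloaded.
    synchronized boolean refreshIfChanged() {
        String latest = source.get();
        if (latest.equals(cached)) {
            return false;
        }
        cached = latest;
        version++;
        return true;
    }

    synchronized String rules() {
        return cached;
    }

    // Counts reloads, similar in spirit to a configuration version number.
    synchronized int version() {
        return version;
    }
}
```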
2. Permission Validation
The registered hook is invoked through NettyServerHandler: when the Broker's channel receives a request from a client, the validation logic is executed. The entry points are NettyServerHandler#channelRead0() and processRequestCommand#doBeforeRpcHooks, and rule validation itself starts in the PlainAccessValidator#validate method. The validation process is shown below.
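The hook mechanism can be pictured as a list of checks that run before a request reaches its processor. The sketch below is a generic model of that pattern, not the Broker's code: RpcHookChainSketch, BeforeHook, and AccessDeniedException are hypothetical names, and the request is reduced to a few strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical before-request hook chain.
final class RpcHookChainSketch {
    interface BeforeHook {
        // Throws AccessDeniedException to reject the request.
        void doBeforeRequest(String remoteAddr, String accessKey);
    }

    static final class AccessDeniedException extends RuntimeException {
        AccessDeniedException(String message) {
            super(message);
        }
    }

    private final List<BeforeHook> hooks = new ArrayList<>();

    void register(BeforeHook hook) {
        hooks.add(hook);
    }

    // Runs every hook first; the processor is reached only if none of them throws.
    String process(String remoteAddr, String accessKey, String request) {
        for (BeforeHook hook : hooks) {
            hook.doBeforeRequest(remoteAddr, accessKey);
        }
        return "processed:" + request;
    }
}
```

Registering the ACL check as a hook keeps validation separate from request processing: a rejected request never reaches the processor, and the processor needs no knowledge of ACL rules.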
Conclusion of ACL #
- Enabling ACL is usually unnecessary. If the data is sensitive, encrypting it is an alternative.
- If ACL is enabled for thousands of resources (topics and consumer groups), the configuration file becomes huge.
- It is recommended to apply ACL selectively, for example only to topics and consumer groups related to funds.
- ACL is not enabled in the RocketMQ clusters managed by the two authors of this article.
Multi-Replica Setup #
Significance of Multi-Replica #
Starting from version 4.5.0, the open-source version of RocketMQ supports multi-replica (DLedger). In previous versions, only master-slave mode was supported.
Problems with master-slave mode:
- If the master node goes down, the group cannot switch to the slave node automatically, so this group of Broker nodes can no longer provide write services.
- With asynchronous master-slave replication, if the master node goes down unexpectedly, some data may not yet have been replicated to the slave node, which creates a risk of data loss.
By using the Raft protocol, the multi-replica mode automatically elects a new leader when a node goes down, improving cluster availability and ensuring data consistency.
Multi-Replica Setup #
Since DLedger is developed based on the Raft protocol, it requires a majority vote. At least 3 nodes are needed to form a Raft group.
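The majority requirement explains the three-node minimum. A short sketch of the arithmetic follows; RaftQuorumSketch is a hypothetical helper, not part of DLedger.

```java
// Hypothetical helper for majority-quorum arithmetic.
final class RaftQuorumSketch {
    // Smallest number of nodes that forms a majority of n.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // Number of nodes that can fail while a majority remains.
    static int toleratedFailures(int n) {
        return n - majority(n);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("nodes=%d majority=%d tolerated=%d%n",
                    n, majority(n), toleratedFailures(n));
        }
    }
}
```

A two-node group needs both nodes for a majority and therefore tolerates no failure, while three nodes tolerate one. Four nodes still tolerate only one, which is why odd group sizes are the usual choice.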
broker-n0.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
broker-n1.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
broker-n2.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
### must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
Start three nodes:
nohup bin/mqbroker -c conf/dledger/broker-n0.conf &
nohup bin/mqbroker -c conf/dledger/broker-n1.conf &
nohup bin/mqbroker -c conf/dledger/broker-n2.conf &
Check if the start is successful:
$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 x.x.x.x:30921 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000
RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000
RaftCluster RaftNode00 3 x.x.x.x:30931 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444663.49 -1.0000
Note: BID 0 indicates the Master; the other two nodes are Followers.
Console screenshot:
Check message transmission:
Note: We have completed the process of setting up multiple replicas using the above steps.
Re-elect the leader #
We will verify DLedger's leader election by killing the Master. In the clusterList output above, the Master is x.x.x.x:30921. After killing that process, we run clusterList again to see what happens.
$ bin/mqadmin clusterList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
RaftCluster RaftNode00 0 x.x.x.x:30931 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444664.03 -1.0000
RaftCluster RaftNode00 1 x.x.x.x:30911 V4_7_0 0.00(0,0ms) 0.00(0,0ms) 0 444664.03 -1.0000
Note: After the original Master is killed, a leader election completes automatically, and the new Master is x.x.x.x:30931.
Parameter Description #
The parameter description of multiple replicas in the configuration file is shown in the following table.
Parameter | Description |
---|---|
enableDLegerCommitLog | Whether to enable DLedger, default is false |
dLegerGroup | The Raft group to which the node belongs; it is recommended to keep it consistent with brokerName |
dLegerPeers | The information of the cluster nodes, for example: n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
dLegerSelfId | The current node ID, taken from the prefix of the node's entry in dLegerPeers, i.e., n0 in the above example. Note that only the first character may be a letter; the remaining characters must be digits. |
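The relationship between dLegerPeers and dLegerSelfId can be checked with a small sketch before starting the Brokers. DLedgerPeersSketch and its methods are hypothetical helpers written for this article, not DLedger APIs; the ID rule (one letter followed by at least one digit) is this sketch's reading of the table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical validator for the dLegerPeers / dLegerSelfId settings.
final class DLedgerPeersSketch {
    // Parses "n0-host:port;n1-host:port" into id -> address, rejecting duplicates.
    static Map<String, String> parsePeers(String peers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : peers.split(";")) {
            String e = entry.trim();
            int dash = e.indexOf('-');
            if (dash <= 0 || dash == e.length() - 1) {
                throw new IllegalArgumentException("bad peer entry: " + e);
            }
            String id = e.substring(0, dash);
            if (result.put(id, e.substring(dash + 1)) != null) {
                throw new IllegalArgumentException("duplicate peer id: " + id);
            }
        }
        return result;
    }

    // Assumed rule: a letter followed by digits, and the ID must appear in the peer list.
    static boolean isValidSelfId(String selfId, Map<String, String> peers) {
        return selfId.matches("[A-Za-z][0-9]+") && peers.containsKey(selfId);
    }
}
```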
Conclusion #
When using multiple replicas, it is recommended to run load tests to check whether the TPS meets business requirements. In the author's load tests, TPS under multi-replica differed considerably from TPS under master-slave asynchronous replication.
If the TPS meets the requirements, a multi-replica architecture is recommended, especially for payment-related scenarios.
If a master-slave architecture is already running in production, how can it be upgraded to DLedger mode?
- Following the smooth expansion method mentioned earlier, add the Raft group composed of multiple replicas to the original cluster.
- Disable write permission on the nodes of the original master-slave architecture.
- After the log retention period has passed, take the master-slave nodes offline.