02 Rocket Mq Core Concepts Primer

02 RocketMQ Core Concepts - Primer #

Before diving into studying RocketMQ, I believe it is necessary to clarify the core concepts of RocketMQ in order to establish a solid foundation for learning RocketMQ.

RocketMQ Deployment Architecture #

1

The main components in RocketMQ are as follows.

NameServer

NameServer cluster, the routing registration center for Topics, provides routing services for clients based on the Topic, guiding clients to send messages to Brokers. Nodes in NameServer do not communicate with each other. The routing information in the NameServer cluster adopts eventual consistency.

Broker

Message storage server, divided into two roles: Master and Slave. The deployed architecture shown in the image is a 2 Master 2 Slave architecture. In RocketMQ, the Master server handles read and write operations, while the Slave server acts as a backup. When the Master server is under pressure, the Slave server can handle read services (message consumption). Every Broker, including its Slave servers, sends heartbeat packets to NameServer every 30 seconds. The heartbeat packet contains the routing information of all Topics on the Broker.

Client

Message client, including Producer (message sender) and Consumer (message consumer). At the same time, the client will only connect to one NameServer, and only tries to connect to another NameServer when an exception occurs. The client initiates a Topic routing information query to NameServer every 30 seconds.

Friendly reminder: NameServer stores the routing information of the Topic in memory, and the place where the routing information of the Topic is persisted is in Broker, that is, ${ROCKETMQ_HOME}/store/config/topics.json.

After RocketMQ version 4.5.0, a multi-replica mechanism was introduced, which means that a replica group (m-s) can evolve into a replica group based on the Raft consensus protocol. The replica group uses the Raft protocol internally to ensure the strong consistency of the data on Broker nodes. This deployment architecture is more commonly used in the financial industry.

Message Subscription Model #

RocketMQ adopts the publish-subscribe model for message consumption.

  • Topic: A collection of messages for a specific category. The message producer sends a certain type of message to a Topic. For example, the order module sends orders to the order_topic, and when a user logs in, the login event is sent to the user_login_topic.
  • ConsumerGroup: A group of message consumers, which is a unit of consumption. When a ConsumerGroup starts, it needs to subscribe to the Topics that need to be consumed. A Topic can be subscribed by multiple ConsumerGroups, and a ConsumerGroup can also subscribe to multiple Topics. A ConsumerGroup has multiple consumers.

The terminology may sound dry and obscure, so next I will give an example to illustrate.

For example, when developing an order system, there is a subsystem: order-service-app. In this project, a ConsumerGroup called order_consumer will be created to subscribe to the order_topic. Based on distributed deployment, the deployment of order-service-app is as follows:

2

That is, order-service-app is deployed on 3 servers, and each JVM process can be seen as one of the consumers in the order_consumer ConsumerGroup.

Consumption Mode #

So how do these three consumers collaborate to consume messages in the order_topic?

RocketMQ supports two consumption modes: broadcast mode and clustering mode.

  • Broadcast mode: Every consumer in the same ConsumerGroup processes every message in the Topic. This is usually used for refreshing in-memory caches.
  • Clustering mode: All consumers in the same ConsumerGroup jointly consume messages in a Topic. This means division of labor and collaboration. Each consumer consumes a portion of the data, achieving load balancing.

Clustering mode is a very common mode that conforms to the basic idea of distributed architecture, that is, horizontal scaling. If the current consumers cannot process messages quickly and timely, the consumption capacity can be quickly improved by increasing the number of consumers, so as to handle the squeezed messages in a timely manner.

Consumption Queue Load Balancing Algorithm and Rebalance Mechanism #

In clustering mode, how are messages allocated to consumers?

For example, in the above example, there are 16 queues in the order_topic. How does a ConsumerGroup with 3 consumers allocate the queues?

In the MQ field, there is an unwritten convention: the same consumer can be assigned multiple queues at the same time, but one queue will only be assigned to one consumer at the same time.

RocketMQ provides a variety of queue load balancing algorithms, among which the two most commonly used are the average allocation algorithms.

  • AllocateMessageQueueAveragely: Average allocation.
  • AllocateMessageQueueAveragelyByCircle: Round-robin average allocation.

To illustrate the allocation rules of these two allocation algorithms, let’s assign numbers to the 16 queues, represented by q0~q15, and use c0~c2 to represent the consumers.

The queue load balancing mechanism of the AllocateMessageQueueAveragely allocation algorithm is as follows:

  • c0: q0 q1 q2 q3 q4 q5
  • c1: q6 q7 q8 q9 q10
  • c2: q11 q12 q13 q14 q15

The characteristic of this algorithm is to divide the total number by the number of consumers, and assign the remainder to the consumers in order. Therefore, c0 will be assigned one more queue, and the queues allocated are continuous.

The queue load balancing mechanism of the AllocateMessageQueueAveragelyByCircle allocation algorithm is as follows:

  • c0: q0 q3 q6 q9 q12 q15
  • c1: q1 q4 q7 q10 q13
  • c2: q2 q5 q8 q11 q14

The characteristic of this allocation algorithm is to assign one by one in turn.

Tips: If the number of queues for a topic is less than the number of consumers, some consumers will not be able to be allocated messages. In RocketMQ, the number of queues for a topic directly determines the maximum number of consumers. However, increasing the number of queues for a topic does not affect the performance of RocketMQ.

In the actual process, it is very common to expand (increase the number of queues) the topic or expand/contract the consumers. So, if a new consumer is added, which queues should the consumer consume? This involves the message consumption queue reassignment, i.e., consumer queue rebalancing mechanism.

In RocketMQ client, it will query the number of queues and consumers for the current topic every 20 seconds, and use the queue load balancing algorithm to redistribute them. Then it compares the result with the previous allocation result. If there is a change, it will proceed with queue reassignment; if there is no change, it will be ignored.

For example, the allocation algorithm shown in the figure below is adopted. Now a new consumer c3 is added. What is the distribution of queues?

3

According to the new allocation algorithm, the final distribution of queues is as follows:

  • c0: q0 q1 q2 q3
  • c1: q4 q5 q6 q7
  • c2: q8 q9 q10 q11
  • c3: q12 q13 q14 q15

The entire process above does not require intervention from the application program; it is completed by RocketMQ. The general approach is to discard the queues that were previously allocated to oneself but no longer belong to oneself, and create new pull tasks for the newly allocated queues.

Consumption Progress #

After a consumer consumes a message, it needs to record the consumption position so that it can continue processing new messages from the last consumed position when it restarts at the consumption side. In RocketMQ, the storage of message consumption positions is based on the consumption group.

In cluster mode, the consumption progress is stored on the broker side. ${ROCKETMQ_HOME}/store/config/consumerOffset.json is the specific storage file for it, and its content is shown in the screenshot below:

4

It can be seen that the key for consumption progress is topic@consumeGroup, and then there is one offset for each queue.

For the broadcast mode, the consumption progress file is stored in the user’s home directory, and the default file full path name is ${USER_HOME}/.rocketmq_offsets.

Consumption Model #

RocketMQ provides two consumption models: concurrent consumption and sequential consumption.

  • Concurrent Consumption: For the messages in a queue, each consumer internally creates a thread pool to process the messages in the queue in a multi-threaded manner, which means that messages with larger offsets may be consumed before messages with smaller offsets.
  • Sequential Consumption: In some scenarios, such as the MySQL binlog scenario, messages need to be consumed in order. In RocketMQ, a queue-based sequential consumption model is provided, which means that although each consumer in a consumer group creates multiple threads, it locks against the same queue.

Tips: In the concurrent consumption model, if message consumption fails, it will be retried 16 times with different intervals each time. In the sequential consumption model, if a message consumption fails, it will be consumed continuously until it is consumed successfully. Therefore, in the use of sequential consumption, the application program needs to differentiate system exceptions from business exceptions. If it is an exception caused by a violation of business rules, retrying multiple times will not make the consumption successful. At that time, an alarm mechanism must be used to intervene in a timely manner; otherwise, the consumption will accumulate.

Transactional Messages #

Transactional messages are not designed to solve distributed transactions but provide consistency between message sending and business persistence. The implementation principle is actually the specific application of a distributed transaction. Please take a look at the following example:

5

In the pseudocode above, storing the order in a relational database and sending the message to RocketMQ are two operations of different media. If we can ensure that either both operations succeed or both operations fail, RocketMQ introduces transactional messages to solve this problem.

Tips: The main purpose of this article is to let everyone understand the concepts of various terms. Due to the use of transactional messages, detailed instructions will be provided in subsequent articles in this series.

Scheduled Messages #

The open-source version of RocketMQ currently does not support arbitrarily precise scheduled messages. The so-called scheduled messages mean sending the messages to a broker, but the consumer will not consume them immediately. They will be consumed by a consumer after a specified delay.

RocketMQ currently supports specified levels of delay, and the delay levels are as follows:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Message Filtering #

Message filtering refers to the consumer being able to filter messages in a topic based on certain conditions, i.e., only consume messages in a topic that meet the filtering criteria.

The main filtering mechanisms provided by RocketMQ are tag-based filtering and message property-based filtering. Message property-based filtering supports SQL92 expressions to filter messages.

Summary #

The main purpose of this article is to introduce the basic concepts of RocketMQ, such as NameServer, Broker, topic, consumer group, consumer, queue load balancing algorithm, queue rebalancing mechanism, concurrent consumption, sequential consumption, consumption progress storage, scheduled messages, transactional messages, message filtering, etc., to lay a solid foundation for the subsequent practical series.

Starting from the next article, we will officially begin the journey of RocketMQ and start learning message sending.