15 Message Queues Verification What Are Redis's Solutions

15 Message Queues Verification What are Redis’s Solutions #

Most of today’s internet applications are designed using a distributed system architecture, and one essential software for many distributed systems is a message queue.

A message queue should be able to support fast read and write of messages between components. Redis, with its high-speed data access capabilities, is well-suited to meet the performance requirements of a message queue. However, apart from performance, there are other requirements for a message queue, so many people are concerned about a question: “Is Redis suitable for a message queue?”

In fact, behind this question, there are two underlying core questions:

  • What are the requirements for message storage and retrieval in a message queue?
  • How does Redis meet the requirements of a message queue?

In this lesson, we will discuss the characteristics of message queues and the message queue solutions provided by Redis. Only by connecting this knowledge and practical experience can we fully understand the technical implementation of a message queue based on Redis. In the future, when you need to select a message queue solution for a distributed system component, you can choose the appropriate Redis message queue solution based on the requirements of component communication volume and message communication speed.

Let’s first look at the first question: What are the requirements for message retrieval in a message queue?

Message Queue Message Storage Requirements #

First, let me explain the process of storing and retrieving messages in a message queue. In a distributed system, when two components need to communicate using a message queue, one component will pass the data to be processed as a message to the message queue. Then, this component can continue executing other operations. The remote component reads the message from the message queue and processes it locally.

To help you understand, let me explain with an example.

Let’s say component 1 needs to perform a sum calculation on collected data and write it into a database. However, the speed at which messages arrive is fast, and component 1 cannot do data collection, calculation, and database writing in a timely manner. Therefore, we can use message queue-based communication, where component 1 saves data x and y as JSON-formatted messages and sends them to the message queue. This way, it can continue receiving new data. Component 2 asynchronously reads the data from the message queue, performs the sum calculation on server 2, and then writes it into the database. The process is shown in the following diagram:

In general, when using a message queue, consumers can asynchronously read messages from producers and then process them. In this way, even if the speed at which producers send messages far exceeds the speed at which consumers process messages, the messages already sent by the producers can be cached in the message queue, avoiding blocking the producers. This is a major advantage of message queues as a means of communication between distributed components.

However, when storing messages in a message queue, three requirements must be met: message ordering, handling duplicate messages, and ensuring message reliability.

Requirement 1: Message Ordering #

Although consumers process messages asynchronously, they still need to process messages in the order in which producers send them to avoid processing later-sent messages first. For scenarios that require message ordering, if messages are processed out of order, it may lead to incorrect execution of business logic, causing losses to the business.

Let’s consider a scenario of updating product inventory.

Suppose the producer is responsible for receiving inventory update requests and the consumer is responsible for actually updating the inventory. The current inventory level is 10. The producer sends message 1 to update the inventory record of product X to 5, and message 2 to update the inventory record of product X to 3. If message 1 and message 2 cannot be ordered correctly in the message queue and message 2 is processed before message 1, it is clear that the inventory update will be incorrect. This is unacceptable for the business application.

In this situation, you may think of a solution: instead of sending the updated inventory level as a message, send the amount to deduct from the inventory as the content of the message. In this case, message 1 deducts 5 from the inventory, and message 2 deducts 2 from the inventory. If there are no inventory query requests between message 1 and message 2, even if the consumer processes message 2 first and then processes message 1, this solution can ensure that the final inventory level is correct, which is 3.

However, we still need to consider another situation: suppose the consumer receives three messages in the following order: message 1 deducts 5 from the inventory, message 2 reads the inventory level, and message 3 deducts 2 from the inventory. In this case, if the consumer processes message 3 first (deducting 2 from the inventory), the inventory level becomes 8. Then, when the consumer processes message 2, the current inventory level read is 8. This will result in an incorrect inventory query. From the perspective of the business application, message 1, 2, and 3 should be executed in order. Therefore, the inventory level queried by message 2 should be the inventory level after deducting 5, not after deducting 2. Therefore, the solution of using the deduction value as the message will cause incorrect data reading in scenarios where both read and write operations are included in the message. Moreover, this solution also faces a problem of handling duplicate messages.

Requirement 2: Handling Duplicate Messages #

When a consumer reads messages from a message queue, there may be cases where messages are retransmitted due to network congestion. In this case, the consumer may receive multiple duplicate messages. If the consumer processes duplicated messages multiple times, it may result in multiple executions of the same business logic. If the business logic is intended to modify data, this will lead to data being modified multiple times.

Using the inventory update example again, suppose the consumer receives message 1 to deduct 5 from the inventory, and then receives message 1 again. If the consumer cannot identify that these two messages are actually the same, it will perform two deductions of 5 from the inventory. As a result, the inventory will be incorrect. This is also unacceptable.

Requirement 3: Message Reliability Guarantee #

In addition, when a consumer processes messages, there may be cases where messages are not processed completely due to failures or crashes. In this case, the message queue needs to provide reliability guarantee, meaning that when the consumer restarts, it can reread the messages and process them again. Otherwise, there will be an issue of missed message processing.

Redis’s List and Streams data types can satisfy these three requirements of a message queue. Let’s first understand the implementation method of a message queue based on Lists.

List-based Message Queue Solution #

List itself stores data in a first-in-first-out order, so if we use List as a message queue to store messages, it can already satisfy the requirement for message ordering.

Specifically, the producer can use the LPUSH command to write the messages to be sent into the List in sequence, and the consumer can use the RPOP command to read the messages from the other end of the List in the order they were written and process them.

As shown in the figure below, the producer uses LPUSH to write two inventory messages, 5 and 3, indicating that the inventory should be updated to 5 and 3 respectively. The consumer then uses RPOP to read the two messages in sequence and process them accordingly.

However, there is a potential performance risk when the consumer reads the data.

When the producer writes data to the List, the List does not actively notify the consumer of new messages being written. In order for the consumer to handle messages in a timely manner, it needs to continuously call the RPOP command in the program (e.g., using a while(1) loop). If there is a new message written, the RPOP command will return a result; otherwise, it will return an empty value and continue the loop.

Therefore, even if there are no new messages written to the List, the consumer still needs to continuously call the RPOP command, which will cause the consumer program’s CPU to be consumed in executing the RPOP command, resulting in unnecessary performance loss.

To solve this problem, Redis provides the BRPOP command. The BRPOP command, also known as blocking read, automatically blocks the client when no data is read from the queue until new data is written to the queue, and then starts reading the new data. Compared to the consumer program continuously calling the RPOP command by itself, this approach can save CPU overhead.

Now that we’ve solved the problem of message ordering, next we need to consider handling duplicate messages. There is a requirement here: The consumer program itself should be able to identify and handle duplicate messages.

On one hand, the message queue needs to provide a globally unique ID for each message; on the other hand, the consumer program needs to record the IDs of the messages that have already been processed.

After receiving a message, the consumer program can compare the received message ID with the recorded IDs of the messages that have been processed to determine whether the current message has been processed. If it has been processed, the consumer program will not process it again. This processing feature is also known as idempotence, which means that for the same message, the result of processing it once is the same as processing it multiple times.

However, List itself does not generate an ID for each message, so the globally unique ID of a message needs to be generated by the producer program before sending the message. After the ID is generated, we need to include this globally unique ID in the message when using the LPUSH command to insert the message into the List.

For example, by executing the following command, we insert a message with global ID 101030001 and inventory quantity 5 into the message queue:

LPUSH mq "101030001:stock:5"
(integer) 1

Lastly, let’s take a look at how the List type ensures message reliability.

Once the consumer program reads a message from the List, the List will no longer retain that message. So if the consumer program fails or crashes while processing a message, it will result in the message not being processed completely. When the consumer program is restarted, it cannot read the message from the List again.

To retain the message, the List type provides the BRPOPLPUSH command. This command allows the consumer program to read a message from one List, while Redis inserts the same message into another List (which can be called a backup List) for retention. This way, if the consumer program reads the message but fails to process it properly, it can reread the message from the backup List and process it again after restarting.

I have drawn a diagram below to illustrate the process of using the BRPOPLPUSH command to retain messages and the process of the consumer reading the messages again. Take a look.

The producer first uses LPUSH to insert the messages “5” and “3” into the message queue mq. The consumer program uses the BRPOPLPUSH command to read the message “5”, and at the same time, Redis inserts the message “5” into the mqback queue. If the consumer program crashes while processing the message “5”, it can reread the message “5” from mqback after restarting and continue processing.

Alright, at this point, you can see that based on the List type, we can satisfy the three major requirements for a distributed component’s message queue. However, when using List as a message queue, we may encounter another problem: the producer sends messages quickly while the consumer processes messages relatively slowly, which leads to a growing number of messages in the List and puts a lot of pressure on Redis’s memory.

At this point, we hope to start multiple consumer programs to form a consumer group and share the task of processing messages in the List. However, the List type does not support the implementation of consumer groups. So, is there a more suitable solution? This is where Redis Streams data type comes into play, which has been available since Redis version 5.0.

Compared to List, Streams can also satisfy the three major requirements of a message queue. Moreover, it supports reading messages in the form of consumer groups. Next, let’s learn about how to use Streams.

Stream-based Message Queue Solution #

Streams is a data type in Redis specifically designed for message queues, providing a rich set of commands for message queue operations.

  • XADD: Inserts a message into the queue, ensuring order and automatically generating a globally unique ID.
  • XREAD: Reads messages, allowing data to be read based on ID.
  • XREADGROUP: Reads messages in the context of consumer groups.
  • XPENDING and XACK: The XPENDING command is used to query messages that have been read by consumers but are not yet acknowledged, while the XACK command is used to acknowledge that message processing has been completed by a consumer.

First, let’s learn the XADD operation for storing messages in a Streams.

The XADD command is used to insert new messages into the message queue. The format of a message is a key-value pair. For each inserted message, Streams automatically generates a globally unique ID.

For example, by executing the following command, we can insert a message into a message queue named ‘mqstream’ with the key ‘repo’ and value ‘5’. The ‘’ after the message queue name indicates that Redis will generate a globally unique ID for the inserted data, such as “1599203861727-0”. Of course, we can also specify our own ID number after the message queue name, as long as this ID number is globally unique. However, using ‘’ is more convenient and efficient compared to manually setting an ID number.

XADD mqstream * repo 5
"1599203861727-0"

The globally unique ID of the message consists of two parts. The first part, “1599203861727”, represents the current server time in milliseconds when the data was inserted. The second part represents the message sequence number within the current millisecond, starting from 0. For example, “1599203861727-0” represents the first message within the millisecond “1599203861727”.

When a consumer needs to read messages, it can directly use the XREAD command to read from the message queue.

When using XREAD to read messages, you can specify a message ID and start reading from the message following that ID.

For example, by executing the following command, we can start reading all subsequent messages (3 in total) starting from the message ID “1599203861727-0”.

XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

In addition, consumers can also specify the block configuration option when calling XREAD to achieve blocking read operations similar to BRPOP. When there are no messages in the message queue, if the block configuration option is set, XREAD will block for the specified duration. The duration can be set in the block configuration option.

For example, let’s take a look at the following command. The “$” symbol at the end of the command indicates that we want to read the latest message. We also set the block configuration option to 10000, where the unit is milliseconds, indicating that if there are no messages when XREAD reads the latest message, it will block for 10000 milliseconds (10 seconds) and then return. In this example, since there are no messages in the mqstream message queue, XREAD will return empty (nil) after 10 seconds.

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

The operations mentioned earlier are also supported by Lists. Next, let’s learn about the unique features of Streams.

Streams themselves can use XGROUP to create consumer groups. After creating a consumer group, Streams can use the XREADGROUP command to allow consumers within the group to read messages.

For example, by executing the following command, we create a consumer group named ‘group1’ that consumes messages from the mqstream message queue.

XGROUP create mqstream group1 0
OK

Then, by executing the following command, we let the consumer ‘consumer1’ in group1 read all messages from the mqstream message queue. The “>” parameter at the end of the command indicates that reading starts from the first unacknowledged message. Since no other consumer in group1 has read any messages before consumer1 reads, consumer1 will receive all the messages in the mqstream message queue (a total of 4 messages).

XREADGROUP GROUP group1 consumer1 STREAMS mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
    1. “repo”
    2. “2”
    1. “1599274927910-0”
      1. “repo”
      2. “1”

It should be noted that once a message in the message queue is read by a consumer in a consumer group, other consumers in the same group can no longer read it. For example, after executing the XREADGROUP command earlier, if we try to let consumer2 in group1 read the message, it will receive an empty value because the message has already been read by consumer1, as shown below:

XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

The purpose of using consumer groups is to distribute the reading of messages among multiple consumers in the group. Therefore, we usually assign each consumer to read a part of the messages, thus achieving a balanced distribution of message reading load among multiple consumers. For example, we can execute the following commands to let consumer1, 2, and 3 in group2 each read one message:

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"

XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"

XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

To ensure that consumers can still read unprocessed messages after a failure or restarting, Streams automatically uses an internal queue called the PENDING List to retain the messages each consumer in the consumer group has read until the consumer sends an XACK command to notify Streams that the message has been processed. If a consumer fails to process a message and does not send an XACK command to Streams, the message will still be retained. In this case, the consumer can use the XPENDING command to check the messages that have been read but not confirmed as processed after restarting.

For example, let’s check the number of messages that have been read but not confirmed by each consumer in group2. The second and third lines of the XPENDING result represent the minimum and maximum message IDs read by all consumers in group2 respectively.

XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

If we want to further check the specific data read by a consumer, we can execute the following command:

XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
   2) "consumer2"
   3) (integer) 513336
   4) (integer) 1

As we can see, the ID of the message already read by consumer2 is 1599274912765-0.

Once the message 1599274912765-0 is processed by consumer2, consumer2 can use the XACK command to notify Streams, and then this message will be deleted. When we use the XPENDING command to check again, we can see that consumer2 no longer has any read but unconfirmed messages.

XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

Now, we have learned how to use Streams to implement a message queue. I would like to emphasize again that Streams is a data type specifically designed for message queue scenarios in Redis 5.0. If you are using Redis 5.0 or later versions, you can consider using Streams as a message queue.

Summary #

In this lesson, we learned about the three major requirements for using message queues in distributed systems: message ordering, duplicate message handling, and message reliability. These three requirements can be further translated into three requirements for message queues: ordered access to message data, globally unique identifiers for message data, and deletion of message data after consumption.

I created a table summarizing the characteristics and differences of using Lists and Streams to implement message queues. Of course, in practice, you can also supplement and improve this table based on new knowledge.

In fact, there has always been debate in the industry about whether Redis is suitable for message queues. Many people believe that if you want to use a message queue, you should use dedicated software like Kafka or RabbitMQ that are designed specifically for messaging scenarios, and Redis is more suitable for caching.

Based on years of experience in Redis development, my view is that Redis is a very lightweight key-value database. Deploying a Redis instance means starting a process, and deploying a Redis cluster means deploying multiple Redis instances. On the other hand, deploying Kafka or RabbitMQ involves additional components, such as the need to deploy ZooKeeper for Kafka. Compared to Redis, Kafka and RabbitMQ are generally considered heavyweight message queues.

Therefore, the question of whether to use Redis as a message queue cannot be generalized. We need to consider the data volume at the business level and the requirements for performance, reliability, and scalability. If the message communication volume between components in a distributed system is not large, Redis can meet the message storage requirements with limited memory space. Furthermore, Redis’s high-performance characteristics can support fast message read and write operations, making it a good solution for message queues.

One Question per Lesson #

As usual, I have a small question for you. If a producer sends a message to a message queue that needs to be read and processed by multiple consumers (for example, a message is data collected from a business system that needs to be read by consumer 1 for real-time calculation and by consumer 2 to be stored in the distributed file system HDFS for later historical queries), which Redis data type would you use to solve this problem?

Please write down your thoughts and answer in the comments section. If you found today’s content helpful, feel free to share it with others. See you in the next lesson.