25 Other Implementations of Message Queues

25 Other Implementations of Message Queues #

Before Redis 5.0, there were many different ways to implement message queues. In addition to the publish-subscribe pattern we mentioned earlier, there are two other common implementations: List and ZSet.

The List and ZSet implementations solve the problem of lack of persistence in the publish-subscribe pattern. However, these two methods also have their own drawbacks. Let’s learn about them. Let’s start with the List implementation of the message queue.

List-based Message Queue #

The List method is the simplest and most direct way to implement a message queue. It mainly uses lpush and rpop to store and retrieve messages, as shown in the following diagram:

list_store_retrieve.png

List uses the following commands to implement the message queue:

127.0.0.1:6379> lpush mq "hello" # Push the message "hello"
(integer) 1
127.0.0.1:6379> lpush mq "msg" # Push the message "msg"
(integer) 2
127.0.0.1:6379> rpop mq # Receive the message "hello"
"hello"
127.0.0.1:6379> rpop mq # Receive the message "msg"
"msg"

Here, mq is equivalent to the channel name. lpush is used to produce messages, and rpop is used to consume messages.

Code Implementation #

Next, let’s implement the List-based message queue in Java code. Here is the source code:

import redis.clients.jedis.Jedis;

public class ListMQExample {
    public static void main(String[] args){
        // Consumer
        new Thread(() -> consumer()).start();
        // Producer
        producer();
    }
    
    /**
     * Producer
     */
    public static void producer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Push message
        jedis.lpush("mq", "Hello, List.");
    }
    
    /**
     * Consumer
     */
    public static void consumer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Consume messages
        while (true) {
            // Get message
            String msg = jedis.rpop("mq");
            if (msg != null) {
                // Received a message
                System.out.println("Received message: " + msg);
            }
        }
    }
}

The output of the above program is:

Received message: Hello, List.

We use an infinite loop to retrieve data from the queue in real-time. However, this brings another problem. If there is no data in the queue, the infinite loop will continue to consume system resources. In such cases, we can use brpop instead of rpop to perfectly solve this problem.

brpop is short for blocking pop, which means it will enter a sleep state when there is no data in the queue. It will only wake up and execute the retrieval task when new data is added to the queue. This solves the problem of continuously consuming system resources caused by the while loop. The improved version of the code is as follows:

import redis.clients.jedis.Jedis;

public class ListMQExample {
    public static void main(String[] args) throws InterruptedException {
        // Consumer (Improved Version)
        new Thread(() -> bConsumer()).start();
        // Producer
        producer();
    }

    /**
     * Producer
     */
    public static void producer() throws InterruptedException {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Push message
        jedis.lpush("mq", "Hello, List.");
        Thread.sleep(1000);
        jedis.lpush("mq", "message 2.");
        Thread.sleep(2000);
        jedis.lpush("mq", "message 3.");
    }

    /**
     * Consumer (Blocking Version)
     */
    public static void bConsumer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        while (true) {
            // Blocking pop
            for (String item : jedis.brpop(0,"mq")) {
                // Process the retrieved data
                System.out.println(item);
            }
        }
    }
}

In this case, the brpop method’s first parameter is the timeout value. Setting it to 0 means it will block until data is available.

Analysis of Pros and Cons #

Advantages of List:

  • Messages can be persisted by leveraging Redis’ persistence mechanisms (AOF, RDB, or hybrid).
  • Consumers can backlog messages without being forcibly disconnected due to a large number of messages on the client.

Disadvantages of List:

  • Messages cannot be consumed more than once; once consumed, they are deleted.
  • Does not have the function of subscribing to topics.

ZSet-based Message Queue #

Compared to the previous two methods (List and publish-subscribe), the ZSet-based message queue is more complex to implement. However, ZSet has the advantage of an additional score attribute, which allows for more functionalities. For example, we can use it to store timestamps and implement delayed message queues.

The implementation approach is the same as List, using zadd and zrangebyscore for storing and retrieving. Here, we won’t repeat the details. Readers can practice using the List implementation as a reference to see if they can achieve similar functionalities. If you cannot, don’t worry. In the following sections of this course, when we introduce delayed queues, we will use ZSet to implement it.

Analysis of Pros and Cons #

Advantages of ZSet:

  • Supports message persistence.
  • More convenient for querying compared to List. ZSet makes it easy to retrieve data using the score attribute, while List requires traversing the entire list to find a specific value.

Disadvantages of ZSet:

  • ZSet cannot store duplicate elements, meaning if a message is duplicate, it can only be inserted once in the sorted set.
  • ZSet is sorted based on score values and does not maintain the insertion order like List.
  • ZSet does not have the blocking pop function like List’s brpop.

Summary #

This article introduced two other implementations of message queues: List and ZSet. Both use their own methods to store data in the queue and retrieve messages with an infinite loop to achieve the functionality of a message queue. Compared to the publish-subscribe pattern, the advantage of these two methods is that they support message persistence. However, they each have their own issues. So, we look forward to the introduction of Streams in the next lesson, which can solve these problems.