26 Message Queue Ultimate Solution Stream Part 1

Before Redis 5.0 introduced Stream, each way of implementing a message queue in Redis had its own shortcomings:

  • The Pub/Sub pattern cannot persist messages, so messages are not retained reliably, and a client that reconnects after being offline cannot retrieve the messages it missed;
  • A message queue built on a list does not support repeated consumption, because a message is removed once it is consumed;
  • A message queue built on a sorted set cannot store messages with the same value, and cannot block while waiting for messages.

Furthermore, all three approaches can only store a single value per message. To store an object, you must first serialize it into a string such as JSON and then deserialize it after retrieval, which is inconvenient. To address these problems, Redis 5.0 introduced the Stream type, the most significant feature of that release. Its design draws on Kafka: a Stream persists messages, tracks consumption progress, and supports an ack mode for confirming messages, which makes the message queue more stable and reliable.

Next, we first look at the features of Stream itself, and then combine them with Java code to implement a message queue example.

Basics #

Since Stream is a data type, similar to other data types, it has its own set of operations, including:

  • xadd: Add a message;
  • xlen: Get the number of messages in a Stream;
  • xdel: Delete messages by ID;
  • del: Delete the entire Stream;
  • xrange: Read messages within a range;
  • xread: Read messages after a specific message.

The specific usage is described below.

Adding a Message #

127.0.0.1:6379> xadd key * name redis age 10
"1580880750844-0" # The result is the message ID

“*” tells Redis to generate the ID automatically, in the form of a millisecond timestamp plus a sequence number. You can also specify an ID yourself, as long as it is greater than the ID of the last message in the Stream.

Syntax:

xadd key ID field string [field string ...]
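
To make the ID format concrete, here is a minimal sketch in plain Java (no Redis connection needed) that parses an ID of the form timestamp-sequence and compares two IDs. The class and method names (StreamIdSketch, parse, compare) are hypothetical and not part of Jedis, and the sketch assumes both parts are non-negative numbers that fit in a long.

```java
/** Illustrative helper for IDs such as "1580880750844-0"; not part of Jedis. */
final class StreamIdSketch {
    final long millis;   // millisecond timestamp part
    final long sequence; // sequence number within the same millisecond

    StreamIdSketch(long millis, long sequence) {
        this.millis = millis;
        this.sequence = sequence;
    }

    /** Parses "<millis>-<sequence>"; throws IllegalArgumentException on malformed input. */
    static StreamIdSketch parse(String id) {
        int dash = id.indexOf('-');
        if (dash <= 0 || dash == id.length() - 1) {
            throw new IllegalArgumentException("Expected <millis>-<sequence>, got: " + id);
        }
        return new StreamIdSketch(Long.parseLong(id.substring(0, dash)),
                Long.parseLong(id.substring(dash + 1)));
    }

    /** Orders IDs by timestamp first, then by sequence number. */
    static int compare(String a, String b) {
        StreamIdSketch x = parse(a);
        StreamIdSketch y = parse(b);
        int byTime = Long.compare(x.millis, y.millis);
        return byTime != 0 ? byTime : Long.compare(x.sequence, y.sequence);
    }

    public static void main(String[] args) {
        // Two entries added in the same millisecond differ only in the sequence part.
        System.out.println(compare("1580880750844-0", "1580880750844-1") < 0);
        // Numeric comparison matters: as plain strings, "9-0" would sort after "10-0".
        System.out.println(compare("9-0", "10-0") < 0);
    }
}
```

Comparing the two parts as numbers rather than as strings is what keeps the ordering consistent with the order in which messages were added.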

Getting the Number of Messages #

127.0.0.1:6379> xlen key
(integer) 1

Syntax:

xlen key

Deleting Messages #

127.0.0.1:6379> xadd key * name redis
"1580881585129-0" # Message ID
127.0.0.1:6379> xlen key
(integer) 1
127.0.0.1:6379> xdel key 1580881585129-0 # Delete a message by ID
(integer) 1
127.0.0.1:6379> xlen key
(integer) 0

Syntax:

xdel key ID [ID ...]

This command supports deleting one or multiple messages based on their message ID.

Deleting the Entire Stream #

127.0.0.1:6379> del key # Delete the entire Stream
(integer) 1
127.0.0.1:6379> xlen key
(integer) 0

Syntax:

del key [key ...]

This command supports deleting one or multiple Streams.

Querying Messages within a Range #

127.0.0.1:6379> xrange mq - +
1) 1) "1580882060464-0"
   2) 1) "name"
      2) "redis"
      3) "age"
      4) "10"
2) 1) "1580882071524-0"
   2) 1) "name"
      2) "java"
      3) "age"
      4) "20"

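Here “-” stands for the smallest possible ID (the start of the Stream) and “+” stands for the largest possible ID (the end of the Stream), so this command returns every message in the Stream.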

Syntax:

xrange key start end [COUNT count]

Querying Messages after a Specific Message #

127.0.0.1:6379> xread count 1 streams mq 1580882060464-0
1) 1) "mq"
   2) 1) 1) "1580882071524-0"
         2) 1) "name"
            2) "java"
            3) "age"
            4) "20"

This queries the Stream named mq for one message that comes after the message with ID 1580882060464-0.

Related syntax:

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

This command provides the “block” parameter for blocking reads, which lets us wait for data that arrives after the current data. The command is as follows:

127.0.0.1:6379> xread count 1 block 0 streams mq $

Here “block 0” means blocking indefinitely, and “$” stands for the ID of the last message currently in the Stream, so only messages added after the command is issued are returned. Now open a new command-line window and add a message. The result is as follows:

127.0.0.1:6379> xadd mq * name sql age 20 # New window adds data
"1580890737890-0"
# The blocked window then returns the new message
127.0.0.1:6379> xread count 1 block 0 streams mq $
1) 1) "mq"
   2) 1) 1) "1580890737890-0"
         2) 1) "name"
            2) "sql"
            3) "age"
            4) "20"
(36.37s)

Basic Message Queue #

Using Stream to implement a message queue is similar to using lists. We add messages with the xadd command and read the Stream in a loop with the xread command, which gives a basic version of a message queue (consumer groups are not needed yet). The code is as follows:

import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamExample {
    public static void main(String[] args) throws InterruptedException {
        // Consumer
        new Thread(() -> consumer()).start();
        Thread.sleep(1000);
        // Producer
        producer();
    }
    /**
     * Producer
     */
    public static void producer() throws InterruptedException {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Push message
        Map<String, String> map = new HashMap<>();
        map.put("name", "redis");
        map.put("age", "10");
        // Add message
        StreamEntryID id = jedis.xadd("mq", null, map);
        System.out.println("Message added successfully. ID: " + id);
    }
    /**
     * Consumer
     */
    public static void consumer() {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        // Consume messages
        while (true) {
            // Get messages; new StreamEntryID().LAST_ENTRY (the "$" constant) means only messages added after this read is issued
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>("mq",
                    new StreamEntryID().LAST_ENTRY);
            // Read one message with blocking (maximum blocking time: 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xread(1, 120 * 1000, entry);
            // On timeout nothing is returned (an empty list or null, depending on the Jedis version)
            if (list != null && list.size() == 1) {
                // Message read
                System.out.println("Message read. ID: " + list.get(0).getValue().get(0).getID());
                // Use Gson to print the message content in JSON format
                System.out.println("Content: " + new Gson().toJson(list.get(0).getValue().get(0).getFields()));
            }
        }
    }
}

The results of the above code are as follows:

Message added successfully. ID: 1580895735148-0
Message read. ID: 1580895735148-0
Content: {"name":"redis","age":"10"}

Note that the code above passes new StreamEntryID().LAST_ENTRY (the constant that corresponds to “$”), so only messages added after the read is issued are returned. To read historical messages from the beginning, remove .LAST_ENTRY from that line: new StreamEntryID() represents the ID 0-0, and every message in the Stream has a greater ID than that.
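
One caveat of “$” is that it is re-evaluated on every call, so a message added between two xread calls in the loop can be skipped. A common alternative is to remember the ID of the last message handled and pass it to the next read. The sketch below shows only that bookkeeping, in plain Java with an in-memory stand-in for the Stream. The names Entry, readAfter, and consumeAll are hypothetical, and IDs are simplified to increasing numbers. With Jedis, the readAfter step would be an xread call that passes the remembered ID.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory illustration of "remember the last ID"; not Redis or Jedis code. */
final class LastIdTrackingSketch {
    /** Hypothetical stand-in for a stream entry; IDs are simplified to increasing longs. */
    static final class Entry {
        final long id;
        final String payload;

        Entry(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Stand-in for a non-blocking read: up to count entries whose ID is greater than afterId. */
    static List<Entry> readAfter(List<Entry> stream, long afterId, int count) {
        List<Entry> result = new ArrayList<>();
        for (Entry e : stream) { // the stream is assumed to be sorted by ascending ID
            if (e.id > afterId && result.size() < count) {
                result.add(e);
            }
        }
        return result;
    }

    /** Reads one entry at a time, always continuing from the last ID that was handled. */
    static List<String> consumeAll(List<Entry> stream, long startAfterId) {
        List<String> handled = new ArrayList<>();
        long lastId = startAfterId;
        while (true) {
            List<Entry> batch = readAfter(stream, lastId, 1);
            if (batch.isEmpty()) {
                break; // a real consumer would block and wait here instead of stopping
            }
            Entry e = batch.get(0);
            handled.add(e.payload);
            lastId = e.id; // the next read resumes right after this entry
        }
        return handled;
    }

    public static void main(String[] args) {
        List<Entry> stream = new ArrayList<>();
        stream.add(new Entry(1, "redis"));
        stream.add(new Entry(2, "java"));
        stream.add(new Entry(3, "sql"));
        System.out.println(consumeAll(stream, 0)); // start from the beginning
        System.out.println(consumeAll(stream, 2)); // resume after entry 2
    }
}
```

Because the position is carried in lastId rather than recomputed from the end of the Stream, nothing that arrives between two reads is lost.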

Also, when using the jedis.xread() method to do a blocking read, the second parameter “block” must be greater than 0. If it is 0 or less, no blocking takes place. The Jedis source code shows that the BLOCK option is only sent when the value is greater than 0:

if (block > 0L) {
    params[streamsIndex++] = Keyword.BLOCK.raw;
    params[streamsIndex++] = Protocol.toByteArray(block);
}

Therefore, we set “block” to a relatively large value in order to read messages in blocking mode.
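
The effect of the block > 0 check shown above can be modeled without Jedis. The following sketch is a simplified stand-in, not the actual Jedis implementation: a hypothetical helper, buildXreadArgs, assembles the XREAD arguments for a single stream and, mirroring that check, adds BLOCK only when block is greater than 0. The matching check on count is an assumption made for symmetry.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of how XREAD arguments could be assembled; not Jedis code. */
final class XreadArgsSketch {
    static List<String> buildXreadArgs(int count, long block, String key, String id) {
        List<String> args = new ArrayList<>();
        if (count > 0) { // assumption: COUNT is handled the same way as BLOCK
            args.add("COUNT");
            args.add(Integer.toString(count));
        }
        if (block > 0L) { // mirrors the quoted check: 0 or a negative value adds nothing
            args.add("BLOCK");
            args.add(Long.toString(block));
        }
        args.add("STREAMS");
        args.add(key);
        args.add(id);
        return args;
    }

    public static void main(String[] args) {
        System.out.println(buildXreadArgs(1, 120_000L, "mq", "$"));
        System.out.println(buildXreadArgs(1, 0L, "mq", "$"));
    }
}
```

In the second call no BLOCK argument is produced, so the server would answer immediately instead of waiting. This is why “block 0”, which blocks indefinitely in redis-cli, does not have that effect through this method.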

A blocking read means that the consumer sleeps while the queue has no data and resumes execution once data arrives.

Summary #

This article introduced the basic commands of Stream, then used the xadd command to store messages and the xread command in a blocking loop to read them, which gives a basic version of a message queue. The interaction process is shown in the following diagram:

Stream-Basic.png

However, these are not the core features of Stream. In the next section, we will use consumer groups to implement a more complete message queue.