27 Message Queue Ultimate Solution Stream Part 2

27 Message Queue Ultimate Solution Stream Part 2 #

Before using message grouping, we need to manually create a group. Here are several commands related to Stream grouping that we will learn how to use.

Message grouping commands #

Create a consumer group #

127.0.0.1:6379> xgroup create mq group1 0-0 
OK

Syntax:

xgroup create stream-key group-key ID

Where:

  • mq is the key of the Stream;
  • group1 is the name of the group;
  • 0-0 represents reading from the first message.

To read from the current last message and onwards, use $ in the command as follows:

127.0.0.1:6379> xgroup create mq group2 $
OK

Read messages #

127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq >
1) 1) "mq"
   2) 1) 1) "1580959593553-0"
         2) 1) "name"
            2) "redis"
            3) "age"
            4) "10"

Syntax:

xreadgroup group group-key consumer-key streams stream-key

Where:

  • > signifies reading the next message;
  • group1 represents the group name;
  • c1 represents the consumer name.

The xreadgroup command is similar to xread and can also be used for blocking reads. The command is as follows:

127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
1) 1) "mq"
   2) 1) 1) "1580959606181-0"
         2) 1) "name"
            2) "java"
            3) "age"
            4) "20"
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
(nil) #No more messages in the queue
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > #Blocking read

At this point, open another command prompt and use xadd to add a message. The result of the blocking command execution is as follows:

127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq >
1) 1) "mq"
   2) 1) 1) "1580961475368-0"
         2) 1) "name"
            2) "sql"
            3) "age"
            4) "20"
(86.14s)

Message consumption acknowledgment #

After receiving a message, we need to manually acknowledge it (ack). The command is as follows:

127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1

Syntax:

xack key group-key ID [ID ...]

Consumption acknowledgment increases the reliability of messages. Typically, it is necessary to execute ack to confirm that the message has been consumed after business processing is complete. The execution of the entire process is shown in the following diagram:

image.png

Query unacknowledged message queue #

127.0.0.1:6379> xpending mq group1
1) (integer) 1 #There is 1 unacknowledged message
2) "1580994063971-0"
3) "1580994063971-0"
4) 1) 1) "c1"
      2) "1"
127.0.0.1:6379> xack  mq group1 1580994063971-0 #Confirm consumption
(integer) 1
127.0.0.1:6379> xpending mq group1
1) (integer) 0 #There are no unacknowledged messages
2) (nil)
3) (nil)
4) (nil)

xinfo query commands #

1. Query stream information

127.0.0.1:6379> xinfo stream mq
 1) "length"
 2) (integer) 2 #There are two messages in the queue
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1 #One consumer group
 9) "last-generated-id"
10) "1580959606181-0"
11) "first-entry"
12) 1) "1580959593553-0"
    2) 1) "name"
       2) "redis"
       3) "age"
       4) "10"
13) "last-entry"
14) 1) "1580959606181-0"
    2) 1) "name"
       2) "java"
       3) "age"
       4) "20"

Syntax:

xinfo stream stream-key

2. Query consumer group messages

127.0.0.1:6379> xinfo groups mq
1) 1) "name"
   2) "group1" #Consumer group name
   3) "consumers"
   4) (integer) 1 #One consumer client
   5) "pending"
   6) (integer) 1 #One unacknowledged message
   7) "last-delivered-id"
   8) "1580959593553-0" # Last message ID read

Syntax:

xinfo groups stream-key

3. View consumer group member information

127.0.0.1:6379> xinfo consumers mq group1
1) 1) "name"
   2) "c1" # Consumer name
   3) "pending"
   4) (integer) 0 # Unacknowledged messages
   5) "idle"
   6) (integer) 481855

Syntax:

xinfo consumers stream group-key

Delete consumer #

127.0.0.1:6379> xgroup delconsumer mq group1 c1
(integer) 1

Syntax:

xgroup delconsumer stream-key group-key consumer-key

Delete consumer group #

127.0.0.1:6379> xgroup destroy mq group1
(integer) 1

Syntax:

xgroup destroy stream-key group-key

Coding examples #

Next, let’s use Jedis to implement a Stream group message queue. The code is as follows:

import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils;

import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamGroupExample {
    private static final String _STREAM_KEY = "mq"; // Stream key
    private static final String _GROUP_NAME = "g1"; // Group name
    private static final String _CONSUMER_NAME = "c1"; // Consumer 1 name
    private static final String _CONSUMER2_NAME = "c2"; // Consumer 2 name
    public static void main(String[] args) {
        // Producer
        producer();
        // Create consumer group
        createGroup(_STREAM_KEY, _GROUP_NAME);
        // Consumer 1
        new Thread(() -> consumer()).start();
        // Consumer 2
        new Thread(() -> consumer2()).start();
    }
    /**
     * Create consumer group
     * @param stream    Stream key
     * @param groupName Group name
     */
    public static void createGroup(String stream, String groupName) {
        Jedis jedis = JedisUtils.getJedis();
        jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
    }
    /**
     * Producer
     */
    public static void producer() {
        Jedis jedis = JedisUtils.getJedis();
        // Add message 1
        Map<String, String> map = new HashMap<>();
        map.put("data", "redis");
        StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
        System.out.println("Message added successfully ID: " + id);
        // Add message 2
        Map<String, String> map2 = new HashMap<>();
        map2.put("data", "java");
        StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
        System.out.println("Message added successfully ID: " + id2);
    }
    /**
     * Consumer 1
     */
    public static void consumer() {
        Jedis jedis = JedisUtils.getJedis();
        // Consume messages
        while (true) {
            // Read message
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            // Block and read one message (maximum blocking time is 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
                    120 * 1000, true, entry);
            if (list != null && list.size() == 1) {
                // Read message
                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // Message content
                System.out.println("Consumer 1 read message ID: " + list.get(0).getValue().get(0).getID() +
                        ", content: " + new Gson().toJson(content));
            }
        }
    }
    /**
     * Consumer 2
     */
    public static void consumer2() {
        Jedis jedis = JedisUtils.getJedis();
        // Consume messages
        while (true) {
            // Read message
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                    new StreamEntryID().UNRECEIVED_ENTRY);
            // Block and read one message (maximum blocking time is 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
                    120 * 1000, true, entry);
            if (list != null && list.size() == 1) {
                // Read message
                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // Message content
                System.out.println("Consumer 2 read message ID: " + list.get(0).getValue().get(0).getID() +
                        ", content: " + new Gson().toJson(content));
            }
        }
    }
}

The result of running the above code is as follows:

Message added successfully ID: 1580971482344-0
Message added successfully ID: 1580971482415-0
Consumer 1 read message ID: 1580971482344-0, content: {"data":"redis"}
Consumer 2 read message ID: 1580971482415-0, content: {"data":"java"}

In the code, the fifth parameter of jedis.xreadGroup() method, noAck, indicates whether messages are automatically acknowledged. If set to true, the received messages will be automatically acknowledged, otherwise they need to be manually acknowledged.

Note: Use the latest version of the Jedis framework. In older versions, when the block parameter is set to a value larger than 0, there is a bug that throws a connection timeout exception.

As can be seen, multiple consumers within the same group will read different messages, and different consumers will not read the same message from the group.

Summary #

In this article, we introduced the concept of Stream groups and used the xreadGroup() method in Jedis to implement blocking message consumption. The method also includes a noAck parameter, which enables automatic message acknowledgment. Through this article, we have learned that multiple consumers within the same group will poll the messages in the message queue, and a single message will not be read by multiple consumers.

If you still have trouble understanding the knowledge in this article, it’s because you haven’t combined it with practice. So if you still have questions about this article, follow along and practice step by step.