27 The Ultimate Message Queue Solution: Stream (Part 2) #
Before using message grouping, we need to manually create a group. Here are several commands related to Stream grouping that we will learn how to use.
Message grouping commands #
Create a consumer group #
127.0.0.1:6379> xgroup create mq group1 0-0
OK
Syntax:
xgroup create stream-key group-key ID
Where:
mq is the key of the Stream;
group1 is the name of the group;
0-0 means the group starts reading from the first message in the Stream.
To have the group read only the messages added after the group is created, use $ as the ID:
127.0.0.1:6379> xgroup create mq group2 $
OK
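The IDs in these examples, such as 1580959593553-0, consist of a millisecond timestamp and a sequence number, and entries are ordered by that pair. This is why 0-0 works as "start from the first message": it sorts before every ID that xadd generates. The following is a minimal sketch of that ordering; StreamId is a hypothetical helper written for this illustration, not a Jedis class, and it assumes both parts fit in a signed long.

```java
/** Hypothetical helper, not part of Jedis: a Stream entry ID of the form "<ms>-<seq>". */
final class StreamId implements Comparable<StreamId> {
    final long ms;   // millisecond timestamp part
    final long seq;  // sequence number within that millisecond

    StreamId(long ms, long seq) {
        this.ms = ms;
        this.seq = seq;
    }

    /** Parses text such as "1580959593553-0". Assumes both parts fit in a signed long. */
    static StreamId parse(String text) {
        int dash = text.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("expected <ms>-<seq>, got: " + text);
        }
        long ms = Long.parseLong(text.substring(0, dash));
        long seq = Long.parseLong(text.substring(dash + 1));
        return new StreamId(ms, seq);
    }

    /** Orders by timestamp first, then by sequence number. */
    @Override
    public int compareTo(StreamId other) {
        int byTime = Long.compare(ms, other.ms);
        return byTime != 0 ? byTime : Long.compare(seq, other.seq);
    }

    @Override
    public String toString() {
        return ms + "-" + seq;
    }
}
```

With this helper, StreamId.parse("0-0").compareTo(StreamId.parse("1580959593553-0")) is negative, so a group created at 0-0 has every existing entry ahead of it.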
Read messages #
127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq >
1) 1) "mq"
   2) 1) 1) "1580959593553-0"
         2) 1) "name"
            2) "redis"
            3) "age"
            4) "10"
Syntax:
xreadgroup group group-key consumer-key [count n] [block ms] streams stream-key ID
Where:
group1 is the group name;
c1 is the consumer name;
> means reading messages that have not yet been delivered to any consumer in the group.
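The xreadgroup command is similar to xread and also supports blocking reads through the block option. In the session below, consumer c2 first reads without blocking until the queue is empty, and consumer c1 then issues a blocking read: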
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
1) 1) "mq"
   2) 1) 1) "1580959606181-0"
         2) 1) "name"
            2) "java"
            3) "age"
            4) "20"
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
(nil) #No more messages in the queue
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > #Blocking read
At this point, open another command prompt and use xadd to add a message. The result of the blocking command execution is as follows:
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq >
1) 1) "mq"
   2) 1) 1) "1580961475368-0"
         2) 1) "name"
            2) "sql"
            3) "age"
            4) "20"
(86.14s)
Message consumption acknowledgment #
After receiving a message, we need to manually acknowledge it (ack). The command is as follows:
127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1
Syntax:
xack stream-key group-key ID [ID ...]
Consumption acknowledgment improves message reliability: a message that has been read but not acknowledged stays in the pending list of the group, so it can be found and handled again if the consumer fails. Typically, ack is executed only after business processing is complete. The execution of the entire process is shown in the following diagram:
Query unacknowledged message queue #
127.0.0.1:6379> xpending mq group1
1) (integer) 1 #There is 1 unacknowledged message
2) "1580994063971-0"
3) "1580994063971-0"
4) 1) 1) "c1"
      2) "1"
127.0.0.1:6379> xack mq group1 1580994063971-0 #Confirm consumption
(integer) 1
127.0.0.1:6379> xpending mq group1
1) (integer) 0 #There are no unacknowledged messages
2) (nil)
3) (nil)
4) (nil)
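The relationship between reading, the pending list, and xack shown above can be modeled in a few lines of plain Java. This is a toy in-memory model for illustration only: ToyGroup and its methods are hypothetical names, and it does not reflect how Redis implements consumer groups internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one consumer group; an illustration, not the Redis implementation. */
final class ToyGroup {
    private final List<String> stream = new ArrayList<>();             // entry IDs in arrival order
    private final Map<String, String> pending = new LinkedHashMap<>(); // entry ID -> consumer holding it
    private int nextIndex = 0;                                         // plays the role of last-delivered-id

    /** Appends an entry ID, loosely like xadd. */
    void add(String id) {
        stream.add(id);
    }

    /** Loosely like "xreadgroup ... count 1 streams mq >": next undelivered ID, or null if none. */
    String readNext(String consumer) {
        if (nextIndex >= stream.size()) {
            return null;
        }
        String id = stream.get(nextIndex++);
        pending.put(id, consumer); // delivered but not yet acknowledged
        return id;
    }

    /** Loosely like xack: returns 1 if the ID was pending and is now removed, otherwise 0. */
    int ack(String id) {
        return pending.remove(id) != null ? 1 : 0;
    }

    /** Number of delivered entries that have not been acknowledged. */
    int pendingCount() {
        return pending.size();
    }
}
```

In this model, every read moves the group cursor forward and records the entry as pending, and only ack removes it, which mirrors the xpending count dropping from 1 to 0 after xack in the session above.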
xinfo query commands #
1. Query stream information
127.0.0.1:6379> xinfo stream mq
1) "length"
2) (integer) 2 #There are two messages in the queue
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1 #One consumer group
9) "last-generated-id"
10) "1580959606181-0"
11) "first-entry"
12) 1) "1580959593553-0"
    2) 1) "name"
       2) "redis"
       3) "age"
       4) "10"
13) "last-entry"
14) 1) "1580959606181-0"
    2) 1) "name"
       2) "java"
       3) "age"
       4) "20"
Syntax:
xinfo stream stream-key
2. Query consumer group information
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
   2) "group1" #Consumer group name
   3) "consumers"
   4) (integer) 1 #One consumer client
   5) "pending"
   6) (integer) 1 #One unacknowledged message
   7) "last-delivered-id"
   8) "1580959593553-0" # Last message ID read
Syntax:
xinfo groups stream-key
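The xinfo replies above are flat lists that alternate field names and values ("name", "group1", "consumers", 1, and so on). A client that receives such a reply as a raw list could fold it into a map before use. The sketch below assumes the reply has already been decoded into a java.util.List; FlatReply is a hypothetical helper, and a client library such as Jedis may already return a typed object instead.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turns an alternating field/value list into a map. */
final class FlatReply {
    static Map<String, Object> toMap(List<?> flat) {
        if (flat.size() % 2 != 0) {
            throw new IllegalArgumentException("expected an even number of elements (field/value pairs)");
        }
        // LinkedHashMap keeps the fields in the order the server returned them
        Map<String, Object> fields = new LinkedHashMap<>();
        for (int i = 0; i < flat.size(); i += 2) {
            fields.put(String.valueOf(flat.get(i)), flat.get(i + 1));
        }
        return fields;
    }
}
```

Applied to the group1 reply above, the resulting map would contain the keys name, consumers, pending, and last-delivered-id in that order.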
3. View consumer group member information
127.0.0.1:6379> xinfo consumers mq group1
1) 1) "name"
   2) "c1" # Consumer name
   3) "pending"
   4) (integer) 0 # Unacknowledged messages
   5) "idle"
   6) (integer) 481855 # Idle time in milliseconds
Syntax:
xinfo consumers stream-key group-key
Delete consumer #
127.0.0.1:6379> xgroup delconsumer mq group1 c1
(integer) 1
Syntax:
xgroup delconsumer stream-key group-key consumer-key
Delete consumer group #
127.0.0.1:6379> xgroup destroy mq group1
(integer) 1
Syntax:
xgroup destroy stream-key group-key
Coding examples #
Next, let’s use Jedis to implement a Stream group message queue. The code is as follows:
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class StreamGroupExample {
    private static final String _STREAM_KEY = "mq"; // Stream key
    private static final String _GROUP_NAME = "g1"; // Group name
    private static final String _CONSUMER_NAME = "c1"; // Consumer 1 name
    private static final String _CONSUMER2_NAME = "c2"; // Consumer 2 name

    public static void main(String[] args) {
        // Producer
        producer();
        // Create consumer group
        createGroup(_STREAM_KEY, _GROUP_NAME);
        // Consumer 1
        new Thread(() -> consumer()).start();
        // Consumer 2
        new Thread(() -> consumer2()).start();
    }

    /**
     * Create consumer group
     * @param stream Stream key
     * @param groupName Group name
     */
    public static void createGroup(String stream, String groupName) {
        Jedis jedis = JedisUtils.getJedis();
        // new StreamEntryID() is 0-0, so the group starts from the first message
        jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
    }

    /**
     * Producer
     */
    public static void producer() {
        Jedis jedis = JedisUtils.getJedis();
        // Add message 1
        Map<String, String> map = new HashMap<>();
        map.put("data", "redis");
        StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
        System.out.println("Message added successfully ID: " + id);
        // Add message 2
        Map<String, String> map2 = new HashMap<>();
        map2.put("data", "java");
        StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
        System.out.println("Message added successfully ID: " + id2);
    }

    /**
     * Consumer 1
     */
    public static void consumer() {
        Jedis jedis = JedisUtils.getJedis();
        // Consume messages
        while (true) {
            // UNRECEIVED_ENTRY is ">": messages not yet delivered to the group
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                    StreamEntryID.UNRECEIVED_ENTRY);
            // Block and read one message (maximum blocking time is 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
                    120 * 1000, true, entry);
            if (list != null && list.size() == 1) {
                // Read message
                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // Message content
                System.out.println("Consumer 1 read message ID: " + list.get(0).getValue().get(0).getID() +
                        ", content: " + new Gson().toJson(content));
            }
        }
    }

    /**
     * Consumer 2
     */
    public static void consumer2() {
        Jedis jedis = JedisUtils.getJedis();
        // Consume messages
        while (true) {
            // UNRECEIVED_ENTRY is ">": messages not yet delivered to the group
            Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
                    StreamEntryID.UNRECEIVED_ENTRY);
            // Block and read one message (maximum blocking time is 120s)
            List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
                    120 * 1000, true, entry);
            if (list != null && list.size() == 1) {
                // Read message
                Map<String, String> content = list.get(0).getValue().get(0).getFields(); // Message content
                System.out.println("Consumer 2 read message ID: " + list.get(0).getValue().get(0).getID() +
                        ", content: " + new Gson().toJson(content));
            }
        }
    }
}
The result of running the above code is as follows:
Message added successfully ID: 1580971482344-0
Message added successfully ID: 1580971482415-0
Consumer 1 read message ID: 1580971482344-0, content: {"data":"redis"}
Consumer 2 read message ID: 1580971482415-0, content: {"data":"java"}
In the code, the fifth parameter of the jedis.xreadGroup() method, noAck, controls acknowledgment. If it is true, a message is treated as acknowledged as soon as it is delivered and is not added to the pending list; if it is false, the consumer must acknowledge each message manually with xack.
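When noAck is false, a common pattern is to acknowledge a message only after the business logic has succeeded, so that a failed message remains pending. The following is a minimal sketch of that control flow without any Redis dependency. ManualAck, handler, and acker are hypothetical names introduced here; in a real consumer, acker would wrap a call such as jedis.xack(stream, group, id).

```java
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper showing "process first, acknowledge afterwards". */
final class ManualAck {
    /**
     * Runs the business handler and acknowledges the entry only if it succeeds.
     *
     * @param id      entry ID that was read from the group
     * @param fields  entry content
     * @param handler business logic (placeholder)
     * @param acker   acknowledgment callback (placeholder for an xack call)
     * @return true if the entry was processed and acknowledged
     */
    static boolean processThenAck(String id, Map<String, String> fields,
                                  Consumer<Map<String, String>> handler,
                                  Consumer<String> acker) {
        try {
            handler.accept(fields);
        } catch (RuntimeException e) {
            // Not acknowledged: with noAck = false the entry would stay in the pending list
            return false;
        }
        acker.accept(id);
        return true;
    }
}
```

The design choice here is that a failure in the handler skips the acknowledgment rather than acknowledging and logging, which keeps the entry visible to xpending for later inspection or retry.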
Note: Use the latest version of the Jedis framework. In older versions, when the block parameter is set to a value larger than 0, there is a bug that throws a connection timeout exception.
As the output shows, consumers within the same group read different messages: a message delivered to one consumer is not delivered to another consumer in that group.
Summary #
In this article, we introduced Stream consumer groups and used the xreadGroup() method in Jedis to implement blocking message consumption. The method also takes a noAck parameter, which enables automatic message acknowledgment. We have also seen that multiple consumers within the same group share the messages in the queue, and that a single message is not read by more than one consumer in the group.
If anything in this article is still unclear, the most likely reason is that you have not yet tried it in practice, so follow along and run each step yourself.