24 Message Queues Publish-Subscribe Mechanism #

Redis provides a dedicated publish/subscribe (Pub/Sub) mechanism, built on the publish and subscribe commands, that can be used to implement message queues.

Before we begin, let’s introduce a few key concepts in message queues to help you better understand the content of this article.

The sender of a message is called the publisher or producer. The receiver of a message is called the subscriber or consumer, and it is responsible for processing the messages sent by the producer.

Publisher-Consumer Basic Concepts-2.png

In addition to publishers and subscribers, there is another important concept in message queues: the channel, which is the name of a specific message queue. A consumer first subscribes to a channel; when a producer then sends a message to that channel, the consumer receives it, as shown in the following diagram:

Channel.png

Simple Subscription and Publish #

A message queue has two important roles: publisher and subscriber. The corresponding commands are as follows:

  • Publish a message: publish channel "message"
  • Subscribe to a channel: subscribe channel

Next, let’s take a look at the specific command implementation.

Subscribe to a message #

127.0.0.1:6379> subscribe channel # Subscribe to messages in channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

Related syntax:

subscribe channel [channel ...]

This command accepts one or more channels, so a single subscriber can listen to several channels at once. For example, if a client subscribes to two channels, channel and channel2, and a publisher pushes one message to each, the subscriber's output will be as follows:

127.0.0.1:6379> subscribe channel channel2 # Subscribe to channel and channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2
1) "message"
2) "channel" # Received message in channel
3) "message 1."
1) "message"
2) "channel2" # Received message in channel2
3) "message 2."

It can be seen that this subscriber can receive messages from two channels.

Publish a message #

127.0.0.1:6379> publish channel "hello, redis." # Publish a message
(integer) 1

Related syntax:

publish channel message

The return value is the number of subscribers that received the message. Here, 1 means the message was delivered to one subscriber. The value ranges from 0 to n depending on how many subscribers there are; 0 means nobody was subscribed, and the message is simply discarded.

For example, when there are two subscribers, the result of the push will be 2, as shown in the following code.

Subscriber 1:

127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

Subscriber 2:

127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

Publish a message:

127.0.0.1:6379> publish channel "message"
(integer) 2

It can be seen that the message was delivered to both subscribers, so the return value is 2.
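To make the meaning of the return value concrete, here is a minimal in-memory sketch of channel-based publish/subscribe. It is only a model of the behavior described above, not how Redis is implemented; the class name InMemoryPubSub and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory model of channel-based publish/subscribe (not Redis code). */
class InMemoryPubSub {
    // Channel name -> callbacks of the clients currently subscribed to it
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a callback on a channel, similar in spirit to "subscribe channel". */
    void subscribe(String channel, Consumer<String> callback) {
        subscribers.computeIfAbsent(channel, k -> new ArrayList<>()).add(callback);
    }

    /** Delivers to the current subscribers only and returns how many received the message. */
    int publish(String channel, String message) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(channel, Collections.emptyList());
        for (Consumer<String> target : targets) {
            target.accept(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        InMemoryPubSub broker = new InMemoryPubSub();
        // Nobody is subscribed yet, so the message is dropped
        System.out.println(broker.publish("channel", "lost")); // prints 0
        broker.subscribe("channel", m -> System.out.println("subscriber 1: " + m));
        broker.subscribe("channel", m -> System.out.println("subscriber 2: " + m));
        // Both subscribers receive the message
        System.out.println(broker.publish("channel", "message")); // prints 2
    }
}
```

Note that the first publish call returns 0 and the message is never stored anywhere: a subscriber that arrives later cannot see it.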

Topic Subscription #

The simple subscribe and publish mode introduced above is not suitable when we want to subscribe to a whole category of messages. For example, suppose we want to receive every log message, and the log channels are all named log_XXX. For this case Redis provides another feature, pattern subscription (psubscribe), which implements topic subscription. In this mode, the * symbol can be used to match multiple channels, as shown in the following diagram:

Topic Subscription2.png

The specific implementation code for the topic pattern is as follows. Subscriber:

127.0.0.1:6379> psubscribe log_* # Topic subscription to log_*
1) "psubscribe"
2) "log_*"
3) (integer) 1
1) "pmessage"
2) "log_*"
3) "log_user" # Received message in channel log_user
4) "user message."
1) "pmessage"
2) "log_*"
3) "log_sys" # Received message in channel log_sys
4) "sys message."
1) "pmessage"
2) "log_*"
3) "log_db" # Received message in channel log_db
4) "db message"

As the output above shows, the command psubscribe log_* receives messages from every channel whose name starts with log_.

Related syntax:

psubscribe pattern [pattern ...]

The code for the producer is as follows:

127.0.0.1:6379> publish log_user "user message."
(integer) 1
127.0.0.1:6379> publish log_sys "sys message."
(integer) 1
127.0.0.1:6379> publish log_db "db message"
(integer) 1
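The matching rule behind a pattern such as log_* can be sketched in a few lines. The helper below is a simplified illustration, not Redis's own matcher: it handles only the * (any run of characters) and ? (exactly one character) wildcards, and the class name GlobSketch is hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical helper illustrating glob-style channel matching (not Redis code). */
class GlobSketch {
    /** Returns true if the whole channel name matches a glob made of literals, * and ?. */
    static boolean matches(String glob, String channel) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");   // any run of characters, including none
            } else if (c == '?') {
                regex.append('.');    // exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c))); // literal character
            }
        }
        // matches() requires the pattern to cover the entire channel name
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(channel).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("log_*", "log_user"));   // prints true
        System.out.println(matches("log_*", "mylog_user")); // prints false
    }
}
```

Because the pattern must cover the entire channel name, log_* matches log_user but not mylog_user.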

Code Implementation #

Below, we use Jedis to implement both regular pub-sub models and topic subscription.

Regular Model #

The code for the consumer is as follows:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * Consumer
 */
public static void consumer() {
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    // Receive and process messages.
    // Note: subscribe() blocks the calling thread for as long as the subscription is active.
    jedis.subscribe(new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
            // Receive messages and perform business logic
            System.out.println("Channel " + channel + " received message: " + message);
        }
    }, "channel");
}

The code for the producer is as follows:

/**
 * Producer
 */
public static void producer() {
    // try-with-resources closes the connection once the message has been pushed
    try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
        // Push messages
        jedis.publish("channel", "Hello, channel.");
    }
}

Running the publisher and subscriber model:

public static void main(String[] args) throws InterruptedException {
    // Create a new thread as the consumer
    new Thread(() -> consumer()).start();
    // Pause for 0.5s to wait for the consumer to initialize
    Thread.sleep(500);
    // Producer sends a message
    producer();
}

The output of the above code is as follows:

Channel channel received message: Hello, channel.

Topic Subscription Model #

The code for the producer in the topic subscription model is the same. The only difference is the code for the consumer, as shown below:

/**
 * Topic Subscription
 */
public static void pConsumer() {
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    // Topic subscription
    jedis.psubscribe(new JedisPubSub() {
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            // Receive messages and perform business logic
            System.out.println(pattern + " | Channel " + channel + " received message: " + message);
        }
    }, "channel*");
}

The code for running the topic model is as follows:

public static void main(String[] args) throws InterruptedException {
    // Topic subscription
    new Thread(() -> pConsumer()).start();
    // Pause for 0.5s to wait for the consumer to initialize
    Thread.sleep(500);
    // Producer sends a message
    producer();
}

The output of the above code is as follows:

channel* | Channel channel received message: Hello, channel.

Considerations #

There are two drawbacks to the publish-subscribe model:

  1. Messages cannot be persistently stored. If the Redis server crashes or restarts, all messages will be lost.
  2. The publish-subscribe model is a “fire-and-forget” working mode. If a subscriber goes offline and reconnects, it cannot consume past historical messages.

Redis 5.0 added the Stream type, which addresses both drawbacks: stream entries are stored like other Redis data, and a consumer can read entries that were added while it was offline.

Beyond these drawbacks, there is one more consideration: message backlog on the consumer side. When a consumer cannot keep up with the messages sent by the producer, its output buffer on the server grows. If the buffer exceeds 32 MB, or stays above 8 MB continuously for 60 seconds, the server forcibly disconnects the consumer. These thresholds are set by the client-output-buffer-limit parameter in the configuration file; the default for publish-subscribe clients is client-output-buffer-limit pubsub 32mb 8mb 60.
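The hard and soft limits can be easier to follow as code. The sketch below is a simplified model of the rule described above, not the Redis source: the class name OutputBufferLimitSketch is hypothetical, time is passed in as plain seconds, and the exact boundary comparisons (> versus >=) are assumptions.

```java
/** Hypothetical model of a hard/soft output buffer limit (not Redis code). */
class OutputBufferLimitSketch {
    private final long hardLimitBytes;
    private final long softLimitBytes;
    private final long softLimitSeconds;
    // Time (in seconds) at which the buffer first went over the soft limit, or -1 if it is under
    private long softLimitReachedAt = -1;

    OutputBufferLimitSketch(long hardLimitBytes, long softLimitBytes, long softLimitSeconds) {
        this.hardLimitBytes = hardLimitBytes;
        this.softLimitBytes = softLimitBytes;
        this.softLimitSeconds = softLimitSeconds;
    }

    /** Returns true if a client with this much buffered data should be disconnected now. */
    boolean shouldDisconnect(long bufferedBytes, long nowSeconds) {
        if (bufferedBytes > hardLimitBytes) {
            return true; // hard limit: disconnect immediately
        }
        if (bufferedBytes > softLimitBytes) {
            if (softLimitReachedAt < 0) {
                softLimitReachedAt = nowSeconds; // start of the over-limit period
            }
            // Soft limit: disconnect only after staying over it long enough
            return nowSeconds - softLimitReachedAt >= softLimitSeconds;
        }
        softLimitReachedAt = -1; // dropped back under the soft limit, so reset the timer
        return false;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Mirrors the default: client-output-buffer-limit pubsub 32mb 8mb 60
        OutputBufferLimitSketch limit = new OutputBufferLimitSketch(32 * mb, 8 * mb, 60);
        System.out.println(limit.shouldDisconnect(10 * mb, 0));   // over soft limit, timer starts
        System.out.println(limit.shouldDisconnect(10 * mb, 120)); // still over it two minutes later
        System.out.println(limit.shouldDisconnect(40 * mb, 121)); // over the hard limit
    }
}
```

The key design point is the reset: a consumer that briefly falls behind but catches up before the soft-limit period elapses is not disconnected.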

Summary #

This article introduces several terms related to message queues. Producers and consumers refer to the sender and receiver of messages, respectively. It also introduces three commands of the publish-subscribe model:

  • subscribe channel for regular subscriptions
  • publish channel message for publishing messages
  • psubscribe pattern for topic subscriptions

Using these commands, messages can be exchanged over a single channel or over multiple channels. The publish-subscribe model does have drawbacks, namely its “fire-and-forget” behavior and its inability to persist messages. The Stream type addresses these issues and will be discussed in detail in future articles.