30 Complete Case Realizing Delayed Queues With Two Methods

30 Complete Case Realizing Delayed Queues with Two Methods #

Delayed queue refers to postponing the current task to be done after a certain period of time.

Delayed queues are commonly used in actual work and interviews. There are many ways to implement delayed queues, and each implementation method has its advantages and disadvantages. Let’s take a look at them next.

Common Use Cases for Delayed Queues #

Common use cases for delayed queues include:

  1. Orders that have not been paid for more than 30 minutes will be cancelled.
  2. Orders that have not been accepted by takeaway merchants for more than 5 minutes will be cancelled.
  3. Users who have registered on the platform but have not logged in within 30 days will receive reminder messages.

Similar application scenarios can all be implemented using delayed queues.

Common Implementation Methods #

The implementation methods of delayed queues in Redis have three categories: program-based implementation, message queue (MQ) framework-based implementation, and Redis-based implementation.

Program-based Implementation #

The DelayQueue class built-in in JDK is used to implement delayed queues. The code is as follows:

public class DelayTest {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue delayQueue = new DelayQueue();
        delayQueue.put(new DelayElement(1000));
        delayQueue.put(new DelayElement(3000));
        delayQueue.put(new DelayElement(5000));
        System.out.println("Start time: " +  DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()){
            System.out.println(delayQueue.take());
        }
        System.out.println("End time: " +  DateFormat.getDateTimeInstance().format(new Date()));
    }

    static class DelayElement implements Delayed {
        // Delay deadline (unit: milliseconds)
        long delayTime = System.currentTimeMillis();
        public DelayElement(long delayTime) {
            this.delayTime = (this.delayTime + delayTime);
        }
        @Override
        // Get remaining time
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        @Override
        // Sorting basis for elements in the queue
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }
        @Override
        public String toString() {
            return DateFormat.getDateTimeInstance().format(new Date(delayTime));
        }
    }
}

Result of program execution:

Start time: 2019-6-13 20:40:38
2019-6-13 20:40:39
2019-6-13 20:40:41
2019-6-13 20:40:43
End time: 2019-6-13 20:40:43

Advantages

  1. Convenient for development, can be directly used in code.
  2. Simple implementation.

Disadvantages

  1. Does not support persistent storage.
  2. Does not support distributed systems.

MQ-based Implementation #

RabbitMQ itself does not support delayed queues, but a plugin called rabbitmq-delayed-message-exchange can be added to achieve delayed queue functionality.

Advantages

  1. Supports distributed systems.
  2. Supports persistent storage.

Disadvantages

The framework is relatively heavy and requires building and configuring MQ.

Redis-based Implementation #

Redis implements the delayed message queue using sorted sets (ZSet), where each element in the set has a score attribute used to store the execution time of the delayed task.

Advantages

  1. Flexible and convenient. Redis is the standard for Internet companies, so there is no need to set up additional related environments.
  2. Can perform message persistence, greatly improving the reliability of the delayed queue.
  3. Supports distributed systems, unlike JDK’s DelayQueue.
  4. High availability, leveraging Redis’s own high availability solution, enhancing system robustness.

Disadvantages

Needs to use an infinite loop to execute task checks, which consumes a small amount of system resources.

Taking into account the advantages and disadvantages mentioned above, we decided to use Redis to implement the delayed queue. The specific implementation code is as follows.

Code Implementation #

In this article, we use Java to implement the delayed queue. There are two ways to implement the delayed queue: the first way is to use zrangebyscore to query all pending tasks that meet the conditions and execute them in a loop. The second way is to query the earliest message each time, and check whether the execution time of this message is less than or equal to the current time. If it is, execute the task; otherwise, continue looping checks.

Method 1 #

Query all tasks that meet the conditions at one time, and execute them in a loop. The code is as follows:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;

import java.time.Instant;
import java.util.Set;

/**
 * Delayed Queue
 */
public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Delay execution by 30s (time after 30s)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Continue adding test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Start delayed queue
        doDelayQueue(jedis);
    }

    /**
     * Delayed queue consumer
     * @param jedis Redis client
     */
    public static void doDelayQueue(Jedis jedis) throws InterruptedException {
        while (true) {
            // Current time
            Instant nowInstant = Instant.now();
            long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // Last second time
            long nowSecond = nowInstant.getEpochSecond();
            // Query all tasks for the current time
            Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);
            for (String item : data) {
                // Consume task
                System.out.println("Consumed: " + item);
            }
            // Remove executed tasks
            jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);
            Thread.sleep(1000); // Poll every second
        }
    }
}

The execution result of the above program is as follows:

Consumed: order_2
Consumed: order_3
Consumed: order_4
Consumed: order_5
Consumed: order_1

Approach 2 #

Each time, query the earliest task, compare it with the current time, and decide whether to execute or not. The implementation code is as follows:

import redis.clients.jedis.Jedis;
import utils.JedisUtils;

import java.time.Instant;
import java.util.Set;

/**
 * Delayed Queue
 */
public class DelayQueueExample {
    // zset key
    private static final String _KEY = "myDelayQueue";

    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = JedisUtils.getJedis();
        // Delay execution by 30s (time after 30s)
        long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
        jedis.zadd(_KEY, delayTime, "order_1");
        // Continue adding test data
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
        jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
        jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
        jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
        // Start delayed queue
        doDelayQueue2(jedis);
    }

    /**
     * Delayed queue consumer (Approach 2)
     * @param jedis Redis client
     */
    public static void doDelayQueue2(Jedis jedis) throws InterruptedException {
        while (true) {
            // Current time
            long nowSecond = Instant.now().getEpochSecond();
            // Query one message at a time and check the execution time of this message
            Set<String> data = jedis.zrange(_KEY, 0, 0);
            if (data.size() == 1) {
                String firstValue = data.iterator().next();
                // Message execution time
                Double score = jedis.zscore(_KEY, firstValue);
                if (nowSecond >= score) {
                    // Consume message (perform business logic processing)
                    System.out.println("Consumed Message: " + firstValue);
                    // Remove executed tasks
                    jedis.zrem(_KEY, firstValue);
                }
            }
            Thread.sleep(100); // Execution interval
        }
    }
}

The execution result of the above program is the same as Approach 1, as follows:

Consumed: order_2
Consumed: order_3
Consumed: order_4
Consumed: order_5
Consumed: order_1

The code Thread.sleep(100) for the execution interval can be removed or configured based on the actual business scenario.

Summary #

In this article, we introduced the use cases of delayed queues and various implementation approaches. Among them, the Redis approach is the most suitable for our requirements. It mainly uses the score property of the sorted set to store the delayed execution time. Then, an infinite loop is started to check if there are any tasks that meet the requirements. If there are, execute the related logic; if not, continue to loop for detection.