21 Technique Series, the Deity of Delayed Tasks Handling Time Wheel Hashed Wheel Timer

21 Technique series, the deity of delayed tasks handling - Time wheel HashedWheelTimer #

There are many scenarios in Netty that rely on timed tasks, such as timeout control for client connections and heartbeat detection for communication between parties. When studying the Netty reactor thread model, we learned that NioEventLoop is responsible for handling I/O events as well as executing tasks in the task queue, including timed tasks. In order to achieve high-performance scheduling of timed tasks, Netty introduces the hashed wheel algorithm to drive the execution of timed tasks. But what exactly is a hashed wheel? Why does Netty use a hashed wheel to handle timed tasks? Can’t the native implementation of the JDK meet the requirements? In this lesson, I will step by step analyze the principles of the hashed wheel and how Netty implements the hashed wheel algorithm.

Note: This article refers to Netty version 4.1.42.Final.

Basic Knowledge of Timed Tasks #

First of all, let’s understand what a timed task is. Timers have many use cases, and we encounter them frequently in our daily work, such as generating monthly statistical reports, financial reconciliation, member points settlement, and email notifications. Timers generally have three forms: fixed-period scheduling, delayed execution after a certain period of time, and execution at a specific time.

The essence of a timer is to design a data structure that can store and schedule a collection of tasks, with tasks closer to their deadlines having higher priority. So how does the timer know if a task has expired? The timer needs to implement polling, checking whether tasks have expired every time slice.

So the internal structure of the timer generally needs a task queue and an asynchronous polling thread, and it should provide three basic operations:

  • Schedule: add a task to the task collection.
  • Cancel: cancel a task.
  • Run: execute expired tasks.

The JDK provides three commonly used implementations of timers, namely Timer, DelayedQueue, and ScheduledThreadPoolExecutor. Let’s introduce each of them one by one.

Timer #

Timer is an implementation in the earlier versions of the JDK. It can implement tasks with fixed periods and delayed tasks. Timer starts an asynchronous thread to execute expired tasks. The tasks can be scheduled to be executed only once or repeatedly at fixed intervals. Let’s first look at how to use Timer. Here is an example code:

Timer timer = new Timer();

timer.scheduleAtFixedRate(new TimerTask() {

    @Override

    public void run() {

        // do something

    }

}, 10000, 1000); // Schedule a task with a period of 1s after 10s

As you can see, tasks are implemented with the TimerTask class, which is an abstract class that implements the Runnable interface. Timer is responsible for scheduling and executing TimerTask. Now let’s take a look at the internal construction of Timer:

public class Timer {

    private final TaskQueue queue = new TaskQueue();

    private final TimerThread thread = new TimerThread(queue);
    
    public Timer(String name) {

        thread.setName(name);

        thread.start();

    }

}

TaskQueue is implemented with an array structure as a min-heap, where tasks with the closest deadlines are at the top of the heap, and queue[1] is always the task with the highest priority to be executed. Therefore, using a min-heap data structure, the time complexity of the Run operation is O(1), and the time complexity of Schedule and Cancel operations is O(logn).

Timer starts an asynchronous thread called TimerThread, which is responsible for processing tasks regardless of how many tasks are added to the array. TimerThread periodically polls the tasks in the TaskQueue. If the top task’s deadline has arrived, it will be executed. If it is a periodic task, after being executed, the next task’s deadline will be recalculated and it will be put back into the min-heap. If it is a single execution task, it will be deleted from the TaskQueue after execution.

DelayedQueue #

DelayedQueue is a blocking queue in the JDK that can delay retrieving objects. Internally, it uses a priority queue (PriorityQueue) to store objects. Each object in the DelayQueue must implement the Delayed interface and override the compareTo and getDelay methods. Here is an example of how to use DelayQueue:

public class DelayQueueTest {

    public static void main(String[] args) throws Exception {

        BlockingQueue<SampleTask> delayQueue = new DelayQueue<>();

        long now = System.currentTimeMillis();

        delayQueue.put(new SampleTask(now + 1000));

        delayQueue.put(new SampleTask(now + 2000));

        delayQueue.put(new SampleTask(now + 3000));

        for (int i = 0; i < 3; i++) {

            System.out.println(new Date(delayQueue.take().getTime()));

        }

    }

    static class SampleTask implements Delayed {

        long time;

        public SampleTask(long time) {

            this.time = time;

        }

        public long getTime() {

            return time;

        }

        @Override

        public int compareTo(Delayed o) {

            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}

@Override

public long getDelay(TimeUnit unit) {

    return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

}

DelayQueue provides blocking methods for put() and take(), which can be used to add and retrieve objects from the queue. After an object is added to the DelayQueue, it will be sorted based on the compareTo() method. The getDelay() method is used to calculate the remaining time of the message delay. Only when getDelay <= 0, the object can be retrieved from the DelayQueue.

The most common scenario for using DelayQueue in daily development is to implement retry mechanisms. For example, after an interface call fails or a request times out, the current request object can be placed in the DelayQueue, and then retrieved by an asynchronous thread using the take() method to continue with the retry. If the request still fails, it can be put back into the DelayQueue. To limit the retry frequency, you can set the maximum number of retries and use the exponential backoff algorithm to set the object’s deadline, such as 2s, 4s, 8s, 16s, and so on.

Compared to Timer, DelayQueue only implements the task management functionality and needs to be used in conjunction with asynchronous threads. DelayQueue uses a priority queue to sort tasks by priority, and the time complexity for adding a schedule and canceling a task is O(log n).

ScheduledThreadPoolExecutor #

The Timer introduced earlier is not recommended for use as it has several design flaws.

  • Timer operates in single-threaded mode. If a TimerTask takes a long time to execute, it will affect the scheduling of other tasks.
  • Timer’s task scheduling is based on the system’s absolute time, which can cause problems if the system time is incorrect.
  • If a TimerTask throws an exception during execution, Timer will not catch it, causing the thread to terminate and other tasks will never be executed.

To address the design flaws of Timer, JDK provides ScheduledThreadPoolExecutor, which offers more features. ScheduledThreadPoolExecutor provides the ability to schedule tasks periodically and with a delay. Let’s take a look at how ScheduledThreadPoolExecutor is used through an example.

public class ScheduledExecutorServiceTest {

    public static void main(String[] args) {

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

        executor.scheduleAtFixedRate(() -> System.out.println("Hello World"), 1000, 2000, TimeUnit.MILLISECONDS); // Delay execution by 1 second and repeat every 2 seconds

    }

}

ScheduledThreadPoolExecutor inherits from ThreadPoolExecutor, so it has the ability to asynchronously handle tasks in a thread pool. The thread pool is mainly responsible for creating and managing threads, constantly fetching tasks from its blocking queue for execution. The thread pool has two important roles: tasks and the blocking queue. ScheduledThreadPoolExecutor builds upon ThreadPoolExecutor by redesigning the ScheduledFutureTask task and the DelayedWorkQueue blocking queue. ScheduledFutureTask inherits from FutureTask and overrides the run() method to enable periodic task execution. The DelayedWorkQueue uses a priority queue internally, where the task with the nearest deadline is at the head of the queue. For tasks that are executed periodically, their time is reset after execution and they are put back into the queue. The implementation principle of ScheduledThreadPoolExecutor can be illustrated by the following diagram.

Image

Above, we have briefly introduced the three ways in which JDK implements timers. It can be said that their implementation ideas are very similar and revolve around three roles: tasks, task management, and task scheduling. The time complexity for adding and canceling tasks in these three timers is O(log n). However, when faced with scenarios involving massive task insertion and deletion, these three timers will encounter significant performance bottlenecks. Therefore, for scenarios with high performance requirements, the time wheel algorithm is generally used. Next, let’s continue the analysis of how the time wheel algorithm solves the problem of massive task insertion and deletion.

Analysis of the Time Wheel Principle #

Sometimes, technology originates from daily life. For example, the concept of queues can be derived from queuing to buy tickets, and the hierarchical structure of a company can be understood as a tree. The design concept of the time wheel algorithm is based on a clock. As shown in the figure below, a time wheel can be understood as a ring-shaped structure, divided into multiple slots just like a clock. Each slot represents a time period, and multiple tasks can be stored in each slot. The tasks about to be executed in each slot are stored in a linked list. The time wheel rotates one slot at a time with the passage of time, and executes all the tasks in the current slot.


![图片22.png](../images/CgpVE1_okKiAGl0gAAMLshtTq-M933.png)

How are tasks added to the time wheel? Tasks can be distributed to different slots based on their expiration time by taking modulo of the task's expiration time. As shown in the above diagram, the time wheel is divided into 8 slots, with each slot representing 1 second. The current pointer is pointing to slot 2. For example, if a task needs to be scheduled to execute after 3 seconds, it should be added to slot 2 + 3 = 5. If a task needs to be scheduled after 12 seconds, it should wait for the pointer to complete a full round of 4 slots, and be added to slot (2 + 12) % 8 = 6.

Now, how do we distinguish whether each task in slot 6 should be executed immediately or wait for the next round or even longer? This is why we need to save the round information in the task. For example, in the linked list of slot 6 in the diagram, there are 3 tasks. The first task has round=0 and needs to be executed immediately. The second task has round=1 and needs to wait for 1 * 8 = 8 seconds before execution. The third task has round=2 and needs to wait for 2 * 8 = 16 seconds before execution. So when the pointer moves to the corresponding slot, only the task with round=0 is executed, and the round of the remaining tasks in the slot should be decremented, waiting for execution in the next round.

The basic theory of the time wheel algorithm has been introduced above, and it can be seen that the time wheel is somewhat similar to a HashMap. If multiple tasks correspond to the same slot, the conflict resolution method used is chaining. In scenarios with a large number of tasks, increasing the number of slots in the time wheel appropriately can reduce the number of tasks traversed when the pointer moves.

The biggest advantage of the time wheel timer is that adding and canceling tasks both have O(1) time complexity, and only one thread is needed to drive the time wheel. HashedWheelTimer is the implementation class of the time wheel algorithm in Netty. Below, I will analyze the implementation principles of the time wheel algorithm in detail based on the source code of HashedWheelTimer.

### Analysis of Netty HashedWheelTimer Source Code

Before diving into the source code of HashedWheelTimer, it is necessary to understand the interface definition and related components of HashedWheelTimer in order to use it better.

#### Interface Definition

HashedWheelTimer implements the Timer interface, and the Timer interface is a good entry point for studying HashedWheelTimer. Let's take a look at the definition of the Timer interface:

public interface Timer { Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); Set stop(); }

The Timer interface provides two methods, newTimeout() to create a task and stop() to stop all tasks that have not been executed. From the method definitions, we can see that Timer can be regarded as an upper-level time wheel scheduler. With the newTimeout() method, a task TimerTask can be submitted and a Timeout object is returned. What are the roles of TimerTask and Timeout? Let's take a look at the interface definitions of TimerTask and Timeout respectively:

public interface TimerTask { void run(Timeout timeout) throws Exception; }

public interface Timeout { Timer timer(); TimerTask task(); boolean isExpired(); boolean isCancelled(); boolean cancel(); }

Timeout holds references to Timer and TimerTask, and can perform cancellation operations through the Timeout interface. The relationship between Timer, Timeout, and TimerTask is shown in the following diagram:

![图片1.png](../images/CgpVE1_okNGAJA8SAAJSJkBDij0471.png)

After understanding the concept of HashedWheelTimer's interface definition and related components, we can start using it.

#### Quick Start
# Using HashedWheelTimer

Let's see how `HashedWheelTimer` is used through this simple example.

```java
public class HashedWheelTimerTest {

    public static void main(String[] args) {

        Timer timer = new HashedWheelTimer();

        Timeout timeout1 = timer.newTimeout(new TimerTask() {

            @Override

            public void run(Timeout timeout) {

                System.out.println("timeout1: " + new Date());

            }

        }, 10, TimeUnit.SECONDS);

        if (!timeout1.isExpired()) {

            timeout1.cancel();

        }

        timer.newTimeout(new TimerTask() {

            @Override

            public void run(Timeout timeout) throws InterruptedException {

                System.out.println("timeout2: " + new Date());

                Thread.sleep(5000);

            }

        }, 1, TimeUnit.SECONDS);

        timer.newTimeout(new TimerTask() {

            @Override

            public void run(Timeout timeout) {

                System.out.println("timeout3: " + new Date());

            }

        }, 3, TimeUnit.SECONDS);

    }

}

The output of running the code is:

timeout2: Mon Nov 09 19:57:04 CST 2020

timeout3: Mon Nov 09 19:57:09 CST 2020

With just a few lines of code, we have demonstrated most of the usage of HashedWheelTimer. In the example, we start three TimerTasks using the newTimeout() method. As timeout1 is canceled, it is not executed. timeout2 and timeout3 should be executed after 1s and 3s respectively. However, as seen from the output, there is a 5s difference between the execution of timeout2 and timeout3. This is because timeout2 blocks for 5s. Therefore, it is evident that the execution of tasks in the time wheel is serial. When one task takes a long time to execute, it affects the scheduling and execution of subsequent tasks and can lead to task accumulation.

Thus, we have gained a basic understanding of how to use HashedWheelTimer. Now we will delve into the implementation principles of HashedWheelTimer.

Internal Structure #

Let’s start with the constructor of HashedWheelTimer to understand the internal implementation structure of HashedWheelTimer, combined with the previously introduced time wheel algorithm.

public HashedWheelTimer(

        ThreadFactory threadFactory,

        long tickDuration, 

        TimeUnit unit, 

        int ticksPerWheel, 

        boolean leakDetection,

        long maxPendingTimeouts) {

    // ...

    wheel = createWheel(ticksPerWheel); // Create the circular array structure of the time wheel

    mask = wheel.length - 1; // Mask for fast modulo calculation

    long duration = unit.toNanos(tickDuration); // Convert to nanoseconds

    // ...

    workerThread = threadFactory.newThread(worker); // Create the worker thread

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // Enable memory leak detection

    this.maxPendingTimeouts = maxPendingTimeouts; // Maximum allowed number of pending tasks, an exception is thrown if the tasks exceed this threshold

    // Print an error log if the number of `HashedWheelTimer` instances exceeds 64

    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&

        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {

        reportTooManyInstances();

    }

}

The constructor of HashedWheelTimer lists several core attributes clearly:

  • threadFactory: The thread pool, but only a single thread is created.
  • tickDuration: The time it takes for the clock hand to tick, i.e., how long it takes to move to the next slot.
  • unit: The time unit of tickDuration.
  • ticksPerWheel: The total number of slots on the time wheel, defaulting to 512. The more slots allocated, the more memory space consumed.
  • leakDetection: Whether to enable memory leak detection.
  • maxPendingTimeouts: Maximum allowed number of pending tasks.

Next, let’s see how HashedWheelTimer is created. Let’s directly follow the source code of the createWheel() method:

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {

    // ...

    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);

    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];

    for (int i = 0; i < wheel.length; i ++) {

        wheel[i] = new HashedWheelBucket();

    }

    return wheel;

}

private static int normalizeTicksPerWheel(int ticksPerWheel) {

    int normalizedTicksPerWheel = 1;

    while (normalizedTicksPerWheel < ticksPerWheel) {

        normalizedTicksPerWheel <<= 1;

    }

    return normalizedTicksPerWheel;

}

private static final class HashedWheelBucket {

    private HashedWheelTimeout head;

    private HashedWheelTimeout tail;

    // ...

}

The creation of the time wheel is to create an array of HashedWheelBuckets. Each HashedWheelBucket represents a slot on the time wheel. From the structure definition of HashedWheelBucket, it can be seen that the HashedWheelBucket contains a doubly-linked list structure internally. Each node of the doubly-linked list holds a HashedWheelTimeout object, representing a timed task. Each HashedWheelBucket contains the head and tail of the doubly-linked list HashedWheelTimeout nodes, which allows traversal in different directions. The specific functionalities of HashedWheelBucket and HashedWheelTimeout will be further explained later.

Since the time wheel needs to use & for modulo calculation, the length of the array needs to be a power of 2. The purpose of the normalizeTicksPerWheel() method is to find the smallest power of 2 that is not less than ticksPerWheel. This method is not well implemented. To optimize its performance, you can refer to the implementation of tableSizeFor() in JDK’s HashMap capacity expansion. However, normalizeTicksPerWheel() is only used during initialization and does not affect the program execution.

We have finished introducing the main work of the initialization of HashedWheelTimer. Its internal structure is similar to the time wheel algorithm introduced above, as shown in the following diagram:

图片2.png

Next, let’s analyze how HashedWheelTimer implements the three basic operations of the timer: adding tasks, executing tasks, and canceling tasks.

Adding Tasks #

After the initialization of HashedWheelTimer, how do we add tasks to it? Naturally, we think of the newTimeout() method provided by HashedWheelTimer.

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

    // ...

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {

        pendingTimeouts.decrementAndGet();

        throw new RejectedExecutionException("Number of pending timeouts ("

            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
int idx = (int) (tick & mask); // 2. 获取当前 tick 在 HashedWheelBucket 数组中对应的下标

接下来是第三步流程,移除被取消的任务。取消任务的操作是通过遍历所有的 HashedWheelBucket,然后调用 HashedWheelBucket#removeTimeout() 方法处理。具体的实现细节可以参考源码。

第四步流程是从 Mpsc Queue 中取出任务加入到对应的 HashedWheelBucket 中,通过调用 transferTimeoutsToBuckets() 方法实现。该方法的主要逻辑是从 Mpsc Queue 中取出任务,然后根据任务的 deadline 计算出在时间轮中的位置,并将任务添加到指定的 HashedWheelBucket 中。

最后一步流程是执行当前 HashedWheelBucket 中的到期任务,通过调用 HashedWheelBucket#expireTimeouts() 方法实现。在 expireTimeouts() 方法中,会遍历当前 HashedWheelBucket 中的任务,并依次执行任务的逻辑。执行完任务后,会检查任务是否重复执行或者已被取消,然后从时间轮中删除任务,并将未被取消的任务添加到未处理任务列表中。

Worker 的核心执行流程分析完毕,这样我们就基本上掌握了 HashedWheelTimer 的整体原理。有关 Mpsc Queue 的具体实现原理,后面的文章会单独进行详细分析。

小结 #

HashedWheelTimer 是 Netty 中非常常用的一个定时任务调度器,它是基于时间轮算法实现的。通过维护一个时间轮和工作线程来完成任务的调度和执行。它的核心思想是将任务装入指定的时间槽中,然后通过工作线程不断地轮询时间轮,执行到期的任务。HashedWheelTimer 具有高性能和高准确度的特点,广泛应用于 Netty 中的各种定时任务。 Next, the processCancelledTasks() method will be called by the Worker to handle the cancelled tasks. All cancelled tasks will be added to the cancelledTimeouts queue, and the Worker will dequeue tasks from the queue and remove them from the corresponding HashedWheelBucket through basic linked list operations. The source code for processCancelledTasks() is relatively simple, so we will not expand on it here.

Previously, we had a question about when tasks in the Mpsc Queue are added to the time wheel. The answer lies in the transferTimeoutsToBuckets() method.

private void transferTimeoutsToBuckets() {

    // The worker thread will process at most 100,000 tasks per tick to prevent blocking

    for (int i = 0; i < 100000; i++) {

        HashedWheelTimeout timeout = timeouts.poll();

        if (timeout == null) {

            break;

        }

        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {

            continue;

        }

        long calculated = timeout.deadline / tickDuration; // Calculate how many ticks the task needs to go through

        timeout.remainingRounds = (calculated - tick) / wheel.length; // Calculate how many rounds the task needs to go through in the time wheel

        final long ticks = Math.max(calculated, tick); // If the task has already passed its execution time in the timeouts queue, it is added to the current HashedWheelBucket

        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];

        bucket.addTimeout(timeout);

    }

}

The main function of transferTimeoutsToBuckets() is to dequeue tasks from the Mpsc Queue and add them to the corresponding HashedWheelBucket in the time wheel. The worker thread will process at most 100,000 tasks per tick to prevent the task retrieval operation from taking too long and to prevent the worker thread from being blocked by executing too many tasks.

Based on the deadline set by the user for each task, the number of ticks the task needs to go through before it can begin execution and the number of rounds it needs to go through in the time wheel (remainingRounds) can be calculated. The remainingRounds will be recorded in the HashedWheelTimeout object and used when executing the task. Since the tasks in the time wheel cannot be guaranteed to be executed in a timely manner, if a task takes a particularly long time to execute and has already passed its execution time in the timeouts queue, it will be added directly to the current HashedWheelBucket. Therefore, expired tasks will not be missed.

After the tasks are added to the time wheel, the main flow returns to Worker#run(), and the next step is to execute the expired tasks in the current HashedWheelBucket. Let’s look at the source code for HashedWheelBucket#expireTimeouts():

public void expireTimeouts(long deadline) {

    HashedWheelTimeout timeout = head;

    while (timeout != null) {

        HashedWheelTimeout next = timeout.next;

        if (timeout.remainingRounds <= 0) {

            next = remove(timeout);

            if (timeout.deadline <= deadline) {

                timeout.expire(); // Execute the task

            } else {

                throw new IllegalStateException(String.format(

                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));

            }

        } else if (timeout.isCancelled()) {

            next = remove(timeout);

        } else {

            timeout.remainingRounds --; // The execution time has not yet arrived, decrease remainingRounds by 1

        }

        timeout = next;

    }

}

The execution of the task is relatively simple - it starts traversing the doubly linked list in the HashedWheelBucket from the head. If remainingRounds <= 0, it calls expire() to execute the task. Internally, timeout.expire() calls the run() method of TimerTask. If the task has been cancelled, it is removed from the linked list. Otherwise, it means that the execution time of the task has not yet arrived, so remainingRounds is decreased by 1, and it waits for the next round.

At this point, we have finished explaining the core logic of the do-while loop in the worker thread. After the time wheel exits, the Worker will perform some post-processing work. The Worker will retrieve unexecuted and uncanceled tasks from each HashedWheelBucket, as well as tasks that can still be added to the HashedWheelBucket, and add them to the list of unprocessed tasks for unified processing by the stop() method.

Stopping the Time Wheel #

Returning to the Timer interface, we have already analyzed the newTimeout() method earlier. Next, let’s start with the stop() method to see what the time wheel does when it stops.

@Override

public Set<Timeout> stop() {

    // The time wheel cannot be stopped from the worker thread

    if (Thread.currentThread() == workerThread) {

        throw new IllegalStateException(

                HashedWheelTimer.class.getSimpleName() +

                        ".stop() cannot be called from the worker thread");

    }
                            TimerTask.class.getSimpleName());
    }

    // Try to update the worker thread's status to SHUTDOWN state using CAS operation.
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
            return Collections.emptySet();
        }
    }
    
    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt(); // Interrupt the worker thread
            try {
                workerThread.join(100); // Wait for the worker thread to terminate
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts(); // Return the list of unprocessed tasks
}

If the current thread is the worker thread, it is not allowed to initiate the operation of stopping the timer, in order to prevent malicious operations of stopping the timer by timed tasks. Stopping the timer mainly does three things: first, try to update the status of the worker thread to SHUTDOWN state using CAS operation; then interrupt the worker thread; finally, return the list of unprocessed tasks to the upper layer.

By now, we have finished analyzing the implementation principle of HashedWheelTimer. Let’s review a few core members of HashedWheelTimer:

  • HashedWheelTimeout: Encapsulates a task, including properties such as the deadline of the task and the number of remaining rounds it needs to go through.
  • HashedWheelBucket: Represents each slot in the wheel. Internally, it uses a doubly linked list to store the list of HashedWheelTimeouts that need to be executed.
  • Worker: The core worker engine of HashedWheelTimer, responsible for handling timed tasks.

Advanced Applications of the Timer Wheel #

In Netty, the timer wheel is driven by a fixed time interval called tickDuration. If there are no expired tasks for a long time, the timer wheel may be advanced unnecessarily, causing a certain performance overhead. Additionally, if the expiration time span of tasks is large, for example, task A is scheduled to execute after 1 second and task B is scheduled to execute after 6 hours, this will also cause unnecessary advancements.

So, is there a solution to these problems? When studying Kafka, I found that Kafka also uses a timer wheel, but its implementation approach differs from Netty’s. Kafka is designed to handle more demanding scenarios, in which timed tasks with various time granularities may exist. How does Kafka handle the time span problem? Let’s briefly introduce Kafka’s optimization approach.

The internal structure of Kafka’s timer wheel is similar to Netty’s, as shown in the following diagram. Kafka’s timer wheel also stores timed tasks using a circular array. Each slot in the array represents a bucket, and each bucket contains a list of timed tasks called TimerTaskList, which is implemented using a doubly linked list. Each node in the linked list represents an actual timed task called TimerTaskEntry.

Image

To solve the problem of unnecessary advancements, Kafka utilizes JDK’s DelayQueue to drive the timer wheel. DelayQueue stores each bucket in the timer wheel and sorts them based on their expiration time. The bucket with the closest expiration time is placed at the head of the DelayQueue. Kafka has a thread that reads the task lists from the DelayQueue. If there is no task ready for execution, the DelayQueue will be blocked until a task is ready, thus solving the unnecessary advancement problem. You may wonder, isn’t the performance of DelayQueue for insertion and removal not very good? In fact, Kafka adopts a balanced strategy by using DelayQueue in the right place. DelayQueue only stores buckets, and the number of buckets is not large compared to the impact caused by unnecessary advancements.

To solve the problem of large time spans of tasks, Kafka introduces a hierarchical timer wheel, as shown in the following diagram. When the deadline of a task exceeds the representation range of the current level, it will be attempted to be added to the upper-level timer wheel, similar to the rotation rules of the hour, minute, and second hands of a clock.

Image

From the diagram, we can see that the first level of the timer wheel has a time granularity of 1 ms, and the overall span of the timer wheel is 20 ms. The second level of the timer wheel has a time granularity of 20 ms, and the overall span is 400 ms. The third level of the timer wheel has a time granularity of 400 ms, and the overall span is 8000 ms. Each level of the timer wheel has its own pointer. After completing a full circle, the upper-level timer wheel will be advanced by one slot.

Let’s assume that a task is scheduled to expire after 450 ms, so it should be placed in the first slot of the third level of the timer wheel. As time passes, when the pointer points to that slot, it finds that there are still 50 ms remaining until the task’s expiration time. This triggers a downgrading operation, adding the task back to the timer wheel. At this point, it is found that the overall span of the first level of the timer wheel is not enough, so it needs to be placed in the third slot of the second level. After another 40 ms, the task will trigger another downgrading operation, placing it in the first level. Finally, when 10 ms pass, the task will be executed.

From this we can see that Kafka’s hierarchical timer wheel provides better control over time granularity and can handle more complex scenarios of timed task processing, making it more versatile.

Summary #

The source code of HashedWheelTimer is easy to understand, and its design principles are worth learning from. If you have a similar task processing mechanism in your development work, you can try to apply the working mode of HashedWheelTimer.

HashedWheelTimer is not without its flaws. When using it, you need to be aware of the following issues:

  • If there are no expired tasks for a long time, the timer wheel may be advanced unnecessarily.
  • It is only suitable for handling short-duration tasks. Since the Worker runs in a single thread, if a task takes too long to execute, it will block the Worker thread.
  • Compared to traditional timer implementations, it consumes more memory.