05 Massive Scheduled Tasks: One Time Wheel Handles Them All

Many open-source frameworks and systems need timed-task scheduling, for example ZooKeeper, Netty, Quartz, Kafka, and the Linux operating system.

The Java Development Kit (JDK) provides tools such as java.util.Timer and DelayQueue for managing simple timed tasks. Both are built on a heap (a priority queue), so each insertion or removal costs O(log n). As a result, they do not scale well to very large numbers of timed tasks. When there are many timed tasks and performance matters, a time wheel is commonly used instead. It reduces the cost of adding, removing, and cancelling a task to O(1).
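
For contrast, here is a minimal sketch of the heap-based approach built on the JDK's DelayQueue. The DelayedTask class is a hypothetical example. DelayQueue keeps its elements in a PriorityQueue (a binary heap), so each offer() and poll() pays O(log n) to restore heap order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// A timed task for the JDK's DelayQueue (hypothetical class for illustration).
// DelayQueue keeps elements in a binary heap, so offer() and poll() are O(log n).
class DelayedTask implements Delayed {
    final String name;
    private final long deadlineNanos; // absolute deadline on the System.nanoTime() clock

    DelayedTask(String name, long delay, TimeUnit unit) {
        this.name = name;
        this.deadlineNanos = System.nanoTime() + unit.toNanos(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Time left until the deadline, in the caller's unit; <= 0 means "due".
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof DelayedTask) {
            // Compare fixed deadlines directly so the ordering never drifts.
            return Long.compare(deadlineNanos, ((DelayedTask) other).deadlineNanos);
        }
        // The heap keeps the element with the smallest remaining delay at its head.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

A consumer thread would normally call take(), which blocks until the element at the head of the heap becomes due.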

A time wheel is an efficient scheduling model for managing timed tasks in bulk. It is typically implemented as a circular structure resembling a clock face. The circle is divided into multiple slots, each representing a fixed time interval. The timed tasks in each slot are stored in a doubly linked list. A pointer advances one slot at each tick, and when it reaches a slot, the timed tasks stored there are executed.

Illustration of a circular time wheel structure
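
The slot-and-pointer idea can be sketched in a few lines. The SimpleTimeWheel class below is hypothetical, not Dubbo's code, and it simplifies in three ways:

- Time advances only when tick() is called.
- Each slot is a plain LinkedList rather than a hand-written doubly linked list.
- A delay must fit within one rotation, which is exactly the capacity limit of a single-level wheel.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Minimal single-level time wheel (hypothetical class, not Dubbo's code).
// Time advances only when tick() is called; each slot is a list of tasks.
class SimpleTimeWheel {
    private final List<LinkedList<Runnable>> slots = new ArrayList<>();
    private int pointer = 0; // slot that the next tick() will process

    SimpleTimeWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new LinkedList<>());
        }
    }

    // Run the task on the delayTicks-th call to tick(). A single-level wheel can
    // only hold delays of up to one full rotation (slotCount ticks).
    void schedule(Runnable task, int delayTicks) {
        if (delayTicks < 1 || delayTicks > slots.size()) {
            throw new IllegalArgumentException("delay must be within one rotation");
        }
        // O(1): pick the slot with modular arithmetic and append to its list.
        int index = (pointer + delayTicks - 1) % slots.size();
        slots.get(index).add(task);
    }

    // Detach the slot under the pointer, move the pointer forward, then run the
    // detached tasks. Tasks scheduled while they run land in fresh lists.
    void tick() {
        LinkedList<Runnable> due = slots.get(pointer);
        slots.set(pointer, new LinkedList<>());
        pointer = (pointer + 1) % slots.size();
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

Both scheduling and firing cost O(1) per task: schedule() only computes an index, and tick() only touches the one slot under the pointer.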

Note that a single-level time wheel has limited capacity and precision. One rotation covers only (number of slots) × (tick interval), and no task can fire more precisely than one tick. When tasks need high precision, span a very long time range, or are extremely numerous, a multi-level time wheel combined with persistent storage is typically used.

So how does Dubbo implement a time wheel? In this lesson, we will explore the specific implementation of the time wheel in Dubbo. The time wheel implementation in Dubbo is located in the org.apache.dubbo.common.timer package of the dubbo-common module. Let’s analyze the core interfaces and implementations involved in the time wheel.

Core Interfaces #

In Dubbo, every timed task implements the TimerTask interface. TimerTask is very simple: it defines only a run() method, which takes a Timeout object as its parameter. Each Timeout corresponds one-to-one with a TimerTask, much as the Future returned by a thread pool corresponds to the task submitted to it. Through a Timeout, we can not only check the status of the timed task but also act on it (for example, cancel it). The methods of the Timeout interface are shown in the following diagram:

Figure: methods of the Timeout interface

The Timer interface defines the basic behavior of a timer, as shown in the following diagram. Its core is the newTimeout() method, which submits a timed task (TimerTask) and returns the associated Timeout object. This is similar to submitting a task to a thread pool.

Figure: methods of the Timer interface
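
As a rough textual companion to the two diagrams, here is a simplified sketch of the three abstractions. The method names follow the Netty-style timer interfaces that Dubbo's org.apache.dubbo.common.timer package closely resembles. The method sets are trimmed, however; Timer's shutdown methods, for instance, are left out. Treat this as an approximation and check the source for the exact declarations.

```java
import java.util.concurrent.TimeUnit;

// Simplified sketch of the three timer abstractions (method sets trimmed;
// check org.apache.dubbo.common.timer for the exact declarations).

// A timed task; it receives its own Timeout handle when it runs.
interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

// One Timeout per submitted TimerTask, comparable to a thread pool's Future.
interface Timeout {
    Timer timer();         // the timer that owns this timeout
    TimerTask task();      // the task this timeout wraps
    boolean isExpired();   // true once the task has been handed to run()
    boolean isCancelled(); // true once cancel() has succeeded
    boolean cancel();      // false if the task already expired or was cancelled
}

// Submitting a task returns its Timeout, much like ExecutorService.submit() returns a Future.
interface Timer {
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
}
```

Because TimerTask has a single abstract method, a task can be written as a lambda that receives its own Timeout handle.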

HashedWheelTimeout #

HashedWheelTimeout is the sole implementation of the Timeout interface and is an inner class of HashedWheelTimer. HashedWheelTimeout plays two roles:

  1. It is a node in the doubly linked list of a time-wheel slot, acting as the container that holds a timed task (TimerTask) inside HashedWheelTimer.
  2. It is the handle returned when a timed task (TimerTask) is submitted to HashedWheelTimer, allowing the task to be inspected and controlled from outside the time wheel.

The core fields in HashedWheelTimeout are as follows:

  • prev, next (HashedWheelTimeout type): The predecessor and successor nodes of the current timed task in the linked list.
  • task (TimerTask type): The actual scheduled task.
  • deadline (long type): The execution time of the task. It is fixed when the HashedWheelTimeout is created and is calculated as currentTime (the time when the HashedWheelTimeout is created) + delay (the task delay) - startTime (the start time of the HashedWheelTimer), in nanoseconds.
  • state (volatile int type): The current state of the timed task. There are three possible states: INIT (0), CANCELLED (1), and EXPIRED (2). A STATE_UPDATER field (AtomicIntegerFieldUpdater type) makes state updates atomic.
  • remainingRounds (long type): The number of full rotations of the time wheel that must still pass before the task is due. One rotation covers only a limited time span. When a task's deadline lies further away than one rotation, remainingRounds records how many extra rotations to wait.
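
The placement of a task can be worked through with small numbers. The helper below is hypothetical. It uses the deadline formula above plus two placement rules that are assumed here, and that match how the time wheel's worker places tasks as far as we can tell:

- slot = calculated tick & mask
- remainingRounds = (calculated tick - current tick) / wheel length

Take a 100 ms tick, 8 slots, a current tick of 3, and a deadline 2.5 s after startTime. The task is due at tick 25, so it waits (25 - 3) / 8 = 2 full rounds in slot 25 & 7 = 1.

```java
// Hypothetical helper reproducing the placement arithmetic described above.
// All times are in nanoseconds; deadlines are relative to the wheel's startTime.
final class WheelMath {
    private WheelMath() {}

    // deadline = currentTime + delay - startTime
    static long deadline(long currentTimeNanos, long delayNanos, long startTimeNanos) {
        return currentTimeNanos + delayNanos - startTimeNanos;
    }

    // Full rotations the task must wait before its slot comes due.
    static long remainingRounds(long deadline, long tickDuration, long currentTick, int wheelLength) {
        long calculated = deadline / tickDuration; // tick at which the task is due
        return (calculated - currentTick) / wheelLength;
    }

    // Slot index; a task whose tick has already passed goes into the current slot.
    static int slotIndex(long deadline, long tickDuration, long currentTick, int mask) {
        long calculated = deadline / tickDuration;
        long ticks = Math.max(calculated, currentTick);
        return (int) (ticks & mask);
    }
}
```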

The core methods in HashedWheelTimeout are:

  • isCancelled(), isExpired(), state() methods: Used to check the current state of HashedWheelTimeout.
  • cancel() method: Sets the state of the current HashedWheelTimeout to CANCELLED and adds it to the cancelledTimeouts queue. The worker thread later removes it from its slot.
  • expire() method: Called when the task expires, this method sets the state of the current HashedWheelTimeout to EXPIRED and then calls the run() method of the TimerTask it contains to execute the timed task.
  • remove() method: Removes the current HashedWheelTimeout from the time wheel.
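
The state transitions can be sketched as follows. SketchTimeout is a hypothetical, trimmed-down stand-in for HashedWheelTimeout: its task is a plain Runnable instead of a TimerTask, and the bucket links are omitted. Both cancel() and expire() go through a compare-and-set from INIT. Whichever happens first wins, and the other becomes a no-op.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Simplified stand-in for HashedWheelTimeout's state handling (hypothetical
// class: the task is a plain Runnable and bucket linkage is omitted).
class SketchTimeout {
    static final int ST_INIT = 0;
    static final int ST_CANCELLED = 1;
    static final int ST_EXPIRED = 2;

    // CAS on a volatile int field without allocating an AtomicInteger per timeout.
    private static final AtomicIntegerFieldUpdater<SketchTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SketchTimeout.class, "state");

    private volatile int state = ST_INIT;
    private final Runnable task;
    private final Queue<SketchTimeout> cancelledTimeouts; // shared with the owning timer

    SketchTimeout(Runnable task, Queue<SketchTimeout> cancelledTimeouts) {
        this.task = task;
        this.cancelledTimeouts = cancelledTimeouts;
    }

    int state() { return state; }
    boolean isCancelled() { return state == ST_CANCELLED; }
    boolean isExpired() { return state == ST_EXPIRED; }

    // INIT -> CANCELLED. The caller only flips the state and enqueues the timeout;
    // the worker thread later drains the queue and unlinks it from its bucket.
    boolean cancel() {
        if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED)) {
            return false; // already expired or cancelled
        }
        cancelledTimeouts.add(this);
        return true;
    }

    // INIT -> EXPIRED, then run the task. The CAS guarantees that a task runs
    // at most once and never after a successful cancel().
    void expire() {
        if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) {
            return;
        }
        task.run();
    }
}
```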

HashedWheelBucket #

HashedWheelBucket is a slot in the time wheel. A slot is essentially a container that holds and manages a doubly linked list. Each node in the list is a HashedWheelTimeout object, which is associated with a TimerTask.

HashedWheelBucket holds the first and last nodes of the doubly-linked list, represented by the head and tail fields, respectively. Each HashedWheelTimeout node also holds references to its predecessor and successor, enabling traversal of the entire doubly-linked list in both directions.

Let us now look at the core methods in HashedWheelBucket:

  • addTimeout() method: Adds the HashedWheelTimeout to the end of the doubly-linked list.
  • pollTimeout() method: Removes and returns the head node of the doubly-linked list.
  • remove() method: Removes the specified HashedWheelTimeout node from the doubly-linked list.
  • clearTimeouts() method: Repeatedly calls pollTimeout() to drain the doubly linked list. It collects every timeout that has neither expired nor been cancelled, so that these can be returned as unprocessed tasks when the timer stops.
  • expireTimeouts() method: Traverses all HashedWheelTimeout nodes in the doubly linked list. A node whose task is due (its remainingRounds has reached zero) is unlinked with remove(), and its expire() method is called, which in turn runs the TimerTask. A cancelled node is unlinked with remove() and discarded. For any other node, remainingRounds (the remaining clock cycles) is decremented by one.
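
The bucket's list operations can be sketched like this. BucketNode and Bucket are hypothetical simplifications: cancellation and expiry are plain boolean flags rather than the atomic state field, and the task is a Runnable. Because every node knows its neighbours, remove() unlinks a node in O(1) without searching the list.

```java
import java.util.HashSet;
import java.util.Set;

// Trimmed-down, hypothetical stand-in for HashedWheelTimeout as a list node.
class BucketNode {
    BucketNode prev, next;  // neighbours in the bucket's doubly linked list
    long remainingRounds;   // rotations left before the task is due
    boolean cancelled;
    boolean expired;
    final Runnable task;

    BucketNode(Runnable task, long remainingRounds) {
        this.task = task;
        this.remainingRounds = remainingRounds;
    }
}

// Sketch of a slot: head/tail pointers into a doubly linked list of nodes.
class Bucket {
    BucketNode head, tail;

    // Append at the tail in O(1).
    void addTimeout(BucketNode n) {
        if (head == null) {
            head = tail = n;
        } else {
            tail.next = n;
            n.prev = tail;
            tail = n;
        }
    }

    // Unlink n in O(1) via its own prev/next; returns the node that followed it.
    BucketNode remove(BucketNode n) {
        BucketNode next = n.next;
        if (n.prev != null) n.prev.next = next;
        if (next != null) next.prev = n.prev;
        if (n == head) head = next;
        if (n == tail) tail = n.prev;
        n.prev = null;
        n.next = null;
        return next;
    }

    // Detach and return the head node, or null if the bucket is empty.
    BucketNode pollTimeout() {
        BucketNode h = head;
        if (h != null) remove(h);
        return h;
    }

    // One visit of the pointer: drop cancelled nodes, run nodes whose rounds
    // have run out, and count down the rounds of everything else.
    void expireTimeouts() {
        BucketNode n = head;
        while (n != null) {
            BucketNode next = n.next;
            if (n.cancelled) {
                next = remove(n);
            } else if (n.remainingRounds <= 0) {
                next = remove(n);
                n.expired = true;
                n.task.run();
            } else {
                n.remainingRounds--;
            }
            n = next;
        }
    }

    // Drain the bucket, returning nodes that neither expired nor were cancelled.
    Set<BucketNode> clearTimeouts() {
        Set<BucketNode> unprocessed = new HashSet<>();
        for (BucketNode n = pollTimeout(); n != null; n = pollTimeout()) {
            if (!n.expired && !n.cancelled) unprocessed.add(n);
        }
        return unprocessed;
    }
}
```

This sketch checks the cancelled flag before the due check, so a cancelled node is never run even if its rounds have reached zero.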

HashedWheelTimer #

HashedWheelTimer implements the Timer interface using the time wheel algorithm. On each tick, it selects the slot (HashedWheelBucket) under the current pointer and walks that slot's doubly linked list from the head. A timed task (HashedWheelTimeout) that belongs to the current clock cycle is executed; any other task has its remaining clock cycles decremented.

Let’s take a look at the core attributes of the HashedWheelTimer.

  • workerState (volatile int type): The current state of the time wheel, with possible values of init, started, and shutdown. There is also a corresponding AtomicIntegerFieldUpdater implementation to atomically modify the workerState.
  • startTime (long type): The start time of the current time wheel. The deadline field value of the timed tasks submitted to this time wheel is calculated based on this timestamp.
  • wheel (HashedWheelBucket[] type): This array is the circular queue of the time wheel, and each element is a slot. When the number of slots is specified as n, the actual array length is the smallest power of 2 greater than or equal to n.
  • timeouts, cancelledTimeouts (LinkedBlockingQueue type): The timeouts queue is used to buffer timed tasks submitted to the time wheel externally, and the cancelledTimeouts queue is used to temporarily store cancelled timed tasks. Before processing the doubly linked list of HashedWheelBucket, HashedWheelTimer will first process the data in these two queues.
  • tick (long type): This field is in HashedWheelTimer.Worker and represents the pointer of the time wheel. It is a monotonically increasing counter with a step size of 1.
  • mask (int type): The mask, mask = wheel.length - 1. Computing tick & mask locates the corresponding slot. Because wheel.length is a power of 2, this equals tick % wheel.length but avoids a division.
  • tickDuration (long type): The actual time represented by each advance of the pointer, in nanoseconds.
  • pendingTimeouts (AtomicLong type): The total number of pending timed tasks in the current time wheel.
  • workerThread (Thread type): The thread used by the time wheel to actually execute timed tasks.
  • worker (Worker type): The logic for actually executing timed tasks is encapsulated in this Runnable object.
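
The power-of-2 rounding and the mask can be checked with a small sketch. WheelSizing is a hypothetical helper, not part of Dubbo. Asking for 500 slots yields 512, so mask = 511, and tick 1000 lands in slot 1000 & 511 = 488, the same result as 1000 % 512. With a 100 ms tick duration, one rotation of those 512 slots covers 51.2 seconds.

```java
// Hypothetical helpers showing why the slot count is rounded up to a power of 2.
final class WheelSizing {
    private WheelSizing() {}

    // Smallest power of 2 that is >= n, for 1 <= n <= 2^30.
    static int roundUpToPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("slot count out of range: " + n);
        }
        int size = 1;
        while (size < n) {
            size <<= 1;
        }
        return size;
    }

    // For a power-of-2 wheel, mask = length - 1 and (tick & mask) equals
    // (tick % length) for any tick >= 0, without a division.
    static int slotFor(long tick, int mask) {
        return (int) (tick & mask);
    }
}
```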

External code submits timed tasks to the time wheel through the newTimeout() method. Before a task enters the timeouts queue, newTimeout() calls start(), which makes sure the time wheel is running. The first call ensures that two important steps are completed:

  1. Determine the startTime field of the time wheel.
  2. Start the workerThread thread to begin executing the worker task.

After that, the deadline field of the timed task is calculated based on startTime, and finally, the timed task is wrapped into HashedWheelTimeout and added to the timeouts queue.
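
The submission path can be sketched as below. SubmitSketch is hypothetical and omits the tick loop itself. The sequence is:

1. A compare-and-set on workerState makes sure only the first caller starts the worker thread.
2. The worker records startTime and releases a CountDownLatch.
3. Every caller waits on that latch before computing deadline = currentTime + delay - startTime.

The wait matters because a deadline computed before startTime is known would be meaningless.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Sketch of the submission path (hypothetical class, not Dubbo's code):
// start the worker once, compute the deadline from startTime, enqueue.
class SubmitSketch {
    static final int WORKER_INIT = 0, WORKER_STARTED = 1, WORKER_SHUTDOWN = 2;
    private static final AtomicIntegerFieldUpdater<SubmitSketch> WORKER_STATE =
            AtomicIntegerFieldUpdater.newUpdater(SubmitSketch.class, "workerState");

    // A submitted task plus its deadline in nanoseconds relative to startTime.
    static final class Pending {
        final Runnable task;
        final long deadline;
        Pending(Runnable task, long deadline) { this.task = task; this.deadline = deadline; }
    }

    private volatile int workerState = WORKER_INIT; // a stop() would set WORKER_SHUTDOWN
    private volatile long startTime;                // 0 means "not recorded yet"
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    final Queue<Pending> timeouts = new ConcurrentLinkedQueue<>();
    private final Thread workerThread;

    SubmitSketch() {
        workerThread = new Thread(() -> {
            long now = System.nanoTime();
            startTime = (now == 0) ? 1 : now; // keep 0 reserved for "not started"
            startTimeInitialized.countDown();
            // A real worker would now loop: wait for the next tick, drain queues, expire a bucket.
        });
        workerThread.setDaemon(true);
    }

    // Start the worker on the first call only; every caller then waits for startTime.
    void start() {
        switch (workerState) {
            case WORKER_INIT:
                if (WORKER_STATE.compareAndSet(this, WORKER_INIT, WORKER_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STARTED:
                break;
            default:
                throw new IllegalStateException("cannot be started once stopped");
        }
        boolean interrupted = false;
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    // deadline = currentTime + delay - startTime, then buffer the task in the queue.
    long newTimeout(Runnable task, long delay, TimeUnit unit) {
        start();
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        timeouts.add(new Pending(task, deadline));
        return deadline;
    }
}
```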

Let’s walk through one step of the time wheel pointer.

  1. The pointer advances by one tick, and a new cycle of the time wheel begins.
  2. Clean up the actively cancelled timed tasks. These timed tasks are recorded in the cancelledTimeouts queue when the user cancels them. Each time the pointer rotates, the time wheel will clean this queue.
  3. Move the timed tasks cached in the timeouts queue to the corresponding slot in the time wheel.
  4. Locate the corresponding slot based on the current pointer and process the timed tasks in the doubly linked list of that slot.
  5. Check the state of the time wheel. If it is still running, repeat the steps above in a loop to keep executing timed tasks. If it has been stopped, collect the unprocessed timed tasks into the unprocessedTimeouts set. To do so, call clearTimeouts() on every slot of the time wheel. Then poll the timeouts queue for tasks that were never placed in any slot, skipping cancelled ones.
  6. Finally, clean up the actively cancelled timed tasks in the cancelledTimeouts queue again.

The above core logic is in the HashedWheelTimer.Worker.run() method. If you are interested, you can take a look at the source code for further analysis.
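
The steps above can be condensed into a single-threaded sketch. MiniWheelTimer is hypothetical and differs from the real timer in three ways:

- It counts time in whole ticks instead of sleeping until each tick's deadline.
- Its slots are ArrayLists instead of HashedWheelBucket lists.
- It stops only when stop() is called explicitly.

The comments map each part back to the numbered steps.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Single-threaded sketch of the worker's per-tick steps (hypothetical class).
// Time is counted in whole ticks, so the behavior is deterministic.
class MiniWheelTimer {
    static final class Entry {
        final Runnable task;
        final long deadlineTick; // tick at which the task is due
        long remainingRounds;
        int slot = -1;           // -1 while the entry is still in the timeouts queue
        boolean cancelled;

        Entry(Runnable task, long deadlineTick) {
            this.task = task;
            this.deadlineTick = deadlineTick;
        }
    }

    private final List<List<Entry>> wheel = new ArrayList<>();
    private final int mask;
    private final Queue<Entry> timeouts = new ArrayDeque<>();          // submitted, not yet in a slot
    private final Queue<Entry> cancelledTimeouts = new ArrayDeque<>(); // cancelled by callers
    private long tick = 0;

    MiniWheelTimer(int slotCount) { // slotCount must be a power of 2
        for (int i = 0; i < slotCount; i++) {
            wheel.add(new ArrayList<>());
        }
        mask = slotCount - 1;
    }

    // The task runs during the delayTicks-th call to tick() (delay 0: the next call).
    Entry newTimeout(Runnable task, long delayTicks) {
        Entry e = new Entry(task, tick + delayTicks);
        timeouts.add(e);
        return e;
    }

    void cancel(Entry e) {
        if (!e.cancelled) {
            e.cancelled = true;
            cancelledTimeouts.add(e);
        }
    }

    void tick() {
        tick++; // Step 1: the pointer moves and a new cycle begins.
        Entry e;
        // Step 2: unlink entries cancelled since the previous tick.
        while ((e = cancelledTimeouts.poll()) != null) {
            if (e.slot >= 0) {
                wheel.get(e.slot).remove(e); // O(slot size) here; a linked node would unlink in O(1)
            }
        }
        // Step 3: move newly submitted entries into their slots.
        while ((e = timeouts.poll()) != null) {
            if (e.cancelled) {
                continue;
            }
            long due = Math.max(e.deadlineTick, tick); // overdue entries go into the current slot
            e.remainingRounds = (due - tick) / wheel.size();
            e.slot = (int) (due & mask);
            wheel.get(e.slot).add(e);
        }
        // Step 4: process the slot under the pointer.
        Iterator<Entry> it = wheel.get((int) (tick & mask)).iterator();
        while (it.hasNext()) {
            Entry current = it.next();
            if (current.cancelled) {
                it.remove();
            } else if (current.remainingRounds <= 0) {
                it.remove();
                current.task.run();
            } else {
                current.remainingRounds--;
            }
        }
    }

    // Steps 5-6 on shutdown: collect tasks that never ran, skipping cancelled ones.
    List<Runnable> stop() {
        List<Runnable> unprocessed = new ArrayList<>();
        for (List<Entry> slot : wheel) {
            for (Entry e : slot) if (!e.cancelled) unprocessed.add(e.task);
            slot.clear();
        }
        for (Entry e : timeouts) if (!e.cancelled) unprocessed.add(e.task);
        timeouts.clear();
        cancelledTimeouts.clear();
        return unprocessed;
    }
}
```

In the real timer, these steps run on workerThread. The pointer advances by sleeping until the next tick rather than through an explicit tick() call.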

How to Use Timed Tasks in Dubbo #

In Dubbo, the time wheel is not used to schedule periodic tasks directly. Instead, only one-shot timed tasks are submitted. When a run finishes, newTimeout() is called to submit the task again, and it executes in the next cycle. As a result, even if a run is delayed or blocked by GC pauses, I/O, and so on, no new runs are submitted in the meantime, so tasks cannot pile up.
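
The resubmission pattern can be sketched with the JDK's ScheduledExecutorService standing in for the time wheel. Only its one-shot schedule() method is used, and the ReschedulingTask class is hypothetical. Each run schedules the next one from a finally block. A run that is slowed down by GC or I/O therefore delays the following run instead of letting runs accumulate.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the "one-shot task that resubmits itself" pattern (hypothetical class).
// A JDK ScheduledExecutorService stands in for the time wheel; only its one-shot
// schedule() is used, never scheduleAtFixedRate().
class ReschedulingTask implements Runnable {
    private final ScheduledExecutorService timer;
    private final long periodMillis;
    private final Runnable work;
    private volatile boolean stopped;

    ReschedulingTask(ScheduledExecutorService timer, long periodMillis, Runnable work) {
        this.timer = timer;
        this.periodMillis = periodMillis;
        this.work = work;
    }

    void start() {
        timer.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        if (stopped) {
            return;
        }
        try {
            work.run(); // e.g. send a heartbeat; may be slowed by GC pauses or I/O
        } finally {
            // Submit the next run only after this one finishes, so a slow run
            // pushes later runs back instead of letting them pile up.
            if (!stopped) {
                timer.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
}
```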

The application of the time wheel in Dubbo is mainly manifested in the following two aspects:

  • Failed retry: for example, retrying when a Provider fails to register with the registry or when a Consumer fails to subscribe to it.
  • Periodic timed tasks: for example, periodically sending heartbeat requests, checking for request timeouts, or reconnecting after a network disconnection.
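
A failed-retry task follows the same one-shot pattern, with a cap on attempts. RetryTask is hypothetical: the BooleanSupplier stands in for an operation such as registering with the registry, and a ScheduledExecutorService again stands in for the time wheel. After a failed attempt, the task resubmits itself until it succeeds or reaches maxAttempts.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Sketch of a failed-retry task with a retry cap (hypothetical class).
// The BooleanSupplier stands in for an operation such as registering with the
// registry; the ScheduledExecutorService again stands in for the time wheel.
class RetryTask implements Runnable {
    private final ScheduledExecutorService timer;
    private final BooleanSupplier action; // true means the attempt succeeded
    private final long retryDelayMillis;
    private final int maxAttempts;
    private final AtomicInteger attempts = new AtomicInteger();

    RetryTask(ScheduledExecutorService timer, BooleanSupplier action,
              long retryDelayMillis, int maxAttempts) {
        this.timer = timer;
        this.action = action;
        this.retryDelayMillis = retryDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    void submit() {
        timer.schedule(this, retryDelayMillis, TimeUnit.MILLISECONDS);
    }

    int attempts() {
        return attempts.get();
    }

    @Override
    public void run() {
        int attempt = attempts.incrementAndGet();
        boolean ok;
        try {
            ok = action.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false; // treat an exception like a failed attempt
        }
        if (!ok && attempt < maxAttempts) {
            submit(); // one more one-shot attempt after the retry delay
        }
    }
}
```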

Summary #

In this lesson, we focused on the time wheel in Dubbo:

  • First, we introduced the limitations of the JDK's Timer and DelayQueue, and explained how a time wheel addresses them.
  • Then we discussed Dubbo's time wheel abstractions and their implementation in depth.
  • Finally, we explained where Dubbo applies the time wheel. You will also see the time wheel in other Dubbo modules in later lessons.

Here’s a question for you to think about: suppose there are a massive number of timed tasks whose delays span a very wide range, for example some due in 1 minute, some in 1 hour, and some in 1 year. How would you extend the time wheel to handle them? Feel free to share your thoughts in the comments; I look forward to seeing your answers.