19 Timing Wheel Exploring the Efficient Timing Wheel Algorithm Behind Kafka's Timer

19 TimingWheel Exploring the Efficient Timing Wheel Algorithm Behind Kafka’s Timer #

Hello, I’m Hu Xi. Today, we are going to start learning about the code implementation of delayed requests in Kafka.

Delayed requests, also known as delayed operations, refer to Kafka requests that cannot be processed temporarily due to unmet conditions. For example, requests sent by producers configured with acks=all may not be completed immediately because Kafka must ensure that all replicas in the in-sync replica set (ISR) respond successfully to this write request. Therefore, in most cases, these requests cannot be processed immediately. Only when the conditions are met or when a timeout occurs, Kafka marks the request as completed. This is what we call a delayed request.

Today, our focus is on understanding the mechanism of delayed request processing, which is the layered time wheel algorithm.

The application scope of a time wheel is very wide. Many operating systems’ task schedulers for timers (such as Crontab) and communication frameworks (such as Netty, etc.) utilize the concept of a time wheel. Almost all time task scheduling systems are based on the time wheel algorithm. Kafka manages delayed requests based on the time wheel algorithm, which is concise and decoupled from the business logic code. You can directly use it in your own project without modification.

Introduction to the Timer Wheel #

Before introducing the timer wheel, I would like you to consider this question: “If it were you, how would you implement delayed requests in Kafka?”

Regarding this question, my first reaction is to use Java’s DelayQueue. After all, this class is a naturally provided delay queue in Java, which is very suitable for modeling the processing of delayed objects. In fact, the first version of delayed requests in Kafka was implemented using DelayQueue.

However, DelayQueue has a drawback: the time complexity for inserting and removing elements from the queue is O(logN). For a scenario like Kafka, where it is easy to accumulate tens of thousands of delayed requests, the performance of this data structure becomes a bottleneck. Of course, this version of the design also has other drawbacks, such as being less efficient in clearing expired delay requests, which may lead to memory overflow. Later, the community transformed the implementation mechanism of delayed requests and adopted a time-wheel-based approach.

There are two types of time wheels: Simple Timing Wheel and Hierarchical Timing Wheel. Each has its own advantages and disadvantages, as well as its own use cases. Kafka uses the Hierarchical Timing Wheel, which is the main focus of our study.

There are many rigorous scientific papers about the Hierarchical Timing Wheel. However, most of these papers are difficult to understand and are more focused on theoretical research. However, we do not need to study this mechanism completely. What we are concerned about is how to apply it in practice. To achieve this, it is a good approach to study the source code. You need to pay attention to how Kafka implements the multi-level time wheel at the code level.

The concept of “time wheel” may be a bit abstract, so let me give you an example from everyday life to help you build some initial impressions.

Think about the wristwatch we use in our daily lives. A wristwatch is composed of an hour hand, a minute hand, and a second hand. They each have independent scales, but they are also related to each other: when the second hand completes one rotation, the minute hand advances by one grid; when the minute hand completes one rotation, the hour hand advances by one grid. This is a typical hierarchical time wheel.

Unlike a wristwatch, Kafka has its own specific terminology. In Kafka, the “grid” in the wristwatch is called a “bucket”, and “advancing” corresponds to “ticking” in Kafka. When you read the source code later, you will frequently see the words “bucket” and “tick”. You can understand them as the meanings of “grid” and “advancing” on the dial of a wristwatch.

In addition, each bucket is not a blank slate. It is actually a doubly linked cyclic list that contains a set of delayed requests.

Let me first help you understand the doubly linked cyclic list with a figure.

Doubly Linked Cyclic List

Each node in the figure has a next and prev pointer, which point to the next and previous elements, respectively. The Root is the head node of the list and does not contain any actual data. Its next pointer points to the first element in the list, and the prev pointer points to the last element.

Because it is a doubly linked list structure, code can quickly locate elements using the next and prev pointers. Therefore, the time complexity for inserting and deleting an element under a bucket is O(1). Of course, a doubly linked list requires storing two pointer data at the same time, which saves time but consumes more space. In the field of algorithms, this is a typical optimization idea of using space to save time.

Source Code Hierarchy #

How does Kafka apply hierarchical timing wheels to implement request queues?

In the diagram, there are two levels in the timing wheel, namely Level 0 and Level 1. Each timing wheel has 8 buckets, and each bucket contains a doubly-linked circular list to store delayed requests.

In the Kafka source code, the timing wheel corresponds to the TimingWheel class in the utils.timer package. The linked list in each bucket corresponds to the TimerTaskList class, and the elements in the linked list correspond to the TimerTaskEntry class. The delayed tasks stored in each list element correspond to the TimerTask class.

In these classes, there is a one-to-one relationship between TimerTaskEntry and TimerTask, TimerTaskList contains multiple TimerTaskEntry, and TimingWheel contains multiple TimerTaskList.

I have created a UML diagram to help you understand the correspondence between these classes:

Definitions of each class in the timer wheel #

Now that we have mastered these basic concepts, let’s explain how delayed requests are managed by this layered timer wheel using the source code. Based on the call hierarchy, I will provide their definitions from bottom to top.

TimerTask Class #

Firstly, let’s talk about the TimerTask class. This class is located in the TimerTask.scala file under the utils.timer package. The code for this class is only a few dozen lines long and is very easy to understand.

trait TimerTask extends Runnable {
  val delayMs: Long // typically the value of the request.timeout.ms parameter
  private[this] var timerTaskEntry: TimerTaskEntry = null
  
  def cancel(): Unit = {
    synchronized {
      if (timerTaskEntry != null) timerTaskEntry.remove()
      timerTaskEntry = null
    }
  }
  
  private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {
    synchronized {
      if (timerTaskEntry != null && timerTaskEntry != entry)
        timerTaskEntry.remove()
      timerTaskEntry = entry
    }
  }
  
  private[timer] def getTimerTaskEntry(): TimerTaskEntry = {
    timerTaskEntry
  }
}

From the code, we can see that TimerTask is a Scala trait. Each TimerTask has a delayMs field, which represents the timeout time for this task. Typically, this is the value of the client parameter request.timeout.ms. This class also binds a timerTaskEntry field because each timed task needs to know which element in which Bucket list it is located.

Since this field is bound, corresponding setter and getter methods must be provided. The getter method simply returns this field, while the setter method is slightly more complicated. Before assigning a value to timerTaskEntry, it must first determine whether this timed task is already bound to another timerTaskEntry. If it is, it must unbind it first. Additionally, the entire body of the setter method must be protected by a monitor lock to ensure thread safety.

This class also has a cancel method to cancel the timed task. The principle behind it is simply setting the associated timerTaskEntry to null. In other words, removing the timed task from the linked list.

In summary, TimerTask models the timed tasks in Kafka. Next, let’s take a look at how TimerTaskEntry carries this timed task and achieves bidirectional association in the linked list.

TimerTaskEntry Class #

As mentioned before, TimerTaskEntry represents an element in the Bucket list. The main code for this class is as follows:

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
  @volatile
  var list: TimerTaskList = null
  var next: TimerTaskEntry = null
  var prev: TimerTaskEntry = null
  
  if (timerTask != null) timerTask.setTimerTaskEntry(this)
  
  def cancelled: Boolean = {
    timerTask.getTimerTaskEntry != this
  }
  
  def remove(): Unit = {
    var currentList = list
    while (currentList != null) {
      currentList.remove(this)
      currentList = list
    }
  }
  ...
}

This class defines the TimerTask and expirationMs fields. The TimerTask field is used to specify the timed task, and it also encapsulates the expiration timestamp field, which defines the expiration time for the timed task.

For example, suppose a PRODUCE request is sent to the Broker at 1:00, with a timeout of 30 seconds. This means that the request must be completed before 1:30, otherwise it will be considered timed out. In this case, 1:30 is the value of expirationMs.

In addition to the TimerTask field, this class also defines three fields: list, next, and prev. They correspond to the Bucket list instance and the next and previous pointers of the element itself, respectively. Note that the list field is volatile. This is because delayed requests in Kafka may be moved from one list to another by other threads. To ensure necessary memory visibility, the code declares the list field as volatile.

The methods in this class are quite straightforward. You can refer to the code comments I have written. Here, I will focus on explaining the implementation principle of the remove method. The logic of remove is to remove the TimerTask itself from the doubly linked list. Therefore, the code calls the remove method of TimerTaskList to do this. This raises a question: “How is the removal considered successful?” In fact, this is determined by whether the list field of the TimerTaskEntry is null. Once this field is set to null, the TimerTaskEntry instance becomes an “orphan” and no longer belongs to any list. From this perspective, setting it to null is equivalent to removal.

It should be noted that the action of setting it to null is performed in the remove method of TimerTaskList, which may be called simultaneously by other threads. Therefore, the code in the first part uses a while loop to ensure that the list field of TimerTaskEntry is indeed set to null. This way, Kafka can safely assume that this list element has been successfully removed.

TimerTaskList class #

After discussing TimerTask and TimerTaskEntry, let’s take a look at the TimerTaskList class. Here is its definition:

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  private[this] val root = new TimerTaskEntry(null, -1)
  root.next = root
  root.prev = root
  private[this] val expiration = new AtomicLong(-1L)
  ......
}

TimerTaskList implements the doubly linked circular list shown in the diagram. It defines a root node and two fields:

  • taskCounter, which is used to identify the total number of scheduled tasks in this list.
  • expiration, which represents the expiration timestamp of the bucket that this list belongs to.

As I mentioned earlier, each bucket corresponds to a slot on a watch face. It has a start time and an end time, so it has the concept of a time interval, which is “end time - start time = time interval”. The time intervals of buckets in the same level are the same. The bucket is considered expired only when the current time has passed the start time of the bucket. Here, the start time is the value of the expiration field in the code.

In addition to the defined fields, the TimerTaskList class also defines some important methods, such as getter and setter methods for expiration, add, remove, and flush methods.

Let’s start with the getter and setter methods for expiration:

// Setter method
def setExpiration(expirationMs: Long): Boolean = {
  expiration.getAndSet(expirationMs) != expirationMs
}

// Getter method
def getExpiration(): Long = {
  expiration.get()
}

I will explain the setter method. The code uses the CAS method getAndSet of AtomicLong to atomically set the expiration timestamp. Then it compares the new expiration timestamp with the old value to see if they are different, and returns the result.

Why do we need to compare the new and old values? This is because Kafka currently uses a DelayQueue to manage all the buckets, which are TimerTaskList objects. As the clock moves forward, existing buckets will expire and become invalid. After these buckets become invalid, the source code will reuse them. The way to reuse them is to reset the expiration time of the bucket and add them back to the DelayQueue. The purpose of the comparison here is to determine whether this bucket should be inserted into the DelayQueue.

In addition, the TimerTaskList class also provides add and remove methods, which implement the logic of inserting and removing a given scheduled task from the list, respectively. The main code of these two methods is basically the same as what we learned in the data structure class for inserting and deleting elements from a linked list, so I won’t go into detail here. You can compare this code with the code in your data structure textbook to see if they look similar.

            taskCounter = taskCounter,
            queue = queue
          )
        }
      }
    }

addOverflowWheel方法的逻辑比较简单,只有在overflowWheel为null时才进行创建,并不断调整它的滴答时长和总时长,直到能够容纳指定的定时任务超时时间为止。

接下来,我们看一下add方法。这个方法的作用是将定时任务添加到适合的Bucket中。代码如下:

private[timer] def add(timerTaskEntry: TimerTaskEntry): Unit = {
      // 计算定时任务的延时时间
      var expiration = timerTaskEntry.expirationMs
      // 如果延时时间小于tickMs,则将其放入当前Bucket中
      if (expiration < currentTime + tickMs) {
        // 计算定时任务的round(偏移轮子数)
        val virtualId = expiration / tickMs
        // 计算Bucket的索引
        val bucketIndex = (virtualId % wheelSize).toInt
        // 获取Bucket
        val bucket = buckets(bucketIndex)
        // 将定时任务放入Bucket中
        bucket.add(timerTaskEntry)
        // 如果是当前时间轮上的Bucket,则调用queue.offer方法将它放入延迟队列。
        if (bucket.setExpiration(virtualId * tickMs))
          queue.offer(bucket)
      } else {
        // 如果延时时间大于tickMs,则尝试将它放入上级时间轮
        var overflowWheelAdded = false
        // 持续尝试添加到上层时间轮中
        var t = this
        breakable {
          for (_ <- 0 until wheelSize - 1) {
            if (t.overflowWheel == null)
              t.addOverflowWheel()
            t = t.overflowWheel
            // 计算定时任务的round(偏移轮子数)
            val virtualId = expiration / t.tickMs
            // 计算Bucket的索引
            val bucketIndex = (virtualId % t.wheelSize).toInt
            // 获取Bucket
            val bucket = t.buckets(bucketIndex)
            // 将定时任务放入Bucket中
            bucket.add(timerTaskEntry)
            // 如果是当前时间轮上的Bucket,则调用queue.offer方法将它放入延迟队列。
            if (bucket.setExpiration(virtualId * t.tickMs))
              queue.offer(bucket)
            // 如果成功放入了上层时间轮中,就跳出循环
            if (!overflowWheelAdded) {
              overflowWheelAdded = true
              break
            }
          }
        }
      }
    }

add方法的逻辑比较复杂,但是核心思路还是比较容易理解的。

首先,计算定时任务的延时时间expiration。如果延时时间小于滴答一次的时长tickMs,则可以直接将定时任务放入当前Bucket中。

接下来,计算定时任务的round(偏移轮子数)virtualId,并根据virtualId计算Bucket的索引bucketId。然后,获取对应的Bucket对象bucket,并将定时任务添加到bucket中。

如果bucket是当前时间轮上的Bucket(即当前时间轮的滴答一次时间),则将其添加到延迟队列中。

如果延时时间大于滴答一次的时长tickMs,那么需要尝试将定时任务添加到上层时间轮。从代码中可以看出,会循环尝试将定时任务添加到上层时间轮中,直到成功放入或者没有上层时间轮为止。

最后,我们来看一下advanceClock方法。这个方法是时间轮推进的核心方法,它用于往前推进一格Bucket,并将过期的Bucket添加到延迟队列中。代码如下:

private[timer] def advanceClock(timeMs: Long): Unit = {
      // 更新当前时间戳
      currentTime = timeMs - (timeMs % tickMs)
      // 如果有下层时间轮,就推进下层时间轮的Bucket
      if (overflowWheel != null) {
        overflowWheel.advanceClock(currentTime)
      }
      // 取出所有已到期的Bucket,将它们添加到延迟队列中
      processExpired()
    }

advanceClock方法首先更新当前时间戳currentTime,然后递归地调用下层时间轮的advanceClock方法。

最后,调用processExpired方法,将所有已到期的Bucket从时间轮中取出,并添加到延迟队列中。

到此为止,我们就完成了Timer类中的主要代码解析。在接下来的一节课中,我们将分析Kafka是如何使用这个Timer类来实现基于时间的调度和任务管理功能的。

taskCounter = taskCounter,
queue
)
}
}
}

This method creates a new instance of TimingWheel, which is the upper-level time wheel. The tick duration is equal to the total duration of the lower-level time wheel, and the number of wheels is the same for each layer. After creating the instance, the code assigns the newly created instance to the overflowWheel field. The method ends here.

Next, let’s study the add and advanceClock methods. First is the add method, the code and comments are as follows:

def add(timerTaskEntry: TimerTaskEntry): Boolean = {
  // Get the expiration timestamp of the timer task
  val expiration = timerTaskEntry.expirationMs
  // If the task has been cancelled, no need to add it, just return false
  if (timerTaskEntry.cancelled) {
    false
  // If the expiration time of the task has already passed
  } else if (expiration < currentTime + tickMs) {
    false
  // If the expiration time of the task is within the coverage of this level of time wheel
  } else if (expiration < currentTime + interval) {
    val virtualId = expiration / tickMs
    // Calculate which Bucket the task should be put into
    val bucket = buckets((virtualId % wheelSize.toLong).toInt)
    // Add the task to the Bucket
    bucket.add(timerTaskEntry)
    // Set the expiration time of the Bucket
    // If this time has changed, it means that the Bucket is newly created or reused, so add it back to the DelayQueue
    if (bucket.setExpiration(virtualId * tickMs)) {
      queue.offer(bucket)
    }
    true
  // If the task cannot be accommodated in this level of time wheel, hand it over to the upper-level time wheel
  } else {
    // Create the upper-level time wheel if it does not exist
    if (overflowWheel == null) addOverflowWheel()
    // Add the task to the upper-level time wheel
    overflowWheel.add(timerTaskEntry)
  }
}

I will explain what the add method does with the help of a diagram:

The first step of the method is to get the expiration timestamp of the timer task. The expiration timestamp is the point in time when the timer task expires.

The second step is to check if the task has been cancelled. If it has been cancelled, there is no need to add it to the time wheel. If it hasn’t been cancelled, then check if the task has already expired. If it has expired, there is no need to add it to the time wheel. If it hasn’t expired, then check if the expiration time of the task can be covered by the time range of this level of the time wheel. If it can be covered, proceed to the next step.

The third step is to calculate the target Bucket index, which is the TimerTaskList where the task needs to be stored. Let me give you an example to explain how the target Bucket is calculated.

As I mentioned before, the first level of the time wheel has 20 Buckets, each with a tick duration of 1 millisecond. So, the tick duration of the second level of the time wheel should be 20 milliseconds, and the total duration should be 400 milliseconds. The time range of the first Bucket of the second level should be [20, 40), and the time range of the second Bucket should be [40, 60), and so on. Let’s say we have a timeout timestamp for a delayed request, and it is 237. In that case, it should be inserted into the eleventh Bucket.

Once the target Bucket index is determined, the code will add the timer task to this Bucket and update the expiration timestamp of the Bucket. In the example I just mentioned, the starting time of the eleventh Bucket should be the largest multiple of 20 that is less than 237, which is 220.

The fourth step is to add the Bucket to the DelayQueue if the Bucket is inserted with a task for the first time. This allows Kafka to easily get expired Buckets and remove them. If the expiration time of the timer task cannot be covered by this level of time wheel, then create the upper level time wheel if necessary, and then execute all the logic mentioned above on the upper level time wheel.

Now that we have covered the add method, let’s take a look at the advanceClock method. As the name suggests, it is used to advance the clock. The code is as follows:

def advanceClock(timeMs: Long): Unit = {
  // The clock needs to be advanced beyond the time range of the Bucket to make it meaningful
  // Update the current time currentTime to the start time of the next Bucket
  if (timeMs >= currentTime + tickMs) {
    currentTime = timeMs - (timeMs % tickMs)
    // Also try to advance the clock for the upper level time wheel
    if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
  }
}

The timeMs parameter represents the point in time to which the clock should be advanced. The clock needs to be advanced beyond the time range of the Bucket to make it meaningful. If it is within the time range of the Bucket, nothing is done because it is still within the current Bucket.

On the other hand, once the time exceeds the time range covered by the Bucket, the code updates the current time currentTime to the start time of the next Bucket, and recursively advances the clock for the upper level time wheel if it exists. The clock advancing operation is triggered by a Kafka-specific Reaper thread running in the background.

Today, I have mentioned the deletion of expired Buckets several times, which is done by this Reaper thread. In the next lesson, we will talk about this Reaper thread.

Summary #

Today, I briefly introduced the concept of a timing wheel mechanism and explained the implementation of a hierarchical timing wheel in Kafka using code examples. Kafka uses this hierarchical timing wheel mechanism to handle delayed requests. At the source code level, Kafka defines four classes to build the entire hierarchical timing wheel system.

  • TimerTask class: Models the delayed requests in Kafka. It is a Runnable class, and Kafka uses a separate thread to asynchronously add delayed requests to the timing wheel.
  • TimerTaskEntry class: Models the elements of the linked list of delayed requests under a timing wheel bucket. It encapsulates the TimerTask object and the expiration timestamp of the timed task.
  • TimerTaskList class: Models the doubly linked list of delayed requests under a timing wheel bucket, providing O(1) time complexity for request insertion and deletion.
  • TimingWheel class: Models the timing wheel type, managing all the buckets and timed tasks under it.

In the next lecture, we will continue to learn about Kafka’s delayed requests and the source code of the DelayedOperation family that manages them. Only by understanding the code of DelayedOperation and its specific implementation subclasses, can we fully understand how Kafka handles requests that cannot be processed in a timely manner.

In distributed systems, how to elegantly and efficiently handle delayed tasks is one of the challenges that designers face. I recommend you to study the application code of this implementation mechanism in Kafka thoroughly, try to apply it in practice, and add it to your toolbox.

After-class Discussion #

Why is the overflowWheel variable in the TimingWheel class defined as volatile?

Please feel free to leave your comments and discuss with me in the comment section. You are also welcome to share today’s content with your friends.