20 Delayed Operation How Is a Broker's Delayed Processing of Requests Implemented

20 DelayedOperation How is a broker’s Delayed Processing of Requests Implemented #

Hello, I am Hu Xi.

In the previous lesson, we learned about the implementation of the hierarchical time wheel in Kafka. Since it is a hierarchical time wheel, it means that the constructed time wheel in the source code has multiple levels. The total duration represented by each level is equal to the number of buckets in that level multiplied by the time range covered by each bucket. Additionally, this total duration becomes the time range covered by an individual bucket in the next level.

For example, currently, the fixed duration of the first level of the Kafka time wheel is 20 milliseconds (interval), which means there are 20 buckets (wheelSize) and each bucket covers a time range of 1 millisecond (tickMs). The total duration of the second level is 400 milliseconds, also with 20 buckets, and each bucket covers 20 milliseconds. Following this pattern, the duration of the third level of the time wheel is 8 seconds because each bucket in this level has a duration of 400 milliseconds, and there are 20 buckets in total.

Based on this design, each delayed request needs to determine which level of the time wheel it should be saved in based on its own timeout. Assuming that the first level of the time wheel is created at t=0, the first bucket of this level would save the delayed requests within the range [0, 1), and the second bucket would save the requests within the range [1, 2). Now, if there are two delayed requests with timeouts at 18.5 milliseconds and 123 milliseconds respectively, the first request should be saved in the 19th bucket (starting from index 1) of the first level, and the second request should be saved in the 6th bucket of the second level of the time wheel.

This is basically the implementation principle of the hierarchical time wheel in Kafka. Kafka continuously advances the clocks of the time wheels at different levels and, according to the tick duration of each time wheel, processes the delayed tasks under each bucket, thus achieving the delayed processing of requests.

However, if you look closely, you will notice that so far, this hierarchical time wheel code and Kafka concepts are not directly related. For example, the hierarchical time wheel does not involve concepts such as topics, partitions, and replicas, nor does it directly interact with Kafka components like the Controller and replica manager. However, delayed request processing is an important feature of Kafka. You may wonder which part of Kafka’s source code is responsible for creating and maintaining this hierarchical time wheel and integrating it into the overall framework. The answer lies in the two classes that will be introduced next: Timer and SystemTimer.

Timer Interface and System Timer #

The source code for these two classes is located in the Timer.scala file in the utils.timer package. The Timer interface defines the methods for managing delayed operations, while SystemTimer is the key implementation for performing delayed operations. When we learn about the DelayedOperation class later, we’ll find that all operations on the hierarchical timer are performed through the SystemTimer class.

Timer Interface #

Let’s take a look at their source code. First, let’s look at the Timer interface, defined as follows:

trait Timer {
  // Insert the given timer task into the timer wheel, and wait for delayed execution
  def add(timerTask: TimerTask): Unit
  // Advance the clock, and execute the delayed tasks that have reached their expiration time
  def advanceClock(timeoutMs: Long): Boolean
  // Get the total number of timer tasks on the timer wheel
  def size: Int
  // Shut down the timer
  def shutdown(): Unit
}

The Timer interface defines four methods:

  • add method: Insert the given timer task into the timer wheel and wait for delayed execution.
  • advanceClock method: Advance the clock and execute the delayed tasks that have reached their expiration time.
  • size method: Get the current total number of timer tasks.
  • shutdown method: Shut down the timer.

Among them, the two most important methods are add and advanceClock, which are the key steps for processing delayed requests. Next, let’s focus on these two methods in the context of the implementation class of the Timer interface, SystemTimer.

SystemTimer Class #

SystemTimer is the implementation class for the Timer interface. It is a timer class that encapsulates the hierarchical timer object and provides delayed request management functionality for the Purgatory. The Purgatory refers to the buffer that stores delayed requests, which are unable to be completed due to not meeting the conditions, but are not yet timed out.

Let’s learn about the SystemTimer class from two perspectives: definition and methods.

Definition #

First, let’s look at the definition of this class:

class SystemTimer(executorName: String,
                  tickMs: Long = 1,
                  wheelSize: Int = 20,
                  startMs: Long = Time.SYSTEM.hiResClockMs) extends Timer {
  // Single-threaded thread pool for asynchronous execution of timer tasks
  private[this] val taskExecutor = Executors.newFixedThreadPool(1,
    (runnable: Runnable) => KafkaThread.nonDaemon("executor-" + executorName, runnable))
  // Delay queue to store all buckets, i.e., all TimerTaskList objects
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  // Total number of timer tasks
  private[this] val taskCounter = new AtomicInteger(0)
  // Timing wheel object
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs,
    wheelSize = wheelSize,
    startMs = startMs,
    taskCounter = taskCounter,
    delayQueue
  )
  // Maintain a thread-safe read-write lock
  private[this] val readWriteLock = new ReentrantReadWriteLock()
  private[this] val readLock = readWriteLock.readLock()
  private[this] val writeLock = readWriteLock.writeLock()
  ...
}

Each instance of the SystemTimer class defines four native fields: executorName, tickMs, wheelSize, and startMs.

tickMs and wheelSize are the basis for building the hierarchical timer, which you should focus on. However, I have already explained them in the previous lesson, and I also used specific numbers to review their purposes at the beginning, so I won’t repeat them here. The other two parameters are not very important, you just need to know their meanings.

  • executorName: The name of the Purgatory. Kafka has different Purgatories, such as the Produce buffer specifically for processing producer delayed requests, the Fetch buffer for processing consumer delayed requests, etc. In this case, Produce and Fetch are executor names.
  • startMs: The start time of the SystemTimer, in milliseconds.

In addition to the native fields, the SystemTimer class also defines some other fields. I’ll explain three important ones. These three fields are closely related to the timing wheel.

  1. delayQueue field: It stores all the Bucket objects managed by this timer. Since it is a DelayQueue, you can only retrieve objects from this queue after the Bucket has expired. The advanceClock method of the SystemTimer class relies on this feature to drive the clock forward. We will discuss this in detail later.
  2. timingWheel field: The TimingWheel class implements the hierarchical timer. The SystemTimer class relies on it to operate the hierarchical timer.
  3. taskExecutor field: It is a single-threaded thread pool used to asynchronously execute scheduled tasks.

Methods #

Now that we’ve covered the class definition and fields, let’s look at the methods of the SystemTimer class.

This class defines six methods: add, addTimerTaskEntry, reinsert, advanceClock, size, and shutdown.

Among them, the size method calculates the total number of delayed requests for the given Purgatory, the shutdown method shuts down the thread pool mentioned earlier, and the addTimerTaskEntry method inserts the given TimerTaskEntry into the timing wheel. If the scheduled task represented by the TimerTaskEntry has not expired or has been canceled, the method also submits the expired timed task to the thread pool for asynchronous execution. As for the reinsert method, it calls addTimerTaskEntry to reinsert the timed task back into the timing wheel. Actually, the most important methods in the SystemTimer class are add and advanceClock, because they are the ones that provide services to the outside world. Let’s start with the add method. The purpose of the add method is to insert the given timer task into the timing wheel for management. The code is as follows:

def add(timerTask: TimerTask): Unit = {
  // Acquire a read lock. If no thread holds the write lock,
  // multiple threads can add timer tasks to the timing wheel simultaneously
  readLock.lock()
  try {
    // Call addTimerTaskEntry to insert the task
    addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
  } finally {
    // Release the read lock
    readLock.unlock()
  }
}

The add method calls the addTimerTaskEntry method to perform the insertion. Here is the code for the addTimerTaskEntry method:

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
  // Determine what logic to execute based on the state of the timerTaskEntry:
  // 1. Not expired and not canceled: add to the timing wheel
  // 2. Canceled: do nothing
  // 3. Expired: submit to the thread pool for execution
  if (!timingWheel.add(timerTaskEntry)) {
    // If the timer task is not canceled, it means that it has expired
    // Otherwise, the timingWheel.add method should return True
    if (!timerTaskEntry.cancelled)
      taskExecutor.submit(timerTaskEntry.timerTask)
  }
}

The add method of the TimingWheel returns False when the timer task is canceled or expired. Otherwise, it adds the timer task to the timing wheel and returns True. Therefore, the behavior of the addTimerTaskEntry method depends on the state of the given timer task:

  1. If the task is neither canceled nor expired, the addTimerTaskEntry method adds it to the timing wheel.
  2. If the task is canceled, the method does nothing and returns directly.
  3. If the task is expired, it submits it to the corresponding thread pool for future execution.

Another key method is advanceClock. As the name suggests, its purpose is to advance the clock. Let’s look at the code:

def advanceClock(timeoutMs: Long): Boolean = {
  // Get the next expired Bucket from the delayQueue
  var bucket = delayQueue.poll(
    timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    // Acquire the write lock
    // Once a thread holds the write lock, any other thread executing the add or advanceClock method will be blocked
    writeLock.lock()
    try {
      while (bucket != null) {
        // Advance the timing wheel's clock to the expiration of the Bucket
        timingWheel.advanceClock(bucket.getExpiration())
        // Reinsert all the timer tasks under this Bucket back into the timing wheel
        bucket.flush(reinsert)
        // Read the next Bucket
        bucket = delayQueue.poll()
      }
    } finally {
      // Release the write lock
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}

Since the logic of the code is quite complex, let me illustrate with a diagram:

The advanceClock method traverses all the Buckets in the delayQueue and advances the clock of the timing wheel to their expiration times, making them expire. Then, it reinserts all the timer tasks under these Buckets back into the timing wheel.

Let me use a diagram to explain this reinsert process.

From this diagram, we can see that at T0, task ① is placed on Level 0 of the timing wheel, while tasks ② and ③ are placed on Level 1. At this time, the clock is advanced to the 0th Bucket of Level 0 and the 0th Bucket of Level 1.

When T19 arrives, the clock is advanced to the 19th Bucket of Level 0, and task ① is executed. However, since one level of the timing wheel has 20 Buckets, the timing wheel at Level 0 has not completed a full rotation at T19. Therefore, the timing wheel at Level 1 remains unchanged.

When T20 arrives, the timing wheel at Level 0 has completed one rotation, and the timing wheel at Level 1 has ticked once, advancing one slot. At this time, Kafka needs to insert tasks ② and ③ into the timing wheel at Level 0, at the 20th and 21st Buckets respectively. This process of inserting tasks from higher level timing wheels into lower level timing wheels is handled by the reinsert method in the advanceClock method.

As for why the tasks need to be reinserted into the lower level timing wheels, it is because as the clock advances, the current time gradually approaches the expiration times of tasks ② and ③. The reduction in the difference between them is sufficient to allow them to be inserted into the lower level timing wheel.

In summary, the SystemTimer class implements the methods of the Timer interface and encapsulates the underlying layered timing wheel, providing convenient methods for higher-level callers to operate on the timing wheel. So, who are the higher-level callers? The answer is the DelayedOperationPurgatory class. This is where we model Purgatory.

However, before we understand DelayedOperationPurgatory, we need to learn about another important class: DelayedOperation. The former is a generic class, and its type parameter is precisely DelayedOperation. Therefore, it is impossible to fully grasp DelayedOperationPurgatory without understanding DelayedOperation.

DelayedOperation Class #

This class is located in the DelayedOperation.scala file under the server package. It is the abstract parent class for all Kafka delayed request classes. Let’s analyze this class from two dimensions: definition and methods.

Definition #

Let’s start with the definition. The code is as follows:

abstract class DelayedOperation(override val delayMs: Long,
                                lockOpt: Option[Lock] = None)
  extends TimerTask with Logging {
  // Indicates whether the delayed operation has been completed
  private val completed = new AtomicBoolean(false)
  // Prevents lock contention when multiple threads attempt to check if the operation is ready to be completed, thereby preventing timeout
  private val tryCompletePending = new AtomicBoolean(false)
  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
  ......
}

The DelayedOperation class is an abstract class, and the only parameter required in its constructor is the timeout duration. This timeout duration is usually the timeout value for client requests, which is the value of the client parameter request.timeout.ms. This class implements the TimerTask interface that we learned in the previous lesson. Therefore, as a class for modeling delayed operations, it automatically inherits the cancel method of the TimerTask interface, which supports canceling the delayed operation, as well as the getter and setter methods of TimerTaskEntry, which support binding the delayed operation to a specific linked list element in the time wheel’s corresponding bucket.

In addition, the DelayedOperation class defines two additional fields: completed and tryCompletePending.

The former is relatively easy to understand; it is a Boolean variable that indicates whether the delayed operation is completed. I will explain the purpose of tryCompletePending.

This parameter was introduced in version 1.1. Before that, only completed existed. However, this could lead to a problem: when multiple threads simultaneously check if a delayed operation satisfies the completion condition, if one thread acquires the lock (i.e., the lock field above) and performs the condition check, it may find that the condition is not satisfied. At the same time, another thread checks the condition and finds that it is satisfied, but this thread does not acquire the lock. In this case, the delayed operation will never be checked again, resulting in a timeout.

The purpose of adding the tryCompletePending field is to ensure that the thread that acquires the lock has an opportunity to check if the condition is already satisfied. So how is this achieved? I will explain in more detail when we discuss the maybeTryComplete method.

You have learned the definition of the DelayedOperation class. The key is to understand how these fields are used in the methods.

Methods #

The DelayedOperation class has 7 methods. Let me introduce their functions so that you can have a clear understanding while reading the source code.

  • forceComplete: Forcibly completes the delayed operation, regardless of whether it satisfies the completion condition. This method should be called whenever the operation satisfies the completion condition or has expired.
  • isCompleted: Checks whether the delayed operation has been completed. The source code uses this method to determine how to handle the operation next. For example, if the operation has already been completed, it is usually necessary to cancel the operation.
  • onExpiration: Callback method for executing expiration logic after force completion. Only the thread that actually completes the operation is eligible to call this method.
  • onComplete: Processing logic required to complete the delayed operation. This method is only called in the forceComplete method.
  • tryComplete: The top-level method that attempts to complete the delayed operation, which internally calls the forceComplete method.
  • maybeTryComplete: A thread-safe version of the tryComplete method. This method was added to the community later, but it has gradually replaced tryComplete. Currently, external code calls this method.
  • run: Calls the expiration logic after the delayed operation times out, i.e., a combination of forceComplete and onExpiration.

We mentioned that DelayedOperation is an abstract class, and the onExpiration, onComplete, and tryComplete methods have different implementations for different types of delayed requests. Therefore, subclasses need to implement them.

Most of the code in other methods is short and concise, and you can understand them at a glance, so I won’t explain them in detail. I will focus on the maybeTryComplete method. After all, this is the effort made by the community to avoid thread blocking caused by lock contention when multiple threads access it, which in turn causes request timeouts. Let’s first look at the code of the method:

private[server] def maybeTryComplete(): Boolean = {
  var retry = false  // Whether to retry
  var done = false   // Whether the delayed operation has been completed
  do {
    if (lock.tryLock()) {   // Attempt to acquire the lock object
      try {
        tryCompletePending.set(false)
        done = tryComplete()
      } finally {
        lock.unlock()
      }
      // The thread that reaches this point holds the lock. Other threads can only execute the else branch code.
      // If another thread sets maybeTryComplete to true, then retry=true
      // This means that the other thread gives this thread an opportunity to retry
      retry = tryCompletePending.get()
    } else {
      // The thread that reaches this point did not acquire the lock
      // Set tryCompletePending=true to give the thread that holds the lock an opportunity to retry
      retry = !tryCompletePending.getAndSet(true)
    }
  } while (!isCompleted && retry)
  done
}

To help you understand, I have created a flowchart to illustrate its logic:

Flowchart

From the flowchart, we can see that this method may be accessed by multiple threads simultaneously, but different threads will take different code paths at the fork point, which is the if statement of attempting to acquire the lock.

If the lock object is acquired, it then sequentially performs actions such as clearing the tryCompletePending state, completing the delayed request, releasing the lock, and reading the latest retry state. For threads that fail to acquire the lock, they can only set the tryCompletePending state to indirectly affect the retry value, thereby giving the thread that acquires the lock an opportunity to retry. This retry is implemented through the do...while loop.

Alright, this is all about the DelayedOperation class. In addition to these public methods, it would be best for you to examine the implementation of one or two specific subclasses’ methods to understand how the tryComplete method is implemented in concrete delayed request classes. I recommend you start with the tryComplete method of the DelayedProduce class.

We previously mentioned that the PRODUCE request with acks=all is prone to become a delayed request because it must wait for all ISR replicas to synchronize the message before it can be completed. You can investigate how the tryComplete method was implemented in the DelayedProduce class based on this idea.

DelayedOperationPurgatory Class #

Next, let’s fill in the last piece of the “puzzle” of the delayed request module: the analysis of the DelayedOperationPurgatory class.

This class is where the Purgatory is implemented. From the code structure, it is a Scala companion object. In other words, the source code file defines both the DelayedOperationPurgatory Object and Class. The Object only defines the apply factory method and a field named Shards, which is the array length information of the DelayedOperationPurgatory monitoring list. Therefore, we will focus on studying the source code of the DelayedOperationPurgatory Class.

As mentioned earlier, the DelayedOperationPurgatory class is a generic class, with the parameter type being a specific subclass of DelayedOperation. Therefore, in general, each type of delayed request corresponds to a DelayedOperationPurgatory instance. These instances are usually stored in the upper-level manager. For example, the Purgatory instance related to the consumer group’s heartbeat request and join group request is stored in the GroupCoordinator component, while the Purgatory instance related to the producer’s PRODUCE request is stored in the partition object or replica state machine.

Definition #

As for how to learn, let’s follow the usual practice and start with the definition. The code is as follows:

final class DelayedOperationPurgatory[T <: DelayedOperation](
  purgatoryName: String, 
  timeoutTimer: Timer, 
  brokerId: Int = 0, 
  purgeInterval: Int = 1000, 
  reaperEnabled: Boolean = true, 
  timerEnabled: Boolean = true) extends Logging with KafkaMetricsGroup {
  ......
}

There are 6 fields in the definition. Many of the fields have default parameters, such as the last two parameters indicating whether to start the purging thread and whether to enable the hierarchical time wheel. Currently, all types of Purgatory instances in the source code are started by default, so there is no need to pay special attention to them.

The purgeInterval parameter is used to control the frequency at which the purging thread removes expired delayed requests from the Bucket. In most cases, it is once every second. However, for producers, consumers, and the AdminClient that deletes messages, Kafka defines dedicated parameters to adjust this frequency. For example, the producer parameter producer.purgatory.purge.interval.requests is used for this purpose.

In fact, only two parameters need to be passed: purgatoryName and brokerId, which represent the name of this Purgatory and the sequence number of the Broker, respectively.

As for the timeoutTimer, it is the SystemTimer instance we discussed earlier, so I won’t explain it again here.

Watchers and WatcherList #

The DelayedOperationPurgatory also defines two inner classes, Watchers and WatcherList.

Watchers is a monitoring linked list for a delayed request based on the Key. Its main code is as follows:

private class Watchers(val key: Any) {
  private[this] val operations = 
    new ConcurrentLinkedQueue[T]()
  // other methods...
}

Each Watchers instance defines a linked list of delayed requests, and the Key here can be of any type, such as a string representing the consumer group or a TopicPartitionOperationKey representing the topic partition. You don’t need to exhaust all the key types here, but it is important to understand that Watchers is a generic linked list for delayed requests used by Kafka to monitor the completion status of the delayed requests stored in it.

Since the main data structure of Watchers is a linked list, all its methods are essentially linked list operations. For example, the tryCompleteWatched method will traverse the entire linked list and attempt to complete the delayed requests in it. Similarly, the cancel method also traverses the linked list to cancel the delayed requests. As for the watch method, it adds the delayed request to the linked list.

After discussing Watchers, let’s take a look at the WatcherList class. It is very short and concise, and the complete code is as follows:

private class WatcherList {
  // A group of Watchers objects grouped by Key
  val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
  val watchersLock = new ReentrantLock()
  // Returns all Watchers objects
  def allWatchers = {
    watchersByKey.values
  }
}

The most important field of WatcherList is watchersByKey. It is a Pool, which is a pool object defined by Kafka and essentially a ConcurrentHashMap. The Key of watchersByKey can be of any type, and the Value is a group of Watchers objects corresponding to the key.

After discussing the two inner classes of the DelayedOperationPurgatory class, Watchers and WatcherList, we can start learning about the two important methods of this class: tryCompleteElseWatch and checkAndComplete.

The purpose of the former is to check if the operation can be completed, and if not, add it to the WatcherList corresponding to the key. The code of this method is as follow:

def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
assert(watchKeys.nonEmpty, "The watch key list can't be empty")
var isCompletedByMe = operation.tryComplete()
// If the delayed request is completed by the current thread, return true directly
if (isCompletedByMe)
    return true
var watchCreated = false
// Traverse all the keys to be monitored
for(key <- watchKeys) {
    // Check the completion status of the request again. If it is completed, it means it is completed by other threads, return false
    if(operation.isCompleted)
        return false
    // Otherwise, add the operation to the WatcherList belonging to the key
    watchForOperation(key, operation)
    // Set the watchCreated flag to indicate that the task has been added to the WatcherList
    if(!watchCreated) {
        watchCreated = true
        // Update the total number of operations in the Purgatory
        estimatedTotalOperations.incrementAndGet()
    }
}
// Try to complete the delayed request again
isCompletedByMe = operation.maybeTryComplete()
if (isCompletedByMe)
    return true
// If the request still cannot be completed, add it to the expiration queue
if(!operation.isCompleted) {
    if (timerEnabled)
        timeoutTimer.add(operation)
    if(operation.isCompleted) {
        operation.cancel()
    }
}
false

}

The name of this method reflects what it does: it first tries to complete the request, and if it fails, it adds it to the WatcherList for monitoring. Specifically, tryCompleteElseWatch calls the tryComplete method to try to complete the delayed request. If the result is true, it means that the thread executing the tryCompleteElseWatch method has completed the delayed request normally, so there is no need to add it to the WatcherList, and it can simply return true.

Otherwise, the code will traverse all the keys to be monitored and check the completion status of the request again. If it is completed, it means it is completed by other threads, so return false. If it still cannot be completed, add the request to the WatcherList of the key to wait for completion. At the same time, set the watchCreated flag to indicate that the task has been added to the WatcherList and update the total number of requests in the Purgatory.

After traversing all the keys, the source code will try to complete the delayed request again. If it is completed, return true; otherwise, cancel the request and add it to the expiration queue, and finally return false.

In summary, you need to understand the two things this method does:

  1. Try to complete the delayed request first.
  2. If not possible, add it to the WatcherList and wait for further attempts.

Where is the code retrying? This requires the use of the second method, checkAndComplete.

This method checks whether the delayed requests in the WatcherList for the given key meet the completion conditions. If yes, it completes them. Let’s take a look at the source code together:

def checkAndComplete(key: Any): Int = {
  // Get the WatcherList for the given key
  val wl = watcherList(key)
  // Get the Watchers instance corresponding to the key in the WatcherList
  val watchers = inLock(wl.watchersLock) { wl.watchersByKey.get(key) }
  // Try to complete the delayed requests that meet the completion conditions and return the number of requests successfully completed
  val numCompleted = if (watchers == null)
    0
  else
    watchers.tryCompleteWatched()
  debug(s"Request key $key unblocked $numCompleted $purgatoryName operations")
  numCompleted
}

The code is simple. It retrieves the WatcherList object corresponding to the given key and the Watchers instance saved under it. Then it tries to complete the delayed requests that meet the completion conditions and returns the number of requests successfully completed.

It can be seen that an important step is to call the tryCompleteWatched method of Watchers to try to complete the delayed requests that already meet the completion conditions.

Summary #

Today, we focused on the upper-level components of the hierarchical time wheel, including the Timer interface and its implementation class, SystemTimer, the DelayedOperation class, and the DelayedOperationPurgatory class. You can think of them as being called in a hierarchical manner, that is, DelayedOperation calls the SystemTimer class, and DelayedOperationPurgatory manages DelayedOperation. Together, they implement the handling of delayed requests at the broker end, with the basic idea being to complete requests that can be completed immediately, otherwise they are placed in a buffer called Purgatory. The methods of the DelayedOperationPurgatory class will automatically process these delayed requests.

Let’s review the key points.

  • SystemTimer class: A timer class defined by Kafka, which encapsulates the underlying hierarchical time wheel, implements the management of time wheel buckets, and advances the clock. It forms the basis for the automatic processing of delayed requests.
  • DelayedOperation class: An abstract class for delayed requests, providing the implementation for completing requests, as well as callbacks for request completion and expiration.
  • DelayedOperationPurgatory class: The implementation class of Purgatory, which defines the WatcherList object and the methods for manipulating the WatcherList. The WatcherList is a key data structure for implementing the automatic processing of delayed requests.

In general, the delayed request module is a relatively obscure component of Kafka. After all, most requests can be processed immediately. The greatest significance of understanding this module is that you can learn how Kafka, as a distributed system, handles asynchronous looping operations and manages scheduled tasks. This functionality is a challenge that all distributed systems face. Therefore, once you understand the principles and code implementation of this module, it will be much easier when designing similar functional modules in the future.

After-class Discussion #

In the DelayedOperationPurgatory class, a Reaper thread is defined to remove expired delay requests from the data structure. This is actually done by the advanceClock method of DelayedOperationPurgatory. Inside this method, there is a statement like this:

val purged = watcherLists.foldLeft(0) {
  case (sum, watcherList) => sum + watcherList.allWatchers.map(_.purgeCompleted()).sum
}

What do you think this statement is used for?

Feel free to write your thoughts and answers in the comments section, discuss and exchange ideas with me, and share today’s content with your friends.