18 Source Code Series: Decrypting Netty's Reactor Threading Model #

Through the foundational lessons in the first part of this course, we know that the Reactor thread model is the core of Netty's high performance. In Netty, EventLoop is the processing engine at the heart of the Reactor thread model. How is EventLoop implemented? How does it guarantee high performance and thread safety? Let's explore these questions together in this lesson.

Note: The version of Netty source code referenced in this article is 4.1.42.Final.

Main Process of Reactor Thread Execution #

In the lesson "Event Dispatching Layer: Why EventLoop Is the Essence of Netty", we gave an overview of EventLoop. Because Netty is implemented on top of NIO, NioEventLoop is the recommended implementation. Let's review the main flow of Netty's Reactor thread through the core entry point of NioEventLoop, the run() method, and from there dig into the details of NioEventLoop.

protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false)); // Poll I/O events
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }

                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys(); // Process I/O events
                } finally {
                    runAllTasks(); // Process all tasks
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys(); // Process I/O events
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // Process the asynchronous task queue after processing I/O events
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }

        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

The run() method of NioEventLoop is an endless loop that exits only when the event loop is shut down. In each iteration it performs the following three things, which can be depicted as follows.

(Figure: the three steps of the NioEventLoop run() loop — select, processSelectedKeys, runAllTasks)

  • Poll I/O events (select): poll the I/O events of all Channels registered with the Selector.
  • Process I/O events (processSelectedKeys): process the I/O events that are ready.
  • Process the asynchronous task queue (runAllTasks): the Reactor thread is also responsible for processing the non-I/O tasks in the task queue. Netty provides the ioRatio parameter to adjust how time is split between handling I/O events and handling tasks, as illustrated in the sketch below.
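Before diving in, here is a tiny, self-contained illustration of the ioRatio time-budget arithmetic used by run(). The class and method names are made up for this example; only the formula ioTime * (100 - ioRatio) / ioRatio comes from the run() method shown above.

```java
public class IoRatioDemo {

    // Mirrors the formula used in NioEventLoop#run: given the time spent on I/O
    // and the configured ioRatio, compute the time budget granted to runAllTasks.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 2_000_000L; // assume I/O processing took 2 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 2_000_000 -> tasks get the same 2 ms
        System.out.println(taskBudgetNanos(ioTime, 80)); //   500_000 -> tasks get only 0.5 ms
    }
}
```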

Now let’s take a detailed look at the three steps of NioEventLoop.

Poll I/O events #

First, let’s focus on the key code fragments of polling I/O events.

case SelectStrategy.CONTINUE:
    continue;

case SelectStrategy.BUSY_WAIT:

case SelectStrategy.SELECT:
    select(wakenUp.getAndSet(false));
    if (wakenUp.get()) {
        selector.wakeup();
    }

NioEventLoop polls for registered I/O events through the core method select(). When there are no I/O events, the NioEventLoop thread must not spin endlessly; instead it blocks until an I/O event arrives or an asynchronous task is submitted, and is then woken up. NioEventLoop uses the wakenUp variable to indicate whether the Selector has been woken up, and Netty resets wakenUp to false at the start of each new loop iteration.

Netty provides a selection strategy object, SelectStrategy, which controls the behavior of the select loop. It defines three strategies: CONTINUE, SELECT, and BUSY_WAIT. Since NIO does not support busy waiting, BUSY_WAIT falls through to the same logic as SELECT. Netty decides which strategy to use in each iteration of the event loop according to the following code:

// DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

// NioEventLoop#selectNow
int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

If the current NioEventLoop has pending asynchronous tasks, selectSupplier.get() ends up calling selectNow(), which is non-blocking and returns immediately. Its return value does not match CONTINUE or SELECT, so the blocking select() is skipped and the loop proceeds straight to processing I/O events in processSelectedKeys and the asynchronous task queue in runAllTasks. In other words, whenever asynchronous tasks exist, NioEventLoop gives priority to letting the CPU handle them promptly.

When there are no asynchronous tasks, meaning the task queue is empty, the SELECT strategy is returned, and the select(boolean oldWakenUp) method is called. Let’s see how the select() method is implemented internally:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // Calculate the deadline for the select blocking operation

        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            nextWakeupTime = normalizedDeadlineNanos;
        }

        for (;;) {
            // ------ 1. Check if the select blocking operation has exceeded the deadline ------
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // ------ 2. If a task is generated during the polling process, interrupt the current polling ------
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // ------ 3. Select blocking and wait for I/O events ------
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            // ------ 4. Resolve the notorious JDK epoll busy spin bug ------
            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
    }
}

Because Netty also has to work around the infamous JDK epoll busy-spin bug, the select() method is relatively complex. I will break it down into four parts and analyze them one by one.

**First step, check whether the select blocking operation has exceeded the deadline.** Before entering the inner loop, Netty records the current time `currentTimeNanos` and computes `selectDeadLineNanos`, the deadline of the earliest scheduled task in the timer task queue. In Netty, the timer task queue is ordered by delay in ascending order, and `delayNanos(currentTimeNanos)` returns how long it is until the first scheduled task is due. Inside the loop, the remaining time is converted into `timeoutMillis`; if the remaining time is within 0.5ms of the deadline, `timeoutMillis <= 0`, which means a scheduled task is about to become runnable, so the loop exits. Before exiting, if no select operation has been performed yet, a non-blocking `selectNow()` is executed once. Why leave a 0.5ms window? Because the deadline has to be converted to milliseconds for `select(timeoutMillis)`, and when less than 0.5ms remains it is not worth blocking: the select would return almost immediately, possibly without receiving any I/O events.
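To make the 0.5ms window concrete, here is a tiny stand-alone calculation that uses the same conversion expression as the select() source above; the class name and sample numbers are only illustrative.

```java
public class TimeoutMillisDemo {
    // Same conversion as in NioEventLoop#select:
    // remaining nanoseconds are rounded to milliseconds with a 0.5 ms offset.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500_000L) / 1_000_000L;
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(400_000L, 0L));   // 0 -> less than 0.5 ms left, stop blocking
        System.out.println(timeoutMillis(600_000L, 0L));   // 1 -> rounded up to a 1 ms blocking select
        System.out.println(timeoutMillis(2_300_000L, 0L)); // 2 -> about 2.3 ms left, block for 2 ms
    }
}
```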

One point that is easy to confuse: Netty's tasks include ordinary tasks, scheduled (timer) tasks, and tail tasks. `hasTasks()` checks whether the ordinary task queue and the tail task queue are non-empty, while `delayNanos(currentTimeNanos)` looks at the scheduled task queue and returns the delay of the next scheduled task.
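The essence of delayNanos() is simply "peek at the earliest deadline in the scheduled task queue and return the remaining time, or a fixed fallback (roughly one second in Netty) if nothing is scheduled". The sketch below illustrates that idea with a plain PriorityQueue; it is not Netty's actual class.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

public class DelayNanosSketch {
    // Deadlines (in nanoseconds) of scheduled tasks, smallest first - like Netty's scheduled task queue.
    private final PriorityQueue<Long> scheduledDeadlines = new PriorityQueue<>();
    private static final long FALLBACK_NANOS = TimeUnit.SECONDS.toNanos(1);

    long delayNanos(long currentTimeNanos) {
        Long earliest = scheduledDeadlines.peek();
        if (earliest == null) {
            return FALLBACK_NANOS; // nothing scheduled: block for a fixed interval
        }
        return Math.max(0, earliest - currentTimeNanos); // time until the earliest task is due
    }

    public static void main(String[] args) {
        DelayNanosSketch sketch = new DelayNanosSketch();
        System.out.println(sketch.delayNanos(0L)); // 1_000_000_000 (fallback, nothing scheduled)
        sketch.scheduledDeadlines.add(5_000_000L); // a task due in 5 ms
        System.out.println(sketch.delayNanos(0L)); // 5_000_000
    }
}
```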

**Second step, handle tasks generated during polling in a timely manner.** To ensure newly submitted tasks are executed promptly, if the task queue becomes non-empty during polling, Netty immediately performs a non-blocking `selectNow()` and breaks out of the loop back to the main flow of the event loop, so that `runAllTasks` can run as soon as possible.

**Third step, block in select and wait for I/O events.** Reaching the blocking `select` means the task queue is empty and the earliest scheduled task has not yet reached its deadline, so the thread blocks for at most `timeoutMillis`. Consider an extreme case: if the deadline of the next scheduled task is very far away, doesn't that mean the `select` operation could block for a very long time and leave Netty unable to work? To avoid this, when an external thread adds a task, Netty can wake up the blocking `select` operation. The relevant source code is as follows:

```java
// SingleThreadEventExecutor#execute
public void execute(Runnable task) {
    // other code omitted
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

// NioEventLoop#wakeup
protected void wakeup(boolean inEventLoop) {
    // If it is an external thread, set wakenUp to true and wake up the blocking select operation
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}
```

The selector.wakeup() operation is expensive, so Netty does not call it unconditionally. Before each call it first executes wakenUp.compareAndSet(false, true), and only performs selector.wakeup() if that CAS succeeds.

**Fourth step, work around the infamous JDK epoll empty-polling bug.** Netty introduced the workaround in an earlier lesson; here we review it in the context of the overall select operation. Netty does not fix the problem at its root; it cleverly sidesteps it. It introduces the counter selectCnt to record the number of select calls. If a select returns before timeoutMillis has elapsed with no ready events, no wakeup, and no tasks, and this has happened at least SELECTOR_AUTO_REBUILD_THRESHOLD times in a row (512 by default), the epoll empty-polling bug has probably been triggered. Netty then builds a new Selector and re-registers all the SelectionKeys of the faulty Selector onto the new one; once the rebuild is complete, the faulty Selector is discarded.
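The rebuild itself is conceptually simple: open a fresh Selector, move every valid key over, and swap the reference. The sketch below only illustrates the idea; Netty's real rebuildSelector0()/selectRebuildSelector() additionally update each AbstractNioChannel's cached SelectionKey, notify NioTask attachments, and handle error cases.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorRebuildSketch {
    // Illustrative only: re-register every valid key from the old (possibly broken) selector
    // onto a brand-new selector, preserving interestOps and attachment, then close the old one.
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) {
                continue;
            }
            SelectableChannel ch = key.channel();
            int interestOps = key.interestOps();
            key.cancel();
            ch.register(newSelector, interestOps, key.attachment());
        }
        oldSelector.close();
        return newSelector;
    }
}
```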

This concludes the select phase in which NioEventLoop polls I/O events, so let's briefly summarize what it does. The select phase is itself a loop: before each poll it checks whether the task queue is empty, so that pending tasks can be executed in time; only if the task queue is empty does it perform a blocking select to wait for I/O events. Netty also counts how many select calls return prematurely within a given time window, uses that counter to identify a possibly faulty Selector, and then sidesteps the JDK epoll empty-polling problem by rebuilding the Selector.

Handling I/O events #

After the select phase, we have obtained the ready I/O events, and the processSelectedKeys() method is called to handle them. Before looking at that, note that Netty controls the split of time between I/O handling and task handling through the ioRatio parameter, which defaults to ioRatio = 50. If ioRatio = 100, the I/O events are processed first and then all pending tasks are run without a time limit. If ioRatio < 100, the I/O events are processed first and the asynchronous task queue is then given a time budget proportional to the time the I/O processing took. Either way, processSelectedKeys() is always executed first. Let's follow its source code:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

There are two paths for handling I/O events: one processes Netty's optimized selectedKeys, the other is the plain processing logic. Which path is used depends on whether the optimized selectedKeys field has been set, and the two paths iterate different collections: the optimized path uses Netty's SelectedSelectionKeySet, while the plain path uses the JDK's HashSet. Let's look at the two implementations in turn.

1. processSelectedKeysPlain

First, let’s look at the source code of the normal processing logic processSelectedKeysPlain:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // I/O events are handled by Netty
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // User-defined task
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

The plain logic iterates the selectedKeys set returned by the Selector. For each SelectionKey it retrieves the attachment, removes the key from the set, and then dispatches by the attachment type: if the attachment is an AbstractNioChannel, the key was registered by Netty when the Channel was registered, and the I/O event is handled by processSelectedKey(); otherwise the attachment is a user-defined NioTask and its own handling method is invoked. Netty also provides an optimized path, processSelectedKeysOptimized, which stores the ready SelectionKeys in an array so that they can be fetched directly by index instead of being iterated through a HashSet, which yields a measurable performance improvement.

2. processSelectedKeysOptimized

Now let's look at the source code of the optimized processing logic, processSelectedKeysOptimized:

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

We can see that processSelectedKeysOptimized and processSelectedKeysPlain have a very similar structure; the essential difference is how selectedKeys is traversed. Let's look at the source code of SelectedSelectionKeySet to explore further.

```java
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }
        return true;
    }

    // Omitted other code
}
```

Because SelectedSelectionKeySet stores SelectionKeys in an array, processSelectedKeysOptimized can fetch the ready keys directly by index, which is more efficient than iterating the JDK's HashSet. SelectedSelectionKeySet records the logical length of the data in the size field; each add appends the key at the end of the SelectionKey[] array, and when size reaches the array's actual length the array is expanded. Unlike HashSet, SelectionKey[] does not need to deal with hash collisions, so the add operation runs in O(1) time.
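The increaseCapacity() call is omitted from the snippet above; a minimal sketch of what such an expansion typically looks like (doubling the array and copying the existing keys) is shown below. This is an illustration of the idea, not necessarily Netty's exact code.

```java
// Illustrative array-doubling expansion, in the spirit of SelectedSelectionKeySet#increaseCapacity
private void increaseCapacity() {
    SelectionKey[] newKeys = new SelectionKey[keys.length << 1]; // double the capacity
    System.arraycopy(keys, 0, newKeys, 0, size);                 // copy the existing keys
    keys = newKeys;
}
```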

When is SelectedSelectionKeySet generated? By finding references to SelectedSelectionKeySet, we can locate the openSelector method in NioEventLoop. The core source code snippet is as follows:

```java
private SelectorTuple openSelector() {
    // Omitted other code
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    long publicSelectedKeysFieldOffset =
                            PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        PlatformDependent.putObject(
                                unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(
                                unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                }
                // Omitted other code
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    // Omitted other code
}
```

Netty uses reflection to replace the selectedKeys and publicSelectedKeys fields inside the Selector implementation with SelectedSelectionKeySet; the original fields are both of HashSet type. This is a clever little trick: it is not often that you see a framework reach into and optimize the JDK's underlying implementation like this, and Netty's attention to such details is worth learning from.

So far, we have covered the second step of the Reactor thread's main flow: processing I/O events in processSelectedKeys(). To summarize, there are two processing paths: Netty's optimized selectedKeys and the plain logic. Their processing is similar: both determine the type of the attachment mounted on the SelectionKey and call a different handling method based on that type (a simplified sketch of this dispatch follows), and the events are ultimately propagated through the Pipeline. The optimized path stores SelectionKeys in an array, which is more efficient to traverse than the JDK's HashSet. In addition, processSelectedKeys performs one more optimization: if it detects that more than a threshold (256 by default) of Channels have been removed from the Selector, it performs another select to keep the key set valid.
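The article refers to "calling different handling methods based on the type" without showing processSelectedKey() itself. The simplified sketch below shows the general shape of the readyOps dispatch for an AbstractNioChannel; it omits the validity checks and error handling of Netty's real NioEventLoop#processSelectedKey, so treat it as an illustration of the idea rather than the actual method.

```java
import io.netty.channel.nio.AbstractNioChannel;
import java.nio.channels.SelectionKey;

// Simplified sketch (not Netty's exact code): each ready operation is forwarded to the
// Channel's internal NioUnsafe, which fires the corresponding events through the Pipeline.
final class ProcessSelectedKeySketch {

    static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        int readyOps = k.readyOps();

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // Remove OP_CONNECT from the interest set to avoid busy-looping, then finish the connect
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // The Channel became writable again: flush the pending outbound buffer
            unsafe.forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Read data (or accept a new connection); this fires channelRead through the Pipeline
            unsafe.read();
        }
    }
}
```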

Handling the Asynchronous Task Queue #

Let's continue with the last step in the Reactor thread's main flow: handling the asynchronous task queue with runAllTasks. How does Netty ensure that operations on a Channel are thread safe? The answer lies in Netty's task mechanism. We will look at it from two angles: adding tasks and executing tasks.

  • Task Addition
  
Inside NioEventLoop, there are two important asynchronous task queues: the ordinary task queue and the scheduled task queue. NioEventLoop provides the execute() and schedule() methods to add tasks to them: execute() adds ordinary tasks, while schedule() adds scheduled tasks, as the usage sketch below shows.
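Both methods are part of Netty's public EventLoop API, so a short usage sketch may help; the channel variable is assumed to be an already-registered io.netty.channel.Channel.

```java
// Usage sketch: submitting tasks to a Channel's EventLoop from any thread.
import io.netty.channel.Channel;
import java.util.concurrent.TimeUnit;

public class EventLoopTaskDemo {
    static void submitTasks(Channel channel) {
        // Ordinary task: runs as soon as the Reactor thread reaches runAllTasks()
        channel.eventLoop().execute(() ->
                System.out.println("run in " + Thread.currentThread().getName()));

        // Scheduled task: runs after the given delay, still on the same Reactor thread
        channel.eventLoop().schedule(() ->
                System.out.println("delayed task"), 60, TimeUnit.SECONDS);
    }
}
```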

First, let's see how to add an ordinary task. NioEventLoop inherits from SingleThreadEventExecutor, which provides the execute() method to add ordinary tasks. Here is the source code:

```java
public void execute(Runnable task) {

    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);

    if (!inEventLoop) {
        startThread();

        if (isShutdown()) {
            boolean reject = false;

            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {}

            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }

    return taskQueue.offer(task);
}
```

Following addTask(task), we can see that the task is ultimately added to taskQueue. In SingleThreadEventExecutor, taskQueue is the ordinary task queue, and it is an Mpsc Queue by default, i.e. a multiple-producer, single-consumer queue. We will devote a separate lesson to Mpsc Queue, so we won't go into details here. Note that task execution always happens inside the Reactor thread and is strictly serial, which already guarantees thread safety on the consuming side. So why is an Mpsc Queue still needed? Let's keep going.

Consider a common scenario: after an RPC business thread pool finishes processing a request, it looks up the Channel associated with the request and writes the response back to the client. How does Netty handle Channel methods that are called from such an external thread? Let's follow the source code of channel.write():

// AbstractChannel#write
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

// AbstractChannelHandlerContext#write
private void write(Object msg, boolean flush, ChannelPromise promise) {
    // Omitted other code
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();

    if (executor.inEventLoop()) { // Invoked from within the Reactor thread
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else { // External threads take this branch
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            task.cancel();
        }
    }
}

// AbstractChannelHandlerContext#safeExecute
private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
        return true;
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
        return false;
    }
}

If channel.write() is called from the Reactor thread, inEventLoop() returns true and the write is handed straight to the Pipeline within the Reactor thread. If an external thread calls it, execution takes the else branch: the write operation is wrapped into a WriteTask and submitted through safeExecute(). As you can see, safeExecute() simply calls SingleThreadEventExecutor#execute(), so the task ends up in taskQueue. Because multiple external threads may operate on the same Channel concurrently, the Mpsc Queue is what guarantees thread safety, as the short example below illustrates.
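In practice this is why the following pattern, writing back to a client from a business thread pool, is safe; the executor and method names here are illustrative, but writeAndFlush() and its ChannelFuture listener are standard Netty API.

```java
// Illustrative: a business thread pool writes a response back through a Netty Channel.
// The write is wrapped into a WriteTask and executed later by the Channel's own EventLoop.
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BusinessWriteDemo {
    private static final ExecutorService businessPool = Executors.newFixedThreadPool(4);

    static void handleRequest(Channel channel) {
        businessPool.submit(() -> {
            // ... business logic runs here, outside the Reactor thread ...
            channel.writeAndFlush(Unpooled.copiedBuffer("response", CharsetUtil.UTF_8))
                   .addListener(future -> {
                       if (!future.isSuccess()) {
                           future.cause().printStackTrace();
                       }
                   });
        });
    }
}
```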

Next, let's analyze how a scheduled task is added. Like ordinary tasks, scheduled tasks can be added from within the Reactor thread or from an external thread. Going straight to the core of AbstractScheduledEventExecutor#schedule(), we find the following code:

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) { // Within the Reactor thread
        scheduledTaskQueue().add(task.setId(nextTaskId++));
    } else { // External thread
        executeScheduledRunnable(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task.setId(nextTaskId++));
            }
        }, true, task.deadlineNanos());
    }
    return task;
}

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                SCHEDULED_FUTURE_TASK_COMPARATOR,
                11);
    }
    return scheduledTaskQueue;
}

void executeScheduledRunnable(Runnable runnable,
                              @SuppressWarnings("unused") boolean isAddition,
                              @SuppressWarnings("unused") long deadlineNanos) {
    execute(runnable);
}

The scheduledTaskQueue in AbstractScheduledEventExecutor is the queue for scheduled tasks. Its default implementation is the priority queue DefaultPriorityQueue, which conveniently keeps the tasks sorted by their deadline. However, DefaultPriorityQueue is not thread safe. When called within the Reactor thread there is no thread safety issue, because everything executes serially. When an external thread adds a scheduled task, Netty wraps the "add a scheduled task" operation itself into another task and hands it to executeScheduledRunnable(), which in turn calls the ordinary execute() method, cleverly reusing the Mpsc Queue to solve the thread safety problem of external threads adding tasks.

  • Task Execution

Having introduced how the different kinds of tasks are added, let's go back and analyze how the Reactor thread executes them. From the main flow we know there are two entry points for handling the asynchronous task queue: runAllTasks() and runAllTasks(long timeoutNanos). The first runs all pending tasks; the second runs tasks under a time limit, to prevent the Reactor thread from spending so long on tasks that I/O events are starved. Let's focus on the source code of runAllTasks(long timeoutNanos):

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue(); // 1. Merge scheduled tasks into the ordinary task queue

    // 2. Retrieve tasks from the ordinary task queue and handle them
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // Calculate the deadline for task execution
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task); // Execute the task
        runTasks ++;

        // Check the deadline once every 64 tasks
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask(); // Fetch the next task
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    // 3. Finishing work
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}


The process of handling asynchronous tasks runAllTasks can be divided into three steps: merging scheduled tasks into the regular task queue, retrieving tasks from the regular task queue and processing them, and finally performing finishing work. Let's look at how these three steps are implemented.

The first step, merging scheduled tasks into the ordinary task queue, is done by the fetchFromScheduledTaskQueue() method:

```java
private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        // Retrieve scheduled tasks whose deadline is no later than the current time
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            // If the ordinary task queue is full, put the scheduled task back
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

protected final Runnable pollScheduledTask(long nanoTime) {
    assert inEventLoop();

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
    // Only return the scheduled task if its deadlineNanos is not later than the current time
    if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
        return null;
    }
    scheduledTaskQueue.remove();
    return scheduledTask;
}
```

A scheduled task is retrieved and merged into the ordinary task queue only when its deadline deadlineNanos is no later than the current time. Since the scheduled task queue is ordered by deadlineNanos in ascending order, as soon as the peeked task does not satisfy the merging condition, none of the remaining tasks will either, so the merge finishes and returns.

For the second step, retrieving tasks from the ordinary task queue and processing them, refer back to the annotated second part of runAllTasks(long timeoutNanos). The actual processing in safeExecute() is very simple: it directly calls the run() method of the Runnable. Because task processing has a time limit, Netty checks the deadline periodically rather than after every task: the check happens once every 64 executed tasks. This is a performance trade-off; if the queue contains a large number of short tasks, checking the time after every single one would itself hurt performance (the every-64 check is a simple bitmask, as the small example below shows).
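The "every 64 tasks" check in the source is the expression (runTasks & 0x3F) == 0; the tiny snippet below just demonstrates that this bitmask is equivalent to runTasks % 64 == 0, which is why the deadline is only examined on every 64th task.

```java
public class EveryNTasksDemo {
    public static void main(String[] args) {
        for (long runTasks = 1; runTasks <= 200; runTasks++) {
            boolean bitmaskHit = (runTasks & 0x3F) == 0; // same check as runAllTasks(long)
            boolean moduloHit  = runTasks % 64 == 0;     // the equivalent modulo form
            if (bitmaskHit != moduloHit) {
                throw new AssertionError("mismatch at " + runTasks);
            }
            if (bitmaskHit) {
                System.out.println("deadline checked after task #" + runTasks); // 64, 128, 192
            }
        }
    }
}
```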

The third step, the finishing work, is implemented in the afterRunningAllTasks() method.

protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}

protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}

The tail queue tailTasks has a lower priority than the ordinary task queue; think of its contents as finishing-up tasks. After the tasks in taskQueue have been executed, the tasks in the tail queue are fetched and executed. The tail queue is not used often, but it is handy when you want to observe how the event loop is running, for example to update statistics such as the time consumed by each loop iteration or physical memory usage. An illustrative example follows.
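If you want to try this yourself, SingleThreadEventLoop exposes executeAfterEventLoopIteration(Runnable), marked as an unstable API, for adding tail tasks. The sketch below is only illustrative: it submits one ordinary task to drive an event-loop iteration, after which the tail task runs.

```java
// Illustrative tail-task usage: a task that runs after an event-loop iteration finishes.
// executeAfterEventLoopIteration is an @UnstableApi method on SingleThreadEventLoop.
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;

public class TailTaskDemo {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) group.next();

        // An ordinary task, just to drive one iteration of the event loop
        eventLoop.execute(() -> System.out.println("ordinary task"));

        // The tail task runs only after the iteration's I/O handling and ordinary tasks are done
        eventLoop.executeAfterEventLoopIteration(() ->
                System.out.println("tail task, running on " + Thread.currentThread().getName()));

        group.shutdownGracefully();
    }
}
```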

That completes our walkthrough of how Netty handles the asynchronous task queue. To summarize: asynchronous tasks are divided into ordinary tasks and scheduled tasks, and both adding and executing them must account for calls from inside and outside the Reactor thread. When an external thread adds a scheduled task, Netty cleverly reuses the ordinary-task Mpsc Queue to solve the concurrency problem. Before execution, scheduled tasks whose deadline has arrived are merged into the ordinary task queue, which then executes them, checking every 64 tasks whether the time limit has been exceeded.

Summary #

The Reactor thread model is the core of Netty, and I have spent a lot of space explaining it in this lesson. NioEventLoop, as the implementation of Netty Reactor thread, has a very ingenious design principle, which is worth reading and thinking about repeatedly. We should always remember the three things NioEventLoop does in its infinite loop: polling I/O events, processing I/O events, and handling asynchronous task queues.

There are several high-frequency interview questions about the Netty Reactor thread model. After reading this lesson, are you clear about them?

  • How is Netty's NioEventLoop implemented? How does it ensure that operations on a Channel are thread safe?
  • How does Netty work around the JDK epoll empty-polling bug?
  • How does NioEventLoop achieve a lock-free design?

Feel free to leave your comments in the comment section. Looking forward to seeing more of your understanding and thoughts on the Reactor thread model.