19 Source Code Series: A Journey of a Network Request in Netty #

Through the study of the source code in the previous two sections, we learned that Netty creates a NioServerSocketChannel when the server starts, and creates a NioSocketChannel whenever a new client connection is established. Whether server-side or client-side, each Channel initializes its own ChannelPipeline when it is created. If we liken Netty to a production workshop, the Reactor thread is the central control system of the workshop, and the ChannelPipeline is the assembly line that processes raw materials step by step into a finished product. In this section, I will guide you through the complete process of how a network request is handled in Netty, to deepen your understanding of the previous two sections and to focus on the working principle of ChannelPipeline.

Note: The version of Netty source code referred to in this article is 4.1.42.Final.

Review of Event Handling Mechanism #

First, let’s take the example of a server accepting a new client connection, and combine it with the knowledge learned in the previous two sections, to review the event handling process of Netty. The following diagram shows the event handling flow in Netty.

Drawing 0.png

After Netty starts on the server side, the BossEventLoopGroup is responsible for listening to Accept events from clients. When a new client connection is established, the NioEventLoop in the BossEventLoopGroup first creates a new client channel, and then triggers the channelRead event propagation in the NioServerSocketChannel. The NioServerSocketChannel contains a special handler called ServerBootstrapAcceptor, which ultimately assigns the newly created client channel to the WorkerEventLoopGroup through the channelRead() method of ServerBootstrapAcceptor. The WorkerEventLoopGroup contains multiple NioEventLoops, and it selects one of the NioEventLoops to bind with the newly created client channel.

Once the registration of the client connection is complete, the server can start receiving data from the client. When the client sends data to the server, the NioEventLoop listens for the OP_READ event and assigns a ByteBuf to read the data. After the reading is completed, the data is passed to the pipeline for processing. Generally speaking, the data will propagate from the first ChannelHandler in the ChannelPipeline, and the processed messages will be passed to the next ChannelHandler. The whole process is executed sequentially.

In the previous two sections, we introduced how the server accepts a new client connection and the working process of NioEventLoop. Next, we will focus on how ChannelPipeline drives the event handling in Netty, so that the entire event handling process of Netty can be connected as a main line.

Initialization of the Pipeline #

We know that ChannelPipeline is created when a Channel is created, and it is a very important member variable of Channel. Let’s go back to the constructor of AbstractChannel and see how ChannelPipeline is constructed step by step.

// AbstractChannel
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

// AbstractChannel#newChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

// DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

After the ChannelPipeline is initialized, it forms a bidirectional linked list composed of ChannelHandlerContext objects. The minimum structure of the ChannelPipeline in the initialized state only contains two special nodes: HeadContext and TailContext, as shown in the following diagram.

Drawing 1.png

HeadContext and TailContext are two special nodes in ChannelPipeline, and they both inherit from AbstractChannelHandlerContext. According to the source code, we can see which implementation classes AbstractChannelHandlerContext has, as shown in the following diagram. In addition to HeadContext and TailContext, there is also a default implementation class DefaultChannelHandlerContext, and we can guess that DefaultChannelHandlerContext encapsulates the user-defined business handlers added in the Netty startup configuration class, and it will be inserted between HeadContext and TailContext.

Image 3.png

Next, let’s compare the internal structures of these three AbstractChannelHandlerContext implementation classes. We find that they all include the reference to the current ChannelPipeline and the handler ChannelHandler. The only difference is that the HeadContext node also includes an unsafe object used to manipulate the underlying data read and write. For inbound events, it starts to propagate from the HeadContext node, so the unsafe object can be seen as the initiator of inbound events; for outbound events, the data will be returned to the client through the HeadContext node. At this time, the unsafe object can be seen as the processor of outbound events.

Next, let’s continue to see how user-defined handlers are added to the bidirectional linked list of ChannelPipeline.

Adding Handlers to the Pipeline #

When a Netty client or server starts, it needs to configure the user-defined business handlers. Let's first look at an example code snippet of a server startup class:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new SampleInboundA());
                ch.pipeline().addLast(new SampleInboundB());
                ch.pipeline().addLast(new SampleOutboundA());
                ch.pipeline().addLast(new SampleOutboundB());
            }
        });

We know that handlers in the ChannelPipeline fall into two types: inbound ChannelInboundHandler and outbound ChannelOutboundHandler. Both are wrapped by ChannelHandlerContext, and regardless of type, they are ultimately connected through the bidirectional linked list. The structure of the ChannelPipeline formed by the code example is as follows.

Image 4.png
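As a minimal sketch of what these handlers might look like (SampleInboundA and SampleOutboundA are placeholder classes from this example, not Netty classes; SampleInboundB and SampleOutboundB would follow the same pattern):

public class SampleInboundA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("SampleInboundA read: " + msg);
        ctx.fireChannelRead(msg); // pass the message to the next inbound handler
    }
}

public class SampleOutboundA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("SampleOutboundA write: " + msg);
        ctx.write(msg, promise); // pass the message to the next outbound handler
    }
}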

So how does ChannelPipeline differentiate between inbound and outbound handlers when adding handlers? Let’s follow the source code of ch.pipeline().addLast() to locate the core code.

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1. Check if the Handler is added repeatedly
        checkMultiplicity(handler);
        // 2. Create a new DefaultChannelHandlerContext node
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3. Add the new DefaultChannelHandlerContext node to the ChannelPipeline
        addLast0(newCtx);
        // Omitted other code
    }
    // 4. Call back the user method
    callHandlerAdded0(newCtx);
    return this;
}

addLast() mainly does the following four things:

  1. Check if the handler is added repeatedly.
  2. Create a new DefaultChannelHandlerContext node.
  3. Add the new DefaultChannelHandlerContext node to the ChannelPipeline.
  4. Call back the user method.

The first three steps are executed while holding a synchronized lock, to prevent multiple threads from concurrently modifying the underlying bidirectional linked list of the ChannelPipeline. Now let's break the process down step by step.

First, when adding a handler, ChannelPipeline checks whether the handler has already been added. If the same non-thread-safe handler instance were added to multiple ChannelPipelines, it could be accessed by multiple threads concurrently, causing thread-safety issues. The duplicate check in Netty is implemented in the checkMultiplicity() method:

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

User-defined handlers usually inherit from ChannelHandlerAdapter. The added variable in ChannelHandlerAdapter is used to mark whether the handler has been added. If the currently added handler is not sharable and has already been added, an exception will be thrown. Otherwise, the current handler will be marked as added.

h.isSharable() is used to determine whether the handler is sharable. A sharable handler can be added to different ChannelPipelines repeatedly and must ensure thread safety. To implement a sharable handler, you just need to add the @Sharable annotation to the handler, as shown below:

@ChannelHandler.Sharable
public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {}
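For instance, here is a minimal sketch of a sharable handler (StatelessLogHandler is a hypothetical name, and the snippet reuses the ServerBootstrap b from the example above). Because the handler keeps no per-connection state, one instance can be safely added to the pipeline of every child channel:

@ChannelHandler.Sharable
public class StatelessLogHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(ctx.channel().remoteAddress() + " -> " + msg);
        ctx.fireChannelRead(msg);
    }
}

// Because the handler is sharable, the same instance can serve every child channel:
final StatelessLogHandler logHandler = new StatelessLogHandler();
b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(logHandler); // same instance, no exception thrown
    }
});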

Next, let’s analyze the second step of addLast(), which is creating a new DefaultChannelHandlerContext node. Before executing the newContext() method, a unique name is generated for the handler using filterName(). Now let’s see how Netty generates the name:

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}

private String generateName(ChannelHandler handler) {
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}

private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}

Netty uses FastThreadLocal to cache the mapping between the handler and its name. Before generating the default name for the handler, it first checks whether the mapping already exists in the cache. If not, it calls generateName0() to generate the default name and adds it to the cache. The default naming rule in Netty is "simple class name#0", for example, the default name for HeadContext is "DefaultChannelPipeline$HeadContext#0".

After generating the default name for the handler, it checks whether the generated name conflicts with existing names in the ChannelPipeline using the context0() method. The duplication check process is simple - it performs a linear search on the bidirectional linked list. If a conflict exists, Netty extracts the sequence number from the end of the name and keeps incrementing it until a non-conflicting name is generated, for example, "simple class name#1", "simple class name#2", "simple class name#3", and so on.
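As a quick illustration (NoopHandler is a hypothetical handler class, and the generated names assume an otherwise empty pipeline):

public class NoopHandler extends ChannelInboundHandlerAdapter {}

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NoopHandler()); // gets the cached default name "NoopHandler#0"
pipeline.addLast(new NoopHandler()); // "#0" is taken, so it becomes "NoopHandler#1"
System.out.println(pipeline.names()); // shows NoopHandler#0 and NoopHandler#1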

Now, let’s go back to the process of creating a new node in newContext(). We can locate the constructor of AbstractChannelHandlerContext:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass);
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

AbstractChannelHandlerContext has an executionMask attribute that may not be easy to understand. Let’s see how the mask() method generates the mask:

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            // If it is a ChannelInboundHandler instance, set all inbound event bits to 1
            mask |= MASK_ALL_INBOUND;
            // Exclude inbound events that the handler is not interested in
            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            // Omitted the same checks for the remaining inbound events
        }
        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            // If it is a ChannelOutboundHandler instance, set all outbound event bits to 1
            mask |= MASK_ALL_OUTBOUND;
            // Omitted the same checks for the outbound events
        }
    } catch (Exception e) {
        // Should never reached here.
        PlatformDependent.throwException(e);
    }
    return mask;
}

The logic of mask0() is clear: if the handler is a ChannelInboundHandler, Netty first sets all inbound event bits to 1 via MASK_ALL_INBOUND, then clears the bit of every inbound event the handler does not actually override; ChannelOutboundHandler gets the symmetric treatment with MASK_ALL_OUTBOUND. The isSkippable() method checks whether the handler's method is still the default adapter implementation, which Netty marks with the @Skip annotation. As a result, executionMask records exactly which events each node cares about, so uninterested nodes can be skipped cheaply when events propagate through the pipeline, as we will see later.
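To make the bitmask bookkeeping concrete, here is a small illustrative sketch. The constants are simplified copies of a few values from io.netty.channel.ChannelHandlerMask, and MaskDemo itself is not Netty code:

public final class MaskDemo {
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    static final int MASK_CHANNEL_READ = 1 << 5;

    public static void main(String[] args) {
        // Start with all (sampled) inbound bits set ...
        int mask = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED
                | MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_READ;
        // ... then clear the bits for events the handler does not override.
        mask &= ~MASK_CHANNEL_REGISTERED;
        mask &= ~MASK_CHANNEL_UNREGISTERED;

        // The findContextInbound()-style check: is this node interested in channelRead?
        System.out.println((mask & MASK_CHANNEL_READ) != 0);       // true
        System.out.println((mask & MASK_CHANNEL_REGISTERED) != 0); // false
    }
}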

Removing Handlers from the Pipeline #

Besides adding handlers, ChannelPipeline also supports removing them at runtime. The entry point is the remove() method of DefaultChannelPipeline:

// DefaultChannelPipeline
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}

Removing a handler consists of three steps: find the node corresponding to the handler, remove it from the bidirectional linked list, and call back the user's handlerRemoved() method. The getContextOrDie() method obtains the corresponding AbstractChannelHandlerContext node by calling context(handler); if no matching node is found, it throws a NoSuchElementException. You can probably guess how context(handler) finds the node: by traversing the linked list.

// DefaultChannelPipeline
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    // Traverse the bidirectional linked list
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
        // If the handler is the same, return the current Context node
        if (ctx.handler() == handler) {
            return ctx;
        }
        ctx = ctx.next;
    }
}


Indeed, Netty does traverse the bidirectional linked list starting from the head node and checks if the current Context node's handler is the same as the handler to be removed. If they are the same, then the Handler has been found, and the current Context node is returned.

After finding the Handler node to be removed, the next step is to delete it from the linked list and schedule the handlerRemoved() callback. Both are driven by the private remove() method:

// DefaultChannelPipeline
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;
    synchronized (this) {
        // 1. Remove the node from the bidirectional linked list
        atomicRemoveFromHandlerList(ctx);
        if (!registered) {
            // 2. If the channel is not registered yet, defer the handlerRemoved() callback
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            // 3. If called from an external thread, submit the callback to the EventLoop
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // Call back the user's handlerRemoved() method
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    // Call back the user's handlerRemoved() method directly
    callHandlerRemoved0(ctx);
    return ctx;
}

The actual unlinking happens in atomicRemoveFromHandlerList(), which is synchronized so that the pointer manipulation on the bidirectional linked list is atomic:

private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}


Deleting a node is similar to adding one: it is just a basic linked-list operation of adjusting the pointers. Assuming we want to delete the SampleOutboundA node, the pointer changes during deletion can be represented by the following diagram:

Image 6.png

After removing the node, Netty calls back the user-defined handlerRemoved() method. The callHandlerRemoved0() method simply invokes the handlerRemoved() implementation in the user's Handler and finally marks the node as removed, which gives us a hook to do extra cleanup work when a Handler leaves the Pipeline:

// DefaultChannelPipeline
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        // Call the user-implemented handlerRemoved() method
        ctx.handler().handlerRemoved(ctx);
    } catch (Throwable t) {
        // Omitted other code
    } finally {
        // Mark the node as removed
        setRemoved(ctx);
    }
}
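A common application of runtime removal is a one-shot handler that removes itself once its job is done. Here is a minimal sketch (HandshakeHandler is a hypothetical example, not part of Netty):

public class HandshakeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ... validate the handshake message ...
        ctx.pipeline().remove(this); // triggers the removal flow analyzed above
        ctx.fireChannelRead(msg);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        System.out.println("Handshake done, handler removed"); // cleanup hook
    }
}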

So far, we have learned the basic operations of the internal structure of ChannelPipeline. By performing basic operations on the linked list, we can add and remove Handler nodes. When adding a node, the events that the Handler is not interested in are filtered out using bitmask operations. How does ChannelPipeline dispatch Handlers? Let's continue learning.

Data Flow in the Pipeline #

As we know, based on the direction of data flow, ChannelPipeline is divided into two types of handlers: inbound ChannelInboundHandler and outbound ChannelOutboundHandler. The propagation directions of inbound events and outbound events are opposite. The propagation direction of an inbound event is Head -> Tail, while the propagation direction of an outbound event is Tail -> Head. Today, we will delve into the event propagation mechanism of ChannelPipeline in the context of the request-response scenario between a client and a server.

Propagation of Inbound Events #

When a client sends data to a server, how does the server receive it? Let's review the Netty Reactor thread model we learned before: the NioEventLoop continuously polls for OP_ACCEPT and OP_READ events and responds promptly once an event is ready. Let's locate the entry point in the source code of NioEventLoop:

// NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}


It can be seen that `unsafe.read()` triggers the subsequent event processing. One point to avoid confusion: the unsafe objects bound to the server Channel and the client Channel are different, because the server Channel only cares about accepting client connections, while the client Channel needs to handle data reads and writes. Here we focus on how the client Channel reads data and follow the source code of `unsafe.read()`:

public final void read() {
    final ChannelConfig config = config();
    // Omitted other code
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator); // Allocate a ByteBuf
            allocHandle.lastBytesRead(doReadBytes(byteBuf)); // Read data from the Channel into the ByteBuf
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }
            allocHandle.incMessagesRead(1);
            readPending = false;
            pipeline.fireChannelRead(byteBuf); // Propagate the channelRead event
            byteBuf = null;
        } while (allocHandle.continueReading());
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // Propagate the channelReadComplete event
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

Netty keeps reading data from the Channel into the allocated ByteBuf, then triggers the propagation of the channelRead event through pipeline.fireChannelRead(), which is the method we need to focus on.

// DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

Netty first calls a static method invokeChannelRead() with the Head node as the argument. If it is executed in the Reactor thread, it will directly execute next.invokeChannelRead(). If it is called by an external thread, Netty will wrap the next.invokeChannelRead() call into an asynchronous task and submit it to the task queue. As we learned from the NioEventLoop source code, this ensures that the execution flow is controlled within the current NioEventLoop thread to ensure thread safety. We continue to follow through the core logic next.invokeChannelRead().

// AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) { // If the current thread is the Reactor thread, execute directly
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() { // If it is an external thread, submit it as an asynchronous task
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

Netty takes the current ChannelHandlerContext node and retrieves the corresponding Handler, then invokes the channelRead method of the Handler. Since the current node is HeadContext, the inbound event is propagated from the HeadContext node. Let’s take a look at how HeadContext.channelRead() is implemented.

// HeadContext
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // Find the next node and execute invokeChannelRead
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

We can see that HeadContext.channelRead() does not do anything special; it just continues to propagate the read event through fireChannelRead(). Next, Netty uses findContextInbound(MASK_CHANNEL_READ) to find the next node after HeadContext and executes the static invokeChannelRead() method again, entering a recursive calling process until a termination condition is met. The execution process of channelRead can be summarized in the following flowchart:

Drawing 6.png

How does Netty determine whether an inbound handler cares about the channelRead event? The answer lies in findContextInbound(MASK_CHANNEL_READ), which is closely related to the executionMask bitmask mentioned earlier. Let's take a look at the source code of findContextInbound():

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

The value of MASK_CHANNEL_READ is 1 << 5, which means the bit for the channelRead event is set to 1. In the code example, SampleInboundA is the custom inbound handler we added. If the bitwise AND of its executionMask and MASK_CHANNEL_READ is non-zero, SampleInboundA is interested in the channelRead event, and its channelRead() method needs to be triggered.

When will the Inbound event be terminated in the recursive calling process described above? There are two possible situations, both illustrated in the sketch after this list:

  1. If the user-defined Handler does not perform the fireChannelRead() operation, the Inbound event propagation will be terminated at the current Handler.
  2. If the user-defined Handler performs the fireChannelRead() operation, the Inbound event propagation will be terminated at the TailContext node.
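Here is a minimal sketch of case 1 (ConsumingHandler is a hypothetical handler): it consumes the message without calling fireChannelRead(), so propagation stops at this node, and the handler must release the ByteBuf itself to avoid a memory leak. In case 2, every handler keeps calling fireChannelRead(), and the message eventually reaches TailContext.

public class ConsumingHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // ... business logic that fully handles msg ...
        } finally {
            // Propagation ends here, so this handler is responsible for the release
            ReferenceCountUtil.release(msg);
        }
    }
}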

Next, let’s focus on what the TailContext node does.

// TailContext
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    onUnhandledInboundMessage(ctx, msg);
}

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

It can be seen that the TailContext only logs the discarded inbound message and releases the ByteBuf as a precaution to prevent memory leaks.

So far, the propagation process of inbound events has been completely explained. The propagation direction of inbound events in the ChannelPipeline is from head to tail. Netty will find the inbound handlers in the ChannelPipeline that are interested in the propagated events, execute the event callback methods, and then continue to propagate to the next node. The entire event propagation process is a recursive calling process.

Propagation of Outbound Events #

After analyzing the propagation process of inbound events, it will be much simpler to understand the propagation of outbound events. The direction of outbound event propagation is from tail to head, exactly opposite to the propagation direction of inbound events. The most common outbound event is the write event, which is triggered when the writeAndFlush() method is called. Let’s follow the source code and go through the writeAndFlush() method starting from TailContext:

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

Continuing to follow the source code of tail.writeAndFlush(), we will eventually find the write method in AbstractChannelHandlerContext. This method is the core logic of writeAndFlush. The specific source code is as follows:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    // ...... Omitted some non-core code ......
    // Find the next outbound channel handler in the pipeline linked list
    final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);

    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();

    // Check if the current thread is the NioEventLoop thread
    if (executor.inEventLoop()) {
        if (flush) {
            // Because flush == true, the process reaches here
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            task.cancel();
        }
    }
}

In the example code of SampleOutboundB calling writeAndFlush(), Netty will call the findContextOutbound() method to find the next outbound channel handler in the pipeline linked list. In the example code, the next outbound node is SampleOutboundA. Then it invokes next.invokeWriteAndFlush(m, promise). Let’s follow:

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

We can see that the invokeWriteAndFlush() method eventually executes the write method of the next channel handler. Generally, when implementing outbound channel handlers, users often inherit ChannelOutboundHandlerAdapter. Let’s take a look at how it handles the outbound event:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}

ChannelOutboundHandlerAdapter.write() simply calls the write() method of AbstractChannelHandlerContext, which should sound familiar. The flow has reached back to AbstractChannelHandlerContext, which repeats the execution of the write() method to find the next outbound node. This is a recursive call process.

Encoder is a commonly customized handler. However, why doesn’t the user’s encoder override the write() method and only override the encode() method? In the course “How Does Netty Implement Custom Communication Protocols,” we introduced the source code of MessageToByteEncoder. Custom encoders usually inherit MessageToByteEncoder, which overrides the write() method of ChannelOutboundHandler. It calls the encode() method implemented by the subclass to complete the data encoding. We will not expand on this here.
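As a minimal sketch (IntegerEncoder is a hypothetical example), a typical custom encoder only overrides encode(), while the write() implementation inherited from MessageToByteEncoder takes care of buffer allocation and event propagation:

public class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
        out.writeInt(msg); // turn the business object into bytes
    }
}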

When does the propagation of outbound events end? You may have guessed it already. Outbound events eventually propagate to the HeadContext node. Therefore, the HeadContext node serves as both an inbound handler and an outbound handler. Let’s take a look at how HeadContext intercepts and handles write events:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

HeadContext ultimately calls the underlying unsafe to write the data. When the write() method executes, the data is only written to an underlying buffer data structure; it is not sent to the Channel until a flush operation happens. If you want to understand how the write() and flush() operations work on this buffer, review the "Deep Dive into WriteAndFlush Process" course to complete your knowledge system.
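For example (response1 and response2 are placeholder messages), several write() calls can be staged and then sent with a single flush:

ctx.write(response1); // staged in the outbound buffer, not yet on the wire
ctx.write(response2);
ctx.flush();          // both messages are flushed to the Channel together
// Shorthand when there is only one message:
ctx.writeAndFlush(response);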

So far, the propagation of outbound events has been explained. Its propagation direction is from tail to head, the opposite of inbound event propagation. MessageToByteEncoder is an abstract class that is often used when implementing encoders. MessageToByteEncoder overrides the write() method of ChannelOutboundHandler, and in most cases, users only need to override the encode() method.

Propagation of Exception Events #

In the course “Pipeline Coordination in Service Orchestration Layer,” we already introduced the best practice of intercepting and handling exceptions in Netty. Let’s first review the simple implementation of an exception interceptor.


public class ExceptionHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof RuntimeException) {
            System.out.println("Handle Business Exception Success.");
        }
    }
}

The exception handler ExceptionHandler generally extends ChannelDuplexHandler, which is both an inbound handler and an outbound handler. ExceptionHandler should be added at the end of custom handlers, as shown in the following diagram:

Image 8.png
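In code, the wiring is a sketch along the lines of the earlier initChannel() example, with ExceptionHandler appended last:

ch.pipeline()
        .addLast(new SampleInboundA())
        .addLast(new SampleInboundB())
        .addLast(new SampleOutboundA())
        .addLast(new SampleOutboundB())
        .addLast(new ExceptionHandler()); // added last, so it sees exceptions from all handlers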

When will the exception handler ExceptionHandler be executed? Let’s analyze two types of event propagation scenarios: inbound exception event propagation and outbound exception event propagation.

Let’s first look at the propagation of inbound exception events. Starting with the data reading scenario, we can observe that there is related logic for exception handling when inbound events are propagated. Let’s analyze the source code of the data reading process again.

// AbstractChannelHandlerContext
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

// AbstractChannelHandlerContext
private void notifyHandlerException(Throwable cause) {
    // Omitted other code
    invokeExceptionCaught(cause);
}

// AbstractChannelHandlerContext
private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            handler().exceptionCaught(this, cause); // Call the implementation of the exceptionCaught method in the handler
        } catch (Throwable error) {
            // Omitted other code
        }
    } else {
        fireExceptionCaught(cause);
    }
}

If an exception occurs while SampleInboundA is reading data, invokeChannelRead will catch the exception and call notifyHandlerException() to handle the exception. By following the steps, we can see that the exceptionCaught() method in the Handler will eventually be called. Users can override exceptionCaught() to implement custom exception handling.

We know that the unified exception handler ExceptionHandler is added at the end of the ChannelPipeline. Since SampleInboundA does not override exceptionCaught(), how does the exception generated by SampleInboundA propagate to ExceptionHandler? It is generally expected that the user’s implementation of the inbound handler will inherit the ChannelInboundHandlerAdapter abstract class. As expected, we found the implementation of exceptionCaught() in ChannelInboundHandlerAdapter:

// ChannelInboundHandlerAdapter
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);
}

// AbstractChannelHandlerContext
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
    return this;
}

By default, ChannelInboundHandlerAdapter calls the fireExceptionCaught() method to propagate the exception event. When fireExceptionCaught() is called, it first calls the findContextInbound() method to find the next inbound handler that is interested in exception events, and then continues to propagate the exception downward. So now you should understand why the unified exception handler ExceptionHandler needs to be added at the end of the ChannelPipeline. This way, ExceptionHandler can receive exceptions generated by all inbound handlers.

Next, let’s analyze the propagation of outbound exception events. At this point, you may wonder why the unified exception handler ExceptionHandler is not added at the beginning of the ChannelPipeline, as the propagation direction of outbound events is opposite to that of inbound events. Let’s investigate by looking at the calling process of writeAndFlush().

// AbstractChannelHandlerContext
private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

We can see that if an exception occurs while flushing data, it is also caught and handled by the same notifyHandlerException() method. Because notifyHandlerException() always looks for the next exception-handling node in the inbound direction (toward the tail), we are back in the inbound exception propagation flow. Therefore, the propagation direction of exception events is essentially the same as that of inbound events, and exception events will eventually reach the unified exception handler ExceptionHandler.

At this point, the entire process of exception event propagation has been analyzed. What you need to remember is that the order of exception event handling is the same as the order in which the ChannelHandler is added, and the exception events will be propagated successively, regardless of whether they are inbound or outbound events.

Summary #

In this section, we learned about the complete data processing flow in Netty, with a focus on how data flows through the ChannelPipeline. Let’s summarize the key points:

  • The ChannelPipeline is a doubly-linked list structure that contains both ChannelInboundHandler and ChannelOutboundHandler.
  • Inbound events and outbound events propagate in opposite directions. The propagation direction of inbound events is from Head to Tail, while the propagation direction of outbound events is from Tail to Head.
  • The order of handling exception events is the same as the order in which the ChannelHandler is added, and the exceptions will be propagated successively, regardless of whether they are inbound or outbound events.

Let’s review the implementation principles of event propagation in the ChannelPipeline as a whole:

  • Inbound events start the propagation process from the HeadContext node, while outbound events start from the TailContext node.
  • The AbstractChannelHandlerContext abstract class provides a series of fire and invoke methods. To propagate events downward, you can call the fire methods. The fire and invoke methods, in combination with the findContextInbound() and findContextOutbound() methods, control the propagation direction of inbound and outbound events. The entire process is a recursive invocation.