09 Data Transmission Write and Flush, Dissecting the Processing Flow

09 Data transmission writeAndFlush, dissecting the processing flow #

In the previous few lessons, we introduced the basics of Netty encoding and decoding. You must have mastered the skills of implementing encoding and decoding logic in Netty. So how do we send the encoded and decoded results? In Netty, it is very simple to implement data transmission. You just need to call the writeAndFlush method. What exactly does this one line of code do in Netty? Let’s explore this in today’s lesson!

Review of Pipeline Event Propagation #

Before discussing the working principle of writeAndFlush, let’s first review the event propagation mechanism of the Pipeline, because they are closely related. According to the flow of network data, the ChannelPipeline is divided into two types of handlers: inbound ChannelInboundHandler and outbound ChannelOutboundHandler, as shown in the following figure.

图片11.png

When we send a request from the client to the server, or when the server responds to the client with the request result, these actions belong to the outbound handler ChannelOutboundHandler. So when we call writeAndFlush, the data will definitely propagate in the Pipeline.

Here, I first raise a few questions that you can use to check whether you really understand the principle of writeAndFlush after studying this lesson.

  • How does writeAndFlush trigger event propagation? How is the data written to the underlying socket?
  • Why are there two actions, write and flush? How is the data stored before executing flush?
  • Is writeAndFlush synchronous or asynchronous? Is it thread-safe?

Analysis of writeAndFlush Event Propagation #

In order to analyze the event propagation process of writeAndFlush, first we simulate the simplest outbound data transmission scenario through code. After the server receives a request from the client, it encodes the response result and writes it back to the client.

The following is the server’s startup class, which registers three ChannelHandlers: FixedLengthFrameDecoder for decoding a fixed length, ResponseSampleEncoder for encoding the response result, and RequestSampleHandler for business logic processing.

public class EchoServer {

    public void startEchoServer(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)

                    .channel(NioServerSocketChannel.class)

                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override

                        public void initChannel(SocketChannel ch) {

                            ch.pipeline().addLast(new FixedLengthFrameDecoder(10));

                            ch.pipeline().addLast(new ResponseSampleEncoder());

                            ch.pipeline().addLast(new RequestSampleHandler());

                        }

                    });

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();

        } finally {

            bossGroup.shutdownGracefully();

            workerGroup.shutdownGracefully();

        }

    }

    public static void main(String[] args) throws Exception {

        new EchoServer().startEchoServer(8088);

    }

}

The FixedLengthFrameDecoder is a built-in decoder provided by Netty, which will not be discussed here. Now let’s take a look at the specific implementation of the other two ChannelHandlers.

The ResponseSampleEncoder is used to encode the server’s processing result. The specific implementation logic is as follows:

public class ResponseSampleEncoder extends MessageToByteEncoder<ResponseSample> {

    @Override

    protected void encode(ChannelHandlerContext ctx, ResponseSample msg, ByteBuf out) {

        if (msg != null) {

            out.writeBytes(msg.getCode().getBytes());

            out.writeBytes(msg.getData().getBytes());

            out.writeLong(msg.getTimestamp());

        }

    }

}

The RequestSampleHandler is responsible for handling client data and returns a ResponseSample object to the client by calling ctx.channel().writeAndFlush, which includes the return code, response data, and timestamp.

public class RequestSampleHandler extends ChannelInboundHandlerAdapter {

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);

        ResponseSample response = new ResponseSample("OK", data, System.currentTimeMillis());

        ctx.channel().writeAndFlush(response);

    }

}

With the above code examples, we can illustrate the linked list structure of the Pipeline, as shown in the following figure.

图片12.png

So when the RequestSampleHandler calls writeAndFlush, how does the data propagate, process, and send to the client in the Pipeline? Let’s take a deep dive into the writeAndFlush process by analyzing this scenario.

Since writeAndFlush is a specific outbound operation, we guess that it starts to propagate from the Tail node of the Pipeline and continues to propagate forward to the Head node. We followed the source code of ctx.channel().writeAndFlush, as shown below, and found that it does call the writeAndFlush method of the Tail node in the DefaultChannelPipeline class.

@Override
public final ChannelFuture writeAndFlush(Object msg) {

    return tail.writeAndFlush(msg);

}

Continuing to follow the source code of tail.writeAndFlush, it will eventually locate the write method in AbstractChannelHandlerContext. This method is the core logic of writeAndFlush, as shown in the following source code.

private void write(Object msg, boolean flush, ChannelPromise promise) {

    // ...... omitted some non-core code ......
    // Seek the next Outbound type ChannelHandler node in the Pipeline linked list

    final AbstractChannelHandlerContext next = findContextOutbound(flush ?

            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);

    final Object m = pipeline.touch(msg, next);

    EventExecutor executor = next.executor();

    // Determine whether the current thread is the thread in the NioEventLoop

    if (executor.inEventLoop()) {

        if (flush) {

            // Because flush == true, the process comes here

            next.invokeWriteAndFlush(m, promise);

        } else {

            next.invokeWrite(m, promise);

        }

    } else {

        final AbstractWriteTask task;

        if (flush) {

            task = WriteAndFlushTask.newInstance(next, m, promise);

        }  else {

            task = WriteTask.newInstance(next, m, promise);

        }

        if (!safeExecute(executor, task, promise, m)) {

            task.cancel();

        }

    }

}

First, let’s confirm the parameters of the method. Since we need to perform the flush action, flush is set to true. The write method also requires a ChannelPromise parameter, indicating that the write operation is an asynchronous process. AbstractChannelHandlerContext will initialize a ChannelPromise by default to complete this asynchronous operation. The ChannelPromise internally holds the current Channel and EventLoop, and you can also register a callback listener with the ChannelPromise to get the result of the asynchronous operation.

The core logic of the write method consists of three important steps, which I have marked in the source code with comments. Now let’s analyze the execution mechanism of the write method in detail, based on the example code of EchoServer mentioned earlier.

In the first step, the findContextOutbound method is called to find the next Outbound type ChannelHandler in the Pipeline linked list. In our simulated scenario, the next Outbound node is ResponseSampleEncoder.

In the second step, the inEventLoop method is used to determine the identity of the current thread. If the current thread is the same as the thread assigned to the EventLoop of the current Channel, the submitted task will be executed immediately. Otherwise, the current operation will be encapsulated into a Task and put into the task queue of the EventLoop for execution later. So is writeAndFlush thread-safe? Do you have an answer in mind?

In the third step, since flush is true, the code next.invokeWriteAndFlush(m, promise) will be executed directly. Let’s follow the source code. Eventually, it will execute the write method of the next ChannelHandler node. So the flow returns to AbstractChannelHandlerContext to execute the write method again, continuing to find the next Outbound node.

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {

    if (invokeHandler()) {

        invokeWrite0(msg, promise);

        invokeFlush0();

    }  else {

        writeAndFlush(msg, promise);

    }

}

private void invokeWrite0(Object msg, ChannelPromise promise) {

    try {

        ((ChannelOutboundHandler) handler()).write(this, msg, promise);

    } catch (Throwable t) {

        notifyOutboundHandlerException(t, promise);

    }

}

Why did we override the encode method in ResponseSampleEncoder instead of the write method? When is the encode method executed? This brings us back to the source code of MessageToByteEncoder introduced in the course “How does Netty implement custom communication protocols”. When implementing an encoder, we usually inherit the MessageToByteEncoder abstract class. MessageToByteEncoder overrides the write method of ChannelOutboundHandler, which calls the encode method implemented by the subclass to complete data encoding. We won’t go into detail here.

So far, the event propagation flow of writeAndFlush has been analyzed. It can be seen that Netty’s Pipeline design is very clever. When calling writeAndFlush, the data is propagated between Outbound type ChannelHandler nodes, and finally the data is written to the Socket underlying layer. Let’s continue the analysis.

Write Buffer Queue #

From the above example analysis, we know that the data will continue to find Outbound nodes in the Pipeline and propagate forward until it reaches the Head node, where the Head node completes the final data sending. Therefore, the Head node plays an important role in the process of writeAndFlush. Let’s take a look at the source code of the write method in the Head node.

// HeadContext # write

@Override

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {

    unsafe.write(msg, promise);

}

// AbstractChannel # AbstractUnsafe # write

@Override

public final void write(Object msg, ChannelPromise promise) {

    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

    if (outboundBuffer == null) {

        safeSetFailure(promise, newClosedChannelException(initialCloseCause));
    AbstractChannelHandlerContext next = findContextOutbound();
    next.invokeFlush();
}

继续跟进 invokeFlush 方法,可以发现它所执行的任务被添加到了 EventLoop 的任务队列中,只是添加的任务是一个 Runnable 对象,具体代码如下:
// SingleThreadEventLoop # execute

@Override

public final boolean inEventLoop(Thread thread) {

    return thread == thread();

}

@Override

public final void execute(Runnable task) {

    if (task == null) {

        throw new NullPointerException("task");

    }

    boolean inEventLoop = inEventLoop();

    if (inEventLoop) {

        addTask(task);

    } else {

        boolean success = false;

        for (;;) {

            if (isShuttingDown()) {

                reject();

            }

            if (taskQueue.offer(task)) {

                success = true;

                break;

            } else {

                Thread.yield();

            }

        }

        if (!success) {

            reject();

        }

        if (!inEventLoop && wakesUpForTask(task)) {

            wakeup(inEventLoop);

        }

    }

}
通过源码可以看出 InEventLoop 的 websocket 会添加任务到自己的任务队列 taskQueue 中,然后不断循环判断 WebSocket 是否正在关闭。在这个循环内部如果 taskQueue.offer(task) 返回了 true,那么任务会被添加到了队列中并成功抛出,如果 taskQueue.offer 返回了 false,那么会执行 Thread.yield() 方法。yield 方法会释放 CPU 的执行权,让出自己的 CPU 执行时间,给其他线程执行的机会。

最后,如果 WebSocket 未关闭且线程并不处于 InEventLoop 中,那么会通过 wakeup 方法唤醒线程来继续后续的逻辑处理。那么 wakeup 方法是如何实现的呢?我们看一下它的实现源码:
// SingleThreadEventExecutor # wakeup

protected void wakeup(boolean inEventLoop) {

    if (!inEventLoop || state == ST_SHUTDOWN) {

        commandWakesUp = commandWakesUp + 1;

        return;

    }

    final Thread thread = threadRef;

    // 可以看到,此处通过判断 Thread 是否是等待状态,然后通过调用 Lock 的 lock 唤醒线程。

    if (!thread.isInterrupted() && wakupgLock.tryLock()) {

        try {

            interrupted = false;

            threadRef.interrupt();

        } finally {

            wakupgLock.unlock();

        }

    }

}
```java
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    
        if (outboundBuffer == null) {
    
            return;
    
        }
    
        outboundBuffer.addFlush();
    
        flush0();
    
    }

As can be seen, the core logic of flush is mainly divided into two steps: addFlush and flush0. Now let’s analyze them one by one.

First, let’s take a look at the source code of the addFlush method:

// ChannelOutboundBuffer # addFlush

public void addFlush() {

    Entry entry = unflushedEntry;

    if (entry != null) {

        if (flushedEntry == null) {

            flushedEntry = entry;

        }

        do {

            flushed ++;

            if (!entry.promise.setUncancellable()) {

                int pending = entry.cancel();

                // Subtract the bytes of pending data. If the total number of bytes is lower than the low water mark, then the Channel will become writable

                decrementPendingOutboundBytes(pending, false, true);

            }

            entry = entry.next;

        } while (entry != null);

        unflushedEntry = null;

    }

}

The addFlush method also operates on the ChannelOutboundBuffer cache data. When addFlush is executed, how does the pointers in the cache change? As shown in the figure below, we continue to analyze based on the write process.

图片14.png

At this point, the flushedEntry pointer changes to the data pointed to by the unflushedEntry pointer, and then the unflushedEntry pointer points to NULL. The data pointed to by the flushedEntry pointer will be sent to the Socket buffer.

In the addFlush source code, decrementPendingOutboundBytes is corresponding to incrementPendingOutboundBytes in the previous addMessage source code. decrementPendingOutboundBytes mainly subtracts the pending data bytes. If the size of the cache is already less than the low water mark, the Channel will be restored to writable.

We have finished introducing the overall flow of addFlush, and the next step is the second step, the flush0 method responsible for sending data. Similarly, let us follow the source code of flush0 and locate the core call chain of flush0:

// AbstractNioUnsafe # flush0

@Override

protected final void flush0() {

    if (!isFlushPending()) {

        super.flush0();

    }

}

// AbstractNioByteChannel # doWrite

@Override

protected void doWrite(ChannelOutboundBuffer in) throws Exception {

    int writeSpinCount = config().getWriteSpinCount();

    do {

        Object msg = in.current();

        if (msg == null) {

            clearOpWrite();

            return;

        }

        writeSpinCount -= doWriteInternal(in, msg);

    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);

}

In fact, the calling level of flush0 is deep, but the core logic is in the doWrite method of AbstractNioByteChannel, which is responsible for writing the data to the Socket buffer. The processing flow of doWrite method can be summarized as follows:

Firstly, get the number of spins for the spin lock according to the configuration. Then the question that may arise is, what is the number of spins for the spin lock used for? When writing data to the underlying Socket, if the amount of data to be written each time is large, it will not be possible to write all the data at once, so it can only be written in batches. When Netty keeps calling to perform write logic, the EventLoop thread may be waiting all the time, which may block other event handling. Therefore, the number of spins for the spin lock is used to control the maximum number of loop executions for writing data at a time. If it exceeds the set number of spins, the write operation will be temporarily interrupted.

Secondly, data is sent by repeatedly calling the doWriteInternal method according to the spin lock count. For each successful data sending, the spin lock count writeSpinCount is reduced by 1. When the writeSpinCount is exhausted, the doWrite operation will be temporarily interrupted. The source code of doWriteInternal involves the underlying API of JDK NIO and will not be further discussed here. Its main purpose is to delete linked list nodes in the cache and call the underlying API to send data. Interested students can study it further.

Finally, the incompleteWrite method is called to ensure that all the data can be sent out. Because of the limitation of the number of spins, the data may not have been completely written, so OP_WRITE will need to be continued. If the data has been written completely, clear OP_WRITE.

So far, the entire working principle of writeAndFlush has been analyzed. The calling level of the entire process is relatively deep. I have organized a timing diagram of writeAndFlush, as shown below, to help everyone understand the calling process of writeAndFlush and deepen the understanding of the above knowledge points.

图片15.png

Summary #

In this lesson, we have conducted a deep analysis of the processing flow of writeAndFlush, and the following three points can be summarized:

  • writeAndFlush belongs to the outbound operation. It starts from the Tail node of the Pipeline and propagates forward to the Head node. Whether in the write or flush process, the Head node plays an important role.
  • The write method does not write data to the Socket buffer, but writes data to the ChannelOutboundBuffer cache. The ChannelOutboundBuffer cache is internally implemented by a unidirectional linked list.
  • The flush method finally writes the data to the Socket buffer.

Finally, leave a small question for you to think about: both Channel and ChannelHandlerContext have a writeAndFlush method, what is the difference between them?