17 Source Code Series, Deep Analysis of the Server Startup Process From Linux

Through the study of the previous chapters, we have gained a preliminary understanding of the technical ideas and basic principles of Netty. Starting from today’s lesson, we will formally enter the course of studying the core source code of Netty. It is hoped that by analyzing the source code, you can gain a deeper understanding of the essence of Netty, such as its design ideas, engineering skills, etc., and lay a solid foundation for further in-depth research on Netty.

Before we begin, I would like to share some advice on reading source code. First, many readers do not know where to start. Do not browse the code aimlessly; start from a Hello World program or a test case, then set breakpoints and step through the code while it runs. Second, keep a global view: grasp the main flow first and avoid getting lost in the details at the beginning. Third, read the source code more than once, because each pass brings new insights. Diagrams and notes make the core flow easier to understand and are convenient for later review.

As the first lesson in source code analysis, we will delve into the startup process of the Netty server. By starting the server, we can understand the relationship between the major core components of Netty. This will be a very good starting point for learning the source code of Netty. Let’s take a look at how each part of Netty works together.

Note: This article refers to Netty source code version 4.1.42.Final.

Starting with the Echo Server Example #

In the lesson “The Role of Bootstrappers: What Do Client and Server Startup Need to Do?”, we introduced how to use bootstrappers to build the basic framework of the server. Here we implement a minimal Echo server to use for debugging the Netty server startup code.

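import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

// ResponseSampleEncoder and RequestSampleHandler are custom handlers that are not shown in this lesson.
public class EchoServer {

    public void startEchoServer(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO)) // Set the handler for ServerSocketChannel
                    .childHandler(new ChannelInitializer<SocketChannel>() { // Set the handler for SocketChannel
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
                            ch.pipeline().addLast(new ResponseSampleEncoder());
                            ch.pipeline().addLast(new RequestSampleHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync(); // Bind the port and wait until startup completes
            f.channel().closeFuture().sync(); // Block until the server channel is closed
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}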

We will start the in-depth analysis of the startup process of the Netty server from the bootstrap ServerBootstrap. Before starting the server, you need to configure the parameters of ServerBootstrap. This step can be roughly divided into the following steps:

  • Configure the EventLoopGroup thread group;
  • Configure the type of Channel;
  • Set the handler for ServerSocketChannel;
  • Set the network listening port;
  • Set the handler for SocketChannel;
  • Configure the parameters of Channel.

Configuring ServerBootstrap is very simple: each parameter value is stored in a member variable defined by ServerBootstrap. Let’s look at these member variable definitions; they correspond almost one-to-one to the configuration methods that ServerBootstrap exposes. The comment after each member variable names the corresponding method.

volatile EventLoopGroup group; // group()
volatile EventLoopGroup childGroup; // group()
volatile ChannelFactory<? extends C> channelFactory; // channel()
volatile SocketAddress localAddress; // localAddress()
Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<ChannelOption<?>, Object>(); // childOption()
volatile ChannelHandler childHandler; // childHandler()
ServerBootstrapConfig config = new ServerBootstrapConfig(this);

We will not go into how ServerBootstrap saves the parameters into each member variable. Treat this part as preparation; you can follow the source code of each method after class. Today, we will focus on the line of code b.bind(port).sync(). bind() is the entry point for the actual port binding and server startup, and sync() blocks until the server has started. Next, we will analyze the bind() method in detail.

Before starting the source code analysis, let’s think about the following questions while reading:

  • How does Netty establish a connection between its own implemented Channel and the underlying Channel of the JDK?
  • What is the role of the special handler ChannelInitializer?
  • What is the process of pipeline initialization like?

Full Process of Server Startup #

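First, let’s take a look at the source code of the bind() method, which ServerBootstrap inherits from AbstractBootstrap. The bind(port) overload used in the example wraps the port in an InetSocketAddress and ends up in the same doBind() method.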

public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // Other code is omitted
}

doBind() first calls initAndRegister(), which creates the server channel through channelFactory.newChannel():

// ReflectiveChannelFactory#newChannel
@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

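channelFactory.newChannel() invokes the newChannel() method of ReflectiveChannelFactory, which creates a new Channel instance through reflection. The Channel class is the one specified through ServerBootstrap's channel() method. The example above specifies NioServerSocketChannel.class, so the constructor held by the factory is the constructor of NioServerSocketChannel.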
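The reflective creation described above can be sketched with plain JDK reflection. This is a minimal sketch, not Netty's code: the class name ReflectiveFactorySketch is hypothetical, and StringBuilder merely stands in for a channel class that has a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a reflective factory; not Netty's ReflectiveChannelFactory.
final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Look up the public no-arg constructor once, when the factory is built.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    // Every call creates a fresh instance through the cached constructor.
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactorySketch<CharSequence> factory =
                new ReflectiveFactorySketch<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        System.out.println(first.getClass().getSimpleName());
        System.out.println(first != second);
    }
}
```

Caching the Constructor in the factory means the class lookup happens once at configuration time, while each newInstance() call only pays for the instantiation itself.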

In short, initAndRegister() first creates a NioServerSocketChannel instance. Next, let’s look at how that instance is initialized.

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel(); // Create the Channel
        init(channel); // Initialize the Channel: socket parameters, attributes, pipeline
    } catch (Throwable t) {
        // Omitted code
    }
    ChannelFuture regFuture = config().group().register(channel); // Register the Channel with an EventLoop
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

As the code above shows, once the Channel has been created the flow is as follows. Steps 1 to 3 happen inside init(), which is analyzed next:

  1. Set the Channel's socket parameters;
  2. Set the Channel's attributes;
  3. Initialize the ChannelPipeline;
  4. Register the Channel with an EventLoop;
  5. Return a ChannelFuture on which the caller can listen for the registration result.

Initialize the Server Channel #

Before reading init() line by line, note where ServerBootstrapAcceptor comes from. It is created inside ServerBootstrap's init() method and appended to the end of the server channel's pipeline:

pipeline.addLast(new ServerBootstrapAcceptor(
        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter, so it is a ChannelInboundHandler. It does its work when the server channel accepts a new connection: its channelRead() method receives the newly created client Channel, adds the user's childHandler to that Channel's pipeline, and calls childGroup.register(child) to bind the Channel to an EventLoop in the worker group. The source code of channelRead() is shown in the last part of this lesson.

With that in mind, here is the source code of the init() method in ServerBootstrap:


void init(Channel channel) {
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); // Set socket parameters
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); // Save user-defined attributes

    ChannelPipeline p = channel.pipeline();

    // Get the constructor parameters of ServerBootstrapAcceptor
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

    // Add special handler
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

The source code of the init() method is quite long, so we will break it down into two parts:

Step 1: Set socket parameters and user-defined attributes. When creating a server channel, the channel’s configuration parameters are stored in NioServerSocketChannelConfig. During the initialization of the channel, Netty sets these parameters to the underlying socket of the JDK and binds user-defined attributes to the channel.

Step 2: Add special handlers. First, ServerBootstrap adds a ChannelInitializer to the pipeline. This ChannelInitializer is an anonymous class that implements the ChannelHandler interface, and its initChannel() method adds the handler configured for the ServerSocketChannel. Then, through a task submitted to the EventLoop, Netty asynchronously adds a ServerBootstrapAcceptor handler to the pipeline. As the name suggests, ServerBootstrapAcceptor is an acceptor dedicated to receiving new connections and handing them over to EventLoops for execution; we will not go into detail here. At this point, the internal structure of the server pipeline has changed again, as shown in the diagram below.

Image 1

Let’s think about a question: why do we need the ChannelInitializer handler? Why does the registration process of ServerBootstrapAcceptor need to be encapsulated as an asynchronous task? This is because when initializing, the channel has not yet been registered with the Selector object, so it is not possible to register the Accept event to the Selector in advance. Therefore, the ChannelInitializer handler is added in advance, and the ServerBootstrapAcceptor handler is added to the pipeline after the channel is registered.
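The placeholder idea can be illustrated with a toy model. This is only a sketch under simplifying assumptions: InitializerSketch, its string-based handler list, and its task queue are hypothetical and merely mimic the order in which the real pipeline changes; they are not Netty classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of a pipeline; every name here is hypothetical, not a Netty class.
final class InitializerSketch {
    private final List<String> handlers = new ArrayList<>();
    private final Queue<Runnable> eventLoopTasks = new ArrayDeque<>();

    // Initialization: the channel is not registered yet, so only a placeholder is added.
    void init() {
        handlers.add("ChannelInitializer");
    }

    // Registration: the placeholder now runs its initChannel logic.
    void register(String userHandler) {
        if (userHandler != null) {
            handlers.add(userHandler); // the handler configured through handler()
        }
        // The acceptor is not added directly; it is queued as an event loop task.
        eventLoopTasks.add(() -> handlers.add("ServerBootstrapAcceptor"));
        handlers.remove("ChannelInitializer"); // the placeholder removes itself
        // Later, the event loop drains its task queue.
        Runnable task;
        while ((task = eventLoopTasks.poll()) != null) {
            task.run();
        }
    }

    List<String> handlers() {
        return handlers;
    }

    public static void main(String[] args) {
        InitializerSketch pipeline = new InitializerSketch();
        pipeline.init();
        System.out.println(pipeline.handlers()); // [ChannelInitializer]
        pipeline.register("LoggingHandler");
        System.out.println(pipeline.handlers()); // [LoggingHandler, ServerBootstrapAcceptor]
    }
}
```

Because the acceptor is added by a queued task, it always ends up after the user's handler, and the placeholder is gone by the time any event reaches the pipeline.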

The initialization process of the server channel is complete. The overall process is relatively simple, mainly setting socket parameters and user-defined attributes, and adding two special handlers to the pipeline. Next, let’s continue to analyze how the initialized channel is registered with the Selector object.

Register the Server Channel #

Returning to the main process of initAndRegister(), after creating the server channel, we continue to follow the register() method source code:


// MultithreadEventLoopGroup#register
public ChannelFuture register(Channel channel) {
    return next().register(channel); // Select an eventLoop to register
}

// AbstractChannel#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // Other code is omitted
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) { // Called internally by the reactor thread
        register0(promise);
    } else { // Called by an external thread
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // Other code is omitted
        }
    }
}

Netty selects an EventLoop from the EventLoopGroup thread pool to bind with the current channel. All I/O events in the channel’s lifecycle, such as accept, connect, read, write, etc., will be handled by this EventLoop. As can be seen, whether it is called by the EventLoop thread itself or an external thread, it will eventually be registered through the register0() method.
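The "run directly if already on the loop thread, otherwise submit a task" pattern can be sketched with a single-threaded executor from the JDK. This is a minimal sketch, not Netty's NioEventLoop: EventLoopDispatchSketch and its method names are hypothetical, and unlike Netty's register(), which returns a future, this version blocks for the result to keep the example short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical single-threaded "event loop"; a sketch, not Netty's NioEventLoop.
final class EventLoopDispatchSketch {
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is the loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Always executes register0() on the loop thread and returns that thread's name.
    String register() {
        if (inEventLoop()) {
            return register0(); // already on the loop thread: call directly
        }
        Callable<String> task = this::register0;
        try {
            return executor.submit(task).get(); // external thread: hand over as a task
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    private String register0() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {
        EventLoopDispatchSketch loop = new EventLoopDispatchSketch();
        System.out.println(loop.register()); // called from the main thread
        Callable<String> nested = loop::register;
        System.out.println(loop.executor.submit(nested).get()); // called from the loop thread
        loop.executor.shutdown();
    }
}
```

In both calls register0() runs on the same thread, which is why code executed this way needs no locking around the channel's state.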


private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister(); // Call JDK's underlying register() for registration
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded(); // Trigger the handlerAdded event
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered(); // Trigger the channelRegistered event
        // At this point, the Channel is not yet bound to an address, so it is in an inactive state
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive(); // When the Channel is active, trigger the channelActive event
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Omitted code
    }
}

register0() primarily does four things: invoking JDK’s underlying register() for Channel registration, triggering the handlerAdded event, triggering the channelRegistered event, and triggering the channelActive event when the Channel is active. Let’s analyze them one by one.

First, let’s look at the process of registering the Channel to the JDK’s underlying implementation, which corresponds to the implementation logic of doRegister().

// AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // Call JDK's underlying register() for registration
            return;
        } catch (CancelledKeyException e) {
            // Omitted code
        }
    }
}

// java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        // Omitted code
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}

javaChannel().register() is responsible for calling the JDK’s underlying implementation to register the Channel to the Selector. The third parameter passed to register() is Netty’s own implementation of the Channel object. Calling register() will bind it to the attachment of the underlying JDK Channel. This way, every time the Selector goes through the event loop, Netty can retrieve its own Channel object from the returned underlying JDK Channel.
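The attachment mechanism can be tried with the JDK alone. The sketch below assumes a hypothetical WrapperChannel class that plays the role of Netty's own Channel object; only the java.nio calls are real APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class AttachmentSketch {
    // Hypothetical wrapper that stands in for Netty's own Channel object.
    static final class WrapperChannel {
        final ServerSocketChannel jdkChannel;

        WrapperChannel(ServerSocketChannel jdkChannel) {
            this.jdkChannel = jdkChannel;
        }
    }

    // Registers a JDK channel with ops = 0 and checks that the wrapper comes back from the key.
    static boolean roundTrip() {
        try (Selector selector = Selector.open();
             ServerSocketChannel jdk = ServerSocketChannel.open()) {
            jdk.configureBlocking(false); // a channel must be non-blocking before register()
            WrapperChannel wrapper = new WrapperChannel(jdk);
            // ops = 0: no event is of interest yet; the wrapper travels as the attachment.
            SelectionKey key = jdk.register(selector, 0, wrapper);
            return key.attachment() == wrapper
                    && key.channel() == jdk
                    && key.interestOps() == 0;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

When the selector later reports a ready key, key.attachment() leads straight back to the wrapper, so no lookup table from JDK channels to framework channels is needed.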

After successfully registering the Channel to the Selector, the Pipeline will trigger a series of event propagations. Before the event propagation, how are user-defined business handlers added to the Pipeline? The answer lies in pipeline.invokeHandlerAddedIfNeeded(). Let’s take a closer look at how the handlerAdded event is handled. The call hierarchy of invokeHandlerAddedIfNeeded() is quite deep, so I recommend using your IDE's debugger to trace the call stack with the Echo server example above, as shown in the following image.

Image 2

First, let’s focus on the core source code of ChannelInitializer and analyze it step by step.

// ChannelInitializer
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance
        try {
            initChannel((C) ctx.channel()); // Call the initChannel() method implemented by the user
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this); // Remove the ChannelInitializer itself from the pipeline
            }
        }
        return true;
    }
    return false;
}

handlerAdded() calls initChannel(ctx), which runs the initChannel() method of the anonymous ChannelInitializer added in init() and then removes the ChannelInitializer from the pipeline.

Port Binding #

Once registration has completed, the bind() call continues and eventually reaches the doBind() method of NioServerSocketChannel, which performs the actual port binding:

// NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Netty will call different bind() methods in the underlying JDK depending on the version of JDK. I am using JDK 8, so it will call the bind() method of the native JDK Channel. After executing doBind(), the server-side native JDK Channel has completed port binding.
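Both binding styles exist in the JDK and can be compared directly. The following is a small sketch; the class name BindSketch, the backlog value, and the use of port 0 (which lets the operating system choose a free port) are choices made for this illustration only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindSketch {
    // Binds through the channel API (JDK 7+) or through the legacy socket adapter,
    // then returns the port that was actually bound.
    static int bindAndGetPort(boolean useChannelApi, int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            InetSocketAddress address =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), 0); // port 0: OS picks one
            if (useChannelApi) {
                ch.bind(address, backlog);          // ServerSocketChannel#bind, added in JDK 7
            } else {
                ch.socket().bind(address, backlog); // ServerSocket#bind on the adapted socket
            }
            return ch.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(bindAndGetPort(true, 128) > 0);
        System.out.println(bindAndGetPort(false, 128) > 0);
    }
}
```

Either path leaves the channel bound to a local port; the second argument is the same backlog that Netty reads from config.getBacklog().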

After port binding is completed, the Channel enters the active state, and then the pipeline.fireChannelActive() method is called to trigger the channelActive event. We can dive deeper into the fireChannelActive() method and discover the important parts in it:

// DefaultChannelPipeline#channelActive
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}

// AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp); // Register the OP_ACCEPT event to the event set of the server-side Channel
    }
}

As can be seen, after the channelActive event propagation is complete, the readIfIsAutoRead() method is called to trigger the read event of the Channel. Eventually, it calls the doBeginRead() method in AbstractNioChannel, where the readInterestOp parameter is the SelectionKey.OP_ACCEPT event initialized earlier when initializing the Channel. Therefore, the OP_ACCEPT event will be registered in the event set of the Channel.
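The two-phase pattern, registering with an empty interest set first and switching on OP_ACCEPT only after the port is bound, can be reproduced with the JDK. This is a sketch under the assumption of a loopback bind on an OS-chosen port; InterestOpsSketch is a hypothetical name, and the bit manipulation mirrors the doBeginRead() logic shown above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class InterestOpsSketch {
    // Returns the interest set right after registration and after OP_ACCEPT was enabled.
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0); // phase 1: empty interest set
            int before = key.interestOps();

            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // phase 2: bind

            // Phase 3: add the read interest (OP_ACCEPT for a server channel) if it is missing.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println(ops[0] == 0);
        System.out.println(ops[1] == SelectionKey.OP_ACCEPT);
    }
}
```

Using a bitwise OR keeps any interest bits that were already set, which is why the same method works for both server channels (OP_ACCEPT) and client channels (OP_READ).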

At this point, the entire server has truly completed startup. Let’s summarize the full process of server startup as shown in the following diagram:

[Image: the full process of server startup]

  • Create the Server Channel: Essentially, create the native JDK Channel and initialize several important attributes, including id, unsafe, and pipeline.
  • Initialize the Server Channel: Set socket parameters and user-defined attributes, and add two special handlers: ChannelInitializer and ServerBootstrapAcceptor.
  • Register the Server Channel: Call the underlying JDK to register the Channel with the Selector.
  • Port binding: Call the underlying JDK to bind the port and trigger the channelActive event, registering the OP_ACCEPT event in the Channel’s event set.

Extra: How the server handles new client connections #

Once the Netty server has fully started, it can begin serving clients. How does the server process a new client connection? The process can be divided into four steps:

  1. The Boss NioEventLoop thread polls for OP_ACCEPT events from new client connections.
  2. The Netty client NioSocketChannel is constructed.
  3. The client NioSocketChannel is registered with a Worker thread.
  4. The OP_READ event is registered in the NioSocketChannel’s event set.

Let’s briefly introduce each step.

The Boss NioEventLoop in Netty is responsible for accepting new connections. We will focus on the core source code of NioEventLoop in the next class; here, we only need to understand the basic processing flow. When a client opens a new connection to the server, the Boss NioEventLoop detects the OP_ACCEPT event, as shown in the following source code:

// NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

The unsafe held by NioServerSocketChannel is of type NioMessageUnsafe. Let’s take a look at what NioMessageUnsafe.read() does.

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf); // Loop: read messages (here, new connections) into readBuf
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // Propagate read event
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // Propagate read complete event
        // Omitted code
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

From the code, we can see that the core logic of the read() method is to continuously read data through a while loop and put it into a List. This data is actually a new connection. Let’s focus on the doReadMessages() method in NioServerSocketChannel.

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel()); // Accept a new connection through the JDK
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch)); // Wrap it into Netty's NioSocketChannel
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}

This is when the second step begins: constructing the Netty client NioSocketChannel. Netty first uses the accept() method in the JDK to get the native SocketChannel, and then wraps it into Netty’s own NioSocketChannel. The creation of the Netty client channel is similar to the creation of the server channel mentioned earlier, except that the type of the server channel is NioServerSocketChannel, while the type of the client channel is NioSocketChannel. Creating NioSocketChannel also completes several tasks: creating core member variables id, unsafe, and pipeline; registering the SelectionKey.OP_READ event; setting the channel to non-blocking mode; and creating the configuration for the client channel.
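The accept step can be reproduced with plain java.nio. The sketch below is a simplification under stated assumptions: AcceptLoopSketch and its methods are hypothetical names, the clients connect from the same thread over loopback, and the loop accepts until accept() returns null, whereas Netty accepts one connection per doReadMessages() call and lets allocHandle.continueReading() decide whether to loop again.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

final class AcceptLoopSketch {
    // Accepts every pending connection; a non-blocking accept() returns null when none is left.
    static int acceptPending(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        int accepted = 0;
        SocketChannel ch;
        while ((ch = server.accept()) != null) {
            ch.configureBlocking(false); // accepted channels are switched to non-blocking mode
            buf.add(ch);
            accepted++;
        }
        return accepted;
    }

    // Opens `clients` loopback connections and returns how many the server accepted.
    static int demo(int clients) {
        List<SocketChannel> clientSide = new ArrayList<>();
        List<SocketChannel> readBuf = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);
            for (int i = 0; i < clients; i++) {
                clientSide.add(SocketChannel.open(server.getLocalAddress())); // blocking connect
            }
            long deadline = System.currentTimeMillis() + 5000;
            while (readBuf.size() < clients && System.currentTimeMillis() < deadline) {
                if (selector.select(200) > 0) { // wait for OP_ACCEPT readiness
                    selector.selectedKeys().clear();
                    acceptPending(server, readBuf);
                }
            }
            return readBuf.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            closeAll(clientSide);
            closeAll(readBuf);
        }
    }

    private static void closeAll(List<SocketChannel> channels) {
        for (SocketChannel ch : channels) {
            try {
                ch.close();
            } catch (IOException ignored) {
                // best-effort cleanup in a demo
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

The list filled by acceptPending() plays the role of readBuf: each element is a freshly accepted connection that would next be wrapped and passed down the pipeline.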

After successfully constructing the client NioSocketChannel, it will trigger the channelRead event propagation through pipeline.fireChannelRead(). For the server, the internal structure of the Pipeline at this time is shown in the following diagram:

[Image]

In the previous text, we mentioned a special handler - ServerBootstrapAcceptor - and it plays an important role here. The channelRead event will be propagated to the ServerBootstrapAcceptor.channelRead() method, which will assign the client channel to the worker thread pool for execution. The specific implementation is as follows:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // Add childHandler in the client channel; childHandler is specified by the user through the childHandler() method in the startup class
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);
    try {
        // Register the client channel
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

ServerBootstrapAcceptor casts msg to Channel right away. Could msg be some other type of data? No: ServerBootstrapAcceptor is a special handler in the server channel's pipeline, and the server channel's channelRead event is triggered only when a new connection is accepted, so every msg received here is a new client connection.

ServerBootstrapAcceptor will assign childHandler to the client channel in child.pipeline(); childHandler is specified by the user through the childHandler() method in the startup class. During the registration process, an interesting point is that it will call pipeline.fireChannelRegistered() to propagate the channelRegistered event, and then call pipeline.fireChannelActive() to propagate the channelActive event. After all this, it will return to the readIfIsAutoRead() method we introduced earlier, and it will register the SelectionKey.OP_READ event to the channel’s event set.
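The hand-off from accepting a connection to watching it for readable data can be sketched with the JDK as follows. This is a single-threaded illustration, not Netty's implementation: WorkerHandOffSketch and workerSelector are hypothetical names, and the server uses a blocking accept() purely to keep the example short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

final class WorkerHandOffSketch {
    // Accepts one connection, registers it for OP_READ on a separate selector,
    // and returns what the client sent.
    static String acceptAndRead(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (Selector workerSelector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel child = server.accept()) { // blocking accept, for brevity
                child.configureBlocking(false);
                child.register(workerSelector, SelectionKey.OP_READ); // the worker watches for data
                client.write(ByteBuffer.wrap(payload));

                ByteBuffer in = ByteBuffer.allocate(payload.length);
                long deadline = System.currentTimeMillis() + 5000;
                while (in.hasRemaining() && System.currentTimeMillis() < deadline) {
                    if (workerSelector.select(200) > 0) {
                        workerSelector.selectedKeys().clear();
                        if (child.read(in) < 0) {
                            break; // peer closed the connection
                        }
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptAndRead("hello"));
    }
}
```

Here workerSelector stands in for the selector owned by a worker EventLoop: once the accepted channel is registered with OP_READ, incoming bytes wake up the worker rather than the thread that accepted the connection.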

I won’t continue to analyze the specific source code for how the server handles new client connections here. I suggest you analyze the related source code of childGroup.register() by yourself to gain a deeper understanding of the server startup and new connection handling process. With a basic understanding of the source code of server startup, it will be relatively easier to understand the process of creating new client connections.

Summary #

In this lesson, we have analyzed the complete process of Netty server startup and gained a basic understanding of the core components involved. The source code of Netty server startup is relatively deep, and I recommend that you organize the main process before diving into the specific methods. Start from the top and draw a complete call chain diagram (as shown in the figure above), and then tackle each step one by one.

In the next lesson, we will study the source code of the most important Reactor threading model in Netty. I recommend that you review the two lessons together, as it can help answer many of your current questions, such as how are asynchronous tasks encapsulated and executed? How are events processed after registration?