20 Transporter Layer Core Implementation Codec and Thread Model One Article to Sum Up Below

20 Transporter Layer Core Implementation Codec and Thread Model One Article to Sum Up Below #

In the previous lesson, we delved into the core abstract classes and implementation classes related to the Server in the Transporter layer. In this lesson, we will continue to analyze the remaining core interface implementations in the Transporter layer, mainly focusing on the Client interface, Channel interface, ChannelHandler interface, and related key components.

Analysis of Client Inheritance Hierarchy #

In the last lesson, when analyzing AbstractEndpoint, we discovered that in addition to the inheritance hierarchy of AbstractServer, there is also an inheritance hierarchy of AbstractClient, which represents the abstraction of the client. The core fields in AbstractClient are as follows:

  • connectLock (Lock type): This lock is used for synchronization when performing connection, disconnection, reconnection, and other operations in the Client.
  • needReconnect (Boolean type): Before sending data, it checks whether the underlying connection of the Client is disconnected. If it is disconnected, needReconnect field determines whether to reconnect.
  • executor (ExecutorService type): The thread pool associated with the current Client. The specifics of the thread pool have been explained in detail in the previous lesson and will not be repeated here.

In the constructor of AbstractClient, the URL is parsed to initialize the needReconnect and executor fields, as shown in the code example below:

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {

    super(url, handler); // Call the parent's constructor

    // Parse the URL and initialize the `needReconnect` field
    needReconnect = url.getParameter("send.reconnect", false);

    initExecutor(url); // Parse the URL and initialize the `executor` field

    doOpen(); // Initialize the underlying components of the NIO library

    // Create the underlying connection
    connect(); // Exception handling code omitted
}

Similar to the AbstractServer, the AbstractClient defines four abstract methods doOpen(), doClose(), doConnect(), and doDisConnect() that need to be implemented by subclasses.

Next, let’s take a look at NettyClient, which is implemented based on Netty 4 and extends the AbstractClient abstract class. It implements the four do*() abstract methods mentioned above. Here, we focus on the doOpen() and doConnect() methods. In the doOpen() method of NettyClient, the client is built using Bootstrap. It sets the connection timeout, keepalive, and other parameters, as well as creates and registers the ChannelHandler. The implementation is as follows:

protected void doOpen() throws Throwable {

    // Create the NettyClientHandler
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);

    bootstrap = new Bootstrap(); // Create the Bootstrap

    bootstrap.group(NIO_EVENT_LOOP_GROUP)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .channel(socketChannelClass());

    // Set the connection timeout, which uses the `connectTimeout` field in AbstractEndpoint
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));

    bootstrap.handler(new ChannelInitializer<SocketChannel>() {

        protected void initChannel(SocketChannel ch) throws Exception {

            // Heartbeat request interval
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

            // Create the Netty encoding and decoding adapters (NettyCodecAdapter)
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

            // Register the ChannelHandler
            ch.pipeline().addLast("decoder", adapter.getDecoder())
                         .addLast("encoder", adapter.getEncoder())
                         .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                         .addLast("handler", nettyClientHandler);

            // If Socks5Proxy is required, add the Socks5ProxyHandler (not shown)
        }
    });
}

The resulting structure of NettyClient is shown in the following diagram:

Lark20200930-161759.png

NettyClient Structure

The implementation of the NettyClientHandler method is similar to the NettyServerHandler introduced in the previous lesson. It also implements the ChannelDuplexHandler in Netty, and delegates all methods to the ChannelHandler object associated with the NettyClient. The implementation of the userEventTriggered() method in NettyClientHandler is slightly different. When receiving an IdleStateEvent, NettyServerHandler disconnects the connection, while NettyClientHandler sends a heartbeat message. The code implementation is as follows:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    if (evt instanceof IdleStateEvent) {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        Request req = new Request(); 

        req.setVersion(Version.getProtocolVersion());

        req.setTwoWay(true);

        req.setEvent(HEARTBEAT_EVENT); // Send heartbeat request

        channel.send(req);
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

Analysis of Channel Inheritance Hierarchy #

In addition to the AbstractEndpoint introduced in the previous lesson, AbstractChannel also extends the AbstractPeer abstract class and implements the Channel interface. The implementation of AbstractChannel is very simple, it only checks the status of the underlying connection in the send() method and does not implement the specific message sending logic.

Here, let’s take NettyChannel as an example to analyze its implementation of AbstractChannel. The core fields in NettyChannel are as follows:

  • channel (Channel type): The Netty Channel, which corresponds to the current Dubbo Channel object.
  • attributes (Map type): Additional attributes attached to the current Channel are recorded in this Map. The methods provided by NettyChannel, such as getAttribute(), hasAttribute(), setAttribute(), operate on this collection.
  • active (AtomicBoolean): Used to indicate whether the current Channel is active.

In addition, NettyChannel also has a static Map collection (CHANNEL_MAP field) that caches the mapping relationship between Netty framework Channels and Dubbo Channels in the current JVM. From the call diagram below, we can see that NettyChannel provides methods to read and write the CHANNEL_MAP collection:

Drawing 1.png The send() method in NettyChannel is responsible for sending data to the peer through the underlying Netty framework Channel. The second parameter can be used to specify if the send operation should wait for it to complete. Here is the implementation:

public void send(Object message, boolean sent) throws RemotingException {

    // Call the send() method of AbstractChannel to check if the connection is available
    super.send(message, sent); 

    boolean success = true;

    int timeout = 0;

    // Send data using the Netty framework Channel
    ChannelFuture future = channel.writeAndFlush(message);

    if (sent) { // Wait for send operation to complete with a timeout

        timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        success = future.await(timeout);

    }

    Throwable cause = future.cause();

    if (cause != null) {
        throw cause;
    }

    // If an exception occurs, remove the channel from CHANNEL_MAP cache when the underlying connection is disconnected (omitted)
}

ChannelHandler Inheritance Analysis #

The previously mentioned AbstractServer, AbstractClient, and Channel implementations all implement the ChannelHandler interface through AbstractPeer. However, this is just a simple delegation (or decorator) where all methods are delegated to the underlying ChannelHandler object.

Now let’s dive deeper into other implementations of the ChannelHandler. The following implementations are involved:

Drawing 2.png

ChannelHandler inheritance diagram

The ChannelHandlerDispatcher was explained in [Session 17]. It aggregates multiple ChannelHandler objects into a single ChannelHandler object.

ChannelHandlerAdapter is an empty implementation of ChannelHandler, and TelnetHandlerAdapter extends it and implements the TelnetHandler interface. We will discuss Dubbo’s support for Telnet separately in upcoming sessions, so we won’t go into further analysis here.

Based on the name, the ChannelHandlerDelegate interface is a wrapper for another ChannelHandler object. Its two implementation classes, AbstractChannelHandlerDelegate and WrappedChannelHandler, also just wrap another ChannelHandler object.

Among them, AbstractChannelHandlerDelegate has three implementations, which are quite simple. Let’s go through them one by one:

  • MultiMessageHandler: A ChannelHandler implementation specifically for handling MultiMessage. MultiMessage is a type of message in the Exchange layer that encapsulates multiple messages. When MultiMessageHandler receives a MultiMessage, its received() method iterates through all the messages and delegates them to the underlying ChannelHandler for further processing.

  • DecodeHandler: A ChannelHandler implementation specifically for handling Decodeable. Classes implementing the Decodeable interface provide a decode() method to decode themselves. The DecodeHandler.received() method obtains the decoded message from that method and passes it to the underlying ChannelHandler for further processing.

  • HeartbeatHandler: A ChannelHandler implementation specifically for handling heartbeat messages. When the HeartbeatHandler.received() method receives a heartbeat request, it generates the corresponding heartbeat response and returns it. When it receives a heartbeat response, it logs the response. For other types of messages, it delegates them to the underlying ChannelHandler for further processing. Here is the core implementation:

    public void received(Channel channel, Object message) throws RemotingException {
    
        setReadTimestamp(channel); // Record the timestamp of the most recent read and write events
    
        if (isHeartbeatRequest(message)) { // Receive heartbeat request
    
            Request req = (Request) message;
    
            if (req.isTwoWay()) { // Return heartbeat response, note that the request ID is carried
    
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(HEARTBEAT_EVENT);
                channel.send(res);
    
                return;
            }
    
            if (isHeartbeatResponse(message)) { // Receive heartbeat response
    
                // Log response (omitted)
    
                return;
            }
    
            handler.received(channel, message);
        }
    }
    

Furthermore, we can see that HeartbeatHandler records the most recent read and write timestamp as an additional attribute in the Channel.

Through the above analysis, we can see that the three implementations under AbstractChannelHandlerDelegate actually add some enhanced features to the original ChannelHandler. This is a typical application of the decorator pattern.

Dispatcher and ChannelHandler #

Next, let’s introduce another inheritance line of the ChannelHandlerDelegate interface, which is WrappedChannelHandler. Its subclasses mainly determine how Dubbo processes received events and messages in terms of thread model, known as the “message dispatch mechanism”. It has a close connection with the ThreadPool introduced earlier.

Drawing 3.png

WrappedChannelHandler inheritance diagram

From the above diagram, we can see that each WrappedChannelHandler implementation is created by a corresponding Dispatcher implementation. Here is the definition of the Dispatcher interface:

@SPI(AllDispatcher.NAME) // The default extension name is "all"
public interface Dispatcher {
    // The extension name can be specified in the URL parameters to override the default extension
    @Adaptive({"dispatcher", "dispather", "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

The AllDispatcher creates the AllChannelHandler object which handles all network events and messages using the associated thread pool. AllChannelHandler overrides all network event handling methods in WrappedChannelHandler except for the sent() method, and executes the logic of its underlying ChannelHandler in the associated thread pool.

Let’s start with the connect() method. It encapsulates the processing of the CONNECTED event into a ChannelEventRunnable and submits it to the thread pool for execution. Here is the implementation:

public void connected(Channel channel) throws RemotingException {

    ExecutorService executor = getExecutorService(); // Get the shared thread pool

    // Encapsulate the processing of the CONNECTED event into a ChannelEventRunnable and submit it to the thread pool
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));

    // Exception handling logic (omitted)
}

The getExecutorService() method here retrieves the corresponding public thread pool from the ExecutorRepository based on the current endpoint (Server/Client) URL.

The disconnected() method handles connection disconnection events, and the caught() method handles exception events. They are implemented in the same way mentioned above, but are not further explained here.

The received() method is called when the current endpoint receives data. The specific execution process is as follows: firstly, the IO thread (EventLoopGroup in Netty) decodes the request from the binary stream, then calls the received() method of AllChannelHandler, which submits the request to the thread pool for execution. After execution, the sent() method is called to write back the response to the remote end. The specific implementation of the received() method is as follows:

public void received(Channel channel, Object message) throws RemotingException {

    // Get the thread pool

    ExecutorService executor = getPreferredExecutorService(message);

    try {

        // Wrap the message into a ChannelEventRunnable task and submit it to the thread pool for execution

        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

    } catch (Throwable t) {

        // If the thread pool is full, the request will be rejected. Here, depending on the request configuration, a descriptive response may be returned

        if(message instanceof Request && t instanceof RejectedExecutionException){

            sendFeedback(channel, (Request) message, t);

            return;

        }

        throw new ExecutionException("...");

    }

}

The getPreferredExecutorService() method has special handling for responses: if the thread pool associated with the request is specified when sending, when receiving the corresponding response message, it will first check the thread pool associated with the request based on the request ID and process the response.

public ExecutorService getPreferredExecutorService(Object msg) {

    if (msg instanceof Response) {

        Response response = (Response) msg;

        DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId()); // Get the DefaultFuture associated with the request

        if (responseFuture == null) { 

            return getSharedExecutorService();

        } else { // If the request is associated with a thread pool, the corresponding threads will be used to process the response

            ExecutorService executor = responseFuture.getExecutor();

            if (executor == null || executor.isShutdown()) {

                executor = getSharedExecutorService();

            }

            return executor;

        }

    } else { // If it is a request message, use the shared thread pool

        return getSharedExecutorService();

    }

}

Here, the concepts of Request and Response are involved, which are concepts in the Exchange layer and will be explained in detail later. For now, you only need to know that they are different message types.

Note that AllChannelHandler does not override the sent() method of its parent class, which means that sending messages is done directly by calling the sent() method in the current thread.

Next, let’s look at the implementation of the remaining WrappedChannelHandler. The ExecutionChannelHandler (created by ExecutionDispatcher) only dispatches request messages to the thread pool for processing, which means that it only overrides the received() method. The response messages and other network events (such as connection establishment events, connection disconnection events, heartbeat messages, etc.) are directly processed in the IO thread.

The DirectChannelHandler implementation (created by DirectDispatcher) handles all messages and network events directly in the IO thread.

The MessageOnlyChannelHandler implementation (created by MessageOnlyDispatcher) submits all received messages to the thread pool for processing, while other network events are directly handled by the IO thread.

The ConnectionOrderedChannelHandler implementation (created by ConnectionOrderedDispatcher) submits received messages to the thread pool for processing, and for connection establishment and disconnection events, they are submitted to a separate thread pool and queued for processing. In the constructor of ConnectionOrderedChannelHandler, a thread pool is initialized, and the length of its queue is fixed:

public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {

    super(handler, url);

    String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);

    // Note that this thread pool only has one thread, and the length of the queue is fixed,

    // specified by the `connect.queue.capacity` parameter in the URL

    connectionExecutor = new ThreadPoolExecutor(1, 1, 

            0L, TimeUnit.MILLISECONDS,

            new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),

            new NamedThreadFactory(threadName, true),

            new AbortPolicyWithReport(threadName, url)

    );

    queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);

}

In the connected() and disconnected() methods of ConnectionOrderedChannelHandler, the connection establishment and disconnection events are submitted to the above connectionExecutor thread pool for processing.

When introducing the various implementations of WrappedChannelHandler above, you can see that there is special handling for the ThreadlessExecutor type of thread pool. For example, in the ExecutionChannelHandler.received() method, there is the following conditional logic:

public void received(Channel channel, Object message) throws RemotingException {

    // Get the thread pool (the thread pool associated with the request or the shared thread pool)

    ExecutorService executor = getPreferredExecutorService(message);

    if (message instanceof Request) { // Submit the request message directly to the thread pool for processing

        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

    } else if (executor instanceof ThreadlessExecutor) {

        // Special handling for the ThreadlessExecutor type of thread pool

        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

    } else {

        handler.received(channel, message);

    }

}

Optimization of ThreadlessExecutor #

ThreadlessExecutor is a special type of thread pool, which is different from other normal thread pools in that ThreadlessExecutor does not manage any threads internally.

We can submit tasks to the ThreadlessExecutor thread pool by calling the execute() method. However, these submitted tasks are not scheduled for execution on any thread, but instead stored in a blocking queue. Only when another thread calls the ThreadlessExecutor.waitAndDrain() method will the tasks be executed. In other words, the thread that executes the tasks is the same as the one calling the waitAndDrain() method.

So why do we have this implementation of ThreadlessExecutor? This is mainly because, before Dubbo version 2.7.5, each connection would start a thread pool in the WrappedChannelHandler.

In the old version, there is no concept of ExecutorRepository, so it does not reuse the same thread pool based on the URL, but creates a new thread pool through SPI.

At that time, the thread model for Dubbo Consumer synchronous requests was as shown in the diagram below:

Drawing 4.png

Thread model for Dubbo Consumer synchronous requests

From the diagram, we can see the following request-response flow:

  1. After the business thread sends a request, it obtains a Future instance.
  2. The business thread then calls Future.get() to block and wait for the request result to return.
  3. When the response is returned, it will be deserialized and processed by the independent thread pool associated with the connection.
  4. After the processing is completed, the business result will be returned to the business thread through the Future.set() method.

In this design, the Consumer side maintains a thread pool, and the thread pool is isolated by connection, meaning that each connection has its own thread pool. This means that in scenarios where a large number of services need to be consumed and the concurrency is high, such as typical gateway scenarios, the number of threads on the Consumer side may continue to increase, resulting in excessive CPU consumption for thread scheduling, or even OutOfMemoryError due to excessive thread creation.

To solve the above problems, starting from version 2.7.5, Dubbo introduced ThreadlessExecutor, and modified the thread model as shown in the following diagram:

Drawing 5.png

Structure after introducing ThreadlessExecutor

  1. After the business thread sends a request, it obtains a Future object.
  2. The business thread calls the ThreadlessExecutor.waitAndDrain() method, which waits on the blocking queue.
  3. When a response is received, the IO thread generates a task and fills it into the ThreadlessExecutor queue.
  4. The business thread takes out the added task and executes it in the current thread. After obtaining the business result, it calls the Future.set() method to set the result, and then the waitAndDrain() method returns.
  5. The business thread retrieves the result value from the Future.

After understanding the reasons for the existence of ThreadlessExecutor, let’s delve into the core implementation of ThreadlessExecutor. First, there are several core fields of ThreadlessExecutor:

  • queue (of type LinkedBlockingQueue): Blocking queue used to pass tasks between IO threads and business threads.
  • waiting, finished (of type Boolean): The waitAndDrain() method in ThreadlessExecutor is generally bound to a single RPC call and is executed only once. When waitAndDrain() is called again, it checks the finished field. If it is true, the call directly returns. When execute() is called again to submit a task, it determines whether the task is put into the queue to wait for execution by the business thread or executed directly by the sharedExecutor thread pool based on the waiting field.
  • sharedExecutor (of type ExecutorService): The shared thread pool associated with ThreadlessExecutor. When the business thread no longer waits for a response, the submitted tasks are executed by this shared thread.
  • waitingFuture (of type CompletableFuture): Points to the DefaultFuture object corresponding to the request. We will give a detailed explanation of its specific implementation in the following lessons.

The core logic of ThreadlessExecutor lies in the execute() method and waitAndDrain() method. The execute() method is relatively simple. It determines where to submit the task based on the waiting state. The relevant example code is as follows:

public void execute(Runnable runnable) {

    synchronized (lock) {

        if (!waiting) { // Check if the business thread is still waiting for the response result

            // If not waiting, directly hand over the task to the shared thread pool

            sharedExecutor.execute(runnable); 

        } else { // If the business thread is still waiting, write the task to the queue for the business thread to execute

            queue.add(runnable);

        }

    }
}

}

In the waitAndDrain() method, it first checks the value of the finished field. Then, it retrieves all tasks from the blocking queue and executes them. After execution, it modifies the finished and waiting fields to indicate that the current ThreadlessExecutor has finished its work and there are no business threads waiting.

public void waitAndDrain() throws InterruptedException {

    if (finished) { // Check the current ThreadlessExecutor status

        return;

    }

    // Retrieve tasks from the blocking queue

    Runnable runnable = queue.take();

    synchronized (lock) {

        waiting = false; // Modify the waiting status

        runnable.run(); // Execute the task

    }

    runnable = queue.poll(); // If there are other tasks in the blocking queue, they need to be executed as well

    while (runnable != null) {

        runnable.run(); // Omit exception handling logic

        runnable = queue.poll();

    }

    finished = true; // Modify the finished status

}

With this, we have completed the introduction to the implementation of the ChannelHandler in the Transporter layer of Dubbo. It involves multiple decorators of ChannelHandler. To help you better understand, let’s go back to NettyServer and see how it encapsulates the upper-layer ChannelHandler.

In the constructor of NettyServer, the ChannelHandlers.wrap() method is called to decorate the passed-in ChannelHandler object:

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {

    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)

            .getAdaptiveExtension().dispatch(handler, url)));

}

Combined with the previous analysis, we can obtain the following graph:

Drawing 6.png

Server-side ChannelHandler structure diagram

We can set breakpoints and debug at the place where NettyServerHandler is created to obtain the following graph, which also confirms the content of the above graph:

Drawing 7.png

Summary #

In this lesson, we focused on the implementation and optimization of the Client, Channel, and ChannelHandler in the Dubbo Transporter layer.

First, we introduced the AbstractClient abstract interface and the implementation based on Netty 4, NettyClient. Next, we introduced the AbstractChannel abstract class and the implementation NettyChannel. Finally, we delved into the implementation of the ChannelHandler interface, including a detailed analysis of key ChannelHandler implementations such as WrappedChannelHandler, as well as the optimization of ThreadlessExecutor.