19 Transporter Layer Core Implementation: Codec and Thread Model in One Article (Part 1) #

In Lesson 17, we detailed the core abstract interfaces related to Transporter in the dubbo-remoting-api module. In this lesson, we will continue exploring other contents of the dubbo-remoting-api module. We will start with the core interfaces of the Transporter layer, such as RemotingServer, Client, Channel, and ChannelHandler, and discuss their implementations.

AbstractPeer Abstract Class #

First, let’s take a look at the abstract class AbstractPeer. It implements both the Endpoint interface and the ChannelHandler interface. As shown in the diagram below, it is also the parent class of the abstract classes AbstractChannel and AbstractEndpoint.

Drawing 0.png

In the AbstractPeer class there are four fields. The first is a URL field that represents the endpoint itself, and the next two are Boolean fields (closing and closed) that record the current state of the endpoint; these three fields are all related to the Endpoint interface. The fourth field points to a ChannelHandler object: every method of the ChannelHandler interface that AbstractPeer implements is delegated to this ChannelHandler object. From the inheritance diagram above, we can conclude that AbstractChannel, AbstractServer, and AbstractClient all need to be associated with a ChannelHandler object.
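To make the delegation concrete, the forwarding methods look roughly like the following sketch (simplified from AbstractPeer; the closed guards are an assumption based on the fields described above, and some callbacks are omitted):

// Simplified sketch: each ChannelHandler callback in AbstractPeer forwards to the wrapped handler
@Override
public void connected(Channel ch) throws RemotingException {
    if (closed) { // ignore events once the endpoint has been closed
        return;
    }
    handler.connected(ch);
}

@Override
public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}

@Override
public void caught(Channel ch, Throwable ex) throws RemotingException {
    handler.caught(ch, ex);
}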

AbstractEndpoint Abstract Class #

Continuing from the inheritance diagram, AbstractEndpoint extends the AbstractPeer abstract class. AbstractEndpoint maintains a Codec2 object (codec field) and two timeout values (timeout field and connectTimeout field). In the constructor of AbstractEndpoint, these three fields are initialized based on the URL passed in:

public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler); // call the constructor of the parent class AbstractPeer

    // Determine the specific implementation of Codec2 based on the codec parameter value in the URL
    this.codec = getChannelCodec(url);

    // Set the value of timeout field based on the timeout parameter in the URL (default: 1000)
    this.timeout = url.getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);

    // Set the value of connectTimeout field based on the connect.timeout parameter in the URL (default: 3000)
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}

In Lesson 17, when introducing the Codec2 interface, it was mentioned that it is an SPI extension. The “AbstractEndpoint.getChannelCodec()” method is used to select its extension implementation based on Dubbo SPI. The specific implementation is as follows:

protected static Codec2 getChannelCodec(URL url) {
    // Get the extension name from the codec parameter in the URL (default: "telnet")
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");

    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        // Load and instantiate the specific extension implementation of Codec2 through ExtensionLoader
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        // If no Codec2 extension matches the name, fall back to the extension names of the
        // (now deprecated) Codec interface and wrap the result in a CodecAdapter
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                .getExtension(codecName));
    }
}

In addition, AbstractEndpoint also implements the Resetable interface, which requires only a reset() method. Although the reset() method in AbstractEndpoint is quite long, its logic is very simple: it resets the three fields of AbstractEndpoint based on the parameters of the URL passed in. The following snippet shows how the codec field is reset, again by calling the getChannelCodec() method:

public void reset(URL url) {
    // Check if the current AbstractEndpoint has been closed (omitted)
    // Omit the logic for resetting the timeout and connectTimeout fields
    try {
        if (url.hasParameter(Constants.CODEC_KEY)) {
            this.codec = getChannelCodec(url);
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
}

Server Inheritance Analysis #

Both AbstractServer and AbstractClient extend the AbstractEndpoint abstract class. Let’s start with the implementation of AbstractServer. The following diagram shows that AbstractServer extends AbstractEndpoint and also implements the RemotingServer interface:

Drawing 1.png

AbstractServer is an abstraction for the server and implements common server logic. AbstractServer has several core fields:

  • localAddress and bindAddress (InetSocketAddress type): correspond to the local address and the bound address of the Server, obtained from the parameters in the URL. The default value of bindAddress is the same as localAddress.
  • accepts (int type): the maximum number of connections that this Server can accept, obtained from the “accepts” parameter in the URL. The default value is 0, indicating no limit.
  • executorRepository (ExecutorRepository type): responsible for managing thread pools. We will have a detailed explanation of the implementation of ExecutorRepository later.
  • executor (ExecutorService type): the thread pool associated with the current Server, which is created and managed by the ExecutorRepository mentioned above.

In the constructor of AbstractServer, the above fields are initialized based on the URL passed in. The doOpen() abstract method is called to complete the startup of the Server. The specific implementation is as follows:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler); // call the constructor of the parent class
    localAddress = getUrl().toInetSocketAddress();

    // Determine the IP and port to bind to; fall back to ANYHOST when the host is invalid
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);

    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);

    try {
        doOpen(); // call the doOpen() abstract method to start the Server
    } catch (Throwable t) {
        // Wrap any startup failure and rethrow it as a RemotingException
        throw new RemotingException("...");
    }

    // Get the thread pool associated with this Server
    executor = executorRepository.createExecutorIfAbsent(url);
}

ExecutorRepository #

Before we continue analyzing the specific implementation class of AbstractServer, let’s first understand the ExecutorRepository interface.

ExecutorRepository is responsible for creating and managing thread pools in Dubbo. Although this interface is an SPI extension point, there is only one default implementation: DefaultExecutorRepository. This default implementation maintains a two-level ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> collection (the data field) to cache existing thread pools. The first-level key indicates whether the thread pool belongs to the Provider side or the Consumer side, and the second-level key is the port associated with the thread pool.
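For reference, the two-level cache can be pictured as the following field declaration (a sketch based on the description above; the comments map the keys to their meaning):

// Two-level cache of thread pools maintained by DefaultExecutorRepository (sketch):
//   first-level key  -> which side the pool belongs to (Provider side vs. Consumer side)
//   second-level key -> the port associated with the thread pool
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();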

The DefaultExecutorRepository.createExecutorIfAbsent() method creates the corresponding thread pool based on the URL parameter and caches it in the appropriate location. The specific implementation is as follows:

public synchronized ExecutorService createExecutorIfAbsent(URL url) {

    // Determine the first-level key based on the side parameter in the URL

    String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;

    if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {

        componentKey = CONSUMER_SIDE;

    }

    Map<Integer, ExecutorService> executors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>());

    // Determine the second-level key based on the port value in the URL

    Integer portKey = url.getPort();

    ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));

    // If the corresponding thread pool in the cache is already closed, the createExecutor() method also needs to be called

    // to create a new thread pool and replace the closed thread pool in the cache. This logic is omitted here.

    return executor;

}

In the createExecutor() method, Dubbo SPI is used to find the extension implementation of the ThreadPool interface, and its getExecutor() method is called to create the thread pool. The ThreadPool interface is annotated with @SPI and defaults to the FixedThreadPool implementation. Meanwhile, the getExecutor() method in the ThreadPool interface is annotated with @Adaptive, so the dynamically generated adapter class selects the ThreadPool extension based on the threadpool parameter in the URL. The implementation classes of the ThreadPool interface are shown in the figure below:

Drawing 2.png

ThreadPool inheritance diagram
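Before going through the concrete implementations shown above, here is a rough sketch of the ThreadPool SPI declaration itself (the default extension name and the adaptive key follow the description above; treat it as a sketch rather than the exact source):

// Sketch of the ThreadPool extension point: "fixed" is the default extension name,
// and the adaptive getExecutor() picks the implementation from the "threadpool"
// parameter of the URL.
@SPI("fixed")
public interface ThreadPool {

    @Adaptive({"threadpool"})
    Executor getExecutor(URL url);
}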

Different implementations create thread pools with different characteristics based on URL parameters. Here is an analysis of CachedThreadPool as an example:

public Executor getExecutor(URL url) {
    String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);

    // Core thread count
    int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);

    // Maximum thread count
    int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);

    // Maximum length of the buffer queue
    int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

    // Maximum idle time of non-core threads; non-core threads idle longer than this are recycled
    int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

    // The following code relies on JDK's ThreadPoolExecutor to create a thread pool
    // with the specified characteristics and return it
    return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                            : new LinkedBlockingQueue<Runnable>(queues)),
            new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

Let’s briefly explain other thread pools created by ThreadPool implementations.

  • LimitedThreadPool: Like CachedThreadPool, it lets you specify the core thread count, the maximum thread count, and the buffer queue length. The difference is that the threads created by LimitedThreadPool are never recycled.
  • FixedThreadPool: The core thread count equals the maximum thread count, and its threads are never recycled.

All three of the thread pools above (CachedThreadPool, LimitedThreadPool, and FixedThreadPool) are based on the JDK ThreadPoolExecutor: when all core threads are busy, tasks are first placed in the buffer queue, and new threads are created to handle tasks only after the buffer queue is full.
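For comparison, here is a sketch of FixedThreadPool.getExecutor() under the same URL parameter conventions as CachedThreadPool above: the core and maximum thread counts are both taken from the threads parameter, and the keep-alive time is 0, so threads are never recycled.

public Executor getExecutor(URL url) {
    String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
    // Core thread count == maximum thread count, so the pool size is fixed
    int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
    // Maximum length of the buffer queue
    int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                            : new LinkedBlockingQueue<Runnable>(queues)),
            new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}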

The thread pool created by EagerThreadPool is EagerThreadPoolExecutor (which extends the JDK ThreadPoolExecutor), and the queue it uses is TaskQueue (which extends LinkedBlockingQueue). The difference from a plain ThreadPoolExecutor is that, as long as the thread count has not reached the maximum, EagerThreadPoolExecutor prefers to create a new thread to execute the task rather than putting it into the buffer queue. Only once the thread count reaches the maximum are tasks placed in the buffer queue to wait for an idle thread.

EagerThreadPoolExecutor overrides two methods in ThreadPoolExecutor: the execute() method and the afterExecute() method. The specific implementation is as follows. We can see that it maintains a submittedTaskCount field (AtomicInteger type), which is used to record the total number of tasks in the thread pool (the number of tasks being executed in threads + the number of tasks waiting in the queue).

public void execute(Runnable command) {
    // Increment submittedTaskCount before submitting the task
    submittedTaskCount.incrementAndGet();
    try {
        super.execute(command); // Submit the task
    } catch (RejectedExecutionException rx) {
        final TaskQueue queue = (TaskQueue) super.getQueue();
        try {
            // After the task is rejected, try to put it back into the queue for caching
            // and wait for idle threads to execute it
            if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                // When the task is rejected again, the queue is full and cannot execute the task:
                // decrement submittedTaskCount
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.", rx);
            }
        } catch (InterruptedException x) {
            // Exception when putting the task back into the queue: decrement submittedTaskCount
            submittedTaskCount.decrementAndGet();
            throw new RejectedExecutionException(x);
        }
    } catch (Throwable t) {
        // Task submission exception: decrement submittedTaskCount
        submittedTaskCount.decrementAndGet();
        throw t;
    }
}

protected void afterExecute(Runnable r, Throwable t) {
    // Decrease submittedTaskCount when the task is finished
    submittedTaskCount.decrementAndGet();
}

When you see this, you might be confused: there is no logic here that creates threads to execute tasks. The key actually lies in the associated TaskQueue implementation, which overrides LinkedBlockingQueue.offer(). When there is no idle thread but the pool has not yet reached its maximum thread count, offer() returns false, which forces the thread pool to create a new thread to execute the task. The sample code is as follows:

public boolean offer(Runnable runnable) {
    // Get the current number of threads in the thread pool
    int currentPoolThreadSize = executor.getPoolSize();

    // If there are idle threads, submit the task directly to the queue;
    // an idle thread will fetch the task and execute it immediately
    if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
        return super.offer(runnable);
    }

    // If there are no idle threads but new threads can still be created,
    // return false to force the thread pool to create a new thread to execute the task
    if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
        return false;
    }

    // If the current number of threads has reached the upper limit,
    // the task can only be cached in the queue
    return super.offer(runnable);
}

The last thread-pool-related detail is AbortPolicyWithReport, which extends ThreadPoolExecutor.AbortPolicy. In its overridden rejectedExecution() method, it writes a WARN-level log describing the state of the thread pool, then calls the dumpJStack() method, and finally throws a RejectedExecutionException.
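Here is a minimal, self-contained sketch of that behavior. The class and field names are illustrative (not Dubbo's actual class); the real AbortPolicyWithReport logs much richer thread-pool statistics and rate-limits its dumpJStack() calls.

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative sketch of the "log, dump, reject" behavior described above
public class AbortPolicySketch extends ThreadPoolExecutor.AbortPolicy {

    private final String threadName;

    public AbortPolicySketch(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 1. WARN-level log with the state of the exhausted pool
        String msg = String.format(
                "Thread pool [%s] exhausted: poolSize=%d, active=%d, queued=%d, taskCount=%d",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getQueue().size(), e.getTaskCount());
        System.err.println("[WARN] " + msg);

        // 2. Dump thread states to help diagnose what is occupying the pool (stand-in for dumpJStack())
        Thread.getAllStackTraces().forEach((t, stack) -> System.err.println(t.getName() + " " + t.getState()));

        // 3. Finally reject the task
        throw new RejectedExecutionException(msg);
    }
}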

Let’s go back to the Server inheritance line and look at NettyServer, which is implemented on top of Netty 4. It extends the AbstractServer introduced earlier and implements the doOpen() and doClose() methods. Here, we focus on the doOpen() method:

protected void doOpen() throws Throwable {
    // Create ServerBootstrap
    bootstrap = new ServerBootstrap();

    // Create the boss EventLoopGroup
    bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");

    // Create the worker EventLoopGroup
    workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            "NettyServerWorker");

    // Create NettyServerHandler, which is an implementation of ChannelHandler in Netty,
    // not an implementation of the ChannelHandler interface in Dubbo's Remoting layer
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

    // Get all the Channels created by the current NettyServer. The channels collection here
    // contains not Netty's Channel objects, but Dubbo's Channel objects in the Remoting layer
    channels = nettyServerHandler.getChannels();

    // Initialize ServerBootstrap and specify the boss and worker EventLoopGroups
    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Connection idle timeout
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    // NettyCodecAdapter will create the Decoder and Encoder
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()
                            // Register Decoder and Encoder
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            // Register IdleStateHandler
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            // Register NettyServerHandler
                            .addLast("handler", nettyServerHandler);
                }
            });

    // Bind the specified address and port
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly(); // Wait for the bind operation to complete
    channel = channelFuture.channel();
}

After reading the doOpen() method of NettyServer, you will find that it follows the same basic flow as starting a Netty server in the simplified RPC framework: initialize ServerBootstrap, create the boss and worker EventLoopGroups, and create a ChannelInitializer that specifies how each Channel is initialized, along with the other standard steps of using Netty.

In fact, from the perspective of the Transporter layer, the only functional difference lies in the ChannelHandlers registered on the Channel. The server structure built by the doOpen() method looks like this:

5.png

Core ChannelHandlers #

Now let’s take a look at the core functionality of these four ChannelHandlers.

First, the decoder and encoder are the InternalDecoder and InternalEncoder inner classes of NettyCodecAdapter, as shown in the following image. They inherit from Netty’s ByteToMessageDecoder and MessageToByteEncoder, respectively.

Drawing 4.png

Do you remember the codec field (of type Codec2) in the AbstractEndpoint abstract class? InternalDecoder and InternalEncoder delegate the actual encoding and decoding work to the Codec2 object associated with the NettyServer. Let’s take InternalDecoder as an example:

private class InternalDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

        // Wrap ByteBuf into a unified ChannelBuffer
        ChannelBuffer message = new NettyBackedChannelBuffer(input);

        // Get the associated Channel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        do {

            // Record the current readerIndex position
            int saveReaderIndex = message.readerIndex();

            // Delegate the decoding to the Codec2
            Object msg = codec.decode(channel, message);

            // If the currently received data is less than the length of a message,
            // NEED_MORE_INPUT will be returned.
            // Here, the readerIndex will be reset to continue waiting for more data to be received.
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {

                message.readerIndex(saveReaderIndex);
                break;

            } else {

                if (msg != null) {
                    // Pass the read message to the subsequent handlers for processing
                    out.add(msg);
                }
            }

        } while (message.readable());
    }
}

Do you notice the similarities between the implementation of InternalDecoder and the decoder implementation in the simplified RPC framework?

The specific implementation of InternalEncoder will not be explained here, but you can refer to the source code for further study and analysis if you are interested.
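For readers who do not want to open the source right away, its core logic is roughly the mirror image of the decoder. The following is a simplified sketch, not the verbatim source:

private class InternalEncoder extends MessageToByteEncoder {

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        // Wrap the outbound ByteBuf into Dubbo's unified ChannelBuffer
        ChannelBuffer buffer = new NettyBackedChannelBuffer(out);

        // Get the associated Dubbo Channel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        // Delegate the actual encoding to the Codec2 implementation
        codec.encode(channel, buffer, msg);
    }
}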

Next is IdleStateHandler, a utility ChannelHandler provided by Netty that is used to send periodic heartbeat requests or to automatically close connections that have been idle for too long. How does it work? IdleStateHandler records the time of the most recent read/write event in fields such as lastReadTime and lastWriteTime. When it is initialized, it creates a scheduled task that periodically checks the difference between the current time and the last read/write time; if that difference exceeds the configured threshold (the idleTimeout set in NettyServer), an IdleStateEvent is fired and passed to the subsequent ChannelHandlers. A subsequent ChannelHandler then decides, in its userEventTriggered() method, whether to close the long-idle connection or send a heartbeat probe based on the received IdleStateEvent.
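As an illustration (not Dubbo’s actual class), a handler placed after IdleStateHandler could react to the event like this:

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;

// Illustrative sketch only: close a connection that IdleStateHandler has reported as idle.
// On the server side Dubbo closes the idle channel; a client-side handler would typically
// send a heartbeat request instead.
public class IdleConnectionCloser extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // The channel has been idle longer than the configured threshold
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}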

Finally, let’s look at NettyServerHandler, which inherits from ChannelDuplexHandler, a ChannelHandler provided by Netty that can handle both inbound and outbound data, as you can see from the inheritance diagram below.

Drawing 5.png

NettyServerHandler has two core fields: channels (a map) and handler (of type ChannelHandler).

  • channels (a Map<String, Channel> collection): records all the Channels created by the current Server. As you can see from the image below, the channels collection is updated when a connection is established (the channelActive() method is triggered) or closed (the channelInactive() method is triggered).

Drawing 6.png

  • handler (of type ChannelHandler): almost every method in NettyServerHandler ends up invoking this Dubbo ChannelHandler object (as shown in the image below).

Drawing 7.png

Here is an example analysis of the write() method:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    super.write(ctx, msg, promise); // Continue passing the outbound data down the pipeline

    // Sending is not blocked here; the method additionally triggers the sent() callback of the
    // associated Dubbo ChannelHandler for follow-up processing. Note that the method name is in
    // the past tense (sent), indicating the data has already been written out. The other methods
    // of NettyServerHandler delegate to the Dubbo ChannelHandler in the same way, even if it is
    // less obvious than in this write() example.
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    handler.sent(channel, msg);
}

When creating the NettyServerHandler in the NettyServer, you can see the following line of code:

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

The second parameter passed here is the NettyServer object. If you trace back the inheritance structure of NettyServer, you will find that its top-level parent class AbstractPeer implements ChannelHandler and delegates all methods to the encapsulated ChannelHandler object, as shown in the image below:

Drawing 8.png

In other words, the data NettyServerHandler hands to the NettyServer is ultimately delegated to this encapsulated ChannelHandler object.

So far, the discussion of the server side is complete. Looking down the inheritance path starting from AbstractPeer, you can see that NettyServer acquires the capabilities of the Endpoint, ChannelHandler, and RemotingServer interfaces, is associated with a ChannelHandler object and a Codec2 object, and ultimately delegates the data it receives to these two objects for processing. The upper-layer caller therefore only needs to provide implementations of the ChannelHandler and Codec2 interfaces.

6.png
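To make this concrete, here is a hedged usage sketch. The URL parameters and the echo-style handler are purely illustrative, and the referenced classes (NettyTransporter from the dubbo-remoting-netty4 module, ChannelHandlerAdapter from dubbo-remoting-api) are assumed to be available on the classpath; the exact bind() return type may differ across Dubbo versions.

import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
import org.apache.dubbo.remoting.transport.netty4.NettyTransporter;

// Sketch: from the caller's point of view, starting a NettyServer only requires a URL plus a
// ChannelHandler implementation; the Codec2 is picked through the "codec" URL parameter.
public class TransporterUsageSketch {

    public static void main(String[] args) throws Exception {
        URL url = URL.valueOf("dubbo://127.0.0.1:20880?codec=telnet");
        ChannelHandler handler = new ChannelHandlerAdapter() {
            @Override
            public void received(Channel channel, Object message) {
                System.out.println("received: " + message);
            }
        };
        RemotingServer server = new NettyTransporter().bind(url, handler);
        System.out.println("server bound at " + server.getLocalAddress());
    }
}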

Summary #

In this lesson, we focused on the server-side implementation of the Dubbo Transporter layer.

First, we introduced the AbstractPeer abstract class, the top-level abstract class, and understood the common attributes of Server, Client, and Channel. Next, we explained the AbstractEndpoint abstract class, which provides common capabilities required by Server and Client, such as encoding and decoding. Finally, we performed a detailed analysis of the AbstractServer abstract class and the NettyServer implementation based on Netty 4. We also delved into various components involved, such as ExecutorRepository and NettyServerHandler.