22 Exchange Layer in Depth: Fully Understanding the Request-Response Model (Part 2)

In the previous lesson, we focused on analyzing the Channel interface and ChannelHandler interface implementations in the Exchange layer. We also introduced the Request and Response base classes, as well as the DefaultFuture implementation. In this lesson, we will continue to explain the implementation logic of other interfaces in the Exchange layer.

HeaderExchangeClient #

HeaderExchangeClient is a decorator for the Client, primarily adding two functionalities to the Client:

  • Maintaining a long-term connection with the Server, which is achieved by sending heartbeat messages at regular intervals.
  • Reconnecting after a connection failure, which is achieved by periodically checking the connection status.

Therefore, HeaderExchangeClient focuses on resource allocation for the timer and task scheduling.

HeaderExchangeClient implements the ExchangeClient interface, as shown in the following diagram. It indirectly implements the ExchangeChannel and Client interfaces. The ExchangeClient interface is an empty interface with no defined methods.

Drawing 0.png

Inheritance diagram of HeaderExchangeClient

HeaderExchangeClient has the following two core fields:

  • client (of type Client): The Client object being decorated. HeaderExchangeClient delegates the implementation of the Client interface to this object.
  • channel (of type ExchangeChannel): The connection between the Client and the server. HeaderExchangeChannel is also a decorator that we have explained in detail earlier. HeaderExchangeClient delegates the implementation of the ExchangeChannel interface to this object.

The constructor of HeaderExchangeClient takes the Transport layer's Client object as its first parameter, and the startTimer parameter determines whether to enable the heartbeat and reconnect tasks. The tasks are started only when startTimer is true and the other conditions are met. Let's take the heartbeat task as an example:

private void startHeartBeatTask(URL url) {
    if (!client.canHandleIdle()) { // Determine whether to start the heartbeat task based on the implementation of the Client
        AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
        // Calculate the heartbeat interval, with a minimum interval of 1 second
        int heartbeat = getHeartbeat(url);
        long heartbeatTick = calculateLeastDuration(heartbeat);
        // Create the heartbeat task
        this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
        // Submit the task to the IDLE_CHECK_TIMER wheel to wait for execution
        IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
    }
}
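The tick calculation used by the heartbeat task can be looked at in isolation. The following is a minimal sketch, not a copy of Dubbo's source: it assumes that the check tick is one third of the heartbeat period and that 1000 ms is used as a fallback when that quotient is not positive. The class name HeartbeatTickSketch and both constant names are illustrative.

```java
// Minimal sketch of a heartbeat tick calculation (assumed rule, see the text above).
class HeartbeatTickSketch {
    // Assumption: the timer checks the channel three times per heartbeat period.
    static final int CHECKS_PER_HEARTBEAT = 3;
    // Assumption: fallback tick in milliseconds when the quotient is not positive.
    static final long FALLBACK_TICK_MILLIS = 1000L;

    // Returns the delay, in milliseconds, between two runs of the heartbeat check.
    static long calculateTick(int heartbeatMillis) {
        long tick = heartbeatMillis / CHECKS_PER_HEARTBEAT;
        return tick <= 0 ? FALLBACK_TICK_MILLIS : tick;
    }

    public static void main(String[] args) {
        System.out.println(calculateTick(60_000)); // one third of 60 seconds
        System.out.println(calculateTick(0));      // falls back to 1000 ms
    }
}
```

Checking more often than the heartbeat period itself means an idle Channel is noticed shortly after it crosses the threshold, rather than up to one full period late.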

The reconnect task is started in the startReconnectTask() method, which determines whether to start the task based on the parameters in the URL. The reconnect task is also ultimately submitted to the IDLE_CHECK_TIMER wheel. The definition of the timer wheel is as follows:

private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(
        new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL);

In fact, the implementation of the startReconnectTask() method is similar to the startHeartBeatTask() method shown earlier, so it will not be repeated here.

Let’s now go back to the analysis of the heartbeat task. You can review the NettyClient implementation introduced in Lesson 20, where the canHandleIdle() method returns true, indicating that this implementation can send heartbeat requests on its own without the need for HeaderExchangeClient to start a timer task. NettyClient mainly relies on the timer task in IdleStateHandler to trigger heartbeat events and the NettyClientHandler to send heartbeat requests.

For Client implementations that cannot send heartbeat requests on their own, HeaderExchangeClient starts the HeartbeatTimerTask heartbeat timer task for them. The inheritance diagram of the HeartbeatTimerTask is shown in the following diagram:

Drawing 1.png

Inheritance diagram of TimerTask

Let’s start with the abstract class AbstractTimerTask, which has three fields:

  • channelProvider (of type ChannelProvider): ChannelProvider is an inner interface defined in the AbstractTimerTask abstract class. The timer task will get the Channel related to this task from this object.
  • tick (of type Long): The expiration time of the task.
  • cancel (of type boolean): Whether the task has been canceled.

The AbstractTimerTask abstract class implements the run() method of the TimerTask interface. First, it retrieves the Channel collection related to this task from the ChannelProvider. For the Client, there is only one Channel, while for the Server, there are multiple Channels. It then checks the status of each Channel and processes them with the doTask() method for Channels that are not closed. Finally, it uses the reput() method to reschedule the current task into the timer wheel, waiting for the next expiration.

Here is the specific implementation of the run() method in the AbstractTimerTask class:

public void run(Timeout timeout) throws Exception {
    // Get the Channel collection to operate on from the ChannelProvider
    Collection<Channel> c = channelProvider.getChannels();
    for (Channel channel : c) {
        if (channel.isClosed()) { // Skip Channels that are already closed
            continue;
        }
        doTask(channel); // Execute the task
    }
    reput(timeout, tick); // Reschedule the current task into the timer wheel, waiting for execution
}

doTask() is an abstract method that AbstractTimerTask leaves for subclasses to implement, so each timer task performs its own operation. For example, HeartbeatTimerTask.doTask() reads the last read and write timestamps and compares each of them with the current time. If either gap is greater than the heartbeat interval, it sends a heartbeat request. The core implementation is as follows:

```java
protected void doTask(Channel channel) {
    // Get the last read and write timestamps
    Long lastRead = lastRead(channel);
    Long lastWrite = lastWrite(channel);
    if ((lastRead != null && now() - lastRead > heartbeat)
            || (lastWrite != null && now() - lastWrite > heartbeat)) {
        // If the last read or write timestamp exceeds the heartbeat time, send a heartbeat request
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setEvent(HEARTBEAT_EVENT);
        channel.send(req);
    }
}
```

The lastRead and lastWrite timestamps in this code are obtained from the attached attributes of the Channel to be processed, with keys KEY_READ_TIMESTAMP and KEY_WRITE_TIMESTAMP respectively. You can review the HeartbeatHandler introduced in previous lessons. It belongs to the Transport layer and is a decorator for ChannelHandler. It records the last write operation time in its connected() and sent() methods, records the last read operation time in its connected() and received() methods, and cleans up these two timestamps in its disconnected() method.
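The interplay between the recorded timestamps and the heartbeat check can be sketched without any Dubbo classes. In the sketch below, IdleCheckSketch is a hypothetical stand-in for a Channel's attribute map, the two key values are placeholders, and the method names markRead(), markWrite(), clear() and needsHeartbeat() are invented for illustration. Only the comparison itself mirrors the doTask() logic shown above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the attributes attached to a Channel.
class IdleCheckSketch {
    // Placeholder key values; only the key names come from the text.
    static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
    static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    private final Map<String, Object> attributes = new ConcurrentHashMap<>();

    // Called where a handler would observe a read (connected/received).
    void markRead(long nowMillis) { attributes.put(KEY_READ_TIMESTAMP, nowMillis); }

    // Called where a handler would observe a write (connected/sent).
    void markWrite(long nowMillis) { attributes.put(KEY_WRITE_TIMESTAMP, nowMillis); }

    // Called on disconnect: both timestamps are removed.
    void clear() {
        attributes.remove(KEY_READ_TIMESTAMP);
        attributes.remove(KEY_WRITE_TIMESTAMP);
    }

    // Same condition as in doTask(): either direction idle for longer than the heartbeat.
    boolean needsHeartbeat(long nowMillis, long heartbeatMillis) {
        Long lastRead = (Long) attributes.get(KEY_READ_TIMESTAMP);
        Long lastWrite = (Long) attributes.get(KEY_WRITE_TIMESTAMP);
        return (lastRead != null && nowMillis - lastRead > heartbeatMillis)
                || (lastWrite != null && nowMillis - lastWrite > heartbeatMillis);
    }
}
```

Because a missing timestamp is treated as "nothing to check", a Channel whose timestamps were cleared on disconnect never triggers a heartbeat in this sketch.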

ReconnectTimerTask checks the connection status and idle time of each Channel it processes, and reconnects Channels that are disconnected or have been idle for too long. The specific logic is not discussed here.

The closing process of HeaderExchangeClient also deserves attention. It is implemented in the close() method, as shown below:

public void close(int timeout) {
    startClose(); // Set the closing field to true
    doClose(); // Close the heartbeat and reconnect timers
    channel.close(timeout); // Close the HeaderExchangeChannel
}

The HeaderExchangeChannel.close(timeout) method first sets its own closed field to true, so that no further requests are sent. If some requests on the current Channel have not yet received a response, it waits in a loop until the responses arrive. For requests that are still unanswered when the timeout elapses, it creates a Response whose status code indicates that the connection is closed and hands it over to DefaultFuture for processing, similar to receiving a disconnected event. It then closes the Channel at the Transport layer. Taking NettyChannel as an example, the NettyChannel.close() method first sets its own closed field to true, removes its record from the CHANNEL_MAP cache, clears the attached attributes of the Channel, and finally closes the io.netty.channel.Channel.
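The "stop sending, wait for in-flight requests, then fail the rest" sequence can be sketched in plain Java. This is a simplified illustration under stated assumptions, not Dubbo's implementation: GracefulCloseSketch, its maps, and the status strings "OK" and "CHANNEL_CLOSED" are all hypothetical names chosen for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of a channel that drains pending requests before closing.
class GracefulCloseSketch {
    private volatile boolean closed;
    private final Map<Long, String> pending = new ConcurrentHashMap<>(); // in-flight request ids
    private final Map<Long, String> results = new ConcurrentHashMap<>(); // final status per request id

    // Refuses new requests once the channel has been closed.
    boolean send(long requestId) {
        if (closed) {
            return false;
        }
        pending.put(requestId, "WAITING");
        return true;
    }

    // A normal response completes the matching pending request.
    void onResponse(long requestId) {
        if (pending.remove(requestId) != null) {
            results.put(requestId, "OK");
        }
    }

    void close(long timeoutMillis) {
        closed = true; // step 1: no further requests are accepted
        long start = System.currentTimeMillis();
        // step 2: wait until all responses arrive or the timeout elapses
        while (!pending.isEmpty() && System.currentTimeMillis() - start < timeoutMillis) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        // step 3: complete whatever is still unanswered with a "closed" status
        for (Long id : pending.keySet()) {
            if (pending.remove(id) != null) {
                results.put(id, "CHANNEL_CLOSED");
            }
        }
    }

    String resultOf(long requestId) { return results.get(requestId); }
}
```

Completing the leftover requests explicitly matters because a caller blocked on a future would otherwise wait for its own timeout even though the connection is already gone.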

HeaderExchangeServer #

Now let’s take a look at the HeaderExchangeServer. Its inheritance relationship is shown in the following diagram, where we have already introduced the Endpoint, RemotingServer, and Resetable interfaces in detail in previous lessons, so we will not repeat them here.

Drawing 2.png

HeaderExchangeServer’s inheritance diagram

Like the HeaderExchangeClient introduced earlier, HeaderExchangeServer is a decorator for RemotingServer, and most of the methods it implements are delegated to the decorated RemotingServer object.

In the constructor of HeaderExchangeServer, it starts a CloseTimerTask timer task to periodically close long-idle connections. The specific implementation is similar to the two timer tasks in HeaderExchangeClient, so we will not discuss it in detail here.

It should be noted that the NettyServer introduced in Lesson 19 does not start this timer task. It relies on NettyServerHandler and IdleStateHandler to implement it. The principle is similar to NettyClient, and we will not go into it here. If you are interested, you can review Lesson 20 or check the specific implementation of CloseTimerTask.

When introducing the Transport-layer Server in Lesson 19, we did not go into the closing process in detail. Here, we will start from HeaderExchangeServer and analyze the entire closing process of the Server from the top down. Let's first look at the steps performed by the HeaderExchangeServer.close() method:

  1. Set the closing field of the decorated RemotingServer to true, indicating that the Server is closing and no longer accepting connections. You can review the AbstractServer.connected() method introduced in Lesson 19 and find that if the Server is closing or has closed, it will directly close newly established Client connections.
  2. Send a request with a ReadOnly event to the Client (whether to send or not is determined by the configuration in the URL, and the default is to send). After receiving this request, the HeaderExchangeHandler on the Client side adds an attachment with the key “channel.readonly” to the Channel. The upper layer caller determines whether the connection is writable based on this attachment.
  3. Continuously check whether there are still Clients maintaining long connections with the current Server, until all Clients have disconnected or the timeout elapses.
  4. Update the closed field to true, and the Client will no longer send any requests or reply to any responses.
  5. Cancel the CloseTimerTask timer task.
  6. Call the close() method of the underlying RemotingServer object. Taking NettyServer as an example, its close() method first calls the close() method of AbstractPeer to set its own closed field to true, then calls the doClose() method to close the boss Channel (the Channel used to accept Client connections) and close the Channels in the channels collection (these Channels are connections between the Server and Clients), and finally closes the bossGroup and workerGroup thread pools.

The core logic of the HeaderExchangeServer.close() method is as follows:

public void close(final int timeout) {
    startClose(); // Set the closing field of the underlying RemotingServer to true, indicating that the current Server is closing and no longer accepting connections
    if (timeout > 0) {
        final long max = (long) timeout;
        final long start = System.currentTimeMillis();
        if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
            // Send a ReadOnly event request to notify the Clients
            sendChannelReadOnlyEvent();
        }
        while (HeaderExchangeServer.this.isRunning()
                && System.currentTimeMillis() - start < max) {
            try {
                Thread.sleep(10); // Wait for Clients to disconnect in a loop
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
    doClose(); // Set the closed field to true and cancel the CloseTimerTask
    server.close(timeout); // Close the underlying RemotingServer
}

ExchangeCodec #

During encoding, ExchangeCodec serializes Java objects into a byte stream and builds the Dubbo protocol header. During decoding, it reads the protocol header from the byte stream and deserializes the remaining bytes back into Java objects.

ExchangeCodec completes our walk through the Transport and Exchange layers, which together show how Dubbo performs network communication and message passing. The next lesson turns to the Protocol layer and to how Dubbo implements service registration and discovery.

The protocol header is described by the following constants in ExchangeCodec:

    // Magic number
    protected static final short MAGIC = (short) 0xdabb;
    // Length of the protocol header in bytes
    protected static final int HEADER_LENGTH = 16;
    // Flag bits stored in the third byte of the header
    protected static final byte FLAG_REQUEST = (byte) 0x80; // Req/Res flag
    protected static final byte FLAG_TWOWAY = (byte) 0x40; // 2Way flag
    protected static final byte FLAG_EVENT = (byte) 0x20; // Event flag

  • HEADER_LENGTH (int type, value is 16): The number of bytes in the protocol header, 16 bytes, which is 128 bits.
  • MAGIC (short type, value is 0xdabb): The first 16 bits of the protocol header, divided into MAGIC_HIGH and MAGIC_LOW.
  • FLAG_REQUEST (byte type, value is 0x80): Used to set the Req/Res flag.
  • FLAG_TWOWAY (byte type, value is 0x40): Used to set the 2Way flag.
  • FLAG_EVENT (byte type, value is 0x20): Used to set the Event flag.
  • SERIALIZATION_MASK (int type, value is 0x1f): Used to obtain the serialization type flag mask.
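How these masks share a single byte can be shown with a small sketch. The constant values are the ones listed above; the class HeaderFlagSketch and its helper methods are hypothetical, and the serialization id 2 used in main() is an arbitrary example value.

```java
// Sketch: packing and unpacking the flag byte (third byte of the protocol header).
class HeaderFlagSketch {
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    // The upper three bits carry the flags, the lower five bits the serialization id.
    static byte compose(boolean request, boolean twoWay, boolean event, int serializationId) {
        int flag = serializationId & SERIALIZATION_MASK;
        if (request) {
            flag |= FLAG_REQUEST;
        }
        if (twoWay) {
            flag |= FLAG_TWOWAY;
        }
        if (event) {
            flag |= FLAG_EVENT;
        }
        return (byte) flag; // only the low 8 bits are kept
    }

    static boolean isRequest(byte flag) { return (flag & FLAG_REQUEST) != 0; }
    static boolean isTwoWay(byte flag) { return (flag & FLAG_TWOWAY) != 0; }
    static boolean isEvent(byte flag) { return (flag & FLAG_EVENT) != 0; }
    static int serializationId(byte flag) { return flag & SERIALIZATION_MASK; }

    public static void main(String[] args) {
        byte flag = compose(true, true, false, 2);
        System.out.printf("flag=0x%02x request=%b twoWay=%b event=%b serialization=%d%n",
                flag & 0xff, isRequest(flag), isTwoWay(flag), isEvent(flag), serializationId(flag));
    }
}
```

Since the mask 0x1f covers exactly the five bits below FLAG_EVENT, the serialization id can never collide with any of the three flags.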

In the encode() method of ExchangeCodec, the message types to be encoded are categorized. The encodeRequest() method specifically encodes Request objects, with the following implementation:

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    byte[] header = new byte[HEADER_LENGTH]; // The array is used to temporarily store the protocol header
    // Write the magic number to the first two bytes of the header array
    Bytes.short2bytes(MAGIC, header);
    // Set the Req/Res flag and the serialization flag in the protocol header based on the current serialization in use
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    if (req.isTwoWay()) { // Set the 2Way flag in the protocol header
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) { // Set the Event flag in the protocol header
        header[2] |= FLAG_EVENT;
    }
    // Record the request ID in the header
    Bytes.long2bytes(req.getId(), header, 4);
    // Serialize the request and count the number of bytes after serialization
    // First, use savedWriteIndex to record the current write position of the ChannelBuffer
    int savedWriteIndex = buffer.writerIndex();
    // Move the write position forward by 16 bytes to reserve room for the header
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    // Serialize the request based on the selected serialization method
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) { // Serialize events
        encodeEventData(channel, out, req.getData());
    } else { // Serialize Dubbo requests, specific implementation in DubboCodec
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close(); // Finish serialization
    int len = bos.writtenBytes(); // Get the number of bytes after serializing the request
    checkPayload(channel, len); // Limit the length of the request in bytes
    Bytes.int2bytes(len, header, 12); // Write the number of bytes to the header array
    // Adjust the current write position of the ChannelBuffer and write the protocol header to the Buffer
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header);
    // Finally, move the write position of the ChannelBuffer to the correct position
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
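The "reserve the header, write the body, then go back and fill in the header" technique used by encodeRequest() can be reproduced with the JDK's java.nio.ByteBuffer. The sketch below is an illustration under assumptions, not Dubbo code: FrameSketch and its encode() method are hypothetical, the body is passed in as raw bytes instead of being serialized, and the offsets (magic at 0, flag at 2, request ID at 4, body length at 12) are taken from the method above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch: write the body first, then backfill a 16-byte header in front of it.
class FrameSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;

    static ByteBuffer encode(byte flag, long requestId, byte[] body) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length); // big-endian by default
        int savedPosition = buffer.position();          // remember where the frame starts
        buffer.position(savedPosition + HEADER_LENGTH); // skip the space reserved for the header
        buffer.put(body);                               // write the (already serialized) body
        int len = buffer.position() - savedPosition - HEADER_LENGTH; // body length is known only now

        byte[] header = new byte[HEADER_LENGTH];
        ByteBuffer h = ByteBuffer.wrap(header);
        h.putShort(0, MAGIC);    // bytes 0-1: magic number
        h.put(2, flag);          // byte 2: flags and serialization id
        h.putLong(4, requestId); // bytes 4-11: request ID
        h.putInt(12, len);       // bytes 12-15: body length

        buffer.position(savedPosition); // jump back and write the header
        buffer.put(header);
        buffer.position(savedPosition + HEADER_LENGTH + len); // restore the end-of-frame position
        buffer.flip(); // make the frame readable from its first byte
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode((byte) 0xC2, 42L, "hi".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame length = " + frame.remaining());
        System.out.println("body length field = " + frame.getInt(12));
    }
}
```

Writing the body before the header avoids serializing the payload twice or buffering it separately just to learn its length.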