21 Exchange Layer Deep Analysis: Fully Understanding the Request-Response Model (Part 1) #

In the previous lessons, we delved into the Transport layer of Dubbo Remoting, understanding the unified transport layer interface abstracted by Dubbo for end-to-end communication, and analyzed the implementation based on Netty. Of course, the integration with other NIO frameworks is similar, so we won’t go into detail in this lesson.

In this lesson, we introduce the layer directly above the Transport layer, which is also the topmost layer of Dubbo Remoting: the Exchange layer. Dubbo abstracts information-exchange behavior into this layer. According to the official documentation, the Exchange layer encapsulates request-response semantics, focuses on the question-and-answer interaction pattern, and implements synchronous-to-asynchronous conversion. The Exchange layer is built around Request and Response, and it provides its own implementations of interfaces such as Channel, ChannelHandler, Client, and RemotingServer.

Let’s start with the basic classes of Request and Response in the Exchange layer.

Request and Response #

The Request and Response classes in the Exchange layer are the core objects for requests and responses. Let’s first look at the core fields of the Request class:

public class Request {
    // Used to generate a self-incrementing ID for the request. When it increments to Long.MAX_VALUE,
    // it overflows to Long.MIN_VALUE, and we can continue to use this negative number as the message ID
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);

    private final long id; // Request ID

    private String version; // Request version

    // Bidirectional flag for the request. If this field is set to true, the server needs to
    // return a response to the client after receiving the request
    private boolean twoWay = true;

    // Event flag, for example, heartbeat requests, read-only requests, etc., will have this flag
    private boolean event = false;

    // After the request is sent to the server, the Decoder decodes the binary data into a Request
    // object. If an exception occurs during decoding, this flag is set, and other ChannelHandlers
    // then process the request further based on this flag
    private boolean broken = false;

    // Request body, can be any Java object or null
    private Object data;
}
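The overflow behavior described in the INVOKE_ID comment can be checked with a small sketch. The class name `InvokeIdDemo` and the seed value are assumptions made purely for illustration (Dubbo's counter starts at 0); only the JDK's `AtomicLong` is used.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class; not part of Dubbo.
class InvokeIdDemo {

    // Returns three consecutive IDs drawn from a counter seeded just below Long.MAX_VALUE.
    static long[] idsAroundOverflow() {
        AtomicLong counter = new AtomicLong(Long.MAX_VALUE - 1);
        return new long[] {
            counter.getAndIncrement(),
            counter.getAndIncrement(),
            counter.getAndIncrement()
        };
    }

    public static void main(String[] args) {
        for (long id : idsAroundOverflow()) {
            System.out.println(id);
        }
    }
}
```

The third ID is Long.MIN_VALUE: the counter wraps around silently, so IDs stay unique for as long as fewer than 2^64 requests are in flight.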

Next, let’s look at the core fields of the Response class:

public class Response {
    // Response ID, consistent with the ID of the corresponding request
    private long id = 0;

    // Current protocol version, consistent with the version of the request message
    private String version;

    // Response status code, with more than 10 optional values such as OK, CLIENT_TIMEOUT, SERVER_TIMEOUT, etc.
    private byte status = OK;

    private boolean event = false;

    private String errorMsg; // Readable error response message

    private Object result; // Response body
}

ExchangeChannel & DefaultFuture #

In the previous lessons, we introduced the functionality of the Channel interface and the implementation of the Channel interface in the Transport layer. In the Exchange layer, the ExchangeChannel interface is defined, which abstracts the network connection of the Exchange layer on top of the Channel interface. The definition of the ExchangeChannel interface is as follows:

[Figure: ExchangeChannel interface]

Among them, the request() method is responsible for sending requests. From the diagram, we can see that there are two overloaded versions, one of which can specify the timeout of the request, and the return value is a Future object.

[Figure: Inheritance diagram of HeaderExchangeChannel]

From the above diagram, we can see that HeaderExchangeChannel is an implementation of ExchangeChannel. It is a decorator for Channel, encapsulating a Channel object. The implementation of the send() and request() methods relies on the underlying decorated Channel object.

public void send(Object message, boolean sent) throws RemotingException {
    if (message instanceof Request || message instanceof Response
            || message instanceof String) {
        channel.send(message, sent);
    } else {
        Request request = new Request();
        request.setVersion(Version.getProtocolVersion());
        request.setTwoWay(false);
        request.setData(message);
        channel.send(request, sent);
    }
}

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    Request req = new Request();  // Create Request object
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel,
            req, timeout, executor); // Create DefaultFuture
    channel.send(req);
    return future;
}


Note that the request() method here returns a DefaultFuture object. As we saw in the previous lesson, the write methods of io.netty.channel.Channel (for example writeAndFlush()) return a ChannelFuture, which indicates only whether the send operation has completed. The DefaultFuture returned here indicates whether the whole request-response exchange has completed. In other words, this Future completes only once the response has been received.
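From the caller's side, a future that represents the whole exchange can be consumed either synchronously or asynchronously. The sketch below illustrates both styles with a plain JDK `CompletableFuture`. The `fakeRequest()` helper is a hypothetical stand-in for `request()` and answers from a background thread, so no Dubbo classes are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; fakeRequest() only simulates a remote call.
class SyncOverAsyncDemo {

    // Pretends to send a request and answers on another thread after a short delay.
    static CompletableFuture<Object> fakeRequest(Object payload) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        Thread responder = new Thread(() -> {
            try {
                Thread.sleep(20); // simulated network round trip
                future.complete("echo:" + payload);
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        });
        responder.setDaemon(true);
        responder.start();
        return future;
    }

    // Synchronous style: block the calling thread until the response arrives or the wait expires.
    static Object callSync(Object payload, long timeoutMillis) throws Exception {
        return fakeRequest(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Asynchronous style: return immediately and transform the result once it arrives.
    static CompletableFuture<String> callAsync(Object payload) {
        return fakeRequest(payload).thenApply(result -> "async " + result);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callSync("ping", 1000));
        System.out.println(callAsync("ping").join());
    }
}
```

Both styles rely on the same future; the only difference is whether the caller blocks on it or attaches a continuation.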

Now let's take a closer look at the DefaultFuture and HeaderExchangeChannel involved in the request sending process.

First, let's look at the implementation of DefaultFuture. It extends the JDK's CompletableFuture and maintains two static collections:

- CHANNELS (Map collection): Manages the association between requests and channels, where the key is the request ID and the value is the channel that sends the request.
- FUTURES (Map collection): Manages the association between requests and DefaultFutures, where the key is the request ID and the value is the corresponding Future of the request.
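The role of the FUTURES map can be illustrated with a stripped-down sketch. `PendingRequests` and its methods are hypothetical names, and the real DefaultFuture does considerably more (timeouts, executors, the CHANNELS map). The sketch shows only how a response is matched to its request by ID.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical miniature of the FUTURES bookkeeping; not Dubbo's DefaultFuture.
class PendingRequests {

    private static final AtomicLong NEXT_ID = new AtomicLong(0);
    private static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Registers a pending request and returns the ID that would travel with it.
    static long register(CompletableFuture<Object> future) {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, future);
        return id;
    }

    // Completes the future matching the response ID; returns false for unknown or late IDs.
    static boolean received(long id, Object result) {
        CompletableFuture<Object> future = FUTURES.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    static int pendingCount() {
        return FUTURES.size();
    }

    public static void main(String[] args) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        long id = register(future);
        received(id, "pong");
        System.out.println(future.join());
    }
}
```

Because the entry is removed when the response arrives, a duplicate or late response with the same ID finds nothing to complete and is ignored.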


The core instance fields of DefaultFuture include the following:

- request (Request type) and id (Long type): Corresponding to the request and its ID.
- channel (Channel type): The channel that sends the request.
- timeout (int type): The timeout for the entire request-response interaction.
- start (long type): The creation time of this DefaultFuture.
- sent (volatile long type): The time when the request is sent.
- timeoutCheckTask (Timeout type): When this timer task expires, it indicates that the response from the peer has timed out.
- executor (ExecutorService type): The thread pool associated with the request.

When creating a DefaultFuture object through the DefaultFuture.newFuture() method, you need to initialize the above fields and create a timeout timer task for the request:
```java
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
    // Create DefaultFuture object and initialize its core fields
    final DefaultFuture future = new DefaultFuture(channel, request, timeout);
    future.setExecutor(executor);
    // Special handling for ThreadlessExecutor: it can be associated with a waitingFuture,
    // which is the DefaultFuture created here
    if (executor instanceof ThreadlessExecutor) {
        ((ThreadlessExecutor) executor).setWaitingFuture(future);
    }
    // Create a timer task to process the case of response timeout
    timeoutCheck(future);
    return future;
}
```

After HeaderExchangeChannel.request() has created the DefaultFuture, the request is sent through the underlying Dubbo Channel. During sending, the sent() method of HeaderExchangeHandler is triggered, which calls DefaultFuture.sent() to update the sent field with the timestamp at which the request was sent. If the response later times out, this timestamp is included in the timeout error message.

Some time later, the Consumer receives the response returned by the peer. Once the complete response has been read, the received() method of each ChannelHandler in the Dubbo Channel is triggered, including the WrappedChannelHandler introduced in the previous lesson. For example, its AllChannelHandler subclass wraps the subsequent ChannelHandler.received() call in a task and submits it to a thread pool. For a response, that pool is the one associated with the DefaultFuture, such as the ThreadlessExecutor mentioned in the previous lesson, so the business thread then carries out the remaining ChannelHandler calls. (You can also review the introduction of the Transport layer Dispatcher and ThreadlessExecutor in the previous lesson.)

When the response is passed to the HeaderExchangeHandler, it will be processed by calling the handleResponse() method. In this method, the DefaultFuture.received() method will be called, which will find the DefaultFuture object associated with the response (by searching the FUTURES collection based on the request ID) and call the doReceived() method to set the DefaultFuture as completed.

public static void received(Channel channel, Response response, boolean timeout) { // try/finally block omitted
    // Remove the mapping between the request ID and the DefaultFuture recorded in FUTURES
    DefaultFuture future = FUTURES.remove(response.getId());
    if (future != null) {
        Timeout t = future.timeoutCheckTask;
        if (!timeout) { // Not timed out, cancel the timer task
            t.cancel();
        }
        future.doReceived(response); // Call the doReceived() method
    } else {
        // No associated DefaultFuture was found; log it (omitted)
    }
    // Remove the mapping between the request ID and the Channel recorded in CHANNELS
    CHANNELS.remove(response.getId());
}

// Code snippet of the DefaultFuture.doReceived() method
private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) { // Normal response
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // Timeout
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else { // Other exceptions
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
    // The following is a fallback processing for ThreadlessExecutor, mainly to prevent business threads from blocking on ThreadlessExecutor
    if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        if (threadlessExecutor.isWaiting()) {
            // The notifyReturn() method submits a task to ThreadlessExecutor, so that the business thread will not be blocked,
            // and the submitted task will try to set DefaultFuture as an exceptional completion
            threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned..."));
        }
    }
}
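To see what the three branches of doReceived() mean for a caller, the following sketch maps a status to a future outcome and then observes it. The `Status` enum and the exception types are simplified assumptions (Dubbo uses byte constants and its own TimeoutException and RemotingException); only the JDK is used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; Status is a simplified stand-in for the Response status codes.
class StatusMappingDemo {

    enum Status { OK, CLIENT_TIMEOUT, SERVER_TIMEOUT, SERVICE_ERROR }

    // Mirrors the shape of doReceived(): normal result, timeout, or any other failure.
    static void complete(CompletableFuture<Object> future, Status status, Object result, String errorMessage) {
        switch (status) {
            case OK:
                future.complete(result);
                break;
            case CLIENT_TIMEOUT:
            case SERVER_TIMEOUT:
                future.completeExceptionally(new TimeoutException(errorMessage));
                break;
            default:
                future.completeExceptionally(new IllegalStateException(errorMessage));
        }
    }

    // What a caller observes; join() wraps the original exception in a CompletionException.
    // Note that join() blocks if the future has not been completed yet.
    static String describe(CompletableFuture<Object> future) {
        try {
            return "result=" + future.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        complete(future, Status.SERVER_TIMEOUT, null, "slow provider");
        System.out.println(describe(future));
    }
}
```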

Now let's look at the response-timeout scenario. The timeoutCheck() method, which is called when a DefaultFuture is created, creates a TimeoutCheckTask timer task and adds it to the time wheel. The specific implementation is as follows:

private static void timeoutCheck(DefaultFuture future) {
    TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
    future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}

TIME_OUT_TIMER is a HashedWheelTimer object in Dubbo, which is the implementation of the time wheel. This is a static field that all DefaultFuture objects share.
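The idea of one shared timer that fails a future when no response arrives in time can be sketched with the JDK alone. Note the substitution: this sketch uses a `ScheduledExecutorService` in place of Dubbo's HashedWheelTimer, and `TimeoutSketch` with its `newFuture()` method is a hypothetical name rather than Dubbo API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; a ScheduledExecutorService replaces the time wheel here.
class TimeoutSketch {

    // One timer shared by all futures, similar in spirit to a single static TIME_OUT_TIMER.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-sketch");
                t.setDaemon(true); // do not keep the JVM alive for the timer
                return t;
            });

    static CompletableFuture<Object> newFuture(long timeoutMillis) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        ScheduledFuture<?> check = TIMER.schedule(() -> {
            // Has no effect if a response has already completed the future.
            future.completeExceptionally(
                    new TimeoutException("no response within " + timeoutMillis + " ms"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Once the future completes for any reason, the pending check is no longer needed.
        future.whenComplete((result, error) -> check.cancel(false));
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> late = newFuture(50);
        try {
            late.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

A time wheel is usually preferred when there are very many short-lived timeouts, because adding and cancelling tasks is cheap. The scheduled executor is used here only to keep the example dependency-free.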

TimeoutCheckTask is an inner class of DefaultFuture. It implements the TimerTask interface, so it can be submitted to the time wheel to wait for execution. When the response times out, TimeoutCheckTask creates a Response and calls the DefaultFuture.received() method introduced above. Its core logic is as follows:

public void run(Timeout timeout) {
    // Check if the DefaultFuture object associated with this task has been completed
    DefaultFuture future = DefaultFuture.getFuture(requestID);
    if (future == null || future.isDone()) {
        return;
    }
    if (future.getExecutor() != null) { // Submit to thread pool for execution, pay attention to the case of ThreadlessExecutor
        future.getExecutor().execute(() -> notifyTimeout(future));
    } else {
        notifyTimeout(future);
    }
}

private void notifyTimeout(DefaultFuture future) {
    // If no response is received from the peer, create a Response here to represent a timeout response
    Response timeoutResponse = new Response(future.getId());
    // If the request was already sent, the server side is considered to have timed out; otherwise the client side
    timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
    timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
    // Mark the associated DefaultFuture as a timeout exception completion
    DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}

HeaderExchangeHandler #

When introducing DefaultFuture above, we briefly walked through the request-response flow. Both sending a request and processing a response involve HeaderExchangeHandler, so we now look at it in detail.

HeaderExchangeHandler is a decorator for ExchangeHandler and holds a reference to an ExchangeHandler object. The ExchangeHandler interface is one of the interfaces through which the Exchange layer interacts with the layer above it: upper-layer callers implement it to provide their own logic. HeaderExchangeHandler then decorates that implementation, adding the Exchange layer's ability to handle requests and responses. Finally, the Transport layer's ChannelHandler implementations decorate HeaderExchangeHandler in turn, adding the capabilities of the Transport layer. The following figure shows this relationship:

[Figure: Overview of ChannelHandler inheritance relationship]

As a decorator, the connected(), disconnected(), sent(), received(), and caught() methods of HeaderExchangeHandler will ultimately be forwarded to the ExchangeHandler provided by the upper layer for processing. Here we need to focus on the processing logic of HeaderExchangeHandler for requests and responses.
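The decorator relationship described here can be sketched in a few lines. All types below (`Handler`, `Envelope`, `BusinessHandler`, `ExchangeDecorator`) are hypothetical and heavily reduced. They only illustrate how a decorating handler can add envelope handling before forwarding to the handler supplied by the upper layer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; none of these types are Dubbo's.
class HandlerDecoratorDemo {

    // A heavily reduced handler interface with a single callback.
    interface Handler {
        void received(String channel, Object message);
    }

    // A minimal request envelope carrying an ID and a payload.
    static class Envelope {
        final long id;
        final Object data;

        Envelope(long id, Object data) {
            this.id = id;
            this.data = data;
        }
    }

    // Upper-layer handler: only cares about business payloads.
    static class BusinessHandler implements Handler {
        final List<Object> payloads = new ArrayList<>();

        @Override
        public void received(String channel, Object message) {
            payloads.add(message);
        }
    }

    // Decorator: unwraps envelopes, then forwards to the wrapped handler.
    static class ExchangeDecorator implements Handler {
        private final Handler delegate;

        ExchangeDecorator(Handler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void received(String channel, Object message) {
            Object payload = (message instanceof Envelope) ? ((Envelope) message).data : message;
            delegate.received(channel, payload);
        }
    }

    public static void main(String[] args) {
        BusinessHandler business = new BusinessHandler();
        Handler handler = new ExchangeDecorator(business);
        handler.received("ch-1", new Envelope(7L, "hello"));
        System.out.println(business.payloads);
    }
}
```

The business handler never sees the envelope, which is the point of the layering: each decorator adds one concern and passes the rest along.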

[Figure: Message classification in the received() method]

Combining the figure above, we can see that in the received() method, the received messages are classified and processed.

  • Read-only event requests are processed by the handlerEvent() method, which sets the channel.readonly attribute on the Channel. Upper-layer callers read this value in subsequent invocations.

    void handlerEvent(Channel channel, Request req) throws RemotingException {
        if (req.getData() != null && req.getData().equals(READONLY_EVENT)) {
            channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
        }
    }
    
  • Two-way requests are processed by the handleRequest() method. First, it handles requests that failed to decode and returns an exception response. Then it passes the successfully decoded requests to the ExchangeHandler implemented by the upper layer for processing and adds a callback. After the upper layer ExchangeHandler has processed the request, it triggers the callback, fills in the response result and response code based on the processing result, and sends it to the remote side.

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) { // Failed to decode the request
            Object data = req.getData();
            // Build a readable description of the decoding failure
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            // Set the error message and response code
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res); // Return the exception response to the remote side
            return;
        }
        Object msg = req.getData();
        // Processed by the ExchangeHandler implemented by the upper layer
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> { // Callback after processing
            try {
                if (t == null) { // Normal response
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else { // Exception occurred during processing, set the error message and error code
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                channel.send(res); // Send the response
            } catch (RemotingException e) {
                // channel.send() throws a checked exception inside the callback; log it (omitted)
            }
        });
    }
    
  • One-way requests are directly delegated to the received() method implemented by the upper layer ExchangeHandler. Since no response is needed, HeaderExchangeHandler does not pay attention to the processing result.

  • As for the processing of responses, it was mentioned earlier. HeaderExchangeHandler will set the associated DefaultFuture to a completed or exceptionally completed state through the handleResponse() method. The specific content will not be discussed here.

  • For messages of type String, HeaderExchangeHandler classifies them based on the role of the current service. The details are related to Dubbo’s support for telnet and will be discussed in detail in the next lesson. It will not be analyzed further here.
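The classification listed above boils down to a chain of type and flag checks. The sketch below captures only that decision order. Its `Request` and `Response` classes are hypothetical, reduced stand-ins for the Exchange-layer classes, and the returned labels are illustrative.

```java
// Hypothetical demo class; Request and Response here are reduced stand-ins.
class MessageClassifierDemo {

    static class Request {
        boolean event = false;
        boolean twoWay = true;
    }

    static class Response {
    }

    // Returns a label for the branch that would handle the message.
    static String classify(Object message) {
        if (message instanceof Request) {
            Request req = (Request) message;
            if (req.event) {
                return "event";
            }
            return req.twoWay ? "two-way request" : "one-way request";
        }
        if (message instanceof Response) {
            return "response";
        }
        if (message instanceof String) {
            return "string";
        }
        return "other";
    }

    public static void main(String[] args) {
        Request oneWay = new Request();
        oneWay.twoWay = false;
        System.out.println(classify(oneWay));
        System.out.println(classify(new Response()));
    }
}
```

The event check comes before the two-way check, so an event request is never treated as an ordinary business request, regardless of its twoWay flag.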

Next, let’s take a look at the sent() method. This method notifies the sent() method implemented by the upper layer ExchangeHandler and also calls the DefaultFuture.sent() method to record the specific sending time of the request. This logic has been introduced earlier and will not be repeated here.

In the connected() method, a corresponding HeaderExchangeChannel is created for the Dubbo Channel and they are bound together. Then the upper layer ExchangeHandler is notified to process the connect event.

In the disconnected() method, the upper layer ExchangeHandler is first notified for processing, and then DefaultFuture.closeChannel() is called to notify DefaultFuture that the connection is disconnected (actually by creating and delivering a Response with the status code CHANNEL_INACTIVE). This way, the business thread will no longer be blocked. Finally, the HeaderExchangeChannel is unbound from the underlying Dubbo channel.
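Why closing a channel must unblock waiting callers can be shown with a reduced sketch of the two maps. `CloseChannelDemo` and its methods are hypothetical names, channels are represented by plain strings, and a pending future is failed directly instead of being completed through a CHANNEL_INACTIVE Response.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; channels are modelled as plain strings.
class CloseChannelDemo {

    static final Map<Long, String> CHANNELS = new ConcurrentHashMap<>();
    static final Map<Long, CompletableFuture<Object>> FUTURES = new ConcurrentHashMap<>();

    // Records a pending request together with the channel it was sent on.
    static CompletableFuture<Object> register(long id, String channel) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        CHANNELS.put(id, channel);
        FUTURES.put(id, future);
        return future;
    }

    // Fails every pending request that was sent on the given channel; returns how many were failed.
    static int closeChannel(String channel) {
        int failed = 0;
        // ConcurrentHashMap iterators tolerate removal during iteration.
        for (Map.Entry<Long, String> entry : CHANNELS.entrySet()) {
            if (!channel.equals(entry.getValue())) {
                continue;
            }
            Long id = entry.getKey();
            CHANNELS.remove(id);
            CompletableFuture<Object> future = FUTURES.remove(id);
            if (future != null && !future.isDone()) {
                future.completeExceptionally(
                        new IllegalStateException("channel " + channel + " is inactive"));
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        register(1L, "conn-a");
        register(2L, "conn-b");
        System.out.println(closeChannel("conn-a"));
    }
}
```

Requests on other channels are left untouched, so only the callers waiting on the closed connection are released.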

Conclusion #

In this lesson, we focused on the implementation of Channel and ChannelHandler interfaces in the Dubbo Exchange layer.

We first introduced the basic abstractions of the request-response model in the Exchange layer, namely the Request and Response classes. Then we introduced ExchangeChannel and its implementation on top of the Channel interface, and explained the DefaultFuture object returned after sending a request, which was a small question left over from the previous lesson. Finally, we explained how HeaderExchangeHandler connects the Transport layer's ChannelHandler objects with the upper layer's ExchangeHandler object.