27 Bonus Lesson: Straight to the Heart of Dubbo, Exploring the Mysteries of Invoker with You (Part 2) #

After sending a oneway request, DubboInvoker immediately creates an AsyncRpcResult object that is already completed (mainly because its responseFuture is already completed), as we discussed in the previous lesson.

In this lesson, we will continue to introduce the handling of twoway requests and responses by DubboInvoker, including response decoding, synchronous/asynchronous responses, and other related content. After analyzing DubboInvoker, we will also introduce the Invoker decorator in Dubbo related to Listener and Filter.

Further Exploration of DubboInvoker #

How does DubboInvoker handle twoway requests? Let’s focus on it. First, DubboInvoker calls the getCallbackExecutor() method to return different thread pool implementations based on different InvokeModes. The code is as follows:

protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
    ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
    if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
        return new ThreadlessExecutor(sharedExecutor);
    } else {
        return sharedExecutor;
    }
}

InvokeMode has three optional values: SYNC, ASYNC, and FUTURE. For the SYNC mode, which is the default call mode in Dubbo, the ThreadlessExecutor is returned as the thread pool. For the other two asynchronous modes, the corresponding shared thread pool is selected based on the URL.

In SYNC mode, the client thread will block after sending the request, waiting for the server to return the response. The meaning of SYNC mode is shown in the following diagram:

Lark20201027-180625.png

Diagram of SYNC mode

After obtaining the thread pool, DubboInvoker calls the ExchangeClient.request() method to package the Invocation into a Request and send it out, while creating a corresponding DefaultFuture. Note that a callback is added here to extract the AppResponse object. AppResponse represents the specific response returned by the server, which has three fields:

  • result (Object type): The response result, which is the result value returned by the server. Note that this is a business result value. For example, in the Demo example of Lesson 01 (the Demo in the dubbo-demo module), the Provider-side DemoServiceImpl returns the string “Hello Dubbo xxx”.
  • exception (Throwable type): The exception information returned by the server.
  • attachments (Map type): Additional information returned by the server.

You may not be familiar with the AppResponse returned by the request, but you have probably seen its subclass, DecodeableRpcResult. DecodeableRpcResult represents a response; its counterpart, DecodeableRpcInvocation, represents a request. We covered DecodeableRpcInvocation in detail in Lesson 24, when we introduced how DubboCodec encodes the Dubbo request body. If you review the decodeBody() method of DubboCodec, you will find DecodeableRpcResult there as well.

1. DecodeableRpcResult #

The decoding process of DecodeableRpcResult is roughly as follows:

  • First, determine the serialization method currently in use and decode the byte stream.
  • Then, read a one-byte flag, which has six possible values. Let’s take RESPONSE_VALUE_WITH_ATTACHMENTS as an example.
  • When the flag is RESPONSE_VALUE_WITH_ATTACHMENTS, the handleValue() method is called to handle the return value. It reads the return value based on the return value type recorded in RpcInvocation and sets it to the result field.
  • Finally, the handleAttachment() method is called to read the returned additional information and set it to the attachments field of DecodeableRpcResult.

public Object decode(Channel channel, InputStream input) throws IOException {
    // Deserialization
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    byte flag = in.readByte(); // Read a byte flag

    // Determine the information included in the current result based on the flag and call different processing methods
    switch (flag) {
        case DubboCodec.RESPONSE_NULL_VALUE:
            break;
        case DubboCodec.RESPONSE_VALUE:
            handleValue(in);
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION:
            handleException(in);
            break;
        case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
            handleAttachment(in);
            break;
        case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
            handleValue(in);
            handleAttachment(in);
            break;
        case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
            // Read the exception first, then the attachments, instead of falling through to default
            handleException(in);
            handleAttachment(in);
            break;
        default:
            throw new IOException("...");
    }

    if (in instanceof Cleanable) {
        ((Cleanable) in).cleanup();
    }

    return this;
}

The other branches of decode() follow the same pattern and are not analyzed one by one here. If you are interested, you can refer to the source code of DecodeableRpcResult for further analysis.
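
To make the flag-driven decoding more concrete, here is a minimal, self-contained sketch. It does not use Dubbo's serialization at all: FlagDecodeSketch, Decoded, and the wire layout (a flag byte, a UTF string, then a counted list of key/value pairs) are assumptions made for illustration, and the flag values are illustrative stand-ins for the constants defined in DubboCodec.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class FlagDecodeSketch {
    // Illustrative flag values; the real constants live in DubboCodec.
    static final byte VALUE = 1;
    static final byte NULL_VALUE = 2;
    static final byte VALUE_WITH_ATTACHMENTS = 4;

    /** Hypothetical holder mirroring the result and attachments fields. */
    static final class Decoded {
        String result;
        final Map<String, String> attachments = new LinkedHashMap<>();
    }

    /** Writes the flag first, then the value, then the attachments. */
    static byte[] encode(String value, Map<String, String> attachments) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(VALUE_WITH_ATTACHMENTS);
            out.writeUTF(value);
            out.writeInt(attachments.size());
            for (Map.Entry<String, String> e : attachments.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the flag and dispatches on it, in the same spirit as decode(). */
    static Decoded decode(byte[] data) {
        Decoded decoded = new Decoded();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte flag = in.readByte();
            switch (flag) {
                case NULL_VALUE:
                    break;
                case VALUE:
                    decoded.result = in.readUTF();
                    break;
                case VALUE_WITH_ATTACHMENTS:
                    decoded.result = in.readUTF();
                    readAttachments(in, decoded);
                    break;
                default:
                    throw new IOException("Unknown result flag: " + flag);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return decoded;
    }

    private static void readAttachments(DataInputStream in, Decoded decoded) throws IOException {
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            decoded.attachments.put(key, in.readUTF());
        }
    }
}
```

The point of the sketch is only the ordering: the flag tells the reader which pieces follow, so the value must be read before the attachments.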

2. AsyncRpcResult #

In DubboInvoker, there is also an AsyncRpcResult class, which represents an asynchronous and unfinished RPC call. It records the information of the corresponding RPC call (such as the associated RpcContext and Invocation objects), including the following core fields.

  • responseFuture (CompletableFuture type): This responseFuture field has a close relationship with the DefaultFuture mentioned earlier and is a Future on the callback chain of DefaultFuture. The callbacks added on top of AsyncRpcResult are actually added to this Future.

  • storedContext, storedServerContext (RpcContext type): Used to store related RpcContext objects. We know that RpcContext is bound to a thread, and the thread executing the callback methods added on top of AsyncRpcResult may have processed multiple different AsyncRpcResults, so we need to pass and save the current RpcContext.

  • executor (Executor type): The thread pool associated with this RPC call.

  • invocation (Invocation type): The Invocation object associated with this RPC call.

In the constructor of AsyncRpcResult, in addition to receiving the CompletableFuture object returned by the request, the current RpcContext is also saved to storedContext and storedServerContext. The specific implementation is as follows:

public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
    this.responseFuture = future;
    this.invocation = invocation;
    this.storedContext = RpcContext.getContext();
    this.storedServerContext = RpcContext.getServerContext();
}

Through the whenCompleteWithContext() method, we can add callback methods to AsyncRpcResult, and this callback method will be wrapped and registered on the responseFuture. The specific implementation is as follows:

public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
    // Register callbacks on top of responseFuture
    this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
        beforeContext.accept(v, t);
        fn.accept(v, t);
        afterContext.accept(v, t);
    });
    return this;
}

The beforeContext here first records the RpcContext of the current thread to tmpContext, and then sets the RpcContext stored in the constructor to the current thread to prepare for the execution of the callback later. The afterContext will restore the RpcContext of the thread to its original state. The specific implementation is as follows:

private RpcContext tmpContext;
private RpcContext tmpServerContext;

private BiConsumer<Result, Throwable> beforeContext = (appResponse, t) -> {
    // Record the RpcContext of the current thread to tmpContext
    tmpContext = RpcContext.getContext();
    tmpServerContext = RpcContext.getServerContext();
    // Set the RpcContext stored in the constructor to the current thread
    RpcContext.restoreContext(storedContext);
    RpcContext.restoreServerContext(storedServerContext);
};

private BiConsumer<Result, Throwable> afterContext = (appResponse, t) -> {
    // Restore the RpcContext stored in tmpContext to the RpcContext bound to the current thread
    RpcContext.restoreContext(tmpContext);
    RpcContext.restoreServerContext(tmpServerContext);
};

This way, AsyncRpcResult can continuously add callbacks without losing the state of RpcContext. In short, AsyncRpcResult is designed for asynchronous requests.
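
The save, swap, and restore pattern can be tried out without Dubbo. In the following sketch, a plain ThreadLocal named CONTEXT stands in for RpcContext, and ContextAwareResult is a hypothetical, much simplified stand-in for AsyncRpcResult. Unlike the listing above, it restores the previous context in a finally block; that is a choice made for this sketch, not a description of Dubbo's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class ContextSwapSketch {
    /** Hypothetical thread-bound context, standing in for RpcContext. */
    static final ThreadLocal<String> CONTEXT = ThreadLocal.withInitial(() -> "empty");

    static final class ContextAwareResult {
        // Captured on the thread that creates the result (the caller thread).
        private final String storedContext = CONTEXT.get();
        private CompletableFuture<String> responseFuture;

        ContextAwareResult(CompletableFuture<String> responseFuture) {
            this.responseFuture = responseFuture;
        }

        ContextAwareResult whenCompleteWithContext(BiConsumer<String, Throwable> fn) {
            responseFuture = responseFuture.whenComplete((v, t) -> {
                String previous = CONTEXT.get(); // context of the callback thread
                CONTEXT.set(storedContext);      // swap in the caller's context
                try {
                    fn.accept(v, t);
                } finally {
                    CONTEXT.set(previous);       // put the original context back
                }
            });
            return this;
        }
    }

    /** Returns {context seen inside the callback, context of the I/O thread afterwards}. */
    static String[] demo() {
        String[] seen = new String[2];
        CONTEXT.set("caller-context");
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            new ContextAwareResult(response)
                    .whenCompleteWithContext((v, t) -> seen[0] = CONTEXT.get());
            Thread ioThread = new Thread(() -> {
                CONTEXT.set("io-context");
                response.complete("ok"); // the callback runs on this thread
                seen[1] = CONTEXT.get();
            });
            ioThread.start();
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
        }
        return seen;
    }
}
```

Inside the callback the caller's context is visible even though the callback runs on another thread, and that thread gets its own context back once the callback returns.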

In the previous analysis, we saw that the invokeMode field of RpcInvocation can mark a call as SYNC, that is, a synchronous call. How does the asynchronous design of AsyncRpcResult support synchronous calls? In the AbstractProtocol.refer() method, Dubbo wraps the Invoker object (the DubboInvoker object) returned by the DubboProtocol.protocolBindingRefer() method with AsyncToSyncInvoker.

AsyncToSyncInvoker is a decorator for Invoker and is responsible for converting asynchronous calls into synchronous calls. The core implementation of its invoke() method is as follows:

public Result invoke(Invocation invocation) throws RpcException {
    // Exception handling logic is omitted
    Result asyncResult = invoker.invoke(invocation);
    if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
        // Call the get() method to block and wait for the response to return
        asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
    return asyncResult;
}
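
The decorator idea is small enough to sketch with plain JDK types. In the sketch below, AsyncToSyncSketch, Mode, and the Function-based "invoker" are hypothetical names introduced for illustration. The wrapped invoker always returns a CompletableFuture, and the wrapper blocks on it only in SYNC mode (with a 5-second timeout chosen arbitrarily for the demo).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class AsyncToSyncSketch {
    enum Mode { SYNC, ASYNC }

    /** Always delegates asynchronously; blocks for completion only in SYNC mode. */
    static CompletableFuture<String> invoke(Function<String, CompletableFuture<String>> asyncInvoker,
                                            String request, Mode mode) {
        CompletableFuture<String> future = asyncInvoker.apply(request);
        if (mode == Mode.SYNC) {
            try {
                future.get(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
        }
        return future;
    }

    /** Returns {async future done on return?, sync future done on return?}. */
    static boolean[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // The "remote call" cannot finish until the gate is opened.
            Function<String, CompletableFuture<String>> gated = request ->
                    CompletableFuture.supplyAsync(() -> {
                        try {
                            gate.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        return "echo:" + request;
                    }, pool);
            boolean asyncDone = invoke(gated, "a", Mode.ASYNC).isDone();
            gate.countDown();
            boolean syncDone = invoke(gated, "b", Mode.SYNC).isDone();
            return new boolean[] {asyncDone, syncDone};
        } finally {
            pool.shutdown();
        }
    }
}
```

In ASYNC mode the caller gets the future back while the call is still in flight; in SYNC mode the same future is returned, but only after it has completed.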

The AsyncRpcResult.get() method actually calls the get() method of the responseFuture field. For synchronous requests, it will first call the ThreadlessExecutor.waitAndDrain() method to block and wait for the response. The specific implementation is as follows:

public Result get() throws InterruptedException, ExecutionException {
    if (executor != null && executor instanceof ThreadlessExecutor) {
        // Special handling for ThreadlessExecutor, call waitAndDrain() here to wait for response
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
        threadlessExecutor.waitAndDrain();
    }
    // In scenarios where the executor is not a ThreadlessExecutor thread pool, directly call the get() method of Future (which ultimately is DefaultFuture) to block
    return responseFuture.get();
}

ThreadlessExecutor is optimized for synchronous requests, and we have already discussed it in detail in Lesson 20, so we won’t repeat it here.
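
As a quick refresher on the idea, the following sketch shows an executor that owns no threads: tasks are queued, and the thread that is waiting for the response runs them itself. CallerRunsQueue is a hypothetical name, and the class leaves out everything the real ThreadlessExecutor does beyond this core idea.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

final class ThreadlessSketch {
    /** Hypothetical, simplified stand-in for the idea behind ThreadlessExecutor. */
    static final class CallerRunsQueue implements Executor {
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        @Override
        public void execute(Runnable task) {
            queue.add(task); // only enqueue; nothing runs on the submitting thread
        }

        /** Blocks until the first task arrives, then runs every queued task on the calling thread. */
        void waitAndDrain() throws InterruptedException {
            Runnable task = queue.take();
            task.run();
            while ((task = queue.poll()) != null) {
                task.run();
            }
        }
    }

    /** Returns the name of the thread that actually completed the response future. */
    static String demo() {
        CallerRunsQueue executor = new CallerRunsQueue();
        CompletableFuture<String> responseFuture = new CompletableFuture<>();
        // The "I/O thread" only hands the work over; it does not complete the future itself.
        Thread ioThread = new Thread(() ->
                executor.execute(() -> responseFuture.complete(Thread.currentThread().getName())),
                "io-thread");
        ioThread.start();
        try {
            executor.waitAndDrain(); // the business thread blocks here, then does the work
            return responseFuture.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because the waiting thread processes the response itself, no extra thread switch is needed once the response arrives.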

Finally, it should be noted that AsyncRpcResult implements the Result interface, as shown in the figure below.

Drawing 1.png

AsyncRpcResult Inheritance Diagram

For methods of the Result interface such as getValue(), recreate(), and getAttachments(), AsyncRpcResult first calls the getAppResponse() method to obtain the AppResponse object from the responseFuture, and then delegates to the corresponding method of the AppResponse object. Let’s take the recreate() method as an example:

    
public Result getAppResponse() { // Exception handling logic is omitted
    if (responseFuture.isDone()) { // Check if responseFuture is completed
        return responseFuture.get(); // Get AppResponse
    }
    // Generate a default value based on the return value of the invoked method
    return createDefaultValue(invocation);
}

public Object recreate() throws Throwable {
    RpcInvocation rpcInvocation = (RpcInvocation) invocation;
    if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
        return RpcContext.getContext().getFuture();
    }
    // Call AppResponse.recreate() method
    return getAppResponse().recreate();
}

The implementation of the AppResponse.recreate() method is simple, as shown below:


public Object recreate() throws Throwable {
    if (exception != null) { // If there is an exception, throw the exception directly
        // Exception handling logic is omitted
        throw exception;
    }
    return result; // Return the result directly when there is no exception
}
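
The effect of recreate() on the caller can be shown with a tiny stand-alone sketch. Response here is a hypothetical stand-in for AppResponse that keeps only the result and exception fields, and describe() is a helper added purely for the demonstration.

```java
final class RecreateSketch {
    /** Hypothetical stand-in for AppResponse with only result and exception. */
    static final class Response {
        private final Object result;
        private final Throwable exception;

        Response(Object result, Throwable exception) {
            this.result = result;
            this.exception = exception;
        }

        /** Rethrows the server-side exception if present, otherwise returns the business result. */
        Object recreate() throws Throwable {
            if (exception != null) {
                throw exception;
            }
            return result;
        }
    }

    /** Demonstration helper: shows what a caller of recreate() observes. */
    static String describe(Response response) {
        try {
            return "value:" + response.recreate();
        } catch (Throwable t) {
            return "error:" + t.getMessage();
        }
    }
}
```

A response carrying an exception therefore surfaces as a thrown exception on the consumer side, just like a failed local method call.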

Here, we notice that in the recreate() method, AsyncRpcResult specially handles the FUTURE mode. If the return type of the service interface is CompletableFuture, it belongs to the FUTURE mode. The CompletableFuture object obtained in the FUTURE mode is actually passed into RpcContext through AbstractInvoker. Here is the code snippet in the AbstractInvoker.invoke() method:


RpcContext.getContext().setFuture(
    new FutureAdapter(asyncResult.getResponseFuture()));

Here, asyncResult.getResponseFuture() returns the responseFuture in AsyncRpcResult, which, as noted earlier, sits on the callback chain of the DefaultFuture. Therefore, whether the mode is SYNC, ASYNC, or FUTURE, everything revolves around DefaultFuture.

In fact, Dubbo 2.6.x and earlier versions provided some asynchronous programming capabilities, but there were several problems with the asynchronous approach:

  • The way to obtain the Future is not direct enough, and the business needs to manually retrieve it from RpcContext.
  • The Future interface cannot implement automatic callback. Although the custom ResponseFuture (a class in Dubbo 2.6.x) supports callbacks, it has limited support for asynchronous scenarios and does not support mutual coordination or composition between Futures.
  • Provider-side asynchronous is not supported.

The Future used in Dubbo 2.6.x and earlier versions was introduced in Java 5, so there are some problems in terms of the design of these functionalities. However, CompletableFuture, introduced in Java 8, further enriches the Future interface and solves these problems well. Dubbo has already upgraded its support to Java 8 in version 2.7.0, and enhanced the current asynchronous functionalities based on CompletableFuture, making up for the shortcomings mentioned above.

Because CompletableFuture implements both the CompletionStage and Future interfaces, it can still be used in a blocking manner as before: call isDone() to check for completion, or get() to block until the result is available. This ensures that synchronous calls can still be made. Of course, in practice, blocking on get() is not recommended, because it gives up the performance benefit of asynchronous operations.

In addition, CompletableFuture provides good callback methods, such as whenComplete(), whenCompleteAsync(), etc., which can execute the action logic added in these methods after the logic is completed. At the same time, CompletableFuture supports mutual coordination or composition between Futures very well, such as thenApply(), thenApplyAsync(), etc.
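
The following sketch uses only the JDK to show these methods side by side. The class name FutureCallbackSketch and the event strings are made up for the example; thenCombine() is included as one more composition method beyond those named above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class FutureCallbackSketch {
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> response = new CompletableFuture<>();

        // Transform the eventual result without blocking.
        CompletableFuture<Integer> length = response.thenApply(String::length);
        // Register a callback that fires when the future completes.
        response.whenComplete((value, error) -> events.add("completed:" + value));
        // Compose two futures into one.
        CompletableFuture<String> combined =
                response.thenCombine(CompletableFuture.completedFuture("!"), (a, b) -> a + b);

        events.add("registered");
        response.complete("Hello Dubbo"); // triggers the registered stages on this thread
        events.add("length:" + length.join());
        events.add("combined:" + combined.join());
        return events;
    }
}
```

Nothing blocks while the stages are being registered; all of them run only once complete() supplies the value.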

Thanks to the enhancements brought by CompletableFuture, we can use callbacks more smoothly, without blocking the calling thread while waiting for a response. Instead, we can inform CompletableFuture to execute a specific function after the current logic is completed. The sayHelloAsync() method in the demo (the Demo module in the dubbo-demo) that returns a CompletableFuture uses the FUTURE mode.

Alright, the principles and underlying implementation of synchronous and asynchronous calls involved in DubboInvoker have been introduced so far. We can summarize it with a flowchart, as shown below:

Lark20201027-180621.png

Flowchart of DubboInvoker Core Process

When the client sends a request, it first creates the corresponding DefaultFuture (which records the request ID and other information), and then uses the asynchronous sending feature of Netty to send the request to the server. It is important to note that this entire sending process does not block any threads. Afterwards, the DefaultFuture is returned to the upper layer. During this return process, the DefaultFuture will be wrapped into AsyncRpcResult and callbacks can also be added.

When the client receives the response result, it will be handed over to the associated thread pool (ExecutorService) or business thread (in the ThreadlessExecutor scenario) for processing. After obtaining the actual result returned by the server, it is set into the DefaultFuture, and the complete() method is called to set it as completed. At this point, the callbacks registered on the DefaultFuture will be triggered and the callback logic will be executed.
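
The request and response correlation described above can be sketched as a table of pending futures keyed by request ID. This is only an illustration of the mechanism: PendingRequestsSketch, Sent, send(), and received() are hypothetical names, a CompletableFuture stands in for DefaultFuture, and no network I/O or timeout handling is included.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    /** Hypothetical pair of request ID and the future that will hold its response. */
    static final class Sent {
        final long id;
        final CompletableFuture<String> future;

        Sent(long id, CompletableFuture<String> future) {
            this.id = id;
            this.future = future;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a future under a fresh request ID and returns immediately. */
    Sent send(String payload) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        // A real client would now write (id, payload) to the network asynchronously.
        return new Sent(id, future);
    }

    /** Called when a response arrives; completes the matching future, if any. */
    boolean received(long id, String result) {
        CompletableFuture<String> future = pending.remove(id);
        return future != null && future.complete(result);
    }

    static String demo() {
        PendingRequestsSketch client = new PendingRequestsSketch();
        Sent sent = client.send("sayHello");
        StringBuilder log = new StringBuilder();
        sent.future.whenComplete((v, t) -> log.append("callback:").append(v));
        log.append("done-before=").append(sent.future.isDone()).append(';');
        client.received(sent.id, "Hello Dubbo"); // triggers the registered callback
        return log.toString();
    }
}
```

Sending never waits for the response; the callback fires only when received() finds the future registered under the same ID.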

Invoker Decorators #

In addition to the DubboInvoker implementation mentioned above, there are many decorator implementations for the Invoker interface. Here, we will focus on the Listener and Filter related Invoker implementations.

1. ListenerInvokerWrapper #

In the previous Lesson 23, we briefly mentioned the InvokerListener interface. We can provide its implementation to listen for refer events and destroy events, and accordingly implement the referred() and destroyed() methods.

ProtocolListenerWrapper is one of the implementations of the Protocol interface, as shown in the following diagram:

Drawing 3.png

Inheritance Diagram of ProtocolListenerWrapper

The ProtocolListenerWrapper itself is a decorator for the Protocol interface. In its export() and refer() methods, it wraps the original Invoker with ListenerExporterWrapper and ListenerInvokerWrapper respectively.

The ListenerInvokerWrapper is a decorator for the Invoker interface. In its constructor, in addition to the decorated Invoker, there is a list of InvokerListeners. Inside the constructor, it traverses the entire InvokerListener list and calls the referred() method of each InvokerListener to notify them of the event that the Invoker is referred. The core logic is as follows:


public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
    this.invoker = invoker; // The underlying decorated Invoker object
    this.listeners = listeners; // List of listeners
    if (CollectionUtils.isNotEmpty(listeners)) {
        for (InvokerListener listener : listeners) {
            if (listener != null) { // Trigger all InvokerListeners during service reference
                listener.referred(invoker);
            }
        }
    }
}

In the ListenerInvokerWrapper.destroy() method, the destroy() method of the decorated Invoker object is called first, and then the destroyed() method of all InvokerListeners is called in a loop to notify them that the Invoker has been destroyed. The implementation is relatively simple, so it is not shown here. If you are interested, you can refer to the source code.
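
The whole lifecycle, including the destroy() step that is not shown, can be sketched with simplified types. The nested Invoker and InvokerListener interfaces below are cut-down stand-ins defined only for this example (they are not Dubbo's interfaces), and calling the listeners in a finally block is a choice made for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListenerWrapperSketch {
    /** Simplified stand-ins for the Dubbo interfaces of the same names. */
    interface Invoker {
        String invoke(String request);
        void destroy();
    }

    interface InvokerListener {
        void referred(Invoker invoker);
        void destroyed(Invoker invoker);
    }

    static final class ListenerInvokerWrapper implements Invoker {
        private final Invoker invoker;
        private final List<InvokerListener> listeners;

        ListenerInvokerWrapper(Invoker invoker, List<InvokerListener> listeners) {
            this.invoker = invoker;
            this.listeners = listeners;
            for (InvokerListener listener : listeners) {
                if (listener != null) {
                    listener.referred(invoker); // notify on reference
                }
            }
        }

        @Override
        public String invoke(String request) {
            return invoker.invoke(request); // plain delegation
        }

        @Override
        public void destroy() {
            try {
                invoker.destroy(); // destroy the decorated Invoker first
            } finally {
                for (InvokerListener listener : listeners) {
                    if (listener != null) {
                        listener.destroyed(invoker); // then notify the listeners
                    }
                }
            }
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Invoker target = new Invoker() {
            @Override public String invoke(String request) { return "echo:" + request; }
            @Override public void destroy() { events.add("target-destroyed"); }
        };
        InvokerListener listener = new InvokerListener() {
            @Override public void referred(Invoker invoker) { events.add("referred"); }
            @Override public void destroyed(Invoker invoker) { events.add("destroyed"); }
        };
        Invoker wrapped = new ListenerInvokerWrapper(target, Arrays.asList(listener));
        events.add(wrapped.invoke("hi"));
        wrapped.destroy();
        return events;
    }
}
```

The listener hears about the reference as soon as the wrapper is constructed, and about the destruction only after the decorated Invoker has been destroyed.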

Correspondingly, an ExporterListener implementation can listen for service export and unexport events by implementing the exported() and unexported() methods.

Similarly, in the export() method of ProtocolListenerWrapper, the original Exporter is wrapped in a ListenerExporterWrapper. The constructor of ListenerExporterWrapper loops over all ExporterListeners and calls their exported() method to notify them of the service export event. The core logic is shown below:

public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
    this.exporter = exporter; // The underlying decorated Exporter object
    this.listeners = listeners; // List of listeners
    if (CollectionUtils.isNotEmpty(listeners)) {
        // Notify every ExporterListener of the export event (exception handling is omitted)
        for (ExporterListener listener : listeners) {
            if (listener != null) {
                listener.exported(this);
            }
        }
    }
}

The unexport() method of ListenerExporterWrapper follows the same pattern, notifying each ExporterListener through its unexported() method, so it is not repeated here.

The InvokerListener and ExporterListener interfaces introduced here are both annotated with @SPI. We can provide our own extension implementations and register them in the corresponding SPI configuration files to listen for these events.

2. Filter-Related Invoker Decorators #

The Filter interface is a very important extension interface that Dubbo provides to users. Dubbo links the individual Filters into a Filter chain and associates the chain with an Invoker instance. The core logic for constructing the Filter chain is in the ProtocolFilterWrapper.buildInvokerChain() method, which is called by the refer() and export() methods of ProtocolFilterWrapper.

The core logic of the buildInvokerChain() method is as follows:

  • First, determine which activated Filter extension implementations are currently available based on the configuration information carried in the URL, and form a Filter collection.
  • Traverse the Filter collection and wrap each Filter implementation into an anonymous Invoker. In this anonymous Invoker, the invoke() method of the Filter is called to execute the logic of the Filter, and the logic inside the Filter determines whether to pass the invocation to the next Filter for execution.

The specific implementation of the buildInvokerChain() method is as follows:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // Determine which activated Filter extension implementations are currently available based on the configuration information carried in the URL, and form a Filter collection
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        // Iterate in reverse so that the first Filter in the list ends up outermost
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            // Wrap each Filter implementation into an anonymous Invoker (its other methods are omitted here)
            last = new Invoker<T>() {
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result asyncResult;
                    try {
                        // Call the invoke() method of the Filter to execute the logic of the Filter, and the logic inside the Filter determines whether to pass the invocation to the next Filter for execution
                        asyncResult = filter.invoke(next, invocation);
                    } catch (Exception e) {
                        ... // Omitted logic for exception listeners
                        throw e; // Rethrow after the listeners have been notified
                    }
                    return asyncResult.whenCompleteWithContext((r, t) -> {
                        ... // Omitted listener processing logic
                    });
                }
            };
        }
    }
    return last;
}
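
The reverse iteration is the part that is easy to misread, so here is a reduced, runnable sketch of the same construction. The nested Invoker and Filter interfaces are simplified stand-ins that work on strings (not Dubbo's interfaces), and tagging() is a hypothetical helper that records when each filter runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {
    /** Simplified stand-ins for Dubbo's Invoker and Filter. */
    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    /** Wraps the target from the last filter to the first, so the first filter is outermost. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** Hypothetical filter that records when it runs relative to the rest of the chain. */
    static Filter tagging(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name + ":before");
            String result = next.invoke(request); // the filter decides to pass the call on
            trace.add(name + ":after");
            return result;
        };
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Invoker target = request -> {
            trace.add("target");
            return "echo:" + request;
        };
        Invoker chain = buildChain(target, Arrays.asList(tagging("A", trace), tagging("B", trace)));
        trace.add(chain.invoke("hi"));
        return trace;
    }
}
```

With filters [A, B], the call passes through A, then B, then the target, and unwinds in the opposite order, which is why the chain is built from the end of the list.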

Inside the Filter interface, there is also a Listener interface. Some Filter implementations also implement this internal Listener interface. When the invoke() method is executed successfully, the onResponse() method of the Listener is called for notification. When an exception occurs during the execution of the invoke() method, the onError() method of the Listener is called for notification.

In addition, there is an abstract class called ListenableFilter, which implements the Filter interface and adds a listeners collection (a ConcurrentMap) to record the listeners that need to be triggered for each request. Note that before executing the invoke() call, we can call the addListener() method to register a Filter.Listener instance. Once the invoke() call completes, these Filter.Listener instances are immediately removed from the listeners collection, which means they are not shared between calls.

Summary #

In this lesson, we mainly introduced the core implementation of the Invoker interface, which is one of the most central pieces of Dubbo.

Following the previous lesson, we analyzed the processing logic of DubboInvoker for twoway requests, which involved the core classes such as DecodeableRpcResult and AsyncRpcResult. We explained in detail the implementation principles of synchronous and asynchronous invocations in Dubbo, and explained the related improvements in Dubbo version 2.7.x. Finally, we introduced several decorators of the Invoker interface, including the ListenerInvokerWrapper used for registering listeners and the Invoker decorators related to Filters.

In the next lesson, we will delve into the implementation of proxies in the Dubbo RPC layer.