26 Add Meal Direct Hit Dubbo Heart With You to Explore the Mysteries of Invoker Above

26 Add Meal Direct Hit Dubbo Heart with You to Explore the Mysteries of Invoker Above #

In the previous lesson, when introducing DubboProtocol, we saw that the upper-level business Bean is encapsulated into Invoker objects and passed into the DubboProtocol.export() method. This Invoker object is then encapsulated into DubboExporter and stored in the exporterMap collection for caching.

When the ProtocolServer exposed by DubboProtocol receives a request, after a series of decoding processes, it eventually reaches the ExchangeHandler object DubboProtocol.requestHandler, which retrieves the requested Invoker from the exporterMap collection and invokes its invoke() method to handle the request.

The DubboProtocol.protocolBindingRefer() method encapsulates the underlying ExchangeClient collection into DubboInvoker and is then encapsulated into a proxy object by the upper-level logic. This allows the business layer to complete remote calls as if they were calling local Beans.

In-depth Look at Invoker #

First, let’s take a look at the AbstractInvoker abstract class, which implements the Invoker interface. The inheritance relationship is shown in the following diagram:

Drawing 0.png

AbstractInvoker inheritance diagram

From the diagram, we can see that the core DubboInvoker inherits from the AbstractInvoker abstract class. The AbstractInvoker class has the following core fields:

  • type (Class<T> type): the type of the business interface encapsulated by this Invoker object, for example, the DemoService interface in the Demo example.
  • url (URL type): the URL object associated with the current Invoker, which contains all the configuration information.
  • attachment (Map type): some additional information associated with the current Invoker, which can come from the associated URL. In one of the overloaded constructors of AbstractInvoker, the convertAttachment() method is called, which retrieves the specified KV values from the associated URL object and records them in the attachment collection.
  • available (volatile boolean type) and destroyed (AtomicBoolean type): these two fields are used to control the current Invoker’s status. available has a default value of true, and destroyed has a default value of false. The destroy() method sets available to false and destroyed to true.

The AbstractInvoker class implements the invoke() method in the Invoker interface. This method feels a bit like the template method pattern. It first processes the configuration information in the URL and the additional information carried by RpcContext, adds them to Invocation as additional information, then calls the doInvoke() method to initiate the remote call (which is implemented by the subclass of AbstractInvoker), and finally obtains an AsyncRpcResult object to return.

public Result invoke(Invocation inv) throws RpcException {

    // First, convert the incoming Invocation to RpcInvocation

    RpcInvocation invocation = (RpcInvocation) inv;

    invocation.setInvoker(this);

    // Add the attachment collection introduced earlier as additional information to the Invocation

    if (CollectionUtils.isNotEmptyMap(attachment)) {

        invocation.addObjectAttachmentsIfAbsent(attachment);

    }

    // Add RpcContext's additional information as additional information to the Invocation

    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();

    if (CollectionUtils.isNotEmptyMap(contextAttachments)) {

        invocation.addObjectAttachments(contextAttachments);

    }

    // Set the invoke mode of this call, asynchronous or synchronous

    invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));

    // If it is an asynchronous call, add a unique ID to this call

    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    AsyncRpcResult asyncResult;

    try { // Call the doInvoke() method implemented by the subclass

        asyncResult = (AsyncRpcResult) doInvoke(invocation);

    } catch (InvocationTargetException e) {// Omitted exception handling logic

    } catch (RpcException e) { // Omitted exception handling logic

    } catch (Throwable e) {

        asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);

    }

    RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));

    return asyncResult;

}

Next, let’s take a closer look at the RpcContext class.

RpcContext #

RpcContext is a thread-level context information, with each thread bound to a RpcContext object. It relies on ThreadLocal to implement this. RpcContext is mainly used to store temporary states of a request in a thread. When a thread handles a new request (for the Provider side) or initiates a new request (for the Consumer side), the content stored in RpcContext will be updated.

Now let’s take a look at the core fields of RpcContext, which are InternalThreadLocal. The definitions of these two fields are as follows:

// The RpcContext used to store context information when making a request
        // 如果集合为空,先进行初始化
    
        if (lookup == null) {
    
            // 初始化集合
    
            indexedVariables = lookup = newIndexedVariableTable();
    
        }
    
        // 不需要使用对象的hashCode作为数组的下标
    
        int i = index;
    
        Object oldValue = lookup[i];
    
        // 将value存储到lookup集合中
    
        lookup[i] = value;
    
        // 如果oldValue不等于new value,则说明之前存储的不为空
    
        return oldValue != value; //
    
    }

可以看到,setIndexedVariable() 方法首先判断 indexedVariables 是否为空,如果为空,则创建一个新的 indexedVariableTable,并将其赋值给 indexedVariables。然后,根据传入的 index 获取原 indexedVariables 数组中的值,将新值 value 存储到 indexedVariables 数组的对应位置,并判断 oldValue 是否等于 value,如果不等,则说明原来的值不为空。最后,返回 oldValue != value 的结果。

InternalThreadLocalMap 中的 indexedVariables 数组使用 Object[] 类型存储值。在 setIndexedVariable() 方法中,会直接将传入的值存储到 indexedVariables 数组中的对应位置,这里不需要使用对象的 hashCode 作为数组的下标,因为 index 字段正好作为数组的下标使用。

笔者这里总结一下:

  • InternalThreadLocal 是一个线程本地变量的封装类,用于在当前线程中存储和访问绑定的值。
  • InternalThreadLocalMap 是 InternalThreadLocal 的底层实现,采用数组结构存储绑定的数据。
  • 数组下标由 index 字段决定,每次存储时会自增。
  • 通过 InternalThreadLocalMap 与当前线程绑定的 InternalThreadLocal 进行读写操作。
  • InternalThreadLocalMap 的 indexedVariables 数组使用 Object[] 类型存储数据,不需要使用对象的 hashCode 作为数组下标。
if (index < lookup.length) { // Store value at the specified index

    Object oldValue = lookup[index];

    lookup[index] = value;

    return oldValue == UNSET;

} else {

    // When the index exceeds the length of the indexedVariables array, the indexedVariables array needs to be expanded

    expandIndexedVariableTableAndSet(index, value);

    return true;

}

}

After clarifying the process of setting the InternalThreadLocal variable, let’s analyze the process of reading the InternalThreadLocal variable. The entry point is in the get() method of InternalThreadLocal.

public final V get() {

    // Get the InternalThreadLocalMap associated with the current thread

    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();

    // Read the corresponding data from the InternalThreadLocalMap based on the index field of the current InternalThreadLocal object

    Object v = threadLocalMap.indexedVariable(index);

    if (v != InternalThreadLocalMap.UNSET) {

        return (V) v; // If it's not UNSET, it means that valid data has been read, return directly

    }

    // If UNSET is read, the initialize() method will be called for initialization. First, the initialValue() method will be called for initialization, and then the setIndexedVariable() method and the addToVariablesToRemove() method will be called to store the initialized value

    return initialize(threadLocalMap);

}

As we can see, in RpcContext, the LOCAL and SERVER_LOCAL fields of InternalThreadLocal have implemented the initialValue() method, and their implementation is to create and return the RpcContext object.

After understanding the underlying principle of InternalThreadLocal, let’s go back to RpcContext for further analysis. As the context information of the call, RpcContext can record a lot of information, and here are some key fields.

  • attachments (Map type): can be used to record additional information of the call context, which will be added to the Invocation and passed to the remote node.
  • values (Map type): used to record key-value pairs of the context, but will not be passed to the remote node.
  • methodName, parameterTypes, arguments: used to record the method name, parameter type list, and specific parameter list of the call, consistent with the information in the associated Invocation object.
  • localAddress, remoteAddress (InetSocketAddress type): record your own and the remote address.
  • request, response (Object type): can be used to record the underlying associated request and response.
  • asyncContext (AsyncContext type): asynchronous context, where RpcContext related to asynchronous calls and Future related to asynchronous requests can be stored.

DubboInvoker #

From the analysis of DubboProtocol, we know that the protocolBindingRefer() method will create a collection of underlying ExchangeClient objects based on the type of the business interface being called and the URL, and wrap them into DubboInvoker objects and return them. DubboInvoker is an implementation of AbstractInvoker, and in its doInvoke() method, it will first choose the ExchangeClient object to be used for this call, then determine whether this call needs a return value, and finally call the ExchangeClient.request() method to send the request, wrap the returned Future in a simple way and return it.

protected Result doInvoke(final Invocation invocation) throws Throwable {

    RpcInvocation inv = (RpcInvocation) invocation;

    // The method name of this call

    final String methodName = RpcUtils.getMethodName(invocation);

    // Add additional information to Invocation, here add URL's path and version to the additional information

    inv.setAttachment(PATH_KEY, getUrl().getPath());

    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient; // Choose an ExchangeClient instance

    if (clients.length == 1) {

        currentClient = clients[0];

    } else {

        currentClient = clients[index.getAndIncrement() % clients.length];

    }

    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

    // Calculate the timeout for this call based on the method name and configuration

    int timeout = calculateTimeout(invocation, methodName);
if (isOneway) { // Requests that don't require attention to the return value

    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);

    currentClient.send(inv, isSent);

    return AsyncRpcResult.newDefaultAsyncResult(invocation);

} else { // Requests that require attention to the return value

    // Get the thread pool for handling responses. For synchronous requests, ThreadlessExecutor will be used. The principle of ThreadlessExecutor has been analyzed earlier, so it will not be repeated here. For asynchronous requests, a shared thread pool will be used. The design and implementation of the ExecutorRepository interface has been analyzed in detail earlier, so it will not be repeated here.

    ExecutorService executor = getCallbackExecutor(getUrl(), inv);

    // Use the selected ExchangeClient to execute the request() method and send the request.

    CompletableFuture<AppResponse> appResponseFuture =

            currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);

    // Encapsulate AppResponse as AsyncRpcResult and return it.

    AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);

    result.setExecutor(executor);

    return result;

}

In the DubboInvoker.invoke() method, there are some details that need attention. First, it determines whether this call is a oneway call based on the URL and the configuration in the Invocation.

public static boolean isOneway(URL url, Invocation inv) {

    boolean isOneway;

    if (Boolean.FALSE.toString().equals(inv.getAttachment(RETURN_KEY))) {

        isOneway = true; // First, pay attention to the "return" attachment property in Invocation.

    } else {

        isOneway = !url.getMethodParameter(getMethodName(inv), RETURN_KEY, true); // Then, pay attention to the "return" configuration for the corresponding method in URL.

    }

    return isOneway;

}

Oneway means that after the client sends a message, it does not expect a response from the server. Therefore, oneway communication is more suitable for requests that do not care about server responses. The diagram below shows the comparison between oneway and twoway communication.

Lark20201023-161312.png

Comparison between oneway and twoway communication modes

As can be seen, the send() method is used for sending oneway requests, while the request() method is used for sending twoway requests. As we have analyzed before, the request() method will create a DefaultFuture object and a timeout detection task, while the send() method will not create these things. It directly sends the Invocation packaged as a oneway type Request.

In the server’s HeaderExchangeHandler.receive() method, different branches are executed for oneway requests and twoway requests: twoway requests are processed by the handleRequest() method, which pays attention to the call result and forms a Response to return to the client; oneway requests are directly handed over to the upper-level DubboProtocol.requestHandler and no Response is returned after the method call is completed.

Let’s use the following example code to briefly explain the relevant fragments in the HeaderExchangeHandler.request() method.

public void received(Channel channel, Object message) throws RemotingException {

    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);

    if (message instanceof Request) {

        if (request.isTwoWay()) {

            handleRequest(exchangeChannel, request);

        } else {

            handler.received(exchangeChannel, request.getData());

        }

    } else ... // Other branches omitted

}

Summary #

In this lesson, we focused on introducing the most core interface in Dubbo - Invoker. First, we introduced the common capabilities provided by the AbstractInvoker abstract class. Then we analyzed the functions of RpcContext and the components involved, such as InternalThreadLocal, InternalThreadLocalMap, etc. Finally, we explained the implementation of DubboInvoker for the doinvoke() method and distinguished between oneway and twoway types of requests.

In the next lesson, we will continue to introduce the implementation of DubboInvoker.