25 Starting From Protocol Overview of Service Exposure and Service Invocation Full Process Below

25 Starting From Protocol Overview of Service Exposure and Service Invocation Full Process Below #

In the previous lesson, we discussed the core process of Dubbo service publishing based on the implementation of DubboProtocol. In this lesson, we will continue to introduce the implementation of the service reference in DubboProtocol.

Reference Process #

Now let’s introduce the implementation of service reference in DubboProtocol. The core implementation is in the protocolBindingRefer() method:

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {

    optimizeSerialization(url); // Perform serialization optimization and register classes that need to be optimized
    
    // Create DubboInvoker object
    
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    
    // Add the created DubboInvoker object to the invokers collection
    
    invokers.add(invoker);
    
    return invoker;

}

We won’t delve into the specific implementation of DubboInvoker for now. Here, let’s focus on the getClients() method, which creates a collection of clients for sending requests and receiving responses. It is divided into two parts: one for shared connections and the other for dedicated connections. The specific implementation is as follows:

private ExchangeClient[] getClients(URL url) {

    // Determine whether shared connections are used
    
    boolean useShareConnect = false;
    
    // The CONNECTIONS_KEY parameter value determines the number of connections to be established later
    
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    
    List<ReferenceCountExchangeClient> shareClients = null;
    
    if (connections == 0) { // If there is no configuration for the number of connections, use shared connections by default
    
        useShareConnect = true;
        
        // Determine the number of shared connections to be established, default to 1
        
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
    
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
    
        // Create a collection of shared ExchangeClients
        
        shareClients = getSharedClient(url, connections);
    
    }
    
    // Organize the collection of ExchangeClients to be returned
    
    ExchangeClient[] clients = new ExchangeClient[connections];
    
    for (int i = 0; i < clients.length; i++) {
    
        if (useShareConnect) {
    
            clients[i] = shareClients.get(i);
    
        } else {
    
            // When shared connections are not used, a separate ExchangeClient instance will be created
    
            clients[i] = initClient(url);
    
        }
    
    }
    
    return clients;

}

When using dedicated connections, a fixed number of clients are created for each service, and each client maintains an underlying connection. As shown in the diagram below, two dedicated connections are started for each service:

Lark20201020-171207.png

Diagram of dedicated connections for services

When using shared connections, different network addresses (host:port) are distinguished, and a fixed number of shared connections are established for each address. As shown in the diagram below, Provider 1 exposes multiple services, and Consumer references multiple services from Provider 1. “Shared connection” means that when the Consumer calls multiple services in Provider 1, it uses a fixed number of shared TCP long connections to transmit data, which reduces the number of server-side connections.

Lark20201020-171159.png

Diagram of shared connections for services

How are shared connections created? The implementation details of creating shared connections are in the getSharedClient() method. It first queries the shared client collection corresponding to the Key (a string formed by concatenating host and port) from the referenceClientMap cache (a Map> type). If all the clients in the retrieved client collection are available, it directly uses these cached clients. Otherwise, it creates new clients to replace the unavailable clients in the cache. The code example is as follows:

private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {

    String key = url.getAddress(); // Get the address of the remote end (host:port)
    
    // Get the ReferenceCountExchangeClient collection that is connected to the address from the referenceClientMap collection
    
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    
    // The checkClientCanUse() method checks whether all clients in the client collection are available
    
    if (checkClientCanUse(clients)) {
    
        batchClientRefIncr(clients); // When all clients are available, increase the reference count
    
        return clients;
    
    }
    
    locks.putIfAbsent(key, new Object());
    
    synchronized (locks.get(key)) { // Lock the clients for the specified address, locking by partition improves concurrency
    
        clients = referenceClientMap.get(key);
    
        if (checkClientCanUse(clients)) { // double check, check whether all clients are available again
    
            batchClientRefIncr(clients); // Increase the reference count of the clients
    
            return clients;
    
        }
    
        connectNum = Math.max(connectNum, 1); // At least one shared connection
    
        // If the current clients collection is empty, initialize all shared clients through the initClient() method
    
        if (CollectionUtils.isEmpty(clients)) {
    
            clients = buildReferenceCountExchangeClientList(url, connectNum);
    
            referenceClientMap.put(key, clients);
    
        } else { // If only some of the shared clients are unavailable, only these unavailable clients need to be processed
    
            for (int i = 0; i < clients.size(); i++) {
    
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
    
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
    
                    clients.set(i, buildReferenceCountExchangeClient(url));
    
                    continue;
    
                }
    
                // Increase the reference count
    
                referenceCountExchangeClient.incrementAndGetCount();
    
            }
    
        }
    
        // Clean up the locks collection to prevent memory leaks. If the key corresponds to a service that is down or offline,
    
        // if it is not cleaned up here, the Object object used for locking cannot be garbage collected, resulting in memory leaks
    
        locks.remove(key);
    
        return clients;
    
    }

}

Here, the implementation uses ReferenceCountExchangeClient as the decorator for the ExchangeClient. It adds reference counting functionality to the original ExchangeClient object.

In addition to holding the decorated ExchangeClient object, ReferenceCountExchangeClient also has a referenceCount field (AtomicInteger type), which records the number of times the client has been referenced. From the diagram below, we can see that the reference count is increased in the constructor and incrementAndGetCount() method, and decreased in the close() method.

Drawing 2.png

Reference count modification call stack

With this, for shared connections to the same address, two basic requirements can be met:

  1. When the reference count is reduced to 0, the ExchangeClient connection is closed.
  2. When the reference count is not reduced to 0, the underlying ExchangeClient cannot be closed.

There is one more detail to note: the ReferenceCountExchangeClient.close() method. After closing the underlying ExchangeClient object, it immediately creates a LazyConnectExchangeClient, also known as a “ghost connection”. The specific logic is as follows, where the LazyConnectExchangeClient is mainly used as a fallback for exceptional cases:

public void close(int timeout) {

    // When the reference count is reduced to 0, close the underlying ExchangeClient,
    // which includes stopping heartbeat and reconnection tasks, as well as closing the underlying channel.
    // This has been explained in detail when introducing the HeaderExchangeClient in the previous section, so I won't go into details here.
    
    if (referenceCount.decrementAndGet() <= 0) { 

        if (timeout == 0) {

            client.close();

        } else {

            client.close(timeout);

        } 

        // Create a LazyConnectExchangeClient and point the client field to this object

        replaceWithLazyClient(); 

    }

}

private void replaceWithLazyClient() {

    // Add some LazyConnectExchangeClient-specific parameters on top of the original URL
    
    URL lazyUrl = URLBuilder.from(url)

            .addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE)

            .addParameter(RECONNECT_KEY, Boolean.FALSE)

            .addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString())

            .addParameter("warning", Boolean.TRUE.toString())

            .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)

            .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient")

            .build();

    // If the client field already points to a LazyConnectExchangeClient, there is no need to create a new one as a fallback again.

    if (!(client instanceof LazyConnectExchangeClient) || client.isClosed()) {

        // The ChannelHandler still uses the same handler used by the original ExchangeClient, which is the requestHandler field in DubboProtocol.

        client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());

    }

}

The LazyConnectExchangeClient is also a decorator for ExchangeClient. It adds lazy loading functionality on top of the original ExchangeClient object. The LazyConnectExchangeClient does not create the underlying Client that holds the connection in the constructor. Instead, it creates the Client only when a request needs to be sent, as shown in the following figure:

Drawing 3.png

Call hierarchy of initClient() method

The implementation of the initClient() method is as follows:

private void initClient() throws RemotingException {

    if (client != null) { // The underlying Client has already been initialized, no need to initialize here

        return;

    }

    connectLock.lock();

    try {

        if (client != null) { return; } // double check

        // Create the ExchangeClient object through the Exchangers facade class

        this.client = Exchangers.connect(url, requestHandler);
} finally {
    connectLock.unlock();
}

}

In these methods that send requests, besides initializing the underlying ExchangeClient through the initClient() method, the warning() method is also called, which determines whether to log WARN-level messages based on the parameters carried by the current URL. In order to prevent a situation where a large number of logs are printed in an instant, there is a frequency limit for printing logs, which is set to print 1 log every 5000 requests by default. As you can see from the fallback scenario shown earlier, we have enabled the option to print logs.

After analyzing the core process of creating a shared client in the getSharedClient() method, let’s return to the DubboProtocol class and continue to introduce the process of creating an exclusive client.

The entry point for creating an exclusive client is the DubboProtocol.initClient() method. It first sets some default parameters in the URL and then determines whether to use the LazyConnectExchangeClient for encapsulation and lazy loading based on the LAZY_CONNECT_KEY parameter. The code is as follows:

private ExchangeClient initClient(URL url) {

    // Get the client extension name and perform validation (omitted)

    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

    // Set the extension name for Codec2

    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);

    // Set the default heartbeat interval

    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

    ExchangeClient client;    

    // If the feature of lazy connection creation is configured, create LazyConnectExchangeClient

    if (url.getParameter(LAZY_CONNECT_KEY, false)) {

        client = new LazyConnectExchangeClient(url, requestHandler);

    } else { // If the lazy connection feature is not used, directly create HeaderExchangeClient

        client = Exchangers.connect(url, requestHandler);

    }

    return client;

}

The LazyConnectExchangeClient decorator and the Exchangers facade class mentioned here have been analyzed in depth earlier and will not be repeated.

There are a few more methods in the DubboProtocol class that have not been introduced. Here, you only need to have a basic understanding of their implementations.

  • batchClientRefIncr() method: It iterates through the collection passed in and increments the reference of each ReferenceCountExchangeClient object.
  • buildReferenceCountExchangeClient() method: It will call the initClient() method introduced earlier to create a client object, then wrap it with a ReferenceCountExchangeClient layer for decoration, and finally return it. This method is mainly used to create a shared client.

destroy method #

When the DubboProtocol is destroyed, the destroy() method is called to release the underlying resources, including the ProtocolServer objects created in the export process and the Client objects created in the refer process.

The DubboProtocol.destroy() method first closes each ProtocolServer object in the serverMap collection one by one. The related code snippet is as follows:

for (String key : new ArrayList<>(serverMap.keySet())) {

    ProtocolServer protocolServer = serverMap.remove(key);

    if (protocolServer == null) { continue;}

    RemotingServer server = protocolServer.getRemotingServer();

    // In the close() method, it sends a ReadOnly request, blocks for a specified time, closes the underlying scheduled tasks and related thread pools, and finally disconnects all connections and closes the server. The logic of these operations has been analyzed in detail in the previous introduction of HeaderExchangeServer, NettyServer, and other implementations, so I won't go into details here.

    server.close(ConfigurationUtils.getServerShutdownTimeout());

}

The ConfigurationUtils.getServerShutdownTimeout() method returns the default blocking time of 10 seconds, which can be configured using dubbo.service.shutdown.wait or dubbo.service.shutdown.wait.seconds.

After that, the DubboProtocol.destroy() method closes each Client in the referenceClientMap collection one by one, and the logic is the same as the aforementioned closing of ProtocolServer, so it won’t be repeated here. However, it is worth noting the existence of ReferenceCountExchangeClient mentioned earlier. The underlying client will only be truly destroyed when the reference count reaches 0.

Finally, the DubboProtocol.destroy() method calls the destroy() method of the parent class AbstractProtocol to destroy all Invoker objects. The implementation of the AbstractProtocol.destroy() method has already been introduced before, so it won’t be repeated here.

Summary #

In this lesson, we continue the previous lesson and use the DubboProtocol as an example to introduce the core process of service reference implementation in the Protocol layer of Dubbo. We first introduced the core logic of initializing the client in the DubboProtocol, analyzed the models of shared connection and independent connection, and then explained the functions and implementations of decorators such as ReferenceCountExchangeClient and LazyConnectExchangeClient. Finally, we explained the implementation of the destroy() method to release underlying resources.

If you have any questions or ideas about the DubboProtocol, please feel free to leave a comment and share them with me. In the next lesson, we will start to introduce the related implementations of the “heart” of Dubbo - the Invoker interface. This is an extra article for us, so remember to attend the class on time.