25 Starting from Protocol: An Overview of the Full Process of Service Exposure and Service Invocation (Part 2) #
In the previous lesson, we discussed the core process of Dubbo service publishing based on the implementation of DubboProtocol. In this lesson, we will continue to introduce the implementation of the service reference in DubboProtocol.
Reference Process #
The core of the service reference logic in DubboProtocol is the protocolBindingRefer() method:
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    // Perform serialization optimization and register the classes that need to be optimized
    optimizeSerialization(url);
    // Create the DubboInvoker object
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    // Add the created DubboInvoker object to the invokers collection
    invokers.add(invoker);
    return invoker;
}
We will not delve into the implementation of DubboInvoker for now. Instead, let us focus on the getClients() method, which creates the collection of clients used to send requests and receive responses. Its logic has two branches: one for shared connections and one for dedicated connections. The implementation is as follows:
private ExchangeClient[] getClients(URL url) {
    // Whether shared connections are used
    boolean useShareConnect = false;
    // The CONNECTIONS_KEY parameter determines the number of connections to be established later
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    if (connections == 0) { // If the number of connections is not configured, use shared connections by default
        useShareConnect = true;
        // Determine the number of shared connections to be established, default to 1
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr)
                ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
                : shareConnectionsStr);
        // Create the collection of shared ExchangeClients
        shareClients = getSharedClient(url, connections);
    }
    // Assemble the collection of ExchangeClients to be returned
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);
        } else {
            // When shared connections are not used, a separate ExchangeClient instance is created
            clients[i] = initClient(url);
        }
    }
    return clients;
}
When using dedicated connections, a fixed number of clients is created for each service, and each client maintains its own underlying connection. In the diagram below, two dedicated connections are opened for each service:
Diagram of dedicated connections for services
When using shared connections, clients are grouped by network address (host:port), and a fixed number of shared connections is established for each address. In the diagram below, Provider 1 exposes multiple services, and the Consumer references multiple services from Provider 1. “Shared connection” means that when the Consumer calls multiple services in Provider 1, it transmits data over a fixed number of shared, long-lived TCP connections, which reduces the number of connections on the server side.
Diagram of shared connections for services
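To make the difference between the two models concrete, here is a minimal sketch that only does the arithmetic implied by the description above. The class and method names (ConnectionCountSketch, dedicatedConnections, sharedConnections) are hypothetical and are not part of Dubbo, and the sketch assumes a single consumer whose services all come from the provider addresses being counted.

```java
// Hypothetical helper, not part of Dubbo: counts the connections a single
// consumer would open under each of the two connection models.
class ConnectionCountSketch {

    // Dedicated model: every referenced service gets its own fixed number of connections.
    static int dedicatedConnections(int referencedServices, int connectionsPerService) {
        return referencedServices * connectionsPerService;
    }

    // Shared model: connections are counted per provider address (host:port),
    // no matter how many services are referenced from that address.
    static int sharedConnections(int providerAddresses, int sharedConnectionsPerAddress) {
        return providerAddresses * sharedConnectionsPerAddress;
    }

    public static void main(String[] args) {
        // Three services referenced from one provider, two dedicated connections each.
        System.out.println(dedicatedConnections(3, 2)); // prints 6
        // The same three services over one shared connection to that provider.
        System.out.println(sharedConnections(1, 1)); // prints 1
    }
}
```

Under these assumptions, the number of dedicated connections grows with the number of referenced services, while the number of shared connections grows only with the number of provider addresses.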
How are shared connections created? The details are in the getSharedClient() method. It first looks up the shared client collection for the key (a string formed by concatenating host and port) in the referenceClientMap cache (of type Map<String, List<ReferenceCountExchangeClient>>). If all the clients in the retrieved collection are available, it uses these cached clients directly. Otherwise, it creates new clients to replace the unavailable ones in the cache. The code is as follows:
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    String key = url.getAddress(); // Get the address of the remote end (host:port)
    // Get the ReferenceCountExchangeClient collection connected to this address from referenceClientMap
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
    // The checkClientCanUse() method checks whether all clients in the collection are available
    if (checkClientCanUse(clients)) {
        batchClientRefIncr(clients); // When all clients are available, increase the reference count
        return clients;
    }
    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) { // Lock per address; locking by partition improves concurrency
        clients = referenceClientMap.get(key);
        if (checkClientCanUse(clients)) { // Double check: verify again whether all clients are available
            batchClientRefIncr(clients); // Increase the reference count of the clients
            return clients;
        }
        connectNum = Math.max(connectNum, 1); // At least one shared connection
        // If the current clients collection is empty, initialize all shared clients through the initClient() method
        if (CollectionUtils.isEmpty(clients)) {
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);
        } else { // If only some of the shared clients are unavailable, only those need to be replaced
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }
                // Increase the reference count
                referenceCountExchangeClient.incrementAndGetCount();
            }
        }
        // Clean up the locks collection to prevent memory leaks. If the key belongs to a service that is
        // down or offline and is not removed here, the Object used for locking can never be garbage
        // collected, resulting in a memory leak
        locks.remove(key);
        return clients;
    }
}
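The combination of a lock-free first check, a per-address lock, and a second check under the lock can be illustrated in isolation. The following is a minimal sketch, not Dubbo code: PerKeyCacheSketch and getOrCreate are hypothetical names. It obtains the lock object with ConcurrentHashMap.computeIfAbsent(), a swapped-in simplification of the putIfAbsent()/get() pair above, and unlike getSharedClient() it never removes lock objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical cache, not Dubbo code: one lock object per key, so that creating the
// value for one address does not block threads working on another address.
class PerKeyCacheSketch<V> {
    private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    // The factory must not return null, because ConcurrentHashMap rejects null values.
    V getOrCreate(String key, Function<String, V> factory) {
        V value = cache.get(key);          // first check, without locking
        if (value != null) {
            return value;
        }
        // computeIfAbsent atomically returns the single lock object registered for this key.
        Object lock = locks.computeIfAbsent(key, k -> new Object());
        synchronized (lock) {
            value = cache.get(key);        // second check, now under the per-key lock
            if (value == null) {
                value = factory.apply(key);
                cache.put(key, value);
            }
            return value;
        }
    }

    public static void main(String[] args) {
        PerKeyCacheSketch<String> cache = new PerKeyCacheSketch<>();
        System.out.println(cache.getOrCreate("127.0.0.1:20880", k -> "client for " + k));
    }
}
```

Keeping the lock objects forever trades a small amount of memory for simplicity; the original code removes them instead so that addresses of offline providers do not accumulate.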
Here, the implementation uses ReferenceCountExchangeClient as a decorator for ExchangeClient. It adds reference counting to the original ExchangeClient object.
In addition to holding the decorated ExchangeClient object, ReferenceCountExchangeClient also has a referenceCount field (AtomicInteger type), which records the number of times the client has been referenced. From the diagram below, we can see that the reference count is increased in the constructor and in the incrementAndGetCount() method, and decreased in the close() method.
Reference count modification call stack
With this, for shared connections to the same address, two basic requirements can be met:
- When the reference count is reduced to 0, the ExchangeClient connection is closed.
- When the reference count is not reduced to 0, the underlying ExchangeClient cannot be closed.
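These two requirements can be sketched with a small stand-alone decorator. This is a minimal illustration, not Dubbo's ReferenceCountExchangeClient: the names RefCountedResourceSketch and retain() are hypothetical, and the underlying connection is represented by a plain Runnable that performs the actual close.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical decorator, not Dubbo code: shares one underlying resource among
// several users and closes it only when the last user has called close().
class RefCountedResourceSketch implements AutoCloseable {
    private final Runnable closeAction;  // stands in for closing the real connection
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    RefCountedResourceSketch(Runnable closeAction) {
        this.closeAction = closeAction;
        referenceCount.incrementAndGet();   // the creator holds the first reference
    }

    // Called each time another user starts sharing the resource.
    int retain() {
        return referenceCount.incrementAndGet();
    }

    // Each user calls close() once; only the call that drops the count to 0 closes the resource.
    @Override
    public void close() {
        if (referenceCount.decrementAndGet() <= 0) {
            closeAction.run();
        }
    }

    public static void main(String[] args) {
        RefCountedResourceSketch shared =
                new RefCountedResourceSketch(() -> System.out.println("underlying connection closed"));
        shared.retain();   // a second service starts using the same connection
        shared.close();    // count drops to 1, nothing is closed
        shared.close();    // count drops to 0, the close action runs
    }
}
```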
There is one more detail to note in the ReferenceCountExchangeClient.close() method: after closing the underlying ExchangeClient object, it immediately creates a LazyConnectExchangeClient, also known as a “ghost connection”. This LazyConnectExchangeClient serves mainly as a fallback for exceptional cases. The logic is as follows:
public void close(int timeout) {
    // When the reference count is reduced to 0, close the underlying ExchangeClient,
    // which includes stopping heartbeat and reconnection tasks, as well as closing the underlying channel.
    // This has been explained in detail when introducing the HeaderExchangeClient in the previous section, so I won't go into details here.
    if (referenceCount.decrementAndGet() <= 0) {
        if (timeout == 0) {
            client.close();
        } else {
            client.close(timeout);
        }
        // Create a LazyConnectExchangeClient and point the client field to this object
        replaceWithLazyClient();
    }
}

private void replaceWithLazyClient() {
    // Add some LazyConnectExchangeClient-specific parameters on top of the original URL
    URL lazyUrl = URLBuilder.from(url)
            .addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE)
            .addParameter(RECONNECT_KEY, Boolean.FALSE)
            .addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString())
            .addParameter("warning", Boolean.TRUE.toString())
            .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true)
            .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient")
            .build();
    // If the client field already points to a LazyConnectExchangeClient, there is no need to create a new one as a fallback again.
    if (!(client instanceof LazyConnectExchangeClient) || client.isClosed()) {
        // The ChannelHandler is still the handler used by the original ExchangeClient, which is the requestHandler field in DubboProtocol.
        client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
    }
}
The LazyConnectExchangeClient is also a decorator for ExchangeClient. It adds lazy loading functionality on top of the original ExchangeClient object. The LazyConnectExchangeClient does not create the underlying Client that holds the connection in the constructor. Instead, it creates the Client only when a request needs to be sent, as shown in the following figure:
Call hierarchy of initClient() method
The implementation of the initClient() method is as follows:
private void initClient() throws RemotingException {
    if (client != null) { // The underlying Client has already been initialized, no need to initialize it here
        return;
    }
    connectLock.lock();
    try {
        if (client != null) { // double check
            return;
        }
        // Create the ExchangeClient object through the Exchangers facade class
        this.client = Exchangers.connect(url, requestHandler);
    } finally {
        connectLock.unlock();
    }
}
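The same lazy-initialization idea can be shown without any Dubbo types. The following is a minimal sketch under stated assumptions: LazyConnectionSketch and its Supplier-based connector are hypothetical stand-ins for the client and for Exchangers.connect(), and the field is declared volatile so that the unlocked first check is safe.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical wrapper, not Dubbo code: the expensive connection is created
// on first use instead of in the constructor.
class LazyConnectionSketch<C> {
    private final Supplier<C> connector;   // stands in for the real connect call
    private final Lock connectLock = new ReentrantLock();
    private volatile C connection;         // volatile so the unlocked check sees a fully built object

    LazyConnectionSketch(Supplier<C> connector) {
        this.connector = connector;        // nothing is connected yet
    }

    C get() {
        if (connection != null) {          // fast path: already initialized
            return connection;
        }
        connectLock.lock();
        try {
            if (connection == null) {      // double check under the lock
                connection = connector.get();
            }
            return connection;
        } finally {
            connectLock.unlock();
        }
    }

    public static void main(String[] args) {
        LazyConnectionSketch<String> lazy = new LazyConnectionSketch<>(() -> "connected");
        System.out.println(lazy.get());    // the connector runs here, on first use
    }
}
```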
Besides initializing the underlying ExchangeClient through the initClient() method, these request-sending methods also call the warning() method, which decides whether to log a WARN-level message based on the parameters carried by the current URL. To avoid flooding the log in a short period of time, the output is rate-limited: by default, one message is logged for every 5000 requests. As the fallback scenario shown earlier demonstrates, replaceWithLazyClient() enables this logging option in the URL it builds.
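The frequency limit described above amounts to sampling by call count. Here is a minimal sketch of that idea; SampledWarningSketch and shouldLog() are hypothetical names rather than the actual warning() implementation, and the period of 5000 is taken from the default mentioned in the text.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical rate limiter for log output, not Dubbo code:
// it lets one message through for every `period` calls.
class SampledWarningSketch {
    private final long period;
    private final AtomicLong calls = new AtomicLong();

    SampledWarningSketch(long period) {
        this.period = period;
    }

    // Returns true on the 1st call, the (period + 1)th call, and so on.
    boolean shouldLog() {
        return calls.getAndIncrement() % period == 0;
    }

    public static void main(String[] args) {
        SampledWarningSketch sampler = new SampledWarningSketch(5000);
        int logged = 0;
        for (int i = 0; i < 10000; i++) {
            if (sampler.shouldLog()) {
                logged++;
            }
        }
        System.out.println(logged); // prints 2
    }
}
```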
After analyzing the core process of creating a shared client in the getSharedClient() method, let’s return to the DubboProtocol class and continue with the process of creating a dedicated client.
The entry point for creating a dedicated client is the DubboProtocol.initClient() method. It first sets some default parameters in the URL and then, based on the LAZY_CONNECT_KEY parameter, decides whether to wrap the client in a LazyConnectExchangeClient so that the connection is created lazily. The code is as follows:
private ExchangeClient initClient(URL url) {
    // Get the client extension name and perform validation (omitted)
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
    // Set the extension name for Codec2
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // Set the default heartbeat interval
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
    ExchangeClient client;
    // Handling of the checked RemotingException thrown while connecting is also omitted here
    // If lazy connection creation is configured, create a LazyConnectExchangeClient
    if (url.getParameter(LAZY_CONNECT_KEY, false)) {
        client = new LazyConnectExchangeClient(url, requestHandler);
    } else { // If the lazy connection feature is not used, create a HeaderExchangeClient directly
        client = Exchangers.connect(url, requestHandler);
    }
    return client;
}
The LazyConnectExchangeClient decorator and the Exchangers facade class mentioned here have been analyzed in depth earlier and will not be repeated.
There are a few more methods in the DubboProtocol class that have not been introduced. Here, you only need a basic understanding of their implementations.
- batchClientRefIncr() method: It iterates through the collection passed in and increments the reference count of each ReferenceCountExchangeClient object.
- buildReferenceCountExchangeClient() method: It calls the initClient() method introduced earlier to create a client object, wraps it in a ReferenceCountExchangeClient decorator, and returns it. This method is mainly used to create shared clients.
The destroy() Method #
When the DubboProtocol is destroyed, the destroy() method is called to release the underlying resources, including the ProtocolServer objects created in the export process and the Client objects created in the refer process.
The DubboProtocol.destroy() method first closes each ProtocolServer object in the serverMap collection one by one. The related code snippet is as follows:
for (String key : new ArrayList<>(serverMap.keySet())) {
    ProtocolServer protocolServer = serverMap.remove(key);
    if (protocolServer == null) {
        continue;
    }
    RemotingServer server = protocolServer.getRemotingServer();
    // The close() method sends a ReadOnly request, blocks for a specified time, closes the underlying
    // scheduled tasks and related thread pools, and finally disconnects all connections and closes the server.
    // This logic was analyzed in detail in the earlier introduction of HeaderExchangeServer, NettyServer,
    // and other implementations, so I won't go into details here.
    server.close(ConfigurationUtils.getServerShutdownTimeout());
}
The ConfigurationUtils.getServerShutdownTimeout() method returns the blocking time, which defaults to 10 seconds and can be configured through dubbo.service.shutdown.wait or dubbo.service.shutdown.wait.seconds.
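The server-closing loop above iterates over a copy of the key set, removes each entry, and only then closes it. A minimal stand-alone sketch of that pattern follows; ServerRegistrySketch and its nested ClosableServer interface are hypothetical names used only for illustration, and the timeout value is simply passed through to each server.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry, not Dubbo code: shows the "snapshot the keys, remove, then close" loop.
class ServerRegistrySketch {
    // Stand-in for a server that can be closed with a timeout in milliseconds.
    interface ClosableServer {
        void close(int timeoutMillis);
    }

    private final Map<String, ClosableServer> servers = new ConcurrentHashMap<>();

    void register(String address, ClosableServer server) {
        servers.put(address, server);
    }

    int size() {
        return servers.size();
    }

    // Closes every registered server and returns how many were actually closed.
    int destroy(int timeoutMillis) {
        int closed = 0;
        // Iterate over a copy of the keys so that removing entries cannot disturb the loop.
        for (String key : new ArrayList<>(servers.keySet())) {
            ClosableServer server = servers.remove(key);
            if (server == null) {
                continue;                  // already removed elsewhere
            }
            server.close(timeoutMillis);
            closed++;
        }
        return closed;
    }

    public static void main(String[] args) {
        ServerRegistrySketch registry = new ServerRegistrySketch();
        registry.register("127.0.0.1:20880", t -> System.out.println("closing with timeout " + t));
        registry.destroy(10000);
    }
}
```

Removing the entry before closing it means a second call to destroy() finds nothing left to close, so each server is closed at most once.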
After that, the DubboProtocol.destroy() method closes each Client in the referenceClientMap collection one by one. The logic is the same as the closing of ProtocolServer described above, so it won’t be repeated here. However, it is worth remembering the ReferenceCountExchangeClient mentioned earlier: the underlying client is truly destroyed only when the reference count reaches 0.
Finally, the DubboProtocol.destroy() method calls the destroy() method of the parent class AbstractProtocol to destroy all Invoker objects. The implementation of the AbstractProtocol.destroy() method has already been introduced, so it won’t be repeated here.
Summary #
In this lesson, we continued from the previous lesson and used DubboProtocol as an example to introduce the core process of service reference in the Protocol layer of Dubbo. We first introduced the core logic of initializing clients in DubboProtocol, analyzed the shared connection and dedicated connection models, and then explained the functions and implementations of decorators such as ReferenceCountExchangeClient and LazyConnectExchangeClient. Finally, we explained how the destroy() method releases the underlying resources.
If you have any questions or ideas about DubboProtocol, please feel free to leave a comment and share them with me. In the next lesson, we will start to introduce the implementations related to the “heart” of Dubbo: the Invoker interface. It is a bonus lesson, so remember to join the class on time.