31 Bonus: In-Depth Directory Implementation, Exploring the Mysteries of the Service Discovery Mechanism #

In this lesson, we enter the “Cluster” module. Today, we will share a bonus article titled “Deep Dive into Dubbo Directory Implementation, Exploring the Secrets of Service Directory”.

In a production environment, to ensure the reliability, throughput, and fault tolerance of services, we usually run the same server program on multiple servers and provide services externally in the form of a cluster. The number of service instances in each server cluster varies depending on the performance requirements, ranging from several instances to hundreds of instances.

For client programs, several problems arise:

  • Should the client program be aware of each server address?
  • Which server program should the client program call for each request?
  • How should failed requests be handled? Retry or throw an exception?
  • If retrying, should the same service instance be requested again, or should other service instances be tried?
  • How does the server cluster achieve load balancing, and what are the criteria for load balancing?

To address these issues, Dubbo has a dedicated module called dubbo-cluster for implementing cluster functionality.

Figure: dubbo-cluster structure diagram

As the first lesson in the analysis of the dubbo-cluster module, let’s first understand the architecture of the dubbo-cluster module and its core Cluster interface.

Cluster Architecture #

The main function of the dubbo-cluster module is to disguise multiple providers as one provider for consumers to call. This involves cluster fault tolerance, routing rule processing, and load balancing. The following diagram shows the core components of dubbo-cluster:

Figure: Core interface diagram of Cluster

From the diagram, we can see that the dubbo-cluster module mainly includes the following four core interfaces:

  • The Cluster interface is the interface for cluster fault tolerance. It ensures that when some provider nodes fail, the consumer’s call requests are sent to normal provider nodes, thereby guaranteeing the availability of the entire system.
  • The Directory interface represents a collection of multiple Invokers, which forms the basis for subsequent routing rule processing, load balancing strategies, and cluster fault tolerance.
  • The Router interface abstracts the router. When a request passes through the router, it matches the providers that meet the specified rules.
  • The LoadBalance interface is the interface for load balancing. The consumer selects the most suitable provider node to handle the request from the provider collection according to the specified load balancing strategy.

The core process of the Cluster layer is as follows: when a call enters the Cluster, the Cluster creates an AbstractClusterInvoker object. In this AbstractClusterInvoker, it first obtains the current Invoker collection from the Directory. Then, it routes the collection according to the Router collection to obtain the Invoker collection that meets the conditions. Finally, it gets the Invoker object that needs to be called based on the load balancing strategy specified by LoadBalance.
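The three steps above (obtain the candidates from the Directory, filter them through the Routers, pick one with the LoadBalance) can be sketched in a few lines. This is a minimal illustration, not Dubbo's implementation: the class `ClusterFlowSketch`, the use of plain address strings in place of Invoker objects, the predicate-based routers, and the round-robin selection are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-ins; none of these types are Dubbo classes.
final class ClusterFlowSketch {

    // Steps 1 and 2: start from the full "directory" list and let every
    // "router" (a predicate here) remove the providers it does not match.
    static List<String> route(List<String> directory, List<Predicate<String>> routers) {
        List<String> candidates = new ArrayList<>(directory);
        for (Predicate<String> router : routers) {
            candidates.removeIf(router.negate());
        }
        return candidates;
    }

    // Step 3: the "load balancer" picks one candidate; here a trivial round robin.
    static String select(List<String> candidates, int requestCount) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return candidates.get(Math.floorMod(requestCount, candidates.size()));
    }

    public static void main(String[] args) {
        List<String> directory = Arrays.asList(
                "10.0.0.1:20880", "10.0.0.2:20880", "10.0.1.9:20880");
        List<Predicate<String>> routers = new ArrayList<>();
        routers.add(address -> address.startsWith("10.0.0.")); // keep one subnet only
        List<String> candidates = route(directory, routers);
        System.out.println(select(candidates, 0));
        System.out.println(select(candidates, 1));
    }
}
```

In the real module each of these steps is a separate extension point, which is why the rest of the lessons treat Directory, Router, and LoadBalance one at a time.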

After understanding the core architecture and basic components of the dubbo-cluster module, we will introduce the definition of each interface and their related implementations in the order shown in the diagram.

Detailed Explanation of Directory Interface #

The Directory interface represents a collection that consists of multiple Invokers. Subsequent operations such as routing processing, load balancing, and cluster fault tolerance are implemented based on Directory.

Now, let’s analyze the relevant content of the Directory interface in depth. First, let’s look at the methods defined in the Directory interface:

public interface Directory<T> extends Node {

    // Service interface type
    Class<T> getInterface();

    // The list() method filters the Invoker collection maintained by itself based on the Invocation request provided, and returns the Invoker collection that meets the conditions.
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

    // The getAllInvokers() method returns all Invoker objects maintained by the current Directory object.
    List<Invoker<T>> getAllInvokers();

    // Consumer-side URL
    URL getConsumerUrl();

}

AbstractDirectory is the abstract implementation of the Directory interface. In addition to maintaining the URL information of the consumer, it also maintains a RouterChain object, which records the current set of Router objects used, i.e., the routing rules to be introduced in subsequent lessons.
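The general shape of such an abstract implementation (shared state in the base class, the actual lookup left to subclasses) can be sketched as follows. The names `SketchDirectory` and `FixedDirectory`, the string-based URLs, and the `destroyed` guard are assumptions for illustration only and are not copied from Dubbo.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical abstract base: holds shared state and delegates the lookup to subclasses.
abstract class SketchDirectory {
    private final String consumerUrl;
    private volatile boolean destroyed = false;

    SketchDirectory(String consumerUrl) {
        this.consumerUrl = consumerUrl;
    }

    String getConsumerUrl() {
        return consumerUrl;
    }

    // Template method: common checks live here, the lookup itself is abstract.
    final List<String> list(String methodName) {
        if (destroyed) {
            throw new IllegalStateException("directory already destroyed: " + consumerUrl);
        }
        return doList(methodName);
    }

    void destroy() {
        destroyed = true;
    }

    protected abstract List<String> doList(String methodName);
}

// Simplest possible subclass: a fixed provider list that never changes.
final class FixedDirectory extends SketchDirectory {
    private final List<String> providers;

    FixedDirectory(String consumerUrl, List<String> providers) {
        super(consumerUrl);
        this.providers = Collections.unmodifiableList(providers);
    }

    @Override
    protected List<String> doList(String methodName) {
        return providers;
    }

    public static void main(String[] args) {
        FixedDirectory directory = new FixedDirectory(
                "consumer://127.0.0.1/demo", Arrays.asList("10.0.0.1:20880", "10.0.0.2:20880"));
        System.out.println(directory.list("sayHello"));
    }
}
```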

AbstractDirectory #

The implementation of the list() method in AbstractDirectory is also relatively simple, as it directly delegates to the doList() method. doList() is an abstract method that is implemented by the subclasses of AbstractDirectory.

RegistryDirectory is one such subclass. Its constructor initializes its fields from the URL of the registry center:

public RegistryDirectory(Class<T> serviceType, URL url) {
    // The incoming url parameter is the URL of the registry center, for example,
    // zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?…,
    // where the refer parameter contains consumer information, for example,
    // refer=application=dubbo-demo-api-consumer&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&pid=13423&register.ip=192.168.124.3&side=consumer (after URLDecode).
    super(url);
    shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
    shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);
    this.serviceType = serviceType;
    this.serviceKey = url.getServiceKey();
    // Parse the refer parameter value to get the Consumer attribute information
    this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    // Reconstruct the URL with the KV in queryMap as parameters, keeping the protocol and path parts unchanged
    this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
    String group = directoryUrl.getParameter(GROUP_KEY, "");
    this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
}
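Two steps of this initialization are easy to try in isolation: turning the decoded refer value into a key-value map, and deciding whether the consumer references multiple groups. The sketch below is a simplified illustration; `ReferParamSketch` and its methods are hypothetical helpers, not Dubbo's StringUtils, and the literal `"*"` is assumed here as the value of ANY_VALUE.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; it only mirrors the idea of parsing the decoded refer value.
final class ReferParamSketch {

    // Splits "k1=v1&k2=v2" into an ordered map; a pair without '=' maps to an empty value.
    static Map<String, String> parseQueryString(String query) {
        Map<String, String> result = new LinkedHashMap<>();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int idx = pair.indexOf('=');
            if (idx < 0) {
                result.put(pair, "");
            } else {
                result.put(pair.substring(0, idx), pair.substring(idx + 1));
            }
        }
        return result;
    }

    // Same condition as in the constructor: "*" (assumed ANY_VALUE) or a
    // comma-separated list means that several groups are referenced.
    static boolean isMultiGroup(String group) {
        return group != null && ("*".equals(group) || group.contains(","));
    }

    public static void main(String[] args) {
        Map<String, String> queryMap = parseQueryString(
                "application=dubbo-demo-api-consumer&interface=org.apache.dubbo.demo.DemoService&side=consumer");
        System.out.println(queryMap.get("interface"));
        System.out.println(isMultiGroup("groupA,groupB"));
    }
}
```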

After initialization, let’s take a look at the subscribe() method, which is called when the Consumer subscribes. In this method, the subscribe() method of the Registry is called to complete the subscription operation, and the current RegistryDirectory object is also added as a NotifyListener listener to the Registry. The specific implementation is as follows:

public void subscribe(URL url) {
    setConsumerUrl(url);
    // Register the current RegistryDirectory object with CONSUMER_CONFIGURATION_LISTENER
    CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
    serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
    // Complete the subscription operation. The operations on the registry have been introduced in the previous lesson, so I won't repeat them here.
    registry.subscribe(url, this);
}
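The subscription relationship is a plain listener pattern: the directory hands itself to the registry, and the registry later pushes changes back. The following sketch shows that shape with an in-memory registry. All names (`SubscribeSketch`, `InMemoryRegistry`, `Dir`, `onChange`) are hypothetical and chosen only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SubscribeSketch {

    // Hypothetical callback, playing the role that NotifyListener plays in the text.
    interface Listener {
        void onChange(List<String> urls);
    }

    // Hypothetical in-memory registry: remembers listeners per service and pushes changes.
    static final class InMemoryRegistry {
        private final Map<String, List<Listener>> listeners = new HashMap<>();

        void subscribe(String service, Listener listener) {
            listeners.computeIfAbsent(service, k -> new ArrayList<>()).add(listener);
        }

        void publish(String service, List<String> urls) {
            for (Listener listener : listeners.getOrDefault(service, Collections.<Listener>emptyList())) {
                listener.onChange(urls);
            }
        }
    }

    // Hypothetical directory: registers itself and keeps the latest provider list.
    static final class Dir implements Listener {
        volatile List<String> providers = new ArrayList<>();

        void subscribe(InMemoryRegistry registry, String service) {
            registry.subscribe(service, this);
        }

        @Override
        public void onChange(List<String> urls) {
            providers = new ArrayList<>(urls);
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        Dir directory = new Dir();
        directory.subscribe(registry, "org.apache.dubbo.demo.DemoService");
        registry.publish("org.apache.dubbo.demo.DemoService",
                Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        System.out.println(directory.providers);
    }
}
```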

Besides acting as a NotifyListener, RegistryDirectory contains two ConfigurationListener inner classes (their inheritance relationship is shown in the figure below). To keep this lesson focused, the principles behind these two listeners are explained in detail in later lessons, so I won’t go into details here.

Figure: RegistryDirectory’s ConfigurationListener implementation

As we know from the previous introduction to Registry, when registering NotifyListeners, we listen to the providers, configurators, and routers directories. Therefore, when changes occur in these three directories, the notify() method of RegistryDirectory will be triggered.

In the notify() method of RegistryDirectory, the changed URLs are first classified according to the category, divided into configurators, routers, and providers, and processed differently for URLs of different types:

  • Convert URLs of the configurators type into Configurator and save them in the configurators field.
  • Convert URLs of the routers type into Router and add them to the routerChain through the routerChain.addRouters() method.
  • Convert URLs of the providers type into Invoker objects and record them in the invokers and urlInvokerMap collections.

The specific implementation of the notify() method is as follows:

public synchronized void notify(List<URL> urls) {
    // Classify according to category into configurators, routers, and providers
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));
    // Get URLs of the configurators type and convert them into Configurator objects
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
    // Get URLs of the routers type and convert them into Router objects, then add them to the RouterChain
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);
    // Get URLs of the providers type and process them by calling the refreshOverrideAndInvoker() method
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    ... // In Dubbo 3.0, the AddressListener listener will be triggered, but the AddressListener interface has not been implemented yet, so this code is omitted
    refreshOverrideAndInvoker(providerURLs);
}
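The classification step at the top of notify() is a standard `Collectors.groupingBy` over a stream. A minimal, self-contained sketch of that step is shown below. The `Change` class is a hypothetical stand-in for a registry URL that carries only a category and an address, and the category strings are assumed values used for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class CategorySketch {

    // Hypothetical, minimal stand-in for a registry URL: a category plus an address.
    static final class Change {
        final String category;
        final String address;

        Change(String category, String address) {
            this.category = category;
            this.address = address;
        }
    }

    // Drops null entries and groups the rest by category, mirroring the first step of notify().
    static Map<String, List<Change>> classify(List<Change> changes) {
        return changes.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.groupingBy(change -> change.category));
    }

    public static void main(String[] args) {
        List<Change> changes = Arrays.asList(
                new Change("providers", "dubbo://10.0.0.1:20880"),
                new Change("routers", "condition://0.0.0.0"),
                null,
                new Change("providers", "dubbo://10.0.0.2:20880"));
        Map<String, List<Change>> byCategory = classify(changes);
        // A category with no changes is simply absent, hence getOrDefault with an empty list.
        List<Change> configurators =
                byCategory.getOrDefault("configurators", Collections.<Change>emptyList());
        System.out.println(byCategory.get("providers").size() + " providers, "
                + configurators.size() + " configurators");
    }
}
```

Using getOrDefault with an empty list, as notify() does, lets each category be processed uniformly even when the notification contains no URLs of that type.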

Here we first focus on the processing of URLs of the providers type. refreshOverrideAndInvoker() hands the provider URLs over to the refreshInvoker() method, which contains the specific implementation:

private void refreshInvoker(List<URL> invokerUrls) {
    // If invokerUrls contains exactly one URL and its protocol is empty, all providers of this service have gone offline, and all existing Invokers are destroyed
    if (invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Set the forbidden flag to true, subsequent requests will throw exceptions directly
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers); // Clear the Invoker collection in the RouterChain
        destroyAllInvokers(); // Close all Invoker objects
    } else {
        this.forbidden = false; // Set the forbidden flag to false, RegistryDirectory can handle subsequent requests normally
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // Save a local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            // invokerUrls is empty and cachedInvokerUrls is not null: the providers directory in the registry has not changed,
            // so cachedInvokerUrls still holds the latest values and is used instead
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            // invokerUrls is not empty: the providers have changed, and invokerUrls contains all the service providers
            // in the registry at this time, so the cachedInvokerUrls collection is rebuilt from it
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);
        }
        ... // Omit the checks for an empty invokerUrls collection and an empty conversion result
        // Convert the Provider URLs into Invoker objects
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        routerChain.setInvokers(newInvokers); // Update the Invoker collection in the RouterChain
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
        // Compare the old and new Invoker collections and destroy the Invokers that have gone offline
        destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
    }
}
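The last part of this method, reusing what is still valid and destroying what has disappeared, is a diff between two maps keyed by URL. The sketch below illustrates that idea on its own. `RefreshSketch` and `Conn` are hypothetical names, `Conn` stands in for an Invoker, and the logic is a simplification rather than a copy of destroyUnusedInvokers().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RefreshSketch {

    // Hypothetical stand-in for an Invoker: it only tracks whether it was destroyed.
    static final class Conn {
        final String url;
        boolean destroyed;

        Conn(String url) {
            this.url = url;
        }

        void destroy() {
            destroyed = true;
        }
    }

    // Builds the new url -> Conn map, reusing cached entries and creating the rest,
    // then destroys every old entry that no longer appears in the latest URL list.
    static Map<String, Conn> refresh(Map<String, Conn> oldMap, List<String> latestUrls) {
        Map<String, Conn> newMap = new HashMap<>();
        for (String url : latestUrls) {
            Conn cached = oldMap.get(url);
            newMap.put(url, cached != null ? cached : new Conn(url));
        }
        for (Map.Entry<String, Conn> entry : oldMap.entrySet()) {
            if (!newMap.containsKey(entry.getKey())) {
                entry.getValue().destroy(); // this provider has gone offline
            }
        }
        return newMap;
    }

    public static void main(String[] args) {
        Map<String, Conn> current = refresh(new HashMap<String, Conn>(),
                Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        Conn first = current.get("dubbo://10.0.0.1:20880");
        current = refresh(current, Arrays.asList("dubbo://10.0.0.2:20880"));
        System.out.println("first destroyed: " + first.destroyed);
    }
}
```

Keying the cache by the full URL string means that any parameter change on the provider produces a new key, so the old connection is destroyed and a fresh one is created.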

The core logic of the refreshInvoker() method is the conversion of Provider URL to Invoker objects, which is the toInvokers() method. Let’s dive into the toInvokers() method to see its specific conversion logic:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
    ... // Return an empty Map directly when the urls collection is empty
    Set<String> keys = new HashSet<>();
    // Get the protocols accepted by the consumer, that is, the value of the protocol parameter
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) { // Traverse all the protocols accepted by the consumer
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue; // The protocol of the provider URL is not accepted by the consumer, so it is not converted into an Invoker
            }
        }
        if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue; // Skip URLs with the empty protocol
        }
        // If the consumer has no Protocol extension for the protocol of the URL (checked through SPI), the URL is also skipped
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error("...");
            continue;
        }
        // Merge URL parameters, this merge process will be introduced later in this lesson
        URL url = mergeUrl(providerUrl);
        // Get the string representation of the complete URL, which is the key in the urlInvokerMap collection
        String key = url.toFullString();
        if (keys.contains(key)) { // Skip duplicate URLs
            continue;
        }
        keys.add(key); // Record the key
        // Look up the Invoker object in the urlInvokerMap cache. If the cache is hit, simply add the Invoker to newUrlInvokerMap;
        // if the cache is not hit, create a new Invoker object and add it to newUrlInvokerMap
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) {
            try {
                boolean enabled = true;
                if (url.hasParameter(DISABLED_KEY)) { // Check the disabled and enabled parameters in the URL to determine whether an Invoker object can be created
                    enabled = !url.getParameter(DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(ENABLED_KEY, true);
                }
                if (enabled) { // Create the corresponding Invoker object through the Protocol.refer() method here
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Record the mapping relationship between the key and the Invoker object in newUrlInvokerMap
                newUrlInvokerMap.put(key, invoker);
            }
        } else { // Cache hit, directly transfer the Invoker in urlInvokerMap to newUrlInvokerMap
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}


**The toInvokers() method is fairly long, but its core logic is the call to Protocol.refer() that creates the Invoker object; the rest of the code only decides whether that call should be made.**
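Two of those decisions, the protocol check and the disabled/enabled check, can be isolated into small predicates. The sketch below is a simplified illustration using a plain parameter map instead of a URL object. `ProviderFilterSketch` is a hypothetical class, and the literal keys `"disabled"` and `"enabled"` are assumed values for DISABLED_KEY and ENABLED_KEY.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class ProviderFilterSketch {

    // True when the consumer configured no protocol restriction,
    // or when the provider's protocol appears in the comma-separated list.
    static boolean protocolAccepted(String providerProtocol, String acceptedCsv) {
        if (acceptedCsv == null || acceptedCsv.isEmpty()) {
            return true;
        }
        return Arrays.asList(acceptedCsv.split(",")).contains(providerProtocol);
    }

    // "disabled" decides when it is present; otherwise "enabled" decides, defaulting to true.
    static boolean isEnabled(Map<String, String> params) {
        if (params.containsKey("disabled")) {
            return !Boolean.parseBoolean(params.get("disabled"));
        }
        String enabled = params.get("enabled");
        return enabled == null || Boolean.parseBoolean(enabled);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("disabled", "true");
        System.out.println(protocolAccepted("dubbo", "dubbo,rest"));
        System.out.println(isEnabled(params));
    }
}
```

Only a provider URL that passes both checks, and for which a Protocol extension exists, reaches the Protocol.refer() call.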

Inside the toInvokers() method, we can see that the mergeUrl() method is called to merge the URL parameters. mergeUrl() merges the URLs under the configurators directory of the registry center (override protocol), as well as the configurations dynamically added through the service governance console, into the Provider URL, overriding some of the Provider URL's original information. The specific implementation is as follows:

private URL mergeUrl(URL providerUrl) {
    // First, remove the attributes in the Provider URL that only take effect on the Provider side, such as threadname, threadpool, corethreads, threads, queues, and so on.
    // Then, override the corresponding configurations of the Provider URL with the configurations on the Consumer side. For the version, group, methods, timestamp and similar parameters, the Provider side's values take priority.
    // Finally, merge the Filter and Listener configurations from both the Provider and Consumer sides.
    providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap);
    // Merge configurators type URLs. configurators type URLs can be divided into three types:
    // The first type is the URLs added under the configurators directory of the registry center (override protocol).
    // The second type is the dynamic configurations obtained through the ConsumerConfigurationListener listener (listens to application-level configurations).
    // The third type is the dynamic configurations obtained through the ReferenceConfigurationListener listener (listens to service-level configurations).
    // Here, you only need to understand that in addition to the configurators in the configurators directory of the registry center, there are also configurations that can be dynamically added through the service governance console.
    // The ConsumerConfigurationListener and ReferenceConfigurationListener listeners are used to listen to dynamic configurations from the service governance console.
    // As for the specific use of the service governance console, it will be detailed later.
    providerUrl = overrideWithConfigurator(providerUrl);
    // Add check=false, which means that the Provider's availability is only checked when it is called.
    providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false));
    // Update overrideDirectoryUrl with the parameters of the merged providerUrl (including the properties overridden by the override protocol), adding only those that are absent.
    this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters());
    ... // Omit the compatibility logic for Dubbo's lower versions
    return providerUrl;
}
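The precedence rules described in the first group of comments can be tried out on plain maps. The sketch below is an illustration under stated assumptions, not the code of ClusterUtils.mergeUrl(): `MergeSketch` is a hypothetical class, the two key sets contain only the parameter names listed in the comments, and the merging of Filter and Listener configurations is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MergeSketch {

    // Parameters that only make sense on the provider side and are dropped (names taken from the comments).
    static final Set<String> PROVIDER_ONLY = new HashSet<>(
            Arrays.asList("threadname", "threadpool", "corethreads", "threads", "queues"));

    // Parameters for which the provider's value wins over the consumer's.
    static final Set<String> PROVIDER_WINS = new HashSet<>(
            Arrays.asList("version", "group", "methods", "timestamp"));

    static Map<String, String> merge(Map<String, String> provider, Map<String, String> consumer) {
        Map<String, String> merged = new HashMap<>(provider);
        merged.keySet().removeAll(PROVIDER_ONLY); // step 1: drop provider-only settings
        merged.putAll(consumer);                  // step 2: consumer settings override
        for (String key : PROVIDER_WINS) {        // step 3: restore provider-priority values
            String value = provider.get(key);
            if (value != null) {
                merged.put(key, value);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> provider = new HashMap<>();
        provider.put("timeout", "1000");
        provider.put("threads", "200");
        provider.put("version", "1.0.0");
        Map<String, String> consumer = new HashMap<>();
        consumer.put("timeout", "3000");
        consumer.put("version", "2.0.0");
        System.out.println(merge(provider, consumer));
    }
}
```

With these inputs the merged map keeps the consumer's timeout, drops threads, and keeps the provider's version.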

After the conversion of URL to Invoker object (toInvokers() method), in the last part of the refreshInvoker() method, the toMergeInvokerList() method is called to merge the Invokers in each group into one Invoker, depending on the configuration of the multiGroup. Now let’s take a look at the implementation of the toMergeInvokerList() method:

private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
    List<Invoker<T>> mergedInvokers = new ArrayList<>();
    Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
    for (Invoker<T> invoker : invokers) { // Group the Invokers by group
        String group = invoker.getUrl().getParameter(GROUP_KEY, "");
        groupMap.computeIfAbsent(group, k -> new ArrayList<>());
        groupMap.get(group).add(invoker);
    }
    if (groupMap.size() == 1) { // If there is only one group, directly use the Invoker collection corresponding to that group as mergedInvokers
        mergedInvokers.addAll(groupMap.values().iterator().next());
    } else if (groupMap.size() > 1) { // Merge the Invoker collections corresponding to each group into one Invoker
        for (List<Invoker<T>> groupList : groupMap.values()) {
            // This uses StaticDirectory and Cluster to merge the Invokers in each group
            StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
            staticDirectory.buildRouterChain();
            mergedInvokers.add(CLUSTER.join(staticDirectory));
        }
    } else {
        mergedInvokers = invokers;
    }
    return mergedInvokers;
}

Here, the functionality of the Cluster interface is used. We will continue to analyze the Cluster interface and its implementation in subsequent lessons. You can now understand the Cluster as a black box, knowing that its join() method converts multiple Invoker objects into one Invoker.
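The grouping logic itself can be tried without any Dubbo types. In the sketch below, `GroupMergeSketch` and `Node` are hypothetical, and the "merged Invoker" that Cluster.join() would produce is represented by a simple label string, which is purely an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupMergeSketch {

    // Hypothetical stand-in for an Invoker that carries a group parameter.
    static final class Node {
        final String group;
        final String address;

        Node(String group, String address) {
            this.group = group;
            this.address = address;
        }
    }

    // One group (or none): return the addresses unchanged.
    // Several groups: return one merged label per group, in first-seen order.
    static List<String> merge(List<Node> nodes) {
        Map<String, List<Node>> byGroup = new LinkedHashMap<>();
        for (Node node : nodes) {
            byGroup.computeIfAbsent(node.group, k -> new ArrayList<>()).add(node);
        }
        List<String> merged = new ArrayList<>();
        if (byGroup.size() <= 1) {
            for (Node node : nodes) {
                merged.add(node.address);
            }
        } else {
            for (Map.Entry<String, List<Node>> entry : byGroup.entrySet()) {
                merged.add("cluster[" + entry.getKey() + "] x" + entry.getValue().size());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("g1", "10.0.0.1:20880"),
                new Node("g2", "10.0.0.2:20880"),
                new Node("g1", "10.0.0.3:20880"));
        System.out.println(merge(nodes));
    }
}
```

Because computeIfAbsent returns the list for the key, the grouping can be written as a single chained call, which is equivalent to the two-step form used in toMergeInvokerList().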

So far, we have completed the analysis of the complete process of dynamic Provider discovery by RegistryDirectory.

Finally, let’s analyze another core method of RegistryDirectory – the doList() method. This method is an implementation left to the subclass by AbstractDirectory, and it is the core part of obtaining the Invoker collection through the Directory interface. The specific implementation is as follows:

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) { // Check the forbidden field. When it was set to true in refreshInvoker(), there is no available Provider, so an exception is thrown
        throw new RpcException("...");
    }
    if (multiGroup) {
        // Special handling when multiGroup is true. In the refreshInvoker() method, filtering has already been done using the Router for the multiGroup scenario, so the Invoker collection is returned directly here
        return this.invokers == null ? Collections.emptyList() : this.invokers;
    }
    List<Invoker<T>> invokers = null;
    // Use the RouterChain.route() method to filter the Invoker collection and finally obtain the Invoker collection that meets the routing conditions
    invokers = routerChain.route(getConsumerUrl(), invocation);
    return invokers == null ? Collections.emptyList() : invokers;
}

Conclusion #

In this lesson, we first introduced the overall architecture of the dubbo-cluster module and briefly explained the functionalities of the four core interfaces Cluster, Directory, Router, and LoadBalance. Next, we deeply analyzed the definition of the Directory interface and the core implementations of the StaticDirectory and RegistryDirectory classes. The RegistryDirectory involves the dynamic discovery of Provider URLs and the handling of dynamic configurations, which makes it a bit more complex. I hope you can study and understand it patiently. If you have any questions or concerns about this part, feel free to leave a comment and discuss it with me.