42 Bonus: Full Analysis of the Service Reference Process #

As an RPC framework, Dubbo provides two basic functions: service publication and service reference. The previous lesson analyzed the core process of service publication; this lesson analyzes the core process of service reference.

Dubbo supports two ways to reference remote services:

  • Direct connection to a Provider, which is suitable only for debugging;
  • Reference through a registry, which is the method used in production environments.

DubboBootstrap Entry #

When we introduced service publication in the previous lesson, we walked through the core process of the DubboBootstrap.start() method. In addition to calling exportServices() to publish services, start() also calls referServices() to reference services. We will not repeat the code of DubboBootstrap.start() here; if you are interested, refer to the source code.

In the DubboBootstrap.referServices() method, it retrieves a list of all ReferenceConfig objects from the ConfigManager, and obtains the corresponding proxy objects based on the ReferenceConfig. The entry logic is as follows:

private void referServices() {
    if (cache == null) { // Initialize ReferenceConfigCache
        cache = ReferenceConfigCache.getCache();
    }
    // Iterate over the ReferenceConfig list
    configManager.getReferences().forEach(rc -> {
        ReferenceConfig referenceConfig = (ReferenceConfig) rc;
        referenceConfig.setBootstrap(this);
        if (rc.shouldInit()) { // Check whether this ReferenceConfig should be initialized now
            if (referAsync) { // Asynchronous
                CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
                        executorRepository.getServiceExporterExecutor(),
                        () -> cache.get(rc)
                );
                asyncReferringFutures.add(future);
            } else { // Synchronous
                cache.get(rc);
            }
        }
    });
}
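The synchronous and asynchronous branches above differ only in where the reference work runs. The following is a minimal, self-contained sketch of that pattern using the JDK's CompletableFuture; the class AsyncReferDemo and the refer() helper are hypothetical stand-ins for cache.get(rc) and are not part of Dubbo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncReferDemo {

    // Hypothetical stand-in for cache.get(rc): pretend to build a proxy for an interface.
    static String refer(String interfaceName) {
        return "proxy-of-" + interfaceName;
    }

    // Refers every interface either inline or on an executor, then returns the proxies in order.
    public static List<String> referAll(List<String> interfaceNames, boolean referAsync) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            List<String> proxies = new ArrayList<>();
            for (String name : interfaceNames) {
                if (referAsync) {
                    // Asynchronous: submit the work and keep the future so it can be awaited later
                    futures.add(CompletableFuture.supplyAsync(() -> refer(name), executor));
                } else {
                    // Synchronous: do the work on the calling thread
                    proxies.add(refer(name));
                }
            }
            for (CompletableFuture<String> future : futures) {
                proxies.add(future.join()); // Wait for each asynchronous reference to finish
            }
            return proxies;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(referAll(Arrays.asList("DemoService", "GreetingService"), true));
    }
}
```

Collecting the futures in a list, as the sketch does, is what makes it possible to wait for all asynchronous references at a single later point instead of blocking inside the loop.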

Where does the ReferenceConfig come from? In [Lesson 01] dubbo-demo-api-consumer example, we can see the logic for constructing ReferenceConfig objects. These newly created ReferenceConfig objects are added to the ConfigManager for management through the DubboBootstrap.reference() method, as shown below:

public DubboBootstrap reference(ReferenceConfig<?> referenceConfig) {
    configManager.addReference(referenceConfig);
    return this;
}

ReferenceConfigCache #

The core implementation of service reference is in ReferenceConfig. A ReferenceConfig object corresponds to one service interface, and each ReferenceConfig object encapsulates the network connection to the registry and the network connection to the Provider, which makes it a heavyweight object.

Creating such objects repeatedly can leak the underlying connections and cause performance problems. To avoid this, starting from Dubbo 2.4.0, Dubbo provides ReferenceConfigCache to cache ReferenceConfig instances.

In the dubbo-demo-api-consumer example, we can see the basic usage of ReferenceConfigCache:

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setInterface(DemoService.class);
...
// This step is completed in the DubboBootstrap.start() method
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
...
DemoService demoService = ReferenceConfigCache.getCache().get(reference);

ReferenceConfigCache maintains a static Map field (CACHE_HOLDER), where the Key is the name of a cache and the Value is the ReferenceConfigCache object registered under that name. Inside each ReferenceConfigCache, individual references are identified by a Key composed of the group, the service interface, and the version. A KeyGenerator can be passed in to change how this Key is generated. The definition of the KeyGenerator interface is as follows:

public interface KeyGenerator {
    String generateKey(ReferenceConfigBase<?> referenceConfig);
}

The default implementation of KeyGenerator is defined inline in ReferenceConfigCache (as a lambda expression in the version shown here) and is referenced by the static field DEFAULT_KEY_GENERATOR. The specific implementation is as follows:

public static final KeyGenerator DEFAULT_KEY_GENERATOR = referenceConfig -> {
    // Get the service interface name
    String iName = referenceConfig.getInterface();
    if (StringUtils.isBlank(iName)) {
        Class<?> clazz = referenceConfig.getInterfaceClass();
        iName = clazz.getName();
    }
    if (StringUtils.isBlank(iName)) {
        throw new IllegalArgumentException("No interface info in ReferenceConfig" + referenceConfig);
    }
    // The format of the Key is group/interface:version
    StringBuilder ret = new StringBuilder();
    if (!StringUtils.isBlank(referenceConfig.getGroup())) {
        ret.append(referenceConfig.getGroup()).append("/");
    }
    ret.append(iName);
    if (!StringUtils.isBlank(referenceConfig.getVersion())) {
        ret.append(":").append(referenceConfig.getVersion());
    }
    return ret.toString();
};
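To make the group/interface:version format concrete, here is a small standalone sketch that applies the same rules to plain strings. The class ReferenceKeyDemo and its generateKey() helper are hypothetical and exist only for illustration; they take the three values directly instead of reading them from a ReferenceConfig.

```java
public class ReferenceKeyDemo {

    // Returns true for null, empty, or whitespace-only strings.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Builds a key in the form group/interface:version; group and version are optional.
    public static String generateKey(String group, String interfaceName, String version) {
        if (isBlank(interfaceName)) {
            throw new IllegalArgumentException("No interface info");
        }
        StringBuilder ret = new StringBuilder();
        if (!isBlank(group)) {
            ret.append(group).append('/');
        }
        ret.append(interfaceName);
        if (!isBlank(version)) {
            ret.append(':').append(version);
        }
        return ret.toString();
    }

    public static void main(String[] args) {
        // prints groupA/org.apache.dubbo.demo.DemoService:1.0.0
        System.out.println(generateKey("groupA", "org.apache.dubbo.demo.DemoService", "1.0.0"));
        // prints org.apache.dubbo.demo.DemoService
        System.out.println(generateKey(null, "org.apache.dubbo.demo.DemoService", null));
    }
}
```

Two references to the same interface therefore share a cache entry only when their group and version also match.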

In the ReferenceConfigCache instance, two Map collections are maintained:

  • proxies (ConcurrentMap<Class, ConcurrentMap<String, Object>>) : This collection stores the proxy objects of all service interfaces. The first-level key is the type of the service interface, the second-level key is the Key generated by the KeyGenerator described above, and the value is the proxy object of the service.
  • referredReferences (ConcurrentMap<String, ReferenceConfigBase>) : This collection is used to store processed ReferenceConfig objects.

Let’s go back to the DubboBootstrap.referServices() method to see the logic related to ReferenceConfigCache.

First, the static ReferenceConfigCache.getCache() method adds a ReferenceConfigCache object to the CACHE_HOLDER collection under the default name "_DEFAULT_", using the default KeyGenerator implementation. This object serves as the default ReferenceConfigCache.

Next, regardless of synchronous or asynchronous service reference, the ReferenceConfigCache.get() method is called to create and cache the proxy object. Below is the core implementation of the ReferenceConfigCache.get() method:

public <T> T get(ReferenceConfigBase<T> referenceConfig) {

    // Generate the key for the service provider
    String key = generator.generateKey(referenceConfig);

    // Get the interface type
    Class<?> type = referenceConfig.getInterfaceClass();

    // Get the collection of proxy objects corresponding to the interface
    proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>());
    ConcurrentMap<String, Object> proxiesOfType = proxies.get(type);

    // Get the proxy object corresponding to the service provider based on the key
    proxiesOfType.computeIfAbsent(key, _k -> {

        // Service reference
        Object proxy = referenceConfig.get();

        // Add the ReferenceConfig to the referredReferences collection
        referredReferences.put(key, referenceConfig);

        return proxy;
    });

    return (T) proxiesOfType.get(key);
}
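The two-level lookup in get() can be reproduced with nothing but the JDK. The sketch below is a simplified illustration under stated assumptions: TwoLevelProxyCacheDemo is a hypothetical class, and a Supplier stands in for the expensive referenceConfig.get() call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class TwoLevelProxyCacheDemo {

    // First level: interface type. Second level: the generated key. Value: the cached proxy.
    private final ConcurrentMap<Class<?>, ConcurrentMap<String, Object>> proxies = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> T get(Class<T> type, String key, Supplier<T> proxyFactory) {
        ConcurrentMap<String, Object> proxiesOfType =
                proxies.computeIfAbsent(type, t -> new ConcurrentHashMap<>());
        // The factory runs only when no proxy is cached under this key yet
        return (T) proxiesOfType.computeIfAbsent(key, k -> proxyFactory.get());
    }

    public static void main(String[] args) {
        TwoLevelProxyCacheDemo cache = new TwoLevelProxyCacheDemo();
        AtomicInteger created = new AtomicInteger();
        Supplier<String> factory = () -> "proxy-" + created.incrementAndGet();

        String first = cache.get(String.class, "groupA/DemoService:1.0.0", factory);
        String second = cache.get(String.class, "groupA/DemoService:1.0.0", factory);

        System.out.println(first == second); // prints true: the second call hits the cache
        System.out.println(created.get());   // prints 1: the factory ran only once
    }
}
```

Using computeIfAbsent for both levels keeps the creation of a proxy atomic per key, so concurrent callers asking for the same reference do not each build their own proxy.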

ReferenceConfig #

As mentioned earlier, ReferenceConfig is the actual entry point for service reference and it creates the related proxy objects. Let’s start by looking at the ReferenceConfig.get() method:

public synchronized T get() {
    if (destroyed) { // Check the current state of ReferenceConfig
        throw new IllegalStateException("...");
    }

    if (ref == null) { // ref points to the proxy object of the service
        init(); // Initialize the ref field
    }

    return ref;
}

The ReferenceConfig.init() method first processes the service reference configuration to make sure it is valid. This logic is not complicated in itself, but it involves a lot of configuration parsing and processing, so the code is long. We will not go through it line by line; if you are interested, refer to the source code.

The core logic of the ReferenceConfig.init() method is to call the createProxy() method. Before calling it, the parameters required by the createProxy() method are obtained from the configuration:

public synchronized void init() {
    if (initialized) { // Check the initialization status of ReferenceConfig
        return;
    }

    if (bootstrap == null) { // Check the initialization status of DubboBootstrap
        bootstrap = DubboBootstrap.getInstance();
        bootstrap.init();
    }

    ... // Check other configurations

    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, CONSUMER_SIDE); // Add the side parameter
    ReferenceConfigBase.appendRuntimeParameters(map); // Add the Dubbo version, release, timestamp, and pid parameters
    map.put(INTERFACE_KEY, interfaceName); // Add the interface parameter
    ... // Process other parameters

    String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
    if (StringUtils.isEmpty(hostToRegistry)) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("...");
    }
    map.put(REGISTER_IP_KEY, hostToRegistry); // Add the ip parameter

    ref = createProxy(map); // Call the createProxy() method
    ...

    initialized = true;
    // Trigger the ReferenceConfigInitializedEvent event
    dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}

The ReferenceConfig.createProxy() method handles the various service reference scenarios, such as connecting directly to one or more Providers, or using one or more registries. Its core process can be roughly summarized in the following 5 steps.

  1. Determine, based on the parameters passed in, whether the injvm protocol should be used. If so, the service is referenced directly through InjvmProtocol.
  2. Construct the urls collection. Dubbo supports two modes of service reference: connecting directly to Providers, or going through registries. In direct connection mode, one or more Provider addresses can be specified through the url parameter; they are parsed and added to the urls collection. In registry mode, the ConfigValidationUtils.loadRegistries() method is called to load all registry URLs.
  3. If the urls collection contains only one URL, the Protocol adapter selects the appropriate Protocol extension implementation to create an Invoker object. In direct connection mode with a dubbo protocol URL, the DubboProtocol implementation is used. In registry mode, the RegistryProtocol implementation is used.
  4. If the urls collection contains multiple registries, ZoneAwareCluster is used as the default Cluster implementation to generate the corresponding Invoker object. If the urls collection contains multiple direct Provider addresses, the Cluster adapter selects the appropriate extension implementation to generate the Invoker object.
  5. The ProxyFactory adapter selects the appropriate ProxyFactory extension implementation and wraps the Invoker object into a proxy object for the service interface.

From the above process, we can see that there are two core operations in the createProxy() method: first, using the Protocol adapter to select the appropriate Protocol extension implementation to create the Invoker object; second, using the ProxyFactory adapter to select the appropriate ProxyFactory to create the proxy object.
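The branching in the steps above can be condensed into a small decision function. This is only a sketch under simplifying assumptions: the class CreateProxyDecisionDemo and the Strategy labels are hypothetical, URLs are plain strings, and a registry URL is recognized purely by a registry:// prefix rather than by Dubbo's own URL utilities.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CreateProxyDecisionDemo {

    // Hypothetical labels for the branches described in the steps above.
    public enum Strategy {
        INJVM_PROTOCOL,
        SINGLE_URL_VIA_PROTOCOL_ADAPTER,
        MULTIPLE_REGISTRIES_VIA_ZONE_AWARE_CLUSTER,
        MULTIPLE_PROVIDERS_VIA_CLUSTER_ADAPTER
    }

    // Simplifying assumption: a registry URL is recognized by its scheme only.
    static boolean isRegistry(String url) {
        return url.startsWith("registry://");
    }

    public static Strategy choose(boolean shouldJvmRefer, List<String> urls) {
        if (shouldJvmRefer) {
            return Strategy.INJVM_PROTOCOL;
        }
        if (urls.isEmpty()) {
            throw new IllegalStateException("No Provider or registry URL available");
        }
        if (urls.size() == 1) {
            return Strategy.SINGLE_URL_VIA_PROTOCOL_ADAPTER;
        }
        // With several URLs, the presence of any registry URL selects the registry-aware branch
        boolean hasRegistry = urls.stream().anyMatch(CreateProxyDecisionDemo::isRegistry);
        return hasRegistry
                ? Strategy.MULTIPLE_REGISTRIES_VIA_ZONE_AWARE_CLUSTER
                : Strategy.MULTIPLE_PROVIDERS_VIA_CLUSTER_ADAPTER;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, Collections.emptyList()));
        System.out.println(choose(false, Arrays.asList("dubbo://10.0.0.1:20880")));
        System.out.println(choose(false, Arrays.asList("registry://zk1:2181", "registry://zk2:2181")));
        System.out.println(choose(false, Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880")));
    }
}
```

Whichever branch is taken, the result is a single Invoker, which is why the final proxy creation step does not need to know how many URLs were involved.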

Now let’s take a look at the specific implementation of the createProxy() method:

private T createProxy(Map<String, String> map) {

    if (shouldJvmRefer(map)) {
        // Create an injvm protocol URL
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // Use the Protocol adapter to select the corresponding Protocol implementation and create the Invoker object
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        urls.clear();
        if (url != null && url.length() > 0) {
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (UrlUtils.isRegistry(url)) {
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();
                // Load the addresses of the registry
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // Organize the parameters in the map to refer parameters and add them to the RegistryURL
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("...");
                }
            }
        }
        if (urls.size() == 1) {
            // When there is only one registry center or only one direct service provider, use the Protocol adapter to select the corresponding Protocol implementation and create the Invoker object
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            // When there are multiple registry centers or multiple direct service providers, create the Invoker object based on each URL
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // In the case of multiple registry centers, use ZoneAwareCluster as the default implementation of Cluster, and select between multiple registry centers
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else {
                // In the case of multiple direct service providers, use the Cluster adapter to select the appropriate extension implementation
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }

    // Determine whether to check the availability of the Provider based on the check configuration
    if (shouldCheck() && !invoker.isAvailable()) {
        invoker.destroy();
        throw new IllegalStateException("...");
    }

    ... // Logic related to metadata processing

    // Select the appropriate ProxyFactory extension implementation through the ProxyFactory adapter and create a proxy object
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

RegistryProtocol #

In the scenario of directly connecting to the Provider, the DubboProtocol.refer() method is used to complete the service reference. The implementation of the DubboProtocol.refer() method has been detailed in the previous [Lesson 25]. Here we will focus on how Dubbo Consumer completes the service reference through the RegistryProtocol in the scenario with the registry.

In the RegistryProtocol.refer() method, the registry URL is first derived from the incoming URL, and then the doRefer() method is called to generate the Invoker. When a reference targets multiple groups, refer() uses MergeableCluster as the Cluster implementation.

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = getRegistryUrl(url); // Get the registry URL from the URL
    // Get the Registry instance; the RegistryFactory object is injected through the automatic loading mechanism of Dubbo SPI
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // Get the parameters for this service reference, including the group, from the refer parameter of the registry URL
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            // If this reference can target multiple groups, MergeableCluster is used as the Cluster implementation
            // The getMergeableCluster() method finds the MergeableCluster instance through Dubbo SPI
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // If there is no group parameter or only one group is specified, select the Cluster implementation through the Cluster adapter
    return doRefer(cluster, registry, type, url);
}
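The group check that decides between MergeableCluster and the regular Cluster adapter can be isolated into a few lines. In this sketch, GroupClusterSelectionDemo and needsMergeableCluster() are hypothetical names, and the regular expression is an assumed comma-splitting pattern standing in for COMMA_SPLIT_PATTERN.

```java
import java.util.regex.Pattern;

public class GroupClusterSelectionDemo {

    // Assumption: splits on commas, tolerating surrounding whitespace.
    private static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");

    // Returns true when the group value names several groups or is the wildcard "*".
    public static boolean needsMergeableCluster(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group);
    }

    public static void main(String[] args) {
        System.out.println(needsMergeableCluster("groupA,groupB")); // prints true
        System.out.println(needsMergeableCluster("*"));             // prints true
        System.out.println(needsMergeableCluster("groupA"));        // prints false
        System.out.println(needsMergeableCluster(null));            // prints false
    }
}
```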

In the doRefer() method, the RegistryDirectory instance is first initialized based on the URL, then the Subscribe URL is generated and registered. Then the service is subscribed through the Registry. Finally, multiple Invokers are merged into one Invoker and returned to the upper layer through the Cluster. The specific implementation is as follows:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // Create the RegistryDirectory instance
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // Generate the SubscribeUrl: the protocol is consumer, and the parameters come from the refer parameter of the RegistryURL
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (directory.isShouldRegister()) {
        // Add the category=consumers and check=false parameters to the SubscribeUrl
        directory.setRegisteredConsumerUrl(subscribeUrl);
        // Register the Consumer URL with the registry, for example by adding the Consumer node in ZooKeeper
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl); // Create the router chain based on the SubscribeUrl
    // Subscribe to the service; the toSubscribeUrl() method changes the category parameter in the SubscribeUrl to "providers,configurators,routers"
    // The subscribe() method of RegistryDirectory, analyzed in detail earlier, subscribes through the Registry and adds the corresponding listeners
    directory.subscribe(toSubscribeUrl(subscribeUrl));
    // The registry may contain multiple Providers, which correspond to multiple Invokers
    // The Cluster selected earlier wraps these Invoker objects into a single Invoker object
    Invoker<T> invoker = cluster.join(directory);
    // Load the listener implementations named by the registry.protocol.listener parameter in the URL
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }
    // To make the listener callbacks easier, the Directory, Cluster, Invoker, and SubscribeUrl used in this reference
    // are wrapped into a RegistryInvokerWrapper and passed to the listeners
    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    return registryInvokerWrapper;
}
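The idea behind cluster.join(directory), presenting many Invokers as one, can be illustrated with a toy example. The sketch below is not Dubbo's Cluster API: ClusterJoinDemo and its Invoker interface are hypothetical, and the merge strategy is a deliberately naive failover that tries each provider in order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ClusterJoinDemo {

    // Toy invoker: takes a request string and returns a response, or throws on failure.
    public interface Invoker extends Function<String, String> {
    }

    // Wraps several invokers into one that tries each in turn until a call succeeds.
    public static Invoker join(List<Invoker> directory) {
        return request -> {
            RuntimeException last = null;
            for (Invoker invoker : directory) {
                try {
                    return invoker.apply(request);
                } catch (RuntimeException e) {
                    last = e; // Remember the failure and move on to the next provider
                }
            }
            throw new IllegalStateException("No provider available", last);
        };
    }

    public static void main(String[] args) {
        Invoker broken = request -> {
            throw new RuntimeException("connection refused");
        };
        Invoker healthy = request -> "hello " + request;

        Invoker merged = join(Arrays.asList(broken, healthy));
        System.out.println(merged.apply("dubbo")); // prints hello dubbo
    }
}
```

The caller holds a single Invoker and never sees which provider answered, which is the property the proxy created by ReferenceConfig relies on.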

The RegistryDirectory, Router interface, Cluster interface, and their related extension implementations have been analyzed in detail in previous lessons and will not be repeated here.

Summary #

In this lesson, we focused on the entire process of Dubbo service reference.

  • First, we introduced DubboBootstrap, the entry facade class, and its core methods related to service reference, including referServices() and reference().
  • Next, we analyzed the ReferenceConfigCache, which caches ReferenceConfig objects, and the core process of service reference implemented by ReferenceConfig.
  • Finally, we explained the core implementation of RegistryProtocol in referencing services from the registry.