46 Bonus: Deep Dive into Service Publishing and Subscription in Service Introspection (Part 2) #

In the second part of the course (lessons 13-15), when introducing the implementation of the registry part in the traditional Dubbo framework, we mentioned the interfaces such as Registry and RegistryFactory that interact with the registry. To integrate the functionality of the ServiceDiscovery interface with the Registry, Dubbo provides an implementation called ServiceDiscoveryRegistry, which has the following inheritance relationship:

Drawing 0.png

ServiceDiscoveryRegistry and ServiceDiscoveryRegistryFactory inheritance diagram

From the diagram, we can see that ServiceDiscoveryRegistryFactory (extension name is service-discovery-registry) is the factory class corresponding to ServiceDiscoveryRegistry. It inherits the common capabilities provided by AbstractRegistryFactory.

ServiceDiscoveryRegistry is an implementation of the registry that is focused on service instances (ServiceInstance), with underlying dependencies on components such as ServiceDiscovery and WritableMetadataService introduced in the previous two lessons.

The core fields in ServiceDiscoveryRegistry are as follows:

  • serviceDiscovery (ServiceDiscovery type): used for publishing and subscribing to ServiceInstances.
  • subscribedServices (Set type): records the currently subscribed service names.
  • serviceNameMapping (ServiceNameMapping type): used for the conversion between Service ID and Service Name.
  • writableMetadataService (WritableMetadataService type): used for publishing and querying metadata.
  • registeredListeners (Set type): records the unique identifiers of registered ServiceInstancesChangedListener.
  • subscribedURLsSynthesizers (List type): combines the information of ServiceInstances with metadata to obtain the complete URL of the subscribed service.

In the constructor of ServiceDiscoveryRegistry, these fields are initialized:

public ServiceDiscoveryRegistry(URL registryURL) {
    // Initialize the parent class, which includes the time wheel and retry scheduler in FailbackRegistry, as well as the local file cache in AbstractRegistry
    super(registryURL);
    // Initialize the ServiceDiscovery object
    this.serviceDiscovery = createServiceDiscovery(registryURL);
    // Parse the "subscribed-services" parameter from the registryURL and split it by commas to obtain the subscribedServices set
    this.subscribedServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
    // Get the DefaultServiceNameMapping object
    this.serviceNameMapping = ServiceNameMapping.getDefaultExtension();
    // Initialize the WritableMetadataService object
    String metadataStorageType = getMetadataStorageType(registryURL);
    this.writableMetadataService = WritableMetadataService.getExtension(metadataStorageType);
    // Initialize the supported SubscribedURLsSynthesizer implementations
    this.subscribedURLsSynthesizers = initSubscribedURLsSynthesizers();
}

In the createServiceDiscovery() method, not only is the corresponding implementation of ServiceDiscovery loaded based on the registryURL, but an EventPublishingServiceDiscovery decorator is also added on top, which triggers the corresponding events before and after methods like register() and initialize() are called. The specific implementation is as follows:

protected ServiceDiscovery createServiceDiscovery(URL registryURL) {
    // Get the corresponding ServiceDiscovery implementation based on the registryURL
    ServiceDiscovery originalServiceDiscovery = getServiceDiscovery(registryURL);
    // Add an EventPublishingServiceDiscovery decorator on top of the ServiceDiscovery, 
    // which triggers the corresponding events before and after methods like register() and initialize() are called
    ServiceDiscovery serviceDiscovery = enhanceEventPublishing(originalServiceDiscovery);
    execute(() -> { // Initialize ServiceDiscovery
        serviceDiscovery.initialize(registryURL.addParameter(INTERFACE_KEY, ServiceDiscovery.class.getName())
                .removeParameter(REGISTRY_TYPE_KEY));
    });
    return serviceDiscovery;
}
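
The decorator idea behind enhanceEventPublishing() can be illustrated with a minimal sketch. SimpleDiscovery, EventPublishingDiscoverySketch, and the event names below are hypothetical stand-ins invented for illustration, not Dubbo's actual types; the sketch only shows how a wrapper can emit an event before and after delegating a call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a discovery interface (not Dubbo's ServiceDiscovery)
interface SimpleDiscovery {
    void register(String instance);
}

// Decorator: wraps another SimpleDiscovery and records an event before and after delegating
class EventPublishingDiscoverySketch implements SimpleDiscovery {
    private final SimpleDiscovery delegate;
    private final List<String> events;

    EventPublishingDiscoverySketch(SimpleDiscovery delegate, List<String> events) {
        this.delegate = delegate;
        this.events = events;
    }

    @Override
    public void register(String instance) {
        events.add("pre-register:" + instance);   // illustrative event name
        delegate.register(instance);              // the real work happens in the wrapped object
        events.add("post-register:" + instance);  // illustrative event name
    }

    // Demo helper: returns the ordered trace of events and delegate calls
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        SimpleDiscovery original = instance -> trace.add("register:" + instance);
        new EventPublishingDiscoverySketch(original, trace).register("demo-app");
        return trace;
    }
}
```

Calling EventPublishingDiscoverySketch.demo() yields the trace pre-register, register, post-register, in that order, which is the behavior the decorator is meant to guarantee.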

The core of the Registry interface is service publishing and subscription. Since ServiceDiscoveryRegistry implements the Registry interface, it also implements service registration and subscription.

Service Registration #

In the register() method of ServiceDiscoveryRegistry, it first checks the “side” parameter in the URL to be published, and then calls the register() method of the parent class. We know that the FailbackRegistry.register() method will invoke the doRegister() method of the subclass. In the case of ServiceDiscoveryRegistry, the doRegister() method directly depends on the exportURL() method of WritableMetadataService to publish the metadata.

public final void register(URL url) {
    if (!shouldRegister(url)) { // Check if the "side" parameter in the URL is "provider"
        return;
    }
    super.register(url);
}

@Override
public void doRegister(URL url) {
    // Publish the metadata to the MetadataService
    if (writableMetadataService.exportURL(url)) {
        ... // Log INFO message
    } else {
        ... // Log WARN message
    }
}

The implementation logic of the ServiceDiscoveryRegistry.unregister() method is similar, and it is not shown here. If you are interested, you can refer to the source code for learning.

Service Subscription #

Next, let’s take a look at the implementation of the ServiceDiscoveryRegistry.subscribe() method. It first checks the “side” parameter in the URL to be subscribed, and then calls the subscribe() method of the parent class. We know that the FailbackRegistry.subscribe() method calls back the doSubscribe() method of the subclass. In the doSubscribe() method of ServiceDiscoveryRegistry, the complete subscription process is executed:

  1. Call the WritableMetadataService.subscribeURL() method to record the currently subscribed URL in the subscribedServiceURLs collection;
  2. Get the Service Name based on the subscribed URL;
  3. Get the ServiceInstance collection based on the Service Name;
  4. Call the corresponding MetadataService service based on the ServiceInstance to get metadata, which involves cleaning up historical data and updating the cache;
  5. Merge the ServiceInstance information with the corresponding metadata information to get the complete URL;
  6. Trigger NotifyListener;
  7. Add ServiceInstancesChangedListener.

Let’s take a look at the specific implementation. The ServiceDiscoveryRegistry.doSubscribe() method delegates to the subscribeURLs(url, listener) method:

protected void subscribeURLs(URL url, NotifyListener listener) {
    // Record the subscribed URL
    writableMetadataService.subscribeURL(url);
    // Get the Service Name to be subscribed
    Set<String> serviceNames = getServices(url);
    if (CollectionUtils.isEmpty(serviceNames)) {
        throw new IllegalStateException("...");
    }
    // Perform the subsequent subscription operation
    serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
}

Let’s analyze this process step by step.

1. Get Service Name #

First, let’s look at the getServices() method. It first tries to get the subscribed Service Name collection from the “provided-by” parameter of the subscribed URL. If that fails, it tries to look up the Service Name collection by Service ID. If that also fails, it falls back to the “subscribed-services” parameter of the registryURL. The specific implementation is as follows:

protected Set<String> getServices(URL subscribedURL) {
    Set<String> subscribedServices = new LinkedHashSet<>();
    // Try to get the provided-by parameter value from the subscribedURL,
    // which encapsulates all the Service Names
    String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
    if (StringUtils.isNotEmpty(serviceNames)) {
        // Parse the provided-by parameter value to get all the Service Names
        subscribedServices = parseServices(serviceNames);
    }
    if (isEmpty(subscribedServices)) { 
        // If the provided-by parameter is not specified, try to construct the Service ID based on the subscribedURL,
        // and then use the get() method of ServiceNameMapping to find the corresponding Service Name
        subscribedServices = findMappedServices(subscribedURL);
        if (isEmpty(subscribedServices)) {
            // If subscribedServices is still empty, return the subscribed-services parameter value of registryURL
            subscribedServices = getSubscribedServices();
        }
    }
    return subscribedServices;
}
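
To make the three-level fallback concrete, here is a minimal, self-contained sketch that uses plain strings and maps instead of Dubbo's URL and ServiceNameMapping types. The class name, method names, and the sample service names are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class ServiceNameResolutionSketch {
    // Splits a comma-separated value into an ordered set, skipping blank entries
    static Set<String> parse(String value) {
        Set<String> names = new LinkedHashSet<>();
        if (value != null) {
            for (String part : value.split(",")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        }
        return names;
    }

    // Fallback order: provided-by parameter, then Service ID mapping, then subscribed-services
    static Set<String> resolve(String providedBy, String serviceId,
                               Map<String, Set<String>> mapping, String subscribedServices) {
        Set<String> names = parse(providedBy);
        if (names.isEmpty()) {
            names = mapping.getOrDefault(serviceId, Collections.emptySet());
        }
        if (names.isEmpty()) {
            names = parse(subscribedServices);
        }
        return names;
    }
}
```

With a mapping from "com.foo.DemoService" to "demo-provider", a null provided-by value resolves to "demo-provider", while an unknown Service ID falls through to whatever the subscribed-services value contains.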

2. Find Service Instance #

Next, let’s look at the specific implementation of the overloaded subscribeURLs(url, listener, serviceName) method. In this method, it looks up the corresponding ServiceInstance collection from the ServiceDiscovery based on the Service Name, and registers a ServiceInstancesChangedListener listener.

protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
    // Get the ServiceInstance object based on the Service Name
    List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
    // Call another overloaded subscribeURLs() method
    subscribeURLs(url, listener, serviceName, serviceInstances);
    // Add the ServiceInstancesChangedListener listener
    registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
        @Override
        public void onEvent(ServiceInstancesChangedEvent event) {
            subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
        }
    });
}

In the subscribeURLs(url, listener, serviceName, serviceInstances) overload, it mainly constructs the corresponding and complete subscribedURL collection based on the ServiceInstance instances obtained earlier, and triggers the NotifyListener listener that is passed in, as shown below:

protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String serviceName,
                             Collection<ServiceInstance> serviceInstances) {
    List<URL> subscribedURLs = new LinkedList<>();
    // Try to get the subscribedURL collection through MetadataService
    subscribedURLs.addAll(getExportedURLs(subscribedURL, serviceInstances));
    if (subscribedURLs.isEmpty()) { // If the above attempt fails
        // Try to get the subscribedURL collection through SubscribedURLsSynthesizer
        subscribedURLs.addAll(synthesizeSubscribedURLs(subscribedURL, serviceInstances));
    }
    // Trigger the NotifyListener listener
    listener.notify(subscribedURLs);
}

The construction of the complete subscribedURL can be divided into two branches.

  • The first branch: combine the subscribedURL passed in, as well as the corresponding parameters for each ServiceInstance obtained from the metadata, to assemble the complete subscribeURL for each ServiceInstance. This part is implemented in the getExportedURLs() method, which is the core of the subscribe operation.

  • The second branch: when the above operation fails to obtain the complete subscribedURL collection, the SubscribedURLsSynthesizer will be used to combine the subscribedURL and synthesize the complete subscribedURL for each ServiceInstance. This part is implemented in the synthesizeSubscribedURLs() method, currently mainly for the REST protocol.

3. Core Implementation of the getExportedURLs() Method #

The getExportedURLs() method mainly revolves around the serviceRevisionExportedURLsCache collection, which is of type Map<String, Map<String, List<URL>>>. The first-level key is the Service Name, the second-level key is the Revision, and the value is the latest URL collection for that Service Name and Revision.

(1) Clearing expired URLs #

In the getExportedURLs() method, the expungeStaleRevisionExportedURLs() method is called first to destroy all expired URL information. The specific implementation is as follows:

private void expungeStaleRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
    // Get Service Name from the first ServiceInstance
    String serviceName = serviceInstances.get(0).getServiceName();
    // Get the URL collection corresponding to the Service Name in the serviceRevisionExportedURLsCache
    Map<String, List<URL>> revisionExportedURLsMap = serviceRevisionExportedURLsCache
        .computeIfAbsent(serviceName, s -> new LinkedHashMap<>());
    if (revisionExportedURLsMap.isEmpty()) { // If no URLs are cached, no further cleanup is required, just return.
        return;
    }
    // Get the revision cache for the Service Name in the serviceRevisionExportedURLsCache
    Set<String> existedRevisions = revisionExportedURLsMap.keySet();
    // Get the current latest revision from the ServiceInstance
    Set<String> currentRevisions = serviceInstances.stream()
            .map(ServiceInstanceMetadataUtils::getExportedServicesRevision)
            .collect(Collectors.toSet());
    // Get the stale revisions to be deleted: staleRevisions = existedRevisions(copy) - currentRevisions
    Set<String> staleRevisions = new HashSet<>(existedRevisions);
    staleRevisions.removeAll(currentRevisions);
    // Remove the URL collections corresponding to the staleRevisions set from the revisionExportedURLsMap
    staleRevisions.forEach(revisionExportedURLsMap::remove);
}
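
The set difference at the heart of this cleanup can be tried out in isolation. The following sketch uses plain strings for revisions and URLs; the class name and the sample revisions and addresses are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StaleRevisionSketch {
    // Removes cached revisions that no live instance reports any more
    static void expunge(Map<String, List<String>> revisionToUrls, Set<String> currentRevisions) {
        // Copy the key set first: keySet() is a live view of the map
        Set<String> stale = new HashSet<>(revisionToUrls.keySet());
        stale.removeAll(currentRevisions);
        stale.forEach(revisionToUrls::remove);
    }

    static Map<String, List<String>> demo() {
        Map<String, List<String>> cache = new LinkedHashMap<>();
        cache.put("rev-1", Arrays.asList("dubbo://10.0.0.1:20880/com.foo.DemoService"));
        cache.put("rev-2", Arrays.asList("dubbo://10.0.0.2:20880/com.foo.DemoService"));
        // Live instances now report rev-2 and rev-3, so only rev-1 is stale
        expunge(cache, new HashSet<>(Arrays.asList("rev-2", "rev-3")));
        return cache;
    }
}
```

After demo() runs, only the rev-2 entry remains in the cache. The rev-3 revision is not added here; in the flow described in this lesson, missing revisions are filled in by a later step.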

We can see that here, the latest revision is obtained from the metadata collection of each ServiceInstance using the ServiceInstanceMetadataUtils utility class (with the key dubbo.exported-services.revision). So where is this revision information written? Let’s take a look at a new interface - ServiceInstanceCustomizer, defined as follows:

@SPI
public interface ServiceInstanceCustomizer extends Prioritized {
    void customize(ServiceInstance serviceInstance);
}

Regarding the ServiceInstanceCustomizer interface, three points need to be noted: ① The interface is annotated with @SPI, indicating that it is an extension point; ② The interface inherits from the Prioritized interface; ③ The customize() method defined in the interface can be used to customize the ServiceInstance information, including controlling the data in the metadata collection.

In other words, multiple implementations of ServiceInstanceCustomizer can be called in sequence to achieve ServiceInstance customization. The following diagram shows all the implementation classes of the ServiceInstanceCustomizer interface:

Drawing 1.png

Inheritance diagram of ServiceInstanceCustomizer

Let’s first take a look at the abstract class ServiceInstanceMetadataCustomizer. Its main purpose is to customize the metadata key-value pairs in the ServiceInstance object. The customization logic is implemented in the customize() method, as shown below:

public final void customize(ServiceInstance serviceInstance) {
    // Get the metadata field of the ServiceInstance object
    Map<String, String> metadata = serviceInstance.getMetadata();
    // Generate the key-value pair to be added to the metadata collection
    String propertyName = resolveMetadataPropertyName(serviceInstance);
    String propertyValue = resolveMetadataPropertyValue(serviceInstance);
    // Check if the key-value pair to be added is not empty
    if (!isBlank(propertyName) && !isBlank(propertyValue)) {
        String existedValue = metadata.get(propertyName);
        boolean put = existedValue == null || isOverride();
        if (put) { // Whether to override the original value
            metadata.put(propertyName, propertyValue);
        }
    }
}

The abstract methods resolveMetadataPropertyName(), resolveMetadataPropertyValue(), and isOverride() are implemented in the subclasses of ServiceInstanceMetadataCustomizer.

In the implementation of ExportedServicesRevisionMetadataCustomizer, the resolveMetadataPropertyName() method returns the fixed string “dubbo.exported-services.revision”, and the resolveMetadataPropertyValue() method retrieves all the URLs published by the current ServiceInstance object using the WritableMetadataService, and then calculates the revision value. The implementation is as follows:

protected String resolveMetadataPropertyValue(ServiceInstance serviceInstance) {
    // Get the method of storing metadata of the current ServiceInstance object (local or remote)
    String metadataStorageType = getMetadataStorageType(serviceInstance);
    // Get the corresponding WritableMetadataService object and retrieve all the metadata published by the current ServiceInstance
    WritableMetadataService writableMetadataService = getExtension(metadataStorageType);
    SortedSet<String> exportedURLs = writableMetadataService.getExportedURLs();
    // Calculate the revision value of the entire exportedURLs collection
    URLRevisionResolver resolver = new URLRevisionResolver();
    return resolver.resolve(exportedURLs);
}

It should be noted that the core implementation of calculating the revision value is as follows: first, get the method signature and corresponding URL parameters of each service interface method, then calculate the hash code and sum them up, and if the revision value is not obtained in this way, return the placeholder string “N/A”. The implementation of the URLRevisionResolver.resolve() method is relatively simple, and it will not be shown here. If you’re interested, you can refer to the source code for learning.
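
As a rough illustration of the "hash and sum" idea only, the sketch below hashes each URL string directly and sums the results. This is a deliberate simplification and not the actual URLRevisionResolver algorithm, which works on method signatures and URL parameters; the class and method names are hypothetical.

```java
import java.util.Collection;

class RevisionSketch {
    static final String NO_REVISION = "N/A"; // placeholder mentioned in the text

    // Simplified assumption: sum the hash codes of the URL strings themselves
    static String resolve(Collection<String> exportedUrls) {
        if (exportedUrls == null || exportedUrls.isEmpty()) {
            return NO_REVISION;
        }
        long sum = 0;
        for (String url : exportedUrls) {
            sum += url.hashCode();
        }
        return Long.toString(sum);
    }
}
```

Because addition is commutative, the result does not depend on the order of the URLs, so two instances that export the same set of URLs produce the same revision in this sketch. Any change to the set is likely, though not guaranteed, to change the sum.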

In the implementation of SubscribedServicesRevisionMetadataCustomizer, the resolveMetadataPropertyName() method returns the fixed string “dubbo.subscribed-services.revision”, and the resolveMetadataPropertyValue() method retrieves all the URLs referenced by the current ServiceInstance object using the WritableMetadataService, and then calculates the revision value. The implementation is as follows:

protected String resolveMetadataPropertyValue(ServiceInstance serviceInstance) {
    String metadataStorageType = getMetadataStorageType(serviceInstance);
    WritableMetadataService writableMetadataService = getExtension(metadataStorageType);
    // Get the subscribedServiceURLs collection
    SortedSet<String> subscribedURLs = writableMetadataService.getSubscribedURLs();
    URLRevisionResolver resolver = new URLRevisionResolver();
    // Calculate the revision value
    return resolver.resolve(subscribedURLs);
}

In the implementation of MetadataServiceURLParamsMetadataCustomizer, the resolveMetadataPropertyName() method returns the fixed string “dubbo.metadata-service.url-params”, and the resolveMetadataPropertyValue() method returns the parameters of the MetadataService service URL.

Regarding the implementation of RefreshServiceMetadataCustomizer, let’s first focus on its execution order. It overrides the getPriority() method, and the implementation is as follows:

public int getPriority() {
    return MIN_PRIORITY; // Lowest priority
}

This ensures that RefreshServiceMetadataCustomizer is executed after the ServiceInstanceMetadataCustomizer implementation introduced earlier (ServiceInstanceMetadataCustomizer has a priority of NORMAL_PRIORITY).
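
The effect of the priority can be sketched as a simple sort. The constants below assume that a smaller number means higher priority and therefore earlier execution, with MIN_PRIORITY as the largest possible value; the Customizer class here is a hypothetical stand-in that only carries a name and a priority.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PrioritySketch {
    // Assumed convention for this sketch: smaller value = higher priority = runs earlier
    static final int NORMAL_PRIORITY = 0;
    static final int MIN_PRIORITY = Integer.MAX_VALUE;

    // Hypothetical stand-in for a customizer: just a name and a priority
    static class Customizer {
        final String name;
        final int priority;

        Customizer(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Returns the names in execution order; the sort is stable, so ties keep their input order
    static List<String> executionOrder(List<Customizer> customizers) {
        List<Customizer> sorted = new ArrayList<>(customizers);
        sorted.sort(Comparator.comparingInt(c -> c.priority));
        List<String> names = new ArrayList<>();
        for (Customizer c : sorted) {
            names.add(c.name);
        }
        return names;
    }
}
```

Even if the customizer with MIN_PRIORITY is listed first, it ends up last in the execution order, after every customizer that uses NORMAL_PRIORITY.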

In the implementation of the customize() method, RefreshServiceMetadataCustomizer retrieves the URL revision of the services published and referenced by the ServiceInstance and updates them in the metadata center. The implementation is as follows:

public void customize(ServiceInstance serviceInstance) {
    String metadataStoredType = getMetadataStorageType(serviceInstance);
    WritableMetadataService writableMetadataService = getExtension(metadataStoredType);
    // Get the two revisions from the ServiceInstance.metadata collection and call the refreshMetadata() method to update them
    writableMetadataService.refreshMetadata(getExportedServicesRevision(serviceInstance),
            getSubscribedServicesRevision(serviceInstance));
}

Among the implementations of the WritableMetadataService interface, only RemoteWritableMetadataService implements the refreshMetadata() method. It checks whether the two revision values have changed and, if they have, updates the respective URL collections in the metadata center.

Back in getExportedURLs(), the URLs published by a ServiceInstance are requested from its MetadataService, as follows:

   private List<URL> getExportedURLs(ServiceInstance serviceInstance) {
        List<URL> exportedURLs = null;
        try {
            // Get the metadata for the current URL
            exportedURLs = getMetadataService().getExportedURLs(serviceInstance.toURL());
        } catch (Throwable t) {
            throwException(serviceName(), t.getMessage(), t);
        }
        return exportedURLs;
    }

The getMetadataService() method here is an abstract method; its concrete implementation lives in ServiceInstanceMetadataCustomizer, an inner class of ServiceDiscoveryRegistry. The implementation is as follows:

private MetadataService getMetadataService() {
        URL appMetadataURL = registryParamMetadataIdentifierURL;
        if (metadataServiceCache != null) {
            return metadataServiceCache.get(appMetadataURL);
        }
        if (metadataService == null) {
            synchronized (this) {
                if (metadataService == null) {
                    setMetadataService(buildMetadataService(appMetadataURL));
                }
            }
        }
        return metadataService;
    }

    private MetadataService buildMetadataService(URL appMetadataURL) {
        if (metadataService != null) {
            return metadataService;
        }
        putting.exclude();
        metadataService = MetadataService.class.cast(proxyFactory.getProxy(
                putting
                        .metadataType(MetadataService.class.getName())
                        .url(appMetadataURL)
                        .build()));
        return metadataService;
    }

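Here, proxyFactory is Dubbo’s ProxyFactory instance, which creates a proxy object for a service from the URL passed in. Calls on this proxy are automatically forwarded to the corresponding interface method of the remote service. In this case, the getExportedURLs() method of the MetadataService interface is called with the URL of the current ServiceInstance as its argument, so the actual request is issued by the line getMetadataService().getExportedURLs(serviceInstance.toURL()).

Summary #

In the ServiceDiscoveryRegistry class, when the serviceRevisionExportedURLsCache cache holds no entry for the revision value of a ServiceInstance, the MetadataService implementation of that ServiceInstance is called to query the collection of URLs it has published. The call goes through a proxy object for the MetadataService interface created by ProxyFactory, whose getExportedURLs() method is invoked. The result is the list of URLs published by that ServiceInstance.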

private List<URL> getExportedURLs(ServiceInstance providerServiceInstance) {
    List<URL> exportedURLs = null;
    // Get the metadata storage type of the specified ServiceInstance instance
    String metadataStorageType = getMetadataStorageType(providerServiceInstance);
    try {
        // Create a local proxy for the MetadataService interface
        MetadataService metadataService = MetadataServiceProxyFactory.getExtension(metadataStorageType)
                .getProxy(providerServiceInstance);
        if (metadataService != null) {
            // Request the MetadataService service of this ServiceInstance through the local proxy
            SortedSet<String> urls = metadataService.getExportedURLs();
            exportedURLs = toURLs(urls);
        }
    } catch (Throwable e) {
        exportedURLs = null; // Set exportedURLs to null
    }
    return exportedURLs;
}

Here there is a new interface called MetadataServiceProxyFactory, which is a factory class for creating local proxies for MetadataService. The inheritance relationship is shown below:

Drawing 2.png

MetadataServiceProxyFactory inheritance diagram

The BaseMetadataServiceProxyFactory provides common functionality for caching local proxies for the MetadataService. It maintains a collection of proxies (HashMap), where the key is a combination of the Service Name and the revision value of a ServiceInstance, and the value is the local proxy object for the MetadataService of that ServiceInstance. The creation of local proxies for the MetadataService is implemented in the createProxy() abstract method, which is implemented by subclasses of BaseMetadataServiceProxyFactory.
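
The caching behavior described above can be sketched with a map and computeIfAbsent(). The class below is a hypothetical simplification: the key format ("name#revision") and the use of a Function in place of the abstract createProxy() method are assumptions for illustration, not the actual BaseMetadataServiceProxyFactory code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ProxyCacheSketch<P> {
    private final Map<String, P> proxies = new HashMap<>();
    // Stands in for the abstract createProxy() method implemented by subclasses
    private final Function<String, P> createProxy;
    int created = 0; // counts how many proxies were actually created

    ProxyCacheSketch(Function<String, P> createProxy) {
        this.createProxy = createProxy;
    }

    // Returns the cached proxy for (serviceName, revision), creating it on first use
    P getProxy(String serviceName, String revision) {
        String key = serviceName + "#" + revision; // key format is an assumption
        return proxies.computeIfAbsent(key, k -> {
            created++;
            return createProxy.apply(serviceName);
        });
    }
}
```

Two lookups with the same Service Name and revision return the same proxy object, while a new revision triggers a new creation. A plain HashMap is not thread-safe, so this sketch assumes single-threaded use.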

Now let’s take a look at the two implementations of BaseMetadataServiceProxyFactory: DefaultMetadataServiceProxyFactory and RemoteMetadataServiceProxyFactory.

In the createProxy() method of DefaultMetadataServiceProxyFactory, the URL for the MetadataService interface is first obtained through the MetadataServiceURLBuilder. Then, the MetadataService service published by the specified ServiceInstance is referenced through the Protocol interface to obtain the corresponding Invoker object. Finally, the local proxy for the MetadataService is created from the Invoker object using the ProxyFactory.

protected MetadataService createProxy(ServiceInstance serviceInstance) {
    MetadataServiceURLBuilder builder = null;
    ExtensionLoader<MetadataServiceURLBuilder> loader
            = ExtensionLoader.getExtensionLoader(MetadataServiceURLBuilder.class);
    Map<String, String> metadata = serviceInstance.getMetadata();
    // In Spring Cloud, the metadata collection will include the entire METADATA_SERVICE_URLS_PROPERTY_NAME key
    String dubboURLsJSON = metadata.get(METADATA_SERVICE_URLS_PROPERTY_NAME);
    if (StringUtils.isNotEmpty(dubboURLsJSON)) {
        builder = loader.getExtension(SpringCloudMetadataServiceURLBuilder.NAME);
    } else {
        builder = loader.getExtension(StandardMetadataServiceURLBuilder.NAME);
    }
    // Build the URL collection corresponding to the MetadataService service
    List<URL> urls = builder.build(serviceInstance); 
    // Reference the service and create the Invoker
    // Note that even if the MetadataService interface uses multiple protocols, only the first protocol will be used here
    Invoker<MetadataService> invoker = protocol.refer(MetadataService.class, urls.get(0));
    // Create the local proxy object for the MetadataService
    return proxyFactory.getProxy(invoker);
}

Now let’s take a look at the logic for creating the URL for the MetadataService service in the MetadataServiceURLBuilder interface. The following diagram shows the implementation of the MetadataServiceURLBuilder interface:

Drawing 3.png

The SpringCloudMetadataServiceURLBuilder is an implementation that is compatible with Spring Cloud, but we won’t go into details here. Let’s focus on the implementation of StandardMetadataServiceURLBuilder, which constructs the URL for the MetadataService service based on the URL parameters carried by the ServiceInstance’s metadata, the Service Name, and the host of the ServiceInstance:

public List<URL> build(ServiceInstance serviceInstance) {
    // Get the value of the "dubbo.metadata-service.url-params" key from the metadata collection
    // This key is written by MetadataServiceURLParamsMetadataCustomizer
    Map<String, Map<String, String>> paramsMap = getMetadataServiceURLsParams(serviceInstance);
    List<URL> urls = new ArrayList<>(paramsMap.size());
    // Get the Service Name
    String serviceName = serviceInstance.getServiceName();
    // Get the host that the ServiceInstance listens on
    String host = serviceInstance.getHost();
    // The MetadataService interface may be published in multiple protocols
    // Iterate through the paramsMap collection and generate a URL for each protocol
    for (Map.Entry<String, Map<String, String>> entry : paramsMap.entrySet()) {
        String protocol = entry.getKey();
        Map<String, String> params = entry.getValue();
        int port = Integer.parseInt(params.get(PORT_KEY));
        URLBuilder urlBuilder = new URLBuilder()
                .setHost(host)
                .setPort(port)
                .setProtocol(protocol)
                .setPath(MetadataService.class.getName());
        params.forEach((name, value) -> urlBuilder.addParameter(name, valueOf(value)));
        urlBuilder.addParameter(GROUP_KEY, serviceName);
        urls.add(urlBuilder.build());
    }
    return urls;
}

Next, let’s take a look at the implementation of the RemoteMetadataServiceProxyFactory class. The createProxy() method creates a RemoteMetadataServiceProxy object and returns it. In Lesson 44, when we introduced the MetadataService interface, we focused on the implementations of the WritableMetadataService sub-interface and did not mention the RemoteMetadataServiceProxy implementation. The following diagram shows the position of the RemoteMetadataServiceProxy in the inheritance hierarchy:

Drawing 4.png

RemoteMetadataServiceProxy inheritance diagram

As the local proxy of RemoteWritableMetadataService, RemoteMetadataServiceProxy implements methods like getExportedURLs() and getServiceDefinition() entirely on top of MetadataReport. Let’s take the getExportedURLs() method as an example:

public SortedSet<String> getExportedURLs(String serviceInterface, String group, String version, String protocol) {
    // Use getMetadataReport() to obtain the MetadataReport implementation and query it
    // through its getExportedURLs() method. The query conditions are wrapped in a
    // ServiceMetadataIdentifier object, which carries the service interface, group,
    // version, revision, and other information. For the ZookeeperMetadataReport
    // implementation, the useful information is the revision and protocol.
    return toSortedStrings(getMetadataReport().getExportedURLs(
            new ServiceMetadataIdentifier(serviceInterface, group, version, PROVIDER_SIDE, revision, protocol)));
}

At this point, the URLs of the different revision versions in the serviceRevisionExportedURLsCache cache have been updated with the latest data.

(4) Generate SubscribedURL #

After obtaining the latest version of URL collection, the cloneExportedURLs() method is called to generate the final subscribedURL collection for the service based on the template URL (i.e., subscribedURL) and the metadata published by each ServiceInstance.

private List<URL> cloneExportedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
    if (isEmpty(serviceInstances)) {
        return emptyList();
    }
    List<URL> clonedExportedURLs = new LinkedList<>();
    serviceInstances.forEach(serviceInstance -> {
        // Get the host of the ServiceInstance
        String host = serviceInstance.getHost();
        // Get the template URL collection of the ServiceInstance. The getTemplateExportedURLs()
        // method retrieves the corresponding URL collection from the serviceRevisionExportedURLsCache
        // cache based on the Service Name and the revision of the current ServiceInstance.
        // Additionally, it filters based on the protocol, group, version, and other parameters
        // of the subscribedURL.
        getTemplateExportedURLs(subscribedURL, serviceInstance)
                .stream()
                // Remove timestamp, pid, and other parameters
                .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
                .map(templateURL -> templateURL.removeParameter(PID_KEY))
                .map(templateURL -> {
                    // Get the port number corresponding to the protocol from the ServiceInstance.metadata collection
                    String protocol = templateURL.getProtocol();
                    int port = getProtocolPort(serviceInstance, protocol);
                    if (Objects.equals(templateURL.getHost(), host)
                            && Objects.equals(templateURL.getPort(), port)) { // use templateURL if equals
                        return templateURL;
                    }
                    // Override the host and port parameters
                    URLBuilder clonedURLBuilder = from(templateURL)
                            .setHost(host)
                            .setPort(port);
                    return clonedURLBuilder.build();
                })
                .forEach(clonedExportedURLs::add); // Record the newly generated URL
    });
    return clonedExportedURLs;
}
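
The host and port override can be tried out with the JDK's java.net.URI in place of Dubbo's URL and URLBuilder. This is only a sketch under that substitution; the class name, method name, and sample addresses are illustrative.

```java
import java.net.URI;
import java.net.URISyntaxException;

class CloneUrlSketch {
    // Returns the template unchanged if host and port already match,
    // otherwise a copy of the template with host and port overridden
    static String withHostAndPort(String templateUrl, String host, int port) {
        URI template = URI.create(templateUrl);
        if (host.equals(template.getHost()) && port == template.getPort()) {
            return templateUrl;
        }
        try {
            return new URI(template.getScheme(), template.getUserInfo(), host, port,
                    template.getPath(), template.getQuery(), template.getFragment()).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For example, applying host 10.0.0.2 and port 20881 to the template dubbo://10.0.0.1:20880/com.foo.DemoService?version=1.0.0 produces dubbo://10.0.0.2:20881/com.foo.DemoService?version=1.0.0, with the protocol, path, and parameters carried over from the template.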

The implementation of the getProtocolPort() method retrieves the endpoints list (key: dubbo.endpoints) from the ServiceInstance.metadata collection. The implementation code is as follows:

public static Integer getProtocolPort(ServiceInstance serviceInstance, String protocol) {
    Map<String, String> metadata = serviceInstance.getMetadata();
    // Query from the metadata collection
    String rawEndpoints = metadata.get("dubbo.endpoints");
    if (StringUtils.isNotEmpty(rawEndpoints)) {
        // Deserialize the JSON-formatted data. Here, Endpoint is an inner class of
        // ServiceDiscoveryRegistry with only two fields: port and protocol.
        List<Endpoint> endpoints = JSON.parseArray(rawEndpoints, Endpoint.class);
        for (Endpoint endpoint : endpoints) {
            // Get the corresponding port based on the Protocol
            if (endpoint.getProtocol().equals(protocol)) {
                return endpoint.getPort();
            }
        }
    }
    return null;
}
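
Note that getProtocolPort() returns null when no endpoint matches, while the caller shown earlier assigns the result to an int, so a missing entry would fail on unboxing. The sketch below shows the same lookup with the absent case made explicit through Optional. It is only an illustration under stated assumptions: the class and method names are hypothetical, and the regular expressions stand in for a real JSON library purely to keep the example self-contained (they assume a flat array of objects with a numeric port and a string protocol).

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class EndpointLookupSketch {

    // Each {...} object in the flat JSON array (no nesting assumed).
    private static final Pattern OBJECT = Pattern.compile("\\{([^{}]*)\\}");
    private static final Pattern PORT = Pattern.compile("\"port\"\\s*:\\s*(\\d+)");
    private static final Pattern PROTOCOL = Pattern.compile("\"protocol\"\\s*:\\s*\"([^\"]*)\"");

    // Return the port registered for the protocol, or Optional.empty() if there is none.
    static Optional<Integer> findPort(Map<String, String> metadata, String protocol) {
        String rawEndpoints = metadata.get("dubbo.endpoints");
        if (rawEndpoints == null || rawEndpoints.isEmpty()) {
            return Optional.empty();
        }
        Matcher objects = OBJECT.matcher(rawEndpoints);
        while (objects.find()) {
            String body = objects.group(1);
            Matcher proto = PROTOCOL.matcher(body);
            Matcher port = PORT.matcher(body);
            if (proto.find() && port.find() && proto.group(1).equals(protocol)) {
                return Optional.of(Integer.parseInt(port.group(1)));
            }
        }
        return Optional.empty();
    }
}
```

A caller could then decide what to do when the protocol is unknown, for example `findPort(metadata, "rest").orElse(defaultPort)`, where defaultPort is whatever fallback the caller considers appropriate.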

The Endpoint collection in ServiceInstance.metadata is written by another implementation of the ServiceInstanceCustomizer interface, ProtocolPortsMetadataCustomizer. Its main purpose is to tell the Consumer side which port each Protocol listens on. The specific implementation of the ProtocolPortsMetadataCustomizer.customize() method is as follows:

public void customize(ServiceInstance serviceInstance) {
    // Get WritableMetadataService
    String metadataStoredType = getMetadataStorageType(serviceInstance);
    WritableMetadataService writableMetadataService = getExtension(metadataStoredType);
    Map<String, Integer> protocols = new HashMap<>();
    // First, get the URLs that the current ServiceInstance exports for each Protocol
    writableMetadataService.getExportedURLs()
            .stream().map(URL::valueOf)
            // Filter out the MetadataService interface
            .filter(url -> !MetadataService.class.getName().equals(url.getServiceInterface()))
            .forEach(url -> {
                // Record the mapping relationship between Protocol and port
                protocols.put(url.getProtocol(), url.getPort());
            });
    // Convert each mapping in protocols into an Endpoint object, serialize the Endpoints
    // into a JSON string, and store it in the metadata collection of the ServiceInstance
    setEndpoints(serviceInstance, protocols);
}
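
To make the data flow concrete, here is a minimal sketch of the same two steps: collecting a Protocol-to-port map from exported URLs and turning it into the JSON string stored under dubbo.endpoints. It rests on several assumptions: java.net.URI replaces Dubbo's URL, the service interface is taken from the URL path, the MetadataService class name is written out as a string, and the exact JSON field order is a guess. The class and method names are hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ProtocolPortsSketch {

    // Assumed fully qualified name of the interface to be filtered out.
    static final String METADATA_SERVICE = "org.apache.dubbo.metadata.MetadataService";

    // Record which port each protocol listens on, skipping MetadataService URLs.
    static Map<String, Integer> protocolPorts(List<String> exportedUrls) {
        Map<String, Integer> protocols = new LinkedHashMap<>();
        for (String raw : exportedUrls) {
            URI uri = URI.create(raw);
            String path = uri.getPath() == null ? "" : uri.getPath();
            String serviceInterface = path.startsWith("/") ? path.substring(1) : path;
            if (METADATA_SERVICE.equals(serviceInterface)) {
                continue;
            }
            // A later URL with the same protocol overwrites the earlier port.
            protocols.put(uri.getScheme(), uri.getPort());
        }
        return protocols;
    }

    // Serialize the map into a JSON array of {port, protocol} objects.
    static String toEndpointsJson(Map<String, Integer> protocols) {
        return protocols.entrySet().stream()
                .map(e -> "{\"port\":" + e.getValue() + ",\"protocol\":\"" + e.getKey() + "\"}")
                .collect(Collectors.joining(",", "[", "]"));
    }
}
```

Because the map is keyed by protocol, only one port per Protocol survives, which matches the put() call in customize() above.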

Up to this point, the core process of the getExportedURLs() method has been introduced.

4. SubscribedURLsSynthesizer

Finally, let's take a look at the implementation of the synthesizeSubscribedURLs() method. It relies on the SubscribedURLsSynthesizer interface, which is defined as follows:

@SPI
public interface SubscribedURLsSynthesizer extends Prioritized {
    // Whether this synthesizer supports the given subscribedURL
    boolean supports(URL subscribedURL);
    // Synthesize the complete subscribedURL collection based on subscribedURL and ServiceInstance information
    List<URL> synthesize(URL subscribedURL, Collection<ServiceInstance> serviceInstances);
}
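
One plausible way for a caller to use such an interface is to walk the synthesizers in priority order, keep those whose supports() returns true, and merge their results. The sketch below illustrates that pattern only; it is an assumption about usage, not the body of synthesizeSubscribedURLs(). The Synthesizer interface is a simplified, hypothetical stand-in that works on strings, and treating a lower priority number as "earlier" is likewise assumed.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class SynthesizerChainSketch {

    // Simplified stand-in for SubscribedURLsSynthesizer.
    interface Synthesizer {
        int priority();
        boolean supports(String protocol);
        List<String> synthesize(String serviceInterface, List<String> addresses);
    }

    // Hypothetical factory: a synthesizer that handles exactly one protocol.
    static Synthesizer forProtocol(String protocol, int priority) {
        return new Synthesizer() {
            public int priority() { return priority; }
            public boolean supports(String p) { return protocol.equals(p); }
            public List<String> synthesize(String serviceInterface, List<String> addresses) {
                return addresses.stream()
                        .map(address -> protocol + "://" + address + "/" + serviceInterface)
                        .collect(Collectors.toList());
            }
        };
    }

    // Sort by priority, keep the synthesizers that support the protocol, merge their URLs.
    static List<String> synthesizeAll(List<Synthesizer> synthesizers, String protocol,
                                      String serviceInterface, List<String> addresses) {
        return synthesizers.stream()
                .sorted(Comparator.comparingInt(Synthesizer::priority))
                .filter(s -> s.supports(protocol))
                .flatMap(s -> s.synthesize(serviceInterface, addresses).stream())
                .collect(Collectors.toList());
    }
}
```

If no synthesizer supports the protocol, the result is simply an empty list, so the caller can fall back to other ways of obtaining URLs.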

At present, Dubbo provides only one implementation, RestProtocolSubscribedURLsSynthesizer, which targets the rest protocol. It synthesizes the complete URL from the service interface in the subscribedURL and from the host, port, and Service Name of each ServiceInstance. The specific implementation is as follows:

public List<URL> synthesize(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
    // Get Protocol
    String protocol = subscribedURL.getParameter(PROTOCOL_KEY); 
    return serviceInstances.stream().map(serviceInstance -> {
        URLBuilder urlBuilder = new URLBuilder()
                .setProtocol(protocol)
                // Use the host and port of ServiceInstance
                .setHost(serviceInstance.getHost()) 
                .setPort(serviceInstance.getPort())
                // Set the business interface
                .setPath(subscribedURL.getServiceInterface()) 
                .addParameter(SIDE_KEY, PROVIDER)
                // Set Service Name
                .addParameter(APPLICATION_KEY, serviceInstance.getServiceName())
                .addParameter(REGISTER_KEY, TRUE.toString());
        return urlBuilder.build();
    }).collect(Collectors.toList());
}
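
To see what kind of URL comes out of synthesize(), the following sketch assembles the same pieces as a plain string. It is an illustration under assumptions: the parameter names "side", "application", and "register" are the assumed values of SIDE_KEY, APPLICATION_KEY, and REGISTER_KEY, the parameter order is arbitrary, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class RestUrlSketch {

    // Build a provider URL from the subscribed interface and one instance's address.
    static String synthesize(String protocol, String host, int port,
                             String serviceInterface, String serviceName) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("side", "provider");        // the URL describes a provider
        params.put("application", serviceName); // the instance's Service Name
        params.put("register", "true");
        String query = params.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return protocol + "://" + host + ":" + port + "/" + serviceInterface + "?" + query;
    }
}
```

Unlike the template-based cloning shown earlier, nothing here comes from the provider's metadata: everything is derived from the subscribedURL and the ServiceInstance itself.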

At this point, we have finished introducing the content of the entire ServiceDiscoveryRegistry.

Summary

In this lesson, we focused on the core implementation of service publishing and service subscription in Dubbo's service introspection architecture. That implementation is ServiceDiscoveryRegistry, which also provides compatibility with the Registry interface of the traditional Dubbo architecture.

We first explained how ServiceDiscoveryRegistry implements service registration, and then walked through its service subscription in detail. Subscription involves querying Service Names and Service Instances, calling the MetadataService, and finally obtaining the SubscribedURLs.

In the next lesson, we will turn to the configuration center in Dubbo's service introspection architecture. Remember to come to class on time.