45 Add Meal Deep Into Service Downgrading in the Service Publish Subscribe Above

45 Add Meal Deep Into Service Downgrading in the Service Publish Subscribe Above #

In Lesson 43, when we introduced Dubbo’s service discovery solution, we saw that in addition to requiring support for metadata, we also need support for service publish-subscribe functionality to create a complete service discovery architecture.

In this lesson, we will explain the specific implementation of service instance publishing and subscribing in Dubbo: first, we will explain the core definition of the ServiceDiscovery interface, and then focus on the implementation of ZookeeperServiceDiscovery with ZooKeeper as the registry center, which also involves the implementation of related event listeners.

ServiceDiscovery Interface #

The ServiceDiscovery interface mainly encapsulates the operations of publishing and subscribing to ServiceInstance, which can be temporarily understood as a registry for ServiceInstance. The definition of the ServiceDiscovery interface is as follows:

@SPI("zookeeper")
public interface ServiceDiscovery extends Prioritized {
    // Initialize the current ServiceDiscovery instance with the registry URL
    void initialize(URL registryURL) throws Exception;
    // Destroy the current ServiceDiscovery instance
    void destroy() throws Exception;
    // Publish the incoming ServiceInstance instance
    void register(ServiceInstance serviceInstance) throws RuntimeException;
    // Update the incoming ServiceInstance instance
    void update(ServiceInstance serviceInstance) throws RuntimeException;
    // Unregister the incoming ServiceInstance instance
    void unregister(ServiceInstance serviceInstance) throws RuntimeException;
    // Get all Service Names
    Set<String> getServices();
    // The default number of items per page for paginated queries
    default int getDefaultPageSize() {
        return 100;
    }
    // Paginated query of ServiceInstance based on ServiceName
    default List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
        List<ServiceInstance> allInstances = new LinkedList<>();
        int offset = 0;
        int pageSize = getDefaultPageSize();
        // Paginated query of ServiceInstance
        Page<ServiceInstance> page = getInstances(serviceName, offset, pageSize);
        allInstances.addAll(page.getData());
        while (page.hasNext()) {
            offset += page.getDataSize();
            page = getInstances(serviceName, offset, pageSize);
            allInstances.addAll(page.getData());
        }
        return unmodifiableList(allInstances);
    }
    default Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize) throws NullPointerException,
            IllegalArgumentException {
        return getInstances(serviceName, offset, pageSize, false);
    }
    default Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) throws
            NullPointerException, IllegalArgumentException, UnsupportedOperationException {
        throw new UnsupportedOperationException("Current implementation does not support pagination query method.");
    }
    default Map<String, Page<ServiceInstance>> getInstances(Iterable<String> serviceNames, int offset, int requestSize) throws
            NullPointerException, IllegalArgumentException {
        Map<String, Page<ServiceInstance>> instances = new LinkedHashMap<>();
        for (String serviceName : serviceNames) {
            instances.put(serviceName, getInstances(serviceName, offset, requestSize));
        }
        return unmodifiableMap(instances);
    }
    // Add ServiceInstance listener
    default void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
            throws NullPointerException, IllegalArgumentException {
    }
    // Trigger ServiceInstancesChangedEvent event
    default void dispatchServiceInstancesChangedEvent(String serviceName) {
        dispatchServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
    }
    default void dispatchServiceInstancesChangedEvent(String serviceName, String... otherServiceNames) {
        dispatchServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
        if (otherServiceNames != null) {
            Stream.of(otherServiceNames)
                    .filter(StringUtils::isNotEmpty)
                    .forEach(this::dispatchServiceInstancesChangedEvent);
        }
    }
    default void dispatchServiceInstancesChangedEvent(String serviceName, Collection<ServiceInstance> serviceInstances) {
        dispatchServiceInstancesChangedEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
    }
    default void dispatchServiceInstancesChangedEvent(ServiceInstancesChangedEvent event) {
        getDefaultExtension().dispatch(event);
    }
}

The ServiceDiscovery interface is annotated with @SPI and is an extension point. Different ServiceDiscovery implementations exist for different registries. The inheritance diagram of ServiceDiscovery is shown in the following diagram:

Lark20201229-160604.png

ServiceDiscovery Inheritance Diagram

When creating a ServiceDiscovery object in Dubbo, it is created through the ServiceDiscoveryFactory factory class. The ServiceDiscoveryFactory interface is also an extension point, and Dubbo provides a default implementation called DefaultServiceDiscoveryFactory. The inheritance diagram of ServiceDiscoveryFactory is shown in the following diagram:

Lark20201229-160606.png

ServiceDiscoveryFactory Inheritance Diagram

In AbstractServiceDiscoveryFactory, a ConcurrentMap collection (the discoveries field) is maintained to cache ServiceDiscovery objects. It also provides an abstract method createDiscovery() to create a ServiceDiscovery instance.

public ServiceDiscovery getServiceDiscovery(URL registryURL) {
    String key = registryURL.toServiceStringWithoutResolving();
    return discoveries.computeIfAbsent(key, k -> createDiscovery(registryURL));
}

In DefaultServiceDiscoveryFactory, the createDiscovery() method is implemented to use the Dubbo SPI mechanism to obtain the corresponding ServiceDiscovery object. The implementation is as follows:

protected ServiceDiscovery createDiscovery(URL registryURL) {
    String protocol = registryURL.getProtocol();
    ExtensionLoader<ServiceDiscovery> loader = getExtensionLoader(ServiceDiscovery.class);
    return loader.getExtension(protocol);
}

Analysis of ZookeeperServiceDiscovery Implementation #

Dubbo provides multiple ServiceDiscovery implementations for different registry systems. Let’s take ZookeeperServiceDiscovery as an example to illustrate how Dubbo integrates with ZooKeeper as a registry system to publish and subscribe to service instances.

In ZookeeperServiceDiscovery, it encapsulates a ServiceDiscovery object from Apache Curator to interact with ZooKeeper. In the initialize() method, CuratorFramework and Curator ServiceDiscovery objects are initialized as shown below:

public void initialize(URL registryURL) throws Exception {
    ... // omit the logic of initializing EventDispatcher
    // initialize CuratorFramework
    this.curatorFramework = buildCuratorFramework(registryURL);
    // determine the rootPath, default is "/services"
    this.rootPath = ROOT_PATH.getParameterValue(registryURL);
    // initialize Curator ServiceDiscovery and start it
    this.serviceDiscovery = buildServiceDiscovery(curatorFramework, rootPath);
    this.serviceDiscovery.start();
}

The methods in ZookeeperServiceDiscovery mostly call the corresponding methods from the Curator ServiceDiscovery object. For example, the register(), update(), and unregister() methods all call the corresponding methods from the Curator ServiceDiscovery object to add, update, and remove the ServiceInstance. Let’s take the register() method as an example:

public void register(ServiceInstance serviceInstance) throws RuntimeException {
    doInServiceRegistry(serviceDiscovery -> {
        serviceDiscovery.registerService(build(serviceInstance));
    });
}

// The `build()` method converts the `ServiceInstance` object from Dubbo to the `ServiceInstance` object from Curator
public static org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> build(ServiceInstance serviceInstance) {
    ServiceInstanceBuilder builder = null;
    // Get the Service Name
    String serviceName = serviceInstance.getServiceName();
    String host = serviceInstance.getHost();
    int port = serviceInstance.getPort();
    // Get the metadata
    Map<String, String> metadata = serviceInstance.getMetadata();
    // The generated id format is "host:ip"
    String id = generateId(host, port);
    // `ZookeeperInstance` is the payload of Curator ServiceInstance
    ZookeeperInstance zookeeperInstance = new ZookeeperInstance(null, serviceName, metadata);
    builder = builder().id(id).name(serviceName).address(host).port(port)
            .payload(zookeeperInstance);
    return builder.build();
}

In addition to the functionality of publishing service instances mentioned above, when subscribing to service instances, ZookeeperServiceDiscovery also uses the ZookeeperServiceDiscovery to query information about service instances. These methods directly depend on the implementation in Apache Curator. For example, the getServices() method calls the queryForNames() method from the Curator ServiceDiscovery object to query for service names, and the getInstances() method calls the queryForInstances() method from the Curator ServiceDiscovery object to query for service instances.

EventListener Interface #

In addition to implementing the ServiceDiscovery interface, ZookeeperServiceDiscovery also implements the EventListener interface, as shown in the following diagram:

Drawing 2.png ZookeeperServiceDiscovery Inheritance Diagram

In other words, the ZookeeperServiceDiscovery is also an implementation of the EventListener interface, which allows it to listen to certain events as an EventListener. Let’s start by looking at the definition of the EventListener interface in Dubbo, which focuses on three methods: onEvent() method, getPriority() method, and the findEventType() utility method.

@SPI
@FunctionalInterface
public interface EventListener<E extends Event> extends java.util.EventListener, Prioritized {
    // This method is called when an event of interest to this EventListener occurs
    void onEvent(E event); 
    // The priority of the current EventListener
    default int getPriority() { 
        return MIN_PRIORITY;
    }
    // Get the type of Event that the incoming EventListener object listens to
    static Class<? extends Event> findEventType(EventListener<?> listener) {
        return findEventType(listener.getClass());
    }

    static Class<? extends Event> findEventType(Class<?> listenerClass) {
        Class<? extends Event> eventType = null;
        // Check whether the incoming listenerClass is an implementation of Dubbo's EventListener interface
        if (listenerClass != null && EventListener.class.isAssignableFrom(listenerClass)) {
            eventType = findParameterizedTypes(listenerClass)
                    .stream()
                    .map(EventListener::findEventType) // Get the Event generics defined in listenerClass
                    .filter(Objects::nonNull)
                    .findAny()
                    // Get the Event generics defined in the parent class of listenerClass
                    .orElse((Class) findEventType(listenerClass.getSuperclass()));
        }
        return eventType;
    }
    ... // The findEventType() method is used to filter whether the incoming parameterizedType is an Event or a subclass of Event (implementation details omitted here)
}

Dubbo has implemented many EventListener interfaces, as shown in the following diagram:

Drawing 3.png

EventListener Inheritance Diagram

Let’s focus on the implementation of ZookeeperServiceDiscovery. In its onEvent() method (as well as the addServiceInstancesChangedListener() method), it calls the registerServiceWatcher() method to re-register:

public void onEvent(ServiceInstancesChangedEvent event) {
    // The serviceName of the ServiceInstancesChangedEvent
    String serviceName = event.getServiceName();
    // Re-register the watcher
    registerServiceWatcher(serviceName);
}
protected void registerServiceWatcher(String serviceName) {
    // Build the path to be monitored
    String path = buildServicePath(serviceName);
    // Create the ZookeeperServiceDiscoveryChangeWatcher listener and record it in the watcherCaches cache
    CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
            new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName));
    // Add the constructed ZookeeperServiceDiscoveryChangeWatcher listener to the path
    // to monitor changes in child nodes
    curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
}

ZookeeperServiceDiscoveryChangeWatcher is the CuratorWatcher implementation associated with ZookeeperServiceDiscovery. Its process() method focuses on NodeChildrenChanged and NodeDataChanged events, and calls the dispatchServiceInstancesChangedEvent() method of the associated ZookeeperServiceDiscovery object. The implementation is as follows:

public void process(WatchedEvent event) throws Exception {
    // Get the type of the event being listened to
    Watcher.Event.EventType eventType = event.getType();
    // Only interested in NodeChildrenChanged and NodeDataChanged event types here
    if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
        // Call the dispatchServiceInstancesChangedEvent() method to dispatch the ServiceInstancesChangedEvent event
        zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
    }
}

From the above analysis, we can see that the core of ZookeeperServiceDiscoveryChangeWatcher is to convert events from ZooKeeper into Dubbo’s internal ServiceInstancesChangedEvent events.

EventDispatcher interface #

From our analysis of the ZookeeperServiceDiscovery implementation, we can see that it does not override the dispatchServiceInstancesChangedEvent() method. Therefore, the dispatchServiceInstancesChangedEvent() method called in ZookeeperServiceDiscoveryChangeWatcher is the default implementation in the ServiceDiscovery interface. In this default implementation, the default implementation of EventDispatcher is obtained through Dubbo SPI, and the ServiceInstancesChangedEvent event is dispatched. The implementation is as follows:

default void dispatchServiceInstancesChangedEvent(ServiceInstancesChangedEvent event) {
    EventDispatcher.getDefaultExtension().dispatch(event);
}

Now let’s take a look at the specific definition of the EventDispatcher interface:

@SPI("direct")
public interface EventDispatcher extends Listenable<EventListener<?>> {
    // This thread pool is used for invoking the triggered EventListeners in a serial manner, also known as the direct mode
    Executor DIRECT_EXECUTOR = Runnable::run;
    // Dispatch the triggered event to the corresponding EventListener objects
    void dispatch(Event event);
    // Get the thread pool used in the direct mode
    default Executor getExecutor() {
        return DIRECT_EXECUTOR;
    }
    // Utility method for getting the default implementation of the EventDispatcher interface
    static EventDispatcher getDefaultExtension() {
        return ExtensionLoader.getExtensionLoader(EventDispatcher.class).getDefaultExtension();
    }
}

The EventDispatcher interface is annotated with @SPI and is an extension point. Dubbo provides two concrete implementations, ParallelEventDispatcher and DirectEventDispatcher, as shown in the following diagram:

Drawing 4.png

In the AbstractEventDispatcher, two core fields are maintained.

  • listenersCache (ConcurrentMap, List>) is used to record the EventListener collections that listen to events of various types. When AbstractEventDispatcher is initialized, all implementations of EventListener are loaded and added to the listenersCache collection by calling the addEventListener() method.
  • executor (Executor type) is initialized in the constructor of the AbstractEventDispatcher. When AbstractEventDispatcher receives the corresponding event, this thread pool is used to trigger the corresponding EventListener collection.

The addEventListener(), removeEventListener(), and getAllEventListeners() methods in AbstractEventDispatcher are all implemented by manipulating the listenersCache collection. The specific implementation is relatively simple, and I will not show it here. If you are interested, you can refer to the source code for learning.

Another method to pay attention to in AbstractEventDispatcher is the dispatch() method. This method filters out the EventListener objects that meet the conditions from the listenersCache collection and notifies them in serial or parallel mode. The specific implementation is as follows:

public void dispatch(Event event) {
    // Get the thread pool for notifying EventListeners, which is serial mode by default, also known as the direct implementation
    Executor executor = getExecutor();
    executor.execute(() -> {
        sortedListeners(entry -> entry.getKey().isAssignableFrom(event.getClass()))
                .forEach(listener -> {
                    if (listener instanceof ConditionalEventListener) { // Special handling for ConditionalEventListener
                        ConditionalEventListener predicateEventListener = (ConditionalEventListener) listener;
                        if (!predicateEventListener.accept(event)) {
                            return;
                        }
                    }
                    // Notify the EventListener
                    listener.onEvent(event);
                });
    });
}
// The sortedListeners method here filters and sorts the listenerCache
protected Stream<EventListener> sortedListeners(Predicate<Map.Entry<Class<? extends Event>, List<EventListener>>> predicate) {
    return listenersCache
            .entrySet()
            .stream()
            .filter(predicate)
            .map(Map.Entry::getValue)
            .flatMap(Collection::stream)
            .sorted();
}

AbstractEventDispatcher has already implemented the core logic of EventDispatcher, which is to dispatch Event events and notify EventListeners. Then, it only needs to be determined whether it is parallel notification mode or serial notification mode in ParallelEventDispatcher and DirectEventDispatcher.

In ParallelEventDispatcher, the thread pool for notifying EventListeners is ForkJoinPool, which means it is in parallel mode. In DirectEventDispatcher, EventDispatcher.DIRECT_EXECUTOR thread pool is used, which means it is in serial mode. The implementation of these two EventDispatchers is relatively simple, and I will not show it here.

Let’s go back to ZookeeperServiceDiscovery. In its constructor, it obtains the default implementation of EventDispatcher and calls the addEventListener() method to add the ZookeeperServiceDiscovery object to the listenersCache collection to listen for the ServiceInstancesChangedEvent event. ZookeeperServiceDiscovery directly inherits the default implementation of the dispatchServiceInstancesChangedEvent() method in the ServiceDiscovery interface and does not override it. In this method, it obtains the default implementation of EventDispatcher and calls the dispatch() method to dispatch the ServiceInstancesChangedEvent event.

Summary #

In this lesson, we focused on the basics of service instance publishing and subscription in the Dubbo service discovery solution.

First, we explained the core definition of the ServiceDiscovery interface, which defines the core methods for service instance publishing and subscription. Then, we analyzed the ZookeeperServiceDiscovery implementation of using ZooKeeper as the registry center. We also explained the related implementation of adding listeners to ZookeeperServiceDiscovery and the mechanism of ZookeeperServiceDiscovery handling the ServiceInstancesChangedEvent event.

In the next lesson, we will continue to introduce the implementation of service instance publishing and subscription in the Dubbo service discovery solution. Make sure to tune in on time.