45 Add Meal Deep Into Service Downgrading in the Service Publish Subscribe Above #
In Lesson 43, when we introduced Dubbo’s service discovery solution, we saw that in addition to requiring support for metadata, we also need support for service publish-subscribe functionality to create a complete service discovery architecture.
In this lesson, we will explain the specific implementation of service instance publishing and subscribing in Dubbo: first, we will explain the core definition of the ServiceDiscovery interface, and then focus on the implementation of ZookeeperServiceDiscovery with ZooKeeper as the registry center, which also involves the implementation of related event listeners.
ServiceDiscovery Interface #
The ServiceDiscovery interface mainly encapsulates the operations of publishing and subscribing to ServiceInstance, which can be temporarily understood as a registry for ServiceInstance. The definition of the ServiceDiscovery interface is as follows:
@SPI("zookeeper")
public interface ServiceDiscovery extends Prioritized {
// Initialize the current ServiceDiscovery instance with the registry URL
void initialize(URL registryURL) throws Exception;
// Destroy the current ServiceDiscovery instance
void destroy() throws Exception;
// Publish the incoming ServiceInstance instance
void register(ServiceInstance serviceInstance) throws RuntimeException;
// Update the incoming ServiceInstance instance
void update(ServiceInstance serviceInstance) throws RuntimeException;
// Unregister the incoming ServiceInstance instance
void unregister(ServiceInstance serviceInstance) throws RuntimeException;
// Get all Service Names
Set<String> getServices();
// The default number of items per page for paginated queries
default int getDefaultPageSize() {
return 100;
}
// Paginated query of ServiceInstance based on ServiceName
default List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
List<ServiceInstance> allInstances = new LinkedList<>();
int offset = 0;
int pageSize = getDefaultPageSize();
// Paginated query of ServiceInstance
Page<ServiceInstance> page = getInstances(serviceName, offset, pageSize);
allInstances.addAll(page.getData());
while (page.hasNext()) {
offset += page.getDataSize();
page = getInstances(serviceName, offset, pageSize);
allInstances.addAll(page.getData());
}
return unmodifiableList(allInstances);
}
default Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize) throws NullPointerException,
IllegalArgumentException {
return getInstances(serviceName, offset, pageSize, false);
}
default Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) throws
NullPointerException, IllegalArgumentException, UnsupportedOperationException {
throw new UnsupportedOperationException("Current implementation does not support pagination query method.");
}
default Map<String, Page<ServiceInstance>> getInstances(Iterable<String> serviceNames, int offset, int requestSize) throws
NullPointerException, IllegalArgumentException {
Map<String, Page<ServiceInstance>> instances = new LinkedHashMap<>();
for (String serviceName : serviceNames) {
instances.put(serviceName, getInstances(serviceName, offset, requestSize));
}
return unmodifiableMap(instances);
}
// Add ServiceInstance listener
default void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
}
// Trigger ServiceInstancesChangedEvent event
default void dispatchServiceInstancesChangedEvent(String serviceName) {
dispatchServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
}
default void dispatchServiceInstancesChangedEvent(String serviceName, String... otherServiceNames) {
dispatchServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
if (otherServiceNames != null) {
Stream.of(otherServiceNames)
.filter(StringUtils::isNotEmpty)
.forEach(this::dispatchServiceInstancesChangedEvent);
}
}
default void dispatchServiceInstancesChangedEvent(String serviceName, Collection<ServiceInstance> serviceInstances) {
dispatchServiceInstancesChangedEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
default void dispatchServiceInstancesChangedEvent(ServiceInstancesChangedEvent event) {
getDefaultExtension().dispatch(event);
}
}
The ServiceDiscovery
interface is annotated with @SPI
and is an extension point. Different ServiceDiscovery
implementations exist for different registries. The inheritance diagram of ServiceDiscovery
is shown in the following diagram:
ServiceDiscovery Inheritance Diagram
When creating a ServiceDiscovery
object in Dubbo, it is created through the ServiceDiscoveryFactory
factory class. The ServiceDiscoveryFactory
interface is also an extension point, and Dubbo provides a default implementation called DefaultServiceDiscoveryFactory
. The inheritance diagram of ServiceDiscoveryFactory
is shown in the following diagram:
ServiceDiscoveryFactory Inheritance Diagram
In AbstractServiceDiscoveryFactory
, a ConcurrentMap
collection (the discoveries
field) is maintained to cache ServiceDiscovery
objects. It also provides an abstract method createDiscovery()
to create a ServiceDiscovery
instance.
public ServiceDiscovery getServiceDiscovery(URL registryURL) {
String key = registryURL.toServiceStringWithoutResolving();
return discoveries.computeIfAbsent(key, k -> createDiscovery(registryURL));
}
In DefaultServiceDiscoveryFactory
, the createDiscovery()
method is implemented to use the Dubbo SPI mechanism to obtain the corresponding ServiceDiscovery
object. The implementation is as follows:
protected ServiceDiscovery createDiscovery(URL registryURL) {
String protocol = registryURL.getProtocol();
ExtensionLoader<ServiceDiscovery> loader = getExtensionLoader(ServiceDiscovery.class);
return loader.getExtension(protocol);
}
Analysis of ZookeeperServiceDiscovery Implementation #
Dubbo provides multiple ServiceDiscovery
implementations for different registry systems. Let’s take ZookeeperServiceDiscovery
as an example to illustrate how Dubbo integrates with ZooKeeper as a registry system to publish and subscribe to service instances.
In ZookeeperServiceDiscovery
, it encapsulates a ServiceDiscovery
object from Apache Curator to interact with ZooKeeper. In the initialize()
method, CuratorFramework and Curator ServiceDiscovery objects are initialized as shown below:
public void initialize(URL registryURL) throws Exception {
... // omit the logic of initializing EventDispatcher
// initialize CuratorFramework
this.curatorFramework = buildCuratorFramework(registryURL);
// determine the rootPath, default is "/services"
this.rootPath = ROOT_PATH.getParameterValue(registryURL);
// initialize Curator ServiceDiscovery and start it
this.serviceDiscovery = buildServiceDiscovery(curatorFramework, rootPath);
this.serviceDiscovery.start();
}
The methods in ZookeeperServiceDiscovery
mostly call the corresponding methods from the Curator ServiceDiscovery object. For example, the register()
, update()
, and unregister()
methods all call the corresponding methods from the Curator ServiceDiscovery object to add, update, and remove the ServiceInstance
. Let’s take the register()
method as an example:
public void register(ServiceInstance serviceInstance) throws RuntimeException {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.registerService(build(serviceInstance));
});
}
// The `build()` method converts the `ServiceInstance` object from Dubbo to the `ServiceInstance` object from Curator
public static org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> build(ServiceInstance serviceInstance) {
ServiceInstanceBuilder builder = null;
// Get the Service Name
String serviceName = serviceInstance.getServiceName();
String host = serviceInstance.getHost();
int port = serviceInstance.getPort();
// Get the metadata
Map<String, String> metadata = serviceInstance.getMetadata();
// The generated id format is "host:ip"
String id = generateId(host, port);
// `ZookeeperInstance` is the payload of Curator ServiceInstance
ZookeeperInstance zookeeperInstance = new ZookeeperInstance(null, serviceName, metadata);
builder = builder().id(id).name(serviceName).address(host).port(port)
.payload(zookeeperInstance);
return builder.build();
}
In addition to the functionality of publishing service instances mentioned above, when subscribing to service instances, ZookeeperServiceDiscovery
also uses the ZookeeperServiceDiscovery
to query information about service instances. These methods directly depend on the implementation in Apache Curator. For example, the getServices()
method calls the queryForNames()
method from the Curator ServiceDiscovery object to query for service names, and the getInstances()
method calls the queryForInstances()
method from the Curator ServiceDiscovery object to query for service instances.
EventListener Interface #
In addition to implementing the ServiceDiscovery
interface, ZookeeperServiceDiscovery
also implements the EventListener
interface, as shown in the following diagram:
ZookeeperServiceDiscovery Inheritance Diagram
In other words, the ZookeeperServiceDiscovery is also an implementation of the EventListener interface, which allows it to listen to certain events as an EventListener. Let’s start by looking at the definition of the EventListener interface in Dubbo, which focuses on three methods: onEvent() method, getPriority() method, and the findEventType() utility method.
@SPI
@FunctionalInterface
public interface EventListener<E extends Event> extends java.util.EventListener, Prioritized {
// This method is called when an event of interest to this EventListener occurs
void onEvent(E event);
// The priority of the current EventListener
default int getPriority() {
return MIN_PRIORITY;
}
// Get the type of Event that the incoming EventListener object listens to
static Class<? extends Event> findEventType(EventListener<?> listener) {
return findEventType(listener.getClass());
}
static Class<? extends Event> findEventType(Class<?> listenerClass) {
Class<? extends Event> eventType = null;
// Check whether the incoming listenerClass is an implementation of Dubbo's EventListener interface
if (listenerClass != null && EventListener.class.isAssignableFrom(listenerClass)) {
eventType = findParameterizedTypes(listenerClass)
.stream()
.map(EventListener::findEventType) // Get the Event generics defined in listenerClass
.filter(Objects::nonNull)
.findAny()
// Get the Event generics defined in the parent class of listenerClass
.orElse((Class) findEventType(listenerClass.getSuperclass()));
}
return eventType;
}
... // The findEventType() method is used to filter whether the incoming parameterizedType is an Event or a subclass of Event (implementation details omitted here)
}
Dubbo has implemented many EventListener interfaces, as shown in the following diagram:
EventListener Inheritance Diagram
Let’s focus on the implementation of ZookeeperServiceDiscovery. In its onEvent() method (as well as the addServiceInstancesChangedListener() method), it calls the registerServiceWatcher() method to re-register:
public void onEvent(ServiceInstancesChangedEvent event) {
// The serviceName of the ServiceInstancesChangedEvent
String serviceName = event.getServiceName();
// Re-register the watcher
registerServiceWatcher(serviceName);
}
protected void registerServiceWatcher(String serviceName) {
// Build the path to be monitored
String path = buildServicePath(serviceName);
// Create the ZookeeperServiceDiscoveryChangeWatcher listener and record it in the watcherCaches cache
CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName));
// Add the constructed ZookeeperServiceDiscoveryChangeWatcher listener to the path
// to monitor changes in child nodes
curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
}
ZookeeperServiceDiscoveryChangeWatcher is the CuratorWatcher implementation associated with ZookeeperServiceDiscovery. Its process() method focuses on NodeChildrenChanged and NodeDataChanged events, and calls the dispatchServiceInstancesChangedEvent() method of the associated ZookeeperServiceDiscovery object. The implementation is as follows:
public void process(WatchedEvent event) throws Exception {
// Get the type of the event being listened to
Watcher.Event.EventType eventType = event.getType();
// Only interested in NodeChildrenChanged and NodeDataChanged event types here
if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
// Call the dispatchServiceInstancesChangedEvent() method to dispatch the ServiceInstancesChangedEvent event
zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
}
}
From the above analysis, we can see that the core of ZookeeperServiceDiscoveryChangeWatcher is to convert events from ZooKeeper into Dubbo’s internal ServiceInstancesChangedEvent events.
EventDispatcher interface #
From our analysis of the ZookeeperServiceDiscovery implementation, we can see that it does not override the dispatchServiceInstancesChangedEvent() method. Therefore, the dispatchServiceInstancesChangedEvent() method called in ZookeeperServiceDiscoveryChangeWatcher is the default implementation in the ServiceDiscovery interface. In this default implementation, the default implementation of EventDispatcher is obtained through Dubbo SPI, and the ServiceInstancesChangedEvent event is dispatched. The implementation is as follows:
default void dispatchServiceInstancesChangedEvent(ServiceInstancesChangedEvent event) {
EventDispatcher.getDefaultExtension().dispatch(event);
}
Now let’s take a look at the specific definition of the EventDispatcher interface:
@SPI("direct")
public interface EventDispatcher extends Listenable<EventListener<?>> {
// This thread pool is used for invoking the triggered EventListeners in a serial manner, also known as the direct mode
Executor DIRECT_EXECUTOR = Runnable::run;
// Dispatch the triggered event to the corresponding EventListener objects
void dispatch(Event event);
// Get the thread pool used in the direct mode
default Executor getExecutor() {
return DIRECT_EXECUTOR;
}
// Utility method for getting the default implementation of the EventDispatcher interface
static EventDispatcher getDefaultExtension() {
return ExtensionLoader.getExtensionLoader(EventDispatcher.class).getDefaultExtension();
}
}
The EventDispatcher interface is annotated with @SPI and is an extension point. Dubbo provides two concrete implementations, ParallelEventDispatcher and DirectEventDispatcher, as shown in the following diagram:
In the AbstractEventDispatcher, two core fields are maintained.
- listenersCache (ConcurrentMap, List>) is used to record the EventListener collections that listen to events of various types. When AbstractEventDispatcher is initialized, all implementations of EventListener are loaded and added to the listenersCache collection by calling the addEventListener() method.
- executor (Executor type) is initialized in the constructor of the AbstractEventDispatcher. When AbstractEventDispatcher receives the corresponding event, this thread pool is used to trigger the corresponding EventListener collection.
The addEventListener(), removeEventListener(), and getAllEventListeners() methods in AbstractEventDispatcher are all implemented by manipulating the listenersCache collection. The specific implementation is relatively simple, and I will not show it here. If you are interested, you can refer to the source code for learning.
Another method to pay attention to in AbstractEventDispatcher is the dispatch() method. This method filters out the EventListener objects that meet the conditions from the listenersCache collection and notifies them in serial or parallel mode. The specific implementation is as follows:
public void dispatch(Event event) {
// Get the thread pool for notifying EventListeners, which is serial mode by default, also known as the direct implementation
Executor executor = getExecutor();
executor.execute(() -> {
sortedListeners(entry -> entry.getKey().isAssignableFrom(event.getClass()))
.forEach(listener -> {
if (listener instanceof ConditionalEventListener) { // Special handling for ConditionalEventListener
ConditionalEventListener predicateEventListener = (ConditionalEventListener) listener;
if (!predicateEventListener.accept(event)) {
return;
}
}
// Notify the EventListener
listener.onEvent(event);
});
});
}
// The sortedListeners method here filters and sorts the listenerCache
protected Stream<EventListener> sortedListeners(Predicate<Map.Entry<Class<? extends Event>, List<EventListener>>> predicate) {
return listenersCache
.entrySet()
.stream()
.filter(predicate)
.map(Map.Entry::getValue)
.flatMap(Collection::stream)
.sorted();
}
AbstractEventDispatcher has already implemented the core logic of EventDispatcher, which is to dispatch Event events and notify EventListeners. Then, it only needs to be determined whether it is parallel notification mode or serial notification mode in ParallelEventDispatcher and DirectEventDispatcher.
In ParallelEventDispatcher, the thread pool for notifying EventListeners is ForkJoinPool, which means it is in parallel mode. In DirectEventDispatcher, EventDispatcher.DIRECT_EXECUTOR thread pool is used, which means it is in serial mode. The implementation of these two EventDispatchers is relatively simple, and I will not show it here.
Let’s go back to ZookeeperServiceDiscovery. In its constructor, it obtains the default implementation of EventDispatcher and calls the addEventListener() method to add the ZookeeperServiceDiscovery object to the listenersCache collection to listen for the ServiceInstancesChangedEvent event. ZookeeperServiceDiscovery directly inherits the default implementation of the dispatchServiceInstancesChangedEvent() method in the ServiceDiscovery interface and does not override it. In this method, it obtains the default implementation of EventDispatcher and calls the dispatch() method to dispatch the ServiceInstancesChangedEvent event.
Summary #
In this lesson, we focused on the basics of service instance publishing and subscription in the Dubbo service discovery solution.
First, we explained the core definition of the ServiceDiscovery interface, which defines the core methods for service instance publishing and subscription. Then, we analyzed the ZookeeperServiceDiscovery implementation of using ZooKeeper as the registry center. We also explained the related implementation of adding listeners to ZookeeperServiceDiscovery and the mechanism of ZookeeperServiceDiscovery handling the ServiceInstancesChangedEvent event.
In the next lesson, we will continue to introduce the implementation of service instance publishing and subscription in the Dubbo service discovery solution. Make sure to tune in on time.