37 Cluster Fault Tolerance: One Hero, Three Assistants (Part 1) #

Hello, I’m Yang Sizheng. Today, I will share with you the topic of cluster fault tolerance: one hero, three assistants (Part 1).

In the previous lessons, we have analyzed the concepts of Directory, Router, LoadBalance, and other related components in depth. This lesson will focus on the analysis of the Cluster interface.

The Cluster interface provides cluster fault tolerance functionality.

In a cluster, there is a certain probability that individual nodes may encounter issues, such as disk damage, system crashes, etc., resulting in the inability of the node to provide external services. Therefore, in a distributed RPC framework, it is necessary to pay attention to this situation. In order to avoid single point of failure, our Provider is usually deployed on at least two servers to provide services in a clustered manner. For services with higher loads, more Providers need to be deployed to handle the traffic.

In Dubbo, the Cluster interface combines a group of available Providers into a single unified Invoker that the caller uses for remote invocations. After filtering by the Router and selection by the LoadBalance, one specific Provider is chosen for the invocation. If the invocation fails, it is handled according to the cluster's fault tolerance strategy.

Dubbo ships with several built-in fault tolerance strategies, each with its own application scenarios. We can select a fault tolerance strategy through configuration. If the built-in strategies cannot meet our requirements, we can also implement and configure a custom one.

After understanding the above background knowledge, let’s formally introduce the Cluster interface.

Cluster Interface and Fault Tolerance Mechanism #

The workflow of the Cluster interface can be roughly divided into two steps, as shown in the following diagram: ① Creating a Cluster Invoker instance: during Consumer initialization, the Cluster implementation class creates a Cluster Invoker instance. ② Using the Cluster Invoker instance: when the Consumer initiates a remote invocation, the Cluster Invoker relies on the Directory, Router, LoadBalance, and other components introduced in the previous lessons to obtain the final Invoker object to invoke.

Lark20201201-164714.png

Diagram of the Cluster core process

The process of Cluster Invoker obtaining Invoker can be roughly described as follows:

  1. Obtain the Invoker list through the Directory. Taking the RegistryDirectory as an example, it can perceive the dynamic changes in the registry and obtain the Invoker collection corresponding to the current Provider in real-time.
  2. Call the route() method of the Router to filter out Invoker objects that do not meet the routing rules.
  3. Select an Invoker from the Invoker list through the LoadBalance.
  4. The ClusterInvoker passes the invocation to the invoke() method of the Invoker chosen by the LoadBalance, which performs the actual remote invocation.
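
The four steps above can be sketched with toy types. This is only an illustration: `ToyInvoker`, `SimpleInvoker`, and `call()` are hypothetical names invented here, not Dubbo classes, and the Directory, Router, and LoadBalance are reduced to a list, a predicate, and a function.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Toy stand-ins for the components in the list above; none of these are Dubbo's real types.
class ClusterPipelineSketch {

    /** A callable provider endpoint (hypothetical, far simpler than Dubbo's Invoker). */
    interface ToyInvoker {
        String address();
        String invoke(String request);
    }

    static class SimpleInvoker implements ToyInvoker {
        private final String address;
        SimpleInvoker(String address) { this.address = address; }
        public String address() { return address; }
        public String invoke(String request) { return address + " handled " + request; }
    }

    /** Runs steps 1-4: list, route, select, invoke. */
    static String call(List<ToyInvoker> directory,
                       Predicate<ToyInvoker> router,
                       Function<List<ToyInvoker>, ToyInvoker> loadBalance,
                       String request) {
        // 1. Directory: obtain the current Invoker list
        List<ToyInvoker> invokers = directory;
        // 2. Router: drop Invokers that do not satisfy the routing rule
        List<ToyInvoker> routed = invokers.stream().filter(router).collect(Collectors.toList());
        if (routed.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        // 3. LoadBalance: pick one Invoker from the remaining candidates
        ToyInvoker chosen = loadBalance.apply(routed);
        // 4. Invoke the chosen Invoker
        return chosen.invoke(request);
    }

    public static void main(String[] args) {
        List<ToyInvoker> directory = Arrays.asList(
                new SimpleInvoker("10.0.0.1:20880"), new SimpleInvoker("10.0.0.2:20880"));
        // Route away from 10.0.0.1 and use "first candidate" as a trivial load balancer
        System.out.println(call(directory,
                inv -> !inv.address().startsWith("10.0.0.1"),
                list -> list.get(0),
                "sayHello"));
    }
}
```

In Dubbo itself, routing happens inside Directory.list(), so the ClusterInvoker sees an already filtered list; the sketch separates the two steps only to make the order visible.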

This is the normal path, with no fault tolerance handling involved. Dubbo's commonly used fault tolerance strategies include the following:

  • Failover Cluster: Automatic failover. This is Dubbo's default fault tolerance mechanism. When a request to a Provider node fails, it automatically switches to another Provider node. By default it makes up to 3 attempts in total (the initial call plus 2 retries), which makes it suitable for idempotent operations. The more retries are configured, the greater the pressure on the Providers during fault tolerance, and in extreme cases this may even cause cascading failures.
  • Failback Cluster: Automatic recovery after failure. After a failure, it is recorded in a queue and retried through a timer.
  • Failfast Cluster: Quick failure. After a request fails, an exception is immediately returned without any retries.
  • Failsafe Cluster: Failure safety. After a request fails, the exception is ignored and no retries are performed.
  • Forking Cluster: Invokes multiple Provider nodes in parallel and returns as soon as one of them succeeds.
  • Broadcast Cluster: Broadcasts the request to multiple Provider nodes; if any node fails, the whole call fails.
  • Available Cluster: Traverses the Provider nodes and directly invokes the first available one. If no available Provider node is found, an exception is thrown.
  • Mergeable Cluster: Request multiple Provider nodes and merge the results obtained.
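
To make the difference between three of these strategies concrete, here is a minimal sketch that models a Provider as a `Supplier<String>`. The class and method names are hypothetical and the logic is deliberately simplified; it is not how Dubbo's FailoverClusterInvoker, FailfastClusterInvoker, or FailsafeClusterInvoker are actually written.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Simplified models of three strategies from the list above (not Dubbo's code).
class FaultToleranceSketch {

    /** Failover: try providers one after another, up to maxAttempts calls in total. */
    static String failover(List<Supplier<String>> providers, int maxAttempts) {
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts && i < providers.size(); i++) {
            try {
                return providers.get(i).get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next provider
            }
        }
        throw new IllegalStateException("All attempts failed", last);
    }

    /** Failfast: a single attempt; any exception propagates immediately. */
    static String failfast(Supplier<String> provider) {
        return provider.get();
    }

    /** Failsafe: a single attempt; the exception is ignored and an empty result (null here) is returned. */
    static String failsafe(Supplier<String> provider) {
        try {
            return provider.get();
        } catch (RuntimeException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new RuntimeException("node down"); };
        Supplier<String> healthy = () -> "ok";
        System.out.println(failover(Arrays.asList(broken, healthy), 3));
        System.out.println(failsafe(broken));
    }
}
```

The sketch also shows why failover suits idempotent operations: a request that failed on the first node may have been partially processed there before being sent to the second.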

Now let's take a look at the Cluster interface. Cluster is an extension interface, and the parameter of its @SPI annotation shows that the default implementation is FailoverCluster. It defines a single join() method annotated with @Adaptive, so an adapter class is generated dynamically. The adapter first chooses an extension implementation based on the cluster parameter of the URL returned by Directory.getUrl(), and falls back to the default FailoverCluster implementation if there is no cluster parameter. The definition of the Cluster interface is as follows:

```java
@SPI(FailoverCluster.NAME)
public interface Cluster {
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
```
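
The selection rule performed by the generated adapter can be approximated as a lookup with a default. The sketch below is an assumption-laden simplification: `AdaptiveClusterSketch`, `resolve()`, and the map-based registry are hypothetical, the URL is reduced to a parameter map, and the values are plain labels rather than real extension instances.

```java
import java.util.HashMap;
import java.util.Map;

// Approximates "use the cluster parameter, else the @SPI default" (not Dubbo's ExtensionLoader).
class AdaptiveClusterSketch {

    /** Plays the role of the default name given to @SPI on the Cluster interface. */
    static final String DEFAULT_NAME = "failover";

    /** Hypothetical registry: extension name -> implementation label. */
    static final Map<String, String> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("failover", "FailoverCluster");
        EXTENSIONS.put("failfast", "FailfastCluster");
        EXTENSIONS.put("failsafe", "FailsafeCluster");
    }

    /** Picks the extension named by the "cluster" parameter, or the default when it is absent. */
    static String resolve(Map<String, String> urlParameters) {
        String name = urlParameters.getOrDefault("cluster", DEFAULT_NAME);
        String extension = EXTENSIONS.get(name);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return extension;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(resolve(params));   // no cluster parameter, so the default is used
        params.put("cluster", "failfast");
        System.out.println(resolve(params));
    }
}
```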

The implementation classes of the Cluster interface are shown in the following diagram, corresponding to the various fault tolerance strategies mentioned earlier:

Lark20201201-164718.png

Inheritance relationship of the Cluster interface

In each implementation of the Cluster interface, an Invoker object will be created, all of which inherit from the AbstractClusterInvoker abstract class, as shown in the following diagram:

Lark20201201-164728.png

Inheritance relationship diagram of AbstractClusterInvoker

From the above two inheritance diagrams, we can see that both the Cluster interface and the Invoker interface have corresponding abstract implementation classes, which implement some common capabilities. Now let’s delve into these two abstract classes: AbstractClusterInvoker and AbstractCluster.

AbstractClusterInvoker #

After understanding the inheritance relationship of Cluster Invoker, let’s first look at AbstractClusterInvoker, which has two core functions: one is implementing the Invoker interface and providing a generic abstraction of the Invoker.invoke() method; the other is implementing a generic load balancing algorithm.

In the AbstractClusterInvoker.invoke() method, it retrieves a list of Invokers through the Directory, initializes the LoadBalance through SPI, and finally calls the doInvoke() method to execute the logic of its subclass. Before the Directory.list() method returns the Invoker collection, it has already filtered the Invokers once using a Router. You can review the analysis of RegistryDirectory in Lesson 31.

```java
public Result invoke(final Invocation invocation) throws RpcException {
    // Check if the current Invoker has been destroyed
    checkWhetherDestroyed();

    // Add the attachments from RpcContext to the Invocation
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }

    // Retrieve a list of Invoker objects through the Directory.
    // As we know from the analysis of RegistryDirectory,
    // Router has been applied during this process.
    List<Invoker<T>> invokers = list(invocation);

    // Load the LoadBalance through SPI
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    // Call the doInvoke() method, which is an abstract method
    return doInvoke(invocation, invokers, loadbalance);
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    return directory.list(invocation); // Call the Directory.list() method
}
```

Now let’s take a look at how AbstractClusterInvoker selects the final Invoker object from the Invoker collection based on different LoadBalance algorithms.

AbstractClusterInvoker does not simply call LoadBalance.select() to achieve load balancing. Instead, it wraps the process in its own select() method. This method first checks the configuration to see whether sticky connections are enabled. If they are, it caches the previously used Invoker and keeps calling it directly for as long as that Provider node is available, without running load balancing again. Load balancing is performed again only when the call fails, and the Providers that have already been tried are excluded.

```java
// The first parameter is the LoadBalance implementation used in this call,
// the second parameter Invocation is the context information of this service call,
// the third parameter is the list of Invokers to choose from,
// the fourth parameter is used to record the selected Invokers and the ones that have been tried for load balancing
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }

    // Get the method name
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

    // Get the sticky configuration. Sticky means that the Consumer will try to
    // call the same Provider node as much as possible, unless the Provider is unavailable
    boolean sticky = invokers.get(0).getUrl()... (The remaining part of the code is truncated)
```

The rest of select() is truncated in the source. The fragment that survives is the tail of the reselection logic, which collects the Invokers that have not been tried yet and load-balances among them:

```java
        // ... (the beginning of this method, including the loop header, is truncated in the source)
        // Skip unavailable Invokers when the availability check is enabled
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }
        // Keep only the Invokers that have not been selected before
        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    // If there are Invokers left for reselection, call LoadBalance.select() to load-balance among them
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // If reselectInvokers is empty, every Invoker has already been tried;
    // load balancing can only be performed on the available Invokers in the selected collection
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    return null;
}
```
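
Because part of the original code is missing, the sticky behaviour described earlier may be easier to follow in a compact, self-contained form. The sketch below is not a reconstruction of the truncated Dubbo code: `StickySelectSketch`, `Node`, and the "first candidate" stand-in for LoadBalance.select() are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrates sticky selection plus exclusion of already-tried nodes (not Dubbo's code).
class StickySelectSketch {

    /** Hypothetical provider node with a mutable availability flag. */
    static class Node {
        final String name;
        boolean available = true;
        Node(String name) { this.name = name; }
    }

    private Node stickyNode; // the node cached from a previous call

    /** Chooses a node; `tried` holds the nodes that already failed during this call. */
    Node select(List<Node> nodes, List<Node> tried, boolean sticky) {
        if (nodes.isEmpty()) {
            return null;
        }
        // Forget the cached node if it is no longer among the candidates
        if (stickyNode != null && !nodes.contains(stickyNode)) {
            stickyNode = null;
        }
        // Sticky hit: reuse the cached node while it is available and has not been tried
        if (sticky && stickyNode != null && stickyNode.available && !tried.contains(stickyNode)) {
            return stickyNode;
        }
        // Otherwise consider only available nodes that have not been tried yet
        List<Node> candidates = new ArrayList<>();
        for (Node node : nodes) {
            if (node.available && !tried.contains(node)) {
                candidates.add(node);
            }
        }
        // Stand-in for LoadBalance.select(): simply take the first candidate
        Node chosen = candidates.isEmpty() ? null : candidates.get(0);
        if (sticky) {
            stickyNode = chosen;
        }
        return chosen;
    }

    public static void main(String[] args) {
        StickySelectSketch selector = new StickySelectSketch();
        Node a = new Node("a");
        Node b = new Node("b");
        List<Node> nodes = Arrays.asList(a, b);
        System.out.println(selector.select(nodes, new ArrayList<>(), true).name);
        a.available = false; // the sticky node goes down, so the next call moves on
        System.out.println(selector.select(nodes, new ArrayList<>(), true).name);
    }
}
```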

AbstractCluster #

The commonly used ClusterInvoker implementations inherit from the AbstractClusterInvoker class, and the corresponding Cluster implementations inherit from the AbstractCluster abstract class. The core logic of AbstractCluster is to wrap a layer of ClusterInterceptor around the ClusterInvoker, which achieves an effect similar to an aspect in AOP.

Here is the definition of the ClusterInterceptor interface:

```java
@SPI
public interface ClusterInterceptor {
    // Pre-interception method
    void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    // Post-interception method
    void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    // Call the invoke() method of ClusterInvoker to complete the request
    default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
        return clusterInvoker.invoke(invocation);
    }

    // This Listener is used to listen for normal results of the request and exceptions
    interface Listener {
        void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

        void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    }
}
```

In the join() method of the AbstractCluster abstract class, it will first call the doJoin() method to obtain the Invoker object to be called. doJoin() is an abstract method and is implemented by the AbstractCluster subclass based on specific strategies. Afterwards, the AbstractCluster.join() method will call the buildClusterInterceptors() method to load the ClusterInterceptor extension implementation and wrap the Invoker object. The specific implementation is as follows:

```java
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
    AbstractClusterInvoker<T> last = clusterInvoker;

    // Load the ClusterInterceptor extension implementation through the SPI method
    List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);

    if (!interceptors.isEmpty()) {
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            // Connect the InterceptorInvokerNode at the beginning and end to form a call chain
            final ClusterInterceptor interceptor = interceptors.get(i);
            final AbstractClusterInvoker<T> next = last;
            last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
        }
    }
    return last;
}

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    // The extension name is determined by the reference.interceptor parameter
    return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}
```

**The InterceptorInvokerNode encapsulates the underlying AbstractClusterInvoker object and the associated ClusterInterceptor object, and maintains a next reference pointing to the next InterceptorInvokerNode object.**

In the InterceptorInvokerNode.invoke() method, it will first call the pre-logic of the ClusterInterceptor, then execute the intercept() method to call the invoke() method of AbstractClusterInvoker to complete the remote call, and finally execute the post-logic of the ClusterInterceptor. The specific implementation is as follows:

```java
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        interceptor.before(next, invocation); // Pre-logic
        // Execute the invoke() method to complete the remote call
        asyncResult = interceptor.intercept(next, invocation);
    } catch (Exception e) {
        if (interceptor instanceof ClusterInterceptor.Listener) {
            // When an exception occurs, the onError() method of the listener is triggered
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            listener.onError(e, clusterInvoker, invocation);
        }
        throw e;
    } finally {
        // Execute post-processing logic
        interceptor.after(next, invocation);
    }

    return asyncResult.whenCompleteWithContext((r, t) -> {
        if (interceptor instanceof ClusterInterceptor.Listener) {
            ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
            if (t == null) {
                // When returning normally, the onMessage() method is called to trigger the listener
                listener.onMessage(r, clusterInvoker, invocation);
            } else {
                listener.onError(t, clusterInvoker, invocation);
            }
        }
    });
}
```
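
The reverse loop in buildClusterInterceptors() and the before/intercept/after sequence in invoke() together determine the order in which interceptors run. The sketch below reproduces just that ordering with toy types; `ToyInterceptor`, `named()`, and `build()` are hypothetical names, and the target Invoker is reduced to a `Function<String, String>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Shows how wrapping from the last interceptor to the first yields a nested call chain (not Dubbo's code).
class InterceptorChainSketch {

    /** Hypothetical interceptor that only records when it runs. */
    interface ToyInterceptor {
        void before(List<String> log);
        void after(List<String> log);
    }

    static ToyInterceptor named(String name) {
        return new ToyInterceptor() {
            public void before(List<String> log) { log.add(name + ".before"); }
            public void after(List<String> log) { log.add(name + ".after"); }
        };
    }

    /** Wraps the target starting from the last interceptor, so the first one ends up outermost. */
    static Function<String, String> build(Function<String, String> target,
                                          List<ToyInterceptor> interceptors,
                                          List<String> log) {
        Function<String, String> last = target;
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            final ToyInterceptor interceptor = interceptors.get(i);
            final Function<String, String> next = last;
            last = request -> {
                try {
                    interceptor.before(log);    // pre-logic
                    return next.apply(request); // delegate to the next node or the target
                } finally {
                    interceptor.after(log);     // post-logic always runs
                }
            };
        }
        return last;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Function<String, String> chain = build(
                request -> { log.add("invoke"); return "result:" + request; },
                Arrays.asList(named("A"), named("B")),
                log);
        System.out.println(chain.apply("ping"));
        System.out.println(log);
    }
}
```

Building the chain back to front means the interceptors execute in list order on the way in and in reverse order on the way out, the usual nesting for filter-style chains.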

Dubbo provides two implementations of ClusterInterceptor, which are ConsumerContextClusterInterceptor and ZoneAwareClusterInterceptor, as shown in the following figure:

![Lark20201201-164721.png](../images/CgqCHl_GA2GAYY4rAAGXJIXwK1k980.png)

ClusterInterceptor inheritance diagram

In the before() method of ConsumerContextClusterInterceptor, the current consumer address and the invoker of this call are set in RpcContext, and the server context bound to the current thread is deleted. In the after() method, the local RpcContext information is deleted. The implementation of ConsumerContextClusterInterceptor is as follows:

```java
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
  // Get the RpcContext bound to the current thread
  RpcContext context = RpcContext.getContext();

  // Set invoker, consumer address, and other information
  context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);

  if (invocation instanceof RpcInvocation) {
    ((RpcInvocation) invocation).setInvoker(invoker);
  }

  RpcContext.removeServerContext();
}

public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
  RpcContext.removeContext(true); // Delete local RpcContext information
}
```

ConsumerContextClusterInterceptor also implements the ClusterInterceptor.Listener interface. In its onMessage() method, it retrieves the attachments from the response and sets them in RpcContext.SERVER_LOCAL. The implementation is as follows:

```java
public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
  // Retrieve attachments from AppResponse and set them in SERVER_LOCAL in RpcContext
  RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}
```

After introducing ConsumerContextClusterInterceptor, let’s take a look at ZoneAwareClusterInterceptor.

In the before() method of ZoneAwareClusterInterceptor, it retrieves the multi-registry related parameters from RpcContext and sets them in Invocation (mainly the registry_zone parameter and the registry_zone_force parameter). The meanings of these two parameters will be detailed later when analyzing ZoneAwareClusterInvoker. The implementation of ZoneAwareClusterInterceptor is as follows:

```java
public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
  RpcContext rpcContext = RpcContext.getContext();

  // Retrieve registry_zone parameter and registry_zone_force parameter from RpcContext
  String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
  String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);

  // Check if the user has provided an implementation of ZoneDetector interface extension
  ExtensionLoader<ZoneDetector> loader = ExtensionLoader.getExtensionLoader(ZoneDetector.class);

  if (StringUtils.isEmpty(zone) && loader.hasExtension("default")) {
    ZoneDetector detector = loader.getExtension("default");
    zone = detector.getZoneOfCurrentRequest(invocation);
    force = detector.isZoneForcingEnabled(invocation, zone);
  }

  // Set the registry_zone parameter and registry_zone_force parameter in Invocation
  if (StringUtils.isNotEmpty(zone)) {
    invocation.setAttachment(REGISTRY_ZONE, zone);
  }

  if (StringUtils.isNotEmpty(force)) {
    invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
  }
}
```

Note that ZoneAwareClusterInterceptor does not implement the ClusterInterceptor.Listener interface, which means it does not provide response listener functionality.

Summary #

In this lesson, we mainly introduced the fault tolerance mechanisms in the Dubbo Cluster layer. First, we understood the role of the cluster fault tolerance mechanism. Then, we introduced the definition of the Cluster interface and the core functions of its implementations. Next, we explained the implementation of AbstractClusterInvoker, which implements a generic load balancing algorithm. Finally, we analyzed the AbstractCluster abstract implementation class and the ClusterInterceptor interface involved in it.