38 Cluster Fault Tolerance: One Hero with Three Helpers (Part 2)


Hello, I am Yang Sizheng. Today we continue with the topic of cluster fault tolerance: one hero with three helpers.

In the previous lesson, we covered the basics of the cluster fault tolerance mechanism in the Dubbo Cluster layer, including the definition of the Cluster interface and the core function of each implementation class. We also analyzed the core implementation of the AbstractClusterInvoker abstract class and the AbstractCluster abstract implementation class.

In this lesson, we will introduce all the implementation classes of the Cluster interface, as well as the related Cluster Invoker implementation classes.

FailoverClusterInvoker #

From the introduction of the Cluster interface, we know that the default extension implementation of Cluster is FailoverCluster, and its doJoin() method will create a FailoverClusterInvoker object and return it. The specific implementation is as follows:

public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {

    return new FailoverClusterInvoker<>(directory);

}

FailoverClusterInvoker automatically switches Invokers for retries when a call fails. Let’s take a look at the core implementation of FailoverClusterInvoker:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    List<Invoker<T>> copyInvokers = invokers;

    // Check if the copyInvokers collection is empty, an exception will be thrown if it is

    checkInvokers(copyInvokers, invocation);

    String methodName = RpcUtils.getMethodName(invocation);

    // Number of attempts: the retries parameter (default 2) plus the initial call, so 3 invocations by default

    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;

    if (len <= 0) {

        len = 1;

    }

    RpcException le = null;

    // Record the Invoker objects that have been called

    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());

    Set<String> providers = new HashSet<String>(len);

    for (int i = 0; i < len; i++) {

        // The first attempt uses the invokers passed in, which have already been checked. A retry (i > 0) must confirm that this Invoker has not been destroyed and fetch the latest Invoker list again

        if (i > 0) {

            checkWhetherDestroyed();

            // This will call the Directory.list() method again to get the Invoker list

            copyInvokers = list(invocation);

            // Check if the copyInvokers collection is empty, an exception will be thrown if it is

            checkInvokers(copyInvokers, invocation);

        }

        // Use LoadBalance to select the Invoker object, the invoked collection passed in here is the selected collection mentioned in the previous AbstractClusterInvoker.select() method

        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);

        // Record the Invoker object to be attempted this time, it will be filtered for the next retry

        invoked.add(invoker);

        RpcContext.getContext().setInvokers((List) invoked);

        try {

            // Call the invoke() method of the target Invoker object to complete the remote call

            Result result = invoker.invoke(invocation);

            // The call succeeded after at least one failed attempt: log a warning that lists the Provider addresses already tried

            if (le != null && logger.isWarnEnabled()) {

                logger.warn("...");

            }

            return result;

        } catch (RpcException e) {

            if (e.isBiz()) { // Business exception: rethrow immediately without retrying

                throw e;

            }

            le = e;

        } catch (Throwable e) { // Any other failure: record it and let the loop retry

            le = new RpcException(e.getMessage(), e);

        } finally {

            // Record the address of the tried Provider, it will be printed in the warning log above

            providers.add(invoker.getUrl().getAddress());

        }

    }

    // After reaching the maximum retry times, an exception will be thrown, which will carry the method name of the call, the addresses of the tried Provider nodes (providers collection), the total number of all Providers (copyInvokers collection), and the Directory information

    throw new RpcException(le.getCode(), "...");

}
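The retry loop above can be reduced to a small, framework-free sketch. This is only an illustration under stated assumptions: `FailoverSketch`, its `select` helper, and the use of `Supplier` as a stand-in for an Invoker are hypothetical names, not Dubbo APIs, and the stand-in load balancer simply picks the first node that has not been tried yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the failover idea; it does not use Dubbo's API.
class FailoverSketch {

    /** Tries up to retries + 1 nodes, preferring nodes that have not been tried yet. */
    static <R> R invoke(List<Supplier<R>> nodes, int retries) {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no node available");
        }
        int attempts = Math.max(retries + 1, 1); // same guard as "len <= 0 -> 1" above
        List<Supplier<R>> invoked = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            Supplier<R> node = select(nodes, invoked);
            invoked.add(node); // remember it so the next retry avoids it
            try {
                return node.get(); // success ends the loop
            } catch (RuntimeException e) {
                last = e; // record the failure and retry
            }
        }
        throw new IllegalStateException("failed after " + attempts + " attempts", last);
    }

    // Stand-in for LoadBalance: the first node not yet invoked, else the first node.
    private static <R> Supplier<R> select(List<Supplier<R>> nodes, List<Supplier<R>> invoked) {
        for (Supplier<R> node : nodes) {
            if (!invoked.contains(node)) {
                return node;
            }
        }
        return nodes.get(0);
    }

    public static void main(String[] args) {
        List<Supplier<String>> nodes = new ArrayList<>();
        nodes.add(() -> { throw new RuntimeException("node A down"); });
        nodes.add(() -> "reply from node B");
        System.out.println(invoke(nodes, 2));
    }
}
```

Unlike the real implementation, this sketch does not refresh the node list between attempts and does not distinguish business exceptions from other failures.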

FailbackClusterInvoker #

FailbackCluster is another extension implementation of the Cluster interface, with the extension name “failback”. The Invoker object created in the doJoin() method is of type FailbackClusterInvoker. The specific implementation is as follows:

public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {

    return new FailbackClusterInvoker<>(directory);

}

FailbackClusterInvoker returns an empty result to the Consumer after a request fails, and also adds a scheduled task to retry the failed request. Let’s take a look at the specific implementation of FailbackClusterInvoker:

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    Invoker<T> invoker = null;

    try {

        // Check if the Invoker collection is empty

        checkInvokers(invokers, invocation);

        // Call the select() method to get the Invoker object to try this time

        invoker = select(loadbalance, invocation, invokers, null);

        // Call the invoke() method to complete the remote call

        return invoker.invoke(invocation);

    } catch (Throwable e) {

        // After the request fails, add a scheduled task to retry

        addFailed(loadbalance, invocation, invokers, invoker);

        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // Return an empty result when the request fails

    }

}

In the doInvoke() method, when the request fails, the addFailed() method is called to add a scheduled retry task. By default, the retry runs 5 seconds after the failure and is attempted at most 3 times. The specific implementation is as follows:

private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {

    if (failTimer == null) {

        synchronized (this) {

            if (failTimer == null) { // Double Check to prevent concurrency issues

                // Initialize the time wheel, this time wheel has 32 slots, each slot represents 1 second

                failTimer = new HashedWheelTimer(

                        new NamedThreadFactory("failback-cluster-timer", true),

                        1,

                        TimeUnit.SECONDS, 32, failbackTasks);

            }

        }

    }

    // Create a scheduled task

    RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);

    try {

        // Add the scheduled task to the time wheel

        failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);

    } catch (Throwable e) {

        logger.error("...");

    }

}

The RetryTimerTask scheduled task calls the select() method again to choose a suitable Invoker object and retries the request. If the request fails again and the number of retries has not reached the limit, the rePut() method re-adds the scheduled task so that it waits for the next retry. If the retry succeeds, its result is discarded, because the caller has already received an empty result. The core implementation of RetryTimerTask is as follows:

public void run(Timeout timeout) {
    try {
        // Select an Invoker again; the Invoker that just failed is passed in as the selected collection
        Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
        lastInvoker = retryInvoker;
        retryInvoker.invoke(invocation); // Request the corresponding Provider node
    } catch (Throwable e) {
        if ((++retryTimes) >= retries) { // Retry limit reached: log an error and give up
            logger.error("...");
        } else {
            rePut(timeout); // Retry limit not reached: re-add the scheduled task and wait for the next retry
        }
    }
}

private void rePut(Timeout timeout) {
    if (timeout == null) { // Boundary check
        return;
    }
    Timer timer = timeout.timer();
    if (timer.isStop() || timeout.isCancelled()) { // Skip if the time wheel has stopped or the task has been cancelled
        return;
    }
    // Re-add the scheduled task
    timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
}
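The failback flow (return an empty result at once, then retry in the background up to a limit) can be sketched without Dubbo as follows. This is a simplified illustration, not the framework's code: `FailbackSketch` is a hypothetical class, a `ScheduledExecutorService` stands in for the HashedWheelTimer, and the retry period is given in milliseconds so the example finishes quickly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a ScheduledExecutorService replaces Dubbo's HashedWheelTimer.
class FailbackSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "failback-sketch-timer");
                t.setDaemon(true); // do not keep the JVM alive for pending retries
                return t;
            });
    private final int retries;
    private final long periodMillis;

    FailbackSketch(int retries, long periodMillis) {
        this.retries = retries;
        this.periodMillis = periodMillis;
    }

    /** Returns the call's value, or null immediately if it fails (retries then run in the background). */
    <R> R invoke(Callable<R> call) {
        try {
            return call.call();
        } catch (Exception e) {
            schedule(call, new AtomicInteger());
            return null; // "empty result" for the caller
        }
    }

    private <R> void schedule(Callable<R> call, AtomicInteger retryTimes) {
        timer.schedule(() -> {
            try {
                call.call(); // the result of a successful retry is discarded
            } catch (Exception e) {
                if (retryTimes.incrementAndGet() >= retries) {
                    System.err.println("giving up after " + retries + " retries");
                } else {
                    schedule(call, retryTimes); // same role as rePut()
                }
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

A caller would use it as `new FailbackSketch(3, 5000).invoke(() -> remoteCall())`, where `remoteCall` is whatever operation should be retried. Because the caller never sees the retry outcome, this pattern suits notifications rather than calls whose return value matters.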

FailfastClusterInvoker #

FailfastCluster is the Cluster implementation with the extension name failfast. Its doJoin() method creates a FailfastClusterInvoker object. The implementation is as follows:

public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {

    return new FailfastClusterInvoker<>(directory);

}

FailfastClusterInvoker will only make one request, and after the request fails, an exception will be thrown immediately. This strategy is suitable for non-idempotent operations. The implementation is as follows:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    checkInvokers(invokers, invocation);

    // Call select() to get the Invoker object to be called in this request

    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

    try {

        return invoker.invoke(invocation); // Initiate the request

    } catch (Throwable e) { 

        // If the request fails, throw the exception directly

        if (e instanceof RpcException && ((RpcException) e).isBiz()) {

            throw (RpcException) e;

        }

        throw new RpcException("...");

    }

}

FailsafeClusterInvoker #

FailsafeCluster is the Cluster implementation with the extension name failsafe. Its doJoin() method creates a FailsafeClusterInvoker object. The implementation is as follows:

public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {

    return new FailsafeClusterInvoker<>(directory);

}

FailsafeClusterInvoker will only make one request, and after the request fails, an empty result will be returned. The implementation is as follows:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    try {

        // Check if the Invoker collection is empty

        checkInvokers(invokers, invocation);

        // Call select() to get the Invoker object to be called in this request

        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);

        // Initiate the request

        return invoker.invoke(invocation);

    } catch (Throwable e) {

        // After the request fails, print a line of logs and return an empty result

        logger.error("...");

        return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); 

    }

}
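Failfast and failsafe both make a single attempt and differ only in what happens on failure. The contrast can be sketched as below; `SingleAttemptSketch` and the use of `Supplier` in place of an Invoker are assumptions made for illustration, not Dubbo types.

```java
import java.util.function.Supplier;

// Hypothetical sketch contrasting the two single-attempt strategies.
class SingleAttemptSketch {

    /** Failfast: one attempt, and the failure is reported to the caller immediately. */
    static <R> R failfast(Supplier<R> node) {
        try {
            return node.get();
        } catch (RuntimeException e) {
            throw new IllegalStateException("failfast: " + e.getMessage(), e);
        }
    }

    /** Failsafe: one attempt, and a failure is logged and replaced by an empty result. */
    static <R> R failsafe(Supplier<R> node) {
        try {
            return node.get();
        } catch (RuntimeException e) {
            System.err.println("failsafe ignored: " + e.getMessage());
            return null;
        }
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new RuntimeException("node down"); };
        System.out.println(failsafe(broken)); // the caller only sees an empty result
        try {
            failfast(broken);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // the caller sees the failure
        }
    }
}
```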

ForkingClusterInvoker #

ForkingCluster is the Cluster implementation with the extension name forking. Its doJoin() method creates a ForkingClusterInvoker object. The implementation is as follows:

public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {

    return new ForkingClusterInvoker<>(directory);

}

ForkingClusterInvoker maintains a thread pool (executor field created by Executors.newCachedThreadPool()) and concurrently calls multiple Provider nodes. As long as one Provider node returns a result successfully, the doInvoke() method of ForkingClusterInvoker will exit immediately.

ForkingClusterInvoker is mainly used to handle read operations with high real-time requirements, because multi-threaded writes without concurrency control may cause data inconsistency.

The doInvoke() method of ForkingClusterInvoker first selects a specified number (determined by the forks parameter) of Invoker objects from the Invoker collection, and then concurrently calls these Invokers through the executor thread pool. The request results are stored in the ref blocking queue, so the current thread will be blocked on the ref queue, waiting for the first request result to return. Below is the specific implementation of ForkingClusterInvoker:

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    try {

        // Check if the Invoker collection is empty

        checkInvokers(invokers, invocation);

        final List<Invoker<T>> selected;

        // Get the forks parameter from the URL as the upper limit of concurrent requests, with a default value of 2

        final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);

        final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);

        if (forks <= 0 || forks >= invokers.size()) {

            // If forks is not positive, or is at least the size of the Invoker collection, all Invokers are called concurrently

            selected = invokers;

        } else {

            // Select the Invoker objects to be concurrently called in this request based on the concurrency degree specified by forks

            selected = new ArrayList<>(forks);

            while (selected.size() < forks) {

                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);

                if (!selected.contains(invoker)) {

                    selected.add(invoker); // Avoid duplicate selection

                }

            }

        }

        RpcContext.getContext().setInvokers((List) selected);

        // Record the number of failed requests

        final AtomicInteger count = new AtomicInteger();

        // Used to record the request results

        final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();

        for (final Invoker<T> invoker : selected) {  // Traverse the selected list

            executor.execute(() -> { // Create a task for each Invoker and submit it to the thread pool

                try {

                    // Initiate the request

                    Result result = invoker.invoke(invocation);

                    // Write the request result to the ref queue

                    ref.offer(result);

                } catch (Throwable e) {

                    int value = count.incrementAndGet();

                    if (value >= selected.size()) {

                        // Once every concurrent request has failed, write the exception to the ref queue

                        ref.offer(e); 

                    }

                }

            });

        }

        try {

            // The current thread blocks here until the first result arrives or the timeout expires

            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);

            if (ret instanceof Throwable) { // If the result type is Throwable, throw an exception

                Throwable e = (Throwable) ret;

                throw new RpcException("...");

            }

            return (Result) ret; // Return the result

        } catch (InterruptedException e) {

            throw new RpcException("...");

        }

    } finally {

        // Clear context information

        RpcContext.getContext().clearAttachments();

    }

}
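The "first successful reply wins" pattern can be tried out in isolation with plain `java.util.concurrent` classes. The sketch below is an illustration only: `ForkingSketch` is a hypothetical class, `Callable` stands in for an Invoker, and results are assumed to be non-null because a `LinkedBlockingQueue` cannot hold null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the forking idea; it does not use Dubbo's API.
class ForkingSketch {
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "forking-sketch");
        t.setDaemon(true);
        return t;
    });

    /** Calls all nodes concurrently and returns the first successful (non-null) result. */
    static <R> R invoke(List<Callable<R>> nodes, long timeoutMillis) {
        AtomicInteger failures = new AtomicInteger();
        BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
        for (Callable<R> node : nodes) {
            EXECUTOR.execute(() -> {
                try {
                    ref.offer(node.call());
                } catch (Throwable e) {
                    // Only the last failure is published, so a slower success can still win.
                    if (failures.incrementAndGet() >= nodes.size()) {
                        ref.offer(e);
                    }
                }
            });
        }
        Object ret;
        try {
            ret = ref.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (ret == null) {
            throw new IllegalStateException("timed out after " + timeoutMillis + " ms");
        }
        if (ret instanceof Throwable) {
            throw new IllegalStateException("all forks failed", (Throwable) ret);
        }
        @SuppressWarnings("unchecked")
        R result = (R) ret;
        return result;
    }

    public static void main(String[] args) {
        List<Callable<String>> nodes = new ArrayList<>();
        nodes.add(() -> { throw new Exception("node A down"); });
        nodes.add(() -> "reply from node B");
        System.out.println(invoke(nodes, 1000));
    }
}
```

One deliberate difference from the code above: this sketch reports a timeout explicitly instead of returning whatever `poll()` yields.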

### BroadcastClusterInvoker

BroadcastCluster is the Cluster implementation with the extension name `broadcast`. Its `doJoin()` method creates a `BroadcastClusterInvoker` object. The specific implementation is as follows:

```java
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new BroadcastClusterInvoker<>(directory);
}
```

In BroadcastClusterInvoker, each Provider node will be called one by one, and if any Provider node throws an error, an exception will be thrown after all invocations are completed. BroadcastClusterInvoker is usually used for notification operations, such as notifying all Provider nodes to update the local cache.

Now let’s look at the specific implementation of BroadcastClusterInvoker:

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    // Check whether the Invokers collection is empty
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List) invokers);
    RpcException exception = null; // Records the exception from the most recent failed request
    Result result = null;
    // Traverse all Invoker objects
    for (Invoker<T> invoker : invokers) {
        try {
            // Initiate the request
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    if (exception != null) { // If any request failed, throw the recorded exception after all calls finish
        throw exception;
    }
    return result;
}
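The key property of the broadcast strategy is that one failure does not stop the remaining calls. A minimal sketch of that behavior, using the hypothetical class `BroadcastSketch` and `Supplier` as a stand-in for an Invoker, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the broadcast idea; it does not use Dubbo's API.
class BroadcastSketch {

    /** Calls every node in order; rethrows the last failure only after all nodes were called. */
    static <R> R invoke(List<Supplier<R>> nodes) {
        RuntimeException failure = null;
        R result = null;
        for (Supplier<R> node : nodes) {
            try {
                result = node.get(); // keep the most recent successful result
            } catch (RuntimeException e) {
                failure = e; // remember the failure but keep notifying the other nodes
            }
        }
        if (failure != null) {
            throw failure;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Supplier<String>> nodes = new ArrayList<>();
        nodes.add(() -> "cache refreshed on node A");
        nodes.add(() -> "cache refreshed on node B");
        System.out.println(invoke(nodes));
    }
}
```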

AvailableClusterInvoker #

AvailableCluster is the Cluster implementation with the extension name available. Its join() method creates an AvailableClusterInvoker object. The specific implementation is as follows:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    return new AvailableClusterInvoker<>(directory);
}

The doInvoke() method of AvailableClusterInvoker traverses the Invoker collection and sends the request to the first Invoker that is available. If that call fails, the exception propagates directly to the caller and no other Invoker is tried. If no Invoker is available, an RpcException is thrown.

Here is the specific implementation of AvailableClusterInvoker:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    for (Invoker<T> invoker : invokers) { // Traverse the entire Invokers collection
        if (invoker.isAvailable()) { // Check if the Invoker is available
            // Initiate the request; if the call fails, the exception is thrown directly
            return invoker.invoke(invocation);
        }
    }
    // If no available Invoker is found, an exception will be thrown
    throw new RpcException("No provider available in " + invokers);
}
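As a small illustration of the "first available node only" rule, consider the sketch below. The `AvailableSketch` class and its nested `Node` type are hypothetical stand-ins for an Invoker with an availability flag.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in types; Dubbo's Invoker interface is richer than this.
class AvailableSketch {

    static class Node {
        final boolean available;
        final String reply;

        Node(boolean available, String reply) {
            this.available = available;
            this.reply = reply;
        }

        String invoke() {
            return reply;
        }
    }

    /** Invokes the first available node; later nodes are never tried. */
    static String invoke(List<Node> nodes) {
        for (Node node : nodes) {
            if (node.available) {
                return node.invoke();
            }
        }
        throw new IllegalStateException("No provider available among " + nodes.size() + " nodes");
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(new Node(false, "A"), new Node(true, "B"), new Node(true, "C"));
        System.out.println(invoke(nodes));
    }
}
```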

MergeableClusterInvoker #

MergeableCluster is the Cluster implementation with the extension name mergeable. Its doJoin() method creates a MergeableClusterInvoker object. The specific implementation is as follows:

public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new MergeableClusterInvoker<T>(directory);
}

MergeableClusterInvoker merges the results returned by multiple Provider nodes. If the requested method does not have a configured Merger (i.e., no merger parameter is specified), the results will not be merged, and the result of the first available Invoker will be returned directly. Now let’s look at the specific implementation of MergeableClusterInvoker:

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), MERGER_KEY);

    // If the target method has no merger configured, skip merging:
    // call the first available Invoker and return its result
    if (ConfigUtils.isEmpty(merger)) {
        for (final Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                try {
                    return invoker.invoke(invocation);
                } catch (RpcException e) {
                    if (e.isNoInvokerAvailableAfterFilter()) {
                        log.debug("No available provider for service" + getUrl().getServiceKey() + " on group " + invoker.getUrl().getParameter(GROUP_KEY) + ", will continue to try another group.");
                    } else {
                        throw e;
                    }
                }
            }
        }
        return invokers.iterator().next().invoke(invocation);
    }

    // Determine the return type of the target method
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(
                invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }

    // Call every Invoker asynchronously and record each pending result in the results map
    Map<String, Result> results = new HashMap<>();
    for (final Invoker<T> invoker : invokers) {
        RpcInvocation subInvocation = new RpcInvocation(invocation, invoker);
        subInvocation.setAttachment(ASYNC_KEY, "true");
        results.put(invoker.getUrl().getServiceKey(), invoker.invoke(subInvocation));
    }

    Object result = null;
    List<Result> resultList = new ArrayList<Result>(results.size());

    // Wait for each result to return; failed calls are logged and left out of the merge
    for (Map.Entry<String, Result> entry : results.entrySet()) {
        Result asyncResult = entry.getValue();
        try {
            Result r = asyncResult.get();
            if (r.hasException()) {
                log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
                        " failed: " + r.getException().getMessage(),
                        r.getException());
            } else {
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
        }
    }

    if (resultList.isEmpty()) {
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else if (resultList.size() == 1) {
        return resultList.iterator().next();
    }

    if (returnType == void.class) {
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    }

    // If merger starts with ".", the rest of the string is a method name.
    // That method belongs to the return type of the remote target method.
    // After the result objects from all Provider nodes are collected,
    // they are folded together by calling the method named by the merger parameter.
    if (merger.startsWith(".")) {
        merger = merger.substring(1);
        Method method;

        try {
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException("Can not merge result because missing method [" + merger + "] in class [" + returnType.getName() + "]");
        }

        if (!Modifier.isPublic(method.getModifiers())) {
            method.setAccessible(true);
        }

        // resultList holds all returned objects, and method is the Method named by merger.
        // result is the final value returned to the caller.
        result = resultList.remove(0).getValue();

        try {
            if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
                for (Result r : resultList) {
                    // The method returns the merged value: keep its return value as the new result
                    result = method.invoke(result, r.getValue());
                }
            } else {
                for (Result r : resultList) {
                    // The method merges in place: result itself accumulates the values
                    method.invoke(result, r.getValue());
                }
            }
        } catch (Exception e) {
            throw new RpcException("Can not merge result: " + e.getMessage(), e);
        }
    } else {
        Merger resultMerger;

        if (ConfigUtils.isDefault(merger)) {
            // If the merger parameter is true or default, use the default Merger extension for the return type.
            // The Merger interface will be introduced in later lessons.
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            // If the merger parameter names a Merger extension,
            // look up the corresponding Merger implementation through SPI.
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }

        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());

            for (Result r : resultList) {
                rets.add(r.getValue());
            }

            // Perform merge operation
            result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }

    return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}
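The reflective branch (a merger value that starts with ".") is the least obvious part of this method, so here is a standalone sketch of it. Everything in it is illustrative: `ReflectiveMergeSketch`, the `Stats` return type, and its `plus` method are hypothetical and were chosen only so that a merger value of ".plus" has something to call.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of merging results through a method named by a ".method" merger value.
class ReflectiveMergeSketch {

    /** Hypothetical return type whose merge method returns a new combined value. */
    public static class Stats {
        public final int count;

        public Stats(int count) {
            this.count = count;
        }

        public Stats plus(Stats other) {
            return new Stats(count + other.count);
        }
    }

    static Object merge(String merger, Class<?> returnType, List<?> values) {
        if (!merger.startsWith(".")) {
            throw new IllegalArgumentException("expected a merger of the form \".method\"");
        }
        try {
            // The merge method must accept one argument of the return type itself.
            Method method = returnType.getMethod(merger.substring(1), returnType);
            Object result = values.get(0);
            // If the method returns a compatible value, chain on it; otherwise it merges in place.
            boolean chain = method.getReturnType() != void.class
                    && method.getReturnType().isAssignableFrom(result.getClass());
            for (Object next : values.subList(1, values.size())) {
                Object returned = method.invoke(result, next);
                if (chain) {
                    result = returned;
                }
            }
            return result;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot merge result: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        List<Stats> perGroup = Arrays.asList(new Stats(1), new Stats(2), new Stats(3));
        Stats total = (Stats) merge(".plus", Stats.class, perGroup);
        System.out.println(total.count);
    }
}
```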


### ZoneAwareClusterInvoker

ZoneAwareCluster is the Cluster implementation with the extension name "zone-aware". Its `doJoin()` method creates a ZoneAwareClusterInvoker object. The specific implementation is as follows:

```java
protected <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    return new ZoneAwareClusterInvoker<T>(directory);
}
```

In Dubbo, the architecture for using multiple registries is as shown in the following diagram:

[Figure: Dual Registry Architecture Diagram]

The Consumer can use ZoneAwareClusterInvoker to first choose among the multiple registries and then select a Provider node. The process is shown in the following diagram:

[Figure: ZoneAwareClusterInvoker choosing a registry and then a Provider node]

ZoneAwareClusterInvoker chooses among multiple registries by applying the following four strategies in order.

  1. Find the registry whose preferred attribute is set to true. It has the highest priority, and other registries are used only when it has no available Provider nodes.
  2. Match on the zone key in the request, and prefer the registry in the same zone.
  3. Load-balance across the registries according to weight (the “weight” attribute configured on the registry).
  4. If none of the above strategies selects a registry, pick the first available one.

Now let’s take a look at the implementation of ZoneAwareClusterInvoker:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

    // First, find the registry with the preferred attribute set to true, 
    // which has the highest priority. Only when this registry has no available Provider nodes will it fallback to other registries.
    for (Invoker<T> invoker : invokers) {
        MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker<T>) invoker;
        if (mockClusterInvoker.isAvailable() && mockClusterInvoker.getRegistryUrl().getParameter(REGISTRY_KEY + "." + PREFERRED_KEY, false)) {
            return mockClusterInvoker.invoke(invocation);
        }
    }

    // Match based on the "registry_zone" in the request, 
    // and prioritize dispatching to the registry center in the same zone.
    String zone = (String) invocation.getAttachment(REGISTRY_ZONE);
    if (StringUtils.isNotEmpty(zone)) {
        for (Invoker<T> invoker : invokers) {
            MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker<T>) invoker;
            if (mockClusterInvoker.isAvailable() && zone.equals(mockClusterInvoker.getRegistryUrl().getParameter(REGISTRY_KEY + "." + ZONE_KEY))) {
                return mockClusterInvoker.invoke(invocation);
            }
        }
        String force = (String) invocation.getAttachment(REGISTRY_ZONE_FORCE);
        if (StringUtils.isNotEmpty(force) && "true".equalsIgnoreCase(force)) {
            throw new IllegalStateException("...");
        }
    }

    // Load-balance across the registries, taking the "weight" attribute configured on each registry into account.
    Invoker<T> balancedInvoker = select(loadbalance, invocation, invokers, null);
    if (balancedInvoker.isAvailable()) {
        return balancedInvoker.invoke(invocation);
    }

    // Fall back to the first available Invoker.
    for (Invoker<T> invoker : invokers) {
        MockClusterInvoker<T> mockClusterInvoker = (MockClusterInvoker<T>) invoker;
        if (mockClusterInvoker.isAvailable()) {
            return mockClusterInvoker.invoke(invocation);
        }
    }

    throw new RpcException("No provider available in " + invokers);
}
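The four-step selection order can be sketched on its own, with the load balancer passed in as a function so that the example stays deterministic. The names here (`ZoneAwareSketch`, `Registry`, `choose`) are hypothetical and only model the decision order, not Dubbo's classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the zone-aware selection order; it does not use Dubbo's API.
class ZoneAwareSketch {

    static class Registry {
        final String name;
        final String zone;
        final boolean preferred;
        final boolean available;

        Registry(String name, String zone, boolean preferred, boolean available) {
            this.name = name;
            this.zone = zone;
            this.preferred = preferred;
            this.available = available;
        }
    }

    static Registry choose(List<Registry> registries, String requestZone, boolean zoneForce,
                           Function<List<Registry>, Registry> loadBalance) {
        // 1. A preferred registry that is available wins outright.
        for (Registry r : registries) {
            if (r.available && r.preferred) {
                return r;
            }
        }
        // 2. Otherwise prefer an available registry in the caller's zone.
        if (requestZone != null && !requestZone.isEmpty()) {
            for (Registry r : registries) {
                if (r.available && requestZone.equals(r.zone)) {
                    return r;
                }
            }
            if (zoneForce) {
                throw new IllegalStateException("no available registry in zone " + requestZone);
            }
        }
        // 3. Otherwise accept the load balancer's pick if it is available.
        Registry balanced = loadBalance.apply(registries);
        if (balanced != null && balanced.available) {
            return balanced;
        }
        // 4. Finally fall back to the first available registry.
        for (Registry r : registries) {
            if (r.available) {
                return r;
            }
        }
        throw new IllegalStateException("No provider available in " + registries.size() + " registries");
    }

    public static void main(String[] args) {
        Registry hz = new Registry("hz", "hangzhou", false, true);
        Registry bj = new Registry("bj", "beijing", true, true);
        System.out.println(choose(Arrays.asList(hz, bj), "hangzhou", false, list -> list.get(0)).name);
    }
}
```

In the `main` example the preferred registry is chosen even though the request zone matches the other one, which mirrors the priority order of the four strategies.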

Summary #

In this lesson, we walked through the implementation classes of Dubbo’s Cluster interface and how their corresponding Invokers work. The Cluster implementations analyzed here are Failover Cluster, Failback Cluster, Failfast Cluster, Failsafe Cluster, Forking Cluster, Broadcast Cluster, Available Cluster, and Mergeable Cluster. We also discussed the ZoneAware Cluster implementation, which handles multiple registries.