35 Load Balancing: Fair and Proper Use of Load Balancing Strategies #

In the previous lessons, we have introduced in detail the two core interfaces and implementations, Directory and Router, in the dubbo-cluster module, and also introduced the peripheral knowledge related to these two interfaces. In this lesson, we will continue to introduce the content related to LoadBalance in the order shown in the following figure.


LoadBalance Core Interface Diagram

The responsibility of LoadBalance is to distribute network requests (or other forms of load) across different service nodes, avoiding situations where some nodes in the service cluster are overloaded and resource-constrained while others sit relatively idle.

Through reasonable load balancing algorithms, we hope to allow each service node to receive a load that suits its processing capacity, achieving a reasonable distribution of processing capacity and traffic. Common load balancing techniques can be divided into software load balancing (such as Nginx used in daily work) and hardware load balancing (mainly F5, Array, NetScaler, etc., but rarely directly encountered by development engineers in practice).

Common RPC frameworks all have a notion of load balancing with corresponding implementations, and Dubbo is no exception. Dubbo needs to distribute Consumers' invocation requests across Providers to avoid a few Provider nodes being overloaded while the remaining nodes sit idle: an overloaded Provider causes request timeouts and losses, which in turn lead to production failures.

Dubbo provides 5 load balancing implementations, namely:

  • ConsistentHashLoadBalance based on Hash Consistency;
  • RandomLoadBalance based on Weighted Random Algorithm;
  • LeastActiveLoadBalance based on Least Active Invocation Count;
  • RoundRobinLoadBalance based on Weighted Round Robin Algorithm;
  • ShortestResponseLoadBalance based on Shortest Response Time.

LoadBalance Interface #

The load balancing implementations provided by Dubbo mentioned above are all implementation classes of the LoadBalance interface, as shown in the following figure:


LoadBalance Inheritance Diagram

LoadBalance is an extension interface, and the default extension implementation it uses is RandomLoadBalance. Its definition is as follows, where the @Adaptive annotation parameter is loadbalance, which means that the dynamically generated adapter will select the extension implementation class according to the value of the loadbalance parameter in the URL.

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

The core function of the select() method in the LoadBalance interface is to select an Invoker from the Invoker collection based on the passed-in URL, Invocation, and its own load balancing algorithm.

The AbstractLoadBalance abstract class does not implement a concrete selection algorithm itself; its select() method only handles the special cases where the Invoker collection is empty or contains a single Invoker, and delegates everything else to doSelect(), as shown below:

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (CollectionUtils.isEmpty(invokers)) {
        return null; // Invoker collection is empty, directly return null
    }
    if (invokers.size() == 1) { // Invoker collection contains only one Invoker, directly return it
        return invokers.get(0);
    }
    // When the Invoker collection contains multiple Invoker objects, hand over to the doSelect() method,
    // an abstract method implemented by concrete subclasses
    return doSelect(invokers, url, invocation);
}

In addition, the AbstractLoadBalance class also provides a getWeight() method, which is used to calculate the weight of the Provider. The specific implementation is as follows:

int getWeight(Invoker<?> invoker, Invocation invocation) {
    int weight;
    URL url = invoker.getUrl();
    if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
        // If it is the RegistryService interface, directly get the weight
        weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
    } else {
        weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
        if (weight > 0) {
            // Get the start timestamp of the service provider
            long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                // Calculate the running time of the Provider
                long uptime = System.currentTimeMillis() - timestamp;
                if (uptime < 0) {
                    return 1;
                }
                // Get the warm-up time configured for the Provider
                int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                // If the Provider's uptime is less than the warm-up time, the node may still be
                // in the warm-up phase and its weight needs to be recalculated (i.e., reduced)
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight((int) uptime, warmup, weight);
                }
            }
        }
    }
    return Math.max(weight, 0);
}

The purpose of the calculateWarmupWeight() method is to reduce the weight of Provider nodes that are still warming up, so that a newly started Provider is not flooded with requests. Service warm-up is an optimization motivated by characteristics of the JVM itself, such as JIT compilation: after a service starts, we generally let it run under low traffic for a while before gradually ramping the traffic up.

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    // As the service uptime increases, ww gradually approaches the configured weight
    int ww = (int) (uptime / ((float) warmup / weight));
    return ww < 1 ? 1 : Math.min(ww, weight);
}
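To see how the warm-up weight ramps up in practice, here is a small standalone sketch that reimplements the formula above. The warm-up window of 10 minutes (600,000 ms) and the configured weight of 100 are chosen for illustration; check DEFAULT_WARMUP and DEFAULT_WEIGHT in your Dubbo version for the actual defaults.

```java
public class WarmupWeightDemo {
    // Same formula as calculateWarmupWeight() above
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) (uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : Math.min(ww, weight);
    }

    public static void main(String[] args) {
        int warmup = 600_000; // assume a 10-minute warm-up window
        int weight = 100;     // assume a configured weight of 100
        // Weight grows linearly with uptime and is capped at the configured value
        System.out.println(calculateWarmupWeight(60_000, warmup, weight));  // 1 minute in  -> 10
        System.out.println(calculateWarmupWeight(300_000, warmup, weight)); // halfway      -> 50
        System.out.println(calculateWarmupWeight(600_000, warmup, weight)); // fully warm   -> 100
    }
}
```

With these numbers, a Provider that has been up for one minute gets only one tenth of its configured weight, so it receives a proportionally small share of traffic until the window closes.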

After understanding the definition of the LoadBalance interface and the common capabilities provided by AbstractLoadBalance, we will now introduce the specific implementations of the LoadBalance interface one by one.

ConsistentHashLoadBalance #

ConsistentHashLoadBalance uses the consistent hash algorithm to implement load balancing. In order to better understand this part, let’s first briefly introduce the knowledge points related to the consistent hash algorithm.

  1. Analysis of the Consistent Hash Algorithm #

Consistent hash load balancing routes requests with the same parameters to the same service node every time. When a node goes offline, this strategy spreads only that node's traffic evenly across the remaining Providers, without causing drastic traffic fluctuations.

Next, we will briefly introduce the principle of the consistent hash algorithm through an example.

Suppose three Provider nodes (1, 2, and 3) provide a service, and 100 requests arrive at the same time. The simplest way to spread the requests evenly across the three nodes is hash modulo: hash(request parameters) % 3. If the request parameters all participate in the hash calculation, requests with the same parameters are always assigned to the same Provider node. However, if one Provider node suddenly fails, we must switch to modulo 2, and the requests are reassigned across the remaining Providers. In the extreme case, the target node of every request may change, causing significant fluctuations.
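The scale of this disruption is easy to demonstrate. The sketch below is a self-contained illustration (not Dubbo code): it maps 100 request keys to 3 nodes by plain modulo, then "fails" one node and remaps by modulo 2, counting how many requests change their target node.

```java
public class ModuloRehashDemo {
    // Count how many of the first `requests` keys change nodes when 3 nodes shrink to 2
    static int countMoved(int requests) {
        int moved = 0;
        for (int key = 0; key < requests; key++) {
            // Node assignment before and after one of the 3 nodes fails
            if (key % 3 != key % 2) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        // With 100 keys, 66 end up on a different node after the rehash
        System.out.println(countMoved(100) + " of 100 requests were reassigned");
    }
}
```

Roughly two thirds of the requests move to a different node, even though only one node failed; this is exactly the problem the consistent hash algorithm addresses.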

To avoid reassigning a large number of requests when a Provider node fails, we can use the consistent hash algorithm. It is also based on modulo arithmetic; the difference is that plain hash modulo is taken against the number of Provider nodes, while the consistent hash algorithm takes the modulo against 2^32.

The consistent hash algorithm takes the modulo of both the Provider addresses and the request parameters:

hash(Provider address) % 2^32

hash(request parameters) % 2^32

Both results fall onto a hash ring of size 2^32, as shown in the diagram below:


Consistent Hash Node Distribution Diagram

Starting from each request's position, we walk clockwise along the ring and assign the request to the first Provider encountered. This way, when a Provider node goes offline or a new one is added, only the requests mapped to that node are affected.

In the ideal case, the consistent hash algorithm distributes the three Provider nodes evenly around the hash ring, and the requests are spread evenly across them. In practice, however, the hash values of the Provider addresses may land close together on the ring, so a large share of the requests falls onto a single Provider node, as shown in the diagram below:


Uneven Distribution of Consistent Hash Nodes

This is referred to as data skew. Data skew refers to a situation where a large number of requests fall onto one node due to the nodes not being sufficiently scattered, while other nodes only receive a small number of requests.

To solve the problem of data skew in the Consistent Hash algorithm, the concept of hash slots has been introduced.

The idea behind hash slots is to virtualize each Provider node, such as P1, P2, and P3, into multiple replicas, and distribute these virtual nodes evenly around the hash ring. Nodes with the same shading in the diagram below represent the same Provider node; for example, P1-1, P1-2…P1-99 all represent the P1 Provider node. With virtual Provider nodes introduced, the Providers are scattered around the ring, avoiding data skew.


Diagram illustrating the solution to data skew
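The whole idea — a sorted ring, virtual replicas per node, clockwise lookup — can be sketched with a TreeMap before we dive into Dubbo's implementation. The node names, replica count, and the CRC32 hash here are illustrative choices; Dubbo's actual ConsistentHashSelector, analyzed below, uses MD5.

```java
import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import java.util.zip.CRC32;

public class HashRingSketch {
    static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Place `replicas` virtual nodes per physical node on the ring
    static TreeMap<Long, String> buildRing(String[] nodes, int replicas) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String node : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
        return ring;
    }

    // Walk clockwise: the first virtual node at or after the key's hash wins
    static String select(TreeMap<Long, String> ring, String requestKey) {
        Long position = ring.ceilingKey(hash(requestKey));
        if (position == null) {
            position = ring.firstKey(); // wrap around to the start of the ring
        }
        return ring.get(position);
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = buildRing(new String[]{"P1", "P2", "P3"}, 160);
        // The same request key always lands on the same node
        System.out.println(select(ring, "order:1001"));
    }
}
```

Removing one node from the ring only deletes that node's virtual positions; requests that hashed to other nodes keep their original targets, which is the key property plain modulo lacks.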

2. Analysis of ConsistentHashSelector Implementation #

After understanding the basic principles of the Consistent Hash algorithm, let’s take a look at the implementation of ConsistentHashLoadBalance’s doSelect() method. The core algorithm is delegated to the ConsistentHashSelector object.

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Get the name of the invoked method
    String methodName = RpcUtils.getMethodName(invocation);
    // Concatenate the ServiceKey and the method name to form a key
    String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
    // Note: this hashCode is used to re-generate the ConsistentHashSelector object when the invokers list changes
    int invokersHashCode = invokers.hashCode();
    // Get the corresponding ConsistentHashSelector object based on the key;
    // selectors is a ConcurrentMap<String, ConsistentHashSelector> collection
    ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
    if (selector == null || selector.identityHashCode != invokersHashCode) { // If no matching ConsistentHashSelector exists, create a new one
        selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
        selector = (ConsistentHashSelector<T>) selectors.get(key);
    }
    // Select an Invoker object using the ConsistentHashSelector object
    return selector.select(invocation);
}

Now let’s take a look at ConsistentHashSelector. Its core fields are as follows:

  • virtualInvokers (of type TreeMap<Long, Invoker>): used to record the hash ring of virtual Invoker objects. TreeMap is used here to implement the hash ring, and virtual Invoker objects are distributed on the hash ring.
  • replicaNumber (of type int): the number of virtual Invoker objects.
  • identityHashCode (of type int): the hashcode value of the Invoker collection.
  • argumentIndex (of type int[]): the indexes of the parameters that need to participate in the hash calculation. For example, when argumentIndex = [0, 1, 2], it means that the first three parameters of the target method to be called should participate in the hash calculation.

Next, let’s look at the constructor of ConsistentHashSelector, where the main tasks are:

  • Building hash slots.
  • Confirming the parameters that participate in consistent hash calculation, by default it is the first parameter.

The purpose of these operations is to distribute the Invokers as evenly as possible on the hash ring. The specific implementation is as follows:

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
    // Initialize the virtualInvokers field, which holds the virtual hash slots
    this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
    // Record the hashCode of the Invoker collection; this value is used to determine whether the Provider list has changed
    this.identityHashCode = identityHashCode;
    URL url = invokers.get(0).getUrl();
    // Get the number of virtual nodes from the hash.nodes parameter
    this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
    // Get the indexes of the parameters that participate in the hash calculation; by default the first parameter is used
    String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
    argumentIndex = new int[index.length];
    for (int i = 0; i < index.length; i++) {
        argumentIndex[i] = Integer.parseInt(index[i]);
    }
    // Build the virtual hash slots; by default replicaNumber = 160, meaning 160 slots are placed on the hash ring
    // The outer loop runs 40 times and the inner loop 4 times, 40 * 4 = 160 in total,
    // so 160 slots are virtually generated for the same node
    for (Invoker<T> invoker : invokers) {
        String address = invoker.getUrl().getAddress();
        for (int i = 0; i < replicaNumber / 4; i++) {
            // Compute the MD5 of address + i, resulting in a 16-byte array
            byte[] digest = md5(address + i);
            // Perform 4 hash calculations on slices of the digest, producing 4 different positive long values
            for (int h = 0; h < 4; h++) {
                // When h = 0, bytes 0-3 of the digest are used in the bit operation
                // When h = 1, bytes 4-7 of the digest are used in the bit operation
                // The same pattern applies for h = 2 and h = 3
                long m = hash(digest, h);
                virtualInvokers.put(m, invoker);
            }
        }
    }
}
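The "one MD5, four hashes" trick deserves a closer look. The sketch below reproduces the bit operations of the hash() and md5() helpers referenced above; treat it as an illustration of the technique rather than a verbatim copy of Dubbo's private methods. A 16-byte MD5 digest is cut into four 4-byte slices, each assembled into an unsigned 32-bit value, so every digest yields four ring positions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestSliceDemo {
    static byte[] md5(String value) {
        try {
            return MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // MD5 is guaranteed to exist on the JVM
        }
    }

    // Assemble bytes [number*4, number*4 + 3] of the digest into an unsigned 32-bit value
    static long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        byte[] digest = md5("10.0.0.1:20880" + 0); // address + loop index, as in the constructor
        for (int h = 0; h < 4; h++) {
            // Four ring positions obtained from a single MD5 invocation
            System.out.println(hash(digest, h));
        }
    }
}
```

Masking each byte with 0xFF and the final result with 0xFFFFFFFFL keeps the values non-negative, which is why the positions can be stored directly as TreeMap keys.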

Finally, a request selects the appropriate Invoker through the ConsistentHashSelector.select() method: the request parameters are digested with MD5 and hashed to produce a position on the ring, and the target Invoker is then looked up in the TreeMap using that value. The specific implementation is as follows:

public Invoker<T> select(Invocation invocation) {
    // Concatenate the parameters participating in the consistent hash into a key
    String key = toKey(invocation.getArguments());
    // Calculate the MD5 digest of the key
    byte[] digest = md5(key);
    // Match the Invoker object
    return selectForKey(hash(digest, 0));
}

private Invoker<T> selectForKey(long hash) {
    // Look up the first Invoker in virtualInvokers (a TreeMap sorted by key)
    // whose position is greater than or equal to the given hash value
    Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
    // If the hash value is greater than all positions on the ring,
    // wrap around to the beginning of the ring and return the first Invoker
    if (entry == null) {
        entry = virtualInvokers.firstEntry();
    }
    return entry.getValue();
}
RandomLoadBalance #

The load balancing algorithm used by RandomLoadBalance is Weighted Random Algorithm. RandomLoadBalance is a simple and efficient load balancing implementation, and it is the default load balancing implementation used by Dubbo.

Here we use an example to illustrate the core idea of the weighted random algorithm. Let’s assume we have three Provider nodes A, B, and C, and their weights are 5, 2, and 3 respectively. The sum of the weights is 10. Now, we put these weight values on a one-dimensional coordinate axis. The range [0, 5) belongs to node A, the range [5, 7) belongs to node B, and the range [7, 10) belongs to node C, as shown in the following diagram:


Diagram of the weight coordinate axis

Next, we generate a random number within the range [0, 10) using a random number generator, and then calculate which range this random number falls into. For example, if we generate 4 randomly, it will fall into the range corresponding to Provider A, and RandomLoadBalance will return Provider A.

Next, let’s take a look at the implementation of the doSelect() method in RandomLoadBalance. Its core logic can be divided into three key points:

  • Calculate the weight value of each invoker and the total weight value;
  • When the weight values of each invoker are not equal, calculate which invoker range the random number should fall into and return the corresponding invoker object;
  • When the weight values of each invoker are equal, randomly select an invoker and return it.

After multiple requests, RandomLoadBalance can evenly distribute the invocation requests to each Provider node based on their weight values. The following is the core implementation of RandomLoadBalance:

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();
    boolean sameWeight = true;
    // Calculate the weight of each Invoker and fill it into the weights[] array
    int[] weights = new int[length];
    // Calculate the weight of the first Invoker
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // totalWeight records the total weight
    int totalWeight = firstWeight;
    for (int i = 1; i < length; i++) {
        // Calculate the weight of each Invoker and accumulate the total weight
        int weight = getWeight(invokers.get(i), invocation);
        weights[i] = weight;
        totalWeight += weight;
        // Check whether every Provider has the same weight
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    // When the Invokers' weights are not all equal, determine which range the random number falls into
    if (totalWeight > 0 && !sameWeight) {
        // Obtain a random number within the range [0, totalWeight)
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Subtract each Invoker's weight from offset in turn; when offset drops below 0, return the corresponding Invoker
        for (int i = 0; i < length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // When all Invokers have the same weight, return a random one
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
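To confirm the offset-subtraction walk actually distributes calls in proportion to the weights, here is a self-contained sketch (the weights 5, 2, 3 and the trial count are illustrative, and a seeded Random replaces ThreadLocalRandom for reproducibility):

```java
import java.util.Random;

public class WeightedRandomDemo {
    // The same offset-subtraction walk used in doSelect() above
    static int pick(int[] weights, Random random) {
        int totalWeight = 0;
        for (int w : weights) {
            totalWeight += w;
        }
        int offset = random.nextInt(totalWeight);
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        return weights.length - 1; // unreachable when all weights are positive
    }

    public static void main(String[] args) {
        int[] weights = {5, 2, 3}; // nodes A, B, C from the example above
        int[] counts = new int[3];
        Random random = new Random(42); // seeded for reproducibility
        for (int i = 0; i < 100_000; i++) {
            counts[pick(weights, random)]++;
        }
        // The tallies come out roughly 50%, 20%, 30%
        System.out.printf("A=%d B=%d C=%d%n", counts[0], counts[1], counts[2]);
    }
}
```

Over 100,000 trials, node A receives about half the picks, B about a fifth, and C about three tenths, matching the weight ratio 5 : 2 : 3.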

Summary #

In this lesson, we focused on the load balancing related content in the Dubbo Cluster layer. First, we introduced the definition of the LoadBalance interface and the common capabilities provided by the AbstractLoadBalance abstract class. Then we explained in detail the core implementation of ConsistentHashLoadBalance, and briefly mentioned the basic knowledge points of the consistent hash algorithm. Finally, we analyzed the basic principles and core implementation of RandomLoadBalance together.