26 Service Governance, Implementing Service Discovery and Load Balancing Regulations

26 Service governance, implementing service discovery and load balancing regulations #

In a distributed system, there are multiple nodes for both service consumers and service providers. If some of the machine nodes of the service provider are overloaded, it may cause the requests received on that node to time out, thereby reducing the overall availability of the service provider. Therefore, RPC frameworks need to implement reasonable load balancing algorithms to evenly distribute traffic to each service provider. So, how can we ensure that traffic is evenly distributed among each service provider? In today’s class, we will discuss the implementation of the load balancing mechanism in RPC frameworks.

Source code reference: mini-rpc

Selection of a Registry Center #

Before initiating an RPC call, the service consumer needs to know which nodes of the service provider are available, and the service provider nodes may come online or go offline. Therefore, service consumers need to be aware of the dynamic changes in the list of service provider nodes. In RPC frameworks, registration centers are generally used to implement service registration and discovery.

Currently, popular registration centers include ZooKeeper, Eureka, Etcd, Consul, Nacos, etc. Choosing a high-performance and highly available registration center is crucial for RPC frameworks. Speaking of high availability, it is naturally related to the CAP theorem. Consistency, Availability, and Partition tolerance cannot be satisfied simultaneously. Registration centers are generally divided into CP-type registration centers and AP-type registration centers. The widely used ZooKeeper is a CP-type registration center. There will be a node in the cluster acting as a leader. If the leader node fails, leader election will be carried out. ZooKeeper ensures strong consistency among all nodes, but it is unable to provide services to the outside world during the leader election process, sacrificing some availability. Eureka is a typical AP-type registration center, which has great advantages in scenarios where service discovery is implemented. The entire cluster does not have the concept of leader or follower. If one node fails, the request will immediately be redirected to other nodes. The possible problem is that if different partitions cannot communicate with each other, the data between nodes may differ. Therefore, AP-type registration centers sacrifice strong consistency to ensure high availability.

For RPC frameworks, even if the registration center encounters problems, it should not affect the normal invocation of services. Therefore, in this scenario, AP-type registration centers have more advantages compared to CP-type registration centers. Mature RPC frameworks provide a variety of registration center options. Next, we will design a common interface for registration centers, and then each registration center implementation will be expanded according to this interface specification.

Design of Registration Center Interface #

The registration center is mainly used to store service metadata information. First, we need to encapsulate the service metadata information into an object, which includes the service name, service version, service address, and service port number, as shown below:

@Data
public class ServiceMeta {
    private String serviceName;
    private String serviceVersion;
    private String serviceAddr;
    private int servicePort;
}

Next, we provide a common interface for the registration center, and the main operation object of this interface is ServiceMeta. It should not have any connection with any third-party registration center tool library. Here is the code:

public interface RegistryService {
    void register(ServiceMeta serviceMeta) throws Exception;
    void unRegister(ServiceMeta serviceMeta) throws Exception;
    ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception;
    void destroy() throws IOException;
}

The RegistryService interface includes four basic operations of the registration center: service registration, service deregistration, service discovery, and registration center shutdown. Next, let’s implement the four interfaces mentioned above using ZooKeeper registration center as an example.

Initialization and Shutdown of Registration Center #

Zookeeper’s commonly used open-source client tools are ZkClient and Apache Curator. Currently, it is recommended to use the Apache Curator client. Compared with ZkClient, Apache Curator not only provides more functionalities but also has a higher level of abstraction, providing more user-friendly API interfaces and a fluent programming style. Before using Apache Curator, we need to import the Maven dependency in the pom.xml file, as shown below:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>2.12.0</version>
</dependency>

…. (The remaining content has been truncated.) 2.12.0

</dependency>

Note that Apache Curator needs to be used with a specific version of ZooKeeper. This project uses ZooKeeper 3.4.14. For compatibility between versions, you should refer to the version update instructions on the Curator official website (https://curator.apache.org).

First, we need to build the ZooKeeper client. The usage of initializing a ZooKeeper client with Apache Curator is mostly similar to the following code:

public class ZookeeperRegistryService implements RegistryService {

    public static final int BASE_SLEEP_TIME_MS = 1000;

    public static final int MAX_RETRIES = 3;

    public static final String ZK_BASE_PATH = "/mini_rpc";

    private final ServiceDiscovery<ServiceMeta> serviceDiscovery;

    public ZookeeperRegistryService(String registryAddr) throws Exception {

        CuratorFramework client = CuratorFrameworkFactory.newClient(registryAddr, new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));

        client.start();

        JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);

        this.serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMeta.class)

                .client(client)

                .serializer(serializer)

                .basePath(ZK_BASE_PATH)

                .build();

        this.serviceDiscovery.start();

    }

}

We use the factory pattern to create an instance of CuratorFramework using CuratorFrameworkFactory. The only thing you need to specify is the retry strategy. After creating the CuratorFramework instance, you need to call start() to start it up. Then, we create a ServiceDiscovery object using ServiceDiscoveryBuilder, which is responsible for registering and discovering services. When the system exits, we need to close the initialized instance by calling destroy(). The implementation of destroy() is very simple, as shown in the following code:

@Override

public void destroy() throws IOException {

    serviceDiscovery.close();

}

Service Registration Implementation #

After obtaining an instance of ServiceDiscovery through initialization, we can publish the service metadata information ServiceMeta to the registry center. The implementation of the register() method is as follows:

@Override

public void register(ServiceMeta serviceMeta) throws Exception {

    ServiceInstance<ServiceMeta> serviceInstance = ServiceInstance

            .<ServiceMeta>builder()

            .name(RpcServiceHelper.buildServiceKey(serviceMeta.getServiceName(), serviceMeta.getServiceVersion()))

            .address(serviceMeta.getServiceAddr())

            .port(serviceMeta.getServicePort())

            .payload(serviceMeta)

            .build();

    serviceDiscovery.registerService(serviceInstance);

}

The ServiceInstance object represents a service instance. It has attributes such as name (the name of the service instance), id (a unique identifier), address, port, and payload (custom optional attributes). It’s necessary to understand how ServiceInstance is stored in the ZooKeeper server, as shown in the following diagram.

Drawing 0.png

Generally, we categorize RPC services with the same version together. So we can assign a value to the name attribute of ServiceInstance based on the service name and version, as shown below:

public class RpcServiceHelper {

    public static String buildServiceKey(String serviceName, String serviceVersion) {
import java.util.List;

public interface ServiceLoadBalancer<T> {

    T select(List<T> servers, int hashCode);

}

ServiceLoadBalancer接口定义了负载均衡器的方法select(),该方法接受一个服务节点列表和客户端对象的哈希码作为参数,用于选择一个合适的服务节点。

import java.util.List;

public class ZKConsistentHashLoadBalancer implements ServiceLoadBalancer<ServiceInstance<ServiceMeta>> {

    private final static int VIRTUAL_NODE_SIZE = 10;
    private final static String VIRTUAL_NODE_SPLIT = "#";

    @Override

    public ServiceInstance<ServiceMeta> select(List<ServiceInstance<ServiceMeta>> servers, int hashCode) {

        TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = makeConsistentHashRing(servers); // Construct hash ring

        return allocateNode(ring, hashCode); // Allocate node based on hashCode

    }

    private ServiceInstance<ServiceMeta> allocateNode(TreeMap<Integer, ServiceInstance<ServiceMeta>> ring, int hashCode) {

        Map.Entry<Integer, ServiceInstance<ServiceMeta>> entry = ring.ceilingEntry(hashCode); // Find the first node in a clockwise direction

        if (entry == null) {

            entry = ring.firstEntry(); // If there is no node greater than hashCode, take the first one directly

        }

        return entry.getValue();

    }

    private TreeMap<Integer, ServiceInstance<ServiceMeta>> makeConsistentHashRing(List<ServiceInstance<ServiceMeta>> servers) {

        TreeMap<Integer, ServiceInstance<ServiceMeta>> ring = new TreeMap<>();

        for (ServiceInstance<ServiceMeta> instance : servers) {

            for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {

                ring.put((buildServiceInstanceKey(instance) + VIRTUAL_NODE_SPLIT + i).hashCode(), instance);

            }

        }

        return ring;

    }

    private String buildServiceInstanceKey(ServiceInstance<ServiceMeta> instance) {

        ServiceMeta payload = instance.getPayload();

        return String.join(":", payload.getServiceAddr(), String.valueOf(payload.getServicePort()));

    }

}

JDK provides the TreeMap data structure, which can construct a hash ring very conveniently. By calculating the hashCode corresponding to the address and port of each service instance ServiceInstance, and then directly putting it into the TreeMap, TreeMap will sort the hashCode from small to large by default. When allocating nodes for client objects, use the ceilingEntry() method of TreeMap to find the first node that is greater than or equal to the client hashCode in clockwise direction, which corresponds to the service node that the client needs to call. If no node greater than or equal to the client hashCode is found, then simply take the first node in TreeMap.

At this point, a basic consistent hash algorithm has been implemented, and next, we can complete the discovery() method of service discovery.

Service Discovery Implementation #

The implementation of service discovery is relatively simple. First, find out the node list of the service being called, and then use the consistent hash algorithm provided by ZKConsistentHashLoadBalancer to find the corresponding service node. The specific code implementation is as follows:

@Override

public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {

    Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);

    ServiceInstance<ServiceMeta> instance = new ZKConsistentHashLoadBalancer().select((List<ServiceInstance<ServiceMeta>>) serviceInstances, invokerHashCode);

    if (instance != null) {

        return instance.getPayload();

    }

    return null;

}

Before the service consumer initiates an RPC call through dynamic proxy, it needs to obtain the callable node through the service discovery interface. The specific code implementation will be covered in the next lesson “Dynamic Proxy: Shielding the Underlying Details of RPC Calls from Users”, so we won’t go into details in this lesson.

Summary #

Service registration and discovery are important components of an RPC framework. In this lesson, we designed a generic registry service interface and provided a default implementation for the Zookeeper scenario. Load balancing algorithms are needed for service discovery, and the consistent hash algorithm is widely used in many scenarios. It can ensure that the load is evenly distributed among service nodes and minimize the impact of scaling up/down of service nodes. It is important to understand the implementation principles of the consistent hash algorithm, as it is a frequently asked question in interviews.

Finally, here are two follow-up tasks:

  1. If you are familiar with Eureka or other types of registry centers, you can try to extend the RegistryService interface and implement it.
  2. In the implementation of the consistent hash algorithm, we simply used the hashCode of the service instance as the basis for building the hash ring. A better hash function can refer to the higher-performance MurmurHash, which has a default implementation in the Guava utility library. You can introduce MurmurHash to optimize the implementation of the consistent hash algorithm mentioned above.