07 ZooKeeper and Curator: Please Stop Using ZkClient (Part 2) #

In the previous lesson, we introduced the core concepts and working principles of ZooKeeper. In this lesson, let's take a brief look at ZooKeeper clients. In practical work, we interact with ZooKeeper through a client far more often than we extend or modify ZooKeeper itself. From the perspective of ZooKeeper's architecture, a business node in Dubbo is simply a ZooKeeper client.

The official ZooKeeper client provides basic operations such as creating sessions, creating nodes, reading nodes, updating data, deleting nodes, and checking whether a node exists. However, these basic operations are not enough for real-world development, and the native ZooKeeper API also has several drawbacks:

  • A ZooKeeper Watcher is one-time: it must be re-registered after every trigger.
  • After a session expires, there is no mechanism for reconnecting automatically.
  • ZooKeeper throws very fine-grained exceptions, which makes exception handling tedious and unfriendly to beginners.
  • It only exposes raw byte[] data and provides no serialization for basic types or objects.
  • Creating a node that already exists throws an exception, so you have to check whether the node exists yourself.
  • Deleting a node does not cascade to its child nodes.
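To make the first drawback concrete, here is a toy in-memory model of the one-time watch rule. It is written in plain Java with no ZooKeeper dependency. A watcher is removed as soon as it fires, so later changes go unnoticed unless the callback registers itself again. `OneShotWatchModel` and its methods are hypothetical names used only for illustration. With the native API, the re-registration would happen inside `Watcher.process()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// A toy, in-memory model of ZooKeeper's one-time watch rule.
// OneShotWatchModel and its methods are hypothetical; this is NOT the ZooKeeper API.
public class OneShotWatchModel {

    // Pending watchers per path
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    // Registers a watcher that fires on the next change to `path` only
    public void watch(String path, Consumer<String> watcher) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Simulates a change to `path`: pending watchers are removed first, then each fires once
    public void change(String path) {
        List<Consumer<String>> pending = watchers.remove(path);
        if (pending == null) {
            return; // no watcher is armed, so the change goes unnoticed
        }
        for (Consumer<String> watcher : pending) {
            watcher.accept(path);
        }
    }

    // Counts how many of `changes` updates a single watcher observes
    public static int countNotifications(int changes, boolean reRegister) {
        OneShotWatchModel zk = new OneShotWatchModel();
        int[] seen = {0};
        Consumer<String> watcher = new Consumer<String>() {
            @Override
            public void accept(String path) {
                seen[0]++;
                if (reRegister) {
                    zk.watch(path, this); // re-arm, as code on the native API must do
                }
            }
        };
        zk.watch("/user", watcher);
        for (int i = 0; i < changes; i++) {
            zk.change("/user");
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("without re-registering: " + countNotifications(3, false)); // 1
        System.out.println("with re-registering: " + countNotifications(3, true)); // 3
    }
}
```

Re-registering from inside the callback is what ZkClient and Curator automate for you in their higher-level listeners.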

The commonly used third-party open-source ZooKeeper clients are ZkClient and Apache Curator.

ZkClient is a wrapper around the native ZooKeeper API. It solves many of the native API's problems and offers a concise interface. For example, it reconnects automatically after a session timeout and removes the need to re-register Watchers repeatedly. Its drawbacks, however, are also obvious:

  • the documentation is incomplete
  • the retry mechanism is awkward to use
  • all exceptions are converted into RuntimeException
  • there are too few reference examples

This shows how important a simple, easy-to-use, efficient, and reliable ZooKeeper client is.

Introduction to Apache Curator #

Apache Curator is a ZooKeeper client provided by the Apache Software Foundation. Its fluent-style API is user-friendly and easy to read, and it helps us quickly build stable and reliable ZooKeeper client programs.

To give you a better picture of Curator's features, I have organized the jar packages that Curator provides into the following table:

[Figure (1.png): table of the jar packages provided by Curator]

Now, let's start with the most basic usage and walk through the core Curator features most commonly used in practice. Let's begin our journey with Apache Curator.

1. Basic operations #

Now that we have a rough idea of what each Curator component is for, let's start using Curator with an example. First, create a Maven project and add the Apache Curator dependency:

```xml
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>
```

Then, write a main method that shows the usage of the basic APIs provided by Curator:

```java
import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Main {
    public static void main(String[] args) throws Exception {
        // Address of the ZooKeeper cluster; separate multiple node addresses with commas
        String zkAddress = "127.0.0.1:2181";

        // Retry policy: if the client cannot connect to the ZooKeeper cluster,
        // it retries up to 3 times, waiting a randomized, growing interval between attempts
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // Create and start the Curator client; once started, it can interact with ZooKeeper
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
        client.start();

        // Below are some commonly used Curator APIs.
        // create() creates a ZNode; chained methods configure it, e.g. its type via withMode().
        // Here we create a persistent node named "/user" that stores the string "test"
        String path = client.create().withMode(CreateMode.PERSISTENT)
                .forPath("/user", "test".getBytes());
        System.out.println(path); // Output: /user

        // checkExists() checks whether a node exists; a non-null Stat means it does
        Stat stat = client.checkExists().forPath("/user");
        System.out.println(stat != null); // Output: true

        // getData() reads the data stored in a node
        byte[] data = client.getData().forPath("/user");
        System.out.println(new String(data)); // Output: test

        // setData() updates the data stored in a node
        stat = client.setData().forPath("/user", "data".getBytes());
        data = client.getData().forPath("/user");
        System.out.println(new String(data)); // Output: data

        // Create several ephemeral sequential nodes under /user
        for (int i = 0; i < 3; i++) {
            client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath("/user/child-");
        }

        // Get all child nodes (ZooKeeper does not guarantee their order)
        List<String> children = client.getChildren().forPath("/user");
        System.out.println(children);
        // Output: [child-0000000002, child-0000000001, child-0000000000]

        // delete() deletes the given node; deletingChildrenIfNeeded()
        // also deletes its child nodes recursively
        client.delete().deletingChildrenIfNeeded().forPath("/user");

        // Release the connection when done
        client.close();
    }
}
```
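The `ExponentialBackoffRetry(1000, 3)` policy above does not simply double a fixed delay. As far as we can tell from Curator's source, each retry sleeps `baseSleepTimeMs * max(1, random.nextInt(2^(retryCount + 1)))` milliseconds. That value is capped at `maxSleepMs`, which the two-argument constructor leaves at `Integer.MAX_VALUE`. The standalone sketch below reproduces that formula so you can see the growing, randomized range. `BackoffModel` is a hypothetical name, and you should verify the formula against the Curator version you actually use.

```java
import java.util.Random;

// Simplified model of how ExponentialBackoffRetry picks a sleep time
// (based on our reading of Curator's source; BackoffModel itself is hypothetical).
public class BackoffModel {

    // sleep = base * max(1, random int in [0, 2^(retryCount + 1))), then capped at maxSleepMs
    static long sleepTimeMs(int baseSleepTimeMs, int maxSleepMs, int retryCount, Random random) {
        long sleepMs = (long) baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleepMs, maxSleepMs);
    }

    public static void main(String[] args) {
        Random random = new Random(42); // fixed seed so runs are repeatable
        // Like new ExponentialBackoffRetry(1000, 3): retryCount takes the values 0, 1, 2
        for (int retryCount = 0; retryCount < 3; retryCount++) {
            long upperBound = 1000L * ((1L << (retryCount + 1)) - 1);
            long sleep = sleepTimeMs(1000, Integer.MAX_VALUE, retryCount, random);
            System.out.printf("retry %d: sleep %d ms (possible range 1000..%d)%n",
                    retryCount, sleep, upperBound);
        }
    }
}
```

The first retry always waits exactly `baseSleepTimeMs`. Later retries pick a random multiple of it from a range whose upper bound roughly doubles each time, which spreads out reconnect attempts from many clients.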

2. Background operations #

The create, delete, update, and read methods shown above are all synchronous. Curator also provides asynchronous versions: adding inBackground() to a call runs it in the background. The BackgroundCallback interface and the CuratorListener listener then handle the result the server returns for such a background call.

The BackgroundCallback interface and CuratorListener listener accept a CuratorEvent parameter, which contains detailed information such as event type, response code, and node path.

Below is an example to illustrate the basic usage of the BackgroundCallback interface and CuratorListener listener:

```java
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Main2 {

    public static void main(String[] args) throws Exception {

        // ZooKeeper cluster address; separate multiple node addresses with commas
        String zkAddress = "127.0.0.1:2181";

        // Retry policy: retry up to 3 times, waiting a randomized, growing interval
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // Create and start the Curator client; once started, it can interact with ZooKeeper
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(zkAddress, retryPolicy);

        client.start();

        // Add a CuratorListener that handles the different event types
        client.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client,
                                      CuratorEvent event) throws Exception {
                switch (event.getType()) {
                    case CREATE:
                        System.out.println("CREATE:" + event.getPath());
                        break;
                    case DELETE:
                        System.out.println("DELETE:" + event.getPath());
                        break;
                    case EXISTS:
                        System.out.println("EXISTS:" + event.getPath());
                        break;
                    case GET_DATA:
                        System.out.println("GET_DATA:" + event.getPath() + ","
                                + new String(event.getData()));
                        break;
                    case SET_DATA:
                        System.out.println("SET_DATA:" + event.getPath());
                        break;
                    default:
                        break;
                }
            }
        });
    }
}
```
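As written, `Main2` only registers the listener. Our understanding of Curator is that a `CuratorListener` receives the result of an operation started with `inBackground()` when that call does not pass its own `BackgroundCallback`. When a callback is passed (`inBackground(callback)`), the callback receives the `CuratorEvent` instead. The plain-Java model below illustrates only that dispatch rule. `BackgroundDispatchModel`, its `Event` class, and its methods are hypothetical stand-ins, not Curator types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Toy model of result delivery for background operations: the per-call callback if one
// was supplied, otherwise every registered listener. All names here are hypothetical.
public class BackgroundDispatchModel {

    // Stand-in for CuratorEvent: just the operation type and the node path
    static final class Event {
        final String type;
        final String path;

        Event(String type, String path) {
            this.type = type;
            this.path = path;
        }

        @Override
        public String toString() {
            return type + ":" + path;
        }
    }

    private final List<Consumer<Event>> listeners = new ArrayList<>();

    void addListener(Consumer<Event> listener) {
        listeners.add(listener);
    }

    // Runs the "operation" on another thread, then dispatches its result event
    CompletableFuture<Void> inBackground(String type, String path, Consumer<Event> callback) {
        return CompletableFuture.runAsync(() -> {
            Event event = new Event(type, path);
            if (callback != null) {
                callback.accept(event); // an explicit callback takes precedence
            } else {
                listeners.forEach(listener -> listener.accept(event)); // fall back to listeners
            }
        });
    }

    // Runs one listener-delivered and one callback-delivered operation, in that order
    static List<String> demo() {
        BackgroundDispatchModel client = new BackgroundDispatchModel();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        client.addListener(event -> seen.add("listener " + event));
        client.inBackground("CREATE", "/user", null).join();
        client.inBackground("GET_DATA", "/user", event -> seen.add("callback " + event)).join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [listener CREATE:/user, callback GET_DATA:/user]
    }
}
```

In this model, a per-call callback is convenient for one-off handling. A shared listener suits cross-cutting concerns such as logging every background result.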
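```java
                // process() is called when the specified node changes
                public void process(WatchedEvent event) throws Exception {
                    System.out.println("Node changed: " + event.getPath());
                }
            }).forPath("/user");

            // Modifying the node's data here triggers the Watcher event
            client.setData().forPath("/user", "setData-Test".getBytes());

            // Output:
            // Node changed: /user
        }
    }
```

The example above shows what happens when a Watcher is added with the usingWatcher() method: its process() method is triggered when the node changes, and that is how we listen for changes to a node.

As the output in the comments shows, modifying the data of the /user node triggers the Watcher event and prints `Node changed: /user`. Like a native ZooKeeper Watcher, a Watcher added this way fires only once, so it must be registered again to keep listening.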

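If you no longer need to listen for events on a node, you can remove its Watchers, for example:

client.watches().removeAll().forPath("/user");

This removes the Watchers registered on the /user node.

That is the basic usage of Curator's Watcher mechanism: by adding a Watcher, we can listen for events on a specified node.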

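NodeCache node path: /user, node data: test_updated // NodeCache received the node data update event
PathChildrenCache modifies child node path: /user/test1, child node data: test1_updated // PathChildrenCache received the child node data update event
PathChildrenCache modifies child node path: /user/test2, child node data: test2_updated // PathChildrenCache received the child node data update event
TreeCache, type=NODE_UPDATED path=/user // TreeCache received the node data update event

As we can see, NodeCache, PathChildrenCache, and TreeCache make it easy to listen for changes to ZooKeeper nodes and their children and to react to them. This greatly simplifies the code.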

TreeCache, type=NODE_UPDATED, path=/user

NodeCache node path: /user, node data: userData

**Create /user/test3 node**:

![Drawing 2.png](../images/CgqCHl87iZqAaG93AABwFnQJA7o497.png)

Output received:
    
TreeCache, type=NODE_ADDED, path=/user/test3

2020-06-26T08:35:22.393 CHILD_ADDED

PathChildrenCache adds child node: /user/test3

PathChildrenCache child node data: xxx3

**Update data for /user/test3 node**:

![Drawing 3.png](../images/Ciqc1F87iaSAFZLpAABDyAm7vuE120.png)

Output received:
    
TreeCache, type=NODE_UPDATED, path=/user/test3

2020-06-26T08:43:54.604 CHILD_UPDATED

PathChildrenCache modifies child node path: /user/test3

PathChildrenCache modifies child node data: xxx33

**Delete /user/test3 node**:

![Drawing 4.png](../images/CgqCHl87ia6AYvijAABBmFLfzx4213.png)

Output received:
    
TreeCache, type=NODE_REMOVED, path=/user/test3

2020-06-26T08:44:06.329 CHILD_REMOVED

PathChildrenCache deletes child node: /user/test3
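Conceptually, PathChildrenCache keeps a local copy of a node's children and their data and reports each difference as a `CHILD_ADDED`, `CHILD_UPDATED`, or `CHILD_REMOVED` event. That is what the /user/test3 output above shows. The sketch below models only that diffing idea in plain Java. It is a simplification: the real cache is driven by ZooKeeper watches rather than by explicit refresh calls. `ChildrenCacheModel` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Toy model of the idea behind a children cache: keep a local snapshot of
// child path -> data and turn each new view of the children into change events.
// Event names echo PathChildrenCache's CHILD_* types; this is not Curator's code.
public class ChildrenCacheModel {

    private Map<String, String> snapshot = new TreeMap<>();

    // Diffs `current` against the cached snapshot, then replaces the snapshot
    public List<String> refresh(Map<String, String> current) {
        Map<String, String> sorted = new TreeMap<>(current); // copy, with deterministic order
        List<String> events = new ArrayList<>();
        for (Map.Entry<String, String> entry : sorted.entrySet()) {
            if (!snapshot.containsKey(entry.getKey())) {
                events.add("CHILD_ADDED " + entry.getKey());
            } else if (!Objects.equals(snapshot.get(entry.getKey()), entry.getValue())) {
                events.add("CHILD_UPDATED " + entry.getKey());
            }
        }
        for (String path : snapshot.keySet()) {
            if (!sorted.containsKey(path)) {
                events.add("CHILD_REMOVED " + path);
            }
        }
        snapshot = sorted;
        return events;
    }

    public static void main(String[] args) {
        ChildrenCacheModel cache = new ChildrenCacheModel();
        Map<String, String> children = new TreeMap<>();
        children.put("/user/test3", "xxx3");
        System.out.println(cache.refresh(children)); // [CHILD_ADDED /user/test3]
        children.put("/user/test3", "xxx33");
        System.out.println(cache.refresh(children)); // [CHILD_UPDATED /user/test3]
        children.remove("/user/test3");
        System.out.println(cache.refresh(children)); // [CHILD_REMOVED /user/test3]
    }
}
```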

### The curator-x-discovery Extension Library

To avoid the curator-framework package becoming too bloated, Curator has split many other solutions into separate packages, such as curator-recipes, curator-x-discovery, and curator-x-rpc.

Later in this course, we will use curator-x-discovery to build the registry module of a simple RPC framework. The curator-x-discovery extension library is a service discovery solution.

In ZooKeeper, ephemeral nodes can be used to implement service registration. When a service starts, it creates an ephemeral node under a specified path in ZooKeeper. When the service's session with ZooKeeper ends (for example, because the service crashed and its session expired), ZooKeeper deletes that ephemeral node automatically.

The curator-x-discovery extension library abstracts this mechanism and provides a simple API for service discovery. Its core concepts are as follows:

  * **ServiceInstance:** This is the abstraction of a service instance in the curator-x-discovery extension library, consisting of name, id, address, port, and an optional payload attribute. Its storage in ZooKeeper is shown in the diagram below.

![Drawing 5.png](../images/CgqCHl87icOABt59AADHccHcE1Q955.png)

  * **ServiceProvider:** This is one of the core components of the curator-x-discovery extension library, providing various service discovery strategies like round-robin, random, and sticky (always select the same one). After obtaining the ServiceProvider object, we can call its getInstance() method to obtain a ServiceInstance object (i.e., discover an available service instance) according to the specified strategy; or call the getAllInstances() method to obtain all ServiceInstance objects (i.e., get all available service instances).
  * **ServiceDiscovery:** This is the entry class of the curator-x-discovery extension library. It must be started by calling the start() method, and should be closed by calling the close() method when finished.
  * **ServiceCache:** If the program frequently queries ServiceInstance objects, we can add a ServiceCache cache. The ServiceCache caches a list of ServiceInstance instances in memory and adds the corresponding Watcher to synchronize the cache updates. The way to query the ServiceCache is also through the getInstances() method. In addition, listeners can be added to the ServiceCache to listen for cache changes.
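To illustrate what the ServiceProvider strategies mean, here is a plain-Java sketch of two of them. The first is round-robin selection. The second is a sticky strategy that keeps returning the same instance until that instance disappears and then asks another strategy for a replacement. `Strategy`, `RoundRobin`, and `Sticky` are hypothetical names. curator-x-discovery ships its own strategy classes (for example RoundRobinStrategy and StickyStrategy), whose details may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Toy versions of two instance-selection strategies; all names are hypothetical.
public class ProviderStrategyModel {

    interface Strategy<T> {
        T pick(List<T> instances); // returns null when no instance is available
    }

    // Cycles through the instances in order
    static final class RoundRobin<T> implements Strategy<T> {
        private final AtomicInteger next = new AtomicInteger();

        public T pick(List<T> instances) {
            if (instances.isEmpty()) {
                return null;
            }
            int index = Math.floorMod(next.getAndIncrement(), instances.size());
            return instances.get(index);
        }
    }

    // Keeps returning the same instance while it is still registered;
    // otherwise asks the delegate strategy for a new one
    static final class Sticky<T> implements Strategy<T> {
        private final Strategy<T> delegate;
        private final AtomicReference<T> current = new AtomicReference<>();

        Sticky(Strategy<T> delegate) {
            this.delegate = delegate;
        }

        public T pick(List<T> instances) {
            T chosen = current.get();
            if (chosen == null || !instances.contains(chosen)) {
                chosen = delegate.pick(instances);
                current.set(chosen);
            }
            return chosen;
        }
    }

    public static void main(String[] args) {
        List<String> instances = List.of("10.0.0.1:8080", "10.0.0.2:8080", "10.0.0.3:8080");

        Strategy<String> roundRobin = new RoundRobin<>();
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            picks.add(roundRobin.pick(instances));
        }
        System.out.println(picks); // [10.0.0.1:8080, 10.0.0.2:8080, 10.0.0.3:8080, 10.0.0.1:8080]

        Strategy<String> sticky = new Sticky<>(new RoundRobin<>());
        System.out.println(sticky.pick(instances) + " " + sticky.pick(instances)); // 10.0.0.1:8080 10.0.0.1:8080
        // Once 10.0.0.1 disappears, the sticky strategy falls back to its delegate
        System.out.println(sticky.pick(List.of("10.0.0.2:8080", "10.0.0.3:8080"))); // 10.0.0.3:8080
    }
}
```

Wrapping another strategy keeps the sticky logic independent of how the replacement instance is chosen.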

Next, let's use a simple example to illustrate the usage of the curator-x-discovery package. In this example, ServerInfo records the host, port, and description of a service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

// Config and ServerInfo are the project's own classes (not shown here)
public class ZookeeperCoordinator {
    private ServiceDiscovery<ServerInfo> serviceDiscovery;
    private ServiceCache<ServerInfo> serviceCache;
    private CuratorFramework client;
    private String root;

    // JsonInstanceSerializer serializes ServerInfo into JSON
    private InstanceSerializer<ServerInfo> serializer = new JsonInstanceSerializer<>(ServerInfo.class);

    ZookeeperCoordinator(Config config) throws Exception {
        this.root = config.getPath();

        // Create and start the Curator client
        client = CuratorFrameworkFactory.newClient(config.getHostPort(), new ExponentialBackoffRetry(...));
        client.start();
        client.blockUntilConnected(); // block the current thread until the connection succeeds

        // Create and start ServiceDiscovery
        serviceDiscovery = ServiceDiscoveryBuilder
                .builder(ServerInfo.class)
                .client(client)       // the Curator client it depends on
                .basePath(root)       // the ZooKeeper path it manages
                .watchInstances(true) // watch the registered ServiceInstance nodes for changes
                .serializer(serializer)
                .build();
        serviceDiscovery.start();

        // Create a ServiceCache that tracks changes to the corresponding ZooKeeper nodes
        serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name(root)
                .build();
        serviceCache.start();
    }

    public void registerRemote(ServerInfo serverInfo) throws Exception {
        // Convert the ServerInfo object into a ServiceInstance object
        ServiceInstance<ServerInfo> thisInstance = ServiceInstance.<ServerInfo>builder()
                .name(root)
                .id(UUID.randomUUID().toString()) // randomly generated UUID
                .address(serverInfo.getHost())    // host
                .port(serverInfo.getPort())       // port
                .payload(serverInfo)              // payload
                .build();
        // Write the ServiceInstance to ZooKeeper
        serviceDiscovery.registerService(thisInstance);
    }

    public List<ServerInfo> queryRemoteNodes() {
        List<ServerInfo> serverInfoDetails = new ArrayList<>();
        // Query the ServiceCache for all ServiceInstance objects
        List<ServiceInstance<ServerInfo>> serviceInstances = serviceCache.getInstances();
        serviceInstances.forEach(serviceInstance -> {
            // The payload field of each ServiceInstance holds the deserialized ServerInfo
            ServerInfo instance = serviceInstance.getPayload();
            serverInfoDetails.add(instance);
        });
        return serverInfoDetails;
    }
}
```
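registerRemote() builds on the ephemeral-node mechanism described earlier. As far as we know, curator-x-discovery registers a ServiceInstance as an ephemeral node by default, so a crashed service drops out of the list automatically. The idea fits in a few lines of plain Java. Each node remembers the session that created it, and ending a session deletes all of its nodes. This is a hypothetical in-memory model: the class, the method names, and the `/services/demo` paths are made up, and it is not ZooKeeper or curator-x-discovery code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of ephemeral-node registration: every node belongs to the session that
// created it and vanishes when that session ends. Names and paths are hypothetical.
public class EphemeralRegistryModel {

    private final Map<String, Long> owners = new TreeMap<>(); // node path -> owning session id

    // Like creating an ephemeral node: fails if the path is already taken
    public void createEphemeral(long sessionId, String path) {
        if (owners.putIfAbsent(path, sessionId) != null) {
            throw new IllegalStateException("node already exists: " + path);
        }
    }

    // Session closed or expired: all of its ephemeral nodes are deleted
    public void endSession(long sessionId) {
        owners.values().removeIf(owner -> owner == sessionId);
    }

    // Direct children of `parent`, in sorted order
    public List<String> children(String parent) {
        List<String> result = new ArrayList<>();
        String prefix = parent + "/";
        for (String path : owners.keySet()) {
            if (path.startsWith(prefix)) {
                String rest = path.substring(prefix.length());
                if (!rest.contains("/")) {
                    result.add(rest);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        EphemeralRegistryModel zk = new EphemeralRegistryModel();
        zk.createEphemeral(1L, "/services/demo/instance-a"); // service A starts
        zk.createEphemeral(2L, "/services/demo/instance-b"); // service B starts
        System.out.println(zk.children("/services/demo")); // [instance-a, instance-b]
        zk.endSession(1L); // service A crashes, so its session eventually expires
        System.out.println(zk.children("/services/demo")); // [instance-b]
    }
}
```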

Introduction to curator-recipes #

Curator's recipes module provides ready-made solutions for common distributed scenarios. Here we only give a brief introduction and will not analyze their usage and principles in depth for now.

  • Queues: Provides various distributed queue implementations, such as priority queues and delay queues. ZooKeeper is rarely used as a distributed queue in production environments because it only suits very low-load scenarios, so use these recipes sparingly.

  • Counters: Global counters are commonly used tools in distributed systems. Curator Recipes provides components such as SharedCount and DistributedAtomicLong to help developers implement distributed counter functions.

  • Locks: You are probably already familiar with the locks in java.util.concurrent.locks. In a microservice architecture, a distributed lock is also a basic building block. Curator Recipes provides several ZooKeeper-based distributed lock implementations to meet the distributed locking needs of everyday work.

  • Barriers: The distributed barrier provided by Curator Recipes can be used to achieve collaboration among multiple services. The specific implementations include DistributedBarrier and DistributedDoubleBarrier.

  • Elections: Elects a leader among multiple participants. The leader node then acts as the executor for operation scheduling, task monitoring, or queue consumption. Curator Recipes provides implementations such as LeaderLatch.
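Several of these recipes, including the locks and LeaderLatch, build on the same ZooKeeper pattern: each participant creates an EPHEMERAL_SEQUENTIAL node, and the node with the lowest sequence number wins. The sketch below models the decision step of the classic lock recipe in plain Java. It answers two questions: does a given node hold the lock, and if not, which single predecessor should it watch? The names are hypothetical. Curator's actual implementation adds details that are omitted here, such as protected node-name prefixes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of the classic ZooKeeper lock recipe: every contender creates an
// EPHEMERAL_SEQUENTIAL child; the lowest sequence number holds the lock, and each other
// contender watches only the node directly ahead of it (so one release wakes one waiter).
// Method names are hypothetical; this is not Curator's InterProcessMutex code.
public class SequentialLockModel {

    // ZooKeeper appends a 10-digit, zero-padded counter, e.g. "lock-0000000003"
    static long sequenceOf(String node) {
        return Long.parseLong(node.substring(node.length() - 10));
    }

    // Returns null if `mine` holds the lock, otherwise the node that `mine` should watch
    static String nodeToWatch(List<String> children, String mine) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(SequentialLockModel::sequenceOf));
        int index = sorted.indexOf(mine);
        if (index < 0) {
            throw new IllegalArgumentException("not a contender: " + mine);
        }
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        // getChildren() returns children in no particular order
        List<String> children = List.of("lock-0000000002", "lock-0000000000", "lock-0000000001");
        System.out.println(nodeToWatch(children, "lock-0000000000")); // null: holds the lock
        System.out.println(nodeToWatch(children, "lock-0000000002")); // lock-0000000001
    }
}
```

Watching only the predecessor, rather than the whole parent node, means a release wakes exactly one waiter instead of every contender.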

Summary #

In this lesson, we focused on the introduction of Apache Curator:

  • Firstly, we compared Apache Curator with other ZooKeeper clients, and the ease of use of Apache Curator is an important reason for choosing it.

  • Next, we introduced the basic usage of Apache Curator and some things to pay attention to in practical use through examples.

  • Afterwards, we introduced the basic concepts and usage of the curator-x-discovery extension library.

  • Lastly, we briefly introduced the powerful features provided by curator-recipes.

Do you have any other insights about Apache Curator? Feel free to leave me a message in the comments and share it with me.

zk-demo link: https://github.com/xxxlxy2008/zk-demo