32 Registry Center How to Implement Database Access Circuit Break Mechanism Based on Registry Center #

In the previous lesson, we discussed the related content of the configuration center in ShardingSphere. Today, we will continue to discuss another core function of the orchestration and governance module, which is the registry center. Compared to the configuration center, the registry center is more widely used in ShardingSphere.

Registry Center Implementation Classes in ShardingSphere #

Similar to the configuration center, the registry center in ShardingSphere also consists of three independent projects in terms of code structure, namely the API project representing abstract interfaces, and two concrete implementation projects: nacos and zookeeper-curator. As we can see, here we also use Zookeeper as one of the ways to implement the registry center, which was introduced in the previous lesson, and the other implementation method is based on Alibaba’s Nacos.

Let’s first take a look at the abstraction of the registry center in ShardingSphere, which is the RegistryCenter interface shown below:

public interface RegistryCenter extends TypeBasedSPI {

    // Initialize the registry center based on configuration information
    void init(RegistryCenterConfiguration config);

    // Get data
    String get(String key);

    // Get data directly
    String getDirectly(String key);

    // Check if a data item exists
    boolean isExisted(String key);

    // Get a list of child data items
    List<String> getChildrenKeys(String key);

    // Persist data item
    void persist(String key, String value);

    // Update data item
    void update(String key, String value);

    // Persist ephemeral data
    void persistEphemeral(String key, String value);

    // Watch a data item or path
    void watch(String key, DataChangedEventListener dataChangedEventListener);

    // Close the registry center
    void close();

    // Initialize a lock for a data item
    void initLock(String key);

    // Acquire lock for a data item
    boolean tryLock();

    // Release lock for a data item
    void tryRelease();

We can see that, except for the last few methods related to lock handling, RegistryCenter is very similar to ConfigCenter introduced in the previous lesson. From this point, it is not hard to imagine why Zookeeper can be used as both a configuration center and a typical solution for implementing the registry center. Following this line of thinking, let’s first take a look at the CuratorZookeeperRegistryCenter, which is the registry center implementation class based on Zookeeper.

1. CuratorZookeeperRegistryCenter #

Let’s quickly go through the CuratorZookeeperRegistryCenter class and find that the implementation process of the common interface methods is exactly the same as that of the CuratorZookeeperConfigCenter. As for the newly added methods related to lock, the implementation is also very simple. We can directly use the InterProcessMutex encapsulated by Curator, as shown below:

private InterProcessMutex leafLock;

public void initLock(final String key) {
    leafLock = new InterProcessMutex(client, key);

public boolean tryLock() {
    return leafLock.acquire(5, TimeUnit.SECONDS);

public void tryRelease() {

This is about the explanation of CuratorZookeeperRegistryCenter. Next, let’s take a look at another implementation class of the registry center, NacosRegistryCenter.

2. NacosRegistryCenter #

The Nacos framework also provides a client component called ConfigService, which is used to implement the get, getDirectly, and isExisted methods to retrieve data. These methods actually use the getConfig method of ConfigService for implementation, so we don’t need to discuss it in detail.

The persist method of NacosRegistryCenter actually calls its update method, and the latter updates the data by using the publishConfig method of ConfigService, as shown below:

public void persist(final String key, final String value) {
    update(key, value);

public void update(final String key, final String value) {
    try {
        String dataId = key.replace("/", ".");
        String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
        configService.publishConfig(dataId, group, value);
    } catch (final NacosException ex) {
        log.debug("exception for: {}", ex.toString());

Unlike Zookeeper, for Nacos, the methods getChildrenKeys, persistEphemeral, close, initLock, tryLock, and tryRelease cannot be implemented or are not necessary. As for the watch method, ConfigService also provides the addListener method to use listeners. It also relies on the DataChangedEventListener class introduced in the previous lesson to handle events, as shown below:

public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
    try {
        String dataId = key.replace("/", ".");
        String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
        configService.addListener(dataId, group, new Listener() {
            public Executor getExecutor() {
                return null;
    private String instanceId;
    private String ipAddress;
    private String pid;
    private String uuid;
    private String separator = "@";
        "orchestration-" + InetAddress.getLocalHost().getHostName() + "-" + getProcessId() + "@-@-@-@")

3.ShardingOrchestrationListenerManager #

instanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] + DELIMITER + UUID.randomUUID().toString();

Please note that in the StateService, the mechanism for storing instances and datasources is different:

// Save Instance using the ephemeral node
public void persistInstanceOnline() {
    regCenter.persistEphemeral(stateNode.getInstancesNodeFullPath(instance.getInstanceId()), "");

// Save DataSources using the persistent node
public void persistDataSourcesNode() {
    regCenter.persist(stateNode.getDataSourcesNodeFullRootPath(), "");

You can see that the persistEphemeral method is used to save instances using the RegistryCenter’s ephemeral node mechanism, while the persist method is used to save DataSources using the persistent node mechanism. This is because it is intentional to handle availability design by designating running instances as ephemeral nodes, automatically registering them when they come online and cleaning them up when they go offline.

3. ShardingOrchestrationListenerManager #

Next, let’s take a look at the last variable ShardingOrchestrationListenerManager in ShardingOrchestrationFacade. From the naming, it seems that this class is used to manage various listener Listeners for processing change events. Based on the analysis so far, it is evident that there should be two types of listeners in the system: one for monitoring configuration changes, and one for monitoring instance status changes.

Sure enough, in ShardingOrchestrationListenerManager, we find two ListenerManager: ConfigurationChangedListenerManager and StateChangedListenerManager, as shown below:

public final class ShardingOrchestrationListenerManager {

    // Manager for configuration change listeners
    private final ConfigurationChangedListenerManager configurationChangedListenerManager;

    // Manager for state change listeners
    private final StateChangedListenerManager stateChangedListenerManager;

    public ShardingOrchestrationListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
        configurationChangedListenerManager = new ConfigurationChangedListenerManager(name, regCenter, shardingSchemaNames);
        stateChangedListenerManager = new StateChangedListenerManager(name, regCenter);

    public void initListeners() {

We create these two ListenerManager objects and call their initListeners() methods to initialize them. Taking ConfigurationChangedListenerManager as an example, let’s see its internal structure:

public final class ConfigurationChangedListenerManager {

    private final SchemaChangedListener schemaChangedListener;

    private final PropertiesChangedListener propertiesChangedListener;

    private final AuthenticationChangedListener authenticationChangedListener;

    public ConfigurationChangedListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
        schemaChangedListener = new SchemaChangedListener(name, regCenter, shardingSchemaNames);
        propertiesChangedListener = new PropertiesChangedListener(name, regCenter);
        authenticationChangedListener = new AuthenticationChangedListener(name, regCenter);

    public void initListeners() {
        schemaChangedListener.watch(ChangedType.UPDATED, ChangedType.DELETED);

As you can see, it defines three listeners: SchemaChangedListener, PropertiesChangedListener, and AuthenticationChangedListener. Obviously, they correspond to the three top-level configuration items in the configuration structure introduced by ConfigurationService: schema, props, and authentication. Then, for each of these configuration items, we add monitoring for specific operations as needed. From the above code, we can see that for the schema configuration item, we need to respond to UPDATE and DELETE events, while for the props and authentication configuration items, we only need to pay attention to the UPDATE operation.

Because these specific events and the handling of the monitoring mechanism are similar, let’s take a closer look at SchemaChangedListener. SchemaChangedListener extends the abstract class PostShardingOrchestrationEventListener, which in turn implements the ShardingOrchestrationListener interface. Let’s first look at the definition of this interface:

public interface ShardingOrchestrationListener {
    // Listen to events
    void watch(ChangedType... watchedChangedTypes);

PostShardingOrchestrationEventListener implements this interface, and the implementation process is as follows:

public abstract class PostShardingOrchestrationEventListener implements ShardingOrchestrationListener {

    // Create an EventBus
    private final EventBus eventBus = ShardingOrchestrationEventBus.getInstance();
    private final RegistryCenter regCenter;
    private final String watchKey;

    public final void watch(final ChangedType... watchedChangedTypes) {
        final Collection<ChangedType> watchedChangedTypeList = Arrays.asList(watchedChangedTypes);
        regCenter.watch(watchKey, new DataChangedEventListener() {
            public void onChange(final DataChangedEvent dataChangedEvent) {
                if (watchedChangedTypeList.contains(dataChangedEvent.getChangedType())) {
                    // Publish event through EventBus

    protected abstract ShardingOrchestrationEvent createShardingOrchestrationEvent(DataChangedEvent event);

The core mechanism of the above code is to add an event handler to specific events using the watch method of RegistryCenter, and the event processing process is further forwarded using the post method of Guava’s EventBus. The specific event types to be forwarded are provided by the abstract method createShardingOrchestrationEvent, which needs to be implemented by each subclass of PostShardingOrchestrationEventListener.

Next, let’s take a closer look at the method responsible for event creation in the subclass SchemaChangedListener. We will expand on the createDataSourceChangedEvent method, which is a typical event creation method:

private DataSourceChangedEvent createDataSourceChangedEvent(final String shardingSchemaName, final DataChangedEvent event) {
    Map<String, YamlDataSourceConfiguration> dataSourceConfigurations = (Map) YamlEngine.unmarshal(event.getValue());
    Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data sources to load for orchestration.");
    // create DataSourceChangedEvent
    return new DataSourceChangedEvent(shardingSchemaName, Maps.transformValues(dataSourceConfigurations, new Function<YamlDataSourceConfiguration, DataSourceConfiguration>() {

            public DataSourceConfiguration apply(final YamlDataSourceConfiguration input) {

                return new DataSourceConfigurationYamlSwapper().swap(input);







