32 Registry Center How to Implement Database Access Circuit Break Mechanism Based on Registry Center

32 Registry Center How to Implement Database Access Circuit Break Mechanism Based on Registry Center #

In the previous lesson, we discussed the related content of the configuration center in ShardingSphere. Today, we will continue to discuss another core function of the orchestration and governance module, which is the registry center. Compared to the configuration center, the registry center is more widely used in ShardingSphere.

Registry Center Implementation Classes in ShardingSphere #

Similar to the configuration center, the registry center in ShardingSphere also consists of three independent projects in terms of code structure, namely the API project representing abstract interfaces, and two concrete implementation projects: nacos and zookeeper-curator. As we can see, here we also use Zookeeper as one of the ways to implement the registry center, which was introduced in the previous lesson, and the other implementation method is based on Alibaba’s Nacos.

Let’s first take a look at the abstraction of the registry center in ShardingSphere, which is the RegistryCenter interface shown below:

public interface RegistryCenter extends TypeBasedSPI {

    // Initialize the registry center based on configuration information
    void init(RegistryCenterConfiguration config);

    // Get data
    String get(String key);

    // Get data directly
    String getDirectly(String key);

    // Check if a data item exists
    boolean isExisted(String key);

    // Get a list of child data items
    List<String> getChildrenKeys(String key);

    // Persist data item
    void persist(String key, String value);

    // Update data item
    void update(String key, String value);

    // Persist ephemeral data
    void persistEphemeral(String key, String value);

    // Watch a data item or path
    void watch(String key, DataChangedEventListener dataChangedEventListener);

    // Close the registry center
    void close();

    // Initialize a lock for a data item
    void initLock(String key);

    // Acquire lock for a data item
    boolean tryLock();

    // Release lock for a data item
    void tryRelease();
}

We can see that, except for the last few methods related to lock handling, RegistryCenter is very similar to ConfigCenter introduced in the previous lesson. From this point, it is not hard to imagine why Zookeeper can be used as both a configuration center and a typical solution for implementing the registry center. Following this line of thinking, let’s first take a look at the CuratorZookeeperRegistryCenter, which is the registry center implementation class based on Zookeeper.

1. CuratorZookeeperRegistryCenter #

Let’s quickly go through the CuratorZookeeperRegistryCenter class and find that the implementation process of the common interface methods is exactly the same as that of the CuratorZookeeperConfigCenter. As for the newly added methods related to lock, the implementation is also very simple. We can directly use the InterProcessMutex encapsulated by Curator, as shown below:

private InterProcessMutex leafLock;

@Override
public void initLock(final String key) {
    leafLock = new InterProcessMutex(client, key);
}

@Override
@SneakyThrows
public boolean tryLock() {
    return leafLock.acquire(5, TimeUnit.SECONDS);
}

@Override
@SneakyThrows
public void tryRelease() {
    leafLock.release();
}

This is about the explanation of CuratorZookeeperRegistryCenter. Next, let’s take a look at another implementation class of the registry center, NacosRegistryCenter.

2. NacosRegistryCenter #

The Nacos framework also provides a client component called ConfigService, which is used to implement the get, getDirectly, and isExisted methods to retrieve data. These methods actually use the getConfig method of ConfigService for implementation, so we don’t need to discuss it in detail.

The persist method of NacosRegistryCenter actually calls its update method, and the latter updates the data by using the publishConfig method of ConfigService, as shown below:

@Override
public void persist(final String key, final String value) {
    update(key, value);
}

@Override
public void update(final String key, final String value) {
    try {
        String dataId = key.replace("/", ".");
        String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
        configService.publishConfig(dataId, group, value);
    } catch (final NacosException ex) {
        log.debug("exception for: {}", ex.toString());
    }
}

Unlike Zookeeper, for Nacos, the methods getChildrenKeys, persistEphemeral, close, initLock, tryLock, and tryRelease cannot be implemented or are not necessary. As for the watch method, ConfigService also provides the addListener method to use listeners. It also relies on the DataChangedEventListener class introduced in the previous lesson to handle events, as shown below:

@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
    try {
        String dataId = key.replace("/", ".");
        String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }
OrchestrationInstance...
    private String instanceId;
    private String ipAddress;
    private String pid;
    private String uuid;
    private String separator = "@";
...
OrchestrationInstance.getInstance().withInstanceId(
        "orchestration-" + InetAddress.getLocalHost().getHostName() + "-" + getProcessId() + "@-@-@-@")
        .init();

StateService 中实现的方法比较简单,可以通过以下三个方法获取对应的状态信息:

  • getAllInstances 接口返回的是获取注册中心中的实例集合。
  • getServersNodePath 接口返回了注册中心中的数据库节点。
  • persistInstanceOnline 接口用于记录当前数据库实例上线。

3.ShardingOrchestrationListenerManager #

ShardingOrchestrationListenerManager 类定义了 ShardingSphere 编排治理服务的所有待监听事件,同样也定义了每个事件在被监听后的处理逻辑,如下所示:

public ShardingOrchestrationListenerManager(final ShardingOrchestrationFacade shardingOrchestrationFacade) {
    //注册实例上线监听器
    configureStateChangedListenerEngine.register(new InstanceStateChangedListener(shardingOrchestrationFacade));
    //注册规则配置监听器
    configureStateChangedListenerEngine.register(new RuleChangedListener(shardingOrchestrationFacade));
    //注册分片数据源配置监听器
    configureStateChangedListenerEngine.register(new DataSourceChangedListener(shardingOrchestrationFacade));
    //注册认证配置监听器
    configureStateChangedListenerEngine.register(new AuthenticationChangedListener(shardingOrchestrationFacade));
    //注册属性配置监听器
    configureStateChangedListenerEngine.register(new PropChangedListener(shardingOrchestrationFacade));
    //注册发布监听数据源状态的监听器
    instanceStateChangedListenerEngine.register(new DataSourceListener(shardingOrchestrationFacade));
}

在 ShardingOrchestrationListenerManager 中,我们可以看到它是在构造方法中注册各种类型的 Listener,Listener 的类型包括 InstanceStateChangedListener、RuleChangedListener、DataSourceChangedListener、AuthenticationChangedListener 以及 PropChangedListener。这些监听器用于监听相应的事件,当事件发生时,会触发相应的处理逻辑。

听到这里,以上就是 ShardingOrchestrationListenerManager 类中的所有代码,注册了一组监听器但却不知道是如何触发相应事件的。实际上,事件的触发是由 RuleRegistry 并通过 ShardingRuleConfiguration 的 isInstanceDataSourceExists 方法决定的,下面我们着重来看一下这两块。

RuleRegistry #

触发事件的入口是在 RuleRegistry 的 getInstanceDataSourceMap 方法中,其内部创建了 RuleConfigurationListener 实例并调用了 registerListener 方法,如下所示:

private Map<String, Collection<String>> getInstanceDataSourceMap(final ShardingRuleConfiguration shardingRuleConfiguration, final DatabaseType databaseType, final Collection<String> dataSourceNames) {
    RuleConfigurationListener ruleConfigListener = new RuleConfigurationListener(shardingRuleConfiguration, dataSourceNames);
    registerListener(ruleConfigListener);
    return ruleConfigListener.getInstanceDataSourceMap();
}

RuleConfigurationListener 类中没有太多复杂的逻辑,主要负责加载并解析 dataSourceNames 后进行 InstanceDataSourceMap 的创建,如下所示:

public class RuleConfigurationListener implements EventListener {
    private volatile Map<String, Collection<String>> instanceDataSourceMap = new ConcurrentHashMap<>();
    ...
    public RuleConfigurationListener(final ShardingRuleConfiguration shardingRuleConfiguration, final Collection<String> dataSourceNames) {
        init(shardingRuleConfiguration.getShardingRuleBuilder().getSQLParserEngine(), dataSourceNames);
    }
    ...
}

我们需要重点关注一下 RuleConfigurationListener 类中 register 方法的调用逻辑,这个方法的具体实现在 RuleRegistry 中,并其实就是在 RuleRegistry 中添加了一个广播事件,如下所示:

//监听 ShardingSphereEvent。
RuleRegistry.getInstance().register(userDataSourceNamesEvent);

走到这里是不是感觉整个流程已经清晰不少了呢?其实事件的广播和处理原理整体上来看非常简单,核心代码位于 AbstractRuleRegistry 类的 register 方法,如下所示:

public abstract class AbstractRuleRegistry<T extends RuleConfiguration> {
    private final Collection<RuleConfiguration> configurations = new ConcurrentLinkedQueue<>();
    public void register(final T ruleConfig) {
        configurations.add(ruleConfig);
        post(ruleConfig);
    }
    ...
    protected abstract void post(T ruleConfig);
}

这里的 post 方法就是广播事件的过程,它通过调用 ShardingSphereEventBus 的 post 方法来实现实际的事件广播,而 ShardingSphereEventBus 的真正调用逻辑位于 org.apache.shardingsphere.spi.YoungerSiblingFirstShardingSphereServiceLoader.SPI_PROVIDER.loadService(ShardingSphereEventBus.class).post(event) 中。

ShardingSphereEventBus 类是 ShardingSphere 提供的事件总线,通过 ShardingSphereEventBus 可以实现对自定义事件的发布、订阅以及事件的处理。ShardingSphereEventBus 是一个单例的类,它在初始化时会初始化事件处理线程池并启动一个消费者进行事件处理。目前 ShardingSphereEventBus 提供了两种事件的消费方式,一种是同步消费者,用于处理一些需要更快获得事件处理结果的场景;另一种是异步消费者,用于处理一些比较耗时的任务。最后,在 post 方法中会通过调用 YoungerSiblingFirstShardingSphereServiceLoader.SPI_PROVIDER 中的方法获取到 ShardingEventListener 的实例,然后根据实例的类型来分别调用不同事件的处理器进行事件的处理。 以上就是整个 ShardingSphere 编排治理服务的核心代码,通过上面引用的三个类将 ShardingSphere 编排服务的规则匹配与注册中心的数据状态管理相结合,使得治理数据与配置的变更变得可控,更加方便了用户发布数据。

instanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] + DELIMITER + UUID.randomUUID().toString();

Please note that in the StateService, the mechanism for storing instances and datasources is different:

// Save Instance using the ephemeral node
public void persistInstanceOnline() {
    regCenter.persistEphemeral(stateNode.getInstancesNodeFullPath(instance.getInstanceId()), "");
}

// Save DataSources using the persistent node
public void persistDataSourcesNode() {
    regCenter.persist(stateNode.getDataSourcesNodeFullRootPath(), "");
}

You can see that the persistEphemeral method is used to save instances using the RegistryCenter’s ephemeral node mechanism, while the persist method is used to save DataSources using the persistent node mechanism. This is because it is intentional to handle availability design by designating running instances as ephemeral nodes, automatically registering them when they come online and cleaning them up when they go offline.

3. ShardingOrchestrationListenerManager #

Next, let’s take a look at the last variable ShardingOrchestrationListenerManager in ShardingOrchestrationFacade. From the naming, it seems that this class is used to manage various listener Listeners for processing change events. Based on the analysis so far, it is evident that there should be two types of listeners in the system: one for monitoring configuration changes, and one for monitoring instance status changes.

Sure enough, in ShardingOrchestrationListenerManager, we find two ListenerManager: ConfigurationChangedListenerManager and StateChangedListenerManager, as shown below:

public final class ShardingOrchestrationListenerManager {

    // Manager for configuration change listeners
    private final ConfigurationChangedListenerManager configurationChangedListenerManager;

    // Manager for state change listeners
    private final StateChangedListenerManager stateChangedListenerManager;

    public ShardingOrchestrationListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
        configurationChangedListenerManager = new ConfigurationChangedListenerManager(name, regCenter, shardingSchemaNames);
        stateChangedListenerManager = new StateChangedListenerManager(name, regCenter);
    }

    public void initListeners() {
        configurationChangedListenerManager.initListeners();
        stateChangedListenerManager.initListeners();
    }
}

We create these two ListenerManager objects and call their initListeners() methods to initialize them. Taking ConfigurationChangedListenerManager as an example, let’s see its internal structure:

public final class ConfigurationChangedListenerManager {

    private final SchemaChangedListener schemaChangedListener;

    private final PropertiesChangedListener propertiesChangedListener;

    private final AuthenticationChangedListener authenticationChangedListener;

    public ConfigurationChangedListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
        schemaChangedListener = new SchemaChangedListener(name, regCenter, shardingSchemaNames);
        propertiesChangedListener = new PropertiesChangedListener(name, regCenter);
        authenticationChangedListener = new AuthenticationChangedListener(name, regCenter);
    }

    public void initListeners() {
        schemaChangedListener.watch(ChangedType.UPDATED, ChangedType.DELETED);
        propertiesChangedListener.watch(ChangedType.UPDATED);
        authenticationChangedListener.watch(ChangedType.UPDATED);
    }
}

As you can see, it defines three listeners: SchemaChangedListener, PropertiesChangedListener, and AuthenticationChangedListener. Obviously, they correspond to the three top-level configuration items in the configuration structure introduced by ConfigurationService: schema, props, and authentication. Then, for each of these configuration items, we add monitoring for specific operations as needed. From the above code, we can see that for the schema configuration item, we need to respond to UPDATE and DELETE events, while for the props and authentication configuration items, we only need to pay attention to the UPDATE operation.

Because these specific events and the handling of the monitoring mechanism are similar, let’s take a closer look at SchemaChangedListener. SchemaChangedListener extends the abstract class PostShardingOrchestrationEventListener, which in turn implements the ShardingOrchestrationListener interface. Let’s first look at the definition of this interface:

public interface ShardingOrchestrationListener {
    // Listen to events
    void watch(ChangedType... watchedChangedTypes);
}

PostShardingOrchestrationEventListener implements this interface, and the implementation process is as follows:

public abstract class PostShardingOrchestrationEventListener implements ShardingOrchestrationListener {

    // Create an EventBus
    private final EventBus eventBus = ShardingOrchestrationEventBus.getInstance();
    private final RegistryCenter regCenter;
    private final String watchKey;

    @Override
    public final void watch(final ChangedType... watchedChangedTypes) {
        final Collection<ChangedType> watchedChangedTypeList = Arrays.asList(watchedChangedTypes);
        regCenter.watch(watchKey, new DataChangedEventListener() {
            @Override
            public void onChange(final DataChangedEvent dataChangedEvent) {
                if (watchedChangedTypeList.contains(dataChangedEvent.getChangedType())) {
                    // Publish event through EventBus
                    eventBus.post(createShardingOrchestrationEvent(dataChangedEvent));
                }
          }
      });
    }

    protected abstract ShardingOrchestrationEvent createShardingOrchestrationEvent(DataChangedEvent event);
}

The core mechanism of the above code is to add an event handler to specific events using the watch method of RegistryCenter, and the event processing process is further forwarded using the post method of Guava’s EventBus. The specific event types to be forwarded are provided by the abstract method createShardingOrchestrationEvent, which needs to be implemented by each subclass of PostShardingOrchestrationEventListener.

Next, let’s take a closer look at the method responsible for event creation in the subclass SchemaChangedListener. We will expand on the createDataSourceChangedEvent method, which is a typical event creation method:

private DataSourceChangedEvent createDataSourceChangedEvent(final String shardingSchemaName, final DataChangedEvent event) {
    Map<String, YamlDataSourceConfiguration> dataSourceConfigurations = (Map) YamlEngine.unmarshal(event.getValue());
    Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data sources to load for orchestration.");
    // create DataSourceChangedEvent
    return new DataSourceChangedEvent(shardingSchemaName, Maps.transformValues(dataSourceConfigurations, new Function<YamlDataSourceConfiguration, DataSourceConfiguration>() {
            @Override

            public DataSourceConfiguration apply(final YamlDataSourceConfiguration input) {

                return new DataSourceConfigurationYamlSwapper().swap(input);

            }

        }));

}

重要说明:本文档讨论了ShardingSphere注册中心的实现细节,在日常开发中使用注册中心时,您可能只需关注相关的功能和接口调用。

可以看到,这里再次用到了前面提到的YamlDataSourceConfiguration以及YamlEngine,不同的是这次的处理流程是从YamlDataSourceConfigurationDataSourceConfiguration。最终,我们构建了一个DataSourceChangedEvent,包含了shardingSchemaName以及一个dataSourceConfigurations对象。

关于整个Listener机制,可以简单归纳为通过监听注册中心上相关数据项的操作情况来生成具体的事件,并对事件进行包装之后再进行转发。至于如何处理这些转发后的事件,取决于具体的应用场景,典型的一个应用场景就是控制数据访问的熔断,让我们一起来看一下。

注册中心的应用:数据访问熔断机制 #

ShardingOrchestrationFacade 是一个典型的外观类,通过分析代码的调用关系,我们发现该类的创建过程都发生在sharding-jdbc-orchestration工程的几个DataSource类中。我们先来到AbstractOrchestrationDataSource这个抽象类,该类的核心变量如下所示:

private final ShardingOrchestrationFacade shardingOrchestrationFacade;

//是否熔断

private boolean isCircuitBreak;

private final Map<String, DataSourceConfiguration> dataSourceConfigurations = new LinkedHashMap<>();

注意到这里还有一个isCircuitBreak变量,用来表示是否需要进行熔断,接下来我们会对熔断机制以及该变量的使用方法做详细展开。

我们继续来看AbstractOrchestrationDataSource的构造函数,如下所示:

public AbstractOrchestrationDataSource(final ShardingOrchestrationFacade shardingOrchestrationFacade) {

        this.shardingOrchestrationFacade = shardingOrchestrationFacade;

        //通过 EventBus 注册自己

        ShardingOrchestrationEventBus.getInstance().register(this);

}

可以看到这里用到了Guava中EventBus的register方法,这个方法用于对注册事件的订阅。在前面的内容中,我们留下了一个疑问,即所创建的这些ShardingOrchestrationEvent是如何被处理的呢?

答案就在这里进行了揭晓,即所有通过EventBus的post方法所发布的事件的最终消费者就是这个AbstractOrchestrationDataSource类以及它的各个子类。而在AbstractOrchestrationDataSource类中就存在了如下所示的renew方法,用于处理CircuitStateChangedEvent事件:

@Subscribe

public final synchronized void renew(final CircuitStateChangedEvent circuitStateChangedEvent) {

        isCircuitBreak = circuitStateChangedEvent.isCircuitBreak();

}

在这个方法上添加了@Subscribe注解,即一旦在系统中生成了CircuitStateChangedEvent事件,这个方法就可以自动响应这类事件。在这个处理方法中,我们看到它从CircuitStateChangedEvent事件中获取了是否熔断的信息并赋值给前面介绍的isCircuitBreak变量。

AbstractOrchestrationDataSourcegetConnection方法中调用了getDataSource抽象方法以获取特定的DataSource,进而获取特定的Connection,如下所示:

@Override

public final Connection getConnection() throws SQLException {

        return isCircuitBreak ? new CircuitBreakerDataSource().getConnection() : getDataSource().getConnection();

}

在这里,我们看到了isCircuitBreak变量的作用。当该变量为真时,我们返回的是一个特定的CircuitBreakerDataSource用于完成熔断操作。所谓熔断,其作用类似于我们家用的保险丝,当某个服务出现不可用或响应超时的情况时,为了防止整个系统出现雪崩,暂时停止对该服务的调用。

那么 ShardingSphere 如何实现这一点呢?我们来看一下CircuitBreakerDataSource类, 它的实现如下所示:

public final class CircuitBreakerDataSource extends AbstractUnsupportedOperationDataSource implements AutoCloseable {

    @Override

    public void close() {

    }

    @Override

    public Connection getConnection() {

        return new CircuitBreakerConnection();

    }

    @Override

    public Connection getConnection(final String username, final String password) {

        return new CircuitBreakerConnection();

    }

    @Override

    public PrintWriter getLogWriter() {

        return null;

    }

    @Override

    public void setLogWriter(final PrintWriter out) {

    }

    @Override

    public Logger getParentLogger() {

        return null;

    }

}

可以看到这个类的getConnection方法返回了一个CircuitBreakerConnection,而这个CircuitBreakerConnection中的createStatementprepareStatement方法分别返回了CircuitBreakerStatementCircuitBreakerPreparedStatement,我们发现这些Statement类以及代表执行结果的CircuitBreakerResultSet类基本都是空实现,即不会对数据库执行任何具体的操作,相当于实现了访问的熔断。

那么回到一个问题,即什么时候会触发熔断机制,也就是什么时候会发送这个CircuitStateChangedEvent事件呢?让我们跟踪这个事件的创建过程,来到了如下所示的InstanceStateChangedListener类:

public final class InstanceStateChangedListener extends PostShardingOrchestrationEventListener {

    public InstanceStateChangedListener(final String name, final RegistryCenter regCenter) {

        super(regCenter, new StateNode(name).getInstancesNodeFullPath(OrchestrationInstance.getInstance().getInstanceId()));

    }

    @Override

    protected CircuitStateChangedEvent createShardingOrchestrationEvent(final DataChangedEvent event) {

        return new CircuitStateChangedEvent(StateNodeStatus.DISABLED.toString().equalsIgnoreCase(event.getValue()));

    }

}

通过上述代码,我们不难发现当StateNodeStatusDISABLED时,也就是当前的节点已经不可用时会发送CircuitStateChangedEvent,从而触发熔断机制。

从源码解析到日常开发 #

今天的内容虽然关注的是注册中心,但在篇幅上实际上更多的是在讨论基于事件驱动架构的设计和实现方法。基于配置信息,以及数据库实例信息的变更情况,ShardingSphere抽象了一套完整的事件发送和消费机制,来实现诸如数据访问熔断等非功能性需求。

我们注意到 ShardingSphere 实现事件驱动架构时使用了 Guava 框架中的 EventBus 工具类,在日常开发过程中,我们也可以直接使用这个类来构建自定义的事件处理机制。

小结与预告 #

注册中心是 ShardingSphere 编排治理机制中的一个重要组成部分,但注册中心本身也只是一个工具,需要根据不同的业务场景来设计对应的应用方式。在 ShardingSphere 中,配置信息管理以及数据库实例管理就是典型的应用场景,我们基于这些场景详细分析了基于注册中心的事件驱动架构的设计和实现方法,并给出了基于数据访问熔断机制的案例分析。

这里给你留一道思考题:在 ShardingSphere 中,如何把服务实例的状态与注册中心整合在一起进行编排治理?

在下一课时中,我们将介绍 ShardingSphere 编排治理中的另一个重要主题,即服务访问的链路监控和跟踪机制。