32 Registry Center How to Implement Database Access Circuit Break Mechanism Based on Registry Center #
In the previous lesson, we discussed the related content of the configuration center in ShardingSphere. Today, we will continue to discuss another core function of the orchestration and governance module, which is the registry center. Compared to the configuration center, the registry center is more widely used in ShardingSphere.
Registry Center Implementation Classes in ShardingSphere #
Similar to the configuration center, the registry center in ShardingSphere also consists of three independent projects in terms of code structure, namely the API project representing abstract interfaces, and two concrete implementation projects: nacos and zookeeper-curator. As we can see, here we also use Zookeeper as one of the ways to implement the registry center, which was introduced in the previous lesson, and the other implementation method is based on Alibaba’s Nacos.
Let’s first take a look at the abstraction of the registry center in ShardingSphere, which is the RegistryCenter
interface shown below:
public interface RegistryCenter extends TypeBasedSPI {
// Initialize the registry center based on configuration information
void init(RegistryCenterConfiguration config);
// Get data
String get(String key);
// Get data directly
String getDirectly(String key);
// Check if a data item exists
boolean isExisted(String key);
// Get a list of child data items
List<String> getChildrenKeys(String key);
// Persist data item
void persist(String key, String value);
// Update data item
void update(String key, String value);
// Persist ephemeral data
void persistEphemeral(String key, String value);
// Watch a data item or path
void watch(String key, DataChangedEventListener dataChangedEventListener);
// Close the registry center
void close();
// Initialize a lock for a data item
void initLock(String key);
// Acquire lock for a data item
boolean tryLock();
// Release lock for a data item
void tryRelease();
}
We can see that, except for the last few methods related to lock handling, RegistryCenter
is very similar to ConfigCenter
introduced in the previous lesson. From this point, it is not hard to imagine why Zookeeper can be used as both a configuration center and a typical solution for implementing the registry center. Following this line of thinking, let’s first take a look at the CuratorZookeeperRegistryCenter
, which is the registry center implementation class based on Zookeeper.
1. CuratorZookeeperRegistryCenter #
Let’s quickly go through the CuratorZookeeperRegistryCenter
class and find that the implementation process of the common interface methods is exactly the same as that of the CuratorZookeeperConfigCenter
. As for the newly added methods related to lock, the implementation is also very simple. We can directly use the InterProcessMutex
encapsulated by Curator, as shown below:
private InterProcessMutex leafLock;
@Override
public void initLock(final String key) {
leafLock = new InterProcessMutex(client, key);
}
@Override
@SneakyThrows
public boolean tryLock() {
return leafLock.acquire(5, TimeUnit.SECONDS);
}
@Override
@SneakyThrows
public void tryRelease() {
leafLock.release();
}
This is about the explanation of CuratorZookeeperRegistryCenter
. Next, let’s take a look at another implementation class of the registry center, NacosRegistryCenter
.
2. NacosRegistryCenter #
The Nacos framework also provides a client component called ConfigService
, which is used to implement the get
, getDirectly
, and isExisted
methods to retrieve data. These methods actually use the getConfig
method of ConfigService
for implementation, so we don’t need to discuss it in detail.
The persist
method of NacosRegistryCenter
actually calls its update
method, and the latter updates the data by using the publishConfig
method of ConfigService
, as shown below:
@Override
public void persist(final String key, final String value) {
update(key, value);
}
@Override
public void update(final String key, final String value) {
try {
String dataId = key.replace("/", ".");
String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
configService.publishConfig(dataId, group, value);
} catch (final NacosException ex) {
log.debug("exception for: {}", ex.toString());
}
}
Unlike Zookeeper, for Nacos, the methods getChildrenKeys
, persistEphemeral
, close
, initLock
, tryLock
, and tryRelease
cannot be implemented or are not necessary. As for the watch
method, ConfigService
also provides the addListener
method to use listeners. It also relies on the DataChangedEventListener
class introduced in the previous lesson to handle events, as shown below:
@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
try {
String dataId = key.replace("/", ".");
String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
configService.addListener(dataId, group, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
OrchestrationInstance...
private String instanceId;
private String ipAddress;
private String pid;
private String uuid;
private String separator = "@";
...
OrchestrationInstance.getInstance().withInstanceId(
"orchestration-" + InetAddress.getLocalHost().getHostName() + "-" + getProcessId() + "@-@-@-@")
.init();
StateService 中实现的方法比较简单,可以通过以下三个方法获取对应的状态信息:
- getAllInstances 接口返回的是获取注册中心中的实例集合。
- getServersNodePath 接口返回了注册中心中的数据库节点。
- persistInstanceOnline 接口用于记录当前数据库实例上线。
3.ShardingOrchestrationListenerManager #
ShardingOrchestrationListenerManager 类定义了 ShardingSphere 编排治理服务的所有待监听事件,同样也定义了每个事件在被监听后的处理逻辑,如下所示:
public ShardingOrchestrationListenerManager(final ShardingOrchestrationFacade shardingOrchestrationFacade) {
//注册实例上线监听器
configureStateChangedListenerEngine.register(new InstanceStateChangedListener(shardingOrchestrationFacade));
//注册规则配置监听器
configureStateChangedListenerEngine.register(new RuleChangedListener(shardingOrchestrationFacade));
//注册分片数据源配置监听器
configureStateChangedListenerEngine.register(new DataSourceChangedListener(shardingOrchestrationFacade));
//注册认证配置监听器
configureStateChangedListenerEngine.register(new AuthenticationChangedListener(shardingOrchestrationFacade));
//注册属性配置监听器
configureStateChangedListenerEngine.register(new PropChangedListener(shardingOrchestrationFacade));
//注册发布监听数据源状态的监听器
instanceStateChangedListenerEngine.register(new DataSourceListener(shardingOrchestrationFacade));
}
在 ShardingOrchestrationListenerManager 中,我们可以看到它是在构造方法中注册各种类型的 Listener,Listener 的类型包括 InstanceStateChangedListener、RuleChangedListener、DataSourceChangedListener、AuthenticationChangedListener 以及 PropChangedListener。这些监听器用于监听相应的事件,当事件发生时,会触发相应的处理逻辑。
听到这里,以上就是 ShardingOrchestrationListenerManager 类中的所有代码,注册了一组监听器但却不知道是如何触发相应事件的。实际上,事件的触发是由 RuleRegistry 并通过 ShardingRuleConfiguration 的 isInstanceDataSourceExists 方法决定的,下面我们着重来看一下这两块。
RuleRegistry #
触发事件的入口是在 RuleRegistry 的 getInstanceDataSourceMap 方法中,其内部创建了 RuleConfigurationListener 实例并调用了 registerListener 方法,如下所示:
private Map<String, Collection<String>> getInstanceDataSourceMap(final ShardingRuleConfiguration shardingRuleConfiguration, final DatabaseType databaseType, final Collection<String> dataSourceNames) {
RuleConfigurationListener ruleConfigListener = new RuleConfigurationListener(shardingRuleConfiguration, dataSourceNames);
registerListener(ruleConfigListener);
return ruleConfigListener.getInstanceDataSourceMap();
}
RuleConfigurationListener 类中没有太多复杂的逻辑,主要负责加载并解析 dataSourceNames 后进行 InstanceDataSourceMap 的创建,如下所示:
public class RuleConfigurationListener implements EventListener {
private volatile Map<String, Collection<String>> instanceDataSourceMap = new ConcurrentHashMap<>();
...
public RuleConfigurationListener(final ShardingRuleConfiguration shardingRuleConfiguration, final Collection<String> dataSourceNames) {
init(shardingRuleConfiguration.getShardingRuleBuilder().getSQLParserEngine(), dataSourceNames);
}
...
}
我们需要重点关注一下 RuleConfigurationListener 类中 register 方法的调用逻辑,这个方法的具体实现在 RuleRegistry 中,并其实就是在 RuleRegistry 中添加了一个广播事件,如下所示:
//监听 ShardingSphereEvent。
RuleRegistry.getInstance().register(userDataSourceNamesEvent);
走到这里是不是感觉整个流程已经清晰不少了呢?其实事件的广播和处理原理整体上来看非常简单,核心代码位于 AbstractRuleRegistry 类的 register 方法,如下所示:
public abstract class AbstractRuleRegistry<T extends RuleConfiguration> {
private final Collection<RuleConfiguration> configurations = new ConcurrentLinkedQueue<>();
public void register(final T ruleConfig) {
configurations.add(ruleConfig);
post(ruleConfig);
}
...
protected abstract void post(T ruleConfig);
}
这里的 post 方法就是广播事件的过程,它通过调用 ShardingSphereEventBus 的 post 方法来实现实际的事件广播,而 ShardingSphereEventBus 的真正调用逻辑位于 org.apache.shardingsphere.spi.YoungerSiblingFirstShardingSphereServiceLoader.SPI_PROVIDER.loadService(ShardingSphereEventBus.class).post(event)
中。
ShardingSphereEventBus 类是 ShardingSphere 提供的事件总线,通过 ShardingSphereEventBus 可以实现对自定义事件的发布、订阅以及事件的处理。ShardingSphereEventBus 是一个单例的类,它在初始化时会初始化事件处理线程池并启动一个消费者进行事件处理。目前 ShardingSphereEventBus 提供了两种事件的消费方式,一种是同步消费者,用于处理一些需要更快获得事件处理结果的场景;另一种是异步消费者,用于处理一些比较耗时的任务。最后,在 post 方法中会通过调用 YoungerSiblingFirstShardingSphereServiceLoader.SPI_PROVIDER 中的方法获取到 ShardingEventListener 的实例,然后根据实例的类型来分别调用不同事件的处理器进行事件的处理。 以上就是整个 ShardingSphere 编排治理服务的核心代码,通过上面引用的三个类将 ShardingSphere 编排服务的规则匹配与注册中心的数据状态管理相结合,使得治理数据与配置的变更变得可控,更加方便了用户发布数据。
instanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split(DELIMITER)[0] + DELIMITER + UUID.randomUUID().toString();
Please note that in the StateService, the mechanism for storing instances
and datasources
is different:
// Save Instance using the ephemeral node
public void persistInstanceOnline() {
regCenter.persistEphemeral(stateNode.getInstancesNodeFullPath(instance.getInstanceId()), "");
}
// Save DataSources using the persistent node
public void persistDataSourcesNode() {
regCenter.persist(stateNode.getDataSourcesNodeFullRootPath(), "");
}
You can see that the persistEphemeral
method is used to save instances
using the RegistryCenter’s ephemeral node mechanism, while the persist
method is used to save DataSources
using the persistent node mechanism. This is because it is intentional to handle availability design by designating running instances as ephemeral nodes, automatically registering them when they come online and cleaning them up when they go offline.
3. ShardingOrchestrationListenerManager #
Next, let’s take a look at the last variable ShardingOrchestrationListenerManager
in ShardingOrchestrationFacade
. From the naming, it seems that this class is used to manage various listener Listeners
for processing change events. Based on the analysis so far, it is evident that there should be two types of listeners in the system: one for monitoring configuration changes, and one for monitoring instance status changes.
Sure enough, in ShardingOrchestrationListenerManager
, we find two ListenerManager
: ConfigurationChangedListenerManager
and StateChangedListenerManager
, as shown below:
public final class ShardingOrchestrationListenerManager {
// Manager for configuration change listeners
private final ConfigurationChangedListenerManager configurationChangedListenerManager;
// Manager for state change listeners
private final StateChangedListenerManager stateChangedListenerManager;
public ShardingOrchestrationListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
configurationChangedListenerManager = new ConfigurationChangedListenerManager(name, regCenter, shardingSchemaNames);
stateChangedListenerManager = new StateChangedListenerManager(name, regCenter);
}
public void initListeners() {
configurationChangedListenerManager.initListeners();
stateChangedListenerManager.initListeners();
}
}
We create these two ListenerManager
objects and call their initListeners()
methods to initialize them. Taking ConfigurationChangedListenerManager
as an example, let’s see its internal structure:
public final class ConfigurationChangedListenerManager {
private final SchemaChangedListener schemaChangedListener;
private final PropertiesChangedListener propertiesChangedListener;
private final AuthenticationChangedListener authenticationChangedListener;
public ConfigurationChangedListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
schemaChangedListener = new SchemaChangedListener(name, regCenter, shardingSchemaNames);
propertiesChangedListener = new PropertiesChangedListener(name, regCenter);
authenticationChangedListener = new AuthenticationChangedListener(name, regCenter);
}
public void initListeners() {
schemaChangedListener.watch(ChangedType.UPDATED, ChangedType.DELETED);
propertiesChangedListener.watch(ChangedType.UPDATED);
authenticationChangedListener.watch(ChangedType.UPDATED);
}
}
As you can see, it defines three listeners: SchemaChangedListener
, PropertiesChangedListener
, and AuthenticationChangedListener
. Obviously, they correspond to the three top-level configuration items in the configuration structure introduced by ConfigurationService
: schema
, props
, and authentication
. Then, for each of these configuration items, we add monitoring for specific operations as needed. From the above code, we can see that for the schema
configuration item, we need to respond to UPDATE and DELETE events, while for the props
and authentication
configuration items, we only need to pay attention to the UPDATE operation.
Because these specific events and the handling of the monitoring mechanism are similar, let’s take a closer look at SchemaChangedListener
. SchemaChangedListener
extends the abstract class PostShardingOrchestrationEventListener
, which in turn implements the ShardingOrchestrationListener
interface. Let’s first look at the definition of this interface:
public interface ShardingOrchestrationListener {
// Listen to events
void watch(ChangedType... watchedChangedTypes);
}
PostShardingOrchestrationEventListener
implements this interface, and the implementation process is as follows:
public abstract class PostShardingOrchestrationEventListener implements ShardingOrchestrationListener {
// Create an EventBus
private final EventBus eventBus = ShardingOrchestrationEventBus.getInstance();
private final RegistryCenter regCenter;
private final String watchKey;
@Override
public final void watch(final ChangedType... watchedChangedTypes) {
final Collection<ChangedType> watchedChangedTypeList = Arrays.asList(watchedChangedTypes);
regCenter.watch(watchKey, new DataChangedEventListener() {
@Override
public void onChange(final DataChangedEvent dataChangedEvent) {
if (watchedChangedTypeList.contains(dataChangedEvent.getChangedType())) {
// Publish event through EventBus
eventBus.post(createShardingOrchestrationEvent(dataChangedEvent));
}
}
});
}
protected abstract ShardingOrchestrationEvent createShardingOrchestrationEvent(DataChangedEvent event);
}
The core mechanism of the above code is to add an event handler to specific events using the watch
method of RegistryCenter
, and the event processing process is further forwarded using the post
method of Guava’s EventBus
. The specific event types to be forwarded are provided by the abstract method createShardingOrchestrationEvent
, which needs to be implemented by each subclass of PostShardingOrchestrationEventListener
.
Next, let’s take a closer look at the method responsible for event creation in the subclass SchemaChangedListener
. We will expand on the createDataSourceChangedEvent
method, which is a typical event creation method:
private DataSourceChangedEvent createDataSourceChangedEvent(final String shardingSchemaName, final DataChangedEvent event) {
Map<String, YamlDataSourceConfiguration> dataSourceConfigurations = (Map) YamlEngine.unmarshal(event.getValue());
Preconditions.checkState(null != dataSourceConfigurations && !dataSourceConfigurations.isEmpty(), "No available data sources to load for orchestration.");
// create DataSourceChangedEvent
return new DataSourceChangedEvent(shardingSchemaName, Maps.transformValues(dataSourceConfigurations, new Function<YamlDataSourceConfiguration, DataSourceConfiguration>() {
@Override
public DataSourceConfiguration apply(final YamlDataSourceConfiguration input) {
return new DataSourceConfigurationYamlSwapper().swap(input);
}
}));
}
重要说明:本文档讨论了ShardingSphere注册中心的实现细节,在日常开发中使用注册中心时,您可能只需关注相关的功能和接口调用。
可以看到,这里再次用到了前面提到的YamlDataSourceConfiguration
以及YamlEngine
,不同的是这次的处理流程是从YamlDataSourceConfiguration
到DataSourceConfiguration
。最终,我们构建了一个DataSourceChangedEvent
,包含了shardingSchemaName
以及一个dataSourceConfigurations
对象。
关于整个Listener机制,可以简单归纳为通过监听注册中心上相关数据项的操作情况来生成具体的事件,并对事件进行包装之后再进行转发。至于如何处理这些转发后的事件,取决于具体的应用场景,典型的一个应用场景就是控制数据访问的熔断,让我们一起来看一下。
注册中心的应用:数据访问熔断机制 #
ShardingOrchestrationFacade
是一个典型的外观类,通过分析代码的调用关系,我们发现该类的创建过程都发生在sharding-jdbc-orchestration
工程的几个DataSource
类中。我们先来到AbstractOrchestrationDataSource
这个抽象类,该类的核心变量如下所示:
private final ShardingOrchestrationFacade shardingOrchestrationFacade;
//是否熔断
private boolean isCircuitBreak;
private final Map<String, DataSourceConfiguration> dataSourceConfigurations = new LinkedHashMap<>();
注意到这里还有一个isCircuitBreak
变量,用来表示是否需要进行熔断,接下来我们会对熔断机制以及该变量的使用方法做详细展开。
我们继续来看AbstractOrchestrationDataSource
的构造函数,如下所示:
public AbstractOrchestrationDataSource(final ShardingOrchestrationFacade shardingOrchestrationFacade) {
this.shardingOrchestrationFacade = shardingOrchestrationFacade;
//通过 EventBus 注册自己
ShardingOrchestrationEventBus.getInstance().register(this);
}
可以看到这里用到了Guava中EventBus的register
方法,这个方法用于对注册事件的订阅。在前面的内容中,我们留下了一个疑问,即所创建的这些ShardingOrchestrationEvent
是如何被处理的呢?
答案就在这里进行了揭晓,即所有通过EventBus的post
方法所发布的事件的最终消费者就是这个AbstractOrchestrationDataSource
类以及它的各个子类。而在AbstractOrchestrationDataSource
类中就存在了如下所示的renew
方法,用于处理CircuitStateChangedEvent
事件:
@Subscribe
public final synchronized void renew(final CircuitStateChangedEvent circuitStateChangedEvent) {
isCircuitBreak = circuitStateChangedEvent.isCircuitBreak();
}
在这个方法上添加了@Subscribe
注解,即一旦在系统中生成了CircuitStateChangedEvent
事件,这个方法就可以自动响应这类事件。在这个处理方法中,我们看到它从CircuitStateChangedEvent
事件中获取了是否熔断的信息并赋值给前面介绍的isCircuitBreak
变量。
在AbstractOrchestrationDataSource
的getConnection
方法中调用了getDataSource
抽象方法以获取特定的DataSource
,进而获取特定的Connection
,如下所示:
@Override
public final Connection getConnection() throws SQLException {
return isCircuitBreak ? new CircuitBreakerDataSource().getConnection() : getDataSource().getConnection();
}
在这里,我们看到了isCircuitBreak
变量的作用。当该变量为真时,我们返回的是一个特定的CircuitBreakerDataSource
用于完成熔断操作。所谓熔断,其作用类似于我们家用的保险丝,当某个服务出现不可用或响应超时的情况时,为了防止整个系统出现雪崩,暂时停止对该服务的调用。
那么 ShardingSphere 如何实现这一点呢?我们来看一下CircuitBreakerDataSource
类, 它的实现如下所示:
public final class CircuitBreakerDataSource extends AbstractUnsupportedOperationDataSource implements AutoCloseable {
@Override
public void close() {
}
@Override
public Connection getConnection() {
return new CircuitBreakerConnection();
}
@Override
public Connection getConnection(final String username, final String password) {
return new CircuitBreakerConnection();
}
@Override
public PrintWriter getLogWriter() {
return null;
}
@Override
public void setLogWriter(final PrintWriter out) {
}
@Override
public Logger getParentLogger() {
return null;
}
}
可以看到这个类的getConnection
方法返回了一个CircuitBreakerConnection
,而这个CircuitBreakerConnection
中的createStatement
和prepareStatement
方法分别返回了CircuitBreakerStatement
和CircuitBreakerPreparedStatement
,我们发现这些Statement
类以及代表执行结果的CircuitBreakerResultSet
类基本都是空实现,即不会对数据库执行任何具体的操作,相当于实现了访问的熔断。
那么回到一个问题,即什么时候会触发熔断机制,也就是什么时候会发送这个CircuitStateChangedEvent
事件呢?让我们跟踪这个事件的创建过程,来到了如下所示的InstanceStateChangedListener
类:
public final class InstanceStateChangedListener extends PostShardingOrchestrationEventListener {
public InstanceStateChangedListener(final String name, final RegistryCenter regCenter) {
super(regCenter, new StateNode(name).getInstancesNodeFullPath(OrchestrationInstance.getInstance().getInstanceId()));
}
@Override
protected CircuitStateChangedEvent createShardingOrchestrationEvent(final DataChangedEvent event) {
return new CircuitStateChangedEvent(StateNodeStatus.DISABLED.toString().equalsIgnoreCase(event.getValue()));
}
}
通过上述代码,我们不难发现当StateNodeStatus
为DISABLED
时,也就是当前的节点已经不可用时会发送CircuitStateChangedEvent
,从而触发熔断机制。
从源码解析到日常开发 #
今天的内容虽然关注的是注册中心,但在篇幅上实际上更多的是在讨论基于事件驱动架构的设计和实现方法。基于配置信息,以及数据库实例信息的变更情况,ShardingSphere抽象了一套完整的事件发送和消费机制,来实现诸如数据访问熔断等非功能性需求。
我们注意到 ShardingSphere 实现事件驱动架构时使用了 Guava 框架中的 EventBus 工具类,在日常开发过程中,我们也可以直接使用这个类来构建自定义的事件处理机制。
小结与预告 #
注册中心是 ShardingSphere 编排治理机制中的一个重要组成部分,但注册中心本身也只是一个工具,需要根据不同的业务场景来设计对应的应用方式。在 ShardingSphere 中,配置信息管理以及数据库实例管理就是典型的应用场景,我们基于这些场景详细分析了基于注册中心的事件驱动架构的设计和实现方法,并给出了基于数据访问熔断机制的案例分析。
这里给你留一道思考题:在 ShardingSphere 中,如何把服务实例的状态与注册中心整合在一起进行编排治理?
在下一课时中,我们将介绍 ShardingSphere 编排治理中的另一个重要主题,即服务访问的链路监控和跟踪机制。