31 Configuration Center How to Implement Dynamic Management of Configuration Information Based on Configuration Center

31 Configuration Center How to Implement Dynamic Management of Configuration Information Based on Configuration Center #

ShardingSphere’s orchestration governance includes the following capabilities: dynamic configuration, registry center, database circuit breaker disablement, and call linkage.

Today, let’s first introduce the simplest configuration center, that is, how to achieve dynamic management of configuration information based on the configuration center.

Abstraction of Configuration Center in ShardingSphere #

The core interface ConfigCenter of the configuration center is located in the sharding-orchestration-config-api project and is defined as follows:

public interface ConfigCenter extends TypeBasedSPI {

    // Initialize the configuration center
    void init(ConfigCenterConfiguration config);

    // Get the configuration item data
    String get(String key);

    // Directly get the configuration item data
    String getDirectly(String key);

    // Check if the configuration item exists
    boolean isExisted(String key);

    // Get a list of sub-configuration items
    List<String> getChildrenKeys(String key);

    // Persist the configuration item
    void persist(String key, String value);

    // Update the configuration item
    void update(String key, String value);

    // Persist temporary data
    void persistEphemeral(String key, String value);

    // Monitor the configuration item or path
    void watch(String key, DataChangedEventListener dataChangedEventListener);

    // Close the configuration center
    void close();
}

Among the above methods, the only method worth mentioning is the watch method, which takes a DataChangedEventListener interface representing the event listener, as shown below:

public interface DataChangedEventListener {

    // Triggered when the data is changed
    void onChange(DataChangedEvent dataChangedEvent);

}

The DataChangedEvent class used here is defined as follows. We can see that there are three types of events: UPDATED, DELETED, and IGNORED:

public final class DataChangedEvent {

    private final String key;

    private final String value;

    private final ChangedType changedType;

    public enum ChangedType {

        UPDATED, DELETED, IGNORED

    }

}

We also noticed that the ConfigCenter interface inherits the TypeBasedSPI interface, so it integrates the SPI mechanism. In ShardingSphere, the ConfigCenter interface has two implementation classes, ApolloConfigCenter based on Apollo, and CuratorZookeeperConfigCenter based on ZooKeeper.

Let’s explain them one by one.

ApolloConfigCenter #

1. Implementation process of ApolloConfigCenter #

Let’s start with ApolloConfigCenter based on Apollo. Its init method is as follows:

@Override
public void init(final ConfigCenterConfiguration config) {
    // Get configuration information from the configuration object and set system properties
    System.getProperties().setProperty("app.id", properties.getProperty("appId", "APOLLO_SHARDING_SPHERE"));
    System.getProperties().setProperty("env", properties.getProperty("env", "DEV"));
    System.getProperties().setProperty(ConfigConsts.APOLLO_CLUSTER_KEY, properties.getProperty("clusterName", ConfigConsts.CLUSTER_NAME_DEFAULT));
    System.getProperties().setProperty(ConfigConsts.APOLLO_META_KEY, config.getServerLists());
    
    // Build an ApolloConfig based on the configuration object
    apolloConfig = ConfigService.getConfig(config.getNamespace());
}

The purpose of the init method above is to set system properties and build a Config object at the same time. In Apollo, based on this Config object, operations on configuration items can be implemented. For example:

@Override
public String get(final String key) {
    return apolloConfig.getProperty(key.replace("/", "."), "");
}

@Override
public String getDirectly(final String key) {
    return get(key);
}
public boolean isExisted(final String key) {

    return !Strings.isNullOrEmpty(get(key));

}


注意这里的 getDirectly 方法和 get 方法的处理方式实际上是一致的而对于 Apollo 而言getChildrenKeyspersistupdate 和 persistEphemeral 等方法都是无效的因为不支持这样的操作但是对于常见的监听机制Apollo 也提供了它的实现方案可以通过对 Config 对象添加 ChangeListener 来实现监听效果如下所示

@Override

public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {

    //添加 Apollo 中的监听器

 apolloConfig.addChangeListener(new ConfigChangeListener() {

        @Override

        public void onChange(final ConfigChangeEvent changeEvent) {

            for (String key : changeEvent.changedKeys()) {

              //获取 Apollo 监听事件

                ConfigChange change = changeEvent.getChange(key);

                DataChangedEvent.ChangedType changedType = getChangedType(change.getChangeType());

                if (DataChangedEvent.ChangedType.IGNORED != changedType) {

                   //将 Apollo 中的监听事件转化为 ShardingSphere 中的监听事件

                   //通过 EventListener 触发事件

                    dataChangedEventListener.onChange(new DataChangedEvent(key, change.getNewValue(), changedType));

                }

            }

        }

    }, Sets.newHashSet(key));

}

上述代码的逻辑在于当事件被 Apollo 监听,并触发上述 watch 方法时,我们会将 Apollo 中的事件类型转化为 ShardingSphere 中自身的事件类型,并通过 DataChangedEventListener 进行传播和处理。

2.ShardingSphere 中的事件驱动架构 #

讲到 DataChangedEventListener,我们不得不对 ShardingSphere 中的事件驱动框架做一些展开。

显然,从命名上看,DataChangedEventListener 是一种事件监听器,用于监听各种 DataChangedEvent。

注意到 ShardingSphere 并没有提供 DataChangedEventListener 接口的任何实现类,而是大量采用了匿名方法进行事件的监听,一种典型的实现方式如下所示:

new DataChangedEventListener() {

 @Override

 public void onChange(final DataChangedEvent dataChangedEvent) {

    

//通过 EventBus 发布事件

                eventBus.post(createXXXEvent(dataChangedEvent));

        }

  }

});

在通过 DataChangedEventListener 监听到某一个 DataChangedEvent 并进行传播时,ShardingSphere 的处理过程就是通过 EventBus 类的 post 方法将事件进行进一步转发。这里使用的 EventBus 同样来自 Google 的 Guava 框架,代表了一种事件总线的实现方式。

现在,事件已经可以通过 EventBus 进行发送了,那么这些被发送的事件是怎么被消费的呢?在 ShardingSphere 中,存在一个 ShardingOrchestrationEventBus 包装类,包装了对 EventBus 的使用过程。

这个包装过程非常简单,只是使用单例模式构建了一个 EventBus 对象而已,如下所示:

public final class ShardingOrchestrationEventBus {

    private static final EventBus INSTANCE = new EventBus();

    //使用单例模式构建单例对象

    public static EventBus getInstance() {

        return INSTANCE;

    }

}

如果我们想要订阅通过 EventBus 发送的事件,只要把自身注册到 EventBus 上即可,可以直接通过 EventBus 提供的 register 方法实现这一目标,如下所示:

ShardingOrchestrationEventBus.getInstance().register(this);

另一方面,在 Guava 的 EventBus 机制中,提供了 @Subscribe 注解用来标识对具体某一种事件的处理方法。一旦在某个方法上添加了 @Subscribe 注解,这个方法就可以自动用来处理所传入的事件。

所以,我们进一步总结事件订阅者的代码结构,可以得到如下所示的伪代码:

public class Subscriber {

 public Subscriber() {

    

    //将自己注册到 EventBus 中

    ShardingOrchestrationEventBus.getInstance().register(this);

}

    @Subscribe

    public void renew(DataSourceChangedEvent dataSourceChangedEvent){

         //消费事件

        

    }

}

可以想象,ShardingSphere 中势必存在一批符合上述代码结构的实现类,用于监听配置中心所产生的配置信息变更事件。以如下所示的 LogicSchema 类为例,我们可以看到它的实现过程就是很典型的一种事件订阅者:

@Getter
                    curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
    
                } else {
    
                  //更新节点数据
                    curatorClient.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8));
    
                }
    
            } catch (final Exception ex) {
    
                throw new ShardingSphereException("persist key: %s, value: %s", ex);
    
            }
    
        }
    
    }
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
    
} else {

    update(key, value);

}

} catch (final Exception ex) {

    CuratorZookeeperExceptionHandler.handleException(ex);

}

The CreateMode.PERSISTENT mode is used here to create an interface, which means that a persistent node is created. In the persistEphemeral method, on the other hand, an ephemeral node is created by setting CreateMode.EPHEMERAL.

The update method is worth a look as well. We can see how to implement the specific method of updating data in a transaction using Curator:

@Override
public void update(final String key, final String value) {

    try {

        // Update data in transaction
        client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();

    } catch (final Exception ex) {

        CuratorZookeeperExceptionHandler.handleException(ex);

    }

}

Next, let’s look at the get method for retrieving data:

@Override
public String get(final String key) {

    // First, try to get data from cache. If not found, directly retrieve data through getDirectly

    TreeCache cache = findTreeCache(key);

    if (null == cache) {

        return getDirectly(key);

    }

    ChildData resultInCache = cache.getCurrentData(key);

    if (null != resultInCache) {

        return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);

    }

    return getDirectly(key);

}

Note that in this get method, ShardingSphere uses a caching mechanism to improve data retrieval efficiency. If the cache is not hit, the getDirectly method is called directly to retrieve data from Zookeeper.

Finally, let’s look at the watch method, which is the most critical:

@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {

    final String path = key + "/";

    if (!caches.containsKey(path)) {

        addCacheData(key);

    }

    TreeCache cache = caches.get(path);

    // Add Zookeeper listener
    cache.getListenable().addListener(new TreeCacheListener() {

        @Override
        public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws UnsupportedEncodingException {

            // Get Zookeeper event
            ChildData data = event.getData();

            if (null == data || null == data.getPath()) {

                return;

            }

            // Transform Zookeeper event to ShardingSphere event
            // Trigger event through EventListener
            DataChangedEvent.ChangedType changedType = getChangedType(event);

            if (DataChangedEvent.ChangedType.IGNORED != changedType) {

                dataChangedEventListener.onChange(new DataChangedEvent(data.getPath(), null == data.getData() ? null : new String(data.getData(), "UTF-8"), changedType));

            }

        }

    });

}

In this watch method, we can see that the final processing result is to transform the listener events in Zookeeper into ShardingSphere listener events and trigger events through the EventListener. We have already discussed this process in detail when we introduced ApolloConfigCenter.

From Source Code Analysis to Daily Development #

Many of the topics we discussed today can also be applied to daily development, including how to use Apollo and Zookeeper, two typical configuration center tools, to store and monitor configuration information. We can directly refer to and learn from the implementation details of these two tools based on our own needs, and we can expand the application scenarios and scope of configuration centers to various business data that needs to be dynamically managed. The implementation details of achieving this goal based on these two tools can be directly referenced and adopted.

Summary and Preview #

In this lesson, we focused on the abstraction and implementation process of the configuration center in ShardingSphere. The core mechanism of the configuration center is to achieve dynamic loading of configuration information, and both Apollo and Zookeeper provide listening mechanisms to achieve this goal. ShardingSphere integrates these two popular open-source tools, as well as the EventBus utility class in the Guava framework, to implement the entire event-driven architecture from event listening to subscription and consumption.

Here’s a question for you to think about: How does ShardingSphere abstract and unify the event generation and listening mechanisms in Apollo and Zookeeper into a unified event-driven architecture? Feel free to discuss with everyone in the comments, and I will review and answer them one by one.

Configuration center and registry center have certain similarities in terms of implementation, but they are aimed at different application scenarios. In the next lesson, we will introduce the implementation mechanism and application scenarios of the registry center in ShardingSphere.