07 How to Implement Database Sharding, Table Sharding, and Forced Routing (Part 2)


In the previous lesson, we introduced how to transform a single-database, single-table architecture into a database sharding architecture based on business scenarios. Today, we continue the transformation, focusing on how to implement table sharding, database sharding + table sharding, and forced routing.

System Transformation: How to Implement Table Sharding? #

Compared to database sharding, table sharding splits a table within the same database. Therefore, from the data source's perspective, we only need to define a single DataSource object. Here, we name this new DataSource ds2:

spring.shardingsphere.datasource.names=ds2 
spring.shardingsphere.datasource.ds2.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds2.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds2.url=jdbc:mysql://localhost:3306/ds2
spring.shardingsphere.datasource.ds2.username=root
spring.shardingsphere.datasource.ds2.password=root

Similarly, to improve the access performance, we set the binding tables and broadcast tables:

spring.shardingsphere.sharding.binding-tables=health_record, health_task
spring.shardingsphere.sharding.broadcast-tables=health_level

Now, let’s recall the TableRuleConfiguration configuration. In this configuration, tableShardingStrategyConfig represents the table sharding strategy. Similar to the database sharding strategy, specifying a sharding key and sharding expression is the way to set the table sharding strategy:

spring.shardingsphere.sharding.tables.health_record.table-strategy.inline.sharding-column=record_id
spring.shardingsphere.sharding.tables.health_record.table-strategy.inline.algorithm-expression=health_record$->{record_id % 2}

In the code, it can be seen that for the health_record table, we set record_id as the sharding key for table sharding, and the sharding expression is health_record$->{record_id % 2}. This means that we will split the health_record table into two sharded tables, health_record0 and health_record1, based on record_id. With the table sharding strategy, along with the actualDataNodes and keyGeneratorConfig configuration options, we can complete the full table sharding configuration for the health_record table:

spring.shardingsphere.sharding.tables.health_record.actual-data-nodes=ds2.health_record$->{0..1}
spring.shardingsphere.sharding.tables.health_record.table-strategy.inline.sharding-column=record_id
spring.shardingsphere.sharding.tables.health_record.table-strategy.inline.algorithm-expression=health_record$->{record_id % 2}
spring.shardingsphere.sharding.tables.health_record.key-generator.column=record_id
spring.shardingsphere.sharding.tables.health_record.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.health_record.key-generator.props.worker.id=33
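To make the inline expression concrete, here is a minimal plain-Java sketch of what health_record$->{record_id % 2} resolves to for a given record_id. The class InlineTableRoutingSketch and its targetTable method are hypothetical names used only for illustration. ShardingSphere itself evaluates inline expressions with Groovy, and the sketch assumes non-negative key values.

```java
// Illustrative only: a plain-Java stand-in for the inline expression
// health_record$->{record_id % 2}; not part of ShardingSphere.
final class InlineTableRoutingSketch {

    // Appends (shardingValue % tableCount) to the logical table name.
    // Assumes shardingValue is non-negative.
    static String targetTable(final String logicTable, final long shardingValue, final int tableCount) {
        return logicTable + (shardingValue % tableCount);
    }

    public static void main(final String[] args) {
        System.out.println(targetTable("health_record", 10L, 2)); // health_record0
        System.out.println(targetTable("health_record", 11L, 2)); // health_record1
    }
}
```

Even record_id values land in health_record0 and odd values in health_record1, which is the distribution the configuration above is expected to produce.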

For the health_task table, the same configuration method can be used to perform table sharding:

spring.shardingsphere.sharding.tables.health_task.actual-data-nodes=ds2.health_task$->{0..1}
spring.shardingsphere.sharding.tables.health_task.table-strategy.inline.sharding-column=record_id
spring.shardingsphere.sharding.tables.health_task.table-strategy.inline.algorithm-expression=health_task$->{record_id % 2}
spring.shardingsphere.sharding.tables.health_task.key-generator.column=task_id
spring.shardingsphere.sharding.tables.health_task.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.health_task.key-generator.props.worker.id=33

As you can see, because health_task and health_record are binding tables, we also shard health_task on the record_id column. That is, health_task is split into two sharded tables, health_task0 and health_task1, based on record_id. The auto-generated key column, however, must still be set to the task_id field of the health_task table.

With this, the complete configuration for table sharding is finished. Now, let’s rerun the HealthRecordTest unit test and you will see that the data has been correctly split into the sharded tables. The following images show the health_record0 and health_record1 tables after sharding:

Figure: Data in sharded table health_record0

Figure: Data in sharded table health_record1

And these are the health_task0 and health_task1 tables after sharding:

Figure: Data in sharded table health_task0

Figure: Data in sharded table health_task1

System Transformation: How to Implement Database Sharding + Table Sharding? #

After completing database sharding and table sharding separately, the third step of the transformation is to combine the two. This may sound complicated, but thanks to the powerful configuration system provided by ShardingSphere, all developers need to do is merge the configuration items for database sharding and table sharding. Here, we create 3 new data sources: ds3, ds4, and ds5:

spring.shardingsphere.datasource.names=ds3,ds4,ds5 
spring.shardingsphere.datasource.ds3.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds3.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds3.url=jdbc:mysql://localhost:3306/ds3
spring.shardingsphere.datasource.ds3.username=root
spring.shardingsphere.datasource.ds3.password=root

spring.shardingsphere.datasource.ds4.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds4.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds4.url=jdbc:mysql://localhost:3306/ds4
spring.shardingsphere.datasource.ds4.username=root
spring.shardingsphere.datasource.ds4.password=root 

spring.shardingsphere.datasource.ds5.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds5.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds5.url=jdbc:mysql://localhost:3306/ds5
spring.shardingsphere.datasource.ds5.username=root
spring.shardingsphere.datasource.ds5.password=root

Note that there are now 3 data sources, named ds3, ds4, and ds5. Therefore, to shard data to the corresponding data sources based on user_id, we need to adjust the row expression. The row expression should now be ds$->{user_id % 3 + 3}:

spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 3 + 3}
spring.shardingsphere.sharding.binding-tables=health_record,health_task
spring.shardingsphere.sharding.broadcast-tables=health_level

For the health_record and health_task tables, the corresponding row expressions also need to be adjusted. We set actual-data-nodes to ds$->{3..5}.health_record$->{0..2}, which means each original table will be split into 3 sub-tables:

spring.shardingsphere.sharding.tables.health_record.actual-data-nodes=ds$->{3..5}.health_record$->{0..2}
spring.shardingsphere.sharding.tables.health_record.table-strategy.inline.sharding-column=record_id
spring.shardingsphere.sharding.tables.health_record.table-strategy.inline.algorithm-expression=health_record$->{record_id % 3}
spring.shardingsphere.sharding.tables.health_record.key-generator.column=record_id
spring.shardingsphere.sharding.tables.health_record.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.health_record.key-generator.props.worker.id=33
spring.shardingsphere.sharding.tables.health_task.actual-data-nodes=ds$->{3..5}.health_task$->{0..2}
spring.shardingsphere.sharding.tables.health_task.table-strategy.inline.sharding-column=record_id
spring.shardingsphere.sharding.tables.health_task.table-strategy.inline.algorithm-expression=health_task$->{record_id % 3}
spring.shardingsphere.sharding.tables.health_task.key-generator.column=task_id
spring.shardingsphere.sharding.tables.health_task.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.health_task.key-generator.props.worker.id=33
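The combined effect of the database and table expressions can be sketched in plain Java. The class DataNodeRoutingSketch and its methods are hypothetical and exist only to illustrate the arithmetic. The sketch hard-codes the ranges 3..5 and 0..2 from the configuration above and assumes non-negative user_id and record_id values.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics the row expressions from the configuration,
// without using any ShardingSphere classes.
final class DataNodeRoutingSketch {

    // Expands ds$->{3..5}.<table>$->{0..2} into its nine concrete data nodes.
    static List<String> expandDataNodes(final String logicTable) {
        List<String> nodes = new ArrayList<>();
        for (int ds = 3; ds <= 5; ds++) {
            for (int table = 0; table <= 2; table++) {
                nodes.add("ds" + ds + "." + logicTable + table);
            }
        }
        return nodes;
    }

    // Database from ds$->{user_id % 3 + 3}, table from <table>$->{record_id % 3}.
    static String route(final String logicTable, final long userId, final long recordId) {
        return "ds" + (userId % 3 + 3) + "." + logicTable + (recordId % 3);
    }

    public static void main(final String[] args) {
        System.out.println(expandDataNodes("health_record").size()); // 9
        System.out.println(route("health_record", 7L, 8L)); // ds4.health_record2
        System.out.println(route("health_task", 7L, 8L));   // ds4.health_task2
    }
}
```

Because both tables use the same sharding columns and moduli, a health_record row and its health_task rows end up in the same database and under the same table suffix, which is what declaring them as binding tables relies on.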

With this, we have integrated the database sharding and table sharding configurations. As you can see, no new configuration items were introduced. Rerun the unit test to confirm that the data has been correctly distributed across databases and tables. Here are the health_record0, health_record1, and health_record2 tables in ds3:

Figure: Data in the health_record0 table in ds3

Figure: Data in the health_record1 table in ds3

Figure: Data in the health_record2 table in ds3

Here are the health_record0, health_record1, and health_record2 tables in ds4:

Figure: Data in the health_record0 table in ds4

Figure: Data in the health_record1 table in ds4

Figure: Data in the health_record2 table in ds4

And here are the health_record0, health_record1, and health_record2 tables in ds5:

Figure: Data in the health_record0 table in ds5

Figure: Data in the health_record1 table in ds5

Figure: Data in the health_record2 table in ds5

For the health_task table, we get a similar sharding result.

System Transformation: How to Implement Forced Routing? #

From the perspective of SQL execution, sharding can be seen as a routing mechanism, which means routing SQL statements to target databases or tables and retrieving data. On the basis of implementing sharding, we will introduce a different routing method, which is forced routing.

What is Forced Routing? #

Forced routing is different from general sharding routing. It does not use any sharding keys and sharding strategies. We know that ShardingSphere implements the rewriting of the JDBC specifications by parsing SQL statements, extracting sharding keys, and setting sharding strategies to perform sharding. However, if we don’t have a sharding key, does that mean we can only access all databases and tables for full routing? Obviously, this approach is not suitable. Sometimes, we need to open a “backdoor” for SQL execution, allowing the target databases and tables to be set externally even without a sharding key. This is the design concept of forced routing.

In ShardingSphere, forced routing is implemented through the Hint mechanism. Let's look at the concept of a Hint more closely. In relational databases, Hints are a supplementary SQL syntax that plays an important role: they let users influence how an SQL statement is executed, change its execution plan, and apply special optimizations. Many database tools also provide their own Hint syntax. In MySQL, for example, a typical use of Hints is to force or ignore the use of specific indexes.

In MySQL, the forced index ensures that the SQL statement to be executed only acts on the specified index. This can be achieved by using the FORCE INDEX Hint syntax:

SELECT * FROM TABLE1 FORCE INDEX (FIELD1)

Similarly, the IGNORE INDEX Hint syntax allows the index set on specific fields to be ignored:

SELECT * FROM TABLE1 IGNORE INDEX (FIELD1, FIELD2)

For scenarios where the sharding field is not determined by SQL but by other external conditions, the sharding field can be flexibly injected using SQL Hint.

How to Design and Develop Forced Routing? #

The design and development process of forced routing based on Hint needs to follow certain conventions, and ShardingSphere also provides a dedicated HintManager to simplify the development process of forced routing.

  • HintManager

The usage of the HintManager class is relatively fixed. We can understand the operations it contains by examining the class definition and core variables in the source code:

public final class HintManager implements AutoCloseable {

    // Stores the HintManager instance based on ThreadLocal
    private static final ThreadLocal<HintManager> HINT_MANAGER_HOLDER = new ThreadLocal<>();
    // Sharding values for databases
    private final Multimap<String, Comparable<?>> databaseShardingValues = HashMultimap.create();
    // Sharding values for tables
    private final Multimap<String, Comparable<?>> tableShardingValues = HashMultimap.create();
    // Whether there are only database sharding values
    private boolean databaseShardingOnly;
    // Whether to route only to master databases
    private boolean masterRouteOnly;
    ...
}


Looking at the variable definitions, HintManager stores its instance in a ThreadLocal, so the scope of the sharding information is limited to the current thread. There are also two Multimap objects that store the database sharding values and table sharding values respectively, plus two flags indicating whether only database sharding is used and whether to route only to the master database. As you would expect, HintManager exposes a set of get/set methods over these variables so that developers can set sharding values according to specific business scenarios.

Also note in the class definition that HintManager implements the AutoCloseable interface, which was introduced in JDK 7 to support automatic resource release. AutoCloseable declares a single close method, which we can implement to release whatever custom resources we hold.

public interface AutoCloseable {
    void close() throws Exception;
}

Before JDK 7, resources had to be released manually in the finally clause of a try/catch/finally block. With AutoCloseable, resources declared in the try statement are closed automatically when the try block ends: the close method of each resource is invoked for us, so no finally clause is needed. This mechanism is called “try-with-resources”. It also allows multiple resources implementing this interface to be declared in a single try statement, separated by semicolons.
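The following minimal sketch shows try-with-resources with two resources declared in one try statement. NamedResource and TryWithResourcesSketch are hypothetical classes written for this illustration. They record lifecycle events so that the order of the automatic close calls can be observed.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: demonstrates automatic closing of multiple resources.
final class TryWithResourcesSketch {

    // Records lifecycle events so the closing order can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical resource that logs when it is opened and closed.
    static final class NamedResource implements AutoCloseable {
        private final String name;

        NamedResource(final String name) {
            this.name = name;
            EVENTS.add("open " + name);
        }

        @Override
        public void close() {
            EVENTS.add("close " + name);
        }
    }

    // Opens two resources in one try statement and returns the recorded events.
    static List<String> run() {
        EVENTS.clear();
        try (NamedResource first = new NamedResource("first");
             NamedResource second = new NamedResource("second")) {
            EVENTS.add("body");
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(final String[] args) {
        System.out.println(run());
        // [open first, open second, body, close second, close first]
    }
}
```

Resources are closed in the reverse order of their declaration, without any finally block in the calling code.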

For HintManager, the so-called resources are actually the HintManager instance stored in ThreadLocal. The following code implements the close method of the AutoCloseable interface to release resources:

public static void clear() {
    HINT_MANAGER_HOLDER.remove();
}

@Override
public void close() {
    HintManager.clear();
}

HintManager is created through a static getInstance factory method, in a way that resembles the singleton pattern but is scoped to the current thread. As the following code shows, getInstance first checks that no HintManager is already stored in the ThreadLocal, then creates a new instance, stores it in the ThreadLocal, and returns it.

public static HintManager getInstance() {
    Preconditions.checkState(null == HINT_MANAGER_HOLDER.get(), "Hint has previous value, please clear first.");
    HintManager result = new HintManager();
    HINT_MANAGER_HOLDER.set(result);
    return result;
}
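The thread-scoped lifecycle described above can be reproduced in a small self-contained sketch. ThreadScopedHintSketch is a hypothetical stand-in, not the real HintManager: it keeps only the ThreadLocal holder, the guard in getInstance, and the cleanup in close. The guard here uses a plain if statement, assuming the same IllegalStateException that Guava's Preconditions.checkState would raise. The isBound method is an extra helper added for the demonstration.

```java
// Illustrative only: a stripped-down stand-in for the HintManager lifecycle.
final class ThreadScopedHintSketch implements AutoCloseable {

    private static final ThreadLocal<ThreadScopedHintSketch> HOLDER = new ThreadLocal<>();

    private ThreadScopedHintSketch() {
    }

    // Creates the instance for the current thread; fails if one is still registered.
    static ThreadScopedHintSketch getInstance() {
        if (HOLDER.get() != null) {
            throw new IllegalStateException("Hint has previous value, please clear first.");
        }
        ThreadScopedHintSketch result = new ThreadScopedHintSketch();
        HOLDER.set(result);
        return result;
    }

    // Demonstration helper: reports whether the current thread holds an instance.
    static boolean isBound() {
        return HOLDER.get() != null;
    }

    // Removes the instance from the current thread.
    @Override
    public void close() {
        HOLDER.remove();
    }

    public static void main(final String[] args) {
        try (ThreadScopedHintSketch hint = getInstance()) {
            System.out.println(isBound()); // true
            try {
                getInstance();
            } catch (IllegalStateException ex) {
                System.out.println("second getInstance rejected");
            }
        }
        System.out.println(isBound()); // false
    }
}
```

A second getInstance call on the same thread is rejected until close has run, which is why wrapping the instance in try-with-resources is the safe way to use it.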

After understanding the basic structure of HintManager, the process of obtaining HintManager in an application becomes very simple. Here is the recommended usage:

try (HintManager hintManager = HintManager.getInstance();
    Connection connection = dataSource.getConnection();
    Statement statement = connection.createStatement()) {
    ...
}

As you can see, we obtain HintManager, Connection, and Statement instances in the try statement, and then we can use these instances to perform specific SQL executions.

Implementing and Configuring the Forced Sharding Algorithm #

Forced sharding based on Hint relies on configuration. Before introducing the Hint-related configuration, let’s recall the design of the ShardingSphere configuration system from Lesson 5, “How is the configuration system in ShardingSphere designed?”. We know that TableRuleConfiguration contains two ShardingStrategyConfiguration objects, used to set the database sharding strategy and the table sharding strategy respectively. ShardingSphere provides HintShardingStrategyConfiguration specifically for configuring a Hint-based sharding strategy, as the following code shows:

public final class HintShardingStrategyConfiguration implements ShardingStrategyConfiguration {

    private final HintShardingAlgorithm shardingAlgorithm;

    public HintShardingStrategyConfiguration(final HintShardingAlgorithm shardingAlgorithm) {
        Preconditions.checkNotNull(shardingAlgorithm, "ShardingAlgorithm is required.");
        this.shardingAlgorithm = shardingAlgorithm;
    }
}

As you can see, HintShardingStrategyConfiguration needs to set a HintShardingAlgorithm. HintShardingAlgorithm is an interface, and we need to provide its implementation to perform sharding based on the Hint information.

public interface HintShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
    // Perform sharding based on the Hint information
    Collection<String> doSharding(Collection<String> availableTargetNames, HintShardingValue<T> shardingValue);
}

In ShardingSphere, there is a built-in implementation of HintShardingAlgorithm called DefaultHintShardingAlgorithm, but this implementation does not perform any sharding logic. It simply returns all the availableTargetNames passed in. The following code shows this:

public final class DefaultHintShardingAlgorithm implements HintShardingAlgorithm<Integer> {

    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final HintShardingValue<Integer> shardingValue) {
        return availableTargetNames;
    }
}

We can provide our own HintShardingAlgorithm implementation and plug it into HintShardingStrategyConfiguration as needed. For example, we can compare each available database or table name against the incoming forced-routing sharding values and keep only the names that match, which determines the target databases and tables:

public final class MatchHintShardingAlgorithm implements HintShardingAlgorithm<Long> {
    @Override
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final HintShardingValue<Long> shardingValue) {
        Collection<String> result = new ArrayList<>();
        for (String each : availableTargetNames) {
            for (Long value : shardingValue.getValues()) {
                if (each.endsWith(String.valueOf(value))) {
                    result.add(each);
                }
            }
        }
        return result;
    }
}
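To see the matching rule in isolation, the sketch below applies the same suffix comparison to plain collections. MatchHintSketch is a hypothetical class that drops the ShardingSphere types so it can run on its own, and the data source names ds0 and ds1 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Illustrative only: the suffix-matching rule of MatchHintShardingAlgorithm
// applied to plain collections instead of ShardingSphere types.
final class MatchHintSketch {

    // Keeps every target name that ends with one of the hint values.
    static List<String> match(final Collection<String> availableTargetNames, final Collection<Long> hintValues) {
        List<String> result = new ArrayList<>();
        for (String each : availableTargetNames) {
            for (Long value : hintValues) {
                if (each.endsWith(String.valueOf(value))) {
                    result.add(each);
                }
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        List<String> dataSources = Arrays.asList("ds0", "ds1"); // assumed names
        System.out.println(match(dataSources, Arrays.asList(1L)));     // [ds1]
        System.out.println(match(dataSources, Arrays.asList(0L, 1L))); // [ds0, ds1]
    }
}
```

Note that suffix matching is loose: with targets named ds1 and ds11, a hint value of 1 would select both, so a stricter comparison may be preferable when names can share a suffix.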

Once a custom HintShardingAlgorithm implementation is available, it needs to be registered in the configuration system. Here, we use the YAML configuration style to do so:

defaultDatabaseStrategy:
    hint:
        algorithmClassName: com.tianyilan.shardingsphere.demo.hint.MatchHintShardingAlgorithm

When ShardingSphere performs routing and finds that the Hint sharding algorithm is set in the TableRuleConfiguration, it will obtain the sharding values from the HintManager and perform routing operations.

How to Route to the Target Database and Table by Means of Forced Routing? #

After understanding the concept and development process of forced routing, let’s go back to the case and give a specific implementation process using forced routing for databases as an example. In order to better organize the code structure, let’s first build two helper classes. One is DataSourceHelper for obtaining the DataSource. In this helper class, we create the DataSource by loading the .yaml configuration file:

public class DataSourceHelper {
    static DataSource getDataSourceForShardingDatabases() throws IOException, SQLException {
        return YamlShardingDataSourceFactory.createDataSource(getFile("/META-INF/hint-databases.yaml"));
    }

    private static File getFile(final String configFile) {
        // Resolve the resource through this class rather than Thread.class,
        // so that the application classpath is searched
        return new File(DataSourceHelper.class.getResource(configFile).getFile());
    }
}

Here, we use the YamlShardingDataSourceFactory factory class. You can review the implementation of the Yaml configuration solution in Lesson 5.

The other helper class is HintManagerHelper, which wraps the HintManager. In this helper class, we set the database sharding value for forced routing through the setDatabaseShardingValue method exposed by HintManager. In this example, we pass the value 1 so that data is fetched from a single target database only. HintManager also provides methods such as addDatabaseShardingValue and addTableShardingValue for setting forced-routing sharding values.

public class HintManagerHelper {
    static void initializeHintManagerForShardingDatabases(final HintManager hintManager) {
       hintManager.setDatabaseShardingValue(1L);
    }
}

Finally, let’s build a HintService to encapsulate the entire process of forced routing:

public class HintService {
    private static void processWithHintValueForShardingDatabases() throws SQLException, IOException {
        DataSource dataSource = DataSourceHelper.getDataSourceForShardingDatabases();
        try (HintManager hintManager = HintManager.getInstance();
             Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement()) {
            HintManagerHelper.initializeHintManagerForShardingDatabases(hintManager);
            ResultSet result = statement.executeQuery("select user_id, user_name from user");

            while (result.next()) {
                // JDBC column indexes start at 1, not 0
                System.out.println(result.getInt(1) + ": " + result.getString(2));
            }
        }
    }
}

As can be seen, in the processWithHintValueForShardingDatabases method, we first obtain the target DataSource through DataSourceHelper. Then, we use the try-with-resources mechanism to obtain instances of HintManager, Connection, and Statement in the try statement, and set the sharding value for forced routing using the HintManagerHelper helper class. Finally, we execute a full table query through Statement and print the query result:

2020-05-25 21:58:13.932  INFO 20024 --- [           main] ShardingSphere-SQL                       : Logic SQL: select user_id, user_name from user
…
2020-05-25 21:58:13.932  INFO 20024 --- [           main] ShardingSphere-SQL                       : Actual SQL: ds1 ::: select user_id, user_name from user
6: user_6
7: user_7
8: user_8
9: user_9
10: user_10

By obtaining the log information during the execution process, we can see that the original logic SQL is “select user_id, user_name from user”, while the actual executed SQL is “ds1 ::: select user_id, user_name from user”. Obviously, the forced routing has taken effect, and we only obtain the User information from ds1.

Summary #

Continuing from the previous lesson, today we covered the remaining steps in transforming a single-database, single-table architecture: implementing table sharding, database sharding + table sharding, and forced routing. With the experience of database sharding behind us, table sharding and database sharding + table sharding are relatively easy to implement, since the work amounts to adjusting the corresponding configuration items. Forced routing is a new routing mechanism, so we spent considerable space explaining its concept and implementation, and walked through a case based on a business scenario.

Here’s a question for you to think about: How does ShardingSphere implement forced routing based on the Hint mechanism in the database sharding + table sharding scenario?

From a routing perspective, the read/write separation mechanism built on a database master-slave architecture can also be regarded as a form of routing. In the next lesson, we will explain the read/write separation mechanism provided by ShardingSphere, and show how to integrate it with database sharding + table sharding and forced routing.