
23 Execution Engine How to Master the Execution Model of Executor in ShardingSphere Below #

In the previous lesson, we gave a comprehensive introduction to the SQLExecuteTemplate at the bottom layer of the ShardingSphere execution engine, as well as the StatementExecutor and PreparedStatementExecutor objects at the upper layer.

Today, we will go one step further and focus on the ShardingStatement and ShardingPreparedStatement objects, which are the users of the StatementExecutor and PreparedStatementExecutor respectively.

ShardingStatement #

Let’s start with the ShardingStatement class, the variables of which have been introduced in the previous content:

private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
private boolean returnGeneratedKeys;
private SQLRouteResult sqlRouteResult;
private ResultSet currentResultSet;

The constructor of the ShardingStatement class is not complicated either. We can see that the StatementExecutor is created in this constructor:

public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
    super(Statement.class);
    this.connection = connection;
    // Create StatementExecutor
    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
}

Before we continue with ShardingStatement, let’s review the class hierarchy related to it. In the article “06 | Specification Compatibility: What is the Relationship Between JDBC Specification and ShardingSphere?”, we mentioned that ShardingSphere wraps its implementation classes through the adapter pattern. Besides the ShardingConnection class that has already been introduced, these wrapper classes also include the ShardingStatement and ShardingPreparedStatement that we will introduce today.

Based on this point, we can imagine that ShardingStatement should have a similar class hierarchy as the ShardingConnection:

(Diagram: the class hierarchy of ShardingStatement)

Let’s now turn to the AbstractStatementAdapter class in the diagram above. Many of its methods follow the same style as AbstractConnectionAdapter, the parent class of ShardingConnection. For example, the setPoolable method is shown below:

public final void setPoolable(final boolean poolable) throws SQLException {
    this.poolable = poolable;
    recordMethodInvocation(targetClass, "setPoolable", new Class[] {boolean.class}, new Object[] {poolable});
    forceExecuteTemplate.execute((Collection) getRoutedStatements(), new ForceExecuteCallback<Statement>() {

        @Override
        public void execute(final Statement statement) throws SQLException {
            statement.setPoolable(poolable);
        }
    });
}

The recordMethodInvocation method, ForceExecuteTemplate, and ForceExecuteCallback mentioned here have been introduced in the previous article “03 | Specification Compatibility: What is the Relationship Between JDBC Specification and ShardingSphere?”. They will not be elaborated here.

Similarly, the role of the parent class AbstractUnsupportedOperationStatement of AbstractStatementAdapter is completely the same as that of the AbstractUnsupportedOperationConnection.
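The division of labor between these two layers can be illustrated with a minimal, hypothetical Java sketch. The class names below are simplified stand-ins, not the real ShardingSphere classes: the “unsupported operation” layer throws SQLFeatureNotSupportedException for JDBC methods that sharding cannot support, while the adapter layer implements only what it can actually delegate.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

// Hypothetical "unsupported operation" layer: throws for JDBC features that
// a sharded statement cannot support (cursor names are one real example).
abstract class UnsupportedOperationStatementSketch {
    public void setCursorName(final String name) throws SQLException {
        throw new SQLFeatureNotSupportedException("setCursorName");
    }
}

// Hypothetical adapter layer: overrides only what can actually be delegated.
final class StatementAdapterSketch extends UnsupportedOperationStatementSketch {
    private boolean poolable;

    public void setPoolable(final boolean poolable) {
        // the real adapter also records this call for later replay on routed statements
        this.poolable = poolable;
    }

    public boolean isPoolable() {
        return poolable;
    }
}
```

Splitting the “unsupported” defaults into a dedicated parent class keeps the adapter class focused on the operations that are genuinely delegated.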

After understanding the class hierarchy of ShardingStatement, let’s look at its core method, which is the executeQuery method:

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    if (Strings.isNullOrEmpty(sql)) {
        throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
    }
    ResultSet result;
    try {
        // Clear relevant variables in StatementExecutor
        clearPrevious();
        // Execute route engine to get route result
        shard(sql);
        // Initialize StatementExecutor
        initStatementExecutor();
        // Call merge engine
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), statementExecutor.executeQuery());
        // Get merged result
        result = getResultSet(mergeEngine);
    } finally {
        currentResultSet = null;
    }
    currentResultSet = result;
    return result;
}

There are several sub-methods worth noting in this method, starting with the shard method:

private void shard(final String sql) {
    // Get ShardingRuntimeContext from Connection
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    // Create SimpleQueryShardingEngine
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    // Execute sharding routing and get route result
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

This code is the entry point of the routing engine. We create a SimpleQueryShardingEngine and call its shard method to get the route result object SQLRouteResult.

Then let’s take a look at the initStatementExecutor method as shown below:

private void initStatementExecutor() throws SQLException {
    statementExecutor.init(sqlRouteResult);
    replayMethodForStatements();
}

Here, the statementExecutor is initialized with the SQLRouteResult object, and then a replayMethodForStatements method is executed:

private void replayMethodForStatements() {
    for (Statement each : statementExecutor.getStatements()) {
        replayMethodsInvocation(each);
    }
}

This method calls replayMethodsInvocation, which uses reflection to replay the recorded method invocations on every Statement held by the statementExecutor.
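The record-and-replay mechanism can be sketched as follows. This is a simplified, hypothetical re-implementation of the JdbcMethodInvocation idea, not the actual ShardingSphere code:

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Stand-in for a real Statement; only used to demonstrate the replay.
final class FakeStatement {
    boolean poolable;

    public void setPoolable(final boolean poolable) {
        this.poolable = poolable;
    }
}

// Hypothetical version of JdbcMethodInvocation: a recorded method plus its arguments.
final class MethodInvocationSketch {
    private final Method method;
    private final Object[] arguments;

    MethodInvocationSketch(final Method method, final Object[] arguments) {
        this.method = method;
        this.arguments = arguments;
    }

    void invoke(final Object target) throws Exception {
        method.invoke(target, arguments);
    }
}

// Hypothetical version of the record/replay adapter: calls made before the real
// statements exist are recorded, then replayed on each routed statement.
final class RecordingAdapterSketch {
    private final List<MethodInvocationSketch> invocations = new ArrayList<>();

    void record(final Class<?> targetClass, final String methodName,
                final Class<?>[] argumentTypes, final Object[] arguments) throws NoSuchMethodException {
        invocations.add(new MethodInvocationSketch(targetClass.getMethod(methodName, argumentTypes), arguments));
    }

    void replay(final Object target) throws Exception {
        for (MethodInvocationSketch each : invocations) {
            each.invoke(target);
        }
    }
}
```

This matters because the real Statements routed to each data source are only created after routing, so any configuration applied earlier must be buffered and replayed.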

Finally, statementExecutor.executeQuery() is called to obtain the SQL execution results, which are then used to create a MergeEngine, and the final execution result is obtained through that MergeEngine.

The merge engine is an engine in ShardingSphere that is parallel to the SQL parsing engine, routing engine, and execution engine. We will start introducing this content in the next lesson, so we won’t go into details here.

Taking one of the executeUpdate methods in ShardingStatement as an example, you can see that its execution process is very similar to the previous executeQuery method:

@Override
public int executeUpdate(final String sql) throws SQLException {
    try {
        // Clear relevant variables in StatementExecutor
        clearPrevious();
        // Execute the routing engine to get the routing result
        shard(sql);
        // Initialize the StatementExecutor
        initStatementExecutor();
        return statementExecutor.executeUpdate();
    } finally {
        currentResultSet = null;
    }
}

Of course, for update operations there is no need to merge results through the merge engine.

ShardingPreparedStatement #

Next, let’s take a look at the ShardingPreparedStatement class. The variables in this class are basically the same as the objects introduced earlier:

private final ShardingConnection connection;
private final String sql;
private final PreparedQueryShardingEngine shardingEngine;
private final PreparedStatementExecutor preparedStatementExecutor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
private SQLRouteResult sqlRouteResult;
private ResultSet currentResultSet;

The creation of ShardingEngine, PreparedStatementExecutor, and BatchPreparedStatementExecutor objects all occur in the constructor of ShardingPreparedStatement.

Then let’s take a look at its representative method executeQuery, as shown below:

@Override
public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        shard();
        initPreparedStatementExecutor();
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getRuntimeContext().getDatabaseType(), connection.getRuntimeContext().getRule(), sqlRouteResult, connection.getRuntimeContext().getMetaData().getRelationMetas(), preparedStatementExecutor.executeQuery());
        result = getResultSet(mergeEngine);
    } finally {
        clearBatch();
    }
    currentResultSet = result;
    return result;
}

We have not added comments here, but the execution flow of this method should be easy to follow, since it closely mirrors the same-named method in ShardingStatement.

There isn’t much more to say about ShardingPreparedStatement. Next, let’s take a look at its parent class AbstractShardingPreparedStatementAdapter, and see that this class holds a list of SetParameterMethodInvocation and a list of parameters:

private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();
private final List<Object> parameters = new ArrayList<>();

The SetParameterMethodInvocation class here directly inherits the JdbcMethodInvocation class mentioned when introducing ShardingConnection:

public final class SetParameterMethodInvocation extends JdbcMethodInvocation {

    @Getter
    private final int index;

    @Getter
    private final Object value;

    public SetParameterMethodInvocation(final Method method, final Object[] arguments, final Object value) {
        super(method, arguments);
        this.index = (int) arguments[0];
        this.value = value;
    }

    public void changeValueArgument(final Object value) {
        getArguments()[1] = value;
    }
}

For ShardingPreparedStatement, the purpose of this class is to add parameter information needed during SQL execution based on the method and parameters saved in JdbcMethodInvocation.

Its replaySetParameter method therefore looks like this:

protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
    setParameterMethodInvocations.clear();
    // Add parameter information
    addParameters(parameters);
    for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
        each.invoke(preparedStatement);
    }
}
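The effect of this replay can be illustrated with a toy sketch. The class below is hypothetical, not the real implementation: setXXX calls arrive as 1-based (index, value) pairs, possibly out of order, and are materialized in positional order for each routed statement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of parameter replay: setXXX calls are stored as 1-based
// (index, value) pairs and materialized in positional order, the way
// replaySetParameter pushes them onto each routed PreparedStatement.
final class ParameterRecorderSketch {
    private final List<Object> parameters = new ArrayList<>();

    // stands in for setInt/setString/... on the sharding-level statement
    void setParameter(final int index, final Object value) {
        while (parameters.size() < index) {
            parameters.add(null); // grow the list so the 1-based index fits
        }
        parameters.set(index - 1, value);
    }

    // stands in for replaying the parameters onto one routed statement
    List<Object> replay() {
        return new ArrayList<>(parameters);
    }
}
```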

One thing to note about AbstractShardingPreparedStatementAdapter is its class hierarchy. As shown in the diagram below, you can see that AbstractShardingPreparedStatementAdapter extends AbstractUnsupportedOperationPreparedStatement, and AbstractUnsupportedOperationPreparedStatement extends AbstractStatementAdapter and implements PreparedStatement:

(Diagram: the class hierarchy of AbstractShardingPreparedStatementAdapter)

The reason for this class hierarchy is that PreparedStatement builds on Statement by adding various parameter-setting capabilities; in other words, a PreparedStatement should provide all the functionality of a Statement. So, on the one hand, AbstractStatementAdapter provides the full functionality of Statement; on the other hand, some PreparedStatement-related operations are unsupported, so ShardingSphere introduces the AbstractUnsupportedOperationPreparedStatement class on top of AbstractStatementAdapter, and the AbstractShardingPreparedStatementAdapter adapter class finally inherits from it.

This forms the complex class hierarchy shown in the figure above.

ShardingConnection #

After introducing ShardingStatement and ShardingPreparedStatement, let’s focus on their specific application scenarios, which is also the last part of the ShardingSphere execution engine.

By examining the calling relationships, we find that the entry points for creating these two classes are in the ShardingConnection class, which contains the createStatement method for creating ShardingStatement and the prepareStatement method for creating ShardingPreparedStatement, as well as their various overloaded methods:

@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
    return new ShardingStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
    return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}

At the same time, ShardingConnection contains ShardingTransactionManager for managing distributed transactions. The discussion on distributed transactions is not the focus for today, and we will have a dedicated topic to explain in detail later.

But let’s take a look at the commit and rollback methods:

@Override
public void commit() throws SQLException {
    if (TransactionType.LOCAL == transactionType) {
        super.commit();
    } else {
        shardingTransactionManager.commit();
    }
}

@Override
public void rollback() throws SQLException {
    if (TransactionType.LOCAL == transactionType) {
        super.rollback();
    } else {
        shardingTransactionManager.rollback();
    }
}

As you can see, the logic of these two methods is quite clear: when the transaction type is a local transaction, they directly call the commit and rollback methods in the AbstractConnectionAdapter class, which in turn invoke the corresponding methods on the actual underlying connections.

Taking the commit method as an example, we can see the implementation process of AbstractConnectionAdapter based on this design thinking:

@Override
public void commit() throws SQLException {
    forceExecuteTemplate.execute(cachedConnections.values(), new ForceExecuteCallback<Connection>() {

        @Override
        public void execute(final Connection connection) throws SQLException {
            connection.commit();
        }
    });
}
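The semantics of ForceExecuteTemplate — keep executing against every target even when some of them fail — can be sketched roughly as follows. This is a simplified, hypothetical version, not the actual implementation:

```java
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;

// Hypothetical callback mirroring ForceExecuteCallback.
interface ForceCallbackSketch<T> {
    void execute(T target) throws SQLException;
}

// Hypothetical sketch of ForceExecuteTemplate: every target is attempted even
// if earlier ones fail, and the collected failures are reported at the end.
final class ForceExecuteTemplateSketch<T> {

    void execute(final Collection<T> targets, final ForceCallbackSketch<T> callback) throws SQLException {
        Collection<SQLException> exceptions = new LinkedList<>();
        for (T each : targets) {
            try {
                callback.execute(each);
            } catch (SQLException ex) {
                exceptions.add(ex); // remember the failure, but keep going
            }
        }
        if (!exceptions.isEmpty()) {
            SQLException failure = new SQLException("force execute failed");
            for (SQLException each : exceptions) {
                failure.setNextException(each); // chain the individual causes
            }
            throw failure;
        }
    }
}
```

This “force” behavior is what makes a broadcast commit sensible: a failure on one cached connection must not prevent the commit from being attempted on the others.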

ShardingDataSource #

We know that in the JDBC specification, a Connection object is obtained through a DataSource. ShardingSphere fully complies with the JDBC specification, so the creation of ShardingConnection should likewise take place in the corresponding DataSource, namely ShardingDataSource.

The ShardingDataSource class is relatively simple, and its constructor is as follows:

public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
    super(dataSourceMap);
    checkDataSourceType(dataSourceMap);
    runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}

As we can see, the ShardingRuntimeContext context object is created in the constructor of ShardingDataSource, and the creation process of ShardingConnection is also straightforward:

@Override
public final ShardingConnection getConnection() {
    return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}

In the implementation of ShardingDataSource, the adapter pattern is used as well, so its class hierarchy is similar to that of ShardingConnection. The main job of AbstractDataSourceAdapter, the parent class of ShardingDataSource, is to create the DatabaseType, and its core createDatabaseType method is as follows:

private DatabaseType createDatabaseType(final DataSource dataSource) throws SQLException {
    if (dataSource instanceof AbstractDataSourceAdapter) {
        return ((AbstractDataSourceAdapter) dataSource).databaseType;
    }
    try (Connection connection = dataSource.getConnection()) {
        return DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL());
    }
}

It can be seen that the DatabaseTypes class is used here, which is responsible for dynamically managing instances of DatabaseType. In ShardingSphere, the DatabaseType interface represents the database type:

public interface DatabaseType {
    // Get the name of the database
    String getName();
    // Get the JDBC URL prefix
    Collection<String> getJdbcUrlPrefixAlias();
    // Get data source metadata
    DataSourceMetaData getDataSourceMetaData(String url, String username);
}

As you might expect, ShardingSphere provides an implementation class of the DatabaseType interface for each supported database. Take MySQLDatabaseType as an example:

public final class MySQLDatabaseType implements DatabaseType {

    @Override
    public String getName() {
        return "MySQL";
    }

    @Override
    public Collection<String> getJdbcUrlPrefixAlias() {
        return Collections.singletonList("jdbc:mysqlx:");
    }

    @Override
    public MySQLDataSourceMetaData getDataSourceMetaData(final String url, final String username) {
        return new MySQLDataSourceMetaData(url);
    }
}

In the code above, getDataSourceMetaData returns a MySQLDataSourceMetaData, which implements the DataSourceMetaData interface and parses the input URL as shown below:

public MySQLDataSourceMetaData(final String url) {
    Matcher matcher = pattern.matcher(url);
    if (!matcher.find()) {
        throw new UnrecognizedDatabaseURLException(url, pattern.pattern());
    }
    hostName = matcher.group(4);
    port = Strings.isNullOrEmpty(matcher.group(5)) ? DEFAULT_PORT : Integer.valueOf(matcher.group(5));
    catalog = matcher.group(6);
    schema = null;
}

Clearly, DatabaseType is used to store information related to specific database metadata, and ShardingSphere also dynamically manages various instances of DatabaseType based on the SPI mechanism.
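The prefix-based lookup performed when matching a JDBC URL to a database type can be sketched as below. Note that the real DatabaseTypes class discovers DatabaseType implementations through the Java SPI mechanism (META-INF/services files); this self-contained toy version hard-codes two types instead, and the SQL92 fallback is an assumption about the default rather than a guaranteed behavior.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified counterpart of the DatabaseType interface.
interface DatabaseTypeSketch {
    String getName();
    Collection<String> getJdbcUrlPrefixes();
}

// Hypothetical registry: real ShardingSphere loads these via Java SPI.
final class DatabaseTypeRegistrySketch {
    private static final List<DatabaseTypeSketch> TYPES = Arrays.asList(
            databaseType("MySQL", "jdbc:mysql:"),
            databaseType("PostgreSQL", "jdbc:postgresql:"));

    static String getDatabaseTypeByUrl(final String url) {
        for (DatabaseTypeSketch each : TYPES) {
            for (String prefix : each.getJdbcUrlPrefixes()) {
                if (url.startsWith(prefix)) {
                    return each.getName();
                }
            }
        }
        return "SQL92"; // assumed fallback when no prefix matches
    }

    private static DatabaseTypeSketch databaseType(final String name, final String... prefixes) {
        return new DatabaseTypeSketch() {
            public String getName() {
                return name;
            }

            public Collection<String> getJdbcUrlPrefixes() {
                return Arrays.asList(prefixes);
            }
        };
    }
}
```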

Finally, we come to the ShardingDataSourceFactory factory class, which is responsible for creating ShardingDataSource:

public final class ShardingDataSourceFactory {

    public static DataSource createDataSource(
            final Map<String, DataSource> dataSourceMap, final ShardingRuleConfiguration shardingRuleConfig, final Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
    }
}

Here we create the ShardingDataSource, and we can also see that the ShardingRule is created here as well, by building a new ShardingRule object from the ShardingRuleConfiguration that is passed in.

Once the DataSource is created, we can use the API that is fully compatible with the JDBC specification to execute various SQL statements through this DataSource. We can review the usage process of ShardingDataSourceFactory to deepen our understanding of it:

public DataSource dataSource() throws SQLException {
    // Create sharding rule configuration
    ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();

    // Create table rule configuration
    TableRuleConfiguration tableRuleConfig = new TableRuleConfiguration("user", "ds${0..1}.user${0..1}");

    // Create distributed primary key generation configuration
    Properties properties = new Properties();
    properties.setProperty("worker.id", "33");
    KeyGeneratorConfiguration keyGeneratorConfig = new KeyGeneratorConfiguration("SNOWFLAKE", "id", properties);
    tableRuleConfig.setKeyGeneratorConfig(keyGeneratorConfig);
    shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfig);

    // Sharding database based on sex, divided into 2 databases in total
    shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("sex", "ds${sex % 2}"));

    // Sharding table based on user id, divided into 2 tables in total
    shardingRuleConfig.setDefaultTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("id", "user${id % 2}"));

    // Create the actual DataSource using the factory class
    return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new Properties());
}
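To make the inline expressions in the configuration above concrete, here is a hypothetical toy evaluator for the `prefix${column % n}` pattern used in this example. Real ShardingSphere evaluates inline expressions as full Groovy expressions; this sketch only handles the modulo form shown here.

```java
// Hypothetical toy evaluator for inline expressions of the form
// "prefix${column % n}". Real ShardingSphere evaluates Groovy expressions;
// this only supports the modulo pattern used in the example configuration.
final class InlineExpressionSketch {

    static String resolve(final String expression, final long columnValue) {
        int start = expression.indexOf("${");
        int end = expression.indexOf('}', start);
        String prefix = expression.substring(0, start);          // e.g. "ds"
        String body = expression.substring(start + 2, end);      // e.g. "sex % 2"
        long modulus = Long.parseLong(body.substring(body.indexOf('%') + 1).trim());
        return prefix + (columnValue % modulus);                 // e.g. "ds1"
    }
}
```

So a row with `sex = 1` would route to data source `ds1`, and a row with `id = 4` would route to table `user0`, matching the two-database, two-table layout configured above.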

Once we have the target DataSource, we can use the core interfaces in JDBC to execute the given SQL statements:

List<User> getUsers(final String sql) throws SQLException {
    List<User> result = new LinkedList<>();
    try (Connection connection = dataSource.getConnection();
         PreparedStatement preparedStatement = connection.prepareStatement(sql);
         ResultSet resultSet = preparedStatement.executeQuery()) {
        while (resultSet.next()) {
            User user = new User();
            // code omitted for setting values of User object
            result.add(user);
        }
    }
    return result;
}

In the preparation phase, ShardingSphere determines the connection mode; in the execution phase, it generates either a memory merge result set or a stream merge result set and passes it to the result merge engine for further processing.

From Source Code Analysis to Daily Development #

ShardingSphere’s rewrite of the JDBC specification based on the adapter pattern is an important entry point for learning the framework, and the adapter pattern itself is well worth applying in daily development work.

The adapter pattern serves as a bridge between two incompatible interfaces. In business systems, we often encounter scenarios where we need to integrate and connect with external systems. In order to ensure the independent development of internal systems and their ability to evolve independently of external systems, we generally need to use the adapter pattern to isolate the two.

When designing such systems, we can refer to the interface definition in the JDBC specification and the implementation classes of the core interfaces in ShardingSphere based on this interface definition, in order to achieve concrete adaptation.
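As a minimal, hypothetical illustration of this idea (the interfaces and class names below are invented for the example): the internal code depends only on its own interface, and an adapter bridges to an external client whose contract we cannot change.

```java
// Internal interface that the rest of our system programs against.
interface InternalUserService {
    String findUserName(long userId);
}

// Imagined external system client with an incompatible interface.
final class ExternalCrmClient {
    String queryCustomer(final String customerId) {
        return "user-" + customerId;
    }
}

// The adapter isolates internal code from the external contract, just as
// ShardingSphere's adapter classes isolate its internals behind JDBC interfaces.
final class CrmUserServiceAdapter implements InternalUserService {
    private final ExternalCrmClient client = new ExternalCrmClient();

    @Override
    public String findUserName(final long userId) {
        return client.queryCustomer(String.valueOf(userId));
    }
}
```

If the external contract changes, only the adapter needs to be updated; the internal interface and all of its callers remain untouched.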

Summary and Preview #

This is the last lesson on the ShardingSphere execution engine. Focusing on the upper-level components of the execution engine, we introduced the “Sharding”-prefixed implementation classes of the core interfaces in the JDBC specification.

ShardingStatement and ShardingPreparedStatement directly depend on the StatementExecutor and PreparedStatementExecutor introduced in the previous lesson, while ShardingConnection and ShardingDataSource provide entry points for using the execution engine.

Here’s a question for you to think about: why is the class structure of AbstractShardingPreparedStatementAdapter more complex than AbstractStatementAdapter in ShardingSphere? Feel free to discuss it with others in the comments, and I will provide a detailed response to each answer.

Now, we have obtained result data from different data sources through the execution engine. For query statements, we usually need to merge these result data in order to return them to the client. In the next section, let’s analyze the merge engine of ShardingSphere.