22 Execution Engine How to Master the Execution Model of Executor in ShardingSphere Above #

In the previous lesson, we introduced ShardingGroupExecuteCallback and SQLExecuteTemplate. From a design perspective, the former acts as the callback entry point for ShardingExecuteEngine, while the latter is a template class that encapsulates ShardingExecuteEngine and provides a unified external entry point. These classes are all located in the sharding-core-execute project at the bottom layer.

image.png

Starting from today, we will enter the sharding-jdbc-core project to look at several core classes in the upper-level design of the execution engine in ShardingSphere.

AbstractStatementExecutor #

As the figure above shows (it repeats the overall structure diagram of the execution engine from the previous lesson), the direct user of SQLExecuteTemplate is the AbstractStatementExecutor class, so today’s discussion starts there. This class declares quite a few fields:

// Database type
private final DatabaseType databaseType;
// resultSetType used in JDBC to specify the result set type (forward-only or scrollable)
private final int resultSetType;
// resultSetConcurrency used in JDBC to specify whether the result set can be modified
private final int resultSetConcurrency;
// resultSetHoldability used in JDBC to specify whether the result set stays open after the transaction is committed
private final int resultSetHoldability;
// Sharding Connection
private final ShardingConnection connection;
// Template class for data preparation
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// SQL execution template class
private final SQLExecuteTemplate sqlExecuteTemplate;
// List of JDBC Connections
private final Collection<Connection> connections = new LinkedList<>();
// SQLStatement context
private SQLStatementContext sqlStatementContext;
// Parameter sets
private final List<List<Object>> parameterSets = new LinkedList<>(); 
// List of JDBC Statements
private final List<Statement> statements = new LinkedList<>(); 
// List of JDBC ResultSets
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
// List of ShardingExecuteGroups
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups = new LinkedList<>();

From this class onward we will keep running into objects defined by the JDBC specification, because the design goal of ShardingSphere is to reimplement a system that is fully compatible with the current JDBC specification. The Connection, Statement, and ResultSet objects seen here, as well as the resultSetType, resultSetConcurrency, and resultSetHoldability parameters, all come from the JDBC specification. We have annotated them in the comments, and you are probably already familiar with them.
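As a quick reminder of what the three int parameters mean, here is a minimal sketch that maps them to the constants defined on java.sql.ResultSet. The class name ResultSetOptions and its describe method are hypothetical and exist only for illustration; they are not part of ShardingSphere.

```java
import java.sql.ResultSet;

// Hypothetical helper (not part of ShardingSphere): renders the three JDBC
// result-set parameters as readable text.
final class ResultSetOptions {

    static String describe(final int type, final int concurrency, final int holdability) {
        // resultSetType: how the cursor may move through the rows
        String typeText = type == ResultSet.TYPE_FORWARD_ONLY ? "forward-only"
                : type == ResultSet.TYPE_SCROLL_INSENSITIVE ? "scroll-insensitive" : "scroll-sensitive";
        // resultSetConcurrency: whether rows can be updated through the result set
        String concurrencyText = concurrency == ResultSet.CONCUR_READ_ONLY ? "read-only" : "updatable";
        // resultSetHoldability: whether the cursor survives a commit
        String holdabilityText = holdability == ResultSet.HOLD_CURSORS_OVER_COMMIT
                ? "held over commit" : "closed at commit";
        return typeText + ", " + concurrencyText + ", " + holdabilityText;
    }

    public static void main(final String[] args) {
        System.out.println(describe(ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT));
    }
}
```

These are the same three values that a plain JDBC Connection accepts in createStatement(int, int, int), which is why AbstractStatementExecutor has to carry them along.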

Important classes that ShardingSphere wraps itself, such as ShardingConnection, were explained in detail in Lesson 03: “Compatibility: What is the relationship between the JDBC specification and ShardingSphere?”.

Following these variables leads into many parts of the sharding-jdbc-core project. Before we go into the methods of AbstractStatementExecutor, it therefore helps to have a working understanding of the classes related to database access, including ShardingStatement and ShardingPreparedStatement, which we have already encountered.

In the constructor of AbstractStatementExecutor, we can see how the execution engine ShardingExecuteEngine introduced in the previous lesson is obtained and then used to create the SQLExecuteTemplate template class, as shown in the following code:

public AbstractStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final ShardingConnection shardingConnection) {
    …
    ShardingExecuteEngine executeEngine = connection.getRuntimeContext().getExecuteEngine();
    sqlExecuteTemplate = new SQLExecuteTemplate(executeEngine, connection.isHoldTransaction());
}

The cacheStatements method in AbstractStatementExecutor is also worth a look. Based on the ShardingExecuteGroup objects the executor holds, it fills the statements and parameterSets collections for use by the subclasses of AbstractStatementExecutor:

protected final void cacheStatements() {
    for (ShardingExecuteGroup<StatementExecuteUnit> each : executeGroups) {
        statements.addAll(Lists.transform(each.getInputs(), new Function<StatementExecuteUnit, Statement>() {

            @Override
            public Statement apply(final StatementExecuteUnit input) {
                return input.getStatement();
            }
        }));
        parameterSets.addAll(Lists.transform(each.getInputs(), new Function<StatementExecuteUnit, List<Object>>() {

            @Override
            public List<Object> apply(final StatementExecuteUnit input) {
                return input.getRouteUnit().getSqlUnit().getParameters();
            }
        }));
    }
}

Note: The implementation here uses the Lists.transform method provided by Google’s Guava framework to complete the conversion between different objects. This implementation method is widely used in ShardingSphere and is worth learning.
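For comparison, the same flattening can be sketched with the Java 8 Stream API instead of Guava. This is only an illustration under our own assumptions: the Unit class is a hypothetical stand-in for StatementExecuteUnit, and SQL text is collected in place of Statement objects so that the example runs without a database.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class TransformSketch {

    // Hypothetical stand-in for StatementExecuteUnit: one SQL text plus its parameters.
    static final class Unit {
        final String sql;
        final List<Object> parameters;

        Unit(final String sql, final List<Object> parameters) {
            this.sql = sql;
            this.parameters = parameters;
        }
    }

    // Flattens the grouped units into one list of SQL texts, keeping group order.
    static List<String> collectSql(final List<List<Unit>> groups) {
        return groups.stream().flatMap(List::stream).map(unit -> unit.sql).collect(Collectors.toList());
    }

    // Same traversal, but extracts the parameter list of every unit.
    static List<List<Object>> collectParameters(final List<List<Unit>> groups) {
        return groups.stream().flatMap(List::stream).map(unit -> unit.parameters).collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        List<List<Unit>> groups = Arrays.asList(
                Arrays.asList(
                        new Unit("SELECT * FROM t_order_0 WHERE id = ?", Collections.<Object>singletonList(10)),
                        new Unit("SELECT * FROM t_order_1 WHERE id = ?", Collections.<Object>singletonList(11))),
                Collections.singletonList(
                        new Unit("SELECT * FROM t_user_0 WHERE id = ?", Collections.<Object>singletonList(7))));
        System.out.println(collectSql(groups));
        System.out.println(collectParameters(groups));
    }
}
```

One difference to keep in mind: Guava’s Lists.transform returns a lazy view of the source list, and it is the addAll call in cacheStatements that copies the elements, whereas the stream version above builds a new list right away.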

Next, let’s take a look at the central method in AbstractStatementExecutor, the executeCallback method for executing callbacks:

protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
    List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
    return result;
}

As expected, SQLExecuteTemplate is used here to carry out the actual callback execution. We can also see a refreshMetaDataIfNeeded auxiliary method, which refreshes the metadata after execution.
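The division of labor between the template and the callback can be sketched in a few lines. The names SqlCallback, executeGroup, and sqlLengths below are hypothetical simplifications, not the ShardingSphere classes: groups contain plain SQL strings, and everything runs serially in the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CallbackSketch {

    // Hypothetical counterpart of SQLExecuteCallback: subclasses supply only executeSQL.
    abstract static class SqlCallback<T> {

        // Fixed skeleton: run every SQL of one group and collect the results.
        final List<T> execute(final List<String> group) {
            List<T> result = new ArrayList<>();
            for (String sql : group) {
                result.add(executeSQL(sql));
            }
            return result;
        }

        // Template method filled in by each caller.
        protected abstract T executeSQL(String sql);
    }

    // Hypothetical counterpart of SQLExecuteTemplate.executeGroup: applies one callback to all groups.
    static <T> List<T> executeGroup(final List<List<String>> groups, final SqlCallback<T> callback) {
        List<T> result = new ArrayList<>();
        for (List<String> group : groups) {
            result.addAll(callback.execute(group));
        }
        return result;
    }

    // Example caller: the "execution" of a SQL string is simply its length.
    static List<Integer> sqlLengths(final List<List<String>> groups) {
        return executeGroup(groups, new SqlCallback<Integer>() {

            @Override
            protected Integer executeSQL(final String sql) {
                return sql.length();
            }
        });
    }

    public static void main(final String[] args) {
        System.out.println(sqlLengths(Arrays.asList(
                Arrays.asList("SELECT 1", "SELECT 22"), Arrays.asList("SELECT 333"))));
    }
}
```

The caller decides what happens to a single SQL statement, while the template decides how groups are traversed and how results are gathered. This is the same split that lets StatementExecutor and PreparedStatementExecutor share executeCallback.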

AbstractStatementExecutor has two implementation classes: StatementExecutor and PreparedStatementExecutor. I will explain them separately next.

image

StatementExecutor #

Let’s move on to StatementExecutor and take a look at its init method used for initialization:

public void init(final SQLRouteResult routeResult) throws SQLException {
    setSqlStatementContext(routeResult.getSqlStatementContext());
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();
}

The cacheStatements method has been introduced earlier, while the obtainExecuteGroups method is used to obtain the required ShardingExecuteGroup collection. To implement this method, the SQLExecutePrepareTemplate and corresponding callback SQLExecutePrepareCallback need to be introduced.

1. SQLExecutePrepareCallback #

Judging from the naming, one might feel that SQLExecutePrepareTemplate and SQLExecuteTemplate should be a pair, especially since one of the names contains the word “Prepare,” which brings to mind PreparedStatement.

In fact, SQLExecutePrepareTemplate and SQLExecuteTemplate are not counterparts. Unlike SQLExecuteTemplate, SQLExecutePrepareTemplate does not wrap ShardingExecuteEngine. Instead, it focuses on collecting and assembling the ShardingExecuteGroup data, in other words, on preparing the data.

In SQLExecutePrepareTemplate, the core functionality lies in the following method, which takes a SQLExecutePrepareCallback object as a parameter and returns a collection of ShardingExecuteGroup<StatementExecuteUnit>:

public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
    return getSynchronizedExecuteUnitGroups(routeUnits, callback);
}

To construct this collection, SQLExecutePrepareTemplate implements several auxiliary methods. It also introduces a SQLExecutePrepareCallback callback to populate some data in the ShardingExecuteGroup data structure. The SQLExecutePrepareCallback interface is defined as follows, revealing that the Connection and StatementExecuteUnit objects are created through the callback:

public interface SQLExecutePrepareCallback {

    // Get a list of connections
    List<Connection> getConnections(ConnectionMode connectionMode, String dataSourceName, int connectionSize) throws SQLException;

    // Get the statement execution unit
    StatementExecuteUnit createStatementExecuteUnit(Connection connection, RouteUnit routeUnit, ConnectionMode connectionMode) throws SQLException;
}

Once we have obtained the desired ShardingExecuteGroup, the initialization work of StatementExecutor is essentially completed. The remaining part of this class consists of a series of SQL execution methods starting with “execute,” including executeQuery, executeUpdate, and their various overloaded versions. Let’s first look at the executeQuery method for querying:

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    // Create a SQLExecuteCallback and execute the query
    SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {

        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(sql, statement, connectionMode);
        }
    };
    // Execute the SQLExecuteCallback and return the result
    return executeCallback(executeCallback);
}

We already met SQLExecuteCallback in the previous lesson: it implements the ShardingGroupExecuteCallback interface and provides the executeSQL template method. In the executeQuery method above, the executeSQL template method is implemented by calling the getQueryResult method shown below:

private QueryResult getQueryResult(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
    // Execute SQL using Statement and get the result
    ResultSet resultSet = statement.executeQuery(sql);
    getResultSets().add(resultSet);
    // Construct the result based on the connection mode
    return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}

2. ConnectionMode #

In the getQueryResult method, querying is performed and the result is returned based entirely on the JDBC Statement and ResultSet objects.

However, an important concept of the ShardingSphere execution engine is also introduced here, namely ConnectionMode. It is an enumeration:

public enum ConnectionMode {
    MEMORY_STRICTLY, CONNECTION_STRICTLY
}

There are two specific connection modes: MEMORY_STRICTLY and CONNECTION_STRICTLY.

  • MEMORY_STRICTLY represents the memory-strict mode,
  • CONNECTION_STRICTLY represents the connection-strict mode.

ConnectionMode is a concept specific to ShardingSphere, and it reflects a design trade-off. When accessing database resources, the execution engine wants, on the one hand, to control and protect database connections and, on the other hand, to use the more economical merge mode in order to save memory in the middleware. Balancing these two goals is a problem the ShardingSphere execution engine has to solve.

To this end, ShardingSphere introduces the concept of the connection mode. A simple example:

  • In the memory-strict mode, if a query touches 10 sharding tables in the same data source, 10 connections are acquired and the SQL statements are executed in parallel.
  • In the connection-strict mode, only one connection is acquired and the SQL statements are executed serially.

So how is this ConnectionMode determined?

This part of the code is located in SQLExecutePrepareTemplate. It compares the maxConnectionsSizePerQuery configuration item with the number of SQL statements that need to be executed against each database and determines the ConnectionMode from the result:

ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;

About this judgment condition, we can use a simple diagram to explain it, as shown below:

image

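As the figure above shows, if maxConnectionsSizePerQuery is smaller than the number of SQL statements to execute against a database, at least one connection has to execute more than one SQL statement, and the connection-strict mode is used. Otherwise, every SQL statement can get its own connection, and the memory-strict mode is used.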
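The decision can be tried out with a small, self-contained sketch. The decide method mirrors the comparison quoted above. The partition method is our own assumption about how SQL statements could be spread over the allowed connections, added only to make the numbers tangible; it is not copied from ShardingSphere.

```java
import java.util.ArrayList;
import java.util.List;

final class ConnectionModeSketch {

    enum Mode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    // Mirrors the comparison quoted above.
    static Mode decide(final int maxConnectionsSizePerQuery, final int sqlCount) {
        return maxConnectionsSizePerQuery < sqlCount ? Mode.CONNECTION_STRICTLY : Mode.MEMORY_STRICTLY;
    }

    // Assumed grouping rule (illustration only): use at most maxConnectionsSizePerQuery
    // connections and give each one ceil(sqlCount / maxConnectionsSizePerQuery) statements.
    // maxConnectionsSizePerQuery must be at least 1.
    static List<List<String>> partition(final List<String> sqls, final int maxConnectionsSizePerQuery) {
        int perConnection = Math.max(1, (sqls.size() + maxConnectionsSizePerQuery - 1) / maxConnectionsSizePerQuery);
        List<List<String>> result = new ArrayList<>();
        for (int from = 0; from < sqls.size(); from += perConnection) {
            result.add(new ArrayList<>(sqls.subList(from, Math.min(from + perConnection, sqls.size()))));
        }
        return result;
    }

    public static void main(final String[] args) {
        List<String> sqls = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            sqls.add("SELECT * FROM t_order_" + i);
        }
        for (int max : new int[] {1, 3, 10}) {
            System.out.println("max=" + max + " mode=" + decide(max, sqls.size())
                    + " connections=" + partition(sqls, max).size());
        }
    }
}
```

With 10 SQL statements, a limit of 10 yields one statement per connection and the memory-strict mode, while a limit of 1 pushes all 10 statements onto a single connection and yields the connection-strict mode, which matches the example given earlier.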

3. StreamQueryResult vs. MemoryQueryResult #

After understanding the design concept of ConnectionMode, let’s take a look at the QueryResult returned by the executeQuery method of StatementExecutor.

In ShardingSphere, QueryResult is an interface that represents the query result, and this interface encapsulates many methods for accessing raw data:

public interface QueryResult {
    boolean next() throws SQLException;
    Object getValue(int columnIndex, Class<?> type) throws SQLException;
    Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;
    InputStream getInputStream(int columnIndex, String type) throws SQLException;
    boolean wasNull() throws SQLException;
    int getColumnCount() throws SQLException;
    String getColumnLabel(int columnIndex) throws SQLException;
    boolean isCaseSensitive(int columnIndex) throws SQLException;
}

In ShardingSphere, the QueryResult interface has two implementation classes: StreamQueryResult (representing a stream merge result) and MemoryQueryResult (representing a memory merge result).

The reason why ShardingSphere adopts this design is directly related to the previously introduced ConnectionMode.

  • In the memory-strict mode, ShardingSphere does not limit the number of database connections consumed by an operation;
  • In the connection-strict mode, ShardingSphere strictly controls the number of database connections consumed by an operation.

Based on this design principle, and as the ConnectionMode calculation above shows: when, within the limit set by maxConnectionsSizePerQuery, a connection needs to execute more than one request, that connection cannot keep all the corresponding result sets open at the same time, so memory merge must be used; otherwise, stream merge can be used.

  • StreamQueryResult

By comparing the implementation process of StreamQueryResult and MemoryQueryResult, we can further analyze the above principle. In StreamQueryResult, its next method is very simple:

@Override
public boolean next() throws SQLException {
    return resultSet.next();
}

Obviously, this is a stream processing method, getting the next data row from the ResultSet.

  • MemoryQueryResult

Let’s take a look at MemoryQueryResult. In its constructor, all the data rows in the ResultSet are first obtained and stored in the memory variable rows through the getRows method:

private Iterator<List<Object>> getRows(final ResultSet resultSet) throws SQLException {
    Collection<List<Object>> result = new LinkedList<>();
    while (resultSet.next()) {
        List<Object> rowData = new ArrayList<>(resultSet.getMetaData().getColumnCount());
        for (int columnIndex = 1; columnIndex <= resultSet.getMetaData().getColumnCount(); columnIndex++) {
            // Read the value of the current column
            Object rowValue = getRowValue(resultSet, columnIndex);
            // Store it in the in-memory row, mapping SQL NULL to null
            rowData.add(resultSet.wasNull() ? null : rowValue);
        }
        result.add(rowData);
    }
    return result.iterator();
}

As a result, the next method of MemoryQueryResult simply takes the next data row from this rows variable, as shown below:

public boolean next() {
    if (rows.hasNext()) {
        currentRow = rows.next();
        return true;
    }
    currentRow = null;
    return false;
}

In this way, we convert the traditional stream processing method into a memory processing method.
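The contrast between the two styles can be reproduced without a database. In the minimal sketch below, a plain Iterator plays the role of the ResultSet cursor, and RowResult, StreamResult, and MemoryResult are hypothetical, cut-down counterparts of QueryResult and its two implementations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class QueryResultSketch {

    // Hypothetical cut-down version of QueryResult: advance, then read the current row.
    interface RowResult {
        boolean next();

        String getValue();
    }

    // Stream style: every next() pulls one row from the live source.
    static final class StreamResult implements RowResult {
        private final Iterator<String> source;
        private String current;

        StreamResult(final Iterator<String> source) {
            this.source = source;
        }

        @Override
        public boolean next() {
            if (source.hasNext()) {
                current = source.next();
                return true;
            }
            current = null;
            return false;
        }

        @Override
        public String getValue() {
            return current;
        }
    }

    // Memory style: the constructor drains the source, so the source is free afterwards.
    static final class MemoryResult implements RowResult {
        private final Iterator<String> rows;
        private String current;

        MemoryResult(final Iterator<String> source) {
            List<String> buffer = new ArrayList<>();
            while (source.hasNext()) {
                buffer.add(source.next());
            }
            rows = buffer.iterator();
        }

        @Override
        public boolean next() {
            if (rows.hasNext()) {
                current = rows.next();
                return true;
            }
            current = null;
            return false;
        }

        @Override
        public String getValue() {
            return current;
        }
    }

    // Reads a result to the end, the way a merger would consume it.
    static List<String> readAll(final RowResult result) {
        List<String> values = new ArrayList<>();
        while (result.next()) {
            values.add(result.getValue());
        }
        return values;
    }

    public static void main(final String[] args) {
        Iterator<String> cursor = Arrays.asList("row-1", "row-2").iterator();
        RowResult memory = new MemoryResult(cursor);
        System.out.println("cursor still has rows: " + cursor.hasNext());
        System.out.println(readAll(memory));
    }
}
```

Both variants return the same rows to the caller. The difference is when the source is released: MemoryResult has finished with the cursor as soon as it is constructed, which is what allows one connection to serve several SQL statements in turn, while StreamResult keeps the cursor busy until the last row has been read.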

This concludes our discussion of ConnectionMode and the two QueryResult implementations. Let’s go back to StatementExecutor. Having understood its executeQuery method, let’s take a look at its more general execute method:

public boolean execute() throws SQLException {
     return execute(new Executor() {

         @Override
         public boolean execute(final Statement statement, final String sql) throws SQLException {
             return statement.execute(sql);
         }
     });
}

Notice that the above execute method does not use the SQLExecuteCallback callback, but instead uses an Executor interface, which is defined as follows:

private interface Executor {
    // Execute SQL
    boolean execute(Statement statement, String sql) throws SQLException;
}

Then, as we continue to look further, we find that the SQLExecuteCallback callback is actually used in the actual execution process of this method:

private boolean execute(final Executor executor) throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    // Create the SQLExecuteCallback and execute it
    SQLExecuteCallback<Boolean> executeCallback = new SQLExecuteCallback<Boolean>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
            // Delegate the actual execution to the Executor
            return executor.execute(statement, sql);
        }
    };
    List<Boolean> result = executeCallback(executeCallback);
    if (null == result || result.isEmpty() || null == result.get(0)) {
        return false;
    }
    return result.get(0);
}

This extra layer separates responsibilities: the Executor decides which Statement method is called, while the shared private execute method builds the callback and processes the execution results. The same technique is used in the executeUpdate method of StatementExecutor.
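One plausible benefit of this indirection is that several public overloads can share a single private method. The sketch below illustrates the idea with a hypothetical FakeStatement in place of java.sql.Statement; the overload taking autoGeneratedKeys is our assumption for illustration and is not shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ExecutorSketch {

    // Hypothetical stand-in for java.sql.Statement with two execute overloads.
    static final class FakeStatement {
        final List<String> log = new ArrayList<>();

        boolean execute(final String sql) {
            log.add(sql);
            return sql.startsWith("SELECT");
        }

        boolean execute(final String sql, final int autoGeneratedKeys) {
            log.add(sql + " [keys=" + autoGeneratedKeys + "]");
            return sql.startsWith("SELECT");
        }
    }

    // Strategy that decides which statement overload is called.
    private interface Executor {
        boolean execute(FakeStatement statement, String sql);
    }

    private final List<FakeStatement> statements;
    private final String sql;

    ExecutorSketch(final List<FakeStatement> statements, final String sql) {
        this.statements = statements;
        this.sql = sql;
    }

    boolean execute() {
        return execute((statement, text) -> statement.execute(text));
    }

    boolean execute(final int autoGeneratedKeys) {
        return execute((statement, text) -> statement.execute(text, autoGeneratedKeys));
    }

    // Shared part: run on every statement and report the first result (false if there is none).
    private boolean execute(final Executor executor) {
        List<Boolean> result = new ArrayList<>();
        for (FakeStatement each : statements) {
            result.add(executor.execute(each, sql));
        }
        return !result.isEmpty() && result.get(0);
    }

    public static void main(final String[] args) {
        FakeStatement statement = new FakeStatement();
        ExecutorSketch sketch = new ExecutorSketch(Arrays.asList(statement), "SELECT 1");
        System.out.println(sketch.execute());
        System.out.println(sketch.execute(1));
        System.out.println(statement.log);
    }
}
```

Each public method contributes only a one-line lambda, and the result handling lives in one place.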

PreparedStatementExecutor #

After StatementExecutor, let’s take a look at PreparedStatementExecutor. It has the same kind of init method for initialization as StatementExecutor. Its executeQuery method, shown below, is also handled in the same way as in StatementExecutor, except that getQueryResult no longer receives the SQL text:

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    // Create the SQLExecuteCallback and execute it
    SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(statement, connectionMode);
        }
    };
    return executeCallback(executeCallback);
}

Its execute method, however, is where the difference shows:

public boolean execute() throws SQLException {
    boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<Boolean> executeCallback = SQLExecuteCallbackFactory.getPreparedSQLExecuteCallback(getDatabaseType(), isExceptionThrown);
    List<Boolean> result = executeCallback(executeCallback);
    if (null == result || result.isEmpty() || null == result.get(0)) {
        return false;
    }
    return result.get(0);
}

Unlike StatementExecutor, when implementing the execute method, PreparedStatementExecutor does not design an interface like Executor, but directly provides a factory class SQLExecuteCallbackFactory:

public final class SQLExecuteCallbackFactory {
    ...
    public static SQLExecuteCallback<Boolean> getPreparedSQLExecuteCallback(final DatabaseType databaseType, final boolean isExceptionThrown) {
        return new SQLExecuteCallback<Boolean>(databaseType, isExceptionThrown) {
            @Override
            protected Boolean executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return ((PreparedStatement) statement).execute();
            }
        };
    }
}

Note that the static method getPreparedSQLExecuteCallback returns an implementation of the SQLExecuteCallback callback. In this implementation, the underlying JDBC PreparedStatement carries out the actual SQL execution.

So far, we have provided a detailed introduction to the two main executors, StatementExecutor and PreparedStatementExecutor, in ShardingSphere.

From Source Code Analysis to Daily Development #

The design ideas behind the two types of QueryResult introduced in this lesson can also be applied to daily development. Whenever we process data from a database or an external data source, we can choose between stream access and memory access according to our needs. These two access styles are representative of most data access scenarios.

Usually, the first idea is to load all the data into memory and then process it in a second pass. This approach runs into memory and performance problems as the data volume grows. Stream access is more efficient, but it only fits scenarios in which the data can be processed row by row while the source stays open.
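As a small everyday illustration, the sketch below counts matching lines from a text source in both styles. The class AccessStyleSketch, its method names, and the sample log lines are made up for this example; the caller is assumed to own and close the Reader.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;

final class AccessStyleSketch {

    // Memory access: materialize every line first, then process the list in a second pass.
    static long countErrorsInMemory(final Reader source) {
        List<String> lines = new BufferedReader(source).lines().collect(Collectors.toList());
        long count = 0;
        for (String each : lines) {
            if (each.contains("ERROR")) {
                count++;
            }
        }
        return count;
    }

    // Stream access: lines are filtered as they are read, so no list of all lines is built.
    static long countErrorsStreaming(final Reader source) {
        return new BufferedReader(source).lines().filter(line -> line.contains("ERROR")).count();
    }

    public static void main(final String[] args) {
        String log = "INFO start\nERROR disk full\nINFO retry\nERROR timeout\n";
        System.out.println(countErrorsInMemory(new StringReader(log)));
        System.out.println(countErrorsStreaming(new StringReader(log)));
    }
}
```

Both methods return the same count. The in-memory version needs space proportional to the size of the input but lets us revisit the lines afterwards, while the streaming version holds little more than the current line but can only pass over the data once.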

Summary and Preview #

In this second lesson on the ShardingSphere execution engine, we focused on the executors in the execution engine and walked through the implementation of StatementExecutor and PreparedStatementExecutor. We also discussed the connection modes in ShardingSphere in detail.

Here is a question for you to ponder: What is the concept and purpose of the connection modes in ShardingSphere? Feel free to discuss this with everyone in the comments, and I will provide comments on each answer.

From a class-level perspective, StatementExecutor and PreparedStatementExecutor both belong to the underlying components. In the next lesson, we will introduce the higher-level execution engine components, including ShardingStatement and ShardingPreparedStatement.