21 Execution Engine How Should the Overall SQL Execution Process in a Sharded Environment Be Abstracted

Today we begin a new topic: the Execution Engine of ShardingSphere. Once the Routing Engine and Rewrite Engine have produced the final SQL, the Execution Engine runs those statements against the specific databases.

The Execution Engine is the core module of ShardingSphere, and we will cover it over the next three lessons. Today, let’s first discuss how the overall SQL execution process in a sharded environment is abstracted. The following two lessons will then explain the Executor execution model in ShardingSphere.

Overall Structure of ShardingSphere Execution Engine #

Before looking at the specific source code, let’s start from the places where PreparedQueryShardingEngine and SimpleQueryShardingEngine are used in ShardingSphere, as mentioned in Lesson 17: “Routing Engine: How to understand the operation mechanism of the core class ShardingRouter in sharding?”

We found the following shard method in the ShardingStatement class, which uses the SimpleQueryShardingEngine:

private void shard(final String sql) {
    // Get ShardingRuntimeContext from Connection
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext(); 
    // Create SimpleQueryShardingEngine
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(runtimeContext.getRule(), runtimeContext.getProps(), runtimeContext.getMetaData(), runtimeContext.getParseEngine()); 
    // Perform sharding and get routing results
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

ShardingPreparedStatement contains a similar shard method.

From the design pattern perspective, ShardingStatement and ShardingPreparedStatement are actually typical facade classes that integrate entry classes for SQL routing and execution.

By reading the source code, we can see that ShardingStatement contains a StatementExecutor, while ShardingPreparedStatement contains PreparedStatementExecutor and BatchPreparedStatementExecutor. These classes all end with Executor, which clearly indicates that they are the entry classes for the SQL execution engine we are looking for.

We find that the above three Executors are located in the sharding-jdbc-core project.

In addition, there is also a sharding-core-execute project, which is parallel to sharding-core-route and sharding-core-rewrite. In terms of naming, this project should also be related to the execution engine. Indeed, we found the ShardingExecuteEngine class in this project, which is the entry class for the sharding execution engine.

Then, we also found the SQLExecuteTemplate and SQLExecutePrepareTemplate classes, which are typical SQL execution template classes.

Based on what we know so far about ShardingSphere’s component design and code layering style, we can guess that ShardingExecuteEngine is the underlying object and that SQLExecuteTemplate depends on it. StatementExecutor, PreparedStatementExecutor, and BatchPreparedStatementExecutor are higher-level objects and should depend on SQLExecuteTemplate. A quick look at the reference relationships between these core classes confirms this conjecture.

Based on the above analysis, we can provide the overall structure diagram of the SQL execution engine (shown in the following figure). The part above the horizontal line is located in the sharding-core-execute project and belongs to the underlying components, while the part below the horizontal line is located in sharding-jdbc-core and belongs to the upper-level components. This ability to analyze the source code is also a concrete manifestation of “reading the source code based on the principle of package design” mentioned in Lesson 12: “From Application to Principle: How to efficiently read the code of ShardingSphere”.

[Figure: Layered structure diagram of the ShardingSphere execution engine]

On the other hand, we also see SQLExecuteCallback and SQLExecutePrepareCallback in the above diagram. Clearly, their role is to handle callbacks during the SQL execution process. This is a very typical extensibility handling approach.

ShardingExecuteEngine #

As usual, let’s start with the underlying ShardingExecuteEngine. Unlike the routing and rewrite engines, the ShardingExecuteEngine is the only execution engine in ShardingSphere, so it is directly designed as a class rather than an interface. This class includes the following variables and constructors:

private final ShardingExecutorService shardingExecutorService;    
private ListeningExecutorService executorService;

public ShardingExecuteEngine(final int executorSize) {
    shardingExecutorService = new ShardingExecutorService(executorSize);
    executorService = shardingExecutorService.getExecutorService();
}

1. ExecutorService #

The code above has two variables whose types end with ExecutorService. As the names suggest, both are executor services, similar to java.util.concurrent.ExecutorService in the JDK. ListeningExecutorService comes from Google’s Guava toolkit, while ShardingExecutorService is a custom ShardingSphere class that wraps the construction of the ListeningExecutorService. Let’s discuss them separately.

  • ShardingExecutorService

We found that ShardingExecutorService includes a JDK ExecutorService. Its creation process is as follows, where newCachedThreadPool and newFixedThreadPool are common methods provided by JDK:

private ExecutorService getExecutorService(final int executorSize, final String nameFormat) {
    ThreadFactory shardingThreadFactory = ShardingThreadFactoryBuilder.build(nameFormat);
    return 0 == executorSize ? Executors.newCachedThreadPool(shardingThreadFactory) : Executors.newFixedThreadPool(executorSize, shardingThreadFactory);
}

  • ListeningExecutorService

Since the Future returned by an ordinary JDK thread pool has relatively limited features, Guava provides ListeningExecutorService to decorate it. Wrapping an ExecutorService in a ListeningExecutorService makes task submission return a ListenableFuture instance. ListenableFuture extends Future with an addListener method, and the registered listener is called back when the task completes. The construction process of ListeningExecutorService is as follows:

executorService = MoreExecutors.listeningDecorator(getExecutorService(executorSize, nameFormat));
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
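
The pool-selection rule and the completion callback described above can be sketched with the JDK alone. This is only an illustration under stated assumptions: ExecutorServiceSketch, createPool, and runWithCompletionCallback are hypothetical names, the thread factory is a plain replacement for ShardingThreadFactoryBuilder, and CompletableFuture.thenAccept stands in for Guava’s ListenableFuture.addListener so that the example needs no external library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorServiceSketch {

    // Hypothetical helper: size 0 selects a cached pool, any other size a fixed pool.
    static ExecutorService createPool(final int executorSize, final String namePrefix) {
        final AtomicInteger counter = new AtomicInteger();
        // Plain JDK thread factory that names threads "<prefix>-<n>" (assumption, not ShardingSphere code).
        ThreadFactory factory = task -> {
            Thread thread = new Thread(task, namePrefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        return 0 == executorSize ? Executors.newCachedThreadPool(factory) : Executors.newFixedThreadPool(executorSize, factory);
    }

    // Runs one task on the pool and returns the value observed by a completion callback.
    static String runWithCompletionCallback(final ExecutorService pool) throws Exception {
        AtomicReference<String> observed = new AtomicReference<>();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
        // thenAccept plays the role of a listener: it fires once the task has completed.
        future.thenAccept(observed::set).get(5, TimeUnit.SECONDS);
        return observed.get();
    }

    public static void main(final String[] args) throws Exception {
        ExecutorService pool = createPool(2, "sketch");
        System.out.println(runWithCompletionCallback(pool));
        pool.shutdown();
    }
}
```

With Guava on the classpath, the same idea would use MoreExecutors.listeningDecorator and ListenableFuture instead of CompletableFuture.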

After clarifying the ExecutorService, we go back to the ShardingExecuteEngine class, which has the groupExecute method as the entry point. This method has multiple parameters, and I’ll list them separately:

/**
 * @param inputGroups: input groups
 * @param firstCallback: the callback for the first shard execution
 * @param callback: the callback for shard execution
 * @param serial: whether to execute serially (true) or in parallel with multiple threads (false)
 * @param <I>: type of the input value
 * @param <O>: type of the return value
 * @return the execution result
 * @throws SQLException: throws exception
 */
public <I, O> List<O> groupExecute(
    final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial)
    throws SQLException {
    if (inputGroups.isEmpty()) {
        return Collections.emptyList();
    }
    return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}

A ShardingExecuteGroup is essentially a list of inputs that belong to one execution group, and groupExecute takes a collection of such groups. Depending on whether the serial parameter is true, the code takes either the serialExecute or the parallelExecute branch. I will explain the two branches separately.

2. SerialExecute #

Let’s start with the serialExecute method, which is used for serial execution:

private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                     final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
    // Get the first input ShardingExecuteGroup
    ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
    // Complete synchronous execution using the first callback firstCallback
    List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback));
    // For the remaining ShardingExecuteGroups, execute syncGroupExecute synchronously using the callback callback one by one
    for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
        result.addAll(syncGroupExecute(each, callback));
    }
    return result;
}

The basic flow of the above code is to get the first input ShardingExecuteGroup and complete synchronous execution using the first callback firstCallback with the syncGroupExecute method. Then, for the remaining ShardingExecuteGroup, execute syncGroupExecute synchronously using the callback callback one by one. The syncGroupExecute method is as follows:

private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap());
}

We can see that the synchronous execution process is actually delegated to the ShardingGroupExecuteCallback callback interface:

public interface ShardingGroupExecuteCallback<I, O> {
    
    Collection<O> execute(Collection<I> inputs, boolean isTrunkThread, Map<String, Object> shardingExecuteDataMap) throws SQLException;
}

The ShardingExecuteDataMap here is like a data dictionary for SQL execution, which is stored in the ThreadLocal to ensure thread safety. We can get the corresponding DataMap object based on the current execution thread.
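
To see why a ThreadLocal-backed map behaves this way, here is a minimal JDK-only sketch. DataMapSketch and its methods are hypothetical stand-ins, not ShardingSphere code. It shows that a worker thread gets its own empty map from the ThreadLocal, so data set on the calling thread reaches the worker only if the caller’s map is captured first and handed over explicitly.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DataMapSketch {

    // Hypothetical stand-in for ShardingExecuteDataMap: one independent map per thread.
    private static final ThreadLocal<Map<String, Object>> DATA_MAP = ThreadLocal.withInitial(LinkedHashMap::new);

    static Map<String, Object> getDataMap() {
        return DATA_MAP.get();
    }

    // Returns {size of the worker's own map, size of the map captured on the calling thread}.
    static int[] sizesSeenByWorker() throws Exception {
        getDataMap().put("traceId", "t-1");
        // Capture the caller's map before handing work to another thread.
        final Map<String, Object> captured = getDataMap();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(() -> new int[] {getDataMap().size(), captured.size()}).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(final String[] args) throws Exception {
        int[] sizes = sizesSeenByWorker();
        System.out.println("worker's own map: " + sizes[0] + ", captured map: " + sizes[1]);
    }
}
```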

3. ParallelExecute #

So far, I have explained the flow of serial execution. Next, let’s see the parallelExecute method for parallel execution:

private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback,
                                       final ShardingGroupExecuteCallback<I, O> callback) throws SQLException {
    Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator();
    // Get the first input ShardingExecuteGroup
    ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next();
    // Execute asynchronous callback using asyncGroupExecute
    Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback);
    // Get execution results and assemble the return value
    return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}

Note that there is an asynchronous execution method asyncGroupExecute being used here, which takes a list of ShardingExecuteGroup as a parameter:

private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) {
    Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
    for (ShardingExecuteGroup<I> each : inputGroups) {
        result.add(asyncGroupExecute(each, callback));
    }
    return result;
}

In this method, for each ShardingExecuteGroup passed in, it calls another overloaded asynchronous asyncGroupExecute method:

private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) {
    final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap();
    return executorService.submit(new Callable<Collection<O>>() {

        @Override
        public Collection<O> call() throws SQLException {
            return callback.execute(inputGroup.getInputs(), false, dataMap);
        }
    });
}

Clearly, as an asynchronous execution method, it uses Guava’s ListeningExecutorService to submit an asynchronous task and return a ListenableFuture. This asynchronous task is the specific callback.

Finally, let’s look at the last line of the parallelExecute method, where the getGroupResults method is called to get the execution results:

private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
    List<O> result = new LinkedList<>(firstResults);
    for (ListenableFuture<Collection<O>> each : restFutures) {
        try {
            result.addAll(each.get());
        } catch (final InterruptedException | ExecutionException ex) {
            return throwException(ex);
        }
    }
    return result;
}

For those who are familiar with Future usage, the above code should be familiar. We iterate through ListenableFuture, then use the get method to synchronously wait for the return result, and finally assemble the results into a list and return it. This is a common way to use Future.

Looking back, both serialExecute and parallelExecute take the first ShardingExecuteGroup, firstInputs, out of the input collection and execute it on the current thread, then handle the remaining groups synchronously or asynchronously. This reflects a deliberate choice in how ShardingSphere uses threads: the current thread is itself a valuable resource, so running the first task on it, rather than leaving it idle while it waits for the pool, makes full use of the threads available.
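
The “first task on the current thread” idea can be sketched in a few lines of plain JDK code. This is a simplified illustration, not the ShardingSphere implementation: TrunkThreadSketch and its Callback interface are hypothetical, each input stands for a whole execution group, and java.util.concurrent.Future replaces ListenableFuture.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TrunkThreadSketch {

    // Hypothetical callback: receives one input and whether it runs on the calling ("trunk") thread.
    interface Callback<I, O> {
        O execute(I input, boolean isTrunkThread) throws Exception;
    }

    static <I, O> List<O> execute(final List<I> inputs, final Callback<I, O> callback,
                                  final boolean serial, final ExecutorService pool) throws Exception {
        List<O> result = new ArrayList<>();
        if (inputs.isEmpty()) {
            return result;
        }
        I first = inputs.get(0);
        List<I> rest = inputs.subList(1, inputs.size());
        if (serial) {
            // Serial mode: everything runs on the calling thread, one input after another.
            result.add(callback.execute(first, true));
            for (I each : rest) {
                result.add(callback.execute(each, true));
            }
            return result;
        }
        // Parallel mode: hand the rest to the pool first, so they run while the caller is busy.
        List<Future<O>> restFutures = new ArrayList<>();
        for (final I each : rest) {
            restFutures.add(pool.submit(() -> callback.execute(each, false)));
        }
        // The calling thread does useful work instead of only waiting.
        result.add(callback.execute(first, true));
        for (Future<O> each : restFutures) {
            result.add(each.get());
        }
        return result;
    }

    public static void main(final String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callback<String, String> callback = (input, isTrunkThread) -> input + ":" + isTrunkThread;
        System.out.println(execute(Arrays.asList("a", "b", "c"), callback, false, pool));
        pool.shutdown();
    }
}
```

Submitting the remaining inputs before running the first one is what lets the pool threads and the calling thread work at the same time.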

So far, we have completed the introduction of the ShardingExecuteEngine class. As an execution engine, ShardingExecuteEngine provides a multi-threaded execution environment. In terms of system design, this is also a technique that can be referred to in daily development. We can design and implement a multi-threaded execution environment that does not need to complete specific business operations, but only needs to execute the callback function passed in. The ShardingExecuteEngine in ShardingSphere provides such an environment. Similar implementation methods can be seen in other open-source frameworks such as Spring.

Next, let’s see how ShardingSphere uses callbacks to perform the actual execution of SQL.

Callback interface ShardingGroupExecuteCallback #

The definition of the callback interface ShardingGroupExecuteCallback is very simple:

public interface ShardingGroupExecuteCallback<I, O> {

    Collection<O> execute(Collection<I> inputs, boolean isTrunkThread, Map<String, Object> shardingExecuteDataMap) throws SQLException;
}

This interface performs the actual SQL execution based on the generic inputs collection and the shardingExecuteDataMap passed in. In ShardingSphere, many anonymous classes implement ShardingGroupExecuteCallback, but only one named class implements it explicitly: SQLExecuteCallback. This is an abstract class, and its execute method is as follows:

@Override
public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, 
                                   final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
    Collection<T> result = new LinkedList<>();
    for (StatementExecuteUnit each : statementExecuteUnits) {
        result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
    }
    return result;
}

For each input StatementExecuteUnit, the execute method above in turn calls the execute0 method, shown below:

private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
    // Set ExecutorExceptionHandler
    ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
    // Get DataSourceMetaData, using cache mechanism
    DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData());
    // Initiate SQLExecutionHook
    SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
    try {
        RouteUnit routeUnit = statementExecuteUnit.getRouteUnit();
        // Start execution hook
        sqlExecutionHook.start(routeUnit.getDataSourceName(), routeUnit.getSqlUnit().getSql(), routeUnit.getSqlUnit().getParameters(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
        // Execute SQL
        T result = executeSQL(routeUnit.getSqlUnit().getSql(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
        // Successful hook
        sqlExecutionHook.finishSuccess();
        return result;
    } catch (final SQLException ex) {
        // Failed hook
        sqlExecutionHook.finishFailure(ex);
        // Exception handling
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
}

The meaning of each line of this code is quite clear. Here, an ExecutorExceptionHandler is introduced for exception handling, and an SPISQLExecutionHook is also introduced to embed hooks into the execution process. We have seen many times the implementation mechanism of hooks based on the SPI mechanism in the previous lessons on SQL parsing and routing engines, so we won’t go into details here. We can see that the actual execution of SQL is handed over to the executeSQL template method, and various subclasses of SQLExecuteCallback need to implement this template method.
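
The combination of a fixed execution wrapper, lifecycle hooks, and a template method can be reduced to the following JDK-only sketch. HookedCallbackSketch and its nested Callback class are hypothetical simplifications: the hook is modeled as a list of recorded events rather than an SPI-loaded SQLExecutionHook, and the “SQL execution” is just a function of the SQL string.

```java
import java.util.ArrayList;
import java.util.List;

class HookedCallbackSketch {

    // Simplified analogue of SQLExecuteCallback: execute is fixed, executeSQL is the template method.
    abstract static class Callback<T> {

        final List<String> events = new ArrayList<>();

        final T execute(final String sql) {
            events.add("start:" + sql);
            try {
                T result = executeSQL(sql);
                events.add("finishSuccess");
                return result;
            } catch (final Exception ex) {
                events.add("finishFailure:" + ex.getMessage());
                return null;
            }
        }

        // Subclasses, typically anonymous ones, supply only the actual execution step.
        protected abstract T executeSQL(String sql) throws Exception;
    }

    static List<String> runSuccess() {
        Callback<Integer> callback = new Callback<Integer>() {

            @Override
            protected Integer executeSQL(final String sql) {
                return sql.length();
            }
        };
        callback.execute("SELECT 1");
        return callback.events;
    }

    static List<String> runFailure() {
        Callback<Integer> callback = new Callback<Integer>() {

            @Override
            protected Integer executeSQL(final String sql) throws Exception {
                throw new IllegalStateException("boom");
            }
        };
        callback.execute("SELECT 1");
        return callback.events;
    }

    public static void main(final String[] args) {
        System.out.println(runSuccess());
        System.out.println(runFailure());
    }
}
```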

ShardingSphere provides no named concrete subclass of SQLExecuteCallback; instead, anonymous classes are used heavily to implement the executeSQL template method. For example, the executeQuery method covered in the next lesson, “Lesson 22 | Execution Engine: How to Handle the Executor Execution Model in ShardingSphere? (Part 1)”, creates an anonymous SQLExecuteCallback to perform the query:

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    // Create a SQLExecuteCallback and execute the query
    SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(sql, statement, connectionMode);
        }
    };
    // Execute the SQLExecuteCallback and return the result
    return executeCallback(executeCallback);
}

SQLExecuteTemplate Template Class #

Among the underlying components of the ShardingSphere execution engine, one more class deserves a closer look: SQLExecuteTemplate, the direct user of ShardingExecuteEngine. As its name suggests, it is a typical template utility class, similar to JdbcTemplate in Spring. Such classes are usually simple to implement, amounting to a thin wrapper around the underlying objects.

SQLExecuteTemplate is no exception. Its purpose is to wrap the entry method of ShardingExecuteEngine. SQLExecuteTemplate has only one core method, executeGroup:

public <T> List<T> executeGroup(final Collection<ShardingExecuteGroup<? extends StatementExecuteUnit>> sqlExecuteGroups, final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
    try {
        return executeEngine.groupExecute((Collection) sqlExecuteGroups, firstCallback, callback, serial);
    } catch (final SQLException ex) {
        ExecutorExceptionHandler.handleException(ex);
        return Collections.emptyList();
    }
}

As can be seen, what this method does is simply calling the groupExecute method of ShardingExecuteEngine to complete the specific execution work and adding an exception handling mechanism.
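
The wrapping pattern above, delegate to the engine and route any failure through a handler, can be sketched as follows. This is an illustration under an assumption the text only implies: that the handler either rethrows or swallows the exception depending on a per-thread flag, which would explain the empty-list fallback after handleException. ExecuteTemplateSketch and its methods are hypothetical names, and a Callable stands in for the call into the engine.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

class ExecuteTemplateSketch {

    // Assumed analogue of the exception handler's switch: a per-thread "rethrow or swallow" flag.
    private static final ThreadLocal<Boolean> EXCEPTION_THROWN = ThreadLocal.withInitial(() -> true);

    static void setExceptionThrown(final boolean thrown) {
        EXCEPTION_THROWN.set(thrown);
    }

    // Thin template: delegate to the engine call, and fall back to an empty list if errors are suppressed.
    static <T> List<T> executeGroup(final Callable<List<T>> engineCall) throws Exception {
        try {
            return engineCall.call();
        } catch (final Exception ex) {
            if (EXCEPTION_THROWN.get()) {
                throw ex;
            }
            return Collections.emptyList();
        }
    }

    public static void main(final String[] args) throws Exception {
        setExceptionThrown(false);
        List<String> result = executeGroup(() -> {
            throw new java.sql.SQLException("connection lost");
        });
        System.out.println(result.isEmpty());
    }
}
```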

From Source Code Analysis to Daily Development #

Several techniques from today’s content carry over to daily development. One practical technique is to use Guava’s ListeningExecutorService to enhance the JDK’s ExecutorService, whose plain Future offers limited features. We also saw an extension mechanism based on callbacks, which lets us build an independent execution environment in which every business operation is supplied as a callback.

Summary and Preview #

This lesson is the first part of introducing the ShardingSphere execution engine, which describes the abstraction process of the SQL execution process in the sharding environment. We first introduced the execution engine, then detailed the entire execution process from the dimensions of the executor service, execution callbacks, and execution template classes.

Here’s a question for you to think about: What techniques did ShardingSphere apply when implementing the Executor using multi-threading technology? Feel free to discuss with everyone in the comments, and I will provide comments and answers.

In the next lesson, we will continue to introduce the execution engine of ShardingSphere, focusing on the statement executor for SQL execution.