24 Merge Engine: How to Understand the Types of Data Merging and the Implementation of Simple Merge Strategies #

In the previous lesson, we saw that when ShardingStatement and ShardingPreparedStatement execute the executeQuery or executeUpdate methods, the merge engine MergeEngine comes into play:

// Call the merge engine
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
        connection.getRuntimeContext().getDatabaseType(),
        connection.getRuntimeContext().getRule(),
        sqlRouteResult,
        connection.getRuntimeContext().getMetaData().getRelationMetas(),
        statementExecutor.executeQuery());
// Get the merge result
result = getResultSet(mergeEngine);

In the overall structure of ShardingSphere’s sharding mechanism, the merge engine is the next step after the execution engine, and the last step of the entire data sharding engine.

In today’s lesson and the next lesson, I will guide you through a detailed explanation of the merge engine in ShardingSphere. Let’s start with the basic concept of merge.

Merge and Merge Engine #

As we know, in a sharding environment, a logical SQL statement will ultimately be parsed into multiple real SQL statements and routed to different databases for execution, and each database may return a portion of the final result.

This raises a question: how do we combine these partial results from different databases into the final result and return it correctly to the requesting client? This requires the concept of merge.

1. Classification of Merge and Implementation Solutions #

The so-called “merge” refers to the process of combining the multiple result sets obtained from various data nodes into one result set and returning it correctly to the requesting client through a certain strategy.

Based on the SQL type and application scenario, merges can be divided into five types: traversal, sorting, grouping, pagination, and aggregation. These five types can be combined with one another rather than being mutually exclusive.

Among them, traversal merge is the simplest and sorting merge is the most commonly used; I will explain these two in detail later in this lesson.

(Figure: The five types of merge)

Based on the implementation structure of merge, there are three merge schemes in ShardingSphere: streaming merge, in-memory merge, and decorator merge.

  • Streaming merge works like reading rows from a JDBC ResultSet: the correctly merged data is returned one row at a time as the caller iterates.
  • In-memory merge, by contrast, loads all the data of the result sets into memory, performs a unified calculation, and then wraps the outcome in a result set that can again be accessed row by row.
  • Decorator merge merges all the result sets and applies a unified functional enhancement through the decorator pattern, similar to how SQLRewriteContextDecorator decorates SQLRewriteContext in the rewrite engine.

Clearly, streaming merge and in-memory merge are mutually exclusive, while decorator merge can perform further processing on top of either of them.
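To make the decorator idea a bit more concrete, here is a minimal, generic sketch: the Result interface and the LimitOffsetDecorator class below are hypothetical and exist purely for illustration, not ShardingSphere's actual classes. The decorator wraps an existing result and adds behavior (here, skipping a number of leading rows) without changing how callers consume it:

// Hypothetical types for illustration only -- not ShardingSphere classes
interface Result {
    boolean next();
    Object getValue(int columnIndex);
}

// A hypothetical pagination-like decorator: it wraps another Result and skips
// the first "offset" rows, delegating everything else to the wrapped result
class LimitOffsetDecorator implements Result {

    private final Result delegate;
    private int rowsToSkip;

    LimitOffsetDecorator(final Result delegate, final int offset) {
        this.delegate = delegate;
        this.rowsToSkip = offset;
    }

    @Override
    public boolean next() {
        // Skip the offset rows once, then behave exactly like the delegate
        while (rowsToSkip > 0 && delegate.next()) {
            rowsToSkip--;
        }
        return delegate.next();
    }

    @Override
    public Object getValue(final int columnIndex) {
        return delegate.getValue(columnIndex);
    }
}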

There is also a relationship between the merge schemes and the merge types. Traversal merge, sorting merge, and streaming grouping merge are all streaming merges; in-memory merge applies to grouping, sorting, and aggregation performed uniformly in memory; and decorator merge covers two types: pagination merge and aggregation merge. The correspondence between them is shown in the figure below:

(Figure: The correspondence between merge types and merge schemes)

2. Merge Engine #

After explaining the concepts, let’s go back to the code. We first come to the MergeEngine interface in the shardingsphere-merge code project:

public interface MergeEngine {
    // Perform merge
    MergedResult merge() throws SQLException;
}

As we can see, the MergeEngine interface is very simple, with only one merge method. In ShardingSphere, there are five implementations of this interface, and their class hierarchy is as follows:

(Figure: MergeEngine class hierarchy)

From the naming, we can see that the two MergeEngine implementations with “Encrypt” in their names are related to data desensitization (data masking) and will be explained in later topics; here we will analyze the other three.

Before that, let’s also pay attention to the MergedResult interface that represents the merge result:

public interface MergedResult {
    boolean next() throws SQLException;
    Object getValue(int columnIndex, Class<?> type) throws SQLException;
    Object getCalendarValue(int columnIndex, Class<?> type, Calendar calendar) throws SQLException;
    InputStream getInputStream(int columnIndex, String type) throws SQLException;
    boolean wasNull() throws SQLException;
}
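Judging from this interface alone, consuming a merged result is very close to reading a plain JDBC ResultSet. The following minimal sketch shows what that consumption loop might look like; the column indexes and target types are made up for illustration, and the surrounding method is assumed to declare throws SQLException:

// Obtain the merged result from the merge engine created earlier
MergedResult mergedResult = mergeEngine.merge();
while (mergedResult.next()) {
    // Column indexes are 1-based, as in JDBC; the columns here are hypothetical
    Long userId = (Long) mergedResult.getValue(1, Long.class);
    String taskName = (String) mergedResult.getValue(2, String.class);
    // ... process the row ...
}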

As we can see, MergedResult is very similar to QueryResult in the execution engine, just with fewer methods. Having understood the definition of the merge engine and the representation of the merge result, let's look at how a MergeEngine is created. As we saw earlier, this relies on the factory class MergeEngineFactory, whose implementation is as follows:

public static MergeEngine newInstance(final DatabaseType databaseType, final ShardingRule shardingRule,
                                          final SQLRouteResult routeResult, final RelationMetas relationMetas, final List<QueryResult> queryResults) {
    ...
    // If it is a query statement, create a DQLMergeEngine
    if (routeResult.getSqlStatementContext() instanceof SelectSQLStatementContext) {
        return new DQLMergeEngine(databaseType, (SelectSQLStatementContext) routeResult.getSqlStatementContext(), queryResults);
    } 
    // If it is a database management statement, create a DALMergeEngine
    if (routeResult.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
        return new DALMergeEngine(shardingRule, queryResults, routeResult.getSqlStatementContext(), relationMetas);
    }
    return new TransparentMergeEngine(queryResults);
}

The parameters of this newInstance method are all familiar to us by now: DatabaseType, ShardingRule, SQLRouteResult, RelationMetas, and the List<QueryResult> that holds the execution results.

The method then returns a different type of MergeEngine depending on the type of SQLStatementContext held in the SQLRouteResult: if it is a SelectSQLStatementContext, a DQLMergeEngine for queries is returned; if the underlying SQLStatement is a DALStatement, i.e. a database management statement, a DALMergeEngine is returned; otherwise a TransparentMergeEngine is returned.

For merge, the most important engine type is DQLMergeEngine. Let’s dive into it.

Its merge method is as follows:

public MergedResult merge() throws SQLException {
    // If there is only one result set
    if (1 == queryResults.size()) {
        return new IteratorStreamMergedResult(queryResults);
    }
    Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
    selectSQLStatementContext.setIndexes(columnLabelIndexMap);
    // If there is more than one result set, build different merge plans
    return decorate(build(columnLabelIndexMap));
}

Here we can see a precondition check: when there is only one query result set, we simply iterate over that result set to complete the merge. This is the first merge type we are going to introduce, iterator merge (the traversal merge mentioned earlier), and it is the simplest of all merge types.

If there is more than one result set, a real merge is required. The build method is then called to construct and return a different MergedResult depending on the conditions below:

private MergedResult build(final Map<String, Integer> columnLabelIndexMap) throws SQLException {
    // If there is a GROUP BY clause or aggregate function in the SELECT statement, perform GROUP BY merge
    if (isNeedProcessGroupBy()) {
        return getGroupByMergedResult(columnLabelIndexMap);
    } 
    // If there is a DISTINCT column in the SELECT statement, set the GROUP BY context and perform GROUP BY merge
    if (isNeedProcessDistinctRow()) {
        setGroupByForDistinctRow();
        return getGroupByMergedResult(columnLabelIndexMap);
    } 
    // If there is an ORDER BY clause, perform ORDER BY merge
    if (isNeedProcessOrderBy()) {
        return new OrderByStreamMergedResult(queryResults, selectSQLStatementContext.getOrderByContext().getItems());
    } 
    // If none of the above conditions are met, perform iterator merge
    return new IteratorStreamMergedResult(queryResults);
}

Here we see the two major merge strategies: group-by merge and order-by merge. We also see a decorate method applied on top of the result of build. This decorate method represents the decorator merge, which performs pagination merging for different database dialects; we will discuss it in detail in the next lesson.

As a result, we have linked various merge types in ShardingSphere through the merge engine MergeEngine. Let’s now discuss the specific implementation mechanisms of each merge type.

Let’s start with iterator merge.

The Simplest Merge: Iterator Merge #

Iterator merge is the simplest merge method: we only need to chain the multiple query result sets together like a linked list, so that iterating over the merged data is equivalent to iterating over that list. On the implementation side, iterator merge is represented by the IteratorStreamMergedResult class, which extends StreamMergedResult, the base class for streaming merge results.

The next method of IteratorStreamMergedResult is as follows:

@Override
public boolean next() throws SQLException {
    if (getCurrentQueryResult().next()) {
        return true;
    }
    if (!queryResults.hasNext()) {
        return false;
    }
    // Get the result set stream and set it as the current QueryResult
    setCurrentQueryResult(queryResults.next());
    boolean hasNext = getCurrentQueryResult().next();
    if (hasNext) {
        return true;
    }
    while (!hasNext && queryResults.hasNext()) {
        setCurrentQueryResult(queryResults.next());
        hasNext = getCurrentQueryResult().next();
    }
    return hasNext;
}

Its getValue method is defined in the parent class StreamMergedResult:

@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
    Object result = getCurrentQueryResult().getValue(columnIndex, type);
    wasNull = getCurrentQueryResult().wasNull();
    return result;
}

Here, getCurrentQueryResult is again used to obtain the current result set in a streaming fashion, and the specific value is then read from it.
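Stripped of ShardingSphere's types, the idea behind iterator merge is simply to chain several partial results together and read them one after another. The following is a minimal sketch of that idea using plain Java iterators instead of QueryResult; the ChainedIterator class is hypothetical and only mirrors the logic of IteratorStreamMergedResult:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// A minimal, illustrative chained iterator: partial results are traversed one
// after another, just like IteratorStreamMergedResult walks its queryResults
final class ChainedIterator<T> implements Iterator<T> {

    private final Iterator<Iterator<T>> sources;
    private Iterator<T> current;

    ChainedIterator(final List<Iterator<T>> partialResults) {
        this.sources = partialResults.iterator();
        this.current = sources.hasNext() ? sources.next() : Collections.<T>emptyIterator();
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty partial result, mirroring the while
        // loop in IteratorStreamMergedResult#next()
        while (!current.hasNext() && sources.hasNext()) {
            current = sources.next();
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}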

The Most Commonly Used Merge: Sorting Merge #

The second type of merge we are going to introduce is sorting merge, which is represented by OrderByStreamMergedResult. This class also extends StreamMergedResult, the base class used for streaming merge.

Before introducing OrderByStreamMergedResult, let's imagine the scenario of sorting merge.

When a SQL statement is executed against multiple databases, each database can perform the sorting itself. That is, the execution result consists of multiple QueryResults, each of which is sorted internally; what we need to do is bring them together and produce a global ordering. Since the contents of each QueryResult are already ordered, we only need to compare the data values currently pointed to by each QueryResult's cursor, which is equivalent to merging multiple sorted arrays.

This process is a bit abstract. Let's further illustrate it with the following diagram. Suppose in our health_task table, there is a health_point field used to represent the health score that can be obtained by completing this health task.

Next, we need to perform a sorting merge based on this health_point field. The initial data is shown in the following diagram:

![Lark20200903-190058.png](../images/Ciqc1F9QzRSADVUyAABkYnJfMvs829.png)

Initial data in the three health_task tables

The figure above shows the data result sets returned by the three tables. Each result set is already sorted by the health_point field, but there is no global order across the three result sets. Sorting merge compares the data values currently pointed to by the cursor of each result set and places them into a sorted queue.

In the figure, the first health_point of health_task0 is the smallest, that of health_task1 is the largest, and that of health_task2 lies in between. Therefore, the queue is ordered as health_task1, health_task2, health_task0, as shown in the following diagram:

![Lark20200903-185846.png](../images/CgqCHl9QzHiAf33WAABsaH9vLR0050.png)

The three health_task tables sorted into the queue

In OrderByStreamMergedResult, we can see the definition of the queue as follows, using the Queue interface in JDK:

private final Queue<OrderByValue> orderByValuesQueue;

And in the constructor of OrderByStreamMergedResult, we further see that the orderByValuesQueue is actually a PriorityQueue:

public OrderByStreamMergedResult(final List<QueryResult> queryResults, final Collection<OrderByItem> orderByItems) throws SQLException {
    this.orderByItems = orderByItems;
    // Build PriorityQueue
    this.orderByValuesQueue = new PriorityQueue<>(queryResults.size());
    // Initialize PriorityQueue
    orderResultSetsToQueue(queryResults);
    isFirstNext = true;
}

At this point, a brief introduction to the JDK's PriorityQueue is in order. A PriorityQueue automatically keeps the elements it contains in sorted order. It can hold wrapper types of primitive data as well as custom classes: wrapper types are ordered ascending by default, while a custom class must supply its own ordering, either through a Comparator or by implementing Comparable.

The commonly used methods of PriorityQueue are listed below (a short usage example follows the list):

  • peek(): returns the element at the head of the queue without removing it.
  • poll(): returns the element at the head of the queue and removes it.
  • offer(): adds an element to the queue.
  • size(): returns the number of elements in the queue.
  • isEmpty(): checks whether the queue is empty.
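The behavior of these methods can be verified with a few lines of plain JDK code. Here is a small, self-contained example; the values and the custom ordering are made up for illustration:

import java.util.Comparator;
import java.util.PriorityQueue;

public class PriorityQueueDemo {

    public static void main(String[] args) {
        // For a wrapper type such as Integer, the queue orders elements ascending by default
        PriorityQueue<Integer> numbers = new PriorityQueue<>();
        numbers.offer(58);
        numbers.offer(50);
        numbers.offer(67);
        System.out.println(numbers.peek()); // 50: the smallest element sits at the head
        System.out.println(numbers.poll()); // 50: returns the head element and removes it
        System.out.println(numbers.size()); // 2

        // For custom classes, supply a Comparator, or let the class implement
        // Comparable -- which is what OrderByValue does in ShardingSphere
        PriorityQueue<String> byLength = new PriorityQueue<>(Comparator.comparingInt(String::length));
        byLength.offer("health_task0");
        byLength.offer("task");
        System.out.println(byLength.peek()); // "task": the shortest string comes first
    }
}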

After understanding the features of PriorityQueue, let's take a look at how to initialize the queue based on a QueryResult list. The orderResultSetsToQueue method is as follows:

private void orderResultSetsToQueue(final List<QueryResult> queryResults) throws SQLException {
    for (QueryResult each : queryResults) {
        // Build OrderByValue
        OrderByValue orderByValue = new OrderByValue(each, orderByItems);
        if (orderByValue.next()) {
            // Add OrderByValue to the queue
            orderByValuesQueue.offer(orderByValue);
        }
    }
    setCurrentQueryResult(orderByValuesQueue.isEmpty() ? queryResults.get(0) : orderByValuesQueue.peek().getQueryResult());
}

Here, an OrderByValue object is constructed for each QueryResult, and its next method is used to determine whether it should be added to the PriorityQueue.

We can see that the offer method of the PriorityQueue is called here to insert specific elements into the priority queue.

After adding all the OrderByValue objects to the PriorityQueue, the OrderByStreamMergedResult sets the first element in the PriorityQueue as the current query result through the setCurrentQueryResult method of its parent class StreamMergedResult. At this point, the PriorityQueue points to the first globally sorted element, which is 50 in the diagram above.

Since the OrderByValue objects placed in the PriorityQueue are instances of a custom class, they must define their own ordering. OrderByValue does this by implementing Java's Comparable interface; its compareTo method compares the values of each sort item OrderByItem in turn:

@Override
public int compareTo(final OrderByValue o) {
    int i = 0;
    for (OrderByItem each : orderByItems) {
        int result = CompareUtil.compareTo(orderValues.get(i), o.orderValues.get(i), each.getSegment().getOrderDirection(),
            each.getSegment().getNullOrderDirection(), orderValuesCaseSensitive.get(i));
        if (0 != result) {
            return result;
        }
        i++;
    }
    return 0;
}

Based on the previous diagram, every time we fetch the next piece of data from the PriorityQueue, we only need to move the cursor of the result set at the head of the queue down by one position and then re-insert that result set into the priority queue, where it settles into its new position according to the value now under its cursor.

This process is reflected in the next method as shown below:

@Override
public boolean next() throws SQLException {
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    if (isFirstNext) {
        isFirstNext = false;
        return true;
    }

    // Get the first element in the PriorityQueue and remove it
    OrderByValue firstOrderByValue = orderByValuesQueue.poll();

    // Move the cursor to the next element of firstOrderByValue and re-insert it into the PriorityQueue, which will cause PriorityQueue to automatically re-sort
    if (firstOrderByValue.next()) {
        orderByValuesQueue.offer(firstOrderByValue);
    }
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }

    // Set the current result set to the first element of PriorityQueue
    setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult());
    return true;
}

This process is easier to follow with a series of diagrams. When the next method is called for the first time, health_task1 at the head of the queue is popped, and the value 50 that its cursor currently points to is returned. At the same time, its cursor is moved down and health_task1 is put back into the priority queue, which re-sorts it according to the data value its cursor now points to; health_task1 ends up third in the queue. It looks like this:

![Lark20200903-185915.png](../images/Ciqc1F9QzJaANhr0AABsaCCFqA0376.png)

Three health_task tables in the priority queue after the first next

The result set of health_task2, previously second in the queue, automatically moves to the head. When the second next is performed, we simply pop health_task2 from the head of the queue and return the value its cursor points to; as before, we then move its cursor down and re-insert it into the priority queue, and so on.

![Lark20200903-185920.png](../images/CgqCHl9QzJ2AMQzxAABrQS5M0oA899.png)

Three health_task tables in the priority queue after the second next

As we can see, with this design and implementation, when the data inside each result set is ordered but the result sets are not globally ordered with respect to each other, sorting can be completed without loading all the data into memory.

Therefore, ShardingSphere uses a streaming merge method here, which greatly improves the efficiency of merging.

From Source Code Analysis to Daily Development #

Queues are a commonly used data structure, and the priority queue is especially useful in scenarios that require comparing and sorting data. Thanks to its built-in ordering, a priority queue provides an elegant and efficient solution for global sorting scenarios like the one in ShardingSphere, and it can be applied in day-to-day development wherever a similar need arises.
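As a small illustration of how the same idea transfers to everyday code, the sketch below merges several already-sorted integer lists with a PriorityQueue, re-inserting each source's cursor after consuming its current value just as OrderByStreamMergedResult does. The SortedListMerger class and its Cursor helper are hypothetical names used only for this example:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// A minimal, illustrative k-way merge of already-sorted lists, reusing the
// priority-queue idea behind ShardingSphere's sorting merge
public final class SortedListMerger {

    // One cursor per source list: the current value plus the iterator it came from
    private static final class Cursor {
        private final int value;
        private final Iterator<Integer> source;

        private Cursor(final int value, final Iterator<Integer> source) {
            this.value = value;
            this.source = source;
        }
    }

    public static List<Integer> merge(final List<List<Integer>> sortedLists) {
        PriorityQueue<Cursor> queue = new PriorityQueue<>(Comparator.comparingInt((Cursor c) -> c.value));
        // Seed the queue with the first element of every non-empty list
        for (List<Integer> list : sortedLists) {
            Iterator<Integer> it = list.iterator();
            if (it.hasNext()) {
                queue.offer(new Cursor(it.next(), it));
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Take the globally smallest current value, then advance that source's
            // cursor and re-insert it -- the same move as in next() above
            Cursor smallest = queue.poll();
            merged.add(smallest.value);
            if (smallest.source.hasNext()) {
                queue.offer(new Cursor(smallest.source.next(), smallest.source));
            }
        }
        return merged;
    }
}

For example, merge(List.of(List.of(1, 4, 9), List.of(2, 3), List.of(5))) returns [1, 2, 3, 4, 5, 9], and the queue never holds more than one cursor per source list at a time.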

Summary and Preview #

Today's lesson focused on the merge engine in ShardingSphere, the final step in processing SQL execution results in a sharded environment. We went through the common merge types and implementation schemes abstracted by ShardingSphere, and then examined the design ideas and implementation details of the simplest merge, traversal (iterator) merge, and the most commonly used one, sorting merge.

Here's a question for you to think about: In ShardingSphere, which data structure in JDK is used for sorting shard data?

In the next lesson, we will continue to introduce the remaining merge types in ShardingSphere's merge engine, including group merge, aggregation merge, and pagination merge.