25 Merging Engine How to Understand Stream Merging and in Memory Merging in Complex Merge Scenarios

25 Merging Engine How to Understand Stream Merging and In-Memory Merging in Complex Merge Scenarios #

Building on the content of the previous lesson, today we will continue to introduce the remaining merge strategies in ShardingSphere, including group merge, aggregation merge, and pagination merge.

  • Among them, group merge is the most complex merge type;
  • Aggregation merge is a merge that is added on top of group merge;
  • Pagination merge, on the other hand, is a typical merge type implemented using the decorator pattern.

The Most Complex Merge: Group Merge #

Among all merge mechanisms in ShardingSphere, group merge is the most complex. It can be implemented in two ways: stream group merge and memory group merge.

For stream group merge, the sorting items and grouping items in the SQL statement must be consistent in terms of fields and sorting types. Otherwise, only memory merge can ensure the correctness of the data.

Because group merge is very complex, let’s continue with an example and combine it with the source code to explain the implementation process of group merge. First, let’s take a look at the following SQL statement:

SELECT task_name, SUM(health_point) FROM health_task GROUP BY task_name ORDER BY task_name;

Obviously, the grouping item and sorting item in the above SQL statement are the same, both using the task_name column. Hence, the data obtained is continuous. Therefore, the data required for grouping is all stored in the data values pointed to by the current cursor of each data result set, so stream merge can be used.

As shown in the figure below, we have sorted each health_task result set based on the task_name:

Drawing 0.png

Let’s start with some initialization code. Go back to DQLMergeEngine and find the getGroupByMergedResult method used for group merge, as shown below:

private MergedResult getGroupByMergedResult(final Map<String, Integer> columnLabelIndexMap) throws SQLException {
    return selectSQLStatementContext.isSameGroupByAndOrderByItems()
        ? new GroupByStreamMergedResult(columnLabelIndexMap, queryResults, selectSQLStatementContext)
        : new GroupByMemoryMergedResult(queryResults, selectSQLStatementContext);
}

Here we can see a isSameGroupByAndOrderByItems check, which determines whether the grouping conditions and sorting conditions are the same. Based on the previous analysis, if the grouping conditions and sorting conditions are the same, the stream group merge strategy GroupByStreamMergedResult is executed; otherwise, the memory group merge strategy GroupByMemoryMergedResult is used.

Let’s take stream merge as an example to introduce the implementation mechanism of group merge in ShardingSphere. Before we delve into the code, let’s first clarify the steps involved in stream group merge. We will use a series of diagrams for illustration.

Now that we have sorted each health_task result set based on task_name, the “task1” in health_task0, health_task1, and health_task2, is positioned at the beginning of the queue, i.e., the first element.

  • First next call

When we make the first next call, the health_task0 at the top of the queue will be popped out, along with the data from other result sets that have the same “task1” grouping value. Then, after obtaining all the health_point values for task_name “task1”, we perform the sum operation.

So, after the first next call, the result set obtained is the sum of scores for “task1”, which is 46+43+40=129, as shown in the following figure:

Drawing 2.png

  • Second next call

At the same time, the cursors for all data result sets move to the next distinct data value for “task1”, and the results are re-sorted based on the current cursor value. In the figure above, we see that the second “task2” exists in both health_task0 and health_task1, which means that the data result sets containing the name “task2” are at the top of the queue.

When we make the next call again, we obtain the scores for “task2” and perform the sum operation, which is 42+50=92, as shown in the figure below:

Lark20200907-164326.png

For the following next calls, we follow similar handling mechanisms, finding the data records for “task3”, “task4”, “task5”, and so on in these three health_task tables, and so on.

After gaining an intuitive understanding of stream group merge, let’s go back to the source code. Let’s start with the GroupByStreamMergedResult, which represents the result. We find that GroupByStreamMergedResult actually inherits from OrderByStreamMergedResult, which was introduced in the previous lesson for sorting merging, and thus uses the priority queue PriorityQueue and OrderByValue objects mentioned earlier.

However, to manage runtime status, GroupByStreamMergedResult adds the currentRow variable representing the current result record and the currentGroupByValues variable representing the current grouping value:

private final List<Object> currentRow;
private List<?> currentGroupByValues;

Next, let’s take a look at the constructor of GroupByStreamMergedResult:

public GroupByStreamMergedResult(
    final Map<String, Integer> labelAndIndexMap, final List<QueryResult> queryResults, final SelectSQLStatementContext selectSQLStatementContext) throws SQLException {
    super(queryResults, selectSQLStatementContext.getOrderByContext().getItems());

    this.selectSQLStatementContext = selectSQLStatementContext;
    currentRow = new ArrayList<>(labelAndIndexMap.size());
    // If the priority queue is not empty, assign the grouping value of the first element in the queue to the currentGroupByValues variable
    currentGroupByValues = getOrderByValuesQueue().isEmpty()
        ? Collections.emptyList() : new GroupByValue(getCurrentQueryResult(), selectSQLStatementContext.getGroupByContext().getItems()).getGroupValues();
}

Here, we use a GroupByValue object to store the grouping values. As the name suggests, the purpose of this object is to calculate the value of each grouping condition from the result set QueryResult, as shown below:

public final class GroupByValue {

    private final List<?> groupValues;

    public GroupByValue(final QueryResult queryResult, final Collection<OrderByItem> groupByItems) throws SQLException {
        groupValues = getGroupByValues(queryResult, groupByItems);
    }
private List<?> getGroupByValues(final QueryResult queryResult, final Collection<OrderByItem> groupByItems) throws SQLException {
    List<Object> result = new ArrayList<>(groupByItems.size());
    for (OrderByItem each : groupByItems) {

        // Get the value of each grouping condition from the QueryResult
        result.add(queryResult.getValue(each.getIndex(), Object.class));
    }
    return result;
}

public boolean next() throws SQLException {
    // Clear the current result record
    currentRow.clear();

    if (getOrderByValuesQueue().isEmpty()) {
        return false;
    }
    if (isFirstNext()) {
        super.next();
    }

    // Merge the records with the same grouping conditions in order
    if (aggregateCurrentGroupByRowAndNext()) {
        // Generate the next result record grouping values
        currentGroupByValues = new GroupByValue(getCurrentQueryResult(), selectSQLStatementContext.getGroupByContext().getItems()).getGroupValues();
    }
    return true;
}

private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
    boolean result = false;

    // Generate the calculation unit
    Map<AggregationProjection, AggregationUnit> aggregationUnitMap = Maps.toMap(
        selectSQLStatementContext.getProjectionsContext().getAggregationProjections(),
        input -> AggregationUnitFactory.create(input.getType(), input instanceof AggregationDistinctProjection)
    );

    // Loop and merge the records with the same grouping conditions in order
    while (currentGroupByValues.equals(new GroupByValue(getCurrentQueryResult(), selectSQLStatementContext.getGroupByContext().getItems()).getGroupValues())) {
        // Calculate the aggregation values
        aggregate(aggregationUnitMap);

        // Cache the current record to the result record
        cacheCurrentRow();

        // Get the next record, call the next method in the parent class to point currentResultSet to the next element
        result = super.next();

        // End the loop if all values have been traversed
        if (!result) {
            break;
        }
    }

    // Set the aggregation field result of the current record
    setAggregationValueToCurrentRow(aggregationUnitMap);

    return result;
}

public static AggregationUnit create(final AggregationType type, final boolean isDistinct) {
    switch (type) {
        case MAX:
            return new ComparableAggregationUnit(false);
        case MIN:
            return new ComparableAggregationUnit(true);
        case SUM:
            return isDistinct ? new DistinctSumAggregationUnit() : new AccumulationAggregationUnit();
        case COUNT:
            return isDistinct ? new DistinctCountAggregationUnit() : new AccumulationAggregationUnit();
        case AVG:
            return isDistinct ? new DistinctAverageAggregationUnit() : new AverageAggregationUnit();
        default:
            throw new UnsupportedOperationException(type.name());
    }
}

The code provided is written in Java and contains comments in Chinese. The code includes three methods: getGroupByValues, next, and aggregateCurrentGroupByRowAndNext.

The getGroupByValues method takes a QueryResult and a collection of OrderByItem objects as parameters. It iterates over the groupByItems collection and gets the value of each grouping condition from the QueryResult, adding them to a list.

The next method is the core method in the GroupByStreamMergedResult class. It clears the current result record and checks if the orderByValuesQueue is empty. If it is empty, it returns false. If it is not empty and it’s the first call to next, it calls the next method in the parent class. It then merges the records with the same grouping conditions in order and generates the next result record grouping values.

The aggregateCurrentGroupByRowAndNext method contains the core logic for grouping and aggregating records. It generates a calculation unit map using the AggregationProjection objects from the selectSQLStatementContext. It then enters a loop and merges records with the same grouping conditions in order. It calculates the aggregation values using the aggregate method and caches the current record to the result record. It continues to the next record, and the loop ends if there are no more records. Finally, it sets the aggregation field result of the current record.

The create method in the AggregationUnitFactory class is a factory method that creates an instance of AggregationUnit based on the given AggregationType and isDistinct flag. It supports five aggregation types: MAX, MIN, SUM, COUNT, and AVG. Each type corresponds to a different AggregationUnit implementation.

This code belongs to the ShardingSphere project and is related to grouping and aggregating data in a query result. These classes all implement the AggregationUnit interface, which is defined as follows:

public interface AggregationUnit {

    // Merge the aggregate values
    void merge(List<Comparable<?>> values);

    // Get the aggregate result
    Comparable<?> getResult();
}

The AggregationUnit interface provides two methods, merge and getResult, for merging and retrieving aggregate values, respectively. So what is this AggregationUnit used for? Let’s take a look at the aggregate method in the aggregateCurrentGroupByRowAndNext code flow, which is included in the example SQL shown in previous lessons. The core of the aggregate method is to call the merge method of AggregationUnit to calculate the aggregate value. Here is the simplified code:

private void aggregate(final Map<AggregationProjection, AggregationUnit> aggregationUnitMap) throws SQLException {
    for (Entry<AggregationProjection, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
        // Calculate the aggregate value
        entry.getValue().merge(values);
    }
}

Clearly, the core of the above aggregate method is to calculate the aggregate value by calling the merge method in AggregationUnit. For the example SQL in today’s lesson, the specific AggregationUnit used should be AccumulationAggregationUnit. The implementation of the AccumulationAggregationUnit class is also quite simple. Its merge method sums up a series of values passed in. Here is the code:

public final class AccumulationAggregationUnit implements AggregationUnit {

    private BigDecimal result;

    @Override
    public void merge(final List<Comparable<?>> values) {
        if (null == values || null == values.get(0)) {
            return;
        }
        if (null == result) {
            result = new BigDecimal("0");
        }
        result = result.add(new BigDecimal(values.get(0).toString()));
    }

    @Override
    public Comparable<?> getResult() {
        return result;
    }
}

So far, the main content of the GroupByStreamMergedResult class in ShardingSphere for grouped streaming merge has been introduced.

Now let’s continue with the aggregation merge derived from the group merge.

Appended Merge: Aggregation Merge #

In fact, through the previous analysis, we have already touched on the content related to aggregation merge, and we are also discussing aggregation merge based on the foundation of group merge. Before that, we need to clarify that the aggregation operation itself has nothing to do with grouping, meaning that besides SQL with grouping, aggregation functions can also be used in SQL without grouping. On the other hand, the processing of aggregation functions is consistent whether it is stream-based group merge or memory-based group merge. Aggregation merge can be understood as an added merge capability on top of the previously introduced merge mechanism.

MAX, MIN, SUM, COUNT, and AVG, the 5 aggregation functions supported by ShardingSphere, can be divided into three categories of aggregation scenarios: MAX and MIN are used for comparison scenarios, SUM and COUNT are used for accumulation scenarios, and AVG is used to calculate the average.

In the sharding-core-merge project, there is implementation code for the aggregation engine. We have already given the AggregationUnit interface and the implementation class AccumulationAggregationUnit used to calculate the aggregate value when introducing aggregation merge earlier. For other AggregationUnit implementations, it is not difficult to imagine their internal implementation methods.

For example, taking AverageAggregationUnit as an example, here are its merge and getResult methods:

public final class AverageAggregationUnit implements AggregationUnit {

    private BigDecimal count;
    private BigDecimal sum;

    @Override
    public void merge(final List<Comparable<?>> values) {
        if (null == values || null == values.get(0) || null == values.get(1)) {
            return;
        }
        if (null == count) {
            count = new BigDecimal("0");
        }
        if (null == sum) {
            sum = new BigDecimal("0");
        }
        count = count.add(new BigDecimal(values.get(0).toString()));
        sum = sum.add(new BigDecimal(values.get(1).toString()));
    }

    @Override
    public Comparable<?> getResult() {
        if (null == count || BigDecimal.ZERO.equals(count)) {
            return count;
        } 
        return sum.divide(count, 4, BigDecimal.ROUND_HALF_UP);
    }
}

The meaning of the above code is quite clear, and the implementation methods of other aggregation classes are similar. We won’t go into details.

Next, let’s continue to introduce the last common application scenario in the merge engine, which is the paged merge.

Decorated Merge: Paged Merge #

From the perspective of implementation, pagination merging is different from the sorting merging and grouping merging introduced earlier. It adopts a decorator pattern, which means that after the sorting and grouping merging are completed, the results are further processed for pagination.

In the DQLMergeEngine class, the decorate method for the decorator is as follows:

// Decorate the result set using the decorator pattern for pagination merging
private MergedResult decorate(final MergedResult mergedResult) throws SQLException {
        PaginationContext paginationContext = selectSQLStatementContext.getPaginationContext();
        if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
            return mergedResult;
        } 
        // Merge the pagination result set based on different database types
        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
            return new LimitDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("Oracle".equals(trunkDatabaseName)) {
            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("SQLServer".equals(trunkDatabaseName)) {
            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        return mergedResult;
}

Here, it first determines whether the results need to be merged for pagination. If the PaginationContext does not have pagination requirements or there is only one query result, pagination merging is not needed. If pagination merging is required, different decorator merged result objects, DecoratorMergedResult, are constructed based on the three different types of database types.

DecoratorMergedResult is the base class for these three specific pagination merging implementations. The methods in DecoratorMergedResult are just proxies based on another MergedResult. For example, the getValue method is shown below:

private final MergedResult mergedResult;

@Override
public final Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
    return mergedResult.getValue(columnIndex, type);
}

Next, let’s take a look at the LimitDecoratorMergedResult, a pagination merging result decorator for MySQL or PostgreSQL. This class inherits from DecoratorMergedResult. As we know, the implementation of pagination in MySQL is to find the target starting row and then set the number of rows needed using the LIMIT keyword. A typical pagination SQL statement is as follows:

SELECT * FROM user WHERE user_id > 1000 LIMIT 20;

Since the necessary result set has already been obtained through grouping and sorting, the main task for pagination is to obtain the current starting row or offset. In the LimitDecoratorMergedResult, the following skipOffset method is used to calculate this offset:

private boolean skipOffset() throws SQLException {
        for (int i = 0; i < pagination.getActualOffset(); i++) {
            if (!getMergedResult().next()) {
                return true;
            }
        }
        rowNumber = 0;
        return false;
}

Here, the actual offset is obtained using the getActualOffset method from the PaginationContext pagination context object. Then, the next method of the parent class MergedResult is called in a loop to determine whether this target offset can be reached. If it can, it means that the pagination operation is feasible.

Next, let’s take a look at the next method of LimitDecoratorMergedResult:

@Override
public boolean next() throws SQLException {
        if (skipAll) {
            return false;
        }
        if (!pagination.getActualRowCount().isPresent()) {
            return getMergedResult().next();
        }
        return ++rowNumber <= pagination.getActualRowCount().get() && getMergedResult().next();
}

This method actually implements the logic of the LIMIT keyword. It increases the rowNumber counter and compares it with the target row count, and streams back the data.

So far, the introduction to the five types of merging engines in DQLMergeEngine is complete.

From Source Code Analysis to Daily Development #

In today’s content, we once again see the powerful effect of the decorator pattern. Compared with the decorator pattern used in the engine based on SQLRewriteContext, ShardingSphere uses a more simple and direct way to use decorators in pagination merging.

We directly call an explicit decorate method on the previous MergedResult to decorate the result. The key point of applying the decorator pattern in this way is to design a complete class hierarchy similar to MergedResult, ensuring that various decorator classes can flow in the same system before and after decoration.

The key point of using decorators in the engine’s rewriting is to put all the information that needs to be decorated in a context object.

Both of these implementations of the decorator pattern are worth learning from in daily development.

Summary and Preview #

Today’s content revolves around the detailed discussion of the five complex merging types in ShardingSphere.

Among them, grouping merging is the most complex merging type. When introducing grouping merging, we also introduced the concepts and implementation methods related to aggregation. Therefore, aggregation merging can be considered as an additional merging type on top of grouping merging. The implementation of pagination merging needs to consider different database types. ShardingSphere also uses the decorator pattern to adapt to the differences in pagination mechanisms in different databases when implementing pagination merging.

Finally, here’s a question for you to think about: How does ShardingSphere use the decorator pattern to complete the pagination merging strategy for different databases?

Up to now, we have provided a detailed introduction to the five merging engines in the ShardingSphere slicing engine. In the next lesson, we will focus on the implementation principle of read-write separation in a master-slave architecture.