24 | Spark 3.0 Features (I): How to Make Good Use of the Three Characteristics of AQE #

Hello, I am Wu Lei.

It has been almost a year since the release of Spark 3.0, which introduced new features such as Adaptive Query Execution (AQE), Dynamic Partition Pruning (DPP), and Extended Join Hints. Taking good advantage of these new features can greatly enhance our performance tuning, so I will spend three lectures discussing them with you. Today, let’s start with AQE.

I have noticed that many students complain that even after turning on the AQE switch and setting the relevant configuration options, they still see no performance improvement. This usually happens because we do not understand AQE thoroughly enough and end up tuning it blindly. So in this lecture, we will start with the design concept behind AQE, then talk about how it works, and finally explore how to make the most of it.

Why does Spark need AQE? #

Before version 2.0, Spark SQL supported only a heuristic, static optimization process, as we introduced in Lessons 21, 22, and 23.

Heuristic optimization, also known as Rule-Based Optimization (RBO), is often implemented based on rules and strategies such as predicate pushdown and column pruning, which come from existing application experience in the database field. In other words, heuristic optimization is actually a kind of empiricism.

The drawback of empiricism is that it treats similar problems and scenarios with the same approach, without distinguishing between them. The Spark community realized the limitations of RBO and introduced Cost-Based Optimization (CBO) in version 2.2.

The characteristic of CBO is that it is “fact-based”. It selects optimization strategies based on statistics about the data tables, such as table size and column distribution. CBO supports a variety of statistics, such as the number of rows in a table, the cardinality of each column, the number of null values, the maximum and minimum values, and histograms. With the support of this statistical data, the optimization strategies selected by CBO are often better than the rules applied by RBO.

However, CBO also faces three dilemmas: “narrow, slow, and static”. “Narrow” means that it has a limited scope of applicability. CBO only supports data tables registered in the Hive Metastore, but in many application scenarios, the data source is often various files stored in distributed file systems, such as Parquet, ORC, CSV, etc.

“Slow” refers to the low efficiency of collecting statistical information. For data tables registered in Hive Metastore, developers need to use the “ANALYZE TABLE COMPUTE STATISTICS” statement to collect statistical information, which consumes a lot of time.
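
For reference, here is a minimal sketch of what collecting statistics looks like in Spark SQL; the table name transactions and the column names are hypothetical, and the exact set of collected statistics depends on your Spark version and settings.

-- Collect table-level statistics (e.g. row count, size in bytes) for a table
-- registered in the Hive Metastore; `transactions` is a hypothetical table name.
ANALYZE TABLE transactions COMPUTE STATISTICS;

-- Collect column-level statistics (cardinality, null count, min/max and,
-- if enabled, histograms) for selected columns; this requires scanning the
-- data, which is why statistics collection can be time-consuming on large tables.
ANALYZE TABLE transactions COMPUTE STATISTICS FOR COLUMNS userId, price;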

“Static” means static optimization, which is the same as RBO. CBO formulates execution plans based on various statistical information, and once the execution plan is executed, the mission of CBO is considered complete. In other words, if the data distribution changes dynamically during runtime, the previously formulated execution plan by CBO will not be adjusted or adapted accordingly.

What is AQE? #

Considering the various limitations of RBO and CBO, Spark introduced AQE (Adaptive Query Execution) in version 3.0. In summary, AQE is a dynamic optimization mechanism for Spark SQL. During runtime, whenever the Shuffle Map stage is completed, AQE combines the statistics of that stage and dynamically adjusts and corrects the yet-to-be-executed logical and physical plans based on established rules, in order to optimize the original query statement.

From the definition, it’s easy to see that AQE is triggered when the Shuffle Map stage is completed. In other words, the frequency of AQE optimization is consistent with the number of shuffles in the execution plan. Conversely, if your query statement does not involve shuffle operations, Spark SQL will not trigger AQE. For such queries, no matter how you adjust the AQE-related configuration settings, AQE won’t be of any help.
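
As a quick reference, and assuming a Spark 3.0/3.1 environment where AQE is off by default, a minimal sketch of turning on the AQE switch looks like this; the table name is hypothetical.

-- Turn on the AQE framework; in Spark 3.0/3.1 this configuration defaults to false.
SET spark.sql.adaptive.enabled = true;

-- A query with no join or aggregation introduces no shuffle, so even with the
-- switch above enabled, AQE has nothing to optimize for a query like this one.
SELECT * FROM transactions WHERE price > 100;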

I’m sure you have many questions about the definition of AQE. For example, what exactly are the statistics that AQE relies on? What do the established rules and strategies refer to? Let’s answer them one by one.

First, unlike CBO, the statistics that AQE relies on are not about specific tables or columns, but about the intermediate files produced in the Shuffle Map stage. From how shuffle works, we know that each Map Task outputs data files with the “data” suffix and index files with the “index” suffix, which are collectively referred to as intermediate files. Statistics derived from these intermediate files, such as the size of each data file, the number and proportion of empty files, and the size of the partition each Reduce Task will process, form the information source for AQE’s optimizations.

Second, from the end-to-end optimization flow of Spark SQL, we can see that AQE collects statistics at runtime and, when conditions allow, applies its optimization decisions to both the logical plan and the physical plan.

AQE has four established rules and strategies, consisting of one logical optimization rule and three physical optimization strategies. I have summarized these rules/strategies, along with the corresponding AQE features and the statistics that each feature relies on, in the table below. Feel free to take a look.
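
| AQE Feature | Rule / Strategy | Type | Statistics Relied On |
| --- | --- | --- | --- |
| Join strategy adjustment | DemoteBroadcastHashJoin | Logical optimization rule | Total size of each table’s intermediate files; proportion of empty files |
| Join strategy adjustment | OptimizeLocalShuffleReader | Physical optimization strategy | Applied once the Join has been demoted to Broadcast Hash Join |
| Automatic partition merge | CoalesceShufflePartitions | Physical optimization strategy | Size of the partition each Reduce Task would process |
| Automatic skew handling | OptimizeSkewedJoin | Physical optimization strategy | Size of the partition each Reduce Task would process |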

How to use AQE effectively? #

So, how does AQE dynamically adjust and correct the logical and physical plans that have not yet been executed based on the statistical information from the Map stage and these four rules and strategies? This brings us to the three main features of AQE: Join strategy adjustment, automatic partition merge, and automatic skew handling. We need to analyze the dynamic optimization process of AQE with the help of these features. We have already talked about the basic concepts of these features in Lesson 9, but let’s briefly review them here.

  • Join strategy adjustment: If the size of a table after filtering is smaller than the broadcast variable threshold, the data association involving this table will be downgraded from Shuffle Sort Merge Join to the more efficient Broadcast Hash Join.
  • Automatic partition merge: In the Shuffle’s Reduce stage, when data is distributed unevenly across Reduce Tasks, AQE automatically merges small data partitions.
  • Automatic skew handling: Combined with configuration options, AQE automatically splits large data partitions in the Reduce stage to reduce the workload of individual Reduce Tasks.

Next, let’s analyze the dynamic optimization process of these three features.

Join strategy adjustment #

Let’s start with Join strategy adjustment, which involves a logical rule and a physical strategy, namely, DemoteBroadcastHashJoin and OptimizeLocalShuffleReader.

The purpose of the DemoteBroadcastHashJoin rule is to downgrade Shuffle Joins to Broadcast Joins. It’s important to note that this rule only applies to the Shuffle Sort Merge Join mechanism, and it is not supported for other mechanisms such as Shuffle Hash Join or Shuffle Nested Loop Join. For the two tables involved in the Join, after completing the calculation in the Shuffle Map stage, DemoteBroadcastHashJoin will check if the intermediate files satisfy the following conditions:

  • The total size of the intermediate files is smaller than the broadcast threshold spark.sql.autoBroadcastJoinThreshold.
  • The ratio of empty files is smaller than the configuration option spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

As long as either of the tables’ statistical information satisfies these two conditions, Shuffle Sort Merge Join will be downgraded to Broadcast Hash Join. Now you might ask, “Since the DemoteBroadcastHashJoin logical rule can convert Sort Merge Join to Broadcast Join, what is the purpose of the OptimizeLocalShuffleReader rule that also adjusts Join strategy? It seems redundant!”

I don’t know if you’ve noticed, but I have always emphasized that AQE relies on the statistical information generated from the intermediate files in the Shuffle Map stage. What does this mean? It means that before AQE starts optimizing, the Shuffle operation has already completed more than half of the execution!

Let’s take an example. Suppose we have two tables: the fact table “Order” and the dimension table “User”. The query statement and the initial execution plan are as follows:

-- Join the order table with the user table
SELECT sum(order.price * order.volume), user.id
FROM order
INNER JOIN user ON order.userId = user.id
WHERE user.type = 'Head Users'
GROUP BY user.id

Since both tables have sizes that exceed the broadcast threshold, Spark SQL initially chooses Sort Merge Join in the execution plan. AQE needs to combine the outputs of the Shuffle (Exchange) in both branches to determine whether it can be downgraded to Broadcast Join and which table to use for the downgrade. This means that both the large and small tables need to complete the calculation in the Shuffle Map stage, write the intermediate files to disk, and only then can AQE make a decision.
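
If you want to observe this yourself, one way (a sketch of my own, not part of the original example) is to inspect the pre-execution plan. With AQE enabled, the plan is wrapped in an AdaptiveSparkPlan node, and the downgrade to Broadcast Hash Join, if it happens, only shows up in the final plan at runtime (for example, in the Spark UI). The exact output depends on your Spark version and data.

-- With AQE enabled, EXPLAIN shows AdaptiveSparkPlan (isFinalPlan=false) whose
-- initial plan still contains SortMergeJoin for this query; whether it is
-- replaced by BroadcastHashJoin is decided only after the Shuffle Map stages
-- of both branches have finished and their intermediate files are measured.
EXPLAIN FORMATTED
SELECT sum(order.price * order.volume), user.id
FROM order
INNER JOIN user ON order.userId = user.id
WHERE user.type = 'Head Users'
GROUP BY user.id;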

Join Strategy Adjustment

You might say, “There’s no need for the large table to perform the Shuffle. AQE only needs to check the intermediate files of the small table.” But the problem is that AQE cannot tell a large table from a small one: before the Shuffle Map stage completes, the sizes of the tables are simply not visible to AQE. Therefore, AQE must wait for both tables to finish their Shuffle Map computation, and only then can it collect statistics on the intermediate files to determine whether the downgrade conditions are met and which table should be turned into the broadcast variable.

In the regular Shuffle process, the Reduce-stage computation needs to fetch data shards of the intermediate files across the network. If we followed these regular steps, then even if AQE downgrades Shuffle Sort Merge Join to Broadcast Join at runtime, the large table’s intermediate files would still have to be distributed over the network. In that case, AQE’s dynamic Join strategy adjustment would lose its practical value: the most expensive part, the large table’s Shuffle computation, has already been done, so switching to Broadcast Join would gain nothing more. This is where the OptimizeLocalShuffleReader physical strategy becomes important. Since the large table has already completed its Shuffle Map computation, we do not want that work to go to waste. With the OptimizeLocalShuffleReader strategy, the network distribution of the regular Shuffle steps is skipped, and each Reduce Task reads the intermediate files on its local node and joins them with the broadcast small table.

However, it is important to note that whether the OptimizeLocalShuffleReader physical strategy takes effect or not is determined by a configuration item. This configuration item is spark.sql.adaptive.localShuffleReader.enabled. Although its default value is True, you must not change its value to False. Otherwise, as we just mentioned, the adjustment of AQE’s Join strategy will become useless.

Speaking of which, you may say, “From this perspective, the adjustment of AQE’s Join strategy seems to be somewhat redundant! After all, the memory consumption and disk I/O of the Shuffle Map stage are not saved at all!” Indeed, the computational cost of the Shuffle Map stage is not reduced at all. However, the OptimizeLocalShuffleReader strategy avoids the full distribution of data in the network during the Reduce stage, and most applications can benefit from this alone. Therefore, for the adjustment of AQE’s Join strategy, we can use an idiom to describe it: “It’s better to be late than never”.
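
To pull the relevant knobs together, here is a minimal configuration sketch for Join strategy adjustment. The values shown are the defaults I am aware of for Spark 3.0, so treat them as illustrative rather than as tuning recommendations.

-- Broadcast threshold that each table's total intermediate-file size is
-- compared against (default 10 MB).
SET spark.sql.autoBroadcastJoinThreshold = 10MB;

-- Ratio used in the second downgrade condition described above (default 0.2).
SET spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin = 0.2;

-- Keep the local shuffle reader on (default true) so that, after the downgrade,
-- Reduce Tasks read the large table's intermediate files from the local node
-- instead of pulling them across the network.
SET spark.sql.adaptive.localShuffleReader.enabled = true;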

Automatic Partition Merge #

Next, let’s talk about automatic partition merge. The principle of partition merge is relatively simple. In the Reduce stage, when Reduce Tasks pull data shards from across the network, AQE merges partitions that are smaller than the target size, in the order of their partition numbers. The target partition size is jointly determined by the following two parameters. We discussed this in detail in Lesson 10, so if you don’t remember, you can go back and take a look.

  • spark.sql.adaptive.advisoryPartitionSizeInBytes, which specifies the recommended size of partitions after merge.
  • spark.sql.adaptive.coalescePartitions.minPartitionNum, which ensures that after partition merging, the number of partitions does not fall below this value.

Beyond that, we also need to note that once the Shuffle Map stage completes and the AQE optimization mechanism is triggered, the CoalesceShufflePartitions strategy is “unconditionally” added to the new physical plan. The logic of reading the configuration items, computing the target partition size, and merging adjacent partitions in order is folded into the “hand-written code” generated by Tungsten’s WSCG and executed in the Reduce stage.
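
As a reference, here is a minimal configuration sketch for automatic partition merge; the advisory size shown is the Spark 3.0 default I am aware of, and the partition lower bound of 200 is purely illustrative.

-- Partition coalescing is on by default once AQE itself is enabled.
SET spark.sql.adaptive.coalescePartitions.enabled = true;

-- Advisory target size of a partition after merging (default 64 MB).
SET spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB;

-- Lower bound on the number of partitions left after merging; 200 is illustrative.
SET spark.sql.adaptive.coalescePartitions.minPartitionNum = 200;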

Automatic Skew Handling #

In contrast to automatic partition merge, the operation of automatic skew handling is “split”. In the Reduce stage, if the partition a Reduce Task needs to process exceeds a certain threshold, AQE uses the OptimizeSkewedJoin strategy to split that large partition into multiple smaller ones. Which partitions count as skewed, and at what granularity they are split, are determined by the following configuration items; a brief configuration sketch follows the list. We discussed their meanings and effects in Lesson 10, so you can go back and take a look.

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor, determines the skew expansion factor.
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, determines the minimum threshold for skew.
  • spark.sql.adaptive.advisoryPartitionSizeInBytes, in bytes, defines the split granularity.
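
Here is the configuration sketch mentioned above. The values shown are the Spark 3.0 defaults as far as I know, so treat them as illustrative rather than as recommendations.

-- Skew-join handling is on by default once AQE itself is enabled.
SET spark.sql.adaptive.skewJoin.enabled = true;

-- A Reduce partition is treated as skewed if it is larger than this factor
-- times the median partition size (default 5) ...
SET spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5;

-- ... and also larger than this absolute threshold (default 256 MB).
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB;

-- Skewed partitions are split into chunks of roughly this advisory size.
SET spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB;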

The split operation for automatic skew handling is also performed in the Reduce stage. Within the same Executor, the large partition that should have been processed by one Task is split by AQE into multiple smaller partitions that are processed by multiple Tasks. This balances the computational load between the Tasks. However, this does not solve the load balancing problem between different Executors.

Let’s take an example. Assume there is a Shuffle operation with 3 partitions in the Map stage and 4 partitions in the Reduce stage, where two of the four Reduce partitions are skewed large partitions and both happen to be assigned to Executor 0. Looking across the Executors in the following figure, we can see that although the two large partitions are split, the main workload of the entire job still falls on Executor 0. Its computing power remains the bottleneck of the whole job, and the partition split does not substantially alleviate this.

In addition, in the scenario of data association, for the two tables involved in the Join, let’s temporarily call them Data Table 1 and Data Table 2. If Data Table 1 is skewed and Data Table 2 is not, then during the join, in addition to splitting Data Table 1, AQE also needs to duplicate the corresponding data partitions of Data Table 2 to ensure that the association relationship is not broken.

Under this running mechanism, what if both tables have data skew? At this time, things gradually become more complicated. For the Data Table 1 and Data Table 2 in the figure above, let’s assume that Data Table 1 is split into two partitions and Data Table 2 is also split into two partitions due to skew. At this point, in order to maintain the logical association relationship, the partitions that are split from Data Table 1 and Data Table 2 each need to be duplicated, as shown in the figure below.

Making the problem more general, suppose the left table is split into M partitions and the right table into N partitions. Then each side needs to maintain M x N partition copies to keep the association logic consistent. As M and N grow, the computational cost that AQE pays to handle data skew risks getting out of control.

In summary, when the data skew in the application scenario is relatively simple, for example, there is skew but the skewed data is distributed relatively evenly, or only one side of the join is skewed, we can rely on AQE’s automatic skew handling mechanism. However, when the skew becomes more complicated, for example, different keys have drastically different distributions, or both tables involved in the join are heavily skewed, we need to weigh the pros and cons between AQE’s automated mechanism and handling the skew manually. We will delve into manual skew handling in Lesson 28.

Summary #

AQE (Adaptive Query Execution) is a dynamic optimization mechanism of Spark SQL. It addresses the limitations of RBO (Rule-Based Optimization) and CBO (Cost-Based Optimization), which are heuristic and static optimization mechanisms. To make good use of AQE, we need to understand its characteristics, as well as how its three optimization features work and how to use them.

In summary, based on established rules and strategies, AQE combines the statistics of each Shuffle Map stage to dynamically adjust and correct the logical and physical plans that have not yet been executed, completing the runtime optimization of the original query statement. Therefore, Spark SQL triggers AQE only when the query statement introduces a Shuffle operation.

The three optimization features supported by AQE are Join strategy adjustment, automatic partition merging, and automatic skew handling.

Regarding Join strategy adjustment, we need to know that the DemoteBroadcastHashJoin rule only applies to Shuffle Sort Merge Join. For other Shuffle Join mechanisms, AQE does not currently support converting them to Broadcast Joins. Additionally, to ensure that AQE’s Join strategy adjustment works smoothly, we need to make sure the spark.sql.adaptive.localShuffleReader.enabled configuration option stays enabled.

Regarding automatic partition merging, we need to know that after the Shuffle Map phase is completed, AQE automatically performs the calculation process of partition merging based on the recommended partition size and the limit of the number of partitions.

Regarding AQE’s automatic skew handling, we need to know that it can only mitigate data skew at the Task granularity and cannot solve the load-balancing problem between different Executors. For simpler skew problems, such as joins where only one side is skewed, we can rely on AQE’s automatic skew handling mechanism. However, when the data skew problem becomes more complex, we need to weigh the pros and cons between AQE’s automated mechanism and manual skew handling.

Daily Exercise #

  1. We know that AQE relies on the statistical information from the intermediate files produced during the Shuffle Map stage. Do you think AQE can obtain the same statistical information from other channels at runtime?
  2. AQE’s automatic skew handling mechanism can only balance workload at the task level. If you were to re-implement this mechanism, do you have any better ways to achieve load balancing at the executor level for AQE?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lecture!