
19 Configuration Options Explained: Which Parameters Affect the Performance of Application Execution #

Hello, I am Wu Lei.

In the previous lecture, we learned about Broadcast Join, an efficient join strategy. To get Spark SQL to use this join strategy, we can call the broadcast function from SQL Functions to force the base table to be broadcast. In that case, Spark SQL fully “respects” the developer’s intention and does its best to adopt the Broadcast Join strategy, as long as the base table is smaller than 8GB.
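For reference, here is a minimal sketch of that forced approach; the table names and the join key are placeholders, written in the same style as the snippets later in this lecture:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// Placeholders: salaries is the larger table, employees is the base table
val salaries: DataFrame = _
val employees: DataFrame = _

// Force the base table to be broadcast so that Spark SQL plans a Broadcast Join
val joined = salaries.join(broadcast(employees), Seq("id"), "inner")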

Besides this rather “forceful” approach, we can also take a gentler path: “delegate” the choice to Spark SQL and let it decide when to use Broadcast Join and when to fall back to Shuffle Join. This gentler path is configuration. In Lecture 12, we covered Spark’s general configuration options; in today’s lecture, let’s talk about the configuration options related to Spark SQL.

However, when you open the Configuration page on the official Spark website, you will find hundreds of configuration options, with dozens of them related to Spark SQL, which can be overwhelming. In fact, most configuration options can simply be left at their default values and do not require much attention from us. Therefore, we will focus on Join strategy selection and AQE.

The importance of Join strategy goes without saying, and AQE (Adaptive Query Execution) is a new feature introduced in Spark 3.0. It helps Spark SQL dynamically adjust the execution plan at runtime and optimize the execution performance of jobs more flexibly.

Broadcast Join #

Next, let’s talk about how to use configuration options to get Spark SQL to choose Broadcast Join in this “gentler” way. Of the two tables participating in a join, we refer to the smaller one as the base table.

If the storage size of the base table is smaller than the broadcast threshold, there is no need for developers to explicitly call the broadcast function. Spark SQL will automatically choose the broadcast join strategy, create a broadcast variable on top of the base table, and complete the data association between the two tables.

So, what is the broadcast threshold and how is it defined? It is simply a storage-size value, such as 10MB or 1GB, set by the following configuration option. As long as the size of the base table is smaller than this value, Spark SQL will automatically choose the Broadcast Join strategy.

| Configuration option | Default | Meaning |
| --- | --- | --- |
| spark.sql.autoBroadcastJoinThreshold | 10MB | Broadcast threshold: the maximum size of a base table for which Spark SQL automatically plans a Broadcast Join |

As shown in the table above, the default value of the broadcast threshold is 10MB. In general, for industrial applications, we usually set it to around 2GB to effectively trigger the broadcast join. Once the broadcast threshold is set, Spark SQL needs to compare it with the storage size of the base table. The storage size of the base table needs to be calculated in advance. Now, the question arises, based on what does Spark SQL calculate this value?
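As a quick illustration, here is how the threshold could be raised in code. This is a minimal sketch that assumes an existing SparkSession named spark; the 2GB value simply mirrors the recommendation above:

// Raise the broadcast threshold from the default 10MB to 2GB;
// size strings such as "10MB" or "2GB" are accepted, as is a plain byte count
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "2GB")

// The same setting can also be supplied at submit time:
// spark-submit --conf spark.sql.autoBroadcastJoinThreshold=2GB ...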

This question needs to be discussed in two scenarios: if the data of the base table comes from the file system, then the benchmark Spark SQL uses to compare with the broadcast threshold is the storage size of the base table on disk. If the data of the base table comes from an intermediate stage of DAG computation, then Spark SQL will refer to the statistical values in the DataFrame execution plan and compare them with the broadcast threshold, as shown below.

import org.apache.spark.sql.DataFrame

val df: DataFrame = _ // placeholder for your own DataFrame

// Cache and materialize the distributed dataset first
df.cache.count

// Obtain the logical plan
val plan = df.queryExecution.logical

// Obtain the accurate size estimate (in bytes) from the optimized plan
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes
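To tie this estimate back to the broadcast threshold, here is a rough, illustrative comparison that assumes the 2GB threshold we configured above:

// Assume the broadcast threshold was set to 2GB earlier
val broadcastThreshold: BigInt = BigInt(2L) * 1024 * 1024 * 1024

// If the runtime estimate falls below the threshold, Spark SQL can plan a
// Broadcast Join for this table without an explicit broadcast hint
if (estimated < broadcastThreshold) {
  println(s"Estimated size ($estimated bytes) is below the broadcast threshold")
}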

By this point, you may be a little impatient: “Why bother with all this trouble? Setting configuration options and estimating the size of the base table in advance is really troublesome! It’s easier to use the broadcast function mentioned in the previous lesson!”

From the perspective of developers, it is indeed more convenient to use the broadcast function. However, the combination of the broadcast threshold and base table estimation not only provides an additional optimization approach for developers but also lays the foundation for Spark SQL’s dynamic optimization.

Dynamic optimization is naturally in contrast to static optimization. Prior to version 3.0, Spark SQL mainly relied on compile-time (before runtime) statistics information for query plan optimization, such as the storage size of tables on disk, and so on.

Therefore, prior to version 3.0, all optimization mechanisms in Spark SQL (such as the selection of join strategies) were static. It was not possible to dynamically adjust the execution plan at runtime to adapt to the changes in the dataset during runtime.

For example, suppose that during Spark SQL’s logical optimization phase, both tables involved in a join exceed the broadcast threshold. Spark SQL is then forced to choose the suboptimal Shuffle Join strategy during the physical optimization phase.

However, in reality, during runtime, if one of the tables has significantly reduced data after filtering, and the remaining data size is much smaller than the broadcast threshold, it can be completely stored in a broadcast variable. Unfortunately, at this point, it is too late to make any changes as the static optimization mechanism cannot transform the shuffle join into a broadcast join.

AQE (Adaptive Query Execution) #

In order to compensate for the shortcomings of static optimization and make Spark SQL more intelligent, the Spark community introduced the AQE mechanism in version 3.0.

AQE, short for Adaptive Query Execution, refers to the dynamic optimization features of Spark SQL, including Join Strategy Adjustment, Automatic Partition Merging, and Automatic Skew Handling.

Perhaps the Spark community is more conservative when it comes to new optimization mechanisms, so the AQE mechanism is not enabled by default. To take advantage of the above three features, we need to set spark.sql.adaptive.enabled to true.

| Configuration option | Default | Meaning |
| --- | --- | --- |
| spark.sql.adaptive.enabled | false | Whether to enable the AQE mechanism |
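For reference, here is a minimal sketch of turning these switches on at runtime, assuming an existing SparkSession named spark. The two feature-level switches shown are already on by default once AQE itself is enabled; they are listed only to make the mapping to the features explicit:

// Enable the AQE mechanism itself
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Feature-level switches (on by default when AQE is enabled)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") // automatic partition merging
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")           // automatic skew handling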

Okay, now that we have successfully enabled the AQE mechanism, let’s talk about the problems these features solve and how they work, combined with the relevant configuration options.

Join Strategy Adjustment #

Let’s start with Join Strategy Adjustment. In a nutshell, Join Strategy Adjustment refers to Spark SQL dynamically adjusting the original Shuffle Join strategy to a more efficient Broadcast Join strategy.

Specifically, after the Map stage in the DAG is completed, Spark SQL combines the statistics of the intermediate files in the Shuffle phase to recalculate the storage size of the data tables in the Reduce phase. If it finds that the base table size is smaller than the broadcast threshold, Spark SQL adjusts the Shuffle Join in the next stage to a Broadcast Join.

It’s easy to see that the key here is Shuffle, as well as the intermediate files in Shuffle. In fact, not only Join Strategy Adjustment, but the entire AQE mechanism relies on the Shuffle phase in the DAG.

As the saying goes, “You can’t make bricks without straw.” To achieve dynamic optimization, Spark SQL must rely on the execution status at runtime, and the intermediate files produced by Shuffle are the only source of that status.

For example, through the intermediate files in Shuffle, Spark SQL can obtain statistical information such as file size, Map Task data shard size, Reduce Task shard size, and empty file ratio. It is by using these statistics that Spark SQL can dynamically adjust the execution plan during job execution.

Let’s make this concrete with an example of Join Strategy Adjustment. Consider the query statement below, and assume that the storage sizes of both the salaries table and the employees table exceed the broadcast threshold. In that case, for the join between the two tables, Spark SQL can only plan the Shuffle Join strategy.

However, after filtering by age at runtime, the remaining data of the employees table is in fact smaller than the broadcast threshold. Thanks to AQE’s Join Strategy Adjustment, Spark SQL can adjust the originally planned Shuffle Join to a Broadcast Join at runtime, thereby improving execution performance.

select * from salaries inner join employees
  on salaries.id = employees.id
  where employees.age >= 30 and employees.age < 45

As you can see, the setting of the broadcast threshold and the estimation of the data size after filtering the base table are very important in this case. The reason is that these two factors determine whether Spark SQL can successfully utilize the Join Strategy Adjustment feature of AQE at runtime to optimize the overall execution performance. Therefore, we must master the method of setting the broadcast threshold and the method of estimating the dataset size.

After introducing Join Strategy Adjustment, let’s talk about the other two features of the AQE mechanism: Automatic Partition Merging and Automatic Skew Handling. They are both optimization strategies for Shuffle itself.

Let’s start by discussing what problems Automatic Partition Merge and Automatic Skew Handling are trying to solve. We know that the computation process of Shuffle consists of two stages: Map stage and Reduce stage. The data distribution in the Map stage is often determined by the source data in the distributed file system, so the distribution of the dataset in this stage is relatively uniform.

The distribution of the data in the Reduce stage is different because it is determined by the Distribution Key and the parallelism of the Reduce stage. Parallelism refers to the number of partitions, a concept we have repeatedly emphasized in previous lessons, which I believe you are familiar with.

The Distribution Key defines the basis for distributing data in Shuffle. For the reduceByKey operator, the Distribution Key is the Key of the Paired RDD; for the repartition operator, the Distribution Key is the argument passed to the repartition operator, such as repartition($"Column Name").
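To make the two cases concrete, here is a small sketch; the key and column names (u1, user_id, amount) are made up for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DistributionKeyDemo").getOrCreate()
import spark.implicits._

// reduceByKey: the Distribution Key is the key of the paired RDD (here, the user id)
val pairedRdd = spark.sparkContext.parallelize(Seq(("u1", 1), ("u2", 1), ("u1", 1)))
val counts = pairedRdd.reduceByKey(_ + _)

// repartition: the Distribution Key is the column passed to the operator
val byUser = Seq(("u1", 100), ("u2", 200)).toDF("user_id", "amount").repartition($"user_id")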

In business scenarios, the Distribution Key is often a field that is prone to skew, such as user_id or item_id. Correspondingly, the distribution of the dataset in the Reduce stage is often unbalanced. Data imbalance is often reflected in two aspects: on the one hand, the size of some data partitions is too small, and on the other hand, a few partitions have extremely large sizes. The automatic partition merging and automatic skew handling mechanisms of AQE are designed to address these two aspects of data imbalance.

Automatic Partition Merging #

Having understood the usefulness of automatic partition merging, let’s now discuss how Spark SQL actually merges the small partitions in the Reduce phase. To understand how partition merging works, we first need to understand where it starts and where it ends.

Specifically, how does Spark SQL determine whether a data partition is small enough and needs to be merged? Moreover, since merging is performed on multiple partitions, there naturally needs to be a convergence condition. The reason for this is simple: if merging continues indefinitely, the entire dataset will be squeezed into a single super-large partition, and the degree of parallelism will decrease to 1. Obviously, this is not the desired result.

In fact, Spark SQL uses a relatively simple method to achieve partition merging. Specifically, Spark SQL does not proactively determine whether a partition is small enough, but rather scans the partitions sequentially based on their IDs and performs a merge when the scanned data volume exceeds the “target size”. This target size is determined by the following two configuration parameters.

| Configuration option | Default | Meaning |
| --- | --- | --- |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory (target) size of a data partition after merging |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | Default parallelism | Minimum number of partitions (parallelism) after merging |

Among them, developers can directly specify the target size through the first configuration parameter spark.sql.adaptive.advisoryPartitionSizeInBytes. The second parameter is used to limit the parallelism after merging in the Reduce phase, to avoid insufficient utilization of CPU resources caused by merging.

Dividing the total dataset size by the minimum parallelism gives the average size of each partition, which we will denote as #partitionSize. The actual target size is then the smaller of the advisoryPartitionSizeInBytes setting and #partitionSize.

Once the target size is determined, Spark SQL sequentially scans the data partitions. When the sum of the sizes of adjacent partitions exceeds the target size, Spark SQL merges the scanned partitions. Then, it continues to merge the remaining partitions in the same way until all partitions have been processed.
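To make the procedure concrete, here is a simplified, illustrative sketch of the scan-and-merge logic described above. It is not Spark’s actual implementation, and the function and variable names are made up:

// Simplified illustration of AQE partition coalescing (not Spark's real code).
// Inputs: shuffle partition sizes in bytes, the advisory size, and the minimum parallelism.
// Output: groups of partition IDs, where each group becomes one merged partition.
def coalescePartitions(sizes: Seq[Long], advisorySize: Long, minPartitionNum: Int): Seq[Seq[Int]] = {
  // Target size = min(advisory size, total size / minimum parallelism)
  val avgSize = math.ceil(sizes.sum.toDouble / minPartitionNum).toLong
  val targetSize = math.min(advisorySize, avgSize)

  val merged = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
  var current = scala.collection.mutable.ArrayBuffer.empty[Int]
  var currentBytes = 0L

  // Scan partitions in ID order; close a group once its accumulated size reaches the target
  for ((size, id) <- sizes.zipWithIndex) {
    current += id
    currentBytes += size
    if (currentBytes >= targetSize) {
      merged += current.toSeq
      current = scala.collection.mutable.ArrayBuffer.empty[Int]
      currentBytes = 0L
    }
  }
  if (current.nonEmpty) merged += current.toSeq
  merged.toSeq
}

// Example: five small partitions (20, 30, 10, 40, 15 MB), 64MB advisory size, minimum parallelism 2.
// The target size works out to about 57.5MB, and the result is Seq(Seq(0, 1, 2), Seq(3, 4)).
coalescePartitions(Seq(20L, 30L, 10L, 40L, 15L).map(_ * 1024 * 1024), 64L * 1024 * 1024, 2)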

Automatic Skew Handling #

After analyzing how automatic partition merging deals with the problem of small and scattered data partitions, let’s now discuss how automatic skew handling addresses the issue of heavily skewed large partitions.

Based on the analysis above, we can easily see that automatic partition merging actually consists of two key steps: determining the target size for merging and sequentially scanning for merging. In contrast, automatic skew handling also involves two steps: the first step is detecting and determining the large skewed partitions, and the second step is splitting these large partitions into smaller ones. To achieve these two steps, Spark SQL relies on the following three configuration parameters.

| Configuration option | Default | Meaning |
| --- | --- | --- |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Factor multiplied by the median partition size to obtain the skew determination threshold |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Minimum size for a partition to be determined as skewed |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory (target) size used when splitting skewed partitions |

The first two parameters are used to determine skewed partitions. The third parameter advisoryPartitionSizeInBytes that we just learned about is used not only for merging small partitions but also for splitting skewed partitions, making it a “two-in-one” parameter.

Now let’s focus on how Spark SQL uses the first two parameters to determine large partitions.

First, Spark SQL sorts all data partitions by storage size and takes the median as the base. It then multiplies the median by the factor specified in skewedPartitionFactor to obtain the determination threshold. Any data partition whose storage size is greater than this threshold may be identified as a skewed partition.

Why do we say “may” instead of “certainly”? The reason is that the determination of skewed partitions is also subject to the constraint of the skewedPartitionThresholdInBytes parameter, which is the minimum threshold for determining skewed partitions. In other words, only those “candidate partitions” with sizes greater than the value specified by skewedPartitionThresholdInBytes will ultimately be determined as skewed partitions.

To better understand this determination process, let’s take an example. Suppose a data table salaries has three partitions with sizes of 90MB, 100MB, and 512MB respectively. Obviously, the median of these three partitions is 100MB. By multiplying it by the ratio factor skewedPartitionFactor (default value is 5), the determination threshold is calculated as 100MB * 5 = 500MB. Therefore, in our example, only the last partition with a size of 512MB will be listed as a “candidate partition”.

Next, Spark SQL compares 512MB with skewedPartitionThresholdInBytes. This parameter has a default value of 256MB.

Clearly, 512MB is much larger than 256MB. At this point, Spark SQL will finally determine the last partition as a skewed partition. Conversely, suppose we increase the value of the skewedPartitionThresholdInBytes parameter and set it to 1GB. Then the last partition will not meet the minimum threshold and will not be determined as a skewed partition.

After determining the skewed partitions, the next step is to split the large partitions based on the target size specified by advisoryPartitionSizeInBytes. For example, if we set the value of this parameter to 256MB, the previously large partition of 512MB will be split into two smaller partitions (512MB / 2 = 256MB). After splitting, the salaries table will have four partitions instead of three, and the size of each data partition will not exceed 256MB.
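Putting detection and splitting together, here is a simplified, illustrative sketch that reproduces the salaries example; it is not Spark’s actual implementation, and the function name is made up:

// Simplified illustration of AQE skew detection and splitting (not Spark's real code).
// Returns, for each partition, the number of pieces it is split into (1 = left alone).
def splitSkewedPartitions(
    sizes: Seq[Long],        // partition sizes in bytes
    skewedFactor: Int,       // spark.sql.adaptive.skewJoin.skewedPartitionFactor
    skewedThreshold: Long,   // spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
    advisorySize: Long       // spark.sql.adaptive.advisoryPartitionSizeInBytes
): Seq[Int] = {
  // The median of the sorted partition sizes serves as the base
  val sorted = sizes.sorted
  val median = sorted(sorted.length / 2)

  sizes.map { size =>
    // A partition is skewed only if it exceeds both median * factor and the minimum threshold
    val isSkewed = size > median * skewedFactor && size > skewedThreshold
    if (isSkewed) math.ceil(size.toDouble / advisorySize).toInt else 1
  }
}

val mb = 1024L * 1024
// The example from the text: partitions of 90MB, 100MB, and 512MB, factor 5, threshold 256MB,
// and a 256MB advisory size. Only the 512MB partition is split, into two pieces: Seq(1, 1, 2)
splitSkewedPartitions(Seq(90 * mb, 100 * mb, 512 * mb), 5, 256 * mb, 256 * mb)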

Key Takeaways #

Alright, that’s it for the important configurations related to Spark SQL. Today’s content was extensive, so let’s summarize together.

Firstly, we introduced the concept of the broadcast threshold. When the size of the base table is smaller than this threshold, Spark SQL automatically selects the Broadcast Join strategy for the join.

Next, we introduced the three features of Adaptive Query Execution (AQE): Join strategy adjustment, automatic partition merging, and automatic skew handling. Unlike Spark SQL’s static optimization mechanism, AQE dynamically adjusts the execution plan at runtime based on the statistics provided by intermediate shuffle files to optimize job performance.

Join strategy adjustment refers to the process where Spark SQL dynamically adjusts the Shuffle Join strategy to Broadcast Join strategy based on the estimated size of the base table after filtering and the broadcast threshold. The following methods can be used to estimate the size of the base table:

import org.apache.spark.sql.DataFrame

val df: DataFrame = _ // placeholder for your own DataFrame
// Cache and materialize the distributed dataset first
df.cache.count

// Get the execution plan
val plan = df.queryExecution.logical

// Get the accurate estimate of dataset size in the execution plan
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes

Automatic partition merging and automatic skew handling are used to address the issue of uneven data distribution after shuffling. The purpose of automatic partition merging is to merge small data partitions to avoid excessive task granularity and scheduling overhead. On the other hand, the purpose of automatic skew handling is to split large data partitions to prevent individual tasks from overloading and affecting the overall job performance.

Whether it’s the broadcast threshold or the various features of AQE, we can influence the optimization behavior of Spark SQL by adjusting the relevant configuration options. To make them easier to review and reference, I have compiled them into the following table.

| Configuration option | Default | Meaning |
| --- | --- | --- |
| spark.sql.autoBroadcastJoinThreshold | 10MB | Broadcast threshold; base tables smaller than this trigger Broadcast Join automatically |
| spark.sql.adaptive.enabled | false | Whether to enable the AQE mechanism |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Target partition size for automatic partition merging and for splitting skewed partitions |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | Default parallelism | Minimum parallelism after partition merging |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Factor applied to the median partition size when detecting skew |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Minimum size for a partition to be determined as skewed |

Practice for Each Lesson #

In light of the reliance on Shuffle intermediate files in AQE, can you discuss the shortcomings of AQE? (Hint: Start thinking about this question from the perspective of the two computing stages of Shuffle)

Feel free to engage with me in the comments section to discuss and exchange ideas. I also encourage you to share this lesson with more colleagues and friends.