10 Optimization at a Glance - A Quick Reference Manual for Twice the Results with Half the Effort (Part 2) #

Hello, I’m Wu Lei.

In the previous lecture, we discussed configuration items related to hardware resources. In this lecture, we will continue to talk about the configuration items in the Shuffle category and the Spark SQL category. We will discuss their meanings, functions, and the problems they can solve. Just like the previous lecture, all the configuration items we discuss today will revolve around Executors.

Configuration options for the Shuffle class #

First, let’s talk about the Shuffle class. Looking at the Configuration page on the Spark official website, you will find that there are very few configuration options available to adjust the performance of Shuffle execution. This is understandable because once Shuffle becomes an indispensable part of the application, there is very little we can do to optimize the performance of Shuffle itself.

However, we are not completely helpless either. We know that the calculation process of Shuffle consists of two stages: Map and Reduce. During the Map stage, the mapping logic is executed, and the intermediate data is written to the local disk according to the partitioning rules of the Reducer. During the Reduce stage, data shards are downloaded from various nodes and aggregated as needed.

Therefore, we can adjust the size of the read-write buffer in the Map stage and Reduce stage using the configuration options spark.shuffle.file.buffer and spark.reducer.maxSizeInFlight, respectively. So, how can we do this? Let’s take a look one by one.

First, in the Map stage, the computation results are written to the local file system as intermediate files. To avoid frequent disk I/O, Spark first caches the data destined for these intermediate files in a write buffer. We can increase the size of this buffer by setting spark.shuffle.file.buffer. The larger the buffer, the more data can be cached before being flushed to disk, which reduces the number of disk flushes and improves overall I/O efficiency.

Second, in the Reduce stage, because Spark fetches intermediate files from disks of different nodes through the network, the files are temporarily stored in the read buffer of the compute nodes as data blocks. The larger the buffer, the more data blocks can be stored. With the same amount of data, fewer network requests are needed to fetch the data, resulting in higher network throughput and better network I/O efficiency. At this time, we can control the size of the Reduce side buffer by using the configuration option spark.reducer.maxSizeInFlight to adjust the network load during the Shuffle process.
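
As a concrete illustration, here is a minimal sketch of setting both buffer sizes via SparkConf (they can equally be passed with spark-submit --conf). The values and the application name are purely illustrative, not recommendations; the defaults noted in the comments are those documented for Spark 3.0.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative values only: tune them against your own workload and hardware.
val conf = new SparkConf()
  // Map side: in-memory buffer used when writing shuffle intermediate files (default: 32k).
  .set("spark.shuffle.file.buffer", "64k")
  // Reduce side: maximum amount of map output fetched concurrently per reduce task (default: 48m).
  .set("spark.reducer.maxSizeInFlight", "96m")

val spark = SparkSession.builder()
  .config(conf)
  .appName("shuffle-buffer-tuning")   // placeholder application name
  .getOrCreate()
```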

In fact, optimizing the Shuffle calculation process involves all hardware resources, including CPU, memory, disk, and network. Therefore, the configuration options mentioned in the previous lecture regarding CPU, memory, and disk can also be applied to the in-memory computation process of the Map and Reduce stages.

In addition, Spark also provides a configuration option called spark.shuffle.sort.bypassMergeThreshold to handle a special Shuffle scenario.

Since version 1.6, Spark has adopted the Sort shuffle manager to manage Shuffle operations. Under the management mechanism of Sort shuffle manager, the Shuffle calculation process introduces sorting operations in both the Map and Reduce stages, regardless of whether the calculation results themselves need to be sorted.

This implementation mechanism is not fair to operations such as repartition and groupBy, as these two operators either re-partition the original dataset or group the dataset without the need for sorting. Therefore, the sorting step introduced by the Sort shuffle manager implementation mechanism becomes an additional computational overhead.

Therefore, in computing scenarios that require neither aggregation nor sorting, we can use the spark.shuffle.sort.bypassMergeThreshold parameter (default value: 200) to skip this overhead: when the number of partitions on the Reduce side is smaller than this setting, the Shuffle computation bypasses the sorting step altogether.
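
For reference, this is a core (non-SQL) setting, so it has to be in place before the application starts, for example through SparkConf or spark-submit --conf. A minimal sketch with an illustrative threshold value:

```scala
import org.apache.spark.SparkConf

// Illustrative value: with the threshold raised to 400, a repartition/groupBy job whose
// Reduce side has, say, 300 partitions (and no map-side aggregation) takes the no-sort
// bypass path instead of the sort-based one. The default is 200.
val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
```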

Spark SQL Major Configuration Options #

Next, let’s talk about the related configuration options for Spark SQL. In the Configuration page of the official website, there are quite a few configuration options under Spark SQL, among which the three features introduced by AQE (Adaptive query execution) contribute the most to performance improvement, namely automatic partition merging, automatic data skew handling, and join strategy adjustment. Therefore, when it comes to Spark SQL configuration options, let’s focus on these three features.

First of all, we need to know that the AQE feature is disabled by default, and to use these features, we need to enable AQE through the configuration option spark.sql.adaptive.enabled, as shown below:
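
A minimal Scala sketch (the application name is just a placeholder):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-demo")
  .config("spark.sql.adaptive.enabled", "true")   // AQE is off by default in Spark 3.0
  .getOrCreate()

// The switch can also be flipped for an existing session:
spark.sql("SET spark.sql.adaptive.enabled = true")
```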

Because we have discussed the principles of these three features in the previous lecture, I will briefly review them here, and then we will focus on the corresponding configuration options for these features.

The scenario for partition merging can be summarized as follows: during the shuffle process, due to uneven data distribution, there are numerous small partitions in the Reduce phase, and the amount of data in these small partitions is very small, resulting in high scheduling cost.

So, how does AQE determine whether a partition is small enough to need merging? Also, since multiple partitions are merged together, there must be a stopping condition; otherwise, if merging went on indefinitely, the entire distributed dataset would eventually collapse into one super-large partition. In other words, “where does partition merging start, and where does it end?”

Let’s look at how AQE’s partition merging works. For all data partitions, regardless of their size, AQE scans them from left to right in partition-number order. While scanning, it keeps a running total of the partition sizes; once the combined size of the adjacent partitions scanned so far exceeds the “target size”, AQE merges them into one partition. It then continues scanning to the right and applies the same rule to the remaining partitions, until every partition has been processed.

In summary, AQE does not determine in advance which partitions are small enough, but scans according to partition numbers and merges them when the scan size exceeds the “target size”. We can see that the key in this process lies in determining the “target size”, which determines the degree of dispersion of the distributed dataset after merging.
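
To make the scan-and-merge rule concrete, here is a simplified sketch in Scala. It is not Spark’s internal implementation, just the procedure described above applied to a list of partition sizes:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified illustration of the scan-and-merge rule; not Spark's actual implementation.
// Partitions are scanned in partition-number order and grouped until the accumulated
// size reaches the target, at which point the group is closed and a new one starts.
def coalescePartitions(sizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
  val merged = ArrayBuffer.empty[Seq[Int]]
  var currentGroup = ArrayBuffer.empty[Int]
  var accumulated = 0L

  sizes.zipWithIndex.foreach { case (size, partitionId) =>
    currentGroup += partitionId
    accumulated += size
    if (accumulated >= targetSize) {        // target reached: close this merged partition
      merged += currentGroup.toSeq
      currentGroup = ArrayBuffer.empty[Int]
      accumulated = 0L
    }
  }
  if (currentGroup.nonEmpty) merged += currentGroup.toSeq   // leftover tail partitions
  merged.toSeq
}

// Example: with a 100 MB target, eight small partitions collapse into three merged ones.
val mb = 1024L * 1024
println(coalescePartitions(Seq(20L, 30L, 60L, 10L, 90L, 40L, 40L, 30L).map(_ * mb), 100 * mb))
```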

So, what determines the “target size”? Spark provides two configuration options to jointly determine the “target size” for partition merging: spark.sql.adaptive.advisoryPartitionSizeInBytes and spark.sql.adaptive.coalescePartitions.minPartitionNum.

Among them, the first parameter, advisoryPartitionSizeInBytes, is the target size recommended by developers, and the meaning of the second parameter, minPartitionNum, is the minimum number of partitions after merging. If it is set to 200, it means that the number of partitions after merging cannot be less than 200. The purpose of this parameter is to avoid low parallelism leading to insufficient utilization of CPU resources.

Combining the size of the dataset after shuffle and the minimum partition number limit, we can infer the average size of each partition, which we will temporarily refer to as #partitionSize. The target size for partition merging is the minimum value between advisoryPartitionSizeInBytes and #partitionSize.

This may sound abstract, so let’s take an example. Suppose the data size after shuffle is 20GB, and minPartitionNum is set to 200. By extrapolation, the size of each partition is 20GB / 200 = 100MB. Let’s also assume that advisoryPartitionSizeInBytes is set to 200MB. The final target partition size is the minimum value between (100MB, 200MB), which is 100MB. So you see, Spark does not completely follow your advice on advisoryPartitionSizeInBytes, and we also need to consider the setting of minPartitionNum.
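
The same arithmetic, written out as a tiny Scala sketch with the figures from this example (20 GB divided by 200 partitions comes out at roughly 100 MB):

```scala
// Target size = min(advisoryPartitionSizeInBytes, shuffled data size / minPartitionNum).
// Figures from the example: 20 GB after shuffle, minPartitionNum = 200, advisory = 200 MB.
val shuffleOutputMB = 20 * 1024                                 // 20 GB expressed in MB
val minPartitionNum = 200
val advisoryMB      = 200

val avgPartitionMB = shuffleOutputMB / minPartitionNum          // ≈ 100 MB ("#partitionSize")
val targetMB       = math.min(advisoryMB, avgPartitionMB)       // the smaller value wins

println(s"target size ≈ $targetMB MB")                          // ≈ 100 MB, not the 200 MB advice
```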

Now let’s talk about data skew. In data join scenarios, when AQE detects skewed data partitions, it automatically splits the large partitions into multiple smaller ones, thereby preventing a single task from having to process an excessive amount of data. However, the AQE introduced in Spark 3.0 can currently only handle data skew automatically for Sort Merge Join; other join implementations, such as Shuffle Hash Join, are yet to be supported.

So, how does AQE determine whether a data partition is skewed? And how does it split a large partition into multiple smaller partitions?

First, a partition’s size must be greater than the value of the spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes parameter before it can be identified as a skewed partition. Then, AQE computes and sorts the sizes of all data partitions and takes the median as the baseline. Partitions larger than a certain multiple of this median are identified as skewed, and that multiple is controlled by the spark.sql.adaptive.skewJoin.skewedPartitionFactor parameter.

Next, let’s understand this with an example. Assume that table A has 3 partitions with sizes of 80MB, 100MB, and 512MB respectively. Clearly, the median size after sorting these partitions is 100MB. Since the default value of skewedPartitionFactor is 5, only partitions larger than 100MB * 5 = 500MB have the possibility of being identified as skewed partitions. In our example, only the last partition with a size of 512MB meets this condition.

At this point, Spark cannot yet determine whether it is a skewed partition or not. It also depends on the value of the skewedPartitionThresholdInBytes configuration item, which has a default value of 256MB. For partitions that satisfy the median condition, they must be larger than 256MB for Spark to finally identify them as skewed partitions. For example, if the skewedPartitionThresholdInBytes is set to 1GB, Spark will not consider the 512MB partition as a skewed partition in our example, and it will not benefit from AQE’s optimization for data skew.

Once a skewed partition is detected, the next step is to split it, and the advisoryPartitionSizeInBytes parameter is used during the split process. Let’s say we set the value of this parameter to 256MB. Then, the skewed partition of 512MB will be split into multiple parts with a granularity of 256MB. Therefore, this large partition will be split into 2 smaller partitions (512MB / 256MB = 2). After splitting, the original table with 3 partitions becomes 4 partitions, and the size of each partition is no greater than 256MB.
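
Putting the detection conditions and the split step together, here is a simplified sketch (again, not Spark’s internal code) that replays the example above with the default factor of 5 and threshold of 256MB:

```scala
// Simplified view of AQE skew handling; not Spark's internal implementation.
// A partition counts as skewed only if BOTH conditions hold:
//   size > skewedPartitionFactor * median   AND   size > skewedPartitionThresholdInBytes.
// Skewed partitions are then split at advisoryPartitionSizeInBytes granularity.
val mb = 1024L * 1024
val partitionSizes = Seq(80 * mb, 100 * mb, 512 * mb)   // table A from the example

val skewedPartitionFactor = 5                           // default
val skewedThresholdBytes  = 256 * mb                    // default: 256 MB
val advisorySizeBytes     = 256 * mb                    // split granularity in this example

val sortedSizes = partitionSizes.sorted
val median      = sortedSizes(sortedSizes.length / 2)   // 100 MB

partitionSizes.foreach { size =>
  val isSkewed = size > skewedPartitionFactor * median && size > skewedThresholdBytes
  val pieces   = if (isSkewed) math.ceil(size.toDouble / advisorySizeBytes).toInt else 1
  println(s"${size / mb} MB -> skewed = $isSkewed, resulting partitions = $pieces")
}
```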

Finally, let’s talk about data joins. Joins are arguably the most common operation in data analysis. The adjustment of join strategies in Spark SQL refers to downgrading join methods that involve a shuffle, such as Shuffle Hash Join and Sort Merge Join, to Broadcast Join.

The essence of Broadcast Join is to “use the small to conquer the big”: it broadcasts the full data of the small table to all Executors in the cluster, so the data of the large table no longer needs to be shuffled by join keys and can be joined with the small table in place. In other words, Broadcast Join trades the modest cost of broadcasting the small table for the large gain of eliminating the shuffle of the big table. It can be said that Broadcast Join pushes the principle of leverage to the extreme.

Before the release of AQE, developers could use the spark.sql.autoBroadcastJoinThreshold configuration item to actively downgrade join operations. The default value of this parameter is 10MB. If the size of either table involved in the join is smaller than 10MB, the join operation between the two can be downgraded to Broadcast Join. To fully leverage the advantages of “use the small to conquer the big” in Broadcast Join, you can consider increasing the value of this parameter, typically around 2GB.
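
A minimal sketch of raising the threshold at runtime; the 2GB figure is only an illustration and assumes the broadcast table comfortably fits in each Executor’s memory:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-threshold-demo").getOrCreate()

// Default is 10 MB. Raising the threshold makes larger "small" tables eligible for
// Broadcast Join; 2 GB here is illustrative only.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2L * 1024 * 1024 * 1024)
```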

However, although the autoBroadcastJoinThreshold parameter is useful, there are two shortcomings that can be problematic.

First, it has poor reliability. Even if the developer sets the broadcast threshold explicitly, and the size of the small table is within the threshold, Spark may still misjudge the size of the small table, resulting in the failure of the Broadcast Join downgrade.

Second, setting the broadcast threshold in advance is a static optimization mechanism; it cannot adjust the join strategy at runtime. A typical example: neither table satisfies the broadcast threshold during the logical optimization stage, so Spark SQL chooses a Shuffle Join in the physical planning stage. At runtime, however, it may turn out that after filtering, the remaining data of one of the tables is small enough for the join to be downgraded to Broadcast Join. In such cases, the static optimization mechanism is powerless.

AQE solves both of these troublesome issues. First, AQE’s join strategy adjustment is a runtime optimization mechanism. For the two tables in the example above, AQE dynamically computes the remaining data size once each table’s filtering completes. When that size meets the broadcast condition, AQE adjusts the logical execution plan on the fly and downgrades the Shuffle Join to a Broadcast Join in the new logical plan. Moreover, runtime data size estimates are far more accurate than compile-time estimates, so AQE’s join strategy adjustment is more reliable and stable than static optimization.

However, there is a prerequisite for triggering the dynamic join strategy adjustment: the constraint imposed by the spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin parameter must be satisfied. Its default value is 0.2, which means the ratio of non-empty data partitions in the large table after filtering must be less than 0.2 for the downgrade to Broadcast Join to succeed.

This may sound a bit complicated, so let’s take an example. Assume that before filtering, the large table has 100 partitions, and after the Filter operation, 85 partitions have become empty partitions because their data does not meet the filter condition, while the remaining 15 partitions still contain data that satisfies the filter condition. In this case, the non-empty partition ratio of the filtered large table is 15 / 100 = 15%, and since 15% is less than 0.2, the large table in this example will successfully trigger the downgrade to Broadcast Join.
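
The check itself is simple arithmetic; a tiny sketch with the numbers from this example:

```scala
// Numbers from the example: 100 partitions before filtering, 15 non-empty afterwards.
val totalPartitions    = 100
val nonEmptyPartitions = 15

// Default value of spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.
val nonEmptyPartitionRatio = 0.2

val observedRatio = nonEmptyPartitions.toDouble / totalPartitions   // 0.15
val canDemote     = observedRatio < nonEmptyPartitionRatio          // 0.15 < 0.2 => true

println(s"non-empty ratio = $observedRatio, downgrade to Broadcast Join = $canDemote")
```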

On the contrary, if the non-empty partition ratio of the large table after filtering is greater than 0.2, then even if the remaining data size is small, AQE will not downgrade the Shuffle Joins to Broadcast Join. Therefore, if you want to fully leverage the advantages of Broadcast Join, you can consider increasing this parameter appropriately.

Summary #

In today’s lecture, we delved into two categories of configuration options: Shuffle class and Spark SQL. We also discussed the problems that each configuration option can solve.

For the Shuffle class, it is important to know that we can avoid unnecessary computational overhead during the Shuffle process by controlling the spark.shuffle.sort.bypassMergeThreshold parameter for operations that do not require sorting and aggregation.

For the Spark SQL category, we need to understand that Adaptive Query Execution (AQE) is disabled by default. To make full use of the three features provided by AQE - automatic partition merging, data skew handling, and join strategy adjustment - we need to set spark.sql.adaptive.enabled to true.

In addition, each of AQE’s features has its own corresponding configuration option that needs to be adjusted separately.

  • The automatic partition merging process in AQE is different from what we might expect. Instead of determining in advance which partitions are small enough, AQE scans partitions based on their partition numbers and performs a merge when the scanned amount exceeds the “target size”. The “target size” is determined by the advisoryPartitionSizeInBytes and coalescePartitions.minPartitionNum parameters.

  • AQE can automatically handle data skew issues in Sort Merge Join scenarios. It detects skewed partitions based on the median size of all partitions and the skewedPartitionFactor, and then splits the skewed partitions using the advisoryPartitionSizeInBytes as granularity.

  • Dynamic join strategy adjustment in AQE can downgrade Shuffle Joins to Broadcast Joins at runtime. The runtime data estimation is more accurate than compile-time estimation, making it more reliable than static optimization. However, it is important to note that the ratio of non-empty partitions after filtering must be smaller than nonEmptyPartitionRatioForBroadcastJoin to trigger the downgrade optimization.

Well, after studying these two lectures, we have summarized all the configuration options in Spark that are closely related to performance tuning. To facilitate quick reference, I have combined them into a table in a document. I hope you can make good use of it in your work.

Performance Tuning Configuration Options

Daily Practice #

  1. The partition merging algorithm in AQE may seem a bit crude. If you were asked to reimplement the partition merging feature, what ideas would you have?
  2. Regarding the mechanism for handling data skew in AQE, what potential risks do you think there are?

Looking forward to seeing your thoughts and answers in the comment section. You are also welcome to share this optimization manual with your friends. See you in the next lecture!