29 Large Table Joins Large Table Ii What Is the Negative Scalability Resistance Tuning Approach

29 Large Table Joins Large Table II - What is the Negative Scalability Resistance Tuning Approach #

Hello, I am Wu Lei.

In the previous lecture, we discussed the first optimization strategy for dealing with “Big Table Join Big Table,” which is to divide and conquer, that is, transforming a large and complex Shuffle Join into multiple lightweight Broadcast Joins. In this lecture, we will continue with the second optimization strategy: reluctant resistance.

Reluctant resistance refers to the situation where the inner table cannot be evenly split, or the outer table does not have a partition key and cannot leverage Dynamic Partition Pruning (DPP), and can only rely on Shuffle Join to accomplish the join between large tables. In such cases, there are various optimization methods and approaches that can be employed. These methods are relatively complex and are applicable in different scenarios. From the perspective of data distribution, we can discuss them in two common situations: uniform data distribution and data skewness.

Let’s start with how to deal with the calculation scenario of “Big Table Join Big Table” when the data distribution is uniform.

Uniform Data Distribution #

In the last lecture, we mentioned that when the following conditions are satisfied between the large table and small table involved in the join, the execution efficiency of Shuffle Hash Join is often better than the default Shuffle Sort Merge Join in Spark SQL.

  • The data distribution of the two tables is uniform.
  • All data shards of the inner table can be fully loaded into memory.

In fact, this optimization technique also applies to the scenario of “join between two large tables”. The reason is simple: these two conditions are unrelated to the size of the data tables, but only depend on whether the data distribution is uniform. However, to ensure the stability of the Shuffle Hash Join calculation, we need to pay special attention to the second condition mentioned above, which is that all data shards of the inner table can be loaded into memory.

So the question is, how can we ensure that the second condition is met? In fact, as long as we handle the relationship between parallelism, concurrency, and execution memory well, we can make each data shard of the inner table fit perfectly into the execution memory. In simple terms, we first calculate the upper and lower limits of memory consumption for each task based on parallelism and execution memory, and then combine the distributed dataset size with the limits to infer the matching parallelism. For more detailed content, you can refer to Lecture 14.

Now, how can we force Spark SQL to choose the Shuffle Hash Join mechanism at runtime? The answer lies in using Join Hints. We have talked about this technique many times before, so here, I will directly use the query from the previous lecture as an example and write down the usage method below for your review.

-- Using Join Hints in the query statement
select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx inner join orders as o
on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId

Data Skewness #

Next, let’s talk about how to deal with the “join between big tables” calculation scenario when there is data skewness between the two tables. For the data skewness problem in the “join between big tables”, we can discuss three different scenarios based on the skewness location.

In fact, no matter which table is skewed, their optimization techniques are similar. Therefore, let’s take the first scenario as an example, where the outer table is skewed and the inner table is evenly distributed, to discuss the methods to deal with data skewness.

Solving Data Skewness at the Task Level #

After learning about AQE, when dealing with data skewness, you might quickly think of AQE’s feature: automatic skew handling. With the following configuration parameters, Spark SQL can insert the OptimizeSkewedJoin strategy into the physical plan at runtime, automatically handling data skewness during joins.

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor: determines the skewness amplification factor.
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: determines the minimum threshold for skewness.
  • spark.sql.adaptive.advisoryPartitionSizeInBytes: defines the splitting granularity in bytes.

The automatic skew handling during the join process is shown in the above figure. When AQE detects skewed partitions in the outer table, it splits the skewed partitions into multiple data partitions using the value defined by spark.sql.adaptive.advisoryPartitionSizeInBytes as the splitting granularity. At the same time, AQE also needs to replicate the corresponding data partitions in the inner table to preserve the relationship between the two tables.

With AQE’s automatic skew handling feature, we can indeed save a lot of development costs when dealing with data skewness. However, there is no such thing as a free lunch. AQE’s skew handling is at the task level, which means that the load imbalance between Executors is not fundamentally improved. What does this mean?

Let’s take an example. Suppose a table has two skewed partitions after shuffling, as shown in the figure above, and they happen to be shuffled to the same Executor: Executor 0. With AQE’s automatic skew handling mechanism, the two skewed partitions are split into four moderately sized data partitions. As a result, the computation load of all tasks in Executor 0 is balanced. However, compared to Executor 1, the overall computation load of Executor 0 is still the same and is not alleviated by AQE’s automatic handling.

Solving Data Skewness at the Executor Level #

You might say, “It’s not very likely for all skewed partitions to land on the same Executor, right?” Indeed, the previous example was mainly used to explain the concept of skewness granularity. If the skewed partitions are evenly distributed in the cluster in practical applications, AQE’s automatic skew handling mechanism is indeed a “magic potion” for developers.

However, there is always a possibility that we should consider when discussing optimization strategies: what if the skewed partitions happen to land on a few Executors in the cluster, just like in our example? In this case, how do you handle it? The answer is “divide and conquer” and “two-stage shuffle”.

The “divide and conquer” here is consistent with the divide and conquer approach mentioned in the previous lecture, both of which solve complex problems by decomposing them into subtasks. The difference is that today we are going to decompose the tasks based on whether the join keys are skewed. Specifically, for all join keys in the outer table, we divide them into two groups based on whether they have skewness. One group consists of join keys with skewness, and the other group consists of join keys with even distribution. Correspondingly, we split the data in the inner table into two parts as well.

The meaning of “divide and conquer” is that for the two groups of different data in the inner and outer tables, we use different methods to perform join calculations. Then, through the Union operation, we merge the results of the two join calculations to obtain the final result of the “join between big tables”. The entire process is shown in the above figure.

For the data part with even distribution of join keys, we can use the method of transforming Shuffle Sort Merge Join into Shuffle Hash Join. For the data part with skewed join keys, we need to use the optimization technique of “two-stage shuffle” to balance the workload between Executors. So, what is “two-stage shuffle”?

Understanding “Two-Stage Shuffle” #

To put it simply, “two-stage shuffle” refers to the calculation process of “salting, shuffling, joining, and aggregating” and “unsalting, shuffling, and aggregating” in two stages. It balances the computation load at the Executor level within the cluster without breaking the original association.

Let’s first talk about the first stage, which includes the four steps of “salting, shuffling, joining, and aggregating”. Clearly, the most critical step in this stage is salting. The salting here comes from the word “Salting”, which sounds mysterious but is actually adding suffixes to skewed join keys. The core purpose of salting is to disperse the originally concentrated skewed join keys. When performing Shuffle operations, the skewed data that should have been distributed to a single Executor is evenly distributed among multiple Executors in the cluster through salting, thus eliminating skewness and balancing the computation load between Executors.

Improving Performance with Salted Shuffle #

For the salted shuffle operation, the first step is to determine the granularity of the salt to control the degree of data scattering. The higher the granularity, the more dispersed the data will be after salting. Since the purpose of salting is to balance the calculation load on Executors, it is usually a good choice to use the total number of Executors (#N) as the granularity of salting.

Next, in order to maintain the association between the outer and inner tables, both tables need to be salted at the same time, but the processing methods are slightly different.

The processing of the outer table is called “random salting”. The specific operation method is to add a random suffix between 1 and #N to any skewed Join Key. For example, let’s take the Join Key = ‘黄小乙’ as an example. Assuming N = 5, after salting the outer table, all the original data records with Join Key = ‘黄小乙’ will be scattered into data records with Join Keys as (‘黄小乙_1’, ‘黄小乙_2’, ‘黄小乙_3’, ‘黄小乙_4’, ‘黄小乙_5’).

Random Salting

The processing of the inner table is called “replicated salting”. The specific operation method is to make copies of the original data (#N - 1) times for any skewed Join Key, resulting in #N copies of data. For each copy, we append a fixed suffix between 1 and #N to its Join Key to keep it consistent with the scattered data in the outer table. In the example of Join Key = ‘黄小乙’, we need to make 4 copies of the data in the inner table and append the suffixes between 1 and 5 to each copy’s Join Key, as shown in the following figure.

Replicated Salting

After salting the inner and outer tables separately, the data skew problem is eliminated. At this point, we can use conventional optimization methods, such as converting Shuffle Sort Merge Join to Shuffle Hash Join, to continue with the Shuffle, join, and aggregation operations. At this point, the first stage of “Two-Stage Shuffle” is completed and we obtain preliminary aggregation results, which are calculated based on scattered Join Keys.

First Stage of Shuffle

As mentioned earlier, the purpose of the first stage of salting is to scatter the data and balance the calculation load. Now that we have obtained the preliminary aggregation results after data scattering, we are only one step away from the final calculation result. However, in order to restore the original calculation logic, we need to remove the “salt granules” that were added earlier.

Second Stage of Shuffle

The second stage of calculation includes three steps: “desalting, Shuffle, and aggregation”. First, we remove the suffix of each Join Key, which is called “desalting”. Then, we perform Shuffle and aggregation calculations again based on the original Join Keys. The result of this step is the calculation result for the skewed part in the “Divide and Conquer” approach.

After optimizing with “Two-Stage Shuffle”, we finally obtain the association results for the skewed part. By merging this part of the results with the calculation results for the uniform part in the “Divide and Conquer” approach, we can complete the calculation scenario of “Joining Large Tables” with skew issues.

Optimizing with Executors as Granularity: Practical Examples #

It can be said that the optimization process of balancing the calculation load with Executors as granularity is the most complex among the optimization techniques we have learned. Therefore, it is necessary to explain the specific implementation methods in detail in combination with actual application cases. To facilitate the comparison of different optimization methods, let’s take the cross-border e-commerce scenario discussed in the previous lesson as an example.

Let’s first recall the business requirements of this e-commerce company. Given two fact tables, orders and transactions, both of which have a volume of TB-level data, the company needs to calculate the total transaction amount of all orders in the previous quarter periodically. The specific business code is as follows.

// Code for calculating order revenue
val txFile: String = _
val orderFile: String = _

val transactions: DataFrame = spark.read.parquent(txFile)
val orders: DataFrame = spark.read.parquent(orderFile)

transactions.createOrReplaceTempView(transactions)
orders.createOrReplaceTempView(orders)

val query: String = 
select sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx 
inner join orders as o
on tx.orderId = o.orderId
where o.status = COMPLETE
and o.date between 2020-01-01 and 2020-03-31
group by o.orderId


val outFile: String = _
spark.sql(query).save.parquet(outFile)

To implement the optimization process mentioned earlier for this query, we first follow the principle of “divide and conquer” and split the data of the inner and outer tables into two parts. The first part contains all the Join Keys and their corresponding Payloads that have skewness issues, while the second part consists of evenly distributed Join Keys and their Payloads. Let’s assume we save all skew orderId, which is the Join Key, in the array skewOrderIds, and keep the evenly distributed orderId in the array evenOrderIds. We can use these two arrays to split the inner and outer tables separately.

// Split the inner and outer tables based on whether the Join Keys are skewed or evenly distributed
import org.apache.spark.sql.functions.array_contains

// Split the Join Keys into two groups: skewed and evenly distributed
val skewOrderIds: Array[Int] = _
val evenOrderIds: Array[Int] = _

val skewTx: DataFrame = transactions.filter(array_contains(lit(skewOrderIds),$"orderId"))
val evenTx: DataFrame = transactions.filter(array_contains(lit(evenOrderIds),$"orderId"))

val skewOrders: DataFrame = orders.filter(array_contains(lit(skewOrderIds),$"orderId"))
val evenOrders: DataFrame = orders.filter(array_contains(lit(evenOrderIds),$"orderId"))

Once the split is done, we can continue with the principle of “divide and conquer” and apply different optimization techniques to each part. For the evenly distributed part, we convert Shuffle Sort Merge Join into Shuffle Hash Join.

// Register the evenly distributed data as temporary tables
evenTx.createOrReplaceTempView("evenTx")
evenOrders.createOrReplaceTempView("evenOrders")

val evenQuery: String = """
select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as revenue, o.orderId
from evenTx as tx inner join evenOrders as o
on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId
"""
val evenResults: DataFrame = spark.sql(evenQuery)

For the skewed part, we bring out the killer move of “Two-Stage Shuffle”. First, in the first stage, we need to add salt to both tables. We apply “random salt” to the outer table (transactions table) and “replicated salt” to the inner table (orders table).

import org.apache.spark.sql.functions.udf

// Define a UDF to get random salt grains
val numExecutors: Int = _
val rand = () => scala.util.Random.nextInt(numExecutors)
val randUdf = udf(rand)

// Salt the tables in the first stage. Note: Keep the orderId field for later destaging in the second stage.

// Random salt for the outer table
val saltedSkewTx = skewTx.withColumn("joinKey", concat($"orderId", lit("_"), randUdf()))

// Replicated salt for the inner table
var saltedskewOrders = skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(1)))
for (i <- 2 to numExecutors) {
  saltedskewOrders = saltedskewOrders union skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(i)))
}

After adding salt to both tables, we can use similar query statements as before to execute subsequent Shuffle, Join, and Aggregation operations.

// Register the salted data as temporary tables
saltedSkewTx.createOrReplaceTempView("saltedSkewTx")
saltedskewOrders.createOrReplaceTempView("saltedskewOrders")

val skewQuery: String = """
select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as initialRevenue, o.orderId, o.joinKey
from saltedSkewTx as tx inner join saltedskewOrders as o
on tx.joinKey = o.joinKey
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.joinKey
"""
// Preliminary results after the first stage of salting, Shuffle, Join, and Aggregation
val skewInitialResults: DataFrame = spark.sql(skewQuery)

After getting the preliminary results of the first stage, we can start the second stage of computation, which includes “destaging, Shuffle, and Aggregation”. The purpose of destaging is actually to restore the calculation granularity from the salted joinKey to the original orderId. Since we kept the orderId field during the initial salting, in the second stage computation, we only need to perform aggregation on the orderId field to achieve the desired “destaging” effect.

val skewResults: DataFrame = skewInitialResults.select("initialRevenue", "orderId")
  .groupBy(col("orderId")).agg(sum(col("initialRevenue")).alias("revenue"))

After completing the second stage computation, we have the results of the “Two-Stage Shuffle”. Finally, all we need to do is merge these results with the previously obtained results from the evenly distributed part to achieve the optimization process of balancing the computation load at the Executors granularity.

evenResults union skewResults

Trade-off Between Execution Performance and Development Cost #

You may say, “Oh my god! It requires so much development cost to optimize the computation for this scenario! With all the divide and conquer and the Two-Stage Shuffle, is it really worth the investment?”

This is a very good question. We need to clarify that the purpose of the divide and conquer technique along with the Two-Stage Shuffle optimization technique is to solve the issue where AQE cannot balance the computation load at the Executors granularity. Therefore, the key to deciding whether to use this technique lies in whether the load imbalance between Executors becomes a performance bottleneck for the entire join computation. If the answer to this question is yes, then the investment is worth it.

Summary #

In today’s lecture, you need to master the approach of using Shuffle Join to deal with the computation scenario of “joining large tables with large tables”. The method of dealing with different data distributions also varies.

When the data distribution of the two tables involved in the join is relatively uniform, and the data partitions of the inner table can be completely fit into memory, the computational efficiency of Shuffle Hash Join is often higher than that of Shuffle Sort Merge Join, which is the default join mechanism in Spark SQL. You can use the “shuffle_hash” join hints to force Spark SQL to choose the Shuffle Hash Join implementation mechanism at runtime. If the data partitions of the inner table cannot fit into memory, you can combine the tuning techniques of “the three pillars” to adjust the parallelism, concurrency, and execution memory parameters to meet this prerequisite condition.

When there is data skew in the two tables involved in the join, if the skew is balanced between the Executors in the cluster, the best approach is to use the automatic skew handling mechanism provided by AQE (Adaptive Query Execution). You only need to set the following three parameters and let AQE handle the rest:

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor, the skew factor to determine the skew.
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes, the minimum threshold to determine the skew.
  • spark.sql.adaptive.advisoryPartitionSizeInBytes, the splitting granularity in bytes.

However, if the skew problem is concentrated in a few Executors, and these heavily loaded Executors have become performance bottlenecks, we need to use the tuning techniques of “divide and conquer” plus “two-stage Shuffle” to deal with the problem. “Divide and conquer” means dividing the data of the inner and outer tables into two parts based on the skew of the Join Keys and processing them separately. Among them, the uniform part can be computed using Shuffle Hash Join, while the skewed part needs to be processed using “two-stage Shuffle”.

The key to the two-stage Shuffle lies in salting and desalting. The purpose of salting is to scatter data distribution and balance the computation load between Executors, thus eliminating the bottleneck of Executors. The purpose of desalting is to restore the original join logic. Although the development cost of the two-stage Shuffle is high, as long as the performance gain is significant enough, our investment is worthwhile.

Daily Practice #

  1. When attempting to use the skewness of join keys as the basis for “divide and conquer” partitioning, what criteria do you think we should use to divide join keys into skewed groups and non-skewed groups?

  2. Whether it is the automatic skew handling of AQE or the developer’s “two-stage Shuffle”, both fundamentally rely on the two-step process of “salting” and “desalting” to balance the computational load at different granularities while maintaining the association. So, are these optimization techniques of “salting” and “desalting” applicable to all join scenarios? If not, what are the scenarios that cannot leverage AQE’s automatic skew handling or our “two-stage Shuffle”?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lecture!