
28 Large Table Joins Large Table I - What is the Divide and Conquer Tuning Approach #

Hello, I am Wu Lei.

In the previous lesson, we discussed the optimization approaches and methods for joining a large table with a small table. Besides that scenario, do we ever need to join two large tables in data analysis? Indeed, we do. This refers to joining two large fact tables whose sizes are roughly comparable (within about 3x of each other) and neither of which fits into a broadcast variable.

That said, joining one large table with another is strongly discouraged in data analysis. In fact, it is fair to say that "large table joins large table" is a grave mistake: if you have to resort to it to implement your business logic, it usually means the data warehouse was poorly designed or the developers did not plan for the long term.

Nevertheless, you may argue, "When I joined the company, the data warehouse had already been built. It's not my fault, and I can only work with what is there." To deal with exactly this situation, today's lesson discusses the optimization approaches and techniques to fall back on when you have no choice but to join two large tables.

To address the "large table joins large table" scenario, we have two main optimization approaches. One is called "divide and conquer," and the other is a set of techniques I collectively refer to as "resistance to the end." Today's lesson focuses on "divide and conquer"; we will delve into "resistance to the end" in the next lesson.

It is worth mentioning that even if you never have to deal with the hot potato of "large table joins large table," it is still well worth deepening your understanding of the approaches and methods behind "divide and conquer" and "resistance to the end," because they transfer remarkably well. Once you have learned them, you will find they apply effectively to many other scenarios.

Without further ado, let’s dive into today’s lesson!

How to understand “divide and conquer”? #

The idea behind "divide and conquer" is to reduce the "large table joins large table" scenario to "large table joins small table," and then apply the optimization methods from the previous lesson to solve the performance problem. Its core idea is to break a complex task into multiple simple tasks and then merge their results. So how do we apply this idea to joining two large tables?

First, based on the sizes of the two tables, we distinguish the outer table from the inner table; the inner table is generally the smaller one. Then we add filtering conditions to the inner table to divide it into multiple non-overlapping subsets that together cover the complete table. Next, we join the outer table with each of these subsets in turn to obtain partial results. Finally, we merge all the partial results with a Union to obtain the complete result of the end-to-end join. The entire "divide and conquer" calculation process is illustrated below.
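
To make the process concrete, here is a minimal sketch of this pattern in Spark's DataFrame API. The file paths, the split column "splitKey," its values, and the join column "joinKey" are all hypothetical placeholders, not part of the case study later in this lesson.

// A conceptual sketch of "divide and conquer" (all names and paths are hypothetical)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val outerTable: DataFrame = spark.read.parquet("/path/to/outer") // the larger fact table
val innerTable: DataFrame = spark.read.parquet("/path/to/inner") // the smaller fact table

// 1. Split the inner table into non-overlapping subsets by a chosen split column
val splitValues: Seq[String] = Seq("v1", "v2", "v3")

// 2. Join the outer table with each subset; each subset is small enough to broadcast
val partialResults: Seq[DataFrame] = splitValues.map { v =>
  val subset = innerTable.filter(innerTable("splitKey") === v)
  outerTable.join(broadcast(subset), Seq("joinKey"), "inner")
}

// 3. Union all partial results to obtain the complete join result
val fullResult: DataFrame = partialResults.reduce(_ union _)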

How do we ensure the inner table is split finely enough? #

The core purpose of "divide and conquer" is to turn "large table joins large table" into "large table joins small table." Therefore, a key step is splitting the inner table: each sub-table must be reasonably uniform in size and small enough to fit into a broadcast variable. Only then can the original Shuffle Join be converted into a series of Broadcast Joins, eliminating the massive data shuffle and delivering the performance gains we are after. If the split does not satisfy these conditions, all the effort is wasted.
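
As a quick sanity check, and only as a sketch reusing the hypothetical "innerTable" and split column "splitKey" from above, you can count rows per candidate split value and compare the result against the broadcast threshold before committing to a split scheme:

// Rough check of split granularity (names reused from the hypothetical sketch above)
import org.apache.spark.sql.functions.desc

// Row counts per split value: the largest slice must stay well under the broadcast threshold
innerTable
  .groupBy("splitKey")
  .count()
  .orderBy(desc("count"))
  .show(10, truncate = false)

// The broadcast threshold is governed by this setting (Spark's default is 10 MB)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))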

The key to splitting lies in choosing the split column. For the sub-tables to be small enough, the cardinality of the split column must be large enough. This may sound abstract, so let's look at a few examples. Suppose the split column of the inner table is "gender," with a cardinality of 2 and values "male" and "female." If we split the inner table in two using the filters "gender = male" and "gender = female," the resulting sub-tables are obviously still huge and far exceed the broadcast threshold.

You might say, "Since the cardinality of gender is so low, why not pick a column with a very large cardinality, such as the ID card number?" Indeed, the ID card number's cardinality is large enough, on the order of the country's total population. However, filtering on ID card numbers, which are long strings, has two drawbacks: first, the splits are hard to enumerate, which drives up development cost; second, such filter conditions rarely benefit from Spark SQL's internal optimizations, such as predicate pushdown.

Since neither very low nor very high cardinality works well, what is appropriate? In most data warehouse scenarios, fact tables carry time-related fields, such as a date or an even finer-grained timestamp, which is also why many fact tables are partitioned by date at creation time. A date column is therefore often a good choice for the split column: it keeps development cost low while letting us benefit from Spark SQL's partition pruning.
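
For example, here is a sketch (with hypothetical paths and an assumed "date" column) of how a date-partitioned layout turns the per-day filters into partition pruning, so each sub-table read touches only its own partitions:

// Hypothetical: store the inner fact table partitioned by date, then filter one day at a time
val innerDF = spark.read.parquet("/path/to/inner")

innerDF.write
  .partitionBy("date")
  .parquet("/path/to/inner_partitioned_by_date")

val oneDay = spark.read
  .parquet("/path/to/inner_partitioned_by_date")
  .filter("date = '2020-01-01'")

// The physical plan should list the date filter under PartitionFilters, i.e. partition pruning
oneDay.explain()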

How to Avoid Repeatedly Scanning the Outer Table? #

After the inner table is split, the outer table has to be joined with every sub-table. Although each of these joins becomes "large table joins small table" and is converted into a BHJ (Broadcast Hash Join), under Spark's execution model each join requires a fresh scan of the outer table's full data. Such overhead is clearly unacceptable, which brings us to the other key step of "divide and conquer": avoiding repeated scans of the outer table.

Take the figure above as an example, where the inner table is split into four parts. The original Shuffle Join between the two large tables becomes four Broadcast Joins: the outer table is joined with each of the four sub-tables, and all the partial result sets are finally merged with a Union to complete the calculation. For these four joins, each one scans the outer table in full, so the outer table is scanned four times. In general, the number of times the outer table is scanned equals the number of sub-tables the inner table is split into.

We just said that the inner table must be split finely enough to reap the performance benefits, which often means the number of splits runs into the hundreds, or even thousands or tens of thousands. At that scale, the cost of repeatedly scanning the outer table is enormous.

There is more than one way to avoid repeatedly scanning the data. The most obvious is caching: if we can cache the outer table's full data in memory, repeated scans are no longer a concern, since memory access latency is far lower than that of disk. However, the outer table we are dealing with is huge, often starting at the TB level. Caching TB-scale data in memory requires a cluster with substantial resources; to put it bluntly, you need enough budget to buy enough memory.
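
For reference, here is a minimal sketch of the caching approach, with a hypothetical path for the outer table and under the assumption that the cluster actually has the memory to hold it:

// Caching the outer table: only viable when the cluster can hold it in memory
import org.apache.spark.storage.StorageLevel

val outerTable = spark.read.parquet("/path/to/outer") // hypothetical path
outerTable.persist(StorageLevel.MEMORY_ONLY)          // or MEMORY_AND_DISK to allow spilling
outerTable.count()                                    // materialize the cache before the repeated joins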

If the cluster is not that powerful and the boss doesn’t give a budget to expand the cluster’s memory, what should we do?

We can stick with the "divide and conquer" idea: if the inner table can be divided and conquered, why not the outer table as well? For each sub-join the outer table participates in, logically it only needs to scan the portion of its data related to the corresponding inner sub-table; there is no need to scan the full outer table every time. The net effect is that the outer table's full data is scanned only once. You may say, "That sounds fine in theory, but how do we actually divide and conquer the outer table?"

In the past there was little room to maneuver, but with the DPP (Dynamic Partition Pruning) mechanism introduced in Spark 3.0, we can use DPP to "divide and conquer" the outer table.

Assuming the outer table's partition key contains the Join Keys, each sub-table of the inner table can, through the DPP mechanism, reduce the amount of outer-table data scanned in its join. As shown in the figure above, steps 1, 2, 3, and 4 represent the joins between the outer table and four different sub-tables. Take step 1 as an example: with DPP, the outer table only needs to scan the partitions corresponding to the green sub-table, shown as the two green partitions in the figure. Likewise, to complete the join in step 4, the outer table only needs to scan the partitions corresponding to the purple sub-table, shown as the two partitions marked in purple on the left of the figure.

It is easy to see that each sub-query scans only a portion of the outer table, and those portions add up to exactly the outer table's full data. Therefore, with "divide and conquer," the end-to-end join requires only one complete scan of the outer table: converting the original Shuffle Join into multiple Broadcast Joins introduces no additional performance overhead. After this optimization, the query's execution efficiency improves substantially.
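
Two prerequisites make this work in practice, sketched below with hypothetical paths and a hypothetical "joinKey" column: the outer table must be stored partitioned by the Join Key, and Dynamic Partition Pruning must be enabled (it is on by default in Spark 3.0; the setting is shown only for completeness).

// Prerequisites for DPP to prune the outer table's scan (paths and column names are hypothetical)
// 1. The outer table is partitioned by the Join Key when it is written out
val outerDF = spark.read.parquet("/path/to/outer")
outerDF.write
  .partitionBy("joinKey")
  .parquet("/path/to/outer_partitioned_by_joinkey")

// 2. Dynamic Partition Pruning is enabled (default: true in Spark 3.0 and later)
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", true)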

However, you may ask, "This is all well and good as an approach, but how do we put 'divide and conquer' together end to end in real code?" Next, let's put the optimization into practice with a small example.

Practical Example of “Divide and Conquer” Optimization Strategy #

This practical example comes from a cross-border e-commerce platform that trades large assembled equipment worldwide. The components of this equipment come from different suppliers in different regions, so a single equipment order often contains multiple line items. The platform uses two tables, "orders" and "transactions," to record orders and transaction details; their key fields are as follows:

// Key fields of the "orders" table
orderId: Int
customerId: Int
status: String
date: Date // Partition key

// Key fields of the "transactions" table
orderId: Int // Partition key
txId: Int
itemId: Int
price: Float
quantity: Int

Both "orders" and "transactions" are fact tables at the TB scale. Based on these two fact tables, the platform periodically calculates the transaction amount of all orders from the previous quarter. The code is as follows:

// Code for calculating order transaction amount
val txFile: String = _
val orderFile: String = _

val transactions: DataFrame = spark.read.parquet(txFile)
val orders: DataFrame = spark.read.parquet(orderFile)

transactions.createOrReplaceTempView("transactions")
orders.createOrReplaceTempView("orders")

val query: String = """
select sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx inner join orders as o
on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date between '2020-01-01' and '2020-03-31'
group by o.orderId
"""

val outFile: String = _
spark.sql(query).write.parquet(outFile)

As you can see, in the join between the two tables, "transactions" plays the role of the outer table, and "orders" is naturally the inner table. Note that even though the inner table carries several filter conditions, such as the order status being "COMPLETE" and the order date falling within the quarter, the filtered inner table is still in the range of hundreds of GB and cannot fit into a broadcast variable. As a result, the join between these two large tables inevitably degrades into a Shuffle Join.
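
One quick way to confirm this is to inspect the physical plan of the query defined above; as a sketch, with a filtered inner table this large we would expect to see a SortMergeJoin (a Shuffle Join) rather than a BroadcastHashJoin:

// Inspect the physical plan of the original query defined above;
// with an inner table of hundreds of GB, expect SortMergeJoin instead of BroadcastHashJoin
spark.sql(query).explain()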

So how should the code be modified to apply the "divide and conquer" strategy? "Divide and conquer" hinges on two key elements: splitting the inner table and avoiding repeated scans of the outer table. Let's adjust the original code along these two lines.

First, how well the inner table splits depends entirely on the choice of split column, which must satisfy several criteria at once: moderate cardinality, reasonably uniform sub-table sizes, and sub-tables smaller than the broadcast threshold. Among the key fields of the "orders" table, only the "date" field qualifies. We therefore split the "orders" table by "date" on a daily basis, and the query in the original code is adjusted as follows:

// Splitting the inner table by the "date" field
val query: String = """
select sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx inner join orders as o
on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date = '2020-01-01'
group by o.orderId
"""

You might say, "Isn't this wrong? The business requirement is the transaction amount for a whole quarter, but this modified query only computes a single day." Don't worry, there is one more code adjustment left: dealing with the outer table's repeated scans. After splitting the inner table, the outer table must be joined with every sub-table one by one, and the results of all the sub-joins must be merged to complete the "divide and conquer" implementation.

// Looping through dates and performing the calculation with "divide and conquer"
val dates: Seq[String] = Seq("2020-01-01", "2020-01-02", ..., "2020-03-31")

for (date <- dates) {

val query: String = s"""
select sum(tx.price * tx.quantity) as revenue, o.orderId
from transactions as tx inner join orders as o
on tx.orderId = o.orderId
where o.status = 'COMPLETE'
and o.date = '${date}'
group by o.orderId
"""

val file: String = s"${outFile}/${date}"
spark.sql(query).write.parquet(file)
}

In the adjusted code above, a simple for loop iterates over the dates so that the outer table is joined with each sub-table one by one, and each sub-join's result is written to a subdirectory under the "outFile" root directory. The code change is fairly small. However, attentive readers may ask, "Doesn't this code fall right back into the single-machine mindset we have been warned against?" Indeed, judged purely by its form, this code does have a strong flavor of the "single-machine mindset."

However, we should not judge the "single-machine mindset" by form alone. The term refers to developers writing procedural code without thinking, ignoring the programming model of distributed datasets. In working through the optimization above, we kept the outer table's repeated scans firmly in mind and relied on the DPP mechanism to avoid them. So although we used a for loop, it does not introduce repeated scans of the distributed dataset at runtime.

To sum up this case: with the "divide and conquer" optimization, we turned the "large table joins large table" query into "large table joins small table," converting the original Shuffle Join into multiple Broadcast Joins. On top of that, Broadcast Joins are effective at dealing with data skew in the join, so this is a case of killing two birds with one stone.
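
As a small follow-up, and only as a sketch reusing the "outFile" root from the code above: because each iteration writes to its own subdirectory, downstream jobs can still read the quarterly result back as a single dataset, for example with a glob over the subdirectories.

// Reading all per-date sub-results back as one logical dataset (glob over the subdirectories)
val quarterlyRevenue = spark.read.parquet(s"${outFile}/*")
quarterlyRevenue.show(10)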

Summary #

The first tuning approach for "large table joins large table" is "divide and conquer." We need to understand both the overall tuning idea and its two key steps.

The core idea of “divide and conquer” is to evenly split the inner table, transforming a complex and large Shuffle Join into multiple Broadcast Joins. Its purpose is to eliminate the massive data distribution introduced by the original Shuffle Join, greatly reducing disk and network overhead, and improving the overall performance of job execution.

In the "divide and conquer" tuning process, splitting the inner table is crucial, because it determines whether the Shuffle Join can be successfully converted into Broadcast Joins. The key lies in choosing the split column. To balance performance and development effort, the split column's cardinality should be large enough that each sub-table fits into a broadcast variable, yet not so large that implementing "divide and conquer" becomes prohibitively expensive. In practice, a date column is often a good choice.

To avoid introducing extra computational overhead during tuning, we must pay special attention to repeated scans of the outer table. There are at least two ways to deal with them. The first is to cache the outer table fully in memory, which demands a great deal of memory and is therefore not universally applicable. The second is to leverage the DPP feature introduced in Spark 3.0, which requires the outer table to be stored partitioned by the Join Key at data warehouse design time.

Once we have split the inner table and avoided duplicate scans of the outer table, we can transform the original Shuffle Join into multiple Broadcast Joins, eliminating the distribution of massive data across the network and avoiding additional performance overhead. Without a doubt, after tuning with “divide and conquer”, the end-to-end execution performance of the query will be significantly improved.

Daily Practice #

For a large table with evenly distributed data, when we apply the "divide and conquer" tuning technique, what other methods can we use to avoid redundant scans besides caching and the DPP mechanism?

I look forward to seeing your thoughts and answers in the comments. See you in the next lesson!