26 Join Hints Guide - How to Choose Join Strategies in Different Scenarios #

Hello, I am Wu Lei.

In data analysis, joining data sets is arguably the most common computational scenario. Because joins are used so frequently, Spark provides a rich set of join types, including Inner Join, Left Join, Right Join, Anti Join, Semi Join, and so on.

Understanding how the different join types differ and what each one does helps us implement business logic quickly. However, this is only the foundation. To improve the execution performance of Spark applications in join scenarios, it is more important to understand in depth how Joins are implemented.

So, in today’s lecture, let’s first discuss the various implementation methods of Joins in a single-machine environment, and the advantages and disadvantages of each. Once we understand these implementation methods, we can then explore together the Join strategies supported by Spark in a distributed computing environment. We will also discuss how Spark makes trade-offs for different Join strategies.

Detailed Explanation of Join Implementation #

So far, there are three ways to implement a Join. In order of appearance, they are Nested Loop Join (NLJ), Sort Merge Join (SMJ), and Hash Join (HJ). Next, we will use a data association scenario to explain the working principles of these three Join implementation methods.

Suppose we have a fact table called “orders” and a dimension table called “users”. The “users” table stores user attribute information, while the “orders” table records every transaction made by the users. The schemas of the two tables are as follows:

// Key fields of the orders table
userId, Int
itemId, Int
price, Float
quantity, Int
  
// Key fields of the users table
id, Int
name, String
type, String // enumeration value, divided into head users and long-tail users

Our task is to perform an inner join on these two tables and project fields such as the user name, unit price, and quantity purchased. The specific SQL query statement is as follows:

// SQL query statement
select orders.quantity, orders.price, orders.userId, users.id, users.name
from orders inner join users on orders.userId = users.id

So, for such an association query, how does it perform calculations under the three different Join implementation methods?

Working Principle of NLJ #

For the two tables involved in the association, we usually distinguish them based on their roles. Among them, the table with a larger volume that actively scans data is called the outer table or driving table, while the table with a smaller volume that passively participates in data scanning is called the inner table or base table. So, how does NLJ associate these two tables?

NLJ uses the “nested loop” method to perform association. That is to say, NLJ uses two nested for loops to scan the data records in the outer table and inner table one by one, check if the association condition is met (e.g., orders.userId = users.id), and if it is met, concatenate the records from both sides and output them.

In this process, the outer loop is responsible for iterating through each data record in the outer table, as shown in step 1 in the figure. For each data record in the outer table, the inner loop scans all the records in the inner table one by one and checks whether the Join Key of each record satisfies the association condition, as shown in step 2. Assuming the outer table has M rows of data and the inner table has N rows of data, the computational complexity of the NLJ algorithm is O(M * N). Although the NLJ implementation is simple and direct, its execution efficiency is poor.
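To make the two loops concrete, here is a minimal single-machine sketch in Python. The sample rows and the function name `nested_loop_join` are made up for illustration; only the field names come from the orders and users schemas above.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]

def nested_loop_join(outer: List[Row], inner: List[Row]) -> List[Tuple[Row, Row]]:
    """Join orders (outer table) with users (inner table) on orders.userId = users.id."""
    joined = []
    for order in outer:          # step 1: iterate over every outer-table record
        for user in inner:       # step 2: scan the whole inner table for each of them
            if order["userId"] == user["id"]:
                joined.append((order, user))
    return joined

# Hypothetical sample data that follows the two schemas.
orders = [
    {"userId": 2, "itemId": 10, "price": 9.5, "quantity": 1},
    {"userId": 1, "itemId": 11, "price": 3.0, "quantity": 4},
    {"userId": 2, "itemId": 12, "price": 7.0, "quantity": 2},
    {"userId": 9, "itemId": 13, "price": 1.0, "quantity": 1},  # no matching user
]
users = [
    {"id": 1, "name": "Alice", "type": "head"},
    {"id": 2, "name": "Bob", "type": "long-tail"},
]

nlj_result = nested_loop_join(orders, users)
nlj_names = [(order["itemId"], user["name"]) for order, user in nlj_result]
```

With 4 orders and 2 users, the inner comparison runs 4 * 2 = 8 times, which is exactly the M * N cost described above.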

Working Principle of SMJ #

Because NLJ is so inefficient, not long after it appeared someone replaced the nested loops with sorting and merging to implement data association, and the resulting algorithm is SMJ. The idea of SMJ is to sort first and then merge. Specifically, the two tables participating in the Join are first sorted in ascending order based on the Join Key. Then, SMJ uses two independent cursors to complete the merge association of the two sorted tables.

When SMJ starts working, the cursors of the outer and inner tables are anchored to the first record of each table. SMJ then compares the Join Keys of the two records under the cursors, and what happens next falls into one of three cases:

  1. The Join Key of the outer table is equal to the Join Key of the inner table, satisfying the association condition. Concatenate the data records from both sides and output them, and then move the cursor of the outer table to the next record.
  2. The Join Key of the outer table is less than the Join Key of the inner table, not satisfying the association condition. Move the cursor of the outer table to the next record.
  3. The Join Key of the outer table is greater than the Join Key of the inner table, not satisfying the association condition. Move the cursor of the inner table to the next record.

Based on these three cases, SMJ keeps sliding the cursors downward until the cursor of one of the tables reaches the end, which marks the end of the association. Because both tables are sorted in ascending order by Join Key and each cursor only moves forward from its current position, the inner table never has to be rescanned from the top for each outer record. Each table is traversed at most once, so the computational complexity of the merge is O(M + N).

However, SMJ's lower computational complexity relies on both tables having been sorted beforehand. Sorting is itself a very time-consuming operation, all the more so because both tables participating in the Join must be sorted before the merge association can start. Therefore, we can describe the calculation process of SMJ as “bitter first, sweet later”. The bitterness is spending time sorting the two tables, and the sweetness is enjoying linear computational complexity when merging the sorted tables.
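The three cases translate almost line for line into code. The following Python sketch is an illustration only: it assumes each Join Key appears at most once in the inner table (as with users.id), which is what makes "advance the outer cursor on a match" sufficient. The rows are hypothetical (Join Key, payload) pairs.

```python
from typing import List, Tuple

def sort_merge_join(outer: List[Tuple[int, str]],
                    inner: List[Tuple[int, str]]) -> List[Tuple[int, str, str]]:
    """Sort both sides by Join Key, then merge them with two forward-only cursors."""
    outer_sorted = sorted(outer, key=lambda row: row[0])
    inner_sorted = sorted(inner, key=lambda row: row[0])

    i = j = 0  # i: outer-table cursor, j: inner-table cursor
    joined = []
    while i < len(outer_sorted) and j < len(inner_sorted):
        outer_key, inner_key = outer_sorted[i][0], inner_sorted[j][0]
        if outer_key == inner_key:
            # Case 1: keys match, output the pair and advance the outer cursor.
            joined.append((outer_key, outer_sorted[i][1], inner_sorted[j][1]))
            i += 1
        elif outer_key < inner_key:
            # Case 2: the outer key is smaller, advance the outer cursor.
            i += 1
        else:
            # Case 3: the outer key is larger, advance the inner cursor.
            j += 1
    return joined

# Hypothetical (userId, item) and (id, name) rows.
orders = [(2, "item-10"), (1, "item-11"), (2, "item-12"), (9, "item-13")]
users = [(2, "Bob"), (1, "Alice"), (5, "Carol")]

smj_result = sort_merge_join(orders, users)
```

Neither cursor ever moves backward, so the merge loop runs at most M + N times. The two `sorted` calls are the up-front cost that the merge itself does not account for.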

Working Principle of HJ #

Considering the strict sorting requirements of SMJ, someone later proposed a more efficient association algorithm: HJ. The design intention of HJ is very clear: reduce the computational complexity of looking up a record in the inner table to O(1). To make lookups on a data set O(1), the natural choice is a hash map. It is precisely because the join process introduces hash calculations that the algorithm is called HJ.

The calculation of HJ is divided into two stages, namely Build and Probe. In the Build stage, based on the inner table, the algorithm uses a predefined hash function to construct a hash table, as shown in step 1 in the above figure. The Key in the hash table is the hash value after applying the hash function to the Join Key, and the Value in the table contains both the original Join Key and the Payload.

In the Probe stage, the algorithm traverses each data record of the outer table. For each record, it first calculates the hash value of the Join Key on the fly using the same hash function, and then looks up that hash value in the hash table created in the Build stage. If the lookup fails, the record has no match in the inner table. If the lookup succeeds, the original Join Keys on both sides are compared, because different keys can share the same hash value. If they match, the records from both sides are concatenated and output, completing the data association.
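The Build and Probe stages can be sketched as follows. This is a toy model rather than any engine's real implementation: the hash function `join_key % NUM_BUCKETS` and the bucket count are assumptions chosen so that a hash collision is easy to see.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

NUM_BUCKETS = 4  # assumed, deliberately tiny so that keys 1 and 5 collide

def hash_fn(join_key: int) -> int:
    """Hypothetical hash function applied to the Join Key."""
    return join_key % NUM_BUCKETS

def build(inner: List[Tuple[int, str]]) -> Dict[int, List[Tuple[int, str]]]:
    """Build stage: hash value -> list of (original Join Key, payload)."""
    table = defaultdict(list)
    for join_key, payload in inner:
        table[hash_fn(join_key)].append((join_key, payload))
    return table

def probe(outer: List[Tuple[int, str]],
          table: Dict[int, List[Tuple[int, str]]]) -> List[Tuple[int, str, str]]:
    """Probe stage: hash each outer key, look it up, then compare the real keys."""
    joined = []
    for join_key, payload in outer:
        for inner_key, inner_payload in table.get(hash_fn(join_key), []):
            if inner_key == join_key:  # guards against hash collisions
                joined.append((join_key, payload, inner_payload))
    return joined

users = [(1, "Alice"), (2, "Bob"), (5, "Carol")]      # inner table
orders = [(2, "item-10"), (1, "item-11"), (5, "item-12"), (9, "item-13")]  # outer table

hash_table = build(users)
hj_result = probe(orders, hash_table)
```

Note that key 9 hashes to the same bucket as keys 1 and 5, yet produces no output. That is why the Value must keep the original Join Key and why the final key comparison cannot be skipped.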

Join in Distributed Environments #

Once you have mastered the working principles of these three main data correlation implementation methods, in a single-machine environment, whether you are dealing with common Inner Join, Left Join, Right Join, or less common Anti Join and Semi Join, you will be able to optimize the performance of data correlation with ease.

However, you may also ask, “After all, Spark is a distributed system, what’s the use of focusing only on single-machine implementation?”

As they say, the essence remains the same amid all changes. In fact, compared to a single-machine environment, data correlation in a distributed environment still follows these three implementation methods, NLJ, SMJ, and HJ, in the computation process, with the addition of the variable of network distribution. In the distributed computing environment of Spark, there are mainly two ways to distribute data in the network: Shuffle and broadcast. So, how do different network distribution methods affect the computation of data correlation?

If we use the Shuffle distribution method to achieve data correlation, both the outer and inner tables need to be fully distributed across the cluster based on the Join Key. Only in this way can data records with the same Join Key in both tables be assigned to the same Executor process to complete the correlation computation, as shown in the figure below.
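As a rough single-process illustration of this idea, the sketch below routes rows from both tables to partitions by Join Key and then joins each partition locally. The partition count, the `join_key % num_partitions` routing rule, and the sample rows are assumptions for the example; Spark's actual partitioner is not shown here.

```python
from collections import defaultdict

NUM_PARTITIONS = 3  # assumed number of shuffle partitions for this toy example

def shuffle_by_join_key(rows, num_partitions=NUM_PARTITIONS):
    """Route each (join_key, payload) row to the partition chosen by its Join Key."""
    partitions = defaultdict(list)
    for join_key, payload in rows:
        partitions[join_key % num_partitions].append((join_key, payload))
    return partitions

def join_partition(order_rows, user_rows):
    """Local join inside one partition (a hash lookup here, for brevity)."""
    names = dict(user_rows)
    return [(key, item, names[key]) for key, item in order_rows if key in names]

orders = [(1, "item-10"), (2, "item-11"), (4, "item-12"), (2, "item-13")]
users = [(1, "Alice"), (2, "Bob"), (4, "Dave")]

# Both tables are redistributed with the same rule, so equal keys meet in one partition.
orders_parts = shuffle_by_join_key(orders)
users_parts = shuffle_by_join_key(users)

shuffle_join_result = {
    p: join_partition(orders_parts[p], users_parts[p]) for p in range(NUM_PARTITIONS)
}
```

Each partition stands in for the slice of data that one Executor would process. The important property is that both tables move, and both use the same routing rule.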

If we use the broadcast mechanism, the situation will be quite different. In this case, Spark only needs to package the inner table (base table) into a broadcast variable and distribute it throughout the network. Since the broadcast variable contains the complete data of the inner table, the outer table, which may have a large volume, can remain in place and easily complete the correlation computation, as shown in the figure below.
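For contrast, here is the broadcast case as a toy Python sketch. The outer table's partitions are left exactly where they are, and every partition receives a full copy of the inner table. The sample data and the function name `broadcast_join` are hypothetical.

```python
def broadcast_join(outer_partitions, inner_rows):
    """Join each outer partition in place against a complete copy of the inner table."""
    broadcast_variable = dict(inner_rows)  # the whole inner table, packaged once
    results = []
    for partition in outer_partitions:
        local_copy = dict(broadcast_variable)  # what each Executor would receive
        results.append(
            [(key, item, local_copy[key]) for key, item in partition if key in local_copy]
        )
    return results

# The outer table is already split across two partitions and is never moved.
orders_partitions = [
    [(1, "item-10"), (2, "item-11")],
    [(4, "item-12"), (2, "item-13")],
]
users = [(1, "Alice"), (2, "Bob"), (4, "Dave")]

broadcast_join_result = broadcast_join(orders_partitions, users)
```

Only the small table travels over the network here, which is the source of the savings compared with shuffling both sides.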

It is not difficult to see that by combining these two network distribution methods, Shuffle and broadcast, with the three computation methods, NLJ, SMJ, and HJ, we can come up with six Join strategies for distributed data correlation, as shown in the figure below.

These six Join strategies are represented by the six rounded rectangles in the figure. From top to bottom, the color of the rectangles becomes lighter and the execution performance becomes better. Compared to the other implementation methods, CPJ has the lowest execution efficiency, with high network and computation overhead, which is why it has the darkest color in the figure. BHJ is the best mechanism for distributed data correlation, with the lowest network and computation overhead, which is why it has the lightest color. In addition, you may have noticed that Broadcast Sort Merge Join is marked in gray. This is because Spark chooses not to support this combination of Broadcast and Sort Merge Join.

So the question arises, since there are six combination strategies, why does Spark not support this particular one? To answer this question, we need to compare the differences and advantages of the SMJ and HJ implementation methods.

Unlike SMJ, HJ does not require the two tables involved in the Join to be ordered, nor does it need to maintain two cursors to determine the current record positions. As long as the base table’s hash table built during the Build phase can fit into memory, the HJ algorithm can traverse the outer table during the Probe phase and correlate it with the hash table one by one.

When data can be distributed in the network in the form of broadcasting, it means that the distributed data, i.e., the base table data, is small enough to be stored in memory. At this time, compared to NLJ and SMJ, HJ has the highest execution efficiency. Therefore, when HJ can be used, Spark naturally does not need to use SMJ, which has higher upfront costs, to complete data correlation.

How does Spark choose a Join strategy? #

So, in different data association scenarios, what is the logic for Spark to choose among these 5 Join strategies, namely CPJ, BNLJ, SHJ, SMJ, and BHJ? Let’s discuss two situations, namely equi-join and non-equi-join.

For equi-join, how does Spark choose a Join strategy? #

Equi-join refers to the Join condition where the Join Key of two tables is connected by an equality condition. In daily development, this is the most common form of Join, such as “t1 inner join t2 on t1.id = t2.id”.

In equi-join, Spark will try to select Join strategies in the order of BHJ > SMJ > SHJ. Among these three strategies, BHJ has the highest execution efficiency, followed by SHJ, and then SMJ. SMJ and SHJ strategies support all types of joins, such as full join and anti-join. Although BHJ has the highest efficiency, it has two prerequisites: the join type cannot be a full join, and the base table needs to be small enough to fit into the broadcast variables.

So why does SHJ have a higher execution efficiency than SMJ, but it is not ranked higher than SMJ? This is a very good question. Let’s start with the conclusion. The reason why Spark prefers SMJ over SHJ is that the implementation of SMJ is more stable and less likely to result in out-of-memory errors.

Looking back at the implementation mechanism of HJ, during the Build phase, the algorithm creates a hash table based on the inner table. During the Probe phase, in order for the outer table to successfully “probe” each Hash Key, the entire hash table needs to be loaded into memory. To be honest, this requirement is quite demanding, and it’s enough for Spark to shy away from it. It should be noted that in different computing scenarios, the diversity of data distribution makes it difficult to guarantee that the inner table can be completely loaded into memory.

In Spark, SHJ must satisfy two prerequisites to be selected as the Join strategy. Both of these conditions are related to data size. Firstly, the size of the outer table must be at least 3 times that of the inner table. Secondly, the average size of the inner table’s data partitions must be smaller than the broadcast variable threshold. The motivation for the first condition is easy to understand, as only when the size difference between the inner and outer tables reaches a certain level, the advantages of HJ will be more significant than SMJ. The purpose of the second constraint is to ensure that each data partition of the inner table can fit entirely into memory.
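The two size conditions are simple arithmetic, so a small sketch may help. The function below only models the rules as stated in this lecture and is not Spark's planner code. The 10 MB threshold and the 200 partitions are assumed values for the example (they correspond to the usual defaults of spark.sql.autoBroadcastJoinThreshold and spark.sql.shuffle.partitions).

```python
MB = 1024 * 1024

def shj_preconditions_met(outer_bytes: int,
                          inner_bytes: int,
                          num_partitions: int,
                          broadcast_threshold_bytes: int = 10 * MB) -> bool:
    """Model of the two SHJ prerequisites described in the text (illustrative only)."""
    # Condition 1: the outer table is at least 3 times the size of the inner table.
    outer_is_much_larger = inner_bytes * 3 <= outer_bytes
    # Condition 2: an average inner-table partition is below the broadcast threshold.
    avg_inner_partition_bytes = inner_bytes / num_partitions
    partition_fits_in_memory = avg_inner_partition_bytes < broadcast_threshold_bytes
    return outer_is_much_larger and partition_fits_in_memory

GB = 1024 * MB
case_ok = shj_preconditions_met(outer_bytes=30 * GB, inner_bytes=1 * GB, num_partitions=200)
case_too_similar = shj_preconditions_met(outer_bytes=2 * GB, inner_bytes=1 * GB, num_partitions=200)
case_big_partitions = shj_preconditions_met(outer_bytes=30 * GB, inner_bytes=1 * GB, num_partitions=50)
```

In the first case a 1 GB inner table split into 200 partitions averages about 5 MB per partition, so both conditions hold. With only 50 partitions the average rises to about 20 MB and the second condition fails.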

Compared with SHJ, SMJ doesn’t have as many additional conditions. Both single-table sorting and merging joins can be performed with the help of disk. Data that cannot fit in memory can temporarily overflow to disk. The process of single-table sorting can be illustrated by the generation of intermediate files in the Shuffle Map phase. When performing the merge join, the algorithm can load the ordered data from disk into memory in a reasonable granularity. This granularity can be large or small, ranging from the size of a data partition to scanning one record at a time.

Considering these factors, Spark SQL gives priority to SMJ over SHJ. In fact, when the configuration property spark.sql.join.preferSortMergeJoin is set to True (which is the default value), Spark SQL uses the SMJ strategy as a fallback to ensure job execution stability, and it has no intention of trying SHJ at all. If developers want to adjust the Join strategy through configuration properties, they need to change this parameter to False, so that Spark SQL will consider trying SHJ.
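Putting the rules for equi-joins together, the selection order can be summarized as a short decision function. This is a simplified model of what the lecture describes, with hypothetical parameter names; the real planner takes more factors into account.

```python
def choose_equi_join_strategy(join_type: str,
                              inner_fits_broadcast: bool,
                              prefer_sort_merge_join: bool = True,
                              shj_eligible: bool = False) -> str:
    """Simplified model of the BHJ > SMJ > SHJ preference for equi-joins."""
    # BHJ needs a join type other than full join and a small enough base table.
    if join_type != "full" and inner_fits_broadcast:
        return "BHJ"
    # SHJ is only considered when preferSortMergeJoin is turned off
    # and both size prerequisites are satisfied.
    if not prefer_sort_merge_join and shj_eligible:
        return "SHJ"
    # Otherwise SMJ is the stable fallback.
    return "SMJ"

default_small_inner = choose_equi_join_strategy("inner", inner_fits_broadcast=True)
full_join = choose_equi_join_strategy("full", inner_fits_broadcast=True)
default_large_inner = choose_equi_join_strategy("inner", False, shj_eligible=True)
shj_enabled = choose_equi_join_strategy(
    "inner", False, prefer_sort_merge_join=False, shj_eligible=True
)
```

With the default setting, a join that cannot be broadcast always ends up as SMJ, even if it would qualify for SHJ.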

For non-equi-join, how does Spark choose a Join strategy? #

Next, let’s talk about non-equi-join, which refers to the Join condition where the Join Key of two tables is connected by a non-equality condition. We have also seen non-equi-join in previous examples, such as the query “t1 inner join t2 on t1.date > t2.beginDate and t1.date <= t2.endDate”, where the association is based on inequality conditions.

Since only NLJ can be used to implement non-equi-join, the available Join strategies for Spark SQL are reduced to BNLJ and CPJ. In the same computing mode, broadcasting has less network overhead compared to shuffling. Obviously, when choosing between the two strategies, Spark SQL will try BNLJ first and then CPJ. Of course, the prerequisite for BNLJ to be effective is that the inner table is small enough to fit into the broadcast variables. If this condition is not met, Spark SQL has no choice but to use the cumbersome CPJ strategy to complete the join computation.
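A short sketch shows both points: why a range condition forces a nested loop, and how the choice between the two remaining strategies is made. Dates are represented as plain integers and all names are hypothetical; the decision function only mirrors the rule stated above.

```python
def non_equi_nested_loop_join(t1_dates, t2_ranges):
    """NLJ for: t1.date > t2.beginDate and t1.date <= t2.endDate.

    There is no single key to sort or hash on, so every pair must be checked.
    """
    return [
        (date, label)
        for date in t1_dates
        for (begin_date, end_date, label) in t2_ranges
        if begin_date < date <= end_date
    ]

def choose_non_equi_join_strategy(inner_fits_broadcast: bool) -> str:
    """Simplified model: try BNLJ first, fall back to CPJ."""
    return "BNLJ" if inner_fits_broadcast else "CPJ"

t1 = [5, 15, 25]                                   # dates as day numbers
t2 = [(0, 10, "promo-A"), (10, 20, "promo-B")]     # (beginDate, endDate, label)

range_join_result = non_equi_nested_loop_join(t1, t2)
small_inner_choice = choose_non_equi_join_strategy(inner_fits_broadcast=True)
large_inner_choice = choose_non_equi_join_strategy(inner_fits_broadcast=False)
```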

What can developers do? #

Lastly, let’s talk about what developers can do with the five join strategies mentioned above. From the analysis above, we can see that Spark SQL chooses a join strategy according to predefined rules. However, these rules cannot cover every diverse and ever-changing computing scenario. Therefore, once we understand the working principles of the different join strategies and combine them with a deep understanding of our business and data, we can confidently decide for ourselves which join strategy to choose.

In the latest 3.0 release, Spark provides developers with various Join Hints, allowing you to override Spark SQL’s selection logic with expert experience. Under certain conditions such as equi-join, join type, table size, etc., Spark will prioritize the developer’s intention and select the join strategy specified by Join Hints. I have summarized the Join Hints keywords supported by Spark 3.0 and their corresponding use cases in the table above for your reference.

In simple terms, you have two ways to specify Join Hints: one is in SQL, and the other is through the DataFrame DSL, both of which are convenient. For a more comprehensive explanation, you can refer to Lesson 13, so I won’t go into too much detail here.
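As a quick reminder of what the SQL form looks like, the sketch below builds the query from earlier in this lecture with a hint comment placed right after `select`. The helper `hinted_query` is hypothetical and only produces a string. The hint keywords are those documented for Spark SQL 3.0, and the strategy each one maps to here is my shorthand for this lecture's terminology.

```python
# Hint keyword -> strategy it asks Spark SQL to use (lecture shorthand).
JOIN_HINTS = {
    "BROADCAST": "BHJ",             # broadcast-based join
    "SHUFFLE_MERGE": "SMJ",         # shuffle sort merge join
    "SHUFFLE_HASH": "SHJ",          # shuffle hash join
    "SHUFFLE_REPLICATE_NL": "CPJ",  # shuffle-and-replicate nested loop join
}

def hinted_query(hint: str, table: str) -> str:
    """Return the lecture's example query with a Join Hint applied to one table."""
    if hint not in JOIN_HINTS:
        raise ValueError(f"unknown join hint: {hint}")
    return (
        f"select /*+ {hint}({table}) */ "
        "orders.quantity, orders.price, orders.userId, users.id, users.name "
        "from orders inner join users on orders.userId = users.id"
    )

shj_sql = hinted_query("SHUFFLE_HASH", "users")
bhj_sql = hinted_query("BROADCAST", "users")
```

The resulting string is what you would pass to Spark SQL. Whether Spark honors the hint still depends on the conditions mentioned above, such as the join being an equi-join and the join type.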

Summary #

In this lesson, we have covered the implementation principles of data correlation and the applicable scenarios for different Join strategies in Spark SQL. Mastering these key points is crucial for performance tuning in data correlation scenarios.

First, you need to understand the working principles of the three Join implementation mechanisms. To facilitate comparison, I have summarized them in the table below.

By understanding the implementation principles of the three correlation mechanisms, you can better understand the Join strategies in Spark SQL. Combining the data distribution methods (Shuffle and Broadcast), Spark SQL supports five Join strategies, ordered by execution efficiency as follows: BHJ > SHJ > SMJ > BNLJ > CPJ. Similarly, for easy comparison, you can also refer to the table below.

Finally, when you have mastered the working principles of different Join strategies and combined them with a deep understanding of your business and data, you can actually decide which Join strategy to choose on your own, without relying entirely on Spark SQL’s judgment.

Spark provides developers with various Join Hints, allowing you to override the selection logic of Spark SQL with your own expertise. For example, when you are sure that the outer table is much larger than the inner table and the inner table's data is evenly distributed, using SHJ can be much more efficient than the default SMJ. In such cases, you can specify Join Hints to make Spark SQL choose the Join strategy you want.

Daily Exercise #

  1. If the associated scenario is a join between fact tables, do you think the implementation method of Sort Merge Join we discussed today is still applicable? If you were asked to design the algorithm implementation steps, how would you do it?

  2. Do you think it is possible to force Sort Merge Join or Hash Join to implement a non-equi-join? Why?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lecture!