17 Data Association: Different Join Types and Implementation Mechanisms to Choose #

Hello, I’m Wu Lei.

In the previous lesson, we learned about many operators supported by Spark SQL. Among them, data association (Join) is the most common and important operation in data analysis scenarios. It is safe to say that in almost all data applications, you can see the “shadow” of data association. Therefore, in today’s lesson, let’s continue to talk in detail about Spark SQL’s support for Join.

As we all know, there are many types of Joins. If we divide data association according to association forms (Join Types), it can be divided into inner join, outer join, left join, right join, and so on. For the two tables participating in the association calculation, the association form determines the data source of the result set. Therefore, when developing, the choice of association form is determined by our business logic.

From the perspective of implementation mechanism, Join can be divided into NLJ (Nested Loop Join), SMJ (Sort Merge Join), and HJ (Hash Join). In other words, for the same inner join, we can choose to implement it using NLJ, SMJ, or HJ. What sets these mechanisms apart is their execution efficiency, which can differ enormously from one calculation scenario to another. Therefore, understanding and being familiar with these mechanisms is crucial for us developers.

Today, we will talk separately about the ins and outs of data association in Spark SQL from these two perspectives.

Data Preparation #

In order to help you better understand the new knowledge, I will explain the specific usage of data association in Spark SQL through examples one by one. Before introducing data association, let’s prepare the data that will be used in the examples.

import spark.implicits._
import org.apache.spark.sql.DataFrame

// Create employee information table
val seq = Seq((1, "Mike", 28, "Male"), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male"), (5, "Dave", 36, "Male"))
val employees: DataFrame = seq.toDF("id", "name", "age", "gender")

// Create salary table
val seq2 = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
val salaries:DataFrame = seq2.toDF("id", "salary")

As shown in the code above, we created two DataFrames: one for storing employee basic information, called the employee table; and another for storing employee salaries, called the salary table.

After preparing the data, it is necessary for us to first understand some basic concepts of data association. Data association refers to a calculation process: the process of combining two data tables together in different association forms given the association conditions (Join Conditions). The association conditions include two layers of meanings: one is the selection of the association fields (Join Key) in the two tables respectively, and the other is the logical relationship between the association fields.

In the previous lesson, we mentioned that Spark SQL supports both DataFrame operators and SQL queries. Therefore, let us combine the data just prepared and use these two methods respectively to explain the basic concepts of data association.

Image

First of all, conventionally, we call the data table that actively participates in the join, such as the salaries table in the above picture, the “left table”; and the data table that passively participates in the association, such as the employees table, the “right table”.

Next, we focus on the blue part in the figure. We can see that both tables select the id column as the association field, and the logical relationship between them is “equality”. Such an equation constitutes the association condition we just mentioned. Next, let’s take a look at the green part in the figure, where inner refers to the inner association form.

Association form is one of the key points we are going to learn today. Next, we will set aside the SQL query approach for now and use the DataFrame operator approach as an example to talk about the association forms supported by Spark SQL, and the effects of different association forms.

Join Types #

In terms of join types, Spark SQL’s support is quite comprehensive. To give you a complete understanding, I have organized the join types supported by Spark SQL into the following table. You can take a rough look at it first.

Image

Combining the prepared data, let’s talk about the usage and effects of each join type. Let’s start with the simplest, most basic, and most common inner join.

Inner Join #

If we want to get the salary information of each registered employee, we can use inner join, as shown below.

// inner join
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")

jointDF.show

/** Result
+---+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+---+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+---+-------+---+------+
*/

// left table
salaries.show

/** Result
+---+------+
| id|salary|
+---+------+
| 1| 26000|
| 2| 30000|
| 4| 25000|
| 3| 20000|
+---+------+
*/

// right table
employees.show

/** Result
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| Mike| 28| Male|
| 2| Lily| 30|Female|
| 3|Raymond| 26| Male|
| 5| Dave| 36| Male|
+---+-------+---+------+
*/

As you can see, for registered employees, we can use inner join to get each person’s salary information. In terms of usage, we pass “inner” as the third argument of the join operator to combine the two tables with an inner join. However, if you compare the printed result set with the original salary and employee tables, you will find that not all of the original data in the left and right tables appears in the join result set.

For example, the original salary table contains a salary record with id 4, and the employee table contains a record with id 5 and name “Dave”. Neither record appears in the join result set, and this is exactly the effect of inner join.

The effect of inner join is to keep only the data records in the left and right tables that satisfy the join condition. In the example above, the join condition is salaries("id") === employees("id"), and only the id values 1, 2, and 3 appear in both the employee and salary tables. Therefore, the result set contains only the three data records with ids 1, 2, and 3.

Once you understand the meaning and effect of inner join, it will be much easier to learn other join types, such as outer join.

Outer Join #

Outer join can be further divided into three types: left outer join, right outer join, and full outer join. Here, the left and right correspond to the left and right tables, respectively.

Let’s start with left outer join. To perform a left outer join between salaries and employees, we only need to replace the “inner” keyword with “left”, “leftouter”, or “left_outer”, as shown below.

val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "left")

jointDF.show

/** Result
+---+------+----+-------+----+------+
| id|salary| id| name| age|gender|
+---+------+----+-------+----+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 4| 25000|null| null|null| null|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+----+-------+----+------+
*/

As you can see, the result set of a left outer join is actually the result set of an inner join, plus the remaining data in the left table salaries that does not satisfy the join condition, which is the data record with id 4. It is worth noting that since there is no record with id 4 in the right table employees, all field values in the employees section of the result set are null.

To better understand the inner join and left outer join we learned earlier, let’s take a look at the result of a right outer join. To calculate a right outer join, in the code below, we replace the “left” keyword with “right”, “rightouter”, or “right_outer”.

val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "right")

jointDF.show
/** Print the result
+----+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+----+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
|null| null| 5| Dave| 36| Male|
+----+------+---+-------+---+------+
*/

Upon careful observation, you will notice that the result set of right outer join is actually the same as that of inner join, with the addition of the remaining data from the right table employees, such as the record with id 5 and name "Dave". Similarly, since the table salaries does not have a data record with id 5, the corresponding fields in the result set are filled with null values.

Now that we understand the meanings of inner and outer joins, the purpose of full outer join becomes evident. The result set of a full outer join consists of the result of inner join plus any leftover data from both the left and right tables that does not satisfy the join condition. The keyword for performing a full outer join in Spark SQL can be "full", "outer", "fullouter", or "full_outer", as shown in the code below.

    val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "full")
    
    jointDF.show
    
    /** Print the result
    +----+------+----+-------+----+------+
    | id|salary| id| name| age|gender|
    +----+------+----+-------+----+------+
    | 1| 26000| 1| Mike| 28| Male|
    | 3| 20000| 3|Raymond| 26| Male|
    |null| null| 5| Dave| 36| Male|
    | 4| 25000|null| null|null| null|
    | 2| 30000| 2| Lily| 30|Female|
    +----+------+----+-------+----+------+
    */
    

So far, we have covered the purposes of inner and outer joins. You may have already noticed that "inner" refers to only including the data records that satisfy the join condition in the result set, while "outer" indicates that the result set of the join calculation also includes the data records that do not satisfy the join condition. Furthermore, the terms "left", "right", and "full" in outer join indicate the origins of the data records that do not satisfy the join condition.
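To make the relationship among these four join types concrete, here is a minimal sketch written with plain Scala collections. It only models the result sets and is not how Spark executes joins. The object name JoinTypeSketch and its method names are illustrative assumptions, and ids are assumed to be unique within each table, as in our sample data. A missing side is modeled with None where Spark would print null.

```scala
// A plain-Scala model of join semantics, NOT Spark's implementation.
// Assumption: ids are unique within each table, as in the lesson's sample data.
object JoinTypeSketch {
  type Salary = (Int, Int)                    // (id, salary)
  type Employee = (Int, String, Int, String)  // (id, name, age, gender)

  val salaries: Seq[Salary] = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
  val employees: Seq[Employee] = Seq(
    (1, "Mike", 28, "Male"), (2, "Lily", 30, "Female"),
    (3, "Raymond", 26, "Male"), (5, "Dave", 36, "Male"))

  private val employeeById: Map[Int, Employee] = employees.map(e => e._1 -> e).toMap
  private val salaryById: Map[Int, Salary] = salaries.map(s => s._1 -> s).toMap

  // inner: keep only the pairs that satisfy the join condition
  def inner: Seq[(Salary, Employee)] =
    salaries.flatMap(s => employeeById.get(s._1).map(e => (s, e)))

  // left outer: every left row; the right side is None when nothing matches
  def leftOuter: Seq[(Salary, Option[Employee])] =
    salaries.map(s => (s, employeeById.get(s._1)))

  // right outer: every right row; the left side is None when nothing matches
  def rightOuter: Seq[(Option[Salary], Employee)] =
    employees.map(e => (salaryById.get(e._1), e))

  // full outer: the left outer result plus the right rows that matched nothing
  def fullOuter: Seq[(Option[Salary], Option[Employee])] =
    leftOuter.map { case (s, e) => (Option(s), e) } ++
      employees
        .filterNot(e => salaryById.contains(e._1))
        .map(e => (Option.empty[Salary], Option(e)))
}
```

Calling JoinTypeSketch.leftOuter, for example, returns all four salary rows, with None on the employee side for the row with id 4.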

Understanding the meanings of "inner", "outer", "left", and "right" can help us avoid getting lost among the various join types. In addition to inner and outer joins, Spark SQL also supports left semi join and left anti join. What are these two joins used for?

Left Semi Join / Left Anti Join #

Although the names may sound awkward, their meanings are actually quite simple. Let's start with left semi join, which can be specified using the keywords "leftsemi" or "left_semi". The result set of a left semi join is a subset of the result set of an inner join, containing only the data records that satisfy the join condition in the left table salaries, as shown in the code below.

    // Inner join
    val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")
    
    jointDF.show
    
    /** Print the result
    +---+------+---+-------+---+------+
    | id|salary| id| name|age|gender|
    +---+------+---+-------+---+------+
    | 1| 26000| 1| Mike| 28| Male|
    | 2| 30000| 2| Lily| 30|Female|
    | 3| 20000| 3|Raymond| 26| Male|
    +---+------+---+-------+---+------+
    */
    
    // Left semi join
    val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "leftsemi")
    
    jointDF.show
    
    /** Print the result
    +---+------+
    | id|salary|
    +---+------+
    | 1| 26000|
    | 2| 30000|
    | 3| 20000|
    +---+------+
    */

For comparison, I have printed the results of both inner join and left semi join. Here, it is important to understand that a left semi join is a subset of an inner join, and it only retains the data from the left table salaries. The combination of these two characteristics effectively explains the meaning of "left semi".

With the understanding of left semi join as a foundation, left anti join becomes easier to grasp. Like left semi join, left anti join only retains the data from the left table, and it can be specified using the keywords "leftanti" or "left_anti". However, unlike left semi join, left anti join retains the data records that do not satisfy the join condition, as shown below.

    // Left anti join
    val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "leftanti")
    
    jointDF.show
    
    /** Print the result
    +---+------+
    | id|salary|
    +---+------+
    | 4| 25000|
    +---+------+
    */

By comparing it with the result set of left semi join, it is easy to see the difference in the data retained by left anti join. Clearly, the salary record with id 4 does not satisfy the join condition salaries("id") === employees("id"), and left anti join retains those "unqualified" data records.
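The contrast between the two can also be sketched with plain Scala collections. This is only a model of the semantics under the sample data, not Spark's implementation, and the object name SemiAntiSketch is an illustrative assumption.

```scala
// A plain-Scala model of left semi / left anti semantics, NOT Spark's implementation.
object SemiAntiSketch {
  val salaries: Seq[(Int, Int)] = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
  val employeeIds: Set[Int] = Set(1, 2, 3, 5)  // ids present in the employees table

  // left semi: left rows that have a match on the right; no right-side columns are kept
  def leftSemi: Seq[(Int, Int)] =
    salaries.filter { case (id, _) => employeeIds.contains(id) }

  // left anti: left rows that have no match on the right
  def leftAnti: Seq[(Int, Int)] =
    salaries.filterNot { case (id, _) => employeeIds.contains(id) }
}
```

Seen this way, left semi join and left anti join split the left table into two disjoint parts: every salary record lands in exactly one of the two result sets.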

That's it for the different join types supported by Spark SQL. With an understanding of their characteristics and purposes, combined with the business logic of specific scenarios, you will be able to make flexible choices in your daily development work.

Join Mechanisms #

From a functional perspective, using different join types to implement business logic is an essential skill for every developer. To stand out among many developers, however, we also need to understand the different join mechanisms. Even for the same inner join, different implementation mechanisms can differ significantly in execution efficiency. Therefore, mastering the principles and characteristics of each join mechanism helps us gradually cultivate performance-oriented development habits.

At the beginning of this lecture, we mentioned that there are three implementation mechanisms for joins, namely Nested Loop Join (NLJ), Sort Merge Join (SMJ), and Hash Join (HJ). Next, let’s take inner join as an example and discuss their respective implementation principles and characteristics, combined with the “salaries” and “employees” tables.

// Inner join
val jointDF: DataFrame = salaries.join(employees, salaries("id") === employees("id"), "inner")

jointDF.show

/** Printed result
+---+------+---+-------+---+------+
| id|salary| id| name|age|gender|
+---+------+---+-------+---+------+
| 1| 26000| 1| Mike| 28| Male|
| 2| 30000| 2| Lily| 30|Female|
| 3| 20000| 3|Raymond| 26| Male|
+---+------+---+-------+---+------+
*/

Nested Loop Join #

For the two tables involved in the join, such as “salaries” and “employees”, we conventionally refer to “salaries” as the “left table” and “employees” as the “right table” based on their appearance order in the code. When discussing join mechanisms, we often refer to the left table as the “driving table” and the right table as the “base table”.

Generally speaking, the driving table is often larger in size, and in the process of implementing the join, it actively scans the data. Relative to that, the base table is smaller in size and passively participates in the data scan.

In the implementation mechanism of NLJ, the algorithm uses two nested for loops, outer and inner, to sequentially scan the data records of the driving table and the base table. During the scan, it checks whether the join condition is met, such as salaries("id") === employees("id") in our inner join example. If the join condition is met, the records of the two tables are concatenated and emitted as output.

Image

In the implementation process, the outer for loop is responsible for iterating through each data record of the driving table, as shown in step 1 in the figure. For each data record in the driving table, the inner for loop scans all the records of the base table one by one and checks whether the id field value of the record satisfies the join condition, as shown in step 2. Assuming the driving table has M rows and the base table has N rows, the inner loop runs N times for each of the M outer iterations, so the computational complexity of the NLJ algorithm is O(M * N). Although the NLJ algorithm is simple, intuitive, and easy to understand, its execution efficiency is obviously poor.
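The two nested loops can be sketched in a few lines of plain Scala. This is a single-machine illustration of the idea, not Spark's implementation. The object name NestedLoopJoinSketch, the function name join, and the trimmed two-column employee sample are assumptions made for this example.

```scala
// A single-machine sketch of inner Nested Loop Join, NOT Spark's implementation.
object NestedLoopJoinSketch {
  // `matches` plays the role of the join condition.
  def join[L, R](driving: Seq[L], base: Seq[R])(matches: (L, R) => Boolean): Seq[(L, R)] =
    for {
      l <- driving       // outer loop: each record of the driving table (step 1)
      r <- base          // inner loop: full scan of the base table (step 2)
      if matches(l, r)   // keep the pair only when the join condition holds
    } yield (l, r)

  // Sample data from the lesson; the employee rows are trimmed to (id, name).
  val salaries: Seq[(Int, Int)] = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
  val employees: Seq[(Int, String)] = Seq((1, "Mike"), (2, "Lily"), (3, "Raymond"), (5, "Dave"))

  val result: Seq[((Int, Int), (Int, String))] =
    join(salaries, employees)((s, e) => s._1 == e._1)
}
```

Because the condition is an arbitrary function, this sketch works for any join condition, not only equality. With 4 rows on each side, the condition is evaluated 4 * 4 = 16 times.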

SMJ: Sort Merge Join #

To address the poor computational efficiency of NLJ, SMJ was introduced. As the name suggests, the implementation idea of SMJ is to sort first and then merge. Given two tables participating in the join, SMJ first sorts each of them by the join key and then uses independent cursors to perform a merge join on the sorted tables.

Image

The specific calculation process is as follows: initially, the cursors of the driving table and the base table are both anchored to the first record of each table, and then the next step is determined by comparing the id field values of the records where the cursors are located. The comparison result and subsequent operations can be divided into three cases:

  • If the join condition is satisfied and the id values on both sides are equal, at this time the data records on both sides are concatenated and output, and then the cursor of the driving table slides to the next record.
  • If the join condition is not satisfied and the id value of the driving table is smaller than the id value of the base table, at this time the cursor of the driving table slides to the next record.
  • If the join condition is not satisfied and the id value of the driving table is greater than the id value of the base table, at this time the cursor of the base table slides to the next record.

Based on these three cases, SMJ keeps sliding the cursors downward until one of the tables’ cursor reaches the end, which indicates the end of the join. For each record of the driving table, since the base table is sorted according to the id field and the scanning starts from the position where the cursor is located, the computational complexity of the SMJ algorithm is O(M + N).
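The three cases can be sketched in plain Scala as follows. This is a single-machine illustration, not Spark's implementation. It assumes that join keys in the base table are unique, which is what makes it safe to advance only the driving-table cursor after a match. The object and function names are assumptions made for this example.

```scala
// A single-machine sketch of inner Sort Merge Join, NOT Spark's implementation.
// Assumption: join keys in the base table are unique, so advancing only the
// driving-table cursor after a match (the first case) loses no pairs.
object SortMergeJoinSketch {
  def join[L, R](driving: Seq[L], base: Seq[R])(leftKey: L => Int, rightKey: R => Int): Seq[(L, R)] = {
    val left = driving.sortBy(leftKey).toVector   // sort both tables by the join key first
    val right = base.sortBy(rightKey).toVector
    val out = Vector.newBuilder[(L, R)]
    var i = 0  // cursor on the driving table
    var j = 0  // cursor on the base table
    while (i < left.length && j < right.length) {
      val lk = leftKey(left(i))
      val rk = rightKey(right(j))
      if (lk == rk) { out += ((left(i), right(j))); i += 1 }  // equal: emit, advance driving cursor
      else if (lk < rk) i += 1                                  // driving key smaller: advance driving cursor
      else j += 1                                               // driving key larger: advance base cursor
    }
    out.result()
  }

  // Sample data from the lesson; the employee rows are trimmed to (id, name).
  val salaries: Seq[(Int, Int)] = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
  val employees: Seq[(Int, String)] = Seq((1, "Mike"), (2, "Lily"), (3, "Raymond"), (5, "Dave"))

  val result: Seq[((Int, Int), (Int, String))] = join(salaries, employees)(_._1, _._1)
}
```

Each loop iteration moves one of the two cursors forward by one position, so the merge loop runs at most M + N times, which matches the O(M + N) complexity described above.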

However, the reduction in computational complexity actually relies on the fact that the two tables have been pre-sorted. But as we know, sorting itself is a time-consuming operation, not to mention that both tables participating in the join need to be sorted in order to accomplish the merge join.

Therefore, we can describe the calculation process of SMJ as “bitter first, sweet later”. Bitter refers to the time spent on sorting the two tables first, while sweet refers to the linear computational complexity enjoyed by the merge join of sorted tables.

HJ: Hash Join #

Considering the demanding requirements of sorting in SMJ, the HJ algorithm was later introduced. The original intention of HJ is to trade space for time and reduce the computational complexity of scanning the base table to O(1).

Image

Specifically, the calculation of HJ is divided into two stages, namely the Build stage and the Probe stage. In the Build stage, the algorithm uses a predetermined hash function to build a hash table on top of the base table, as shown in step 1 of the above figure. The key in the hash table is the hash value obtained by applying the hash function to the id field, while the value of the hash table contains both the original Join Key (id field) and Payload.

In the Probe stage, the algorithm sequentially traverses each data record of the driving table. First, the algorithm computes the hash value of the record's Join Key on the fly, using the same hash function. Then, the algorithm uses the hash value to query the hash table created in the Build stage. If the query fails, it means that the record is not associated with any data in the base table; on the contrary, if the query is successful, the join keys on both sides are compared, because different keys can share the same hash value. If the join keys are the same, the records on both sides are concatenated and output to complete the data join.
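The Build and Probe stages can be sketched in plain Scala as follows. This is a single-machine illustration, not Spark's implementation. The toy hash function (key modulo 4) is an assumption chosen so that ids 1 and 5 collide, which shows why the join keys still need to be compared after a successful lookup. The object and function names are also assumptions made for this example.

```scala
// A single-machine sketch of inner Hash Join, NOT Spark's implementation.
object HashJoinSketch {
  // Toy hash function (an assumption for illustration); collisions are intentional.
  private def hash(key: Int): Int = math.abs(key % 4)

  def join[L, R](driving: Seq[L], base: Seq[R])(leftKey: L => Int, rightKey: R => Int): Seq[(L, R)] = {
    // Build stage: hash value -> base-table records (join key + payload) in that bucket
    val table = base.groupBy(r => hash(rightKey(r)))
    // Probe stage: hash each driving record's key, look up the bucket, then compare the real keys
    for {
      l <- driving
      r <- table.getOrElse(hash(leftKey(l)), Seq.empty[R])
      if leftKey(l) == rightKey(r)
    } yield (l, r)
  }

  // Sample data from the lesson; the employee rows are trimmed to (id, name).
  val salaries: Seq[(Int, Int)] = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
  val employees: Seq[(Int, String)] = Seq((1, "Mike"), (2, "Lily"), (3, "Raymond"), (5, "Dave"))

  val result: Seq[((Int, Int), (Int, String))] = join(salaries, employees)(_._1, _._1)
}
```

When the salary record with id 1 is probed, its bucket holds both Mike (id 1) and Dave (id 5). The key comparison keeps Mike and discards Dave.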

Well, at this point, we have briefly covered the three implementation mechanisms of join. I believe you have a good grasp of their respective principles. As for which calculation scenarios these three mechanisms are suitable for, and how Spark SQL utilizes these mechanisms to perform data join in a distributed environment, we will discuss them in the next lesson.

Key Points Review #

In today’s lecture, we focused on the join types and join mechanisms in data association. By understanding different join types, we can effectively meet the ever-changing business requirements. Familiarity with and understanding of different join mechanisms’ working principles will help cultivate our performance-oriented development habits.

Spark SQL supports various join types. To facilitate your reference, I have compiled their meanings and effects into the following table. In future development work, when you need to differentiate and confirm different join types, you can quickly come to a conclusion by reviewing this table.

Image

After that, we introduced three join mechanisms, namely Nested Loop Join, Sort Merge Join, and Hash Join. I have also organized the working principles of these three join mechanisms into a table for your reference at any time.

Image

Practice for each lesson #

For the three implementation mechanisms of Join, namely Nested Loop Join, Sort Merge Join, and Hash Join, and based on their implementation principles, can you guess which scenarios each mechanism may be suitable for? In other words, in what scenarios is it more appropriate to use a particular implementation mechanism for data association?

Feel free to leave a message in the comments section to interact with me, and I also recommend you to share this lesson with your colleagues and friends.