18 Data Association Optimization What Join Strategies Are There and How Should Developers Choose

18 Data Association Optimization - What Join Strategies are There and How Should Developers Choose #

Hello, I’m Wu Lei.

In the previous lecture, we explained and introduced data analysis from the perspective of association forms and implementation mechanisms. You should already have a good understanding of the usage of different association forms and the principles of implementation mechanisms. However, in the application scenarios of big data, data processing is often done in a distributed environment, which introduces the consideration of network distribution in data association calculations.

In a distributed environment, we know that Spark supports two types of data distribution modes. One is the Shuffle mode that we learned in [Lecture 7]. Shuffle completes the data exchange between the Map phase and the Reduce phase through intermediate files, which introduces a large amount of disk and network overhead. The other type is the broadcast variable introduced in [Lecture 10], where the broadcast variable is created on the Driver side and distributed to each Executor.

Therefore, from the perspective of data distribution modes, data association can be further divided into two major categories: Shuffle Join and Broadcast Join. Combining these two distribution modes with the three implementation mechanisms of Join itself, we can derive six Join strategies in distributed environments.

So, how does Spark SQL support these six Join strategies? What are their advantages, disadvantages, and applicable scenarios? Can developers make targeted choices among these strategies? In today’s lecture, let’s discuss these topics.

Comparison of Join Implementation Mechanisms #

First of all, let’s talk about the characteristics and applicable scenarios of different Join implementation mechanisms, in order to lay the foundation for subsequent discussions. It should be noted that when we talk about Join implementation mechanisms, we are referring to the working principles at the algorithm level. Different algorithms have different applicable scenarios and complexities, and we need to have sufficient understanding of them and make distinctions.

We know that Join supports three implementation mechanisms, namely Hash Join, Sort Merge Join, and Nested Loop Join. Among the three, Hash Join has the highest execution efficiency, mainly due to the O(1) search efficiency of hash tables. However, before enjoying the “performance dividend” of hash tables in the Probe phase, we need to build a hash table in memory in the Build phase. Therefore, this algorithm has higher memory requirements and is suitable for scenarios where memory can accommodate the calculation of base table data.

In comparison, Sort Merge Join does not have memory limitations. Whether it is sorting or merging, SMJ can use disk to complete the calculation. Therefore, SMJ has an advantage in terms of stability.

Moreover, compared with Hash Join, SMJ does not have much difference in execution efficiency. The former is O(M), and the latter is O(M + N), which can be considered comparable. Of course, the O(M + N) complexity is due to the sorting phase of SMJ. Therefore, if the two tables to be joined are sorted tables, then using SMJ algorithm to implement the association at this time is simply perfect.

Compared with the previous two, Nested Loop Join seems somewhat redundant, as the nested double-layer for loop brings the highest computational complexity: O(M * N). However, each has its own strengths and weaknesses. Efficiently executing HJ and SMJ can only be used for equijoin, which means the association condition must be an equality. For association conditions like salaries(“id”) < employees(“id”), HJ and SMJ are helpless. On the other hand, NLJ can handle both equijoin and non-equijoin, making it the last line of defense for data association in terms of implementation mechanisms.

Shuffle Join and Broadcast Join #

After analyzing the pros and cons of different join mechanisms, let’s talk about join strategies in a distributed environment. Unlike in a single-machine environment, the data of two tables in a distributed environment is scattered across different computing nodes and executor processes. Therefore, in order to perform data correlation, Spark SQL needs to distribute the data with the same join keys to the same executor.

Let’s use the example of an employee information table and a salary table discussed in the previous lecture. If we want to join the salaries and employees tables based on the id column, we must ensure that the data with the same id values are located in the same executor. Spark SQL can then utilize the Hash Join (HJ), Sort-Merge Join (SMJ), or Nested Loop Join (NLJ) algorithms to perform data correlation in parallel on the executors.

In other words, the data distribution of the two tables must be consistent based on the join keys for Spark SQL to execute distributed data correlation. There are only two ways to achieve this: shuffle and broadcast. I would like to remind you that we have discussed shuffle and broadcast variables in detail in previous lectures. If you don’t remember them clearly, you can go back and review.

As mentioned earlier, data correlation can be divided into two categories in terms of distribution patterns: Shuffle Join and Broadcast Join. Generally speaking, compared to Shuffle Join, Broadcast Join often performs better in terms of execution performance. Why is this the case?

Next, let’s analyze the execution process of these two types of joins in a distributed environment, and then answer this question. Once you understand the execution process, you will naturally be able to answer this question.

Shuffle Join #

By default, Spark SQL uses Shuffle Join to perform data correlation in a distributed environment without developer intervention. For the two participating tables, Spark SQL first determines which executor different data records should be distributed to based on the following rules:

  • Calculate the hash value based on the join keys.
  • Modulo the hash value by the parallelism.

Since the left table and the right table have the same parallelism (number of partitions), distributing the data based on the same rules ensures that the salary data and employee data with the same id values are located in the same executor.

Image

As shown in the above figure, rectangles with the same color represent data records with the same join keys. It can be seen that the data is scattered across different executors in the Map stage. After shuffling, the records with the same join keys are distributed to the same executors. Next, in the Reduce stage, the Reduce tasks can use the HJ, SMJ, or NLJ algorithms to perform data correlation calculations within the executors.

The reason why Spark SQL always uses Shuffle Join by default is due to the versatility of Shuffle Join. In other words, in any case, regardless of the size of the data or whether there is enough memory, Shuffle Join can successfully complete the data correlation calculation. However, with great power comes great responsibility, and the versatility in terms of functionality often comes with a performance cost.

After learning the principles of Shuffle in Lesson 6, there is no need for me to say more. You are probably very familiar with the disadvantages of Shuffle. We know that Shuffle’s calculation consumes almost all types of hardware resources, from CPU to memory, from disk to network. In particular, disk and network overhead are often performance bottlenecks during application execution.

So the question is, besides Shuffle Join, which is the versatile join strategy, are there any other more efficient options for developers? The answer is yes, Broadcast Join is a “trump card” to counter Shuffle.

Broadcast Join #

In the lecture on broadcast variables (Lesson 10), we discussed the process of encapsulating user data structures into broadcast variables. In fact, Spark can create broadcast variables not only on ordinary variables but also on distributed datasets (such as RDDs and DataFrames). In this way, for the two tables participating in the join, we can encapsulate the smaller one into a broadcast variable and then perform the join.

You may not have a deep understanding of the idea just by words, so let’s understand it with an example. Taking the salary table and the employee table as an example, with just a small modification to the code, we can fully utilize the advantages of broadcast variables.

The modified code is as follows.

import org.apache.spark.sql.functions.broadcast

// Create a broadcast variable for the employee table
val bcEmployees = broadcast(employees)

// Inner join, PS: Replace the original employees with bcEmployees
val jointDF: DataFrame = salaries.join(bcEmployees, salaries("id") === employees("id"), "inner")

In the execution process of Broadcast Join, Spark SQL first collects all data shards of the employees table from each executor, and then constructs the broadcast variable bcEmployees on the driver, as shown by the solid line in the following figure.

Image

The colorful rectangles scattered in different executors represent the data shards of the employees table. These data shards are gathered together to form the broadcast variable. Next, as shown by the dashed line in the figure, the broadcast variable bcEmployees, which carries the complete data of the employees table, is distributed to all executors across the network.

In this case, as long as the larger salary table data “stay in place and remain unchanged”, it can be easily joined with the consistent employee table data. By doing so, Spark SQL successfully avoids the “arduous and laborious” process of data distribution in Shuffle Join and instead uses the distribution of the broadcast variable.

Although creating and distributing broadcast variables also consumes network bandwidth, compared to the full network distribution of both tables in Shuffle Join, Spark SQL’s execution performance is significantly more efficient by using the distribution of the smaller table to complete the data correlation. This low investment, high output approach allows Spark SQL to achieve high performance benefits at minimal cost, which can be described as “achieving much with little effort”!

Join Strategies supported by Spark SQL #

Whether it is Shuffle Join or Broadcast Join, once data distribution is completed, theoretically any of the three implementation mechanisms, HJ, SMJ, and NLJ, can be used to complete the data association within the Executors. Therefore, with these two distribution modes and three implementation mechanisms combined, there are a total of six distributed Join strategies, as shown in the figure below.

Image

Although there are various combinations to choose from, you don’t have to memorize them all. The key is to understand the patterns inside. Let’s analyze them together.

Among these six Join strategies, Spark SQL supports five of them to handle different association scenarios, namely the five blue rectangles in the figure. For Equi Join, Spark SQL prefers to use the Broadcast HJ strategy, followed by Shuffle SMJ, and the least preferred is Shuffle HJ. For Non Equi Join, Spark SQL prefers to use the Broadcast NLJ strategy, followed by Shuffle NLJ.

Image

It is not difficult to see that whether it is Equi Join or Non Equi Join, as long as the preconditions for Broadcast Join are met, Spark SQL will always prioritize strategies related to Broadcast Join. Then the question arises, what are the preconditions for Broadcast Join?

Looking back at the working principle of Broadcast Join, we can see that the basis for implementing Broadcast Join is that the full data of the broadcast table (Table 2 in the figure) can fit into the memory of the Driver as well as the memory of each Executor as shown in the figure below.

Image

In addition, in order to avoid introducing new performance risks due to the oversized size of the broadcast table, Spark SQL requires that the memory size of the broadcast table does not exceed 8GB.

Alright, let’s summarize briefly here. As long as the broadcast table meets the above two conditions, we can use the broadcast function in SQL Functions to create broadcast variables, and then use the Broadcast Join strategy to improve execution performance.

Of course, when the preconditions for Broadcast Join are not met, Spark SQL will degrade to Shuffle Join strategy. In the case of Non Equi Join, Spark SQL only has the option of Shuffle NLJ, so we don’t need to go into details.

However, in the scenario of Equi Join, Spark SQL has two options: Shuffle SMJ and Shuffle HJ. Nevertheless, the relationship between Shuffle SMJ and Shuffle HJ is like the relationship between Guan Yu and Zhou Cang. Although Zhou Cang is also a skilled warrior, he always stands behind Guan Yu with his knife. In the imminent battle, Liu Bei naturally relies on Guan Yu, who is at the front, and rarely uses Zhou Cang at the back. The same goes for the choice between Shuffle SMJ and Shuffle HJ in Spark SQL.

After learning about Shuffle, we know that Shuffle often sorts the data during the Map phase, which is exactly what SMJ mechanism is good at. For two tables that are already sorted, the complexity of SMJ is O(M + N), which is comparable to the performance of HJ with O(M). Moreover, in terms of execution stability, SMJ is far superior to HJ. In the case of memory limitation, SMJ can make full use of the disk to smoothly complete the association calculation. Therefore, considering the many advantages of Shuffle SMJ, Shuffle HJ is like Zhou Cang behind Guan Yu. Spark SQL has always turned a blind eye to it, so you only need to know its function.

Key Review #

Alright, that concludes today’s lesson. Let’s summarize what we have learned. First, we analyzed and compared the advantages and disadvantages of different join mechanisms in a standalone environment. I have organized them in the table below for your reference.

Image

In a distributed environment, in order to utilize the above mechanisms to perform data association, Spark SQL needs to distribute the data with consistent join keys to the same Executors.

Therefore, data distribution is the foundation and premise of distributed data association. Spark SQL supports two data distribution modes, Shuffle and Broadcast, and accordingly, joins are divided into Shuffle Join and Broadcast Join, with Shuffle Join being the default association strategy. I have organized a comparison between the advantages and disadvantages of these two strategies in the table below for your reference.

Combining the three implementation mechanisms and two data distribution modes, Spark SQL supports 5 distributed join strategies. Spark SQL has its own preference for these different join strategies, which I have summarized in the table below for your reference.

Note that Broadcast Join is only effective when the base table can fit in memory and its storage size is less than 8 GB. As long as the prerequisites are met, Spark SQL will prioritize using Broadcast Join.

Image

Practice for each lesson #

Among the 6 distributed Join strategies, Spark SQL is the only one that does not support Broadcast Sort-Merge Join (SMJ). Can you think of a reason why Spark SQL did not choose to support this Join strategy? As a hint, you can analyze the execution efficiency of SMJ and Hash Join (HJ).

Feel free to leave me a message in the comments section to interact with me. I also recommend you share this lesson with more colleagues and friends.