22 Catalyst Physical Planning How Your SQL Statements Are Optimized Part 2

22 Catalyst Physical Planning - How Your SQL Statements are Optimized Part 2 #

Hello, I’m Wu Lei.

In the previous lesson, we talked about the logical optimization process of the Catalyst optimizer, which consists of two stages: logical plan parsing and logical plan optimization. The ultimate goal of logical optimization is to transform the Unresolved Logical Plan from a suboptimal Analyzed Logical Plan into an efficiently executable Optimized Logical Plan.

However, each step of logical optimization only indicates what Spark SQL needs to “do” from a logical standpoint, without specifying how it should be done at the execution level. Therefore, in order to deliver the logical plan for execution, Catalyst also needs to convert the Optimized Logical Plan into a physical plan. The physical plan is more specific than the logical plan, as it explicitly describes how Spark SQL should execute each step.

In today’s lesson, we will continue to follow in the footsteps of Xiao Q and see what changes occur after going through the physical optimization stage of Catalyst.

Optimizing Spark Plan #

The optimization of the physical stage starts with the Optimized Logical Plan, which is the output of the logical optimization stage. Therefore, let’s first review small Q’s original query and Optimized Logical Plan.

val userFile: String = _
val usersDf = spark.read.parquet(userFile)
usersDf.printSchema
/**
root
|-- userId: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- gender: string (nullable = true)
|-- email: string (nullable = true)
*/
val users = usersDf
.select("name", "age", "userId")
.filter($"age" < 30)
.filter($"gender".isin("M"))

val txFile: String = _
val txDf = spark.read.parquet(txFile)
txDf.printSchema
/**
root
|-- txId: integer (nullable = true)
|-- userId: integer (nullable = true)
|-- price: float (nullable = true)
|-- volume: integer (nullable = true)
*/

val result = txDF.select("price", "volume", "userId")
.join(users, Seq("userId"), "inner")
.groupBy(col("name"), col("age")).agg(sum(col("price") * col("volume")).alias("revenue"))

result.write.parquet("_")

After transforming the query statement for joining two tables, we obtain the Optimized Logical Plan shown in the following diagram. Note that at the root node of the logical plan, there is the “Join Inner” keyword, indicating that Catalyst optimizer specifies that this step needs to perform an inner join. However, Catalyst does not specify how to perform the inner join or which join strategy to use. Therefore, the logical plan itself is not actionable.

In order to make the query plan actionable and executable, Catalyst’s physical optimization stage (Physical Planning) can be divided into two phases: Optimizing Spark Plan and Generating Physical Plan.

  • In the process of optimizing the Spark Plan, Catalyst maps each relational operator in the logical plan to a physical operator based on established optimization strategies, generating the Spark Plan.
  • In the process of generating the Physical Plan, Catalyst further improves the Spark Plan and generates an executable Physical Plan based on predefined Preparation Rules.

Now the question arises: what are the established optimization strategies that Catalyst uses in the process of optimizing the Spark Plan? In terms of quantity, Catalyst has 14 classes of optimization strategies, among which 6 classes are related to stream processing, and the remaining 8 classes apply to all computing scenarios, including batch processing, data analysis, machine learning, and graph computing, as well as stream processing. Therefore, we only need to understand these 8 classes of optimization strategies.

All optimization strategies are similar in terms of transformation methods, using pattern matching-based partial functions to map operators in logical plans to physical operators in Spark plans. For example, the BasicOperators strategy directly maps logical operators such as Project, Filter, and Sort to physical operators. Other strategies follow a similar optimization process, so in the optimization phase of Spark Plan, as long as we grasp a “typical” strategy and understand its transformation process, we can proceed.

So who should we focus on as the “typical” strategy? I think that this “typical” strategy should meet at least two criteria: first, it should be very common in our application scenarios; and second, its selection has the most critical impact on execution performance. Using these two criteria to select from the 8 strategies mentioned above, we can quickly identify JoinSelection. Next, we will take JoinSelection as an example to explain the optimization process in detail.

If we summarize the optimization process of JoinSelection in one sentence, it is to combine information from various aspects to determine which Join strategy to use in the physical optimization stage. So, what Join strategies does Catalyst have?

What Join strategies does Catalyst have? #

Combining the implementation mechanism of Joins and the data distribution methods, Catalyst supports a total of 5 Join strategies at runtime, namely Broadcast Hash Join (BHJ), Shuffle Sort Merge Join (SMJ), Shuffle Hash Join (SHJ), Broadcast Nested Loop Join (BNLJ), and Shuffle Cartesian Product Join (CPJ).

From the meaning of these 5 Join strategies in the above table, we know that they are combinations of 2 data distribution methods (broadcast and shuffle) and 3 Join implementation mechanisms (Hash Joins, Sort Merge Joins, and Nested Loop Joins). So, what logic does Catalyst use to prioritize and select which Join strategy in the JoinSelection optimization process?

How does JoinSelection decide which Join strategy to choose? #

The logic is actually very simple: Catalyst always tries to prioritize the strategy with the highest execution efficiency. Specifically, when selecting the join strategy, JoinSelection first checks if the current query meets the prerequisites of BHJ: if it meets the prerequisites, BHJ is immediately selected; if not, it continues to check if the current query meets the prerequisites of SMJ. This process continues until there are no more eligible strategies, and CPJ is chosen as a fallback.

So, what prerequisites do these 5 Join strategies have to meet? In other words, what information does JoinSelection rely on when making decisions?

In general, this information can be divided into two categories: the first category is “condition-based” information, used to determine the prerequisites of the 5 Join strategies. The second category is “directive-based” information, which refers to the Join Hints provided by developers.

Let’s first talk about “condition-based” information, which includes two types. The first type is Join type, such as equality, join form, etc. This information comes from the query statement itself. The second type is the size of the inner table, which can come from various sources, such as the ANALYZE TABLE statement on Hive tables, Spark’s estimation of the sizes of Parquet, ORC, CSV, and other source files, or even dynamic statistics from AQE.

I have summarized the requirements of these 5 Join strategies for this information in the table below, you can take a look.

Directive-based information refers to Join Hints, which come in various forms and allow us to override Spark SQL behavior. For example, if we make the following adjustment to the query statement of small Q, JoinSelection will prioritize our intention and skip SMJ to choose the lower-ranked SHJ. The specific code example is as follows:

val result = txDF.select("price", "volume", "userId")
.join(users.hint("shuffle_hash"), Seq("userId"), "inner")
.groupBy(col("name"), col("age")).agg(sum(col("price") * 
col("volume")).alias("revenue"))

Now that we are familiar with the logic of JoinSelection in selecting Join strategies, let’s take a look at how small Q makes its selection. Small Q is a typical star schema query, which involves data association between fact tables and dimension tables, with filter conditions on the dimension table. When deciding which Join strategy to use, JoinSelection first checks whether small Q meets the prerequisites of BHJ.

Obviously, small Q is an equi-join Inner Join, so it satisfies the first two conditions in the table for BHJ. However, the size of the inner table “users” is large, exceeding the default broadcast threshold of 10MB, and it does not satisfy the third condition of BHJ. Therefore, JoinSelection has to reluctantly give up the BHJ strategy and continue to evaluate if small Q meets the prerequisites of SMJ.

The prerequisites of SMJ are lenient, as long as the query statement is an equi-join. Small Q naturally satisfies this condition, so JoinSelection ultimately selects SMJ as the Join strategy for small Q. The Spark Plan after optimization for small Q is shown in the following diagram, where we can see that the root node of the query plan is SMJ.

Now we know what Join strategies Catalyst has, how JoinSelection selects different Join strategies, and how small Q chooses. Small Q has transformed from an Optimized Logical Plan to a Spark Plan, and it is clear that SMJ is used for join computation at runtime. However, even though the Spark Plan for small Q explicitly specifies “what to do” at each step, why can’t Spark transform such a query plan into executable distributed tasks?

Generate Physical Plan #

Previously, the calculation of Shuffle Sort Merge Join requires two prerequisites: Shuffle and sorting. However, the Spark Plan does not explicitly specify which field to use as the basis for Shuffle, and which field to sort by.

Therefore, Catalyst needs to further transform the Spark Plan to generate an operational and executable Physical Plan. So how is this done? Let’s explain it in detail with the help of the process flowchart of the Catalyst physical optimization stage.

From the above figure, we can see that the conversion from Spark Plan to Physical Plan requires several sets of rules called Preparation Rules. These rules play the last role and are responsible for generating the Physical Plan. So, what are these rules and what do they do? Let’s take a look together.

Preparation Rules consist of 6 groups of rules, which, when applied on the Spark Plan, result in a Physical Plan. The Physical Plan is ultimately transformed by Tungsten into distributed tasks for calculating the RDD.

Q’s query statement is typical and simple, it does not involve subqueries, nor does it have Python UDFs. Therefore, in Q’s example, we won’t use rules like subqueries, data reuse, or Python UDFs. Only the EnsureRequirements and CollapseCodegenStages groups of rules will be used in Q’s Physical Plan transformation.

In fact, these two groups of rules are the most common and frequently used in structured queries. Today, let’s focus on the meaning and purpose of the EnsureRequirements rule. As for the CollapseCodegenStages rule, it is basically Tungsten’s WSCG feature, which we will explain in detail in the next lesson.

EnsureRequirements Rule #

The translation of EnsureRequirements literally means “ensuring that prerequisites are met”. What does this mean? For each operator node in the execution plan, there are 4 attributes to describe the distribution state of the input and output data.

The EnsureRequirements rule requires that the output data of the child nodes satisfy the input requirements of the parent node. How can this be understood?

Take Q’s Spark Plan tree structure as an example. In the figure, the left and right branches represent the scan and processing of the users and transactions tables, respectively. At the top of the tree, the root node SortMergeJoin has two Project child nodes, which are used to represent the projected data on the users and transactions tables. The outputPartitioning and outputOrdering attributes of these two Project nodes are Unknown and None, respectively. Therefore, the output data from these two Project nodes is not shuffled or sorted based on any column.

However, SortMergeJoin has clear requirements for the input data: it should be partitioned into 200 partitions by userId and sorted. The output of the two Project nodes clearly does not meet the requirements of the parent node SortMergeJoin. At this time, the EnsureRequirements rule comes into play. It adds necessary operators, such as Shuffle and sorting, to ensure that the SortMergeJoin node’s requirements for input data are met. The diagram below illustrates this.

After the two Project nodes, the EnsureRequirements rule adds Exchange and Sort nodes respectively. The Exchange node represents the Shuffle operation, which is used to meet the data distribution requirement of SortMergeJoin. The Sort node represents sorting, which is used to satisfy the SortMergeJoin’s requirement for ordered data.

After adding the required nodes, Q’s Physical Plan becomes quite specific. At this point, Spark can call the doExecute method of the Physical Plan to convert the computed result of the structured query into RDD[InternalRow], where InternalRow is the custom binary data structure designed by Tungsten, which we have discussed in detail in the perspective of memory (Part I), you can refer back to that section. By invoking Action operators on RDD[InternalRow], Spark can trigger the execution of the Physical Plan from start to finish.

Finally, let’s see what changes have occurred in Q’s plan.

First, we can see that the EnsureRequirements rule adds Exchange and Sort operations at the top of the two branches respectively, to meet the calculation requirements of the root node SortMergeJoin. Secondly, if you observe carefully, you will find that many asterisks “” are added in the Physical Plan, followed by parentheses and numbers, such as “ (3)”, “* (1)”. These asterisks “*” are tags for WSCG, and the numbers inside the parentheses represent the stage number. Therefore, operations with the same number in the parentheses will eventually be consolidated into a piece of “handwritten code”, which is what we will discuss in detail in the next lesson, Tungsten’s WSCG.

With this transformation, Q has evolved from a “rebellious teenager” who doesn’t consider execution efficiency to an efficient “professional”, and Catalyst, the life mentor, has played an indispensable role in this journey.

Summary #

To convert the logical plan into a physical plan that can be executed, the Spark SQL physical optimization stage consists of two steps: optimizing the Spark Plan and generating the Physical Plan.

In the optimization of the Spark Plan, Catalyst maps the logical plan to the Spark Plan based on established strategies. There are many strategies, but we focus on the JoinSelection strategy, which is used to select the best Join strategy at runtime. JoinSelection evaluates whether the query satisfies the prerequisites of each Join strategy in the order of BHJ > SMJ > SHJ > BNLJ > CPJ, and selects the optimal one.

If developers are not satisfied with the default selection order of JoinSelection, which is BHJ > SMJ > SHJ > BNLJ > CPJ, they can also specify the Join strategy explicitly by introducing Join hints in SQL or DSL statements, overriding the Catalyst’s selection. However, it should be noted that for the specified Join strategy to take effect at runtime, the query must also meet its prerequisites.

In the generation of the Physical Plan, Catalyst transforms the optimized Spark Plan into a physical plan that can be executed, called the Physical Plan, based on established sets of Preparation Rules. Among these predefined Preparation Rules, you need to focus on the EnsureRequirements rule.

The EnsureRequirements rule ensures that the input requirements of each operator are satisfied. When necessary, it forcibly inserts the necessary operators into the Physical Plan. For example, for Shuffle Sort Merge Join, this operator has explicit requirements for the data distribution and order of the child nodes. Therefore, in addition to the child nodes, EnsureRequirements introduces new operators such as Exchange and Sort.

Daily Practice #

There are 3 types of JOIN implementation methods and 2 types of network distribution modes. There should be 6 JOIN strategies in total, so why doesn’t Catalyst support the Broadcast Sort Merge JOIN strategy?

Looking forward to seeing your thoughts and answer in the comment section! See you in the next lecture!