21 Catalyst Logical Planning How Your SQL Statements Are Optimized Part 1

21 Catalyst Logical Planning - How Your SQL Statements are Optimized Part 1 #

Hello, I’m Wu Lei.

In the previous lecture, we mentioned that Spark SQL has replaced Spark Core as the next-generation optimization engine at the heart of Spark. All of Spark’s sub-frameworks share the performance benefits that Spark SQL brings, so Spark SQL accounts for the largest share of the work in each new Spark release. Consequently, we must master the optimization process of Spark SQL.

The end-to-end optimization process of Spark SQL consists of two main stages: the Catalyst optimizer and Tungsten. The Catalyst optimizer in turn includes two stages: logical optimization and physical optimization. To squeeze as much performance as possible out of developers’ queries, the whole process is designed with great care. Therefore, I will spend three lectures discussing it in detail.

The following diagram illustrates the complete process. Start by using it to build a holistic picture of the optimization process. Then, as I explain, fill in the key links, important steps, and core knowledge points. While digging into the details of each local optimization, keep the overall process in view, so that you see both the trees and the forest.

Optimization Process

In today’s lecture, let’s first talk about the working principle of the Catalyst optimizer’s logical optimization stage.

Case Study: The Transformation of Little Q #

Let’s start with an example from the e-commerce industry. The business requirement is simple: given the transaction fact table transactions and the user dimension table users, we need to calculate the total transaction amount for each user. The data is stored in a distributed file system in Parquet format. Therefore, we first need to read the source files using the Parquet API.

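import org.apache.spark.sql.functions.{col, sum}
import spark.implicits._ // enables the $"column" syntax used below

// Path to the users Parquet files (placeholder)
val userFile: String = _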
val usersDf = spark.read.parquet(userFile)
usersDf.printSchema
/**
root
|-- userId: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- gender: string (nullable = true)
|-- email: string (nullable = true)
*/
val users = usersDf
  .select("name", "age", "userId")
  .filter($"age" < 30)
  .filter($"gender".isin("M"))

val txFile: String = _
val txDf = spark.read.parquet(txFile)
txDf.printSchema
/**
root
|-- itemId: integer (nullable = true)
|-- userId: integer (nullable = true)
|-- price: float (nullable = true)
|-- quantity: integer (nullable = true)
*/

// Column names follow the schema printed above: the amount column is "quantity"
val result = txDf.select("price", "quantity", "userId")
  .join(users, Seq("userId"), "inner")
  .groupBy(col("name"), col("age"))
  .agg(sum(col("price") * col("quantity")).alias("revenue"))

result.write.parquet("_")

The code above demonstrates the logic for calculating the total transaction amount for each user. To achieve this, we first filter the user table based on age and gender. Then we join the filtered user table with the transaction table and group the data by user to calculate the revenue. As you can see, this logic is actually a typical join query in a star schema data warehouse. For convenience, let’s name this join query “Little Q”. Little Q requires two input sources: the transaction table and the filtered user table. In today’s lesson, let’s follow Little Q and see what transformations occur in the Catalyst logical optimization phase.

Image

The Catalyst logical optimization phase consists of two steps: logical plan analysis and logical plan optimization. In the logical plan analysis step, Catalyst transforms the “Unresolved Logical Plan” into the “Analyzed Logical Plan”. In the logical plan optimization step, Catalyst applies a set of predefined heuristic rules to further optimize the “Analyzed Logical Plan” into the “Optimized Logical Plan”.

Image

Since the “Unresolved Logical Plan” is the starting point for Catalyst’s optimization, Little Q transforms from the query statement in the code to the “Unresolved Logical Plan” before entering the Catalyst optimizer.

Image
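If you want to look at these plans yourself, the sketch below prints them for a tiny query. It assumes a local SparkSession and a made-up two-row users view (the sample rows and the app name are illustrative, not part of Little Q), and it uses SQL so that the starting plan is fully unresolved.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation; inside spark-shell, getOrCreate() reuses the existing session.
val spark = SparkSession.builder().master("local[*]").appName("little-q-plans").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for the users table.
Seq((1, "Alice", 25, "F"), (2, "Bob", 28, "M"))
  .toDF("userId", "name", "age", "gender")
  .createOrReplaceTempView("users")

val q = spark.sql("SELECT name, age FROM users WHERE age < 30")
val qe = q.queryExecution

println(qe.logical)       // plan before analysis (the Unresolved Logical Plan)
println(qe.analyzed)      // Analyzed Logical Plan
println(qe.optimizedPlan) // Optimized Logical Plan

// Prints the logical plans together with the physical plan in one call.
q.explain(true)
```

In the output of explain(true), the plan before analysis appears under the heading “Parsed Logical Plan”.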

Logical Plan Analysis #

After successfully entering the Catalyst optimizer, Little Q needs to start executing logical plan analysis, which means transforming the “Unresolved Logical Plan” to the “Analyzed Logical Plan”. So, how exactly is this done?

From the diagram of “Little Q’s Journey”, we can see that the “Unresolved Logical Plan” carries limited information: it is merely the query statement translated from DSL syntax into an abstract syntax tree (AST). Note that both logical plans and physical plans are executed from bottom to top. Therefore, the computation order of the logical plan in the diagram runs from the full table scan up to filtering by gender, and each step only states what to “do”.

For example, at the lowest level of the plan, the Relation node “tells” Catalyst: “You need to scan a table with 4 fields named ABCD, in Parquet file format”. However, this information is not enough for Little Q to optimize. We also need to know what the schema of this table is. What are the types of the fields? Do the field names actually exist? Are the field names in the data table consistent with the field names in the plan?

Therefore, during the logical plan analysis phase, Catalyst needs to combine the schema information of the DataFrame to verify if the table name, field names, and field types in the plan are consistent with the actual data. After completing the verification, Catalyst will generate the “Analyzed Logical Plan”. At this point, Little Q will be transformed from the “Unresolved Logical Plan” to the “Analyzed Logical Plan”.

From the following diagram, we can see that the logical plan has passed the consistency check and now carries the field types of the two tables. For example, “userId” is of type int, “price” is of type float (as in the schema printed earlier), and so on.

Logical Plan Analysis
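To see this verification at work, here is a minimal sketch, assuming a local SparkSession and a one-row users DataFrame invented for the purpose. Referring to a misspelled column makes the analysis step reject the query, while a valid query yields resolved attributes with their types.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Success, Try}

val spark = SparkSession.builder().master("local[*]").appName("analysis-check").getOrCreate()
import spark.implicits._

// Hypothetical sample row; only the schema matters here.
val users = Seq((1, "Alice", 25)).toDF("userId", "name", "age")

// "emial" is a deliberate typo: the schema has no such column.
val attempt = Try(users.select("emial"))

attempt match {
  case Failure(e: AnalysisException) => println(s"Rejected during analysis: ${e.getMessage}")
  case Failure(other)                => throw other
  case Success(_)                    => println("Unexpectedly resolved")
}

// The analyzed plan exposes resolved attributes together with their data types.
val resolvedColumns = users.select("userId", "age").queryExecution.analyzed.output
resolvedColumns.foreach(a => println(s"${a.name}: ${a.dataType.simpleString}"))
```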

Logical Plan Optimization #

At this point, if we did no optimization at all, we could convert Little Q directly into a physical plan. However, translating the developer’s computational steps one-to-one into a physical plan often does not result in the most efficient execution.

Why is this the case? During runtime, Spark first scans the Parquet format user table in its entirety, and then selects the userId, name, age, and gender fields, and then filters the data by age and gender.

For this execution plan, the initial full scan is obviously wasteful. There are two main reasons: on the one hand, the query actually only involves 4 fields and does not need the email column data; on the other hand, the age and gender fields have filtering conditions, so we can use these conditions to reduce the amount of data that needs to be scanned.

Therefore, the same computation logic can be implemented in multiple ways, and rearranging the operators in different orders yields different implementations. The best approach is to follow the development principle of “saving where possible and postponing where possible” and choose the most efficient of all candidate implementations.

Similarly, when facing this kind of “multiple choice question”, Catalyst also has its own set of “principles” and logic. Therefore, Catalyst does not stop at generating the “Analyzed Logical Plan”, it further transforms it into an “Optimized Logical Plan” based on a set of heuristic rules.

So here’s the question: what are the established rules and logic in Catalyst? Based on these rules, how does Catalyst perform the transformation of logical plans? Don’t worry, we’ll answer them one by one. Let’s first talk about Catalyst’s optimization rules, and then explore the process of logical plan transformation.

Catalyst’s Optimization Rules #

Compared with Catalyst’s rule set, the handful of development principles we summarized is tiny. In the newly released Spark 3.0, Catalyst has a total of 81 optimization rules (Rules), divided into 27 groups (Batches), and some rules are included in more than one group. Therefore, if we count a rule once for every group it appears in, the 27 groups add up to 129 optimization rules.

How should we approach so many optimization rules? In fact, if we consider the optimization effect, these rules can be summarized into the following 3 categories:

  • Predicate Pushdown
  • Column Pruning
  • Constant Folding

First, let’s talk about predicate pushdown. Predicate pushdown mainly revolves around the filtering conditions in the query. Here, “predicate” refers to filtering conditions like “age < 30” on the user table, and “pushdown” refers to pushing these predicates downward along the execution plan, to the closest place to the data source, thus reducing the amount of data scanning at the source. In other words, it’s best to get these predicates as close to the data source as possible.

However, before pushdown, Catalyst first optimizes the predicates themselves. For example, the OptimizeIn rule rewrites “gender in ‘M’” as “gender = ‘M’”, that is, it replaces the single-value “in” predicate with an equality predicate. Another example is the CombineFilters rule, which merges the adjacent “age < 30” and “gender = ‘M’” filters into a single conjunction. Together with the not-null checks that Catalyst infers for both columns, the final predicate is “age IS NOT NULL AND gender IS NOT NULL AND age < 30 AND gender = ‘M’”.

After optimizing the predicates themselves, Catalyst uses the PushDownPredicates optimization rule to push the predicates to the data source at the lowest level of the logical plan tree. For storage formats like Parquet and ORC, combined with statistics in the file footer, the pushed-down predicates can significantly reduce the amount of data scanned and reduce disk I/O overhead.

Now let’s talk about column pruning. Column pruning means only reading the fields that are relevant to the query when scanning the data source. Taking Little Q as an example, the user table schema is (userId, name, age, gender, email), but the query never references the email column. Therefore, Catalyst uses the ColumnPruning rule to “prune” the email column. After this optimization step, Spark skips the email column when reading Parquet files, thus saving I/O overhead.

It’s not hard to see that the motivation behind predicate pushdown and column pruning is the same as the “saving where possible” principle. The core idea is to reduce the amount of data that needs to be scanned and processed by any means possible, and reduce the workload of subsequent calculations.
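Both effects can be observed on a small Parquet file. The sketch below is only an illustration under stated assumptions: it writes three made-up user rows to a temporary directory, reads them back, applies Little Q’s two filters, and prints the plans. The rows, the temporary path, and the app name are hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("pushdown-pruning").getOrCreate()
import spark.implicits._

// Write a few hypothetical rows to a fresh sub-directory of a temp folder.
val path = Files.createTempDirectory("users-parquet").resolve("users").toString
Seq(
  (1, "Alice", 25, "F", "a@example.com"),
  (2, "Bob", 28, "M", "b@example.com"),
  (3, "Carl", 41, "M", "c@example.com")
).toDF("userId", "name", "age", "gender", "email").write.parquet(path)

val users = spark.read.parquet(path)
  .filter($"age" < 30)
  .filter($"gender".isin("M"))
  .select("name", "age", "userId")

// Logical view: the two filters end up in a single Filter node close to the relation.
println(users.queryExecution.optimizedPlan)

// Physical view: the scan node reports which predicates and columns reach the Parquet reader.
val physical = users.queryExecution.executedPlan.toString
println(physical)
```

In the physical plan, look at the scan node: its PushedFilters entry lists the predicates handed to the Parquet reader, and its ReadSchema entry lists the columns actually read, which should not include email.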

The final type of optimization is constant folding, which is relatively straightforward. For example, if we have a filter condition of “age < 12 + 18”, Catalyst uses the ConstantFolding rule to automatically change the condition to “age < 30”. Similarly, if we have constant expressions mixed in the select statement, Catalyst will automatically replace them with the results of the expressions.
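Constant folding is easy to observe by comparing the plans before and after optimization. The following sketch assumes a local SparkSession and uses spark.range to fabricate an age column, so the data is a stand-in rather than the real user table.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("constant-folding").getOrCreate()
import spark.implicits._

// Hypothetical data: ages 0..99 generated by spark.range.
val ages = spark.range(0, 100).toDF("age")

// The filter condition contains the constant expression 12 + 18.
val q = ages.filter($"age" < lit(12) + lit(18))

val analyzed = q.queryExecution.analyzed.toString
val optimized = q.queryExecution.optimizedPlan.toString

println(analyzed)  // the condition still contains (12 + 18)
println(optimized) // the constant expression has been replaced by its value
```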

So far, we have discussed the 3 categories of Catalyst logical optimization rules from the perspective of their functionality and effects. You might say, “Is it necessary to have 81 rules just to do these 3 things?” We divided these rules into 3 categories mainly for the convenience of explanation and understanding. In fact, it is precisely because Catalyst’s set of optimization rules keeps expanding that queries written by developers of all kinds can enjoy decent execution performance. Without the help of these optimization rules, the execution performance of Little Q would be extremely poor.

Finally, once optimized by Catalyst, Little Q is transformed from an “Analyzed Logical Plan” to an “Optimized Logical Plan”, as shown in the figure below. We can see that predicate pushdown and column pruning are both reflected in the Optimized Logical Plan.

Catalyst’s Optimization Process #

Next, I will continue to answer the second question raised earlier: how does Catalyst specifically transform the “Analyzed Logical Plan” into the “Optimized Logical Plan” based on so many optimization rules? In fact, both the logical plan and the physical plan inherit from QueryPlan.

QueryPlan’s parent class is TreeNode, and TreeNode is an abstraction of nodes in the syntax tree. TreeNode has a field called children, of type Seq[TreeNode], and using the TreeNode type, Catalyst can easily construct a tree structure.

In addition to the children field, TreeNode also defines many higher-order functions, the most notable of which is transformDown. Its parameter is a rule, expressed as a partial function from one TreeNode to another, and its return type is still TreeNode. transformDown is recursive: the rule is applied to the current node first and then to each node in children in turn. Catalyst keeps applying its 27 groups of rules in this way until the tree structure no longer changes. At that point, the resulting TreeNode is the Optimized Logical Plan.

To simplify the complex problem, we use Expression, which is an expression, to explain this process. Because Expression itself also inherits TreeNode, once we understand this example, we will also understand the transformation between TreeNodes.

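// Transformation of Expression
import org.apache.spark.sql.catalyst.expressions._
// Builds the expression (6 - 4) * (1 - 9)
val myExpr: Expression = Multiply(Subtract(Literal(6), Literal(4)), Subtract(Literal(1), Literal(9)))
// The rule is applied to each node first, then to that node's children
val transformed: Expression = myExpr transformDown {
  case BinaryOperator(l, r) => Add(l, r)        // every binary operator becomes an addition
  case IntegerLiteral(i) if i > 5 => Literal(1) // integers greater than 5 become 1
  case IntegerLiteral(i) if i < 5 => Literal(0) // integers less than 5 become 0
}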

First, we define an expression: ((6-4)*(1-9)), and then we call the transformDown higher-order function of this expression. In the higher-order function, we provide an anonymous function defined with the case keyword. Obviously, this is a partial function, and you can think of this anonymous function as “custom optimization rules”. In this optimization rule, we only consider 3 situations:

  • For all binary operators, we convert them to addition operations
  • For all numbers greater than 5, we change them to 1
  • For all numbers less than 5, we change them to 0

Our rule has no practical value as an optimization; it is just a conversion rule. Still, it is enough to show how TreeNodes are transformed in Catalyst. When we apply this rule to the expression ((6-4)*(1-9)), the result is another expression ((1+0)+(0+1)), and the diagram below intuitively shows this process.

Optimization process of expression

The transformation from “Analyzed Logical Plan” to “Optimized Logical Plan” is the same as the transformation process of the expression in the example. The main difference is that Catalyst’s optimization rules are much more complex and precise.
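Why must rules be applied repeatedly until the tree stops changing? The toy model below is a sketch of the idea in plain Scala, with no Spark dependency. Node, Num, Plus, Times, foldConstants, and runToFixedPoint are hypothetical names invented for this illustration and are not Catalyst classes. It shows that a single top-down pass can leave work undone, because a parent is visited before its children have been simplified.

```scala
// Toy stand-in for a tree node with a top-down transform; not Catalyst's TreeNode.
sealed trait Node {
  def children: Seq[Node]
  def withChildren(newChildren: Seq[Node]): Node

  // Apply the rule to this node first, then recurse into the children of the result.
  def transformDown(rule: PartialFunction[Node, Node]): Node = {
    val afterRule = if (rule.isDefinedAt(this)) rule(this) else this
    afterRule.withChildren(afterRule.children.map(_.transformDown(rule)))
  }
}
final case class Num(value: Int) extends Node {
  def children: Seq[Node] = Nil
  def withChildren(newChildren: Seq[Node]): Node = this
}
final case class Plus(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def withChildren(newChildren: Seq[Node]): Node = Plus(newChildren(0), newChildren(1))
}
final case class Times(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def withChildren(newChildren: Seq[Node]): Node = Times(newChildren(0), newChildren(1))
}

// A rule is a partial function: it only fires on the node shapes it knows about.
val foldConstants: PartialFunction[Node, Node] = {
  case Plus(Num(a), Num(b))  => Num(a + b)
  case Times(Num(a), Num(b)) => Num(a * b)
}

// Re-run all rules until one full pass leaves the tree unchanged (or a pass limit is hit).
def runToFixedPoint(plan: Node, rules: Seq[PartialFunction[Node, Node]], maxPasses: Int = 100): Node = {
  var current = plan
  var changed = true
  var passes = 0
  while (changed && passes < maxPasses) {
    val next = rules.foldLeft(current)((tree, rule) => tree.transformDown(rule))
    changed = next != current
    current = next
    passes += 1
  }
  current
}

val expr = Times(Plus(Num(1), Num(2)), Plus(Num(3), Num(4)))  // (1 + 2) * (3 + 4)
val onePass = expr.transformDown(foldConstants)               // Times(Num(3), Num(7))
val fixedPoint = runToFixedPoint(expr, Seq(foldConstants))    // Num(21)
println(onePass)
println(fixedPoint)
```

In the first pass, the root multiplication does not match the rule because its children are still additions, so only the second pass can fold it. Catalyst faces the same situation, which is why it re-runs its rules until the plan stabilizes.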

Cache Manager Optimization #

In addition to using heuristic rules, Catalyst also leverages the Cache Manager for further optimization in the transformation from “Analyzed Logical Plan” to “Optimized Logical Plan”.

Here, Cache refers to the distributed data cache. To cache data, you can call DataFrame’s .cache or .persist methods, or use the “cache table” keyword in SQL statements.

The Cache Manager is actually very simple, and its main responsibility is to maintain information related to caching. Specifically, the Cache Manager maintains a mapping dictionary, where the Key is the logical plan and the Value is the corresponding cache metadata.

When Catalyst attempts to optimize a logical plan, it first tries to look up the Cache Manager to see if the current plan or branch of the plan is already recorded in the Cache Manager’s dictionary. If the current plan or branch can be found in the dictionary, Catalyst replaces the entire plan or part of the plan with the InMemoryRelation node, thereby fully utilizing the existing cached data for optimization.
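The replacement can be observed directly. The following is a minimal sketch, assuming a local SparkSession and a made-up DataFrame of even numbers: after the DataFrame is cached, a new query built on top of it shows an InMemoryRelation node in place of the cached branch.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cache-manager").getOrCreate()
import spark.implicits._

// Hypothetical data: the even numbers below 1000.
val evens = spark.range(0, 1000).filter($"id" % 2 === 0)

// Registers the plan with the Cache Manager; the data itself is materialized lazily.
evens.cache()

// A new query whose plan contains the cached plan as a branch.
val q = evens.filter($"id" > 10)

// The analyzed plan after cached branches have been substituted.
val planWithCache = q.queryExecution.withCachedData.toString
println(planWithCache)

// Release the cache entry once the experiment is done.
evens.unpersist()
```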

Summary #

In today’s lecture, we mainly discussed the logical optimization phase of the Catalyst optimizer. This phase consists of two steps: logical plan analysis and logical plan optimization.

In the logical plan analysis step, Catalyst takes the Unresolved Logical Plan, which only records what the query statement says, combines it with schema information, and checks that table names, column names, and column types are consistent with the actual data. The resulting plan is called the Analyzed Logical Plan.

In the logical plan optimization step, Catalyst optimizes the Analyzed Logical Plan in three ways: Adaptive Query Execution (AQE), the Cache Manager, and heuristic rules. Of the three, Catalyst relies most heavily on heuristic rules.

Although there are 81 heuristic rules in total, we can group them into three major categories: predicate pushdown, column pruning, and constant folding. Focus on understanding predicate pushdown and column pruning. Their motivation is the same as the development principle of “saving where possible”: reduce the amount of data that needs to be scanned and processed, and thereby reduce the computational workload in subsequent steps.

The Catalyst optimizer applies all of these rules to every node of the logical plan by calling TreeNode’s transformDown higher-order function, and it keeps doing so until the structure of the logical plan no longer changes. The plan produced at that point is the Optimized Logical Plan.

Finally, the Cache Manager provides a mapping relationship between logical plans and cached data. When an existing logical plan or a branch appears in the mapping dictionary maintained by the Cache Manager, Catalyst can fully utilize the existing cached data for optimization.

Daily Practice #

  1. Since Catalyst has 81 optimization rules in the logical optimization phase, do we still need to follow the principle of “saving where possible and postponing where possible” in development?
  2. Can you explain why Spark uses partial functions instead of ordinary functions to define Catalyst’s optimization rules?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lecture!