14 Foreground and Background the Origins of Data Frame and Spark SQL

14 Foreground and Background - The Origins of DataFrame and Spark SQL #

Hello, I’m Wu Lei.

In the previous lesson, using the example of “Car Sales Analysis,” we learned about the general pattern of application development in the Spark SQL sub-framework. First, we used the read API of SparkSession to create a DataFrame. Then, starting from the DataFrame, we used various operators to perform transformations between different DataFrames for data analysis.

Although we mentioned that DataFrame can be seen as a special type of RDD, you may still be confused about the fundamental differences between DataFrame and RDD. Since Spark already had the RDD as the development interface, why did they create DataFrame?

I believe that after completing the previous lesson, these questions must still linger in your mind. Don’t worry, today we will provide an overview of the origins of DataFrame, and then trace back to understand Spark SQL, the behind-the-scenes hero that helped DataFrame shine.

The Tragedy of RDD: Limited Optimization Space #

In the lecture on RDD operators ([Lesson 3]), we have discussed a thought-provoking question about the common features of operators like map, mapPartitions, filter, and flatMap.

Today, let’s take a fresh perspective to reexamine this question. The conclusion is that they are all higher-order functions.

A higher-order function refers to a function that takes a function as a parameter or returns a function as the result. In other words, a higher-order function is essentially a function, but with the special feature that its parameters or return type, or both, are functions. If either the parameter or the return type is a function, the original function is considered a higher-order function.

These operators mentioned above, such as map and filter, require an auxiliary function f as a parameter to complete the computation by calling map(f) or filter(f). Using map as an example, we need function f to specify which fields to map and the mapping rules. The same goes for filter, where we need function f to indicate the filtering conditions on which fields.

However, as a result, Spark only knows that developers want to perform map and filter operations but does not know how they intend to do the mapping or filtering. In other words, for Spark, the auxiliary function f is transparent. Under the RDD development framework, Spark Core only knows “what” developers want to do, not “how” to do it. This leaves Spark Core in the dark, with no additional optimization space, except for sending function f as a closure to Executors. And this is the tragedy of RDD.

DataFrame Emerges #

To address the space optimization issue of RDDs, the Spark community released the DataFrame in version 1.3. So, what are the differences between DataFrames and RDDs? Let’s compare them from two aspects: data representation and development operators.

Like RDDs, DataFrames are used to encapsulate distributed datasets. But they differ in terms of data representation. DataFrames are structured data with a data schema, while RDDs are distributed datasets without schemas. It is precisely because of the schema which provides explicit type information that Spark can have a clear understanding and design more compact data structures, thus greatly improving data storage and access efficiency.

In terms of API development, RDD operators often use higher-order functions. Higher-order functions have strong expressive power, allowing developers to flexibly design and implement business logic. On the other hand, DataFrames have limited expressive power. They define a set of DSL operators (Domain Specific Language), such as select, filter, agg, groupBy, and so on, which are all DSL operators.

DSL languages are often designed to solve specific tasks and are not Turing-complete, so they are very limited in terms of expressive power. Most of the DataFrame operators are scalar functions, and their parameters are often data columns from a structured two-dimensional table.

Although DataFrame operators have weaker expressive power, the calculation logic of each operator is determined. For example, select is used to extract certain fields, groupBy is used to group data, and so on. These calculation logics are no longer transparent to Spark. Therefore, Spark can optimize the calculation process of DataFrames based on heuristic rules or strategies, or even dynamic runtime information.

In summary, compared to RDDs, DataFrames open up new possibilities for kernel optimization in the Spark engine through the use of schemas with explicit type information and transformation operators with clear calculation logic.

Behind the Scenes Hero: Spark SQL #

So, the question is, once the optimization space is opened up, who is the real hero behind optimizing the core engine (Spark Core)? I believe you can guess it without me having to say it – it is Spark SQL.

In order to fully understand Spark SQL, we need to clarify its relationship with Spark Core. As we progress in our learning process, we will come across more and more new concepts and knowledge points. Clarifying the relationship between Spark SQL and Spark Core will help you build a systematic knowledge system and a global perspective, enabling you to see both the trees and the forest in the learning process.

First of all, Spark Core specifically refers to the underlying execution engine of Spark (Execution Engine), which includes the core functional modules we discussed in the basic knowledge section, such as the scheduling system, storage system, memory management, and shuffle management. Spark SQL, on the other hand, is built on top of Spark Core and serves as an independent optimization engine (Optimization Engine). In other words, Spark Core is responsible for execution, while Spark SQL is responsible for optimization. The optimized code from Spark SQL still needs to be handed over to Spark Core for execution.

Image

Furthermore, in terms of development entry, application programs developed under the RDD framework are directly handed over to Spark Core for execution. On the other hand, applications developed using the DataFrame API go through Spark SQL first, and then the optimized code is handed over to Spark Core for execution.

Now that we have clarified the relationship and positioning of the two, the next question is: “How does Spark SQL optimize based on DataFrame?” To answer this question, we must start with two core components of Spark SQL: the Catalyst optimizer and Tungsten.

Let’s start with the Catalyst optimizer, whose responsibility is to create and optimize execution plans. It consists of three functional modules: creating syntax trees and generating execution plans, logical phase optimization, and physical phase optimization. Tungsten is used to bridge the gap between the Catalyst execution plan and the underlying Spark Core execution engine, and is mainly responsible for optimizing data structures and executable code.

Image

Next, let’s talk about how the code in Spark SQL is optimized based on the example of “rate analysis” discussed in the previous lesson. I have posted the complete code implementation of “rate analysis” here, you may want to review it first.

import org.apache.spark.sql.DataFrame

val rootPath: String = _
// Applicant data
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark is the default SparkSession instance in spark-shell
// Read source file using the read API
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)

// Lucky draw data
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// Read source file using the read API
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)

// Filter lucky draw data after 2016 and only extract the carNum field
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")

// Inner join lottery data with lucky draw data, with the carNum field as the join key
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")

// Group by batchNum and carNum and calculate the rate multiplier
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum"))
.agg(count(lit(1)).alias("multiplier"))

// Group by carNum and keep the maximum rate multiplier
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
.agg(max("multiplier").alias("multiplier"))

// Group by multiplier and count the number of people
val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
.agg(count(lit(1)).alias("cnt"))
.orderBy("multiplier")

result.collect

Catalyst Optimizer #

First, let’s talk about the optimization process of Catalyst. Based on the exact transformation logic between DataFrames in the code, Catalyst first uses a third-party SQL parser, ANTLR, to generate an Abstract Syntax Tree (AST). The AST consists of nodes and edges, with nodes representing various operation operators, such as select, filter, agg, etc., and edges recording the Schema information of the data table, such as field name, field type, etc. Take the syntax tree of the “scalar analysis” in the following figure as an example. It actually describes the conversion process from the source data to the final calculation result. Therefore, within the scope of Spark SQL, the AST syntax tree is also called the “execution plan”.

Image

As can be seen, the syntax tree or execution plan composed of operators provides clear execution steps. Even without any optimization, Spark Core can run this “original” execution plan step by step.

However, from the perspective of execution efficiency, this is not the optimal choice. Why do we say that? Let’s take the green nodes in the figure as an example, Scan is used to fully scan and read the data of the selected people, Filter is used to filter out the data with lottery batch greater than or equal to “201601”, and the Select node is used to extract the “carNum” field from the data.

Remember? Our source file is stored in Parquet format, and Parquet format supports two features at the file level: “Predicates Pushdown” and “Columns Pruning”.

Predicates Pushdown refers to the use of filtering conditions like “batchNum >= 201601” to only read data files that satisfy the conditions during the file scanning process. And because the Parquet format is a columnar storage data structure, Spark only needs to read the data files with the field name “carNum”, thus “pruning off” the process of reading other data files.

Image

Taking the selected data as an example, with the help of predicate pushdown and column pruning, Spark Core only needs to scan the green part of the figure. Obviously, both of these optimizations can help Spark Core significantly reduce data scanning and disk I/O consumption, thereby improving data reading efficiency significantly.

Therefore, if the execution order of the three green nodes can be adjusted from “Scan > Filter > Select” to “Filter > Select > Scan”, the adjusted execution plan can bring better execution performance to Spark Core compared to the original execution plan.

Features like predicate pushdown and column pruning are called heuristic rules or strategies. One of the core responsibilities of the Catalyst optimizer is to adjust and optimize the execution plan based on heuristic rules and strategies during the logical optimization stage, laying a foundation for performance improvement in the physical optimization stage. After the logical optimization stage, the original execution plan is adjusted as shown in the following figure. Note the change in the order of the green nodes.

Image

The execution plan optimized by the logical optimization stage can still be directly handed over to Spark Core for execution. However, Catalyst does not stop here in terms of performance optimization.

In addition to the logical optimization stage, Catalyst will further optimize the execution plan in the physical optimization stage. Unlike the logical optimization stage, which mainly relies on prior heuristic experience, the physical optimization stage mainly relies on various statistical information, such as data table size, whether data caching is enabled, shuffle intermediate files, etc. In other words, logical optimization is more of an “empiricism”, while physical optimization is “speaking with data”.

Taking the blue Join node in the figure as an example, the execution plan only states that applyNumbersDF needs to be inner joined with filteredLuckyDogs, but it does not clearly indicate which mechanism to use for the join. According to the implementation mechanism, there are three ways to implement data join: Nested Loop Join (NLJ), Sort Merge Join, and Hash Join.

And according to the data distribution method, data join can be divided into Shuffle Join and Broadcast Join. Therefore, in a distributed computing environment, there are at least six Join strategies for Spark SQL to choose from. We will delve into these six Join strategies in detail later, here you only need to understand that different strategies have a huge difference in execution efficiency.

Returning to the example of the blue Join node, in the physical optimization stage, the Catalyst optimizer needs to combine the storage sizes of applyNumbersDF and filteredLuckyDogs to decide whether to use the stable but slightly less performant Shuffle Sort Merge Join, or the more performant Broadcast Hash Join. Regardless of the join strategy Catalyst decides to use, the optimized execution plan can be handed over to Spark Core for execution. However, the Spark SQL optimization engine doesn’t stop there. After Catalyst optimizes the plan, Tungsten takes over and continues to refine and improve the execution code on top of the plan output by Catalyst, aiming to deliver the most optimal execution code to the underlying Spark Core execution engine.

Image

Tungsten #

Building on the foundation laid by Catalyst, Tungsten mainly focuses on further optimization in two areas: data structures and execution code. Data structure optimization refers to the design and implementation of Unsafe Row, while execution code optimization refers to Whole Stage Code Generation (WSCG).

Let’s first look at why we need Unsafe Row. By default, Spark SQL uses the org.apache.spark.sql.Row object to encapsulate and store each data record in a DataFrame. However, we know that using Java objects to store data introduces a lot of additional storage overhead.

To address this, Tungsten has designed and implemented a binary data structure called Unsafe Row. Unsafe Row is essentially a byte array that stores each data record in the DataFrame in a highly compact format, significantly reducing storage overhead and improving data storage and access efficiency.

Taking the data schema for the table below as an example, for each data record that contains the four fields shown, if we use the default Row object for storage, each record would require at least 60 bytes.

Image

However, if we use the Tungsten Unsafe Row data structure for storage, each data record only requires a dozen or so bytes, as shown in the figure below.

Image

After discussing the optimization of the Unsafe Row data structure, let’s move on to WSCG: Whole Stage Code Generation. “Whole stage” refers to the stages we learned about in the scheduling system. In the execution plan shown in the image, the three nodes marked in green are assigned to the same stage during task scheduling.

Image

Code generation refers to Tungsten combining the “chain calls” between operators into one piece of code at runtime. Taking the three green nodes in the image as an example, by default, Spark Core would execute the Filter, Select, and Scan operations for each data record one by one.

After the WSCG optimization by Tungsten, the Filter, Select, and Scan operators are “combined” into one function f. With this optimization, Spark Core only needs to use function f to process each data record in one go, eliminating the overhead of data communication between different operators and completing the computation in one fell swoop.

So, after completing the two optimization phases of Catalyst and Tungsten, Spark SQL finally hands over the optimized execution plan and the generated execution code to its big brother, Spark Core. With the plan and code, Spark Core utilizes the Tungsten Unsafe Row data structure at runtime to perform distributed task computation. And with that, we have covered the content of this lesson.

Key Review #

Today’s lecture covers a lot of content, let’s summarize it together.

First of all, in the RDD development framework, the optimization space of Spark Core is limited. The computing logic encapsulated by the vast majority of RDD high-level operators (the parameterized function f) is transparent to Spark Core. Apart from distributing the function f to Executors using closures, Spark Core doesn’t have much room for optimization.

The emergence of DataFrame brings new ideas. It carries rich type information with its Schema, and most of the DataFrame operators are scalar functions that process data columns. These two characteristics of DataFrame open up a whole new space for optimizing the engine kernel. In the development framework of DataFrame, it is Spark SQL that is responsible for the specific optimization process.

Spark SQL is an optimization engine that is built on top of Spark Core. Its main responsibility is to optimize user code before delivering it to Spark Core.

Image

Spark SQL consists of two core components, Catalyst optimizer and Tungsten. The optimization process is also divided into two stages, Catalyst and Tungsten.

In the Catalyst optimization stage, Spark SQL first converts user code into the AST syntax tree, also known as the execution plan, and then adjusts the execution plan through logical and physical optimizations. Logical optimization, mainly based on prior heuristic experience, such as predicate pushdown, column pruning, optimizes the execution plan. On the other hand, physical optimization mainly uses statistical information to select the best execution mechanism or add necessary compute nodes.

Tungsten further optimizes from two aspects: data structure and execution code. Compared to the default Java Object, the binary Unsafe Row stores data records in a more compact manner, greatly improving the efficiency of data storage and access. Full-stage code generation eliminates data transmission between different operators within the same stage by merging multiple operators into a unified function, and applies this function to the data at once. This significantly improves the calculation efficiency compared to the “chained calls” of different operators.

Exercise for Each Lesson #

After studying this lesson, we learned that only DataFrames can benefit from the optimization process of Spark SQL, while RDDs can only be executed directly by Spark Core. Does this mean that the RDD development framework will exit the stage of history, and the knowledge points related to RDD that we have learned before, such as the concept of RDD, RDD properties, and RDD operations, are all useless?

Image

Welcome to exchange and discuss with me in the comments section, and I also recommend you to share the content of this lesson with more friends.