20 RDD and DataFrame - Which Shines Brighter #

Hello, I’m Wu Lei.

Starting today, we begin our study of performance tuning in Spark SQL. In this part of the course, I will first introduce the optimization mechanisms already built into Spark SQL, such as Catalyst and Tungsten, as well as newer features like AQE (Adaptive Query Execution) and DPP (Dynamic Partition Pruning). Understanding these built-in mechanisms gives you a higher starting point when developing applications. Afterwards, we will dig into performance tuning methods and techniques for typical data analysis scenarios, such as data association (joins).

In today’s lecture, let’s first talk about where RDD and DataFrame came from. This is also a topic that interviewers like to ask about. For example: “In the Spark 3.0 release, Spark SQL accounts for close to 50% of the optimizations, while the share for PySpark, MLlib, and Streaming is less than 10%, and the share for Graph is almost negligible. Does this mean that the Spark community is gradually abandoning other computing domains and focusing only on data analysis?”

The standard answer to this question is: “Spark SQL has replaced Spark Core as the new-generation engine kernel, and all other sub-frameworks such as MLlib, Streaming, and Graph can share Spark SQL’s performance optimizations and benefit from the Spark community’s investment in Spark SQL.” However, interviewers are not that easy to satisfy. Once you say this, they might follow up with: “Why do we need Spark SQL as the new-generation engine kernel? What is wrong with Spark Core? Which of Spark Core’s problems does Spark SQL solve, and how?”

Can you answer this rapid-fire series of follow-up questions? Next, I will start with the pain points of RDD and walk you through why DataFrame was inevitable, what the limitations of Spark Core are, and how Spark Core relates to Spark SQL.

The Pain of RDD: Limited Optimization Space #

Since its release in version 1.3, DataFrame has gradually replaced RDD as the preferred choice for developers in the Spark community. A new abstraction is born to solve problems that the old abstraction cannot handle. So, what are these problems? Let’s analyze them together.

In the RDD development framework, we implement business logic by calling RDD operators in appropriate combinations. I have summarized the frequently used RDD operators in the table below.

The highlighted operators in the table are RDD transformations and aggregations, which are all higher-order functions. Higher-order functions refer to functions that have functions as parameters or functions that have functions as return values. For convenience, we call those RDD operators that are higher-order functions “higher-order operators”.

For these higher-order operators, developers need to provide the specific computation logic in the form of a lambda function. Let’s take map as an example. We need to specify which fields to map and the mapping rules. Take filter as another example. We need to specify the conditions and fields to filter.

However, with this approach, Spark only knows that the developer wants to map and filter; it does not know how the developer wants to map and filter, because that logic is hidden inside the lambda. In other words, in the RDD development mode, Spark Core knows “what to do” but not “how to do it”. Spark Core is effectively blindfolded: apart from shipping the lambda function to the Executors as a closure, it has no room left for optimization.
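To make the contrast concrete, here is a minimal pure-Python sketch (not Spark code). It assumes a toy engine with two hypothetical filter functions, rdd_style_filter and df_style_filter, and a hypothetical GreaterThan condition class; none of these names come from Spark. With a lambda, the engine can only call the function. With a declared condition, it can also inspect which column is read, which is the kind of information an optimizer needs.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Set

Row = Dict[str, Any]


def rdd_style_filter(rows: List[Row], predicate: Callable[[Row], bool]) -> List[Row]:
    """RDD style: the predicate is an opaque callable; the engine can only invoke it."""
    return [row for row in rows if predicate(row)]


@dataclass(frozen=True)
class GreaterThan:
    """DataFrame style: the condition is plain data that an engine can inspect."""
    column: str
    threshold: Any

    def evaluate(self, row: Row) -> bool:
        return row[self.column] > self.threshold


def df_style_filter(rows: List[Row], condition: GreaterThan) -> List[Row]:
    """Apply a declared condition to every row."""
    return [row for row in rows if condition.evaluate(row)]


def columns_needed(condition: GreaterThan) -> Set[str]:
    """The engine can read the condition and learn which columns it touches."""
    return {condition.column}


# Hypothetical sample data, used only for illustration.
rows = [
    {"name": "Ann", "age": 31, "bio": "long text"},
    {"name": "Bob", "age": 17, "bio": "long text"},
]
opaque = lambda row: row["age"] > 18   # logic hidden inside a function
declared = GreaterThan("age", 18)      # logic exposed as data

same_result = rdd_style_filter(rows, opaque) == df_style_filter(rows, declared)
needed = columns_needed(declared)
```

Both filters return the same rows, but only the declared condition tells the engine that the age column alone is needed.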

For Spark Core, the limited optimization space mainly affects the execution performance of the application. A typical example is that PySpark implementations have significantly lower execution performance compared to Java or Scala. The reason is that even if it is the same application, different language implementations have a huge difference at runtime in the RDD development mode.

When we develop using Java or Scala, all computations are done within the JVM process, as shown on the left side of the Spark computing node in the figure.

But when we develop with PySpark, the computation code composed of RDD operators must be sent to a Python process. The Python process executes the script code, completes the computation, and returns the result to the Executor process. Since each task requires a Python process, if the parallelism of the RDD is N, the whole cluster needs N Python processes interacting with the Executors. It is not hard to see that the overhead of task scheduling, data computation, and data communication is the main culprit behind PySpark’s poor performance.

The Emergence of DataFrames #

To address the core issue of limited optimization space, the Spark community learned from this painful experience and released DataFrame in version 1.3 in 2015. So, what are the characteristics of DataFrame, and how does it differ from RDD?

Firstly, to put it simply, a DataFrame is a structured distributed dataset that carries a data schema, while an RDD is an unstructured distributed dataset. Therefore, from a data representation perspective, the only difference between them is whether or not they carry a schema. The presence of a schema in the data representation form determines that DataFrames can only encapsulate structured data, while RDDs do not have this limitation, so they can encapsulate semi-structured and unstructured data in addition to structured data.

Secondly, in terms of development APIs, RDD operators are mostly higher-order functions, which allow developers to flexibly implement business logic and have very strong expressive power.

The expressive power of DataFrame, on the other hand, is weak. First, it defines a set of DSL (Domain Specific Language) operators, such as select, filter, agg, and groupBy. Because a DSL is a specialized language designed to solve one specific type of task, it is typically not Turing complete, and its expressive power is very limited. Second, most DataFrame operators are scalar functions whose parameters are usually structured data columns, which also limits their expressive power.

You may ask, “Compared to RDDs, both the representation and expressive power of DataFrames have become weaker. How does it solve the core pain point of limited optimization space for RDDs?”

Of course, it is extremely difficult to solve the core pain point of RDDs solely based on the API changes in DataFrames. The greatest significance of the DataFrame API lies in that it opens up a brand new space for the optimization of the Spark engine.

Firstly, the type information carried by the Schema in a DataFrame allows Spark to design customized data structures based on explicit field types, greatly improving the efficiency of data storage and access. Secondly, the calculation logic determined by scalar operators in DataFrames allows Spark to optimize the calculation process of DataFrames based on heuristic rules and strategies, and even dynamic runtime information.

Spark SQL Intelligent Brain #

So, with the DataFrame API in place, who is the behind-the-scenes hero responsible for optimizing the engine kernel? The DataFrame development mode is supported by Spark SQL, which has provided the DataFrame API since version 1.3. Spark SQL has two core components: the Catalyst optimizer and Tungsten. We will discuss the features and optimization processes of Catalyst and Tungsten in the next two lectures. In today’s lecture, let’s focus on their relationship with DataFrame.

Catalyst: Execution Process Optimization #

Let’s first talk about Catalyst’s optimization process. When a developer triggers a computation on a DataFrame through an action operator, interesting things happen inside Spark.

First, based on the exact computation logic of the DataFrame, Spark uses a parser generated by ANTLR, a third-party parser generator, to produce an Abstract Syntax Tree (AST). Since it is a tree, it consists of two basic elements: nodes and edges. The nodes record the processing logic of scalar operators such as select and filter, and the edges carry data information: relation tables and data columns, as shown in the following figure. This syntax tree describes the conversion process from source data to DataFrame result data.

In Spark, this syntax tree is also called the “Unresolved Logical Plan”. It is the starting point of the Catalyst optimization process. It is called “Unresolved” because the relation tables and data columns recorded on the edges are just strings that have not yet been associated with actual data. For example, after the Filter operator, the output data columns are joinKey and payLoad. These strings come from the DataFrame DSL query. Catalyst does not know whether these field names are valid, let alone what type each field has.

Therefore, the first step of optimization that Catalyst does is to combine the Schema information of DataFrame to confirm whether the table names, field names, and field types in the plan are consistent with the actual data. This process is also called transforming the “Unresolved Logical Plan” into the “Analyzed Logical Plan”.
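The resolution step can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not Catalyst’s implementation: the UnresolvedPlan, AnalyzedPlan, and analyze names, the orders table, the userId field, and all field types are hypothetical; only the field names joinKey and payLoad come from the example above.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class UnresolvedPlan:
    table: str                    # just a string taken from the query
    columns: Tuple[str, ...]      # column names, not yet checked


@dataclass(frozen=True)
class AnalyzedPlan:
    table: str
    columns: Tuple[Tuple[str, str], ...]  # (name, type) pairs confirmed by the schema


class AnalysisError(Exception):
    """Raised when a table or column name cannot be resolved."""


def analyze(plan: UnresolvedPlan, catalog: Dict[str, Dict[str, str]]) -> AnalyzedPlan:
    """Check every name in the plan against the schema and attach field types."""
    if plan.table not in catalog:
        raise AnalysisError(f"table not found: {plan.table}")
    schema = catalog[plan.table]
    resolved = []
    for name in plan.columns:
        if name not in schema:
            raise AnalysisError(f"cannot resolve column: {name}")
        resolved.append((name, schema[name]))
    return AnalyzedPlan(plan.table, tuple(resolved))


# Hypothetical catalog: table name -> {field name -> field type}.
catalog = {"orders": {"joinKey": "int", "payLoad": "string", "userId": "int"}}
analyzed = analyze(UnresolvedPlan("orders", ("joinKey", "payLoad")), catalog)
```

After analysis, each column name is paired with a type, and a misspelled column fails early with an AnalysisError instead of surfacing during execution.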

Based on the resolved “Analyzed Logical Plan”, Catalyst can continue optimization. Using heuristic rules and execution strategies, Catalyst ultimately transforms the logical plan into an executable physical plan. In summary, Catalyst’s optimization space comes from the DataFrame development mode.
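As an illustration of a heuristic rule, the sketch below pushes a filter beneath a projection so that the filter sits closer to the data source, in the spirit of predicate pushdown. It is a simplified toy in plain Python; the Scan, Project, and Filter classes, the push_filter_below_project function, and the users table are assumptions made for this example and do not mirror Catalyst’s actual classes.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]   # plain column selection, no renaming
    child: "Plan"


@dataclass(frozen=True)
class Filter:
    column: str       # the column the predicate reads
    predicate: str    # kept as text for simplicity, e.g. "> 18"
    child: "Plan"


Plan = Union[Scan, Project, Filter]


def push_filter_below_project(plan: Plan) -> Plan:
    """Rewrite Filter(Project(x)) into Project(Filter(x)) when the filter column is projected."""
    if isinstance(plan, Filter):
        child = push_filter_below_project(plan.child)
        if isinstance(child, Project) and plan.column in child.columns:
            # Move the filter under the projection, then keep pushing it down.
            pushed = push_filter_below_project(
                Filter(plan.column, plan.predicate, child.child)
            )
            return Project(child.columns, pushed)
        return Filter(plan.column, plan.predicate, child)
    if isinstance(plan, Project):
        return Project(plan.columns, push_filter_below_project(plan.child))
    return plan  # a Scan has nothing below it


original = Filter("age", "> 18", Project(("name", "age"), Scan("users")))
optimized = push_filter_below_project(original)
```

The rule works only because the plan is made of inspectable nodes; if the filter were an arbitrary lambda, the engine could not tell which column it reads and could not safely reorder it.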

Tungsten: Data Structure Optimization #

Having covered Catalyst, let’s turn to Tungsten. In the lecture on development principles, we mentioned that Tungsten uses a customized data structure called Unsafe Row to store data. The advantages of Unsafe Row are high storage efficiency and high garbage collection efficiency. Tungsten can design such a data structure thanks to the Schema carried by DataFrame. We have discussed Unsafe Row before, so let’s just briefly review it here.

Tungsten stores each data record as a binary byte sequence, so it outperforms Java objects in storage efficiency. For example, storing the data in the table above consumes 100 bytes as Java objects, but fewer than 20 bytes with Tungsten, as shown in the following figure.

However, in order to achieve the binary sequence shown in the figure above, Tungsten must know the Schema of the data entries. In other words, it needs to know the data types of each field to determine where to place fixed-length fields, insert offsets, and store data values for variable-length fields. DataFrame happens to satisfy this prerequisite.
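The following sketch shows, in plain Python, how knowing the schema makes such a layout possible. It assumes a simplified layout (a null bitmap, one fixed-width slot per field, and an offset plus length for each variable-length value) and is not Spark’s actual byte format; alignment padding is omitted, and the encode_row and read_field helpers, the sample schema, and the sample values are hypothetical.

```python
import struct
from typing import Any, List, Sequence, Tuple

# (field name, type); this sketch supports only "int" and "string".
Schema = Sequence[Tuple[str, str]]


def encode_row(schema: Schema, values: Sequence[Any]) -> bytes:
    """Pack one record as: 8-byte null bitmap | one 8-byte slot per field | string bytes."""
    if len(schema) != len(values):
        raise ValueError("schema and values must have the same length")
    null_bits = 0
    slots: List[bytes] = []
    var_region = b""
    fixed_size = 8 + 8 * len(schema)
    for index, ((_, type_name), value) in enumerate(zip(schema, values)):
        if value is None:
            null_bits |= 1 << index              # mark the field as null
            slots.append(struct.pack("<q", 0))
        elif type_name == "int":
            slots.append(struct.pack("<q", value))   # fixed-length value stored inline
        elif type_name == "string":
            data = value.encode("utf-8")
            offset = fixed_size + len(var_region)
            slots.append(struct.pack("<ii", offset, len(data)))  # slot holds offset and length
            var_region += data
        else:
            raise ValueError(f"unsupported type: {type_name}")
    return struct.pack("<q", null_bits) + b"".join(slots) + var_region


def read_field(schema: Schema, row: bytes, index: int) -> Any:
    """Locate a field using only the schema and arithmetic on byte positions."""
    (null_bits,) = struct.unpack_from("<q", row, 0)
    if null_bits & (1 << index):
        return None
    slot_start = 8 + 8 * index
    if schema[index][1] == "int":
        return struct.unpack_from("<q", row, slot_start)[0]
    offset, length = struct.unpack_from("<ii", row, slot_start)
    return row[offset:offset + length].decode("utf-8")


schema = [("userId", "int"), ("name", "string"), ("age", "int")]
row = encode_row(schema, (1, "Alice", 30))
```

Here the three-field row occupies 37 bytes: 8 for the bitmap, 24 for the slots, and 5 for the string. Reading a field requires no per-field objects, only the schema and offset arithmetic, which is exactly what an untyped or developer-defined record cannot offer.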

Now imagine that the data is encapsulated in an RDD. Can Tungsten still do this? Of course not. Although an RDD is typed, as in RDD[Int] or RDD[(Int, String)], once it carries developer-defined types, such as RDD[User] or RDD[Product], Tungsten has no idea what your User and Product abstractions contain. The generality of RDD is a double-edged sword: it gives developers flexibility, but it also makes the engine kernel extremely difficult to optimize.

In summary, based on the simple scalar operators of DataFrame and the explicit Schema definition, with the help of the Catalyst optimizer and Tungsten, Spark SQL is able to build an end-to-end optimization mechanism at runtime. This mechanism applies heuristic rules and strategies, as well as runtime execution information, to transform originally suboptimal or even inefficient query plans into efficient execution plans, thereby improving end-to-end execution performance. Therefore, under the DataFrame development framework, developers can enjoy the performance benefits of Spark SQL regardless of which programming language they use.

Finally, let’s review the interview question mentioned at the beginning: “From version 2.0 to the present, the improvement and optimization of Spark in other sub-frameworks has a very low proportion compared to Spark SQL. Does this mean that the future development focus of Spark is data analysis, and other scenarios such as machine learning and stream processing will gradually be marginalized?”

Initially, Spark SQL was indeed just a sub-framework for running SQL and DataFrame applications. However, as its optimization mechanisms matured, Spark SQL gradually replaced Spark Core and evolved into the next-generation engine kernel. So far, the source code implementations of all sub-frameworks have switched from RDD to DataFrame. Therefore, like PySpark, sub-frameworks such as Streaming, Graph, and MLlib actually run on top of Spark SQL through the DataFrame API, so they naturally share the optimization mechanisms introduced by Spark SQL.

Figuratively speaking, Spark SQL is like the intelligent brain of Spark. Anything seen through the “eyes” of DataFrame will be analyzed and optimized by the intelligent brain, and the optimized action instructions will eventually be executed by the Executors, which are like the “limbs”.

Summary #

Today, we discussed the inevitability of DataFrame’s emergence, the limitations of Spark Core, and its relationship with Spark SQL, which deepened our understanding of Spark SQL.

The core pain point of RDD is limited optimization space. The functions encapsulated in the higher-order operators of RDD are completely opaque to Spark, so Spark has no way to optimize their computational logic.

Compared to RDD, DataFrame is a distributed dataset with a schema and can only encapsulate structured data. Most of the operators in DataFrame are ordinary scalar functions that consume data columns. However, the weaker expressive power of DataFrame actually opens up new space for the optimization of the Spark engine.

Based on the simple scalar operators and explicit schema definition of DataFrame, with the help of Catalyst optimizer and Tungsten, Spark SQL has the ability to build an end-to-end optimization mechanism at runtime. This mechanism uses heuristic rules, strategies, and runtime execution information to transform originally suboptimal or even inefficient query plans into efficient execution plans, thereby improving end-to-end execution performance.

In the DataFrame development mode, all sub-frameworks, as well as PySpark, run on top of Spark SQL and can share the optimization mechanisms it provides. This is also the fundamental reason why Spark SQL accounts for the largest share of the changes in each new Spark release.

Daily Practice #

  1. Why do Java objects have a relatively large storage overhead? How many bytes does the JVM need to store the string “abcd”?

  2. In the DataFrame development framework, which operations in PySpark are still “stubborn” and frequently cause interaction between the JVM process and the Python process? (Hint: recall the limitations of RDD. Computation logic that is opaque to Spark leaves Spark no room for optimization.)

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lecture!