16 Data Transformation How to Perform Data Processing on Data Frames

16 Data Transformation - How to Perform Data Processing on DataFrames #

Hello, I am Wu Lei.

In the previous lecture, we learned about various ways and methods to create DataFrames. Now, once we have a DataFrame, how do we explore and analyze the data, as well as perform various data transformations on it? After processing the data, how do we present and persist the data? In today’s lecture, we will answer these questions.

To provide developers with enough flexibility, Spark SQL supports two types of development approaches for data processing on DataFrame: one is the well-known Structured Query Language (SQL), and the other is DataFrame development operators. In terms of development efficiency and execution efficiency, there is no difference between the two. The choice of which development approach to use depends entirely on the developer’s personal preference and development habits.

Similar to RDD, DataFrame supports a variety of development operators, but compared to SQL, learning DataFrame operators can be a bit more challenging. Therefore, following the principle of starting with the easy and then progressing to the difficult, let’s first talk about how to use SQL statements in DataFrame, and then we will understand DataFrame development operators.

SQL Statements #

For any DataFrame, we can use createTempView or createGlobalTempView to create temporary tables in Spark SQL.

The difference between the two is that the temporary table created by createTempView is only available within the SparkSession, while the temporary table created by createGlobalTempView can be accessed across SparkSessions in the same application. Once we have a temporary table, we can manipulate the table data flexibly using SQL statements.

In the code snippet below, I will demonstrate how to use createTempView to create a temporary table. First, we use toDF to create a DataFrame containing names and ages, then we call the createTempView method to create a temporary table.

import org.apache.spark.sql.DataFrame
import spark.implicits._

val seq = Seq(("Alice", 18), ("Bob", 14))
val df = seq.toDF("name", "age")

df.createTempView("t1")
val query: String = "select * from t1"
// spark is an instance of SparkSession
val result: DataFrame = spark.sql(query)

result.show

/** Output
+-----+---+
| n ame| age|
+-----+---+
| Alice| 18|
| Bob| 14|
+-----+---+
*/

In the example above, we first create a DataFrame using toDF and spark.implicits._, then we create a temporary table named “t1” on it by calling createTempView. Next, given a SQL query string “query”, we can execute the query by using the sql API provided by SparkSession. The result of the query is wrapped in a new DataFrame.

It is worth mentioning that, similar to the development mode of RDD, the transformations between DataFrames are also lazily evaluated, and they will only be executed when an Action operator, such as show in the example above, is encountered.

Spark SQL uses the ANTLR syntax parser to parse and process SQL statements. As we know, ANTLR is a powerful and cross-language syntax parser that fully supports SQL syntax, making it widely used in distributed data warehouses and compute engines such as Oracle, Presto, Hive, and ElasticSearch. Therefore, SQL queries from Hive or Presto can be smoothly migrated to Spark SQL.

Furthermore, Spark SQL provides a large number of Built-in Functions to assist in data processing, such as array_distinct, collect_list, and more. You can browse the Built-in Functions page on the official website for a complete list of functions. With SQL statements and these flexible built-in functions, you can easily handle typical data application scenarios such as data exploration and analysis.

SQL statements are relatively simple, with a short learning curve and low cost. Once you understand how to convert DataFrames into tables, the rest will be straightforward. Next, we will focus our main efforts on various operators supported by DataFrames. These operators often provide functionalities that greatly improve development efficiency, allowing us to achieve more with less effort.

DataFrame Operators #

It has to be said that DataFrame supports a rich and comprehensive set of operators, which is mainly due to the unique “dual-faced” nature of DataFrame. On one hand, DataFrame comes from RDD and has the same origin as RDD, so most of the operators supported by RDD are also supported by DataFrame. On the other hand, DataFrame carries a schema and is structured data, so it must provide a set of computation operators that are compatible with structured queries.

Because of this “dual-faced” characteristic, we can see from the figure below that the operators supported by DataFrame can be described as “extensive and comprehensive”.

Image

The human brain prefers structured knowledge. To facilitate your memory and understanding, I further divide the operators of DataFrame into 6 categories based on this figure. They are RDD-Supported Operators, Exploratory Operators, Cleaning Operators, Transformation Operators, Analysis Operators, and Persistence Operators.

You might be perplexed and wondering, “Oh my! Do I have to learn so many operators? Isn’t this forcing me to give up from beginner to advanced?” Don’t worry, you can treat the above figure as a “DataFrame Operators Mind Map” or a dictionary. When you run out of ideas during daily development, you can take it out and see which operators can help you achieve your business logic.

In today’s lesson, we will focus on the most commonly used and critical parts based on this “mind map”.

RDD-Supported Operators #

Let’s start with the RDD-supported operators in DataFrame. These operators were introduced in the three lessons on RDD operators. If you don’t remember the functions or meanings of any of these operators, you can review the previous three lessons. I have organized these operators into a table according to the previous classification.

Image

Exploratory Operators #

Next are the exploratory operators in DataFrame. The so-called exploration refers to data exploration. The purpose of these operators is to help developers have a preliminary understanding and recognition of the data, such as the data schema, data distribution, and data “appearance”, etc., in order to lay the foundation for subsequent application development.

For commonly used exploratory operators, I have organized them into the table below. Take a look first to get a “first impression”.

Image

Let’s go through these operators “from the weighty to the trifling”. First, columns/schema/printSchema are similar operators that can help us get the data columns and schema of the DataFrame. In particular, printSchema prints the Data Schema in plain text on the screen, as shown below.

import org.apache.spark.sql.DataFrame
import spark.implicits._

val employees = Seq((1, "John", 26, "Male"), (2, "Lily", 28, "Female"), (3, "Raymond", 30, "Male"))
val employeesDF: DataFrame = employees.toDF("id", "name", "age", "gender")

employeesDF.printSchema

/** Result
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- gender: string (nullable = true)
*/

After understanding the data schema, we often want to know what the data looks like. show operator can help achieve this. By default, show randomly prints 20 data records of the DataFrame.

employeesDF.show

/** Result
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| John| 26| Male|
| 2| Lily| 28|Female|
| 3|Raymond| 30| Male|
+---+-------+---+------+
*/

Once you see the “true face” of the data, you can further use describe to view the statistical distribution of numeric columns. For example, by calling employeesDF.describe(“age”), you can view the extreme values, average, variance, and other statistical values of the age column.

After gaining a basic understanding of the data, if you are interested in the execution plan of the current DataFrame, you can use the explain operator to get the execution plan provided by Spark SQL. explain plays a vital role in optimizing execution efficiency. In subsequent lessons, we will explain the usage and interpretation of explain in more detail with specific examples. For now, you only need to know that explain is used to view the execution plan.

+---+----+---+------+----------------+
| id|name|age|gender|        interests|
+---+----+---+------+----------------+
|  1|John| 26|  Male|[Sports, News]  |
|  2|Lily| 28|Female|[Shopping, Reading]|
|  3|Raymond|30|  Male|null|
+---+----+---+------+----------------+

现在,我们可以使用explode来展开interests字段,生成新的数据记录。

employeesDF.select($"id", $"name", $"age", $"gender", explode($"interests").as("interest")).show

结果打印:

+---+----+---+------+--------+
| id|name|age|gender|interest|
+---+----+---+------+--------+
|  1|John| 26|  Male|  Sports|
|  1|John| 26|  Male|   News |
|  2|Lily| 28|Female|Shopping|
|  2|Lily| 28|Female|Reading |
+---+----+---+------+--------+

可以看到,原本的两行数据记录,根据interests字段展开后,变成了四行。每一个兴趣爱好都生成了一行新的数据记录。

掌握了转换类算子,我们就可以轻松地进行各种数据转换与加工,满足业务需求。 import spark.implicits._ import org.apache.spark.sql.DataFrame

// Create the employees table
val seq = Seq((1, "Mike", 28, "Male"), (2, "Lily", 30, "Female"), (3, "Raymond", 26, "Male"))
val employees: DataFrame = seq.toDF("id", "name", "age", "gender")

// Create the salaries table
val seq2 = Seq((1, 26000), (2, 30000), (4, 25000), (3, 20000))
val salaries: DataFrame = seq2.toDF("id", "salary")

employees.show

/** Output:
+---+-------+---+------+
| id| name|age|gender|
+---+-------+---+------+
| 1| Mike| 28| Male|
| 2| Lily| 30|Female|
| 3|Raymond| 26| Male|
+---+-------+---+------+
*/

salaries.show

/** Output:
+---+------+
| id|salary|
+---+------+
| 1| 26000|
| 2| 30000|
| 4| 25000|
| 3| 20000|
+---+------+
*/

To begin, let’s join the two tables together using the join method. We will use the “id” column as the join key and perform an inner join.

val jointDF: DataFrame = salaries.join(employees, Seq("id"), "inner")
jointDF.show()

/** Result print
+---+------+-------+---+------+
| id|salary| name|age|gender|
+---+------+-------+---+------+
| 1| 26000| Mike| 28| Male|
| 2| 30000| Lily| 30|Female|
| 3| 20000|Raymond| 26| Male|
+---+------+-------+---+------+
*/

As we can see, we call the join operator on top of salaries, which has three parameters. The first parameter is the table to be joined, in our case it is the employee table. The second parameter is the join condition, which specifies which fields to join the two tables on, in our case it is the id column. The third parameter is the join type. We know that there are different types of join, such as inner, left, right, anti, semi, etc. We will discuss these join types in the next lecture. For now, you only need to know that Spark SQL supports these various types of joins.

After the data is joined, we only get the most detailed fact data, which is how much salary each employee receives each month. This fact data itself does not have much value. We often need to group and aggregate the data from different perspectives to obtain more valuable insights.

For example, if we want to group the data by gender and calculate the total and average salary for each gender, in order to analyze any potential relationship between salary and gender:

val aggResult = fullInfo.groupBy("gender").agg(sum("salary").as("sum_salary"), avg("salary").as("avg_salary"))

aggResult.show()

/** Data print
+------+----------+----------+
|gender|sum_salary|avg_salary|
+------+----------+----------+
|Female| 30000| 30000.0|
| Male| 46000| 23000.0|
+------+----------+----------+
*/

Here, we use the groupBy operator to group the data by the “gender” column, and then use the agg operator to perform aggregate operations. Inside the agg operator, we use the sum and avg aggregate functions to calculate the total and average salary. Spark SQL supports a variety of built-in aggregate functions, which can be found on the Built-in Functions page. With the help of these functions, we can flexibly perform statistical analysis on the data.

After obtaining the statistical results, we can use the sort or orderBy operator to sort the result set for easier viewing. The usage and effects of these two operators are exactly the same, as shown in the tables below:

aggResult.sort(desc("sum_salary"), asc("gender")).show()

/** Result print
+------+----------+----------+
|gender|sum_salary|avg_salary|
+------+----------+----------+
| Male| 46000| 23000.0|
|Female| 30000| 30000.0|
+------+----------+----------+
*/

aggResult.orderBy(desc("sum_salary"), asc("gender")).show()

/** Result print
+------+----------+----------+
|gender|sum_salary|avg_salary|
+------+----------+----------+
| Male| 46000| 23000.0|
|Female| 30000| 30000.0|
+------+----------+----------+
*/

As we can see, sort/orderBy supports sorting by multiple columns, and the sorting direction can be specified using desc and asc. Here, desc indicates descending order, and asc indicates ascending order.

And that’s it! So far, we have gone through the Spark SQL operators at different stages of the data lifecycle, including exploration, cleansing, transformation, and analysis.

Image

To wrap up the entire data lifecycle, we have the last stage - data persistence. For this final stage, Spark SQL provides the write API, complementing the read API introduced in the previous lecture. The write API allows developers to flexibly persist data in different file formats.

Persistence Operators #

Before diving into the write API, let’s first review the read API introduced in the previous lecture.

Image

As shown in the above diagram, the read API has three key components: the file format specified by format, zero or more loading options specified by option, and the source file path marked by load.

In contrast, the write API also consists of three key components: the file format specified by format, zero or more “write options” specified by option, and the storage path specified by save, as shown in the diagram below.

Image

In this case, format and save correspond to format and load in the read API, respectively, used to specify the file format and storage path. In fact, apart from mode, the option keys in the write API are consistent with the option keys in the read API. You can refer to the previous lecture for more options.

In the read API, the mode option is used to specify the read mode, such as permissive, dropMalformed, failFast. In the write API, however, mode is used to specify the “write mode”, which can be one of Append, Overwrite, ErrorIfExists, or Ignore. Their meanings and descriptions are shown in the table below:

Image

With the write API, we can flexibly persist the DataFrame into different storage systems, thereby completing the data lifecycle.

Key Review #

In today’s lecture, we mainly focused on the lifecycle of data and learned about the processing operators supported by Spark SQL in different stages of data, as shown in the following figure.

Image

There are many operators involved in the figure. Although we have given examples for most of them, it is indeed challenging to grasp so much content in a short time. However, you don’t have to worry. The main purpose of today’s lecture is to give you an overall understanding of the operators supported by Spark SQL.

As for what each operator is specifically used for, you can review this lecture repeatedly in future development work, combined with practice to deepen your understanding gradually. This way, learning will be more efficient. I also strongly recommend that you take a look at the Built-in Functions list on the official website in your free time, so that you will have a good grasp of the functions and be able to implement business logic more effectively.

In addition to the operators supported by DataFrame itself, SQL can also achieve the same data analysis in terms of functionality. Given a DataFrame, you only need to create a temporary table through createTempView or createGlobalTempView, and then you can use SQL statements to explore, skew, transform, and analyze data.

Finally, it should be noted that there is no distinction between the quality of DataFrame operators and SQL queries. They can achieve the same data applications and have consistent performance in terms of execution. Therefore, you can freely choose between them based on your development habits and preferences.

Practice for Each Lesson #

In the transformation operators, we have introduced the explode operator as an example. Its function is to expand (explode) a piece of data into multiple pieces of data, according to the data column with an array as an element. With the understanding of this operator, can you analyze whether the explode operation will introduce Shuffle computation?

Feel free to interact with me in the comment area, and I also recommend you to share this lesson with friends in need.