15 Data Sources and Format - Where DataFrames Come From #

Hello, I’m Wu Lei.

In the previous lecture, we focused on the origins of DataFrame and Spark SQL, and mentioned that DataFrame is an important entry point for Spark SQL. In other words, by creating a DataFrame and using the DataFrame development API, we can fully leverage the performance benefits provided by the Spark SQL optimization engine. Obviously, for beginners, the first step of creating a DataFrame becomes extremely important.

In the previous lecture, when we analyzed the car license plate lottery multiplier, we used the read API of SparkSession to create a DataFrame from a Parquet file. However, there are many other ways to create a DataFrame. It is no exaggeration to say that the ways to create a DataFrame are extremely rich. Why is that?

As shown in the figure below, Spark supports multiple data sources, which can be divided into several categories according to the data source: driver-side custom data structures, (distributed) file systems, relational databases (RDBMS), relational data warehouses, NoSQL databases, and other computing engines.

Image

Obviously, it is neither practical nor necessary to give an in-depth introduction to Spark’s integration with every data source. We only need to focus on the most commonly used integration methods.

In this lecture, I will explain five common ways to create a DataFrame from the perspectives of the driver, file system, and RDBMS. Then I will take you to understand the usage scenarios and analyze the advantages and disadvantages of different methods.

Creating DataFrame from Driver #

In Spark, you can directly create a DataFrame from data structures such as arrays, tuples, and maps on the Driver side. DataFrames created in this way usually have limited data volume, so they are often used for auxiliary calculations or data exploration rather than directly participating in distributed computing. Nevertheless, learning this part is still very necessary as it can help us intuitively understand the relationship between DataFrame and RDD.

Do you remember? In terms of data representation, compared to RDDs, DataFrames only have an additional Schema. You can even say that a DataFrame is an RDD with a Schema. Therefore, the first method to create a DataFrame is to first create an RDD and then give it a Schema.

To create an RDD, let’s first define the list data structure seq. Each element of seq is a tuple, where the first element is of type String and the second element is of type Int. With the list data structure, we then create the RDD as shown below.

import org.apache.spark.rdd.RDD
val seq: Seq[(String, Int)] = Seq(("Bob", 14), ("Alice", 18))
val rdd: RDD[(String, Int)] = sc.parallelize(seq)

Once we have the RDD, let’s make a “hat” for it, which is the Schema we just mentioned. To create the Schema, we need to use several built-in types in Spark SQL, such as StructType, StructField, StringType, IntegerType, and so on.

Among them, StructType is used to define and encapsulate the Schema, and StructField is used to define each field in the Schema, including the field name and field type. StringType, IntegerType, and other similar *Type types represent field types. To be consistent with the RDD data type, the element types in the Schema should be (StringType, IntegerType).

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
val schema: StructType = StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

Alright, we now have the RDD and the “hat” Schema tailored for it. However, before putting the hat on, we need to tidy up the “hairstyle” of the RDD. What does that mean?

The createDataFrame method has two parameters. The first parameter is the RDD itself, and the second parameter is the Schema. createDataFrame requires the type of the RDD to be RDD[Row], where Row is org.apache.spark.sql.Row. Therefore, for an RDD of type RDD[(String, Int)], we need to convert it to RDD[Row].

import org.apache.spark.sql.Row
val rowRDD: RDD[Row] = rdd.map(fields => Row(fields._1, fields._2))

After tidying up the “hairstyle”, we can then call createDataFrame to create a DataFrame, as shown below.

import org.apache.spark.sql.DataFrame
val dataFrame: DataFrame = spark.createDataFrame(rowRDD, schema)

After creating the DataFrame, don’t forget to verify its usability. We can use the show method to do some simple data exploration and verify if the DataFrame creation is successful.

dataFrame.show

/** Output
+-----+---+
| name|age|
+-----+---+
|  Bob| 14|
|Alice| 18|
+-----+---+
*/

After all the efforts, we first created an RDD using a data structure on the Driver side, and then called createDataFrame to convert the RDD into a DataFrame. You might say, “Creating a DataFrame using createDataFrame seems more complicated than using parallelize to create an RDD. Is there a simpler way?” Let’s continue reading.

toDF method #

In fact, to convert an RDD into a DataFrame, we don’t necessarily have to create the Schema “hat” ourselves. We can directly use the toDF method after the RDD. Let’s first look at the usage of the toDF function, and then analyze how spark.implicits helps us easily create a DataFrame.

import spark.implicits._
val dataFrame: DataFrame = rdd.toDF
dataFrame.printSchema
/** Output
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)
*/

As you can see, we imported all the members of the spark.implicits object and then easily created a DataFrame by calling toDF on the RDD. In fact, using spark.implicits, we can even skip the step of creating an RDD and directly create a DataFrame from the seq list.

import spark.implicits._
val dataFrame: DataFrame = seq.toDF
dataFrame.printSchema
/** Output
root
 |-- _1: string (nullable = true)
 |-- _2: integer (nullable = false)
*/
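The default column names _1 and _2 are not very descriptive. As a small sketch, toDF also accepts column names as arguments, which gives the same names we chose by hand in the createDataFrame example. The session setup and the name namedDF are assumptions for illustration; in spark-shell the existing session is simply reused.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// In spark-shell a session already exists; getOrCreate reuses it.
val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("toDF-sketch")
  .getOrCreate()
import spark.implicits._

val seq: Seq[(String, Int)] = Seq(("Bob", 14), ("Alice", 18))

// Passing names to toDF replaces the default _1 and _2 column names.
val namedDF: DataFrame = seq.toDF("name", "age")
namedDF.printSchema
```

The field types are still inferred from the tuple types, so only the names change.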

Do you feel that this method is concise and easy? However, you may ask, “Since there is a shortcut toDF, why bother learning the cumbersome steps of the createDataFrame method in the first place?”

There is a saying popular on the internet: “Your life is good because someone is carrying the weight for you.” The same applies to toDF. The reason why we can easily create a DataFrame using toDF is that the spark.implicits object provides various implicit methods.

Implicit methods are a special type of function in the Scala language. Developers do not need to call them explicitly: when the code would otherwise not compile, for example because Seq has no toDF method, the compiler looks for a suitable implicit method in scope and inserts the call automatically. It is these implicit methods that silently wrap the seq list or the RDD and convert it into a DataFrame, doing for us the work we did by hand with createDataFrame.
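To make the mechanism more concrete, here is a minimal plain-Scala sketch that does not use Spark at all. The names TinyFrame, PairSeqOps, and toTinyFrame are hypothetical and only imitate the idea; this is not how spark.implicits is actually implemented.

```scala
object ImplicitSketch {
  // A stand-in for a DataFrame: column names plus rows of values.
  final case class TinyFrame(columns: Seq[String], rows: Seq[Seq[Any]])

  // Once this implicit class is in scope, every Seq[(String, Int)]
  // appears to have a toTinyFrame method.
  implicit class PairSeqOps(data: Seq[(String, Int)]) {
    def toTinyFrame: TinyFrame =
      TinyFrame(Seq("_1", "_2"), data.map { case (s, i) => Seq[Any](s, i) })
  }
}

import ImplicitSketch._

// Seq itself has no toTinyFrame; the compiler rewrites this call to
// new PairSeqOps(Seq(...)).toTinyFrame behind the scenes.
val frame = Seq(("Bob", 14), ("Alice", 18)).toTinyFrame
println(frame)
```

Without the import line, the last call would not compile, which is the same reason toDF only becomes available after importing spark.implicits._.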

Creating DataFrame from File System #

After discussing the first type of data source, let’s take a look at how Spark creates a DataFrame from the file system.

Spark supports multiple file systems, including HDFS, Amazon S3, and local file systems, among others. However, regardless of the file system used, Spark always reads data and creates DataFrames through the read API of SparkSession. Therefore, we need to understand how to use the read API and what considerations need to be made.

The read API is provided by SparkSession, allowing developers to create DataFrames in a unified manner, as shown in the following diagram.

Image

As shown, to create a DataFrame using the read API, developers only need to call the read method of SparkSession and provide three types of parameters. These three types of parameters are the file format, load options, and file path, which are specified by the functions format, option, and load respectively.

Let’s first look at the first type of parameter, the file format. It refers to the storage format of the file, such as CSV (Comma Separated Values), Text, Parquet, ORC, and JSON. Spark SQL supports a rich set of file formats. In addition to the few examples mentioned here, Spark SQL can also read compressed files (for example, gzip-compressed text) and even image formats.

For the complete list of supported formats, you can refer to the following diagram or visit the official website. In the following explanations, we will select some commonly used data formats to demonstrate the specific usage of the read API.

Image

The file format determines the set of available load options, which means that different data formats have different options. For example, CSV files can use option("header", true) to indicate that the first row of the CSV file contains the column names, but formats such as Parquet and ORC have no such option. We will explain the load options in detail when we discuss the usage of common file formats.

It’s worth noting that there can be zero or more load options. When multiple options need to be specified, they can be chained, as in option(option1, value1).option(option2, value2).

The third type of parameter of the read API is the file path, which is the locator of the file in the file system. For example, in the local file system, it could be “/dataSources/wikiOfSpark.txt”, in HDFS it could be “hdfs://hostname:port/myFiles/userProfiles.csv”, and in Amazon S3 it could be “s3://myBucket/myProject/myFiles/results.parquet”, etc.

Now that we have understood the general usage of the read API, let’s proceed to illustrate it with some commonly used data formats. For file formats that are not covered in this lesson, you can refer to the official documentation for their usage.

Creating DataFrame from CSV #

CSV file format is commonly used in applications such as data exploration, data analysis, and machine learning due to its human-readable plain text format. As we analyzed earlier, the key to successfully creating a DataFrame from a CSV file lies in understanding and familiarizing oneself with the corresponding load options. Now let’s take a look at the load options associated with the CSV format and what they mean.

Image

Starting from the top, the first option is “header”. The value of “header” is a boolean, either true or false, which is used to specify whether the first row of the CSV file contains the column names. If it does, Spark SQL will use the column names from the first row to create the DataFrame, otherwise it will name each data column using “_c” followed by a number, such as “_c0”, “_c1”, etc.

For each loaded data column, regardless of its actual meaning, Spark SQL treats it as a string type. For example, in the following CSV file, both the “name” and “age” fields are treated as string types by Spark SQL.

name,age
alice,18
bob,14


import org.apache.spark.sql.DataFrame
val csvFilePath: String = _ // placeholder: replace _ with the path to your CSV file
val df: DataFrame = spark.read.format("csv").option("header", true).load(csvFilePath)
// df: org.apache.spark.sql.DataFrame = [name: string, age: string]
df.show
/** Result:
+-----+---+
| name|age|
+-----+---+
|alice| 18|
|  bob| 14|
+-----+---+
*/

If we want to explicitly specify the data types for each column of the DataFrame during the loading process, we need to define the Data Schema explicitly and pass it to Spark SQL through the schema method in the read API. We have already mentioned how to define a Data Schema when discussing the createDataFrame function, so let’s review it together.

Define the Schema:

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
val schema: StructType = StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

Pass the Data Schema through the schema method:

val csvFilePath: String = _ // placeholder: replace _ with the path to your CSV file
val df: DataFrame = spark.read.format("csv").schema(schema).option("header", true).load(csvFilePath)
// df: org.apache.spark.sql.DataFrame = [name: string, age: int]

As you can see, after clearly defining the Data Schema using the schema method, the type of DataFrame created after loading the data changed from “[name: string, age: string]” to “[name: string, age: int]”. It is important to note that not all file formats require the schema method to specify the Data Schema. Therefore, the schema method is not a mandatory step in the general usage of the read API.

Image

Okay, let’s continue discussing the load options for the CSV format. After “header”, the second option is “sep”, which is used to specify the separator used to separate column data. It can be any string and the default value is a comma. Other common separators include Tab, “|”, etc.

The “escape” and “nullValue” options are used to specify the escape character and null value in the file, respectively. The “dateFormat” option is used to specify the date format. Its value can be any string that can be converted to a Java SimpleDateFormat type, and the default value is “yyyy-MM-dd”.
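As a small illustration of these options, the sketch below writes a temporary file that uses “|” as the separator and “NA” as the null marker, then reads it back with “sep” and “nullValue”. The file content, the “NA” marker, and the session setup are assumptions made for this example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// In spark-shell a session already exists; getOrCreate reuses it.
val spark = SparkSession.builder().master("local[*]").appName("csv-options-sketch").getOrCreate()

// Hypothetical sample data: pipe-separated, with NA standing for a missing age.
val csvFile = Files.createTempFile("people", ".csv")
Files.write(csvFile, "name|age\nalice|18\nbob|NA\n".getBytes(StandardCharsets.UTF_8))

val schema = StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

val df = spark.read.format("csv")
  .schema(schema)
  .option("header", true)    // first line holds the column names
  .option("sep", "|")        // columns are separated by a pipe
  .option("nullValue", "NA") // the string NA is loaded as null
  .load(csvFile.toUri.toString)

df.show()
```

Because “NA” is declared as the null marker, bob’s age is loaded as null instead of being treated as a value that fails to parse as an integer.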

The last option is “mode”, which is used to specify the read mode of the file, or more precisely, how Spark SQL should handle “dirty data” in the CSV file. The so-called dirty data refers to data records where the data values do not match the expected data type. For example, in a CSV file, there is a column named “age” used to record the user’s age, and its data type is integer (Int). Obviously, the age column data cannot contain decimals like “8.5” or strings like “8 years old”. The “8.5” or “8 years old” is what we often refer to as dirty data.

Without calling the schema method to explicitly specify the Data Schema, Spark SQL treats all data columns as String types, so a value can never fail to match its column’s type. This is why the mode option is usually used together with the schema method.

The “mode” option supports 3 values, namely permissive, dropMalformed, and failFast, which are explained in the following table.

Mode            Description
permissive      When encountering a malformed record, sets all fields to null.
dropMalformed   Ignores the whole corrupted record.
failFast        Throws an exception when encountering a malformed record and stops reading the data.

It can be seen that except for the “failFast” mode, the other two modes do not affect the creation of DataFrame. Taking the following CSV file as an example, in order to eliminate dirty data, i.e., the record “cassie, six”, and load the “clean” data that meets the type requirements, we need to combine the schema method with the mode option.

CSV file content:

name,age
alice,18
bob,14
cassie, six

Call the schema method to pass the Data Schema:

val csvFilePath: String = _ // placeholder: replace _ with the path to your CSV file
val df: DataFrame = spark.read.format("csv")
  .schema(schema)
  .option("header", true)
  .option("mode", "dropMalformed")
  .load(csvFilePath)
// df: org.apache.spark.sql.DataFrame = [name: string, age: int]
df.show
/** Result printout
+-----+---+
| name|age|
+-----+---+
|alice| 18|
|  bob| 14|
+-----+---+
*/
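To experiment with the other two modes, a helper along the following lines may be convenient. It is only a sketch: the helper name readWithMode and the temporary file are assumptions. Since Spark reads lazily, the action (collect) is placed inside Try so that an exception raised while parsing is captured rather than aborting the session.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.util.Try
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// In spark-shell a session already exists; getOrCreate reuses it.
val spark = SparkSession.builder().master("local[*]").appName("csv-mode-sketch").getOrCreate()

// Temporary copy of the sample file, including the dirty record.
val csvFile = Files.createTempFile("users", ".csv")
Files.write(csvFile, "name,age\nalice,18\nbob,14\ncassie, six\n".getBytes(StandardCharsets.UTF_8))

val schema = StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

// Hypothetical helper: loads the file with the given mode and collects the rows.
def readWithMode(mode: String): Try[Array[Row]] = Try {
  spark.read.format("csv")
    .schema(schema)
    .option("header", true)
    .option("mode", mode)
    .load(csvFile.toUri.toString)
    .collect()
}

// Print the outcome of each mode side by side.
Seq("permissive", "dropMalformed", "failFast").foreach { mode =>
  println(s"$mode -> ${readWithMode(mode).map(_.toSeq)}")
}
```

Running the loop prints either the collected rows or the captured failure for each mode, which makes the three behaviors easy to compare on your own Spark version.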

Okay, we have finished talking about creating DataFrame from CSV files. It is not difficult to see that creating DataFrame from CSV files is relatively cumbersome, and there are many details that developers need to pay attention to. However, after all, CSV is simple, cross-platform, readable, and widely used, so it is still very worthwhile to master these development skills.

Creating DataFrame from Parquet / ORC #

Next, let’s talk about the Parquet format and ORC format. Compared with CSV, creating a DataFrame from these two formats is much simpler.

Parquet and ORC are widely used columnar storage file formats. As the name suggests, columnar storage is in contrast to row-based storage.

In traditional row-based file formats, data records are stored on a row-by-row basis. Although this is very intuitive for humans, in terms of data retrieval and scanning, row-based data is often inefficient. For example, in data warehouse scenarios such as data exploration and analysis, we often only need to scan certain fields of data records. However, in row-based storage, we must scan the full data to complete field filtering.

CSV is a typical row-based data format. Using the example content below, if we want to count the number of girls in the file, we have to scan each row of data, check the value of the gender field, and then decide whether to include the current record in the count.

CSV file content:

name,age,gender
alice,18,female
bob,14,male

Columnar storage is different. It stores data on a column-by-column basis, and each column has its own file or file block. Taking the previous file content as an example, if we use columnar storage, the storage format of the file will look like this:

Image

As we can see, the data is stored column by column. To count the number of girls, we only need to scan the data file of the gender column, without scanning the data files of the name and age fields. Compared to row-based storage, columnar storage greatly reduces the amount of data that has to be scanned.

Moreover, for each columnar file or file block, columnar formats often attach data structures such as headers and footers to record statistical information about the column data, such as maximum value, minimum value, and record count. These statistics further help improve data access efficiency. For example, for a gender file where max=“male” and min=“male”, when counting the number of girls, we can completely skip such a file and not scan it.
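The two ideas above, reading only the needed column and skipping chunks by their statistics, can be sketched in plain Scala. This is a toy model under stated assumptions, not the actual Parquet or ORC layout; the names Person, ColumnChunk, countRowBased, and countColumnar are hypothetical.

```scala
object ColumnarSketch {
  final case class Person(name: String, age: Int, gender: String)

  // One non-empty chunk of a single column, with the min/max
  // statistics that a footer would record.
  final case class ColumnChunk(values: Vector[String]) {
    val min: String = values.min
    val max: String = values.max
  }

  // Row-based: every full record is visited just to inspect one field.
  def countRowBased(rows: Seq[Person], gender: String): Int =
    rows.count(_.gender == gender)

  // Columnar: only gender chunks are read, and a chunk is skipped when
  // its [min, max] range cannot contain the target value.
  // Returns (number of matches, number of chunks skipped).
  def countColumnar(genderChunks: Seq[ColumnChunk], gender: String): (Int, Int) = {
    val candidates = genderChunks.filter(c => c.min <= gender && gender <= c.max)
    val matches = candidates.map(_.values.count(_ == gender)).sum
    (matches, genderChunks.size - candidates.size)
  }
}

import ColumnarSketch._

val rows = Seq(Person("alice", 18, "female"), Person("bob", 14, "male"))
val chunks = Seq(
  ColumnChunk(Vector("female", "male")),
  ColumnChunk(Vector("male", "male")) // min = max = "male", so it can be skipped
)

println(countRowBased(rows, "female"))   // 1
println(countColumnar(chunks, "female")) // (1,1)
```

The second chunk is never scanned value by value: comparing the target against its min and max is enough to rule it out.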

Furthermore, many columnar formats, such as Parquet and ORC, record the Data Schema in the files themselves, as part of the file metadata. In this way, when reading columnar files, we don’t need to manually specify the Data Schema as we do when reading CSV files, and these cumbersome steps can be omitted. Therefore, using the read API to read Parquet or ORC files becomes very easy, as shown below.

Using the read API to read Parquet files:

val parquetFilePath: String = _ // placeholder: replace _ with the path to your Parquet file
val df: DataFrame = spark.read.format("parquet").load(parquetFilePath)

Using the read API to read ORC files:

val orcFilePath: String = _ // placeholder: replace _ with the path to your ORC file
val df: DataFrame = spark.read.format("orc").load(orcFilePath)

As you can see, in the usage of the read API, we don’t even need to specify any options, as long as there are the format and load steps. Isn’t it very simple?
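To see the embedded schema at work, the following sketch writes a small DataFrame to a temporary Parquet directory and reads it back without calling the schema method. The sample data, the temporary location, and the session setup are assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// In spark-shell a session already exists; getOrCreate reuses it.
val spark = SparkSession.builder().master("local[*]").appName("parquet-sketch").getOrCreate()
import spark.implicits._

// A fresh location inside a temporary directory.
val parquetPath: String =
  Files.createTempDirectory("parquet-sketch").resolve("people").toUri.toString

// Write a tiny DataFrame; the column names and types are stored in the files.
Seq(("alice", 18), ("bob", 14)).toDF("name", "age")
  .write.format("parquet").save(parquetPath)

// Read it back with only format and load: no schema call is needed.
val loaded: DataFrame = spark.read.format("parquet").load(parquetPath)
loaded.printSchema
```

The printed schema shows name as a string and age as an integer, recovered from the Parquet metadata rather than from anything specified at read time.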

Okay, that’s it. We have sorted out how to create a DataFrame from the file system in different data formats. At the end of this lesson, let’s briefly look at how to create a DataFrame from a relational database, since this scenario is quite common in daily development.

Creating DataFrame from RDBMS #

Using the read API to read from a database is as simple as connecting to a database using the command line. When connecting to a database via the command line, we usually need to specify key information such as the database driver, database address, username, and password through parameters. The read API works in the same way, except that these parameters are specified through the option parameter. Taking MySQL as an example, the usage of the read API is as follows:

Connect to the database and create a DataFrame using the read API:

spark.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://hostname:port/mysql")
  .option("user", "username")
  .option("password", "password")
  .option("numPartitions", 20)
  .option("dbtable", "table_name")
  .load()

When accessing the database, we also need to use the format method to specify the “data source format”, where the keyword is “jdbc”. Please note that since the database URL is specified through the option, the load method no longer requires “file path” to be passed in. Instead, we focus on setting the option parameters.

Just like the command line, the option parameters also require the driver, URL, user, and password parameters to specify the general database connection settings. However, since the purpose of calling the read API is to create a DataFrame, we also need to specify the “dbtable” option to determine which table to access.

Interestingly, in addition to assigning a table name to “dbtable”, we can also assign a SQL query to this option. Because Spark places the value of “dbtable” in the FROM clause of the statement it generates, the query must be written as a subquery in parentheses with an alias. This allows us to complete data filtering and improve access efficiency during the data loading process. For example, if we want to select all female data from the users table and then create a DataFrame on it:

// The subquery is wrapped in parentheses and given an alias (female_users is an arbitrary name)
val sqlQuery: String = "(select * from users where gender = 'female') as female_users"
spark.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://hostname:port/mysql")
  .option("user", "username")
  .option("password", "password")
  .option("numPartitions", 20)
  .option("dbtable", sqlQuery)
  .load()

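In addition, to improve subsequent parallel processing efficiency, we can control the parallelism of the DataFrame, i.e., the number of partitions, through the “numPartitions” option. Note that for reads, “numPartitions” is an upper bound and takes effect together with the “partitionColumn”, “lowerBound”, and “upperBound” options. Without them, Spark reads the table through a single partition.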
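A partitioned JDBC read could be sketched as follows. The helper names partitionedReadOptions and readPartitioned are hypothetical, and so are any table name, column name, and bounds passed in by the caller. The option keys themselves (partitionColumn, lowerBound, upperBound, numPartitions) are the ones Spark’s JDBC data source documents, and the partition column is expected to be a numeric, date, or timestamp column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Builds the options for a partitioned JDBC read. The bounds only decide
// how the column range is split into partitions; they do not filter rows.
def partitionedReadOptions(
    table: String,
    column: String,
    lower: Long,
    upper: Long,
    partitions: Int): Map[String, String] =
  Map(
    "dbtable" -> table,
    "partitionColumn" -> column,
    "lowerBound" -> lower.toString,
    "upperBound" -> upper.toString,
    "numPartitions" -> partitions.toString
  )

// Combines the connection settings with the partitioning options.
// Calling this requires a reachable database and its JDBC driver.
def readPartitioned(
    spark: SparkSession,
    url: String,
    user: String,
    password: String,
    extra: Map[String, String]): DataFrame =
  spark.read.format("jdbc")
    .option("url", url)
    .option("user", user)
    .option("password", password)
    .options(extra)
    .load()

// Hypothetical usage: split the users table on an id column into 20 ranges.
val extraOptions = partitionedReadOptions("users", "id", 1L, 1000L, 20)
println(extraOptions)
```

With these options Spark issues one query per range of the partition column, so the resulting DataFrame can have up to numPartitions partitions.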

It is worth noting that by default, the Spark installation directory does not provide any JAR files related to database connections. Therefore, for databases such as MySQL, PostgreSQL, Oracle, and DB2, we need to manually copy the relevant JAR files to the “jars” folder under the Spark installation directory. At the same time, we also need to pass the following command line parameters to spark-shell or spark-submit to tell Spark where the related JAR files are located:

  • --driver-class-path mysql-connector-java-version.jar
  • --jars mysql-connector-java-version.jar

Alright, that’s it for this lecture! Today’s content is a bit extensive, so let’s summarize together.

Key Review #

In today’s lecture, we focus on the ways to create a DataFrame. Spark supports a variety of data sources and formats, and today our emphasis is on creating a DataFrame through the driver, file system, and relational databases.

On the driver side, we can use the createDataFrame method to create a DataFrame. It is important to note that there are two prerequisites for this method. First, the underlying RDD must have a type of RDD[Row]. Second, we need to manually create the data schema. The creation of the schema requires the use of data types such as StructType, StructField, etc. Remember this well.

import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
val schema: StructType = StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType)
))

In addition to this more cumbersome method, we can also use the implicit methods provided by spark.implicits._ to easily create a DataFrame by calling the toDF method on an RDD or a plain Seq.

Next, using the read API of the SparkSession, we explained the general methods of creating a DataFrame from CSV, Parquet, ORC, and relational databases. Familiarize yourself with the general methods of the read API.

Image

Since columnar storage formats like Parquet and ORC have Data Schema built into the file, only the format and load methods are required to access these file formats.

In comparison, reading CSV files is more complex. First, to specify the Data Schema, developers need to additionally use the schema method to pass in the predefined data schema. Furthermore, CSV has many options. You can refer to the table below to have better control over the loading process of CSV data.

Image

Finally, we learned the general methods of the read API to access RDBMS. Similar to the command line access method, you need to specify the required access parameters for the database connection through multiple option calls, such as database driver, URL address, username, password, and so on. In particular, you can also specify the table name or query statement for the dbtable option to intervene and control the data loading process.

Practice for Each Lesson #

Given the following CSV file, please compare the differences between the DataFrames created by the read API using the permissive, dropMalformed, and failFast modes.

name,age
alice,18
bob,14
cassie, six

Feel free to communicate with me in the comment section and I encourage you to share this lesson with more colleagues and friends to learn and improve together.