13 Spark SQL Starting With Analysis of Minor Vehicle License Plates

13 Spark SQL - Starting with Analysis of Minor Vehicle License Plates #

Hello, I’m Wu Lei.

In the introductory words, we proposed the “three steps to getting started with Spark”. Up until now, we have worked together to complete the first two steps. Congratulations on reaching this point! With a solid understanding of Spark’s common operators and core principles, you are already capable of handling most data processing needs with ease.

However, data processing is just one of the fundamental data application scenarios. Similar to different driving scenarios in car racing, in order to become a seasoned driver in Spark, we need to take the third step - learning Spark’s computational sub-framework. Only by completing this step can we master the conventional development methods of Spark SQL, Structured Streaming, and Spark MLlib, and be proficient in handling different data application scenarios such as data analysis, stream computation, and machine learning, among others.

Image

With so many sub-frameworks, where should we start? Among all the sub-frameworks, Spark SQL has the largest codebase, the highest level of investment from the Spark community, the widest range of applications, and the most far-reaching impact. When it comes to learning the sub-frameworks, it is natural for us to start with Spark SQL.

Today, we will start with an example to familiarize you with the thinking process and implementation steps of data analysis development. With an intuitive experience of Spark SQL, in the following lectures we will go deeper into the usage, features, and advantages of Spark SQL, gradually enabling you to master the full scope of Spark SQL.

Business Requirement #

Today, we are going to discuss a small example from the Beijing license plate lottery for cars. As we know, in order to limit the number of motor vehicles, the Beijing government introduced the car license plate lottery policy starting from 2011. As the lottery process progressed, in 2016, in order to accommodate those “potential drivers” who had not been assigned a license plate number for a long time, the lottery policy introduced the “multiplier” system.

The so-called multiplier system refers to assigning different multiplier coefficients to each individual based on their participation in the lottery. With the multiplier, the probability of winning for everyone changes from the original uniform base probability to “base probability * multiplier coefficient”. The more times a person participates in the lottery, the larger their multiplier coefficient becomes, and the corresponding probability of winning also increases.

However, many “potential drivers” around me always say that the multiplier system is actually useless. They have tried multipliers of 8 or 10 times, but still couldn’t win a license plate! So, in today’s discussion, we will take the opportunity to learn Spark SQL and use data to quantitatively analyze whether there is any relationship between the multiplier and the probability of winning for these “experienced drivers” who have not yet touched a car.

Preparation #

No rice, no meal. Since we are doing data analysis, we need to have data first. I have prepared the car license plate lottery data for Beijing city from 2011 to 2019 for you. You can download it from this link on Baidu Cloud, with the extraction code “ajs6”.

The file name of this data is “2011-2019 Car License Plate Lottery Data.tar.gz”. After decompression, the directory structure is as shown in the figure below.

You can see that there are two sub-directories, “apply” and “lucky”, in the root directory. The “apply” directory contains the application numbers of the lottery participants for different batches from 2011 to 2019, while the “lucky” directory contains the application numbers of the lottery winners. For the sake of convenience, let’s call the people who participated in the lottery “applicants” and the people who won the lottery “winners”. The next level sub-directories of “apply” and “lucky” are the lottery batches, and the lottery batch directories contain Parquet format data files.

Image

After downloading and decompressing the data, next, let’s prepare the runtime environment.

Our small example is relatively lightweight, and the Scala version of the code implementation will not exceed 20 lines. Moreover, the lottery data is small in size, and the total size of the decompressed Parquet files is no more than 4 GB.

Choosing such an example is also to make it easy for you to experiment without being limited by hardware. To run the application for analyzing the multiplier, you can start the local spark-shell environment on your laptop or PC. However, if conditions permit, I still encourage you to build a distributed physical cluster. For details on building a distributed cluster, you can refer to [Chapter 4] (link in Chinese).

Alright, after preparing the data and runtime environment, next, we can proceed to the main topic and develop the data analysis application to explore the relationship between the multiplier and the winning rate.

Data Exploration #

However, before diving into data analysis, let’s first have a basic understanding of the data schema. This refers to the fields present in the source data, their types, and their respective meanings. This step is commonly referred to as data exploration.

The approach for data exploration is as follows: first, we use the read API of SparkSession to read the source data and create a DataFrame. Then, by calling the show method on the DataFrame, we can easily obtain a sample of the source data, thereby completing the preliminary exploration of the data. The code below demonstrates this process.

import org.apache.spark.sql.DataFrame

val rootPath: String = _
// Applicant data
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark is the default SparkSession instance in spark-shell
// Read the source file using the read API
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)
// Print the data
applyNumbersDF.show

// Lucky winners data
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// Read the source file using the read API
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)
// Print the data
luckyDogsDF.show

At this point, you might be confused: “SparkSession? DataFrame? These concepts haven’t been mentioned at all!” Don’t worry, we will cover these key concepts gradually in the subsequent lessons. Today, let’s understand “what they are” first, and we will discuss the “why they are” later.

You can think of SparkSession as an advanced version of SparkContext, which serves as the new generation entry point for Spark development starting from Spark 2.0. SparkContext converts the source data into RDD using the textFile API, while SparkSession converts the source data into DataFrame using the read API.

DataFrame, on the other hand, can be seen as a special type of RDD. We are already familiar with RDD, and now let’s compare DataFrame with RDD to provide you with an initial understanding of DataFrame.

In terms of functionality, just like RDD, DataFrame is used to encapsulate distributed datasets. It also has the concept of data partitioning and achieves transformations between different DataFrames using operators. However, DataFrame adopts a unique set of independent operators that are different from RDD operators.

In terms of data content, unlike RDD, DataFrame is a distributed dataset with a Schema. Therefore, you can simply think of DataFrame as a two-dimensional table in a database.

Lastly, the underlying computation engine for DataFrame is Spark SQL, while the computation engine for RDD is Spark Core. This difference is crucial. However, we will delve into the differences between these computation engines in the next lesson.

Alright, back to the topic. After understanding the concepts of SparkSession and DataFrame, let’s continue with data exploration.

After running the above code in the spark-shell and calling the show function on both applyNumbersDF and luckyDogsDF DataFrames, we can obtain the sample data. It can be observed that the schema of these “two tables” is the same, both containing two fields: carNum, which is of type String, and batchNum, which is of type Int.

Image

Among them, carNum represents the application number or winning number, while batchNum indicates the batch number of the lottery draw. For example, 201906 represents the last batch of lottery draws in 2019, and 201401 represents the first lottery draw in 2014.

With this information, we have completed the initial data exploration.

Implementing Business Requirements #

After completing the initial data exploration, we can now combine the characteristics of the data (such as the fact that the schemas of the two tables are identical, but the data content differs) to implement the initial business requirements: quantifying the relationship between the lottery winning rate and the multiplier.

Firstly, since we want to quantify the relationship between the lottery winning rate and the multiplier, we only need to focus on the changes in multiplier for those who won the lottery (the data in the “lucky” directory). To calculate the multiplier, we need to use the lottery data from the “apply” directory. Therefore, in order to only focus on the multiplier for those who won the lottery, we must use data association, which is the most common operation in the field of data analysis. Furthermore, since the multiplier system was only introduced in 2016, we only need to access data from 2016 onwards.

Based on these analyses, let’s write the code for data filtering and data association, as shown below.

// Filter the lottery data from 2016 onwards, and only extract the "carNum" field
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")

// Perform inner join between the lottery data and the application data, using the "carNum" field as the join key
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")

In the above code, we use the filter operator to filter luckyDogsDF, and then use the select operator to extract the “carNum” field.

Next, we call the join operator on applyNumbersDF to complete the data association between the two DataFrames. The join operator has three parameters, which you can understand by referring to line 5 of the previous code. The first parameter specifies the DataFrame that needs to be associated, the second parameter represents the join key (i.e., the fields based on which the association is done), and the third parameter specifies the type of association, such as inner for inner join, left for left join, and so on.

After performing the data association, let’s now discuss how to compute the multiplier. The official implementation of the multiplier is somewhat crude. If you observe the files in the apply directory after 2016, you will find that the so-called multiplier is actually the number of copies of the application number.

For example, if my multiplier is 8, my application number will appear 8 times in the lottery files of various batches. Isn’t it crude? Therefore, to compute the multiplier for a certain application number, we only need to count the number of times it appears in the batch files.

Does counting by batch and application number feel familiar? Yes, it’s the Word Count we learned before. Essentially, it’s a grouping and counting process. However, this time, we won’t be using the reduceByKey RDD operator. Instead, we’ll use the set of operators provided by DataFrame. Let’s take a look at the code.

val multipliers: DataFrame = jointDF.groupBy(col("batchNum"),col("carNum"))
.agg(count(lit(1)).alias("multiplier"))

Grouping and Counting

Let’s analyze the code. We first use the groupBy operator to group by the lottery batch and application number, and then use the agg and count operators to count the number of occurrences of (batchNum, carNum) as the multiplier for carNum in the lottery batch batchNum. We use the alias operator to rename the multiplier as “multiplier”.

I understand this might be a bit complicated, so let’s use the show function on multipliers to observe the result of this calculation in a straightforward manner. To illustrate, I will use a table format.

Image As we can see, the multiplier for the same application number is different in different batches. As we mentioned before, as the number of lotteries increases, the multiplier also increases. However, what we are studying here is the relationship between the multiplier and the winning rate, so we only need to focus on the multiplier at which the winners are selected. Therefore, for the same application number, we only need to keep the maximum multiplier.

It should be noted that taking the maximum multiplier will reduce the statistical base of the multiplier and introduce survivorship bias. A more rigorous approach would be to include the past multipliers of the winners in the statistics, so that the base of the multiplier is accurate. However, combining experiments, survivorship bias does not affect the conclusion that “whether there is a direct relationship between the multiplier and the winning rate”. Therefore, we may as well adopt the simpler approach of taking the maximum multiplier. After all, our primary goal is to learn Spark SQL.

For this purpose, we need to “eliminate” the dimension of batchNum, group the multipliers by carNum, and extract the maximum value of the multiplier, as shown in the code below:

val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
.agg(max("multiplier").alias("multiplier"))

The method of grouping and aggregating is similar to before. We still use groupBy to group the data, but this time we only group by the carNum field, and then use agg and max operators to keep the maximum value of the multiplier. After this calculation, we get the multiplier coefficient for each application number before being selected:

Image

As we can see, the uniqueMultipliers DataFrame only contains the carNum and multiplier fields, and the carNum field has no duplicate values. In other words, in this dataset, an application number corresponds to only one maximum multiplier.

Alright, up to this point, we have obtained the multiplier coefficients for each person before being selected. Next, with this data, we can calculate the distribution of the multipliers themselves.

Specifically, what we want to know is the distribution of the number of people under different multipliers. In other words, this time, we want to group the data according to the multipliers and then calculate the statistical count under different multipliers. Needless to say, we still rely on the groupBy and agg operators. The code is as follows:

val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
.agg(count(lit(1)).alias("cnt"))
.orderBy("multiplier")

result.collect

In the final step, we still use the groupBy and agg operators as before to obtain the count distribution under different multipliers. We collect the calculation results using the collect operator, and trigger the execution of all the above code from start to finish.

The result contains two fields: the multiplier and the count of people holding that multiplier. If we turn the result into a bar chart, we can more intuitively observe the relationship between the winning rate and the multiplier, as shown in the figure below:

Image

It is not difficult to find that the number of winners under different multipliers follows a normal distribution. In other words, for an applicant, the probability of winning does not increase linearly with the multiplier. As those “experienced drivers” around us say, winning the lottery is not really related to the multiplier.

Key Review #

In today’s session, we worked together to develop a data analysis application called “Distribution of Multipliers” and answered the question of whether there is a correlation between the successful lottery rate and the multipliers.

Although we encountered some new concepts and operators during the implementation, there is no need to worry or hurry. In today’s lesson, you only need to have a general understanding of application development under the Spark SQL framework.

In the development framework of Spark SQL, we typically create DataFrames from source data through the read API of SparkSession. Then, starting from the DataFrame, we use various transformation operators such as agg, groupBy, select, filter, and so on to transform the DataFrame and complete the corresponding data analysis.

For the convenience of subsequent experiments, I have organized the code snippets involved today. You can run them in the spark-shell to observe the calculation results of each step and understand the relationship between the calculation logic of different operators and their execution results. Good luck!

import org.apache.spark.sql.DataFrame

val rootPath: String = _

// Applicant data
val hdfs_path_apply: String = s"${rootPath}/apply"
// spark is the default SparkSession instance in spark-shell
// Read source file using the read API
val applyNumbersDF: DataFrame = spark.read.parquet(hdfs_path_apply)

// Lucky winners data
val hdfs_path_lucky: String = s"${rootPath}/lucky"
// Read source file using the read API
val luckyDogsDF: DataFrame = spark.read.parquet(hdfs_path_lucky)

// Filter lucky winners data after 2016 and only extract the carNum field
val filteredLuckyDogs: DataFrame = luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum")

// Perform an inner join between lucky winners data and lottery data using the carNum field as the join key
val jointDF: DataFrame = applyNumbersDF.join(filteredLuckyDogs, Seq("carNum"), "inner")

// Group by batchNum and carNum, and calculate the multiplier coefficient
val multipliers: DataFrame = jointDF.groupBy(col("batchNum"), col("carNum"))
  .agg(count(lit(1)).alias("multiplier"))

// Group by carNum and keep the maximum multiplier coefficient
val uniqueMultipliers: DataFrame = multipliers.groupBy("carNum")
  .agg(max("multiplier").alias("multiplier"))

// Group by multiplier and calculate the count
val result: DataFrame = uniqueMultipliers.groupBy("multiplier")
  .agg(count(lit(1)).alias("cnt"))
  .orderBy("multiplier")

result.collect

Exercise for Each Lesson #

  1. Brainstorming: In your opinion, how should the lottery rate system for car license plates be designed to be the most reasonable?

  2. Please run the code in your Spark environment and confirm whether the execution results match the result.

Feel free to communicate and interact with me in the comment section. It is also recommended that you share the content of this lesson with more friends and colleagues. See you in the next lesson!