
01 The Necessity of Performance Tuning - Why Adjust When Spark Itself is Fast #

Hello, I am Wu Lei.

In my day-to-day development work, I have noticed a common attitude: many developers believe that Spark’s execution performance is already strong enough, so as long as they implement the business logic step by step, there is no need to do any performance tuning.

Do you think so too? It is true that Spark’s core competitiveness lies in its execution performance, which comes mainly from its in-memory computing model and the Tungsten project, as well as the sustained engineering effort that has gone into Spark SQL.

However, is it true that developers are all set as long as they implement the business logic? Let’s not rush to conclusions. First, let’s look at two common examples in daily development, and then we can answer this question.

In data application scenarios, ETL (Extract, Transform, Load) jobs usually come first: the source data has to be extracted and transformed before it can be used for exploration and analysis, or fed to machine learning algorithms for model training, so that the deeper value of the data can be uncovered. The two examples we will discuss today both come from typical end-to-end ETL jobs and involve common operations and computation tasks.

Development Case 1: Data Extraction #

The first example is very simple: given a list of data entries, extract specific fields from them. This kind of data processing requirement is quite common in ETL jobs. To implement this requirement, we need to define a function called extractFields. The input parameter of this function is of type Seq[Row], which represents a sequence of data entries. The return type of the function is Seq[(String, Int)], which represents a sequence of tuples of type (String, Int). The logic of the function is to extract the string at index 2 and the integer at index 4 from each data entry.

This business requirement is simple and straightforward, and implementing it is a piece of cake. In actual development, I have observed that many developers quickly implement it using the following approach. It is concise, the code is written quickly, and it works fine. Unit tests and functional tests all pass.

// Implementation option 1 -- Anti-pattern
import org.apache.spark.sql.Row

val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) => {
    // Side effect: every iteration mutates the external fields variable
    var fields = Seq[(String, Int)]()
    rows.map(row => {
      fields = fields :+ (row.getString(2), row.getInt(4))
    })
    fields
  }
}

At first glance, this function seems to be fine. The special thing about it is that although this data extraction function is very small and only a small part of a complex ETL application, it will be called frequently in different places throughout the entire ETL job. If I deploy the entire ETL application based on this code, I will find that the end-to-end execution efficiency of the ETL job is very poor, taking two hours to complete in a distributed environment. Such speed can be quite frustrating.

To make the ETL job run faster, we naturally need to improve its performance. But where should we start? Since the extractFields function will be called frequently, it’s a good idea to start from there and see if we can optimize it. After re-evaluating the type of the extractFields function, we can easily see that it is essentially a conversion from Seq[Row] to Seq[(String, Int)]. The core logic of the function is field extraction. As long as we can obtain Seq[(String, Int)] from Seq[Row], the goal is achieved.

To achieve this conversion between the two data types, besides the procedural style that developers often reach for without thinking, we can also use the functional programming paradigm. One of the principles of functional programming is to avoid side effects as much as possible. A side effect is any modification of state outside the function’s own scope; in the previous example, repeatedly appending tuples to the fields variable is exactly such a side effect.

Based on this idea, we arrive at the second implementation, shown below. The biggest difference from the first implementation is that the fields variable is gone. Instead, we call map directly on the input Seq[Row] to extract the required fields and build the tuples, and finally call toSeq to convert the result of map into a sequence. This approach is clean, concise, and straightforward.

// Implementation option 2 -- Preferred
val extractFields: Seq[Row] => Seq[(String, Int)] = {
  (rows: Seq[Row]) =>
    rows.map(row => (row.getString(2), row.getInt(4))).toSeq
}

You may ask: “The only difference between the two code samples is the presence of an intermediate variable. How much difference can that make? It just looks like the code is more concise.” In fact, when I deployed the ETL job based on the second code sample, I was amazed to find that the end-to-end execution performance was doubled! It took only one hour instead of the original two hours in a distributed environment. Two code samples with the exact same functionality have such a significant difference in execution performance. Therefore, as you can see, in everyday development work, it is not enough to focus solely on implementing business functionality. We should not overlook any small parts that can be optimized.
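If you want to get a feel for this gap without a cluster, a crude local timing sketch like the one below is enough. It is only an illustration under assumptions: the two versions above are bound to the hypothetical names extractFieldsV1 and extractFieldsV2, and the fabricated rows carry meaningful values only at index 2 (a String) and index 4 (an Int).

// Local timing sketch (an illustration under assumptions, not the original benchmark)
import org.apache.spark.sql.Row

// Fabricated rows: only index 2 (String) and index 4 (Int) matter to extractFields
val sampleRows: Seq[Row] =
  (1 to 10000).map(i => Row(i, i, s"name_$i", i, i % 100))

// Crude wall-clock timer, good enough to see the order-of-magnitude difference
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("option 1 (mutable append)")(extractFieldsV1(sampleRows))
time("option 2 (pure map)")(extractFieldsV2(sampleRows))

Even on a single machine you should see option 1 fall further behind as the row count grows, because each :+ on an immutable Seq copies the accumulated prefix.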

Development Case 2: Data Filtering and Aggregation #

You might say, “This example is just an example, right? Moreover, the optimization in this example seems to only involve adjusting the programming paradigm, and it doesn’t seem to have much to do with Spark!” Don’t worry, let’s look at the second example. The second example will be a bit more complex, so let’s first clarify the business requirements and data relationships.

/**
(startDate, endDate)
e.g. ("2021-01-01", "2021-01-31")
*/
val pairDF: DataFrame = _

/**
(dim1, dim2, dim3, eventDate, value)
e.g. ("X", "Y", "Z", "2021-01-15", 12)
*/
val factDF: DataFrame = _

// Storage root path
val rootPath: String = _ 

In this case, we have two datasets, pairDF and factDF, both of which are DataFrames. The schema of pairDF has two fields, a start date and an end date. factDF has more fields, but the key ones are the event date and the business metric of interest, called Value; the other dimensions dim1, dim2, and dim3 are used only for grouping, and their specific meanings do not matter here. In terms of data volume, pairDF is tiny, with only a few hundred records, while factDF is large, with tens of millions of rows.
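If you want to follow along locally, here is a toy version of the two datasets; the SparkSession settings and the sample values are made up and stand in for whatever your real data sources provide.

// Toy stand-ins for pairDF and factDF (made-up values, for local experimentation only)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("etl-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val pairDF = Seq(
  ("2021-01-01", "2021-01-31"),
  ("2021-02-01", "2021-02-28")
).toDF("startDate", "endDate")

val factDF = Seq(
  ("X", "Y", "Z", "2021-01-15", 12),
  ("X", "Y", "Z", "2021-02-10", 7)
).toDF("dim1", "dim2", "dim3", "eventDate", "value")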

For these two datasets, the specific business requirements can be divided into 3 steps:

  1. For each time range in pairDF, filter out the data entries in factDF where the Event date falls within that time range.
  2. Group factDF by dimensions dim1, dim2, dim3, and Event date, and then summarize the business statistic Value.
  3. Save the final statistical result to Amazon S3.

Faced with such a requirement, many developers implement it step by step in a routine manner. Next, I will walk through the calculation logic with concrete code.

// Implementation option 1 -- Anti-pattern
def createInstance(factDF: DataFrame, startDate: String, endDate: String): DataFrame = {
  val instanceDF = factDF
    .filter(col("eventDate") > lit(startDate) && col("eventDate") <= lit(endDate))
    .groupBy("dim1", "dim2", "dim3", "eventDate")
    .agg(sum("value") as "sum_value")
  instanceDF
}

pairDF.collect.foreach {
  // collect returns Array[Row], so match on Row rather than a tuple
  case Row(startDate: String, endDate: String) =>
    val instance = createInstance(factDF, startDate, endDate)
    val outPath = s"${rootPath}/endDate=${endDate}/startDate=${startDate}"
    instance.write.parquet(outPath)
}

The code first defines the function createInstance, which takes factDF, a start date, and an end date as parameters. In the function body, it filters factDF by event date, groups and aggregates the data along four dimensions, and returns the summary result. After defining createInstance, the code collects pairDF to the driver and iterates over the time pairs; for each pair it calls createInstance with factDF, the start date, and the end date to obtain the filtered and aggregated result, and finally writes that result out in Parquet format.

From a functional perspective, there is no problem with this code, and the data processing logic seems to match the expected results. However, the performance of the overall execution is very poor. In a distributed environment with 16 AWS EC2 C5.4xlarge instances, based on the above code, the ETL job took half an hour to complete.

The contrast makes the problem obvious. With the second implementation below, which is identical to the first in business functionality and calculation logic, the ETL job finishes within 15 minutes on only 2 instances of the same EC2 machine type. The performance gap between the two versions of the code is huge.

// Implementation option 2 -- Preferred
val instances = factDF
  .join(pairDF, factDF("eventDate") > pairDF("startDate") && factDF("eventDate") <= pairDF("endDate"))
  .groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate")
  .agg(sum("value") as "sum_value")

instances.write.partitionBy("endDate", "startDate").parquet(rootPath)

Now the question is, what is the difference between these two versions of the code and what causes such a huge difference in execution performance? Let’s first review the first implementation and see if we can identify any problematic code smells.

As we know, Spark evaluates lazily, and computation is only triggered by Actions, which come in two types: one type writes distributed computation results directly to storage, such as DataFrame’s write or RDD’s saveAsTextFile; the other collects distributed results back to the driver, such as first, take, and collect.
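To make the two categories concrete, here is a small illustration; df stands in for any DataFrame, and the output path is a placeholder.

// Two kinds of Actions (df is any DataFrame, the path is a placeholder)

// 1) Write distributed results to storage; nothing funnels through the driver
df.write.parquet("/some/output/path")

// 2) Pull results back to the driver; fine for small results, risky for large ones
val firstRow = df.first()   // a single row
val someRows = df.take(10)  // a handful of rows
val allRows  = df.collect() // the entire dataset lands on the driver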

Clearly, for the second type the driver can become a bottleneck; in particular, collecting a large result set with collect easily leads to performance problems. So in the first implementation, the collect call is the first thing that smells off.

That said, in our scenario pairDF is a small dataset with only a few hundred records, so collecting it to the driver is not a big problem by itself.

The truly deadly part is the foreach after collect. Keep in mind that factDF is a large distributed dataset. Although createInstance merely filters and aggregates factDF, it is called once per time pair in pairDF, which means several hundred times inside the foreach loop. And in Spark’s DAG (Directed Acyclic Graph) model, without caching, every triggered Action causes the whole DAG to be re-executed from start to finish.

With this point in mind, look at the code again. Focusing on the foreach and on factDF inside createInstance, we are surprised to find that the factDF with tens of millions of rows is scanned in full several hundred times! Scary, isn’t it? This analysis shows that these repeated full scans are exactly what drags down the end-to-end efficiency of the ETL job.
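Incidentally, if for some reason the loop structure had to stay, one partial mitigation that follows directly from the caching remark above is to persist factDF before the loop. Treat the sketch below as a stop-gap under that assumption, not as the fix this lesson recommends; the real fix is the rewrite discussed next.

// Partial mitigation only: caching stops the repeated Actions from re-reading the
// source, but each iteration still filters and aggregates the cached data, so the
// hundreds of passes over factDF do not go away
import org.apache.spark.storage.StorageLevel

factDF.persist(StorageLevel.MEMORY_AND_DISK)
factDF.count() // materialize the cache once

// ... the collect + foreach loop from implementation option 1 ...

factDF.unpersist()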

On the other hand, in the second implementation, factDF and pairDF are joined based on the inequality condition pairDF.startDate < factDF.eventDate <= pairDF.endDate. In Spark, the implementation of inequality join is Nested Loop Join. Although Nested Loop Join is the worst-performing join implementation compared to Merge Join, Hash Join, Broadcast Join, etc., the data association between factDF and pairDF only requires a full scan of the data once. This advantage alone gives a huge performance boost over the first implementation.
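If you want to verify which join strategy the planner actually picks, printing the physical plan is a quick check. And since pairDF is only a few hundred rows, it is a natural broadcast candidate; whether an explicit hint is needed depends on your Spark version and the auto-broadcast threshold, so treat the hint below as an optional nudge rather than part of the original solution.

// Inspect the join strategy, and optionally hint that the tiny pairDF be broadcast
import org.apache.spark.sql.functions.{broadcast, sum}

// With a tiny pairDF you would typically see a BroadcastNestedLoopJoin in the plan
instances.explain()

// Optional nudge: broadcasting pairDF keeps factDF from being shuffled for the join
val instancesHinted = factDF
  .join(broadcast(pairDF),
    factDF("eventDate") > pairDF("startDate") && factDF("eventDate") <= pairDF("endDate"))
  .groupBy("dim1", "dim2", "dim3", "eventDate", "startDate", "endDate")
  .agg(sum("value") as "sum_value")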

Summary #

Today, we analyzed two cases, both of which are from the ETL scenarios of data applications. The first case discusses the situation where the performance overhead introduced by a simple variable in a function is multiplied when the function is frequently called. The second case discusses how inappropriate implementation leads to repeated scanning of massive data hundreds or thousands of times.

Through the analysis and discussion of these two cases, we found that for Spark application development, it is not enough to just complete the implementation of business functions. Even though Spark has inherent high execution efficiency, it still requires performance tuning according to specific application scenarios and runtime environments.

The benefits of performance tuning are obvious: first, it saves money, especially on-demand cloud spend, since shorter execution time means a smaller bill; second, it speeds up development iteration, especially for data analysis, data science, and machine learning work, because higher execution efficiency means faster data insights and faster convergence to a good model. So as you can see, performance tuning is not just icing on the cake; it is a skill developers need to master.

So, are you ready for performance tuning in Spark? There is more to life than just getting through the task at hand. Let’s set off on a performance tuning journey right now. Come on, get on board, hold on tight and fasten your seatbelt, we’re about to depart!

Daily Practice #

  1. In your daily work, have you encountered any cases where the implementation of a function is consistent, but the performance varies greatly?
  2. In the second case we discussed today, do you think the preferred implementation can be optimized even further?

Looking forward to seeing your sharing in the comments section. Also, feel free to write down your thoughts on development cases. See you in the next lesson!