08 Application Development Three Principles - How to Expand Your Development Horizon #

Hello, I’m Wu Lei.

Starting today, we will enter the study of general performance optimization. In this article, we will learn some common optimization methods and techniques from four aspects: basic development principles, configuration settings, shuffle, and hardware resources. They will be applicable to all computing scenarios.

In today’s lecture, we start from the perspective of application development and explore the fundamental principles to follow during the development phase. If we lay a solid foundation during development and take preventive measures, the execution performance of the business application will usually get off to a good start. The development phase is like a student’s exam: there are hard questions that can raise the score, but as long as we stay steady and answer the foundational questions that guarantee points, the result will not be too bad.

These foundational questions correspond to some “routine operations” in the work, such as Filter + Coalesce, using mapPartitions instead of map, and using ReduceByKey instead of GroupByKey, and so on. I believe that you have already accumulated a lot of these techniques in your daily development work. However, based on my observations, many students, after obtaining these techniques, tend to apply them without thinking. Many students have shared their feedback: “Why didn’t it work after trying? Well, I’ve tried everything that can be tried, and I really don’t have any other optimization ideas, so let’s just leave it like this.”

So, what should be done in this situation? I think the most important reason may be that the “routine operations” you have accumulated have not formed a system. Based on previous development experience, I have found that these “routine operations” can be classified into three categories:

  • Sit back and enjoy the benefits
  • Less is more, delay when possible
  • Break out of single-machine thinking

Without further ado, let’s have a good chat about this next.

Principle 1: Sit back and enjoy the benefits #

To see further, one must stand on the shoulders of giants. Therefore, before racking our brains over various optimization techniques, we should make full use of the “performance dividends” that Spark already provides, such as Project Tungsten, AQE, SQL functions, and so on. I call this principle “sit back and enjoy the benefits”: we set the relevant configuration options or call the corresponding APIs, and let Spark’s own performance advantages do the work.

So, what advantages can we all take advantage of?

How to leverage the advantages of Project Tungsten? #

First of all, we can leverage the “Project Tungsten” launched by Databricks in 2015. Its advantage is that it can improve the execution performance of Spark applications by optimizing the data model and algorithms. How is this achieved? Let’s start with its data structure.

In terms of data structure, Tungsten customizes a compact binary format. This data structure has several orders of magnitude higher storage efficiency compared to JVM object storage. In addition, since the data structure itself is in a compact binary form, it naturally avoids the computational overhead introduced by Java object serialization and deserialization.

Based on the customized binary data structure, Tungsten uses the Java Unsafe API to allocate off-heap memory to manage objects. Off-heap memory has two inherent advantages: its usage can be estimated more accurately, and unlike the JVM heap, it is not subject to garbage collection.

Finally, at runtime, Tungsten replaces the Volcano iterator model with whole-stage code generation, which reduces virtual function calls, lowers memory access frequency, improves the CPU cache hit rate, greatly cuts CPU idle time, and thereby increases CPU utilization.

Databricks official comparison experiments show that the execution performance of the application can be improved 16 times by enabling Tungsten! So you see, even if we do nothing, as long as the business application we develop can benefit from the various features provided by Tungsten, Spark can guarantee the execution performance of the application. As developers, why not take advantage of such a huge benefit?

How to leverage the advantages of AQE? #

In addition to Project Tungsten, the new feature introduced in Spark 3.0 version called AQE is what we should pay attention to. AQE (Adaptive Query Execution) can dynamically adjust the execution plan during the process of Spark SQL optimization.

We know that the optimization process of Spark SQL can be roughly divided into several steps: syntax analysis, semantic analysis, logical planning, and physical planning. In versions before 3.0, Spark only optimizes the logical plan at compile time, applying rules and strategies while traversing the AST (abstract syntax tree) of the query. Once the best physical plan has been selected based on the logical plan, Spark strictly follows that physical plan and mechanically executes the computation.

With AQE, Spark can adjust the logical plan at runtime, as stages complete, using the runtime statistics gathered so far. Based on the re-optimized logical plan, it then reselects the optimal physical plan, thereby adjusting how subsequent stages are executed.

You may wonder, “It sounds impressive, but what specific improvements does AQE bring?” AQE brings three main improvements: automatic partition merging, data skew handling, and join strategy adjustment. Let’s take a closer look at each one.

First, automatic partition merging is easy to understand. Let’s take Filter and Coalesce as an example. After filtering a distributed dataset, some data partitions may have very little or even empty content. To avoid unnecessary scheduling overhead, we often use Coalesce to manually merge partitions.

In addition, there is also a need for partition merging during the shuffle process.

For example, suppose the original data table has two partitions and the shuffle produces five data partitions in the Reduce phase. Due to uneven data distribution, three of those partitions hold very little data. For the CPU, the scheduling overhead caused by these three small partitions is a significant waste. Before Spark supported AQE, developers had no control over this. Now, AQE automatically detects and merges these undersized partitions, relieving us of the need to worry about it.
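To make the idea of partition merging concrete, here is a minimal sketch in plain Scala. It is not Spark's actual implementation; the object name `PartitionCoalescing`, the function `coalesce`, and the size values are assumptions made purely for illustration. It greedily packs adjacent partitions until adding the next one would exceed a target size.

```scala
object PartitionCoalescing {
  /** Greedily packs adjacent partition sizes into groups.
    * A group is closed when adding the next partition would exceed targetSize.
    * A single partition larger than targetSize stays in a group of its own.
    */
  def coalesce(sizes: List[Long], targetSize: Long): List[List[Long]] = {
    val (closed, current) =
      sizes.foldLeft((List.empty[List[Long]], List.empty[Long])) {
        case ((done, cur), size) =>
          if (cur.nonEmpty && cur.sum + size > targetSize)
            (cur.reverse :: done, List(size)) // close the current group, start a new one
          else
            (done, size :: cur)               // keep packing into the current group
      }
    // Close the last open group (if any) and restore the original order.
    (if (current.nonEmpty) current.reverse :: closed else closed).reverse
  }
}
```

With five reduce partitions of sizes 70, 2, 3, 1 and 80 (say, in MB) and a target of 64, the three small partitions in the middle end up in one group, so five tasks become three.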

Second is data skew, which is common in data analysis. If not handled properly, it can easily lead to OOM errors.

For example, let’s say we want to analyze the historical behaviors of each Weibo user. Whether it’s the number of posts or interaction frequency, regular users differ by several orders of magnitude from top users (celebrities, influencers, verified accounts, media). In this case, grouping and analyzing by user ID can lead to data skew. Moreover, the tasks executed within the same executor are generally allocated with average available memory. Therefore, with the mismatch between the average memory allocation and the data processing requirements that differ by several orders of magnitude, it’s no surprise that tasks with severe data skewing may face OOM errors.

In the past, when dealing with data skew, we often had to manually “salt” our applications by adding random prefixes to skewed keys, so that their data is spread across different nodes. Before salting manually, developers also had to measure the skew of each key and decide how much salt to add. Now, in join scenarios, if AQE detects skewed data partitions in one table, it automatically splits them, in effect salting them, and duplicates the matching data from the other table. With AQE, this headache is taken care of for us.
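For readers who have not salted by hand before, the sketch below shows the core of the manual technique in plain Scala. The names `Salting`, `saltKey`, `replicate` and the `buckets` parameter are hypothetical; a real job would apply the same two transformations to the two sides of the join.

```scala
import scala.util.Random

object Salting {
  // Skewed side: prepend a random salt in [0, buckets) to the key,
  // so records of one hot key are spread over `buckets` different keys.
  def saltKey(key: String, buckets: Int, rnd: Random): String =
    s"${rnd.nextInt(buckets)}_$key"

  // Other side: replicate each record once per possible salt value,
  // so every salted key on the skewed side still finds its match.
  def replicate[V](key: String, value: V, buckets: Int): Seq[(String, V)] =
    (0 until buckets).map(i => (s"${i}_$key", value))
}
```

The price of manual salting is visible here: the other table grows by a factor of `buckets`, which is why the degree of salting has to be chosen per key.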

Lastly, adjusting join strategies is easy to understand. When two ordered tables need to be joined, Spark SQL in the optimization process often chooses the Sort Merge Join implementation. However, there are cases where one of the tables needs to be filtered before sorting, and the filtered table is small enough to fit into a broadcast variable. In such cases, Broadcast Join is more efficient than Sort Merge Join. However, the optimization process before version 3.0 was static and couldn’t dynamically switch join strategies.

In this case, AQE dynamically adjusts join strategies based on runtime statistics, changing the predetermined Sort Merge Join to Broadcast Join, thereby improving application performance.

After discussing all of this, how can we make the most of these natural advantages? First, to utilize the advantages of Tungsten, simply abandon the RDD API and use the DataFrame or Dataset API for development. It’s that simple, isn’t it?

However, in Spark 3.0 AQE is disabled by default (it has been enabled by default since Spark 3.2). If we want to fully utilize automatic partition merging, automatic data skew handling, and join strategy adjustment, we need to enable the related configuration settings, starting with spark.sql.adaptive.enabled.
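As a rough sketch, the AQE-related settings can be collected in one place and applied together. The configuration keys below are real Spark SQL options; the object name `AqeSettings`, the helper `asSubmitArgs`, and the concrete values are assumptions for illustration, not tuning advice.

```scala
object AqeSettings {
  // Keys are Spark SQL configuration names; values are illustrative only.
  val settings: Map[String, String] = Map(
    "spark.sql.adaptive.enabled" -> "true",                       // master switch for AQE
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",    // automatic partition merging
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "64MB",  // target partition size after merging
    "spark.sql.adaptive.skewJoin.enabled" -> "true",              // automatic skew handling in joins
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor" -> "5",   // skewed if this many times the median size
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "256MB", // and larger than this
    "spark.sql.autoBroadcastJoinThreshold" -> "10MB"              // size limit for Broadcast Join
  )

  // Renders the settings as spark-submit command-line flags.
  def asSubmitArgs: Seq[String] =
    settings.toSeq.sortBy(_._1).map { case (k, v) => s"--conf $k=$v" }

  // With a live SparkSession named `spark` (an assumption), they could be applied as:
  //   AqeSettings.settings.foreach { case (k, v) => spark.conf.set(k, v) }
}
```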

In summary, with Project Tungsten and AQE, we can achieve high output with low input, which is the essence of sitting back and enjoying the benefits. There are other techniques in the same spirit, such as using SQL functions or feature transformation operators to replace UDFs. I hope you will actively explore and consolidate these ready-to-use techniques during development, and if you find any, I look forward to your sharing them in the comments section.

Principle 2: Less is more, Delay when possible #

In many data processing scenarios, in order to meet business requirements quickly, we often combine operations such as data cleaning, filtering, extraction, association, and aggregation to complete development. These combinations differ in performance: some are good and some are bad. So how do we find an implementation that performs better?

At this point, we can use the second principle: “Less is more, delay when possible”. “Less” refers to reducing the amount of data processed, because less data means a lower computational load, and a lower computational load naturally means faster processing. “Delay” refers to the Shuffle operation: in conventional data processing, the later a computation step comes, the less data it usually has to process. The later the Shuffle is performed, the less data has to be written to disk and sent over the network, which lowers disk and network overhead and raises execution efficiency.

To implement this, we can follow these 3 steps:

  • Try to push operations that can reduce the amount of data scanned and processed to the front.
  • Try to eliminate Shuffle and reduce the overhead of writing data to disk and distributing it.
  • If Shuffle cannot be eliminated, try to delay operations involving Shuffle as much as possible.

Next, let’s deepen our understanding of this principle through an example.

The business background of this example is very simple. We want to obtain two co-occurrence matrices: one is the item-user matrix, and the other is the item-user interest matrix. After obtaining these two matrices, we will try to calculate the latent vectors (also called implicit vectors) of the three features: item, user, and user interest, using matrix factorization. These latent vectors will ultimately be used to construct feature vectors for machine learning models.

Based on this business background, the code needs to read user access logs and then construct these two matrices. The access logs are stored in Parquet format files on a daily basis, and each record contains multiple fields such as user ID, item ID, user interest list, access time, user attributes, and item attributes. We need to read the log records, remove duplicates using distinct, then explode the interest list into individual interests, extract the relevant fields, and finally filter the records based on user access frequency and remove duplicates again, to obtain the desired co-occurrence matrix.

After receiving such a business requirement, how would you implement it? After reading it, student A immediately implemented the following code:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, explode}

    // Assumes a SparkSession named spark, as in spark-shell
    val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
    val rootPath: String = ??? // placeholder: the root path is elided in the original

    // Read log files, remove duplicates, and explode userInterestList
    def createDF(rootPath: String, date: String): DataFrame = {
        val path: String = rootPath + date
        spark.read.parquet(path)
            .distinct()
            .withColumn("userInterest", explode(col("userInterestList")))
    }

    // Extract fields, filter, remove duplicates, and merge results from multiple days using union
    val distinctItems: DataFrame = dates.map { date =>
        createDF(rootPath, date)
            .select("userId", "itemId", "userInterest", "accessFreq")
            .filter("accessFreq in ('High', 'Medium')")
            .distinct()
    }.reduce(_ union _)

Let’s analyze this piece of code together. There are four main operations: distinct to remove duplicates, explode to expand the list, select to extract fields, and filter to filter log records. The last three operations are all in-memory calculations within the Stage, and only the distinct operation introduces a Shuffle, so we should focus on it. The distinct operation is called twice: once to remove duplicates after reading the log content, and another time to remove duplicates after obtaining the desired fields.

First, let’s focus on the first distinct operation, which the createDF function calls immediately after reading the log records. Note that the log records contain many fields, and the Shuffle introduced by distinct distributes every record, with all of its fields, across the network. However, what we ultimately need are only the records with a certain level of user stickiness, and only three of their fields: user ID, item ID, and user interest. This distinct operation is therefore distributing a large amount of data that we do not need across the cluster, which is undoubtedly a waste.

Next, let’s look at the second distinct operation, which removes duplicates after the data has been exploded, extracted, and filtered. This deduplication differs from the first one: the records distributed by its Shuffle carry no redundant data, because they contain only the few fields required for the co-occurrence matrix.

At this point, we discovered that both distinct operations are for removing duplicates and have the same purpose, but the second distinct operation is more accurate and less expensive. Therefore, we can remove the first distinct operation.
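Why is it safe to drop the first distinct? Deduplicating full records and then deduplicating the projected fields yields the same result as deduplicating the projected fields alone. The sketch below checks this on plain Scala collections; the `Log` case class is a hypothetical, trimmed-down stand-in for the real log record, and the explode step is left out for brevity.

```scala
object DedupOrder {
  // Hypothetical, trimmed-down log record; the real logs carry many more fields.
  final case class Log(userId: String, itemId: String, interest: String, accessTime: Long)

  // Column pruning: keep only the fields needed for the co-occurrence matrix.
  def project(l: Log): (String, String, String) = (l.userId, l.itemId, l.interest)

  // Version 1: deduplicate full records, project, deduplicate again.
  def dedupTwice(logs: Seq[Log]): Seq[(String, String, String)] =
    logs.distinct.map(project).distinct

  // Version 2: project first, deduplicate once.
  def dedupOnce(logs: Seq[Log]): Seq[(String, String, String)] =
    logs.map(project).distinct
}
```

Both versions return the same rows, but the first one has to compare and move complete records, including fields such as `accessTime` that are thrown away one step later.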

By doing so, we eliminate a Shuffle operation that would distribute the full data set, which naturally improves performance. However, once the first distinct is removed, the next operation, explode, becomes the problem. Although explode does not introduce a Shuffle, it still carries along many fields that we do not need, such as user attributes and item attributes, when expanding the interest list in memory.

Therefore, we should push operations that can save data access, such as filtering and column pruning, as far forward as possible, and push operations with higher computational cost, like Shuffle, as far back as possible to reduce the overall data processing load and cost. Based on these analyses, we have an improved version of the code, as shown below.

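    import org.apache.spark.sql.functions.{col, explode}

    // Assumes a SparkSession named spark, as in spark-shell
    val dates: List[String] = List("2020-01-01", "2020-01-02", "2020-01-03")
    val rootPath: String = ??? // placeholder: the root path is elided in the original

    val filePaths: List[String] = dates.map(rootPath + _)

    /**
     * Read all files in a single pass,
     * filter and prune columns first,
     * then explode userInterestList,
     * and finally deduplicate once.
     */
    val distinctItems = spark.read.parquet(filePaths: _*)
        .filter("accessFreq in ('High', 'Medium')")
        .select("userId", "itemId", "userInterestList")
        .withColumn("userInterest", explode(col("userInterestList")))
        .select("userId", "itemId", "userInterest")
        .distinct()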

In this code, all operations that reduce data access, such as filter and select, are moved to the front, while the distinct operator that introduces the Shuffle is moved to the back. In our comparison experiments, the execution performance of the two versions differed by a factor of two. Therefore, by following the development principle of “less is more, delay when possible,” you can often avoid many potential performance pitfalls.

Principle 3: Break Out of Single-Machine Thinking Mode #

So, if developers follow the above two principles to implement business logic, is everything going to be fine and worry-free? Of course not, let’s look at the following example.

In order to generate training samples, we need to join two large tables. Following the principle of “less is more, delay when possible”, we want to reduce the size of one of the tables and convert the Shuffle Join into a Broadcast Join, thus eliminating the shuffle process.

Although both tables are large, the right table has only one payload column; all of its other columns are Join keys. Therefore, if we could remove the Join keys, the entire right table would fit into a broadcast variable. However, we cannot simply remove the Join keys, because associating the left and right tables is a hard requirement. So, can we find another way to associate them?

Inspired by the working principle of Hash Join, we came up with the idea of concatenating all the Join keys together, and then using a hash algorithm to generate a fixed-length byte sequence as the new Join key. In this way, we can remove the original Join keys in the right table, greatly reduce the size of the right table, and make it small enough to fit into a broadcast variable. At the same time, the new Join key can also ensure that the data association between the left and right tables remains unchanged, killing two birds with one stone.
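The core of this idea can be sketched as a small helper in plain Scala. The names `JoinKeyHash` and `hashKey` and the separator parameter are assumptions for illustration. The digest instance is passed in rather than created inside the function, and a separator is placed between the key parts so that different key combinations such as ("ab", "c") and ("a", "bc") do not collapse into the same string before hashing.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object JoinKeyHash {
  /** Concatenates the Join keys with a separator and returns the digest as upper-case hex.
    * The separator should be a string that cannot occur inside the keys themselves.
    */
  def hashKey(md: MessageDigest, parts: Seq[String], sep: String = "|"): String = {
    val bytes = parts.mkString(sep).getBytes(StandardCharsets.UTF_8)
    // digest() resets the MessageDigest afterwards, so the same instance can be reused.
    md.digest(bytes).map("%02X".format(_)).mkString
  }
}
```

An MD5 digest is 16 bytes, so the new Join key is always 32 hex characters long, no matter how many original keys went into it.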

In order to perform hash calculations on the concatenated Join keys, we need to prepare the hash algorithms in advance and then transform the left and right tables. Upon receiving this requirement, our colleague A immediately called the map operator on the right table, instantiated the Util class inside it to get the hash algorithm, and completed the transformation by hashing the concatenated Join keys. The specific code is as follows.

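    import java.security.MessageDigest
    import org.apache.spark.sql.{DataFrame, Dataset, Row}
    import spark.implicits._ // encoder for (String, Int); assumes a SparkSession named spark

    class Util {
      val md5: MessageDigest = MessageDigest.getInstance("MD5")
      val sha256: MessageDigest = MessageDigest.getInstance("SHA-256") // other hash algorithms
    }

    val df: DataFrame = ??? // placeholder: the right table, elided in the original
    val ds: Dataset[(String, Int)] = df.map{
      case row: Row =>
        // A new Util is instantiated for every single record
        val util = new Util()
        val s: String = row.getString(0) + row.getString(1) + row.getString(2)
        val hashKey: String = util.md5.digest(s.getBytes).map("%02X".format(_)).mkString
        (hashKey, row.getInt(3))
    }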

Upon closer inspection, we found that there is still room for optimization in this code. You see, the calculation performed by the map operator is on a per-data record basis. In other words, every data record in each data partition of the distributed dataset will trigger the calculation logic in the map operator. Therefore, we must be cautious when dealing with the calculations involved in the map operator. Obviously, the map operator should only include calculations related to data transformation, and any calculations unrelated to data transformation should be extracted outside of the map operator.

Looking back at the code above, the operations directly related to data transformation in the map operator are concatenating the Join keys and calculating the hash value. However, instantiating the Util object is only for obtaining the hash function, and it has nothing to do with data transformation. Therefore, we need to move it outside of the map operator.

Is it really necessary to be so meticulous about just one line of code? Yes, it is. This instantiation action will be triggered for each record, and if the entire dataset has billions of records, there will be billions of instantiations! A small computational cost can be amplified countless times under the scale effect, resulting in a significant performance issue that cannot be ignored.

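    // Reuses Util, df, and spark.implicits._ from the previous listing
    val ds: Dataset[(String, Int)] = df.mapPartitions(iterator => {
      // Util is now instantiated once per partition instead of once per record
      val util = new Util()
      val res = iterator.map{
        case row =>
          val s: String = row.getString(0) + row.getString(1) + row.getString(2)
          val hashKey: String = util.md5.digest(s.getBytes).map("%02X".format(_)).mkString
          (hashKey, row.getInt(3))
      }
      res
    })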
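To see the difference in numbers, the following sketch simulates both styles on plain Scala collections, with each inner `Seq` standing in for one data partition. `InstantiationCount`, `Helper`, and the counter `created` are hypothetical names used only for this illustration; no Spark is involved.

```scala
object InstantiationCount {
  // Counts how many times the stand-in helper is constructed.
  var created: Int = 0

  // Stand-in for an expensive helper such as Util.
  class Helper {
    created += 1
    def transform(x: Int): Int = x * 2
  }

  // map-style: a new helper is built for every record.
  def perRecord(partitions: Seq[Seq[Int]]): Seq[Int] =
    partitions.flatMap(_.map { x =>
      val h = new Helper
      h.transform(x)
    })

  // mapPartitions-style: one helper per partition, shared by all its records.
  def perPartition(partitions: Seq[Seq[Int]]): Seq[Int] =
    partitions.flatMap { part =>
      val h = new Helper
      part.map(h.transform)
    }
}
```

For two partitions of three records each, the first style constructs six helpers and the second only two, while both produce the same output. With billions of records and a few thousand partitions, the gap grows accordingly.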

There are many oversights similar to the careless instantiation of Util, such as repeatedly accessing an RDD inside a loop, using temporary variables to cache intermediate results of data transformation, and so on. Diving into procedural programming without thinking, while ignoring or neglecting the programming model of distributed data entities, is what we call the single-machine thinking mode. As we mentioned in the RDD lecture, the single-machine thinking mode unconsciously introduces huge computational overhead in a distributed environment.

But you might say, “Single-machine thinking mode is everywhere, and it’s hard to prevent. How can we break out of it?”

Rome wasn’t built in a day. Since it is a thought pattern, it naturally cannot be formed in just one or two days, and it is not easy to get rid of it. However, when it comes to breaking out of single-machine thinking, I have a little trick to share with you. Of course, this may require some imagination.

Do you remember the potato workshop? Whenever you are ready to develop an application, you can construct a mental picture of a potato workshop and place the distributed datasets you need to define at appropriate positions in the workshop pipeline. When you need to process a certain dataset, you might want to spend some time thinking about the prerequisites for the current potato form. Continuously constructing the potato workshop in your mind can continuously deepen your understanding of the distributed computing process. Given time, I believe that you will be able to break out of the single-machine thinking mode!

Summary #

By following these 3 principles in your daily development work, not only can you give your application a good starting point in terms of performance, but you can also explore and expand more optimization techniques, accumulating optimization experience gradually.

First of all, by following the principle of “sit back and enjoy the benefits”, you can make full use of the performance advantages that Spark itself brings, simply by setting the relevant configuration options or calling the corresponding APIs. For example, by using the DataFrame or Dataset API for development, you benefit from the optimization mechanisms of Catalyst and Tungsten. Moreover, by using file formats like Parquet or ORC, you can take advantage of predicate pushdown to improve data reading efficiency.

Secondly, by adhering to the principle of “less is more, delay when possible”, you push operations that reduce data scanning and processing to the front, and you delay operations involving Shuffle as much as possible, or even eliminate Shuffle altogether. By doing so, you can avoid many potential performance pitfalls.

Lastly, in your daily development work, it is important to guard against single-machine thinking and break free from that mindset. Getting rid of single-machine thinking helps cultivate a performance-oriented development habit. We can use our imagination during application development and construct a potato workshop (an analogy for a distributed computing environment). We can visualize each distributed dataset being placed on the assembly line of the workshop. When trying to obtain the calculation results, combined with the scheduling system, storage system, and memory management explained in the theoretical section, we can further imagine what the entire workshop needs to do and what costs it will bring.

Finally, let’s talk about the significance and value of summarization. The reason why we summarize various development techniques as development principles is not only because by following these principles, you can unknowingly avoid many performance pitfalls. More importantly, by starting from these principles and extrapolating outward, we often discover more development techniques, thereby expanding our boundaries of “conventional operations” and achieving the ability to derive more solutions from previous experience, truly avoiding the dilemma of “exhausted optimization ideas”.

Daily Practice #

  1. Regarding the three principles we discussed today, can you think of any other examples?
  2. Besides these three principles, do you think there are any other principles that developers need to pay special attention to?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lesson!