18 Disk Perspective if Memory Were Unlimited, Would Disk Still Have a Place in Your Arsenal

18 Disk Perspective - If Memory were Unlimited, Would Disk Still Have a Place in Your Arsenal #

Hello, I’m Wu Lei.

We all know that one of Spark’s strengths is in-memory computing. When we hear “in-memory computing,” our first reaction is usually that it offers high execution efficiency! However, when we hear “disk-based computing,” we might assume that the performance won’t be as good. Some people might even think that it would be great if Spark had unlimited memory so we could completely eliminate the need for disks. Of course, this assumption is unlikely to come true, and this kind of black-and-white thinking is incorrect.

If memory were unlimited, we could indeed find ways to eliminate disk I/O during the execution of Spark jobs. However, the introduction of large-scale Full GC pauses (Stop The World) caused by unlimited memory may actually result in worse application execution performance compared to when disk operations are involved. This does not align with our repeated emphasis that the ultimate goal of optimization is to seek a balance between different hardware resources.

So, in today’s lecture, we will talk about the important roles that disks play in the execution process of Spark tasks, their functional roles, and their value in terms of performance. Mastering these aspects can help us use disks more reasonably and balance the computational load of different hardware resources based on cost advantages.

The Functions of Disk in Spark #

In Spark, where are disks used? In the discussion of Shuffle, we mentioned that in the Map stage, Spark uses two different in-memory data structures, PartitionedPairBuffer and PartitionedAppendOnlyMap, to cache data records in partitions based on whether the computation requires aggregation. Distributed computing often involves massive amounts of data, so these data structures usually cannot hold all the data in the partition. In the case of limited memory, the spilling mechanism ensures the smooth execution of tasks without immediately reporting OOM exceptions due to insufficient memory space.

Taking the game “Fairy Sprinkling Flowers” as an example, we use groupByKey to collect flowers of different colors. In the case where the size of PartitionedPairBuffer is 4, when Xiaohong receives more than 4 flowers, in order for the remaining flowers to enter memory, Spark must temporarily spill the contents of PartitionedPairBuffer to a temporary file to free up memory space. This is the first function of disks: spilling to temporary files.

After the last batch of data in a partition is loaded into PartitionedPairBuffer, it is merged with the temporarily spilled data on disk, resulting in Shuffle data files and index files that are also stored on disk. This is what we commonly refer to as Shuffle intermediate files. This is the second function of disks: storing Shuffle intermediate files.

In addition, the third function of disks is to cache distributed datasets. In other words, any storage mode with “DISK” in its name will cache data that cannot fit in memory to disk. These cached data, including temporary files and intermediate files we just mentioned, will be stored in the file system directory corresponding to the spark.local.dir parameter.

Value on Performance #

In the lecture on configuration items, we mentioned that configuring the spark.local.dir parameter to an SSD or another storage system with higher access efficiency can provide better I/O performance. In addition, disk reuse can also bring better performance improvements to execution. Disk reuse refers to the process of repeatedly using intermediate files generated in the Shuffle Write phase for multiple calculations. In the following, I will explain in detail the common applications of disk reuse and its benefits through two examples.

Disk Reuse in Failure Retry #

We often say that in the absence of RDD Cache, once a computation step fails, the entire DAG will be recomputed from the beginning, which is called failure retry. Strictly speaking, this statement is inaccurate. Because the source of computation for failure retry is not the “head” of the entire DAG, but the intermediate files of the most recent Shuffle from the trigger point.

Let’s take the DAG in the diagram as an example. After two transformations, the HDFS source data produces RDD1 and RDD2 respectively. RDD2 undergoes two more computations after the Shuffle, resulting in RDD3 and RDD4.

Unfortunately, some tasks fail during the computation of RDD4. During the failure retry, Spark does backtrack from RDD4, but with the existence of disk reuse mechanism, it does not backtrack all the way to the HDFS source data. Instead, it directly backtracks to the “data source” of RDD3 that has been materialized on the node, which is the intermediate file RDD2 output to disk during the Shuffle Write phase. Therefore, one of the benefits of disk reuse is to shorten the path of failure retry, thus improving execution performance while ensuring job stability.

To help you understand better, let’s imagine that the pipeline computation in the DAG is like irrigation in a canal. The wheat fields on the loess slope cannot get more than a few drops of rainwater throughout the year, relying entirely on artificial canals for irrigation. When the hydropower station opens the gates and releases water, the water will flow eastward along the canal to nourish the thirsty wheat seedlings in the branch canal.

A hydropower station often serves a village within a radius of hundreds of miles. If we wait for the hydropower station to open the gates every time for irrigation, in years of severe drought, the water will not even reach the branch canal before the wheat seedlings dry up. If we can build a reservoir every certain distance along the canal, then nearby villagers can take water close by for irrigation. In this analogy of canal irrigation, the water from the hydropower station is the source of HDFS data, and the reservoir is the Shuffle intermediate file, while taking water nearby and irrigation nearby is the disk reuse mechanism.

Disk Reuse under ReuseExchange Mechanism #

You may say, “Disk reuse is not that significant. It’s just taking a shortcut and avoiding some detours during failure retry. In the absence of task failures, this advantage cannot be utilized.” That’s correct, so let’s talk about another form of disk reuse: the ReuseExchange mechanism. ReuseExchange is one of the many optimization strategies in Spark SQL. It refers to sharing the intermediate results of Shuffle computations, i.e., the Shuffle intermediate files, among the same or similar physical plans. The ReuseExchange mechanism can help us reduce I/O overhead, even save Shuffle, and greatly improve execution performance.

So, how can we effectively utilize the ReuseExchange mechanism? In data warehousing scenarios, to obtain data reports or visualization charts, users often need to execute multiple similar queries, or even execute the same query multiple times. In this case, the ReuseExchange strategy can bring great benefits in terms of execution efficiency.

Even without DataFrame Cache, the same or similar queries can utilize the ReuseExchange strategy to shorten the execution path and eliminate the additional Shuffle computation. From the perspective of data reuse, the role of the ReuseExchange mechanism is equivalent to the DISK_ONLY mode of DataFrame Cache.

Let’s take an example. Now we have a business requirement: given user access logs, we need to separately calculate the PV (Page Views) and UV (Unique Views) of different users, and then merge the two statistical results for later use. The user logs contain main fields such as the user ID, access time, and page URL. The business requirement is clear and simple, and we can quickly write the code.

// Version 1: Calculate PV and UV separately, then merge
// Data schema (userId: String, accessTime: Timestamp, page: String)

val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)

val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))

val resultDF: DataFrame = dfPV.Union(dfUV)

// Result sample
| userId | metrics | value |
| user0  | PV      | 25    |
| user0  | UV      | 12    |

The code logic first reads the user log and then calls count and countDistinct on the same DataFrame to calculate PV and UV respectively. Finally, the two DataFrames corresponding to PU and UV are merged together.

Although the code implementation is simple and straightforward, if we use explain on resultDF or check the physical plan through Spark UI, we will find that even though count and countDistinct are calculated based on the same data source, the execution paths of these two operations are completely independent. They scan the Parquet source file separately and perform calculations through Shuffle. Before Shuffle, local aggregation is performed in the Map side, and after Shuffle, global aggregation is performed in the Reduce side.

For most merge scenarios, the computing process is roughly like this. Obviously, this approach is extremely inefficient, especially when merging multiple datasets, the repeated data scanning and distribution will introduce more performance overhead. So, is there any way to make multiple operators on the same data source read the Parquet file only once and perform Shuffle only once?

After all the preparations, the answer is “yes”. For the code in version 1, we can make some adjustments to fully utilize the ReuseExchange strategy for optimization.

// Version 2: Calculate PV and UV separately, then merge
// Data schema (userId: String, accessTime: Timestamp, page: String)
 
val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath).repartition($"userId")
 
val dfPV: DataFrame = df.groupBy("userId").agg(count("page").alias("value")).withColumn("metrics", lit("PV"))
val dfUV: DataFrame = df.groupBy("userId").agg(countDistinct("page").alias("value")).withColumn("metrics ", lit("UV"))
 
val resultDF: DataFrame = dfPV.Union(dfUV)
 
// Result example
| userId | metrics | value |
| user0  | PV      | 25 |
| user0  | UV      | 12 |

The only part that needs to be adjusted is the data source reading, and the code for other parts remains the same. After reading the user logs using the Parquet API, we add a step of repartition, which calls the repartition operator with userId as the partition key.

After this small change, if we call explain on resultDF or check the Spark UI, we will find that in the new physical plan, the branch for count or countDistinct shows “ReuseExchange”, which means that one side reuses the exchange result of the other side.

By observing the execution plan, it is not difficult to see that the benefits brought by ReuseExchange are considerable. Not only does the data source need to scan only once, but also the Shuffle, which acts as the “performance bottleneck”, only happens once.

Additionally, you may have noticed that the two similar queries that reuse the Shuffle intermediate result are not exactly the same. One is to perform statistical counting using count, and the other is to perform deduplication counting using countDistinct. You see, with the data reuse using ReuseExchange, two similar queries achieve the same effect as using DISK_ONLY cache. In other words, you don’t need to manually call persist(DISK_ONLY) or endure the calculation process of disk caching to enjoy the benefits. Isn’t it amazing?

You may wonder, “Since the ReuseExchange mechanism is so useful, what conditions need to be met for Spark SQL to choose this execution strategy?” In fact, there are at least two conditions for triggering this:

  • The partition rules for multiple queries that depend on data must be consistent with the partition rules for the Shuffle intermediate data.
  • The fields (Attributes) involved in multiple queries must be consistent.

Regarding the first condition, we have already demonstrated it in the example. Both queries are grouped by userId, which requires the data they depend on to be partitioned by userId. This is why we add the repartition operator with userId as the partition key in version 2 of the code. Only in this way can the partition rules of the Shuffle intermediate result be consistent with the partition rules required by the queries.

By carefully observing the fields involved in the count and countDistinct queries, we will find that they are exactly the same. In fact, if we change count("page") in the count statement to count("*"), it does not affect the calculation of PV. However, this seemingly irrelevant change will cause the second condition to fail, and thus the ReuseExchange mechanism cannot be used to improve the execution performance. After changing count("page") to count("*") in version 2 of the code, the physical plan will revert to version 1. I leave the changes for you to compare as homework.

Conclusion #

Although disks are far less efficient than memory in terms of processing latency, they are still indispensable in performance tuning. Understanding the functional and performance value of disks can help us make better use of disks and balance the computational load of different hardware resources in terms of cost advantage.

In terms of functionality, disks play three main roles in Spark: spilling temporary files, caching distributed datasets, and storing shuffle intermediate files. These three functions not only improve job stability but also lay the foundation for improving execution efficiency.

In terms of performance, utilizing disk reuse mechanisms can greatly improve the execution performance of applications. Disk reuse refers to the process of repeatedly using intermediate files generated during the Shuffle Write stage for multiple calculations. Disk reuse has two purposes: failure retry and ReuseExchange mechanism. Failure retry refers to the process of attempting to recompute from the beginning after a task failure. In this process, disk reuse shortens the path of failure retry, ensuring job stability and improving execution performance.

The ReuseExchange strategy refers to the sharing of intermediate results of Shuffle computation for identical or similar physical plans. ReuseExchange contributes significantly to execution performance, allowing multiple operators based on the same data source to read Parquet files only once and perform only one Shuffle operation, thus greatly reducing disk and network overhead.

However, in order for Spark SQL to choose ReuseExchange during the optimization phase, the business application must meet two conditions:

  • The partition rules on which multiple queries depend must be consistent with the partition rules of the Shuffle intermediate data.
  • The fields involved in multiple queries must be consistent.

Daily Practice #

  1. Please change count("page") to count("*") in the calculation of count to observe the changes in the physical plan. Please leave a comment in the message area about your observations.
  2. In order for the ReuseExchange mechanism to take effect, we partition the dataset according to userId. Given this, you may consider in which situations the ReuseExchange mechanism is not suitable? Why?

Looking forward to seeing your thoughts and answers in the message area. See you in the next lecture!