11 Why Shuffle Is an Unparalleled Performance Killer

11 Why Shuffle is an Unparalleled Performance Killer #

Hello, I am Wu Lei.

When you hear the word “Shuffle,” what comes to mind? I believe many people’s first reaction would be the most stubborn and challenging performance bottleneck in applications.

In previous lessons, we have mentioned Shuffle on more than one occasion, especially during the discussion on development principles. I even advised you to follow the principle of “minimize if possible, postpone if necessary” to avoid Shuffle as much as possible in your applications. If it cannot be avoided due to business logic constraints, try to postpone Shuffle as much as possible.

So why do we cringe and avoid talking about Shuffle? In today’s lesson, I will dive deep into how Shuffle works through the process of implementing a game called “The Fairy’s Flower Shower.” We will discuss why Shuffle is considered the ultimate performance killer in distributed applications. After all, only by understanding the working principles of Shuffle can we better avoid it.

Understanding Shuffle #

Assume that your boss assigned you a development task today and asked you to use Spark to implement a game requirement. This implementation requirement comes from a little story called “Fairy Scattering Flowers”.

Long ago, at the foot of Mount Yan, there was a small village with a “Rainbow” Elementary School. Children within a hundred-mile radius came to this school. One day, Teacher Huang from Grade 2, Class 2, and the five children in the class were playing a game called “Fairy Scattering Flowers”.

Teacher Huang’s backpack was filled with five different colored flowers - red, orange, yellow, purple, and blue. She randomly distributed the flowers to the five little students: Little Red, Orange, Little Yi, Zi, and Little Blue. After distributing the flowers, each student received a similar number of flowers, but with different colors.

Then, Teacher Huang announced the game rules: “You need to collaborate and collect the flowers together in the shortest possible time, grouping them by color. The game is considered finished when there are five piles of flowers on the desk, each pile having flowers of the same color.”

Everyone was eager to try. Little Yi said, “Hold on, let’s come up with a strategy. First, we’ll set up five desks in front, and then each person needs to do two things - first, sort their flowers by color into five groups, and then place the sorted flowers on the corresponding desk!” So, following Little Yi’s strategy, the kids quickly completed the game.

In fact, the game process of “Fairy Scattering Flowers” is similar to the workflow of Shuffle. Of course, the steps and stages involved in the Shuffle process are more complex than the game.

The original meaning of Shuffle is “to shuffle cards”. In a distributed computing environment, it has two stages. Generally speaking, the first stage is called the “Map stage”, and the second stage is called the “Reduce stage”. Of course, some people also refer to them as the Shuffle Write stage and the Shuffle Read stage.

In the game of “Fairy Scattering Flowers”, from the teacher distributing the flowers to the five students categorizing them by color, corresponds to the Map stage of Shuffle. The process of the students distributing the categorized flowers to the respective desks is similar to the Reduce stage of Shuffle.

Next, let’s delve into the two stages of Shuffle - the Map stage and the Reduce stage - using this story as a guide.

Since version 2.0, Spark has unified the management of the Shuffle operation under the Sort shuffle manager. Therefore, in today’s lesson, we will focus on the Shuffle distribution process implemented by the Sort shuffle manager.

How does the Map phase output intermediate files? #

Learning with a focus on the end and the results-oriented approach is often more efficient. Before delving into how the Map phase produces data, let’s first clarify what the output of the Map phase actually is.

As mentioned before, the data produced by the Map phase is ultimately materialized into intermediate files stored on disk. These intermediate files are stored in the directory specified by spark.local.dir. There are two types of intermediate files: data files with a .data suffix that store the data produced by the Map phase, and index files with an .index suffix that record the offset addresses of different partitions within the data files. Here, the partitions refer to the partitions in the Reduce phase, so the number of partitions corresponds to the parallelism in the Reduce phase.

With this in mind, we can further focus our analysis on how Spark produces these intermediate files in the Map phase. However, it is important to note that the execution flow of each Task in the Map phase is the same, and each Task will ultimately generate a data file and an index file. Therefore, the number of intermediate files is equal to the parallelism of the Map phase. In other words, for each Task, the Map phase will produce a corresponding number of data files and index files.

Next, let’s use Spark to implement the game of “Fairy Scattering Flowers” to analyze how the intermediate files are generated in the Map phase while playing the game.

Implementing “Fairy Scattering Flowers” with groupByKey #

In the game of “Fairy Scattering Flowers,” Mr. Huang asks everyone to collect flowers of the same color together. In the distributed development framework of Spark, the calculation process most similar to this game is undoubtedly the groupByKey transformation. Therefore, we can use groupByKey to implement the game.

First, let’s take a look at the flowers.txt file:

Yellow flower
Purple flower
Red flower
Orange flower
Green flower
Yellow flower
Purple flower
Orange flower
Green flower
......

Next, let’s see how Mr. A, one of the students, implements the “Fairy Scattering Flowers” game using groupByKey:

val flowers = spark.sparkContext.textFile("flowers.txt")
// Mr. Huang distributes flowers to 5 students
val flowersForKids = flowers.coalesce(5)
val flowersKV = flowersForKids.map((_, 1))
// Mr. B's two steps: first, everyone categorizes the flowers by color, then puts the categorized flowers on the respective desks.
flowersKV.groupByKey.collect

We can see that the code steps correspond to the game process. However, after reading the flower file, since groupByKey is a pair RDD transformation that requires consuming data in the (Key, Value) format, we need to transform the original flower data. Taking data partition 0 as an example, the data transformation process is illustrated in the following diagram. You can think of data partition 0 as the flowers distributed by Mr. Huang to Little Red.

Based on the key of the pair RDD, which is the flower color, the Map Task can calculate the target partition for each data record in the Reduce phase, which corresponds to the desks in the game. In Mr. B’s strategy, which flower goes on which desk is discussed and agreed upon in advance, but in Spark, the target partition to which each data record should be distributed is determined by the hash value of the key.

Once the target partitions are calculated, the Map Task will store each data record and its target partition in a special data structure called a “PartitionedPairBuffer,” which is essentially an array-based cache structure. How does it store the data records?

Each data record will occupy two adjacent elements in the array. The first element is (target partition, Key), and the second element is the Value. Let’s say the size of the PartitionedPairBuffer is 4, meaning it can store a maximum of 4 data records. So, if we take data partition 0 as an example, the storage status of Little Red’s first 4 flowers in the PartitionedPairBuffer would look like this.

For us, the ideal situation is that the PartitionedPairBuffer is large enough to hold all the data that the Map Task needs to process. However, in reality, each task is allocated a limited amount of memory, so the PartitionedPairBuffer cannot guarantee that it can accommodate all the data in the partition. Therefore, Spark needs a mechanism to ensure that the calculation can still be completed when the total amount of data exceeds the available memory. This mechanism is: sorting, spilling, and merging.

Take the PartitionedPairBuffer of size 4 as an example. There are 16 flowers in data partition 0, corresponding to 16 data records. At least 4 batches of data need to be processed in order. Before processing the next batch of data, the Map Task needs to move the existing data in the PartitionedPairBuffer. The way to move is simple: the Map Task directly spills the data to temporary files on the disk.

However, before spilling, for the existing data in the PartitionedPairBuffer, the Map Task first sorts them according to the first element of the data record, which is the target partition + key. In other words, although the data temporarily spills to the disk, the data in the temporary file is also sorted.

In this way, the PartitionedPairBuffer moves again and again, and the flowers in data partition 0 are processed in batches until all the flowers are processed. There are 16 flowers in partition 0, and the size of the PartitionedPairBuffer is 4. Therefore, the PartitionedPairBuffer is moved a total of 3 times, generating 3 temporary files, each containing 4 data records. Out of the 16 data records, 12 are scattered in the 3 files, and 4 are cached in the PartitionedPairBuffer.

Up to this point, we are only one step away from producing the intermediate files used to distribute data in the network during the Map phase. Remember? There are two types of intermediate files produced in the Map phase, one is data files and the other is index files. The data records scattered in the 3 temporary files and the PartitionedPairBuffer are the input sources for generating these two types of files. Finally, the Map Task uses the merge-sort algorithm to write the data from the 4 input sources into the data files and index files, as shown in the figure below.

diagram

Okay, up to now, we have used groupByKey to implement the game “Fairy Scatters Flowers” and explained in detail the process of producing intermediate files in the Map phase. Although there are many computing steps in the Map phase, the most important steps can be summarized as 4 steps:

  1. Calculate the target partition for each data record in the shard one by one and fill it into the PartitionedPairBuffer.
  2. After the PartitionedPairBuffer is filled, if there are still unprocessed data records in the shard, sort the data records in the Buffer by (target partition ID, Key), spill all the data to temporary files, and empty the cache at the same time.
  3. Repeat steps 1 and 2 until all data records in the shard are processed.
  4. Merge-sort all temporary files and the PartitionedPairBuffer, and finally generate the data files and index files.

It is not difficult to see that “Fairy Scatters Flowers” is actually a game of grouping and collecting. It should be said that it is relatively simple to use Spark to implement group and collect games. So, how can we transform “Fairy Scatters Flowers” into a game of “grouping and counting”?

Upgraded “Fairy Scatters Flowers” Game #

After the 5 children finished the game, it was still early before the end of class. Therefore, Teacher Huang adjusted the game rules: “You five still work together, this time, you need to count the number of flowers of different colors in the shortest time.”

Xiao Hong couldn’t wait and said, “It’s simple! Just follow the previous strategy, first sort the flowers into piles, and then each of us counts the flowers on the desks!”

Teacher Huang frowned and said, “Don’t rush, the new game rules also have a time limit. I have come up with a method similar to yours, divided into three steps: First, each person calculates the number of flowers of different colors they have; second, we just need to write the number of flowers on the corresponding desks; third, we separately sum up the numbers on the five desks. This way, we can finish faster.”

Implementing the Upgraded Fairy Scattering Flowers with reduceByKey #

If we want to implement the upgraded game using Spark, it’s actually quite simple. We just need to replace groupByKey with reduceByKey.

val flowers = spark.sparkContext.textFile("flowers.txt")
// Distribute flowers to 5 children
val flowersForKids = flowers.coalesce(5)
val flowersKV = flowersForKids.map((_, 1))
// Two steps for Huang Xiaoyi: count by color for each child, then count by desk
flowersKV.reduceByKey(_ + _).collect

Next, let’s analyze the differences between reduceByKey and groupByKey in the Map phase. In terms of Map-side computations, reduceByKey is similar to groupByKey. Both start by populating an in-memory data structure, followed by sorting and spilling to disk, and finally merging and sorting.

The difference lies in the data structure used by reduceByKey during the computation. It uses a data structure called PartitionedAppendOnlyMap, which is essentially a map with values that can be accumulated and updated. Therefore, PartitionedAppendOnlyMap is very suitable for aggregation calculations such as counting, summing, averaging, and finding extreme values.

In the above diagram, the values of the four key-value pairs are the state before scanning the green flowers in data partition 0. In PartitionedAppendOnlyMap, since the values can be accumulated and updated, this data structure can accommodate more flowers than just the four. As a result, compared to PartitionedPairBuffer, PartitionedAppendOnlyMap has higher storage efficiency and lower frequency of spilling data to disk.

By relying on efficient in-memory data structures, fewer disk files, and smaller file sizes, we can greatly reduce the disk and network overhead in the Shuffle process.

In fact, compared to groupByKey, collect_list, and other collection-based operators, aggregation-based operators (reduceByKey, aggregateByKey, etc.) have a performance advantage. Therefore, we should avoid introducing collection-based operators in aggregation computations. While this approach may not hinder the implementation of business logic, it is considered a major taboo in performance optimization.

How is the data distribution done in the Reduce phase? #

Finally, let’s talk about the Reduce phase. In the game of “Fairy Scattering Flowers,” after everyone has categorized their flowers, they actively place the flowers of different colors on the corresponding desks. This process is actually the data distribution in the Shuffle process. However, unlike the desks passively receiving flowers, Shuffle actively pulls data from the intermediate files on the Map side during the Reduce phase.

As mentioned earlier, each Map Task generates intermediate files as shown in the figure above, and the number of partitions in the file is the same as the parallelism of the Reduce phase. In other words, each data file generated by a Map Task contains the portion of data required by all Reduce Tasks. Therefore, for any Reduce Task to complete its calculation, it must first pull its own portion of data from the intermediate files of all Map Tasks. The index file is used to determine which part of the data belongs to which Reduce Task.

The process of Reduce Tasks pulling intermediate files over the network is actually the data distribution process between different stages. In the game of “Fairy Scattering Flowers,” there are 25 back and forth trips between 5 children and 5 desks. If you let 100 children collect 100 different colored flowers and place them on 100 desks, there will be 10,000 round trips between the 100 children and 100 desks! Obviously, the network overhead of data distribution in Shuffle will explode exponentially as the number of Map Tasks and Reduce Tasks increases linearly.

The Reduce Task fills the received data blocks into the read buffer, and then continuously consumes and processes the data records in the buffer according to the task’s calculation logic, as shown in the figure below.

As we can see, the steps marked with circles 1, 2, 3, 4 in the Reduce phase are exactly the same as the four steps in the Map phase. Yes, cause is effect, and effect is cause. When we say that a certain stage is the Map phase or the Reduce phase, our starting point or anchor is the Shuffle. For Shuffle 0 in the above figure, Stage 0 is the Map phase, and Stage 1 is the Reduce phase. However, for later Shuffle 1, Stage 1 becomes the Map phase. Therefore, when we broaden our perspective, we can see that Map and Reduce, which seem to be in opposition, actually have many commonalities.

Performance Killer #

After the analysis in the previous two stages, you should now have a more intuitive understanding of why Shuffle becomes a performance bottleneck. Here, let me summarize it for you.

First of all, for Shuffle, it consumes all hardware resources :

  • Whether it’s memory data structures like PartitionedPairBuffer, PartitionedAppendOnlyMap, or read/write buffers, they all consume valuable memory resources.

  • Due to limited memory space, overflowing temporary files introduce a large amount of disk I/O, and intermediate files outputted by the Map phase also consume disk space.

  • The exponential growth of cross-node data distribution brings a significant network overhead that should not be underestimated.

Secondly, it is difficult to balance the consumption of different hardware resources by Shuffle. Disk and network consumption are essential steps in Shuffle. However, compared to CPU and memory, the processing latency of disks and networks differs by several orders of magnitude. Taking the table below as an example, if we calibrate the units from nanoseconds to seconds, using the processing latency of CPU L1 cache as a reference, we will be surprised to find that when CPUs and memory process data in seconds, the processing latency of disks and networks is in the range of days to months!

It is precisely because of these characteristics of Shuffle that we become “terrified” when we talk about Shuffle, and we try our best to save and delay wherever possible.

Summary #

In this lecture, with the requirement of implementing the game “Fairy Scattering Flowers,” I have introduced the calculation process of Shuffle in an intuitive way. Shuffle consists of two computation phases, the Map phase and the Reduce phase. We should focus on understanding the computation flow of the Map phase, which can be summarized into 4 steps:

  1. For each data record in the shard, calculate its target partition one by one and fill the memory data structure (PartitionedPairBuffer or PartitionedAppendOnlyMap).
  2. When the data structure is full and there are still unprocessed data records in the shard, sort the data records in the structure based on (target partition ID, Key), spill all the data to temporary files, and clear the data structure.
  3. Repeat the previous 2 steps until all data records in the shard are processed.
  4. Merge-sort all temporary files and remaining data records in memory data structures to generate data and index files.

In the Reduce phase, we should pay attention to the process of Reduce Task pulling intermediate files from the network, which is actually the data distribution process between different stages. Moreover, the network cost of data distribution in Shuffle will increase exponentially with the linear growth of Map Tasks and Reduce Tasks.

Finally, from the perspective of hardware resources, Shuffle is very demanding for each type of hardware resource, especially memory, disk, and network. Due to the significant differences in processing latency between different hardware resources, it is difficult for us to balance the computation cost between CPU, memory, disk, and network during the Shuffle process. Therefore, we should avoid Shuffle as much as possible and minimize its impact when necessary.

Daily Practice #

  1. Taking the flowers that Little Red received (data partition 0) as an example, can you deduce each step of the Map stage in reduceByKey? (Hint: How many times does PartitionedAppendOnlyMap spill to temporary disk files? What is the content of each temporary file? What is the content of the final intermediate file? Is it the same as the intermediate file generated by groupByKey?)

  2. Both the Map stage and the Reduce stage involve data caching and storage. Combining the Spark configuration options introduced in the previous lecture, can you match the relevant configuration options to each stage?

Looking forward to seeing your thoughts and discussions in the comments section. See you in the next lecture!