06 Shuffle Management Why Shuffle Is a Performance Bottleneck

06 Shuffle Management - Why Shuffle is a Performance Bottleneck #

Hello, I am Wu Lei.

In the previous lecture, we visited the headquarters of Spark International Construction Group and met the three giants of the Spark scheduling system: DAGScheduler, TaskScheduler, and SchedulerBackend. I believe you have already felt that the scheduling system has numerous components and operates with precision and complexity.

The first step in task scheduling is for the DAGScheduler to divide the computation graph (DAG) into multiple execution stages using Shuffle as the boundary. Obviously, Shuffle is the key to this process. So, we can’t help but ask: “What is Shuffle? Why does task execution require Shuffle? What is the process of Shuffle?”

In today’s lecture, we will “visit” the branch office of Spark International Construction Group and use the “task of moving bricks at a construction site” to understand Shuffle and its working principles. Since Shuffle computation consumes almost all types of hardware resources, such as CPU, memory, disk, and network, Shuffle is often the bottleneck of job execution performance in the majority of Spark jobs. Therefore, we must understand the working principles of Shuffle in order to lay a solid foundation for optimizing the Shuffle process.

What is Shuffle #

Let’s not rush to give Shuffle a formal definition. In order to help you quickly understand the meaning of Shuffle and achieve maximum efficiency with minimum effort, we can first visit the branch offices of the Spock Group to see what “brick laying on construction sites” is all about.

The various branch offices of the Spock Group are stationed at different construction sites, and the personnel and infrastructure of each branch office are more or less the same: in terms of personnel, each branch office has several construction workers and a foreman responsible for managing these workers. In terms of infrastructure, each branch office has a temporary warehouse for convenient storage and retrieval of building materials. These warehouses are equipped with various types of construction materials, such as concrete bricks, regular bricks, turf bricks, etc.

Since our purpose of visiting and investigating the Spock Construction Group is to learn Spark, we need to correspond the personnel and objects of the branch offices with the relevant concepts of Spark, so that you can quickly understand the various components and core principles of Spark.

The correspondence between the personnel and objects of the branch offices and the relevant concepts of Spark is as follows:

Image

Based on the corresponding relationship of different concepts in the figure, let’s take a look at the “brick-laying on construction sites” task. The three branch offices of the Spock Construction Group have each received three different construction tasks. The first branch office’s construction project is a skyscraper, the second branch office is required to build a “cute pet park” on the construction site, and the third branch office has received the task of creating an outdoor park. For the sake of narrative convenience, we will refer to the three branch offices as Branch Office 1, Branch Office 2, and Branch Office 3, respectively.

Obviously, different construction projects have different requirements for the selection of building materials. The construction of a skyscraper requires concrete bricks with higher rigidity, while the construction of an outdoor park requires turf bricks with good water permeability, and regular bricks are sufficient for the cute pet park.

However, different types of bricks are scattered in the temporary warehouses of the three branch offices. In order to achieve efficient utilization of resources, the construction workers of each branch office need to transport the required bricks from the other two branch offices. We call this process “brick-laying tasks”.

Image

With the intuitive comparison of “brick-laying on construction sites”, we can now give Shuffle a formal definition.

The original meaning of Shuffle is “shuffling” in poker. In the context of distributed computing, it is extended as cross-node and cross-process data distribution within a cluster. In the task of brick-laying on construction sites, if we regard different types of bricks as distributed datasets, then the process of transporting different types of bricks between the branch offices is similar to Shuffle in distributed computing.

To complete the task of brick-laying on construction sites, each worker needs to travel long distances to the other two branch offices and then transport the required bricks back from their temporary warehouses. The branch offices are far apart, and it is unrealistic for the workers to transport bricks one by one. Therefore, in order to improve the efficiency of brick-laying, each worker also needs to use cargo trucks to help. It is not difficult to see that the task of brick-laying on construction sites consumes a lot of manpower and resources, and can be described as labor-intensive.

The process of Shuffle is also similar, distributing distributed datasets within a cluster introduces a large amount of disk I/O and network I/O. In the computation chain of a DAG, the performance of the Shuffle stage is the worst. You may ask, “Since Shuffle has such poor performance, why do we have to introduce Shuffle in the computation process? Can’t we avoid the Shuffle stage?”

In fact, the need for Shuffle in the computation process is often determined by the computation logic, or in other words, the business logic.

For example, in the case of the brick-laying task, different construction projects require different building materials in order to meet different construction requirements. Similarly, in the Word Count example, our “business logic” is to count the occurrences of each word. In order to do the “summing” of words, we need to bring the “Spark” that is originally scattered among different Executors to a specific Executor before we can perform the counting operation.

Based on past work experience, we find that in the majority of business scenarios, Shuffle operations are necessary and unavoidable. Since we can’t avoid Shuffle, let’s explore what Shuffle is and how it works in the computation process.

Shuffle Task Working Principle #

To help you understand, let’s use the Word Count example. In this example, the reduceByKey operator is used to introduce the Shuffle operation. The code is as follows (for complete code, please review [Lesson 1]):

// Group and count by word
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y) 

Let’s first review the calculation process of this step, and then analyze the Shuffle operation involved:

Image

As shown in the above diagram, the computation of reduceByKey is divided into two stages by the Shuffle boundary. Conventionally, we call the stage before Shuffle the Map stage, and the stage after Shuffle the Reduce stage. In the Map stage, each executor performs initial aggregation (also known as map-side aggregation or local aggregation) on the data partitions it is responsible for. In the Shuffle phase, different words are distributed to different executors on different nodes. Finally, in the Reduce stage, executors perform a second aggregation (also known as global aggregation) based on the word key, completing the task of statistical counting.

It can be seen that the computation process of the Map stage and Reduce stage is relatively clear. Both of them use the reduce operation to perform local and global aggregation, respectively. In the computation process of reduceByKey, the Shuffle is the key.

By carefully observing the diagram above, it can be said that Shuffle is not so much a cross-node and cross-process data distribution, as it is a data exchange between the Map stage and the Reduce stage. So, how is data exchange achieved between the two execution stages?

Shuffle Intermediate Files #

In summary, the Map stage and Reduce stage use the production and consumption of Shuffle intermediate files to achieve data exchange across the cluster. In other words, the Map stage produces Shuffle intermediate files, and the Reduce stage consumes Shuffle intermediate files. Both stages use intermediate files as a medium to complete data exchange.

So, what are the Shuffle intermediate files, how are they generated, and how are they consumed?

I summarized the generation and consumption process in the diagram below:

Image

In the previous lesson where the scheduling system was introduced, we mentioned that the DAGScheduler creates a task set for each stage, and each task set contains multiple distributed tasks (tasks). In the Map execution stage, each task (hereinafter referred to as a Map task) generates Shuffle intermediate files containing data and index files as shown in the diagram above. In other words, the generation of Shuffle files is based on Map tasks. The number of Shuffle intermediate files produced is equal to the number of Map tasks.

Furthermore, Shuffle intermediate files are a general term and refer to two types of physical files: data files that record key-value pairs and index files that record the Reduce tasks to which the key-value pairs belong. In other words, the index files indicate which records in the data files should be consumed by which tasks in the downstream Reduce stage. In the diagram above, for illustration purposes, we assign the words starting with the letters S, i, and c to the 3 downstream Reduce tasks. It is clear that the data exchange rule here is the first letter of the word.

In Spark, the actual data exchange rules in the Shuffle phase are much more complex than this. The data exchange rules are also called partition rules, as they define how distributed datasets are partitioned in the Reduce stage. Assuming there are N Reduce tasks in the Reduce stage, and each task corresponds to an individual data partition, the formula below determines to which Reduce task each record should be sent in the Map stage:

P = Hash(Record Key) % N

For any data record, Spark first calculates the hash value of the primary key of the record using a pre-defined hash algorithm, and then takes the modulo of the hash value by N to obtain the partition number P of the record in the Reduce phase. In other words, during the Shuffle process, this record should be distributed to partition P in the Reduce phase.

Now that we are familiar with the partitioning rules and intermediate files, let’s talk about how the intermediate files are generated.

Shuffle Write #

As we just mentioned, the intermediate files in Shuffle are generated based on Map Tasks. Let’s use the Map Tasks and corresponding data partitions in the diagram below as an example to explain the generation process of the intermediate files. The data content of the partitions is shown in the green boxes in the diagram:

image

During the generation of the intermediate files, Spark utilizes a Map-like data structure to calculate, cache, and sort the data records in the data partitions. The key of this Map structure is (Reduce Task Partition ID, Record Key), and the value is the data value of the original record, as shown in the “In-memory data structure” in the diagram.

For each data record in the data partition, Spark calculates the target partition ID using the formula mentioned earlier, and then inserts the primary key (Reduce Task Partition ID, Record Key) and the data value of the record into the Map data structure. When the Map structure is full, Spark sorts the data records in the Map structure based on the primary key, and spills all the contents to temporary files on the disk, as shown in step 1 in the diagram.

After the Map structure is emptied, Spark continues to read the partition content and insert data into the Map structure until the Map structure is full again and spills again, as shown in step 2 in the diagram. This process repeats until all the data records in the data partition are processed.

At this point, there are several spilling temporary files on the disk, and the Map structure in memory still holds some data. Spark uses the merge sort algorithm to merge all the temporary files and the remaining data in the Map structure, and generates data files and corresponding index files, as shown in step 4 in the diagram. This process of generating intermediate files in the Shuffle phase is called Shuffle Write.

To summarize, the process of generating intermediate files in Shuffle can be divided into the following steps:

  1. For each data record in the data partition, calculate its target partition one by one, and then fill the in-memory data structure;
  2. When the data structure is full, if there are still unprocessed data records in the partition, sort the data records in the structure based on (target partition ID, Key), spill all the data to temporary files, and empty the data structure;
  3. Repeat steps 1 and 2 until all the data records in the partition are processed;
  4. Merge sort all the temporary files and the remaining data records in the in-memory data structure, and generate data files and index files.

So far, we have become familiar with the process of Spark generating intermediate files in the Map phase of Shuffle. Now let’s talk about how different Reduce Tasks locate their own data from these intermediate files during the Reduce phase in order to complete data fetching.

Shuffle Read #

First of all, we need to note that for each intermediate file generated by a Map Task, the number of target partitions is determined by the number of Reduce Tasks (also known as parallelism) in the Reduce phase. In the diagram below, the parallelism in the Reduce phase is 3, so the intermediate files of Map Tasks will contain data from 3 target partitions, and the index file is used to mark the start index of the data records belonging to each target partition.

image

For all the intermediate files generated by Map Tasks, Reduce Tasks need to download and fetch their own data content from the hard disks of different nodes over the network. The different Reduce Tasks determine which data content belongs to them based on the start indexes in the index files. The process of fetching data by Reduce Tasks in the Reduce phase is often referred to as Shuffle Read.

So far, we have answered several questions mentioned at the beginning of this article: “What is Shuffle? Why do we need Shuffle? How does Shuffle work?”. Shuffle is a crucial link that connects different execution phases, and the performance of Shuffle is often the key to the end-to-end execution efficiency of Spark jobs. Therefore, mastering Shuffle is a necessary step for us to get started with Spark. I hope today’s explanation can help you better understand Shuffle.

Key Takeaways #

Today’s content is quite extensive, let’s summarize it together.

First, we have given a clear definition of Shuffle. In the context of distributed computing, Shuffle refers to the cross-node and cross-process data distribution within a cluster.

We mentioned at the beginning that Shuffle computations consume all types of hardware resources. Specifically, the hash and sort operations in Shuffle consume a large amount of CPU resources. The process of generating intermediate files in Shuffle Write consumes precious memory resources and disk I/O. Finally, the data pulling in the Shuffle Read stage introduces a large amount of network I/O. It is not difficult to see that Shuffle is a resource-intensive computation, so it is essential for developers to understand Shuffle.

Next, we introduced the intermediate files in Shuffle. The intermediate files are a collective term, which includes two types of files: the data file that records (Key, Value) key-value pairs, and the index file that records the Reduce Task to which the key-value pairs belong. The Map and Reduce stages in the computation DAG complete the data exchange through intermediate files.

Then, we explained in detail the process of generating intermediate files in the Shuffle Write phase. In summary, this process consists of 4 steps:

  1. For the data records in each partition, calculate their target partition one by one, and then fill the in-memory data structure.
  2. When the data structure is full, if there are still unprocessed data records in the partition, sort the data records in the structure according to (target partition ID, Key), spill all the data to temporary files, and empty the data structure.
  3. Repeat steps 1 and 2 until all data records in the partition have been processed.
  4. Perform a merge sort on all remaining data records in the temporary files and the in-memory data structure, generate data files and index files.

Finally, in the Reduce stage, the Reduce Task “locates” its own data content through the index file and downloads its own data records from the data files on different nodes via the network.

Practice for Each Lesson #

That’s all for this lesson, and I have a question for you to think about:

In the computation process of Shuffle, the intermediate files are stored in the file directory set by the parameter spark.local.dir. The default value of this parameter is /tmp. How do you think this parameter should be set to be more reasonable?

Feel free to share your answer in the comment section, and I will be waiting for you there. If this lesson has been helpful to you, you can also share it with your friends. See you in the next lesson.