05 Task Scheduling System What Does It Mean to Move Data Less and Move Code More

05 Task Scheduling System - What Does it Mean to Move Data Less and Move Code More #

Hello, I’m Wu Lei.

In our daily development and optimization work, in order to fully utilize hardware resources, we often need to manually adjust task parallelism to improve CPU utilization. The parameter that controls task parallelism in Spark is spark.default.parallelism. Increasing parallelism can indeed make full use of idle CPU threads, but the value of parallelism should not be too large, as a too large value will introduce too much scheduling overhead, which is not worth the trade-off.

This optimization technique has been discussed extensively, and you can easily find information about it online. Do you know why increasing the value of parallelism leads to an exponential increase in scheduling overhead? And what exactly does scheduling overhead refer to? Moreover, if you don’t want to try out different values one by one, how should you set the value of parallelism to achieve the best results in the shortest amount of time? If you don’t have the answers, or if you’re not confident about getting them right, then you should listen carefully to what I’m going to tell you next.

In this lecture, I will talk to you about what a scheduling system is, how it works, and help you get rid of the awkward situation where optimization is based on knowing the facts without understanding the reasons, using a machine learning case study.

Case study: Label Encoding for User Interest Features #

In machine learning applications, feature engineering accounts for nearly 80% of the time and effort of algorithm scientists. After all, a high-quality training sample limits the upper limit and ceiling of model performance. The case we are going to discuss comes from a typical processing scenario in feature engineering: Label Encoding.

What is Label Encoding? In machine learning, feature engineering can be divided into two types based on continuity: continuous numeric features and discrete features. Discrete features often exist in the form of strings, such as user interest features including sports, politics, military, and entertainment. For many machine learning algorithms, string-type data cannot be directly used and needs to be converted into numerical values. For example, mapping sports, politics, military, and entertainment to 0, 1, 2, 3, etc. This process is called Label Encoding in the field of machine learning.

In this case, we want to perform Label Encoding for user interest features, which simply means converting strings into fixed values based on a predefined template and then converting user interests among billions of samples into corresponding index values. The fixed template is a file interface between offline model training and online model serving, and its content only includes the column of user interests. The strings in the template file are sorted according to predefined rules. We need to note that user interests contain 4 levels, so this template file is relatively large, with tens of thousands of records.

// Template file
// User interests
Sports-Basketball-NBA-Lakers
Military-Weapons-Rifle-AK47

How do we perform the conversion? For example, we can map the user interest “Sports-Basketball-NBA-Lakers” to 0 and the interest “Military-Weapons-Rifle-AK47” to 1, and so on. The requirement is quite clear. After receiving the requirement, my colleagues around me quickly implemented the following processing function without any hesitation.

/**
Implementation 1
Input parameters: template file path, user interest string
Return value: the index value corresponding to the user interest string
*/

// Function definition
def findIndex(templatePath: String, interest: String): Int = {
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
searchMap.getOrElse(interest, -1)
}

// Function call in the Dataset
findIndex(filePath, "Sports-Basketball-NBA-Lakers")

We can see that this function has two parameters, one is the template file path, and the other is the user interest in the training samples. The processing function first reads the template file, then constructs a mapping from interest to index based on the sorted strings in the file, and finally searches for the user interest passed in as the second parameter in this mapping. If it is found, it returns the corresponding index; otherwise, it returns -1.

This code seems fine, and my colleagues used this function to perform Label Encoding on billions of samples. However, it took 5 hours on a distributed cluster of 20 AWS EC2 instances of type C5.4xlarge, which is unacceptable in terms of execution performance. You might say, “The requirement is like this, so what else can we do?” Let’s take a look at another implementation.

/**
Implementation 2
Input parameters: template file path, user interest string
Return value: the index value corresponding to the user interest string
*/

// Function definition
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)

// Function call in the Dataset
partFunc("Sports-Basketball-NBA-Lakers")

My colleagues used the second approach, which is based on the exact same data set, to perform Label Encoding, and they finished the task in less than 20 minutes on a distributed cluster of 10 AWS EC2 instances of the same type. It can be said that the improvement in execution performance is obvious. So, what are the differences between the two versions of code?

As we can see, compared to the first piece of code, the second piece of code has no changes in the function body. It still reads the template file, constructs a mapping from interest to index, searches for the user interest, and finally returns the index. The biggest difference lies in the usage of higher-order functions in the second piece of code, specifically in two aspects:

  1. The processing function is defined as a higher-order function, with the template file path as the parameter and the resulting function from interest to index as the return result.
  2. The function called in the Dataset containing billions of samples is not findIndex as in the first piece of code, but partFunc obtained by calling findIndex with the template file. partFunc is a regular scalar function with the interest as the parameter and the index as the result.

So, are higher-order functions really so magical? In fact, the improvement in performance is not due to higher-order functions themselves, but the scheduling system plays a role. The core responsibility of the Spark scheduling system is to convert the DAG constructed by the user into distributed tasks, distribute the distributed tasks to executors based on the availability of distributed cluster resources and scheduling rules. This process may sound complex, so let’s tell a little story to help you understand.

Potato Factory Upgrade #

After learning the second layer of meaning in memory computing, the boss of the Potato Factory decided to upgrade the potato processing assembly line to improve the production efficiency and flexibility of the factory.

Here, let’s review the second layer of meaning in memory computing, which refers to the fact that all operations in the same stage are combined into one function, and this function is applied to the input data at once to produce the calculation result at once.

Before the upgrade, the potato processing DAG of the factory was divided into three execution stages: Stage 0, Stage 1, and Stage 2. Among them, Stage 0 produces ready-to-eat potato chips, Stage 1 distributes seasonings, and Stage 2 produces potato chips of different sizes and flavors. We focus on Stage 0, which has three processing steps: cleaning, slicing, and baking. These three steps require three different devices: a washing machine, a slicer, and an oven.

The factory has three assembly lines, and each type of equipment requires three sets, which costs a lot of money. Therefore, the boss of the factory has been thinking hard to reduce the cost of equipment.

At this time, the foreman suggested, “Boss, I heard that there is a programmable potato processing device on the market. It is a black box with only an input port and an output port. You can’t see the internal operation process from the outside. However, the black box is controlled by a program. With the given ingredients on the input port, we can write a program to control the output of the black box. With this programmable device, we not only save money, but also can flexibly expand the product line in the future. For example, if we want to produce various flavors of french fries or mashed potatoes, we only need to replace the program loaded into the black box!”

After listening, the boss was very happy and decided to spend money to buy a programmable potato processing device, replacing and eliminating the existing washing machine, slicer, and oven.

Therefore, the potato processing assembly line of the factory looks like this. The work of the workers has also changed from installing the corresponding equipment on the assembly line according to the key steps of the DAG flowchart to writing the corresponding program for the key steps and loading it into the black box. In this way, the productivity of this factory has been upgraded from a workshop-style production mode to the operation mode of a modern assembly line.

So, what is the relationship between this story and the scheduling system we are going to talk about today? In fact, the workflow of the Spark scheduling system includes the following five steps:

1. Split the DAG into different execution stages (Stages); - 2. Create distributed tasks (Tasks) and task groups (TaskSets); - 3. Obtain the available hardware resources in the cluster; - 4. Determine which tasks/groups should be scheduled first based on the scheduling rules; - 5. Distribute the distributed tasks to the Executors in sequence.

Except for step 4, the other steps are all one-to-one with the key steps on the potato factory assembly line. Their corresponding relationships are as follows:

Now you might think it’s unnecessary to remember these steps with a story, but when we finish learning all the principles and then go back to connect the main line of the story, you will be pleasantly surprised to find that you can easily remember and understand all the principles, which is much more efficient than memorizing them.

What are the core components of the scheduling system? #

Next, let’s investigate how the Spark scheduling system works by diving into each step in the process. However, before that, we need to understand which key components are included in the scheduling system, how different components interact with each other, and what roles they play in order to better understand each step in the process.

The Spark scheduling system consists of three core components: DAGScheduler, TaskScheduler, and SchedulerBackend. These three components run in the Driver process and work together to convert the DAG built by the user into distributed tasks and then distribute these tasks to executors in the cluster for execution. However, since all their names include “Scheduler,” it might be difficult to grasp their meanings just by looking at their names. Therefore, I have summarized their corresponding relationships with the five steps in the scheduling system process in the following table. You can take a look.

1. DAGScheduler #

The main responsibilities of DAGScheduler are twofold: first, to split the user DAG into stages. If you don’t remember this process, you can review the content of the previous lecture; second, to create computation tasks within each stage. These tasks encompass the data transformation logic implemented by combining different operators. Then, the Executors receive the tasks and apply the encapsulated computation functions to distributed data partitions to perform the distributed computation process.

However, if we distribute tasks to Executors that are busy or saturated in the cluster, the execution will be greatly compromised. Therefore, before distributing tasks, the scheduling system needs to determine which nodes have idle computing resources and then distribute the tasks there. So, how does the scheduling system determine if a node is idle?

2. SchedulerBackend #

SchedulerBackend is responsible for this. It encapsulates and abstracts the resource scheduler and provides corresponding implementation classes to support various resource scheduling modes such as Standalone, YARN, and Mesos. At runtime, Spark determines which implementation class to instantiate based on the MasterURL provided by the user. The MasterURL is the resource manager specified in various ways, such as --master spark://ip:host (Standalone mode) or --master yarn (YARN mode).

For the available computing resources in the cluster, SchedulerBackend uses a data structure called ExecutorDataMap to record the resource status of each Executor in the compute node. ExecutorDataMap is a HashMap, where the Key is a string that marks the Executor, and the Value is a data structure called ExecutorData, which encapsulates the resource status of the Executor, such as RPC address, host address, available CPU cores, and fully equipped CPU cores. It is like a “resource portrait” of the Executor.

In summary, internally, SchedulerBackend uses ExecutorData to create resource portraits of Executors; externally, SchedulerBackend provides computing resources based on WorkerOffer. WorkerOffer encapsulates Executor ID, host address, and CPU cores, representing idle resources available for task scheduling. Obviously, based on the Executor resource portraits, SchedulerBackend can provide multiple WorkerOffers for distributed task scheduling simultaneously. The name “WorkerOffer” is quite interesting. The literal meaning of “Offer” is the job opportunity provided by a company. In the context of the Spark scheduling system, it becomes an opportunity to use hardware resources.

So far, we have the computational tasks to be scheduled, which are the Tasks created by DAGScheduler through Stages, and the available computing resources for scheduling tasks, namely one WorkerOffer after another provided by SchedulerBackend. If we view task scheduling from the supply-demand perspective, DAGScheduler is the demand side, and SchedulerBackend is the supply side.

3. TaskScheduler #

With demand on the left and supply on the right, if we consider the Spark scheduling system as a marketplace, it still needs an intermediary to help match and mediate transactions between them in order to maximize the efficiency of resource allocation. In the Spark scheduling system, this intermediary is the TaskScheduler. The responsibility of the TaskScheduler is to match and mediate between the supply and demand sides based on established rules and strategies.

Obviously, the core of TaskScheduler is the rules and strategies for task scheduling. TaskScheduler’s scheduling strategy is divided into two levels: the scheduling priority between different stages, and the scheduling priority between different tasks within a stage.

First, for two or more stages that have no dependencies and are independent of each other, they will compete with each other for the same available computing resources. At this time, who is scheduled first, or who has priority to enjoy these computing resources, everyone needs to follow the established rules and protocols.

For the task scheduling between these stages, TaskScheduler provides two scheduling modes, namely FIFO (First In, First Out) and FAIR (Fair Scheduling). FIFO is easy to understand. In this mode, stages consume available computing resources in the order they were created. It’s like two people wanting to buy the same second-hand house at the same time. Regardless of how much each person is willing to pay, the one who pays the deposit first will be given priority by the agency in matching the transaction with the seller.

You may say, “This doesn’t make sense! If the second person is willing to pay more, the seller would naturally prefer to deal with him.” Yes, considering the developer’s intention, TaskScheduler provides the FAIR scheduling mode. In this mode, which stage is scheduled first depends on the definition in the configuration file fairscheduler.xml.

In the configuration file, Spark allows users to define different scheduling pools, and each pool can specify different scheduling priorities. Users can associate different jobs with corresponding scheduling pools during the development process. This directly links the scheduling of different stages to the developer’s intentions, so that different stages can enjoy different priority treatment. In the example of buying a second-hand house, if the second person is willing to pay a 30% premium, the agency would naturally prefer to match him with the seller.

After discussing the scheduling priority between different stages, let’s talk about the scheduling priority between different tasks within the same stage. The task scheduling within a stage is relatively simpler. When TaskScheduler receives a WorkerOffer from SchedulerBackend, it will prioritize dispatching tasks that meet the locality level requirements. As we all know, there are four locality levels: Process local < Node local < Rack local < Any. From left to right, they represent process locality, node locality, rack locality, and cross-rack locality. From left to right, the efficiency of accessing the required data for computing tasks becomes worse.

Process local means that the input data required by the computing task is already in the Executor process, so it is most cost-effective to schedule this task to the target process. Similarly, if the data source has not been loaded into the Executor process and is stored on the disk of a computing node, it is also a good choice to schedule the task to the target node. Again, if we cannot determine which machine the input source is on, but we can be sure it is on a certain rack, the locality level will degrade to Rack local.

During the process of partitioning Stages and creating distributed tasks, DAGScheduler assigns a locality level to each task, and this level records the intended computing node address or even the Executor process ID. In other words, tasks come with scheduling preferences, and they tell TaskScheduler where they would prefer to be scheduled through the locality level.

Since the individual preference of the computing task is so strong, TaskScheduler, as an intermediary, must prioritize satisfying their preferences. It’s like a programmer wanting to rent a house in Xi’erqi, but the results pushed by the real estate agency app are all houses in Guomao on the Third Ring Road. Then there must be a problem with the matching algorithm of this agency.

Therefore, the principle of the Spark scheduling system is to keep the data in place as much as possible, while distributing the code that performs the computing tasks to the closest location to the data, thereby minimizing network overhead in the distributed system. After all, the cost of distributing code is much lower than the cost of distributing data, which is why we have the saying “data doesn’t move, code moves”.

In summary, after TaskScheduler selects the to-be-computed tasks based on the locality level, it serializes these tasks. Then, it hands them over to the SchedulerBackend, which distributes the serialized tasks to the target Executor on the destination host through the RPC address and host address recorded in ExecutorData. Finally, after receiving the tasks, the Executor hands them over to the built-in thread pool, and the multiple threads in the thread pool concurrently execute the data processing functions encapsulated in the tasks on different data partitions, thereby achieving distributed computing.

Performance Optimization Case Review #

Now that we understand how the scheduling system works, let’s take a look back at the development case of Label encoding and discuss the differences between the two implementation approaches. Let’s first review the main calculation steps of the processing function in the case:

  1. Reading and iterating through the template file content to create a dictionary from strings to values;
  2. Finding the value index corresponding to the interest string based on the user’s interest in the sample.

The fundamental difference between the two implementation approaches lies in the distributed computing process of these two calculation steps in the function. In the first implementation approach, the function is a scalar function that accepts two parameters, and the Dataset calls this function to perform Label encoding on billions of samples.

In the Spark task scheduling process, this function is packaged as Tasks by the DAGScheduler on the Driver, scheduled by the TaskScheduler to the SchedulerBackend, and finally distributed to the Executors in the cluster for execution by the SchedulerBackend. This means that every Executor in the cluster needs to execute the two calculation steps encapsulated in the function. It is worth noting that the computational cost of the first step, which involves iterating through file contents and building a dictionary, is quite high.

On the other hand, in the second implementation approach, the two calculation steps are encapsulated in a higher-order function. In the user’s code, this higher-order function is first called on the Driver with the template file to complete the first step of dictionary building, while simultaneously outputting a scalar function with only one parameter. This scalar function carries the just-built mapping dictionary. Finally, the Dataset applies this scalar function to billions of samples for Label encoding.

Do you see the difference? In the second implementation approach, the first step of calculation in the function is only performed once on the Driver, and the scalar function encapsulating the dictionary is distributed to all Executors in the cluster. Then, on the Executors, they call this function on their respective data partitions, eliminating the scanning of the template file and the overhead of building the dictionary. In the end, we only need to pass in the user’s interest in the sample, and the function can return the numerical result with a query efficiency of O(1).

For a distributed cluster with hundreds or thousands of Executors, the performance difference brought about by these two different implementation approaches can be quite significant. Therefore, if you can keep in mind how the Spark scheduling system works, I believe that in the process of code development or review, you will be able to recognize the performance issues caused by the first calculation step. This kind of reflection in the development process is actually helping to establish a performance-oriented development habit subtly.

Summary #

In today’s lecture, we first compared the performance differences between two implementation methods through a machine learning case, and realized that we may have a superficial understanding of scheduling systems, which may introduce potential performance hazards during development. To address this, I outlined the five main steps in the workflow of a scheduling system:

  1. Splitting the DAG into different stages.
  2. Creating distributed tasks and task sets.
  3. Obtaining information about available hardware resources in the cluster.
  4. Deciding which tasks/sets to schedule first based on scheduling rules.
  5. Distributing distributed tasks to executors in order.

Combining these five steps, we conducted an in-depth analysis of the working principle of the Spark scheduling system. We can summarize it from two aspects: core responsibilities and core principles:

  1. The core responsibility of the Spark scheduling system is to transform the DAG built by the user into distributed tasks, combine them with the availability of distributed cluster resources, and distribute the distributed tasks to executors based on scheduling rules.
  2. The core principle of the Spark scheduling system is to keep the data in place as much as possible, while distributing the code that handles computational tasks to the closest location to the data (executors or computing nodes) to minimize network overhead in distributed systems.

Daily Exercise #

  1. How does DAGScheduler set the locality level for each task during task creation?
  2. Does the Node local locality level hold in a cloud computing environment where computation and storage are separated? Under what circumstances do you think it holds? Under what circumstances do you think it doesn’t hold?

Looking forward to seeing your thoughts and answers in the comments. If your friends are also eager to understand the working principles of scheduling systems, feel free to forward this lecture to them. See you in the next lecture!