05 Scheduling Systems - How to Master the Essence of Distributed Computing #

Hello, I’m Wu Lei.

In the previous lesson, we introduced the Driver and Executors in Spark’s process model and their interaction using the example of a contractor and construction workers. The Driver is responsible for parsing user code, building the computation graph, and converting the graph into distributed tasks, which are then distributed to the Executors in the cluster for execution.

However, you may be curious: “Given a piece of user code and its computation graph, how does the Driver break the graph down into distributed tasks? By what rules are these tasks distributed to Executors? And how do the Executors actually execute them?”

We have emphasized repeatedly that the essence of distributed computing lies in how to transform abstract computation graphs into concrete distributed computing tasks and deliver them for parallel execution. Deep understanding of distributed computing is essential for successful big data development, as it effectively prevents us from falling into the trap of “single-machine thinking” and lays a solid foundation for performance-oriented development.

These questions are exactly the key to mastering distributed computing. Therefore, in today’s lesson, we will follow them into the Spark scheduling system to understand the ins and outs of distributed computing.

Role Division and Spark Construction Group #

In the previous lecture, we roughly clarified the core links and key steps of Spark’s distributed task scheduling through the analogy of “foreman and construction workers” and the example of Word Count. The core task of today’s lecture is to take you deep into each link, so that you see both the forest and the trees. Let’s first summarize these links and components so that you have a clear overall picture.

Image

As the table shows, there are many steps and components. If we went through the scheduling system by the book, you might feel sleepy while reading, and I might even fall asleep while writing. Therefore, let’s weave these links and components into a story, so that you can learn Spark’s scheduling system as if you were reading an entertaining novel.

Once upon a time, there was a well-known construction group in the United States called “Spark Group”, famous at home and abroad. The group was large in scale, with a headquarters (Driver) and multiple branch offices (Executors). Its main clients were architects (developers). Architects were responsible for providing design drawings (user code, computational graph), and Spark Group’s main business was to turn those drawings into finished high-rise buildings.

To carry out its main business, the group needed to recruit someone who could read the drawings and turn them into construction projects. Therefore, Spark Group poached a well-known architect called “DAGScheduler”. The group appointed DAGScheduler as the head of the headquarters, and also asked two senior veterans, “Task” and “Backend”, to fully cooperate with him in his work.

You may wonder: who are “Task” and “Backend”?

TaskScheduler (“Task”) joined Spark Group immediately after graduation and is currently the construction manager of the headquarters. He has successfully supervised many projects, large and small, and his outstanding performance is highly appreciated by the company. SchedulerBackend (“Backend”) and Task were dormitory neighbors in college and are very close. SchedulerBackend is currently the director of human resources of the headquarters, responsible for coordinating and arranging human resources with the branch offices. As the company’s arrangement shows, the division of labor among the three supervisors is quite clear.

Image

The reason why TaskScheduler and SchedulerBackend are regarded as senior veterans of the company is that they are the earliest scheduling system components to be created: both are created together during the initialization of SparkContext / SparkSession. The relationship between these two components is very subtle: SchedulerBackend takes a reference to TaskScheduler in its constructor, while TaskScheduler obtains a reference to SchedulerBackend during initialization.

It is worth mentioning that the instantiation of the SchedulerBackend component depends on the Spark Master URL specified by the developer, that is, the “--master” parameter specified when using spark-shell (or spark-submit). For example, “--master spark://ip:port” represents the Standalone deployment mode, “--master yarn” represents the YARN mode, and so on.
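To make the idea concrete, here is a minimal Python sketch of how a Master URL could be mapped to a deployment mode. The function name `deployment_mode` is hypothetical and this is not Spark's own code; it only illustrates that the scheme of the `--master` value decides which kind of SchedulerBackend gets created.

```python
def deployment_mode(master_url: str) -> str:
    """Classify a --master value into a deployment mode (illustrative only)."""
    if master_url.startswith("local"):
        return "Local"
    if master_url.startswith("spark://"):
        return "Standalone"
    if master_url == "yarn":
        return "YARN"
    if master_url.startswith("mesos://"):
        return "Mesos"
    raise ValueError(f"Unrecognized master URL: {master_url}")


for url in ["spark://ip:port", "yarn", "local[*]"]:
    print(url, "->", deployment_mode(url))
```

Each mode corresponds to a different resource manager, which is why the choice of SchedulerBackend can only be made once the Master URL is known.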

It is not difficult to find that the SchedulerBackend is tightly coupled with the resource manager (Standalone, YARN, Mesos, etc.) and is the proxy of the resource manager in Spark. Just like human resources, hardware resources are also used to “work”. Therefore, if we use the human resources of the company to analogize the hardware resources of the Spark cluster, then “Backend” is definitely the director of human resources.

From a global perspective, DAGScheduler is the initiator of task scheduling. DAGScheduler submits task scheduling requests to TaskScheduler with TaskSet as the granularity. During initialization, TaskScheduler creates a task scheduling queue, which is used to cache TaskSets submitted by DAGScheduler. TaskScheduler combines WorkerOffer provided by SchedulerBackend and schedules the tasks in the queue one by one according to the predetermined scheduling strategy.

Image

In short, DAGScheduler has the “jobs” in hand, SchedulerBackend has the “human resources” in hand, and the core function of TaskScheduler is to assign the right “jobs” to the right “hands”. Therefore, TaskScheduler plays a crucial role in connecting the two and coordinating their work. This is also an important reason why we consider “Task” one of the elders of the Spark Construction Group.

Now that we understand the roles and responsibilities of these three supervisors, let’s talk in detail about how each of them gets the work done.

Chief Architect: DAGScheduler #

In our story, with the assistance of the two seniors, DAGScheduler’s work is proceeding relatively smoothly. However, beneath the calm surface there are undercurrents. As a parachuted leader, Boss Dag needs to personally demonstrate his abilities to win recognition and trust from his peers.

As the “chief architect” of the group company, DAGScheduler’s core responsibility is to break down the computation graph (DAG) into execution stages (Stages), each of which is one phase of the job’s run. At the same time, DAGScheduler is responsible for transforming Stages into sets of tasks (TaskSets), which means turning “architectural drawings” into executable “construction projects”.

To summarize the process of splitting a DAG into Stages in one sentence: starting from the Action operator, backtrack through the DAG from back to front, and use Shuffle operations as the boundaries between Stages. In [Lesson 2], when introducing the programming model, we used Word Count as an example and mentioned that the execution of a Spark job consists of two phases. The first phase is to construct the computational graph lazily, and the second phase is to trigger the job’s computation from the beginning through an Action operator:

image

For the second phase in the figure, Spark further refines it into two steps during actual execution. The first step is to recursively transform the logical computational graph DAG into one stage after another, starting from the back and using Shuffle as the boundary.

image

Still taking Word Count as an example, Spark starts from the take operator and adds the RDDs in the DAG to the first stage one by one until it encounters the reduceByKey operator. Since reduceByKey introduces a Shuffle, the first stage is closed at that point, and it contains only the wordCounts RDD. Spark then continues to backtrack; because it encounters no further operator that introduces a Shuffle, it adds all the RDDs it meets “along the way” to the second stage.
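The backtracking described above can be sketched in a few lines of Python. This is only a toy model under simplifying assumptions: every RDD has a single parent (a linear lineage), and the names `RDD`, `split_into_stages`, `lines`, `words`, and `pairs` are hypothetical; only wordCounts comes from the Word Count example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RDD:
    name: str
    parent: Optional["RDD"] = None  # single parent: a linear lineage is assumed
    shuffle: bool = False           # True if this RDD is computed from its parent via a Shuffle


def split_into_stages(final_rdd: RDD) -> List[List[str]]:
    """Backtrack from the RDD the Action is called on; cut a stage at every Shuffle."""
    stages: List[List[str]] = [[]]
    rdd: Optional[RDD] = final_rdd
    while rdd is not None:
        stages[-1].append(rdd.name)
        if rdd.shuffle and rdd.parent is not None:
            stages.append([])  # everything further back belongs to an earlier stage
        rdd = rdd.parent
    return stages


# Hypothetical Word Count lineage; wordCounts is produced by reduceByKey.
lines = RDD("lines")
words = RDD("words", parent=lines)
pairs = RDD("pairs", parent=words)
word_counts = RDD("wordCounts", parent=pairs, shuffle=True)

stages = split_into_stages(word_counts)
print(stages)
```

The stage created first holds only wordCounts, and the stage created second holds every RDD before the Shuffle, which matches the walk-through above.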

After the stages are created, the second step of triggering the calculation begins: Spark recursively requests the execution of all the stages in reverse order.

image

Specifically, in the example of Word Count, the first stage that the DAGScheduler requests to execute is Stage 1. During submission, the DAGScheduler finds that the parent stage that Stage 1 depends on, which is Stage 0, has not been executed yet. At this point, it will push the submission action of Stage 1 onto the stack and instead request the execution of Stage 0. When Stage 0 is completed, the DAGScheduler pops again and requests the execution of Stage 1.
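The push-and-pop behavior can be illustrated with a small Python sketch. It mirrors the stack description in this lesson rather than Spark's actual implementation, and the function name `execution_order` and the `parents` dictionary are assumptions made for illustration.

```python
from typing import Dict, List


def execution_order(final_stage: str, parents: Dict[str, List[str]]) -> List[str]:
    """Return the order in which stages run when parents must finish first."""
    finished = set()
    order: List[str] = []
    stack = [final_stage]  # stages whose submission is pending
    while stack:
        stage = stack[-1]
        missing = [p for p in parents.get(stage, []) if p not in finished]
        if missing:
            # Parent stages have not run yet: keep this stage on the stack
            # and request the parents first.
            stack.extend(missing)
        else:
            stack.pop()
            if stage not in finished:
                finished.add(stage)
                order.append(stage)
    return order


# Word Count: Stage 1 depends on Stage 0.
word_count_parents = {"Stage 1": ["Stage 0"]}
order = execution_order("Stage 1", word_count_parents)
print(order)
```

Although the request starts from Stage 1, Stage 0 is the first to actually run.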

For each stage requested to execute, the DAGScheduler creates a distributed task collection, TaskSet, based on the partitions attribute of the RDD in the stage. TaskSet contains multiple distributed tasks, where the number of tasks in TaskSet is equal to the number of data partitions in the RDD. In other words, tasks and RDD partitions have a one-to-one correspondence.

You might ask, “What does the Task represent specifically?” To better understand Task, let’s take a look at its key attributes.

image

In the table above, stageId and stageAttemptId indicate the relationship between Task and the executing stage. taskBinary encapsulates the user code belonging to the execution stage. partition represents the RDD data partition we just mentioned. locs attribute records the preferred computing node or Executor ID for this task as a string.

It is not difficult to see that the taskBinary, partition, and locs attributes together describe one thing: which code (taskBinary) should be executed on which data (partition), and where (locs).

So far, we have finished explaining the responsibilities of DAGScheduler. Let’s summarize briefly. The main responsibilities of DAGScheduler are threefold:
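To tie the Task attributes to the one-to-one rule between tasks and partitions, here is a minimal Python sketch. The field names follow the attributes discussed above in snake_case; the helper `create_task_set` and the sample values are hypothetical, and the real Task class in Spark carries more than this.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    stage_id: int
    stage_attempt_id: int
    task_binary: bytes  # the serialized user code of the stage
    partition: int      # index of the RDD partition this task processes
    locs: List[str]     # preferred nodes (or Executor IDs) for this task


def create_task_set(stage_id: int, stage_attempt_id: int, task_binary: bytes,
                    partition_locs: List[List[str]]) -> List[Task]:
    """Create exactly one Task per partition of the stage's RDD."""
    return [
        Task(stage_id, stage_attempt_id, task_binary, index, locs)
        for index, locs in enumerate(partition_locs)
    ]


# Three partitions, each with its own preferred locations (made-up values).
task_set = create_task_set(
    stage_id=0,
    stage_attempt_id=0,
    task_binary=b"serialized user code",
    partition_locs=[["node0", "node1"], ["node1", "node2"], ["node2", "node0"]],
)
print(len(task_set), [t.partition for t in task_set])
```

All tasks in the set share the same taskBinary; what distinguishes them is the partition they work on and where they prefer to run.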

  • Construct the DAG based on user code.
  • Cut Stages based on Shuffle boundaries.
  • Create TaskSets based on Stages and submit them to the TaskScheduler for scheduling.

Now, DAGScheduler has fulfilled his mission of converting the “blueprint” into a “construction project”. Next, he needs to hand these jobs over to Task, and Task will delegate them further.

image

However, before delegating the jobs, Task first needs to figure out how many “eligible workers” there are within the company. For this, he relies on his old friend Backend for help.

Backend: SchedulerBackend #

As the Human Resources Director of the group company, Backend’s core responsibility is to keep track of the real-time status of the company’s human resources. As we mentioned earlier, the company’s human resources correspond to Spark’s computing resources. For the available computing resources in the cluster, SchedulerBackend uses a data structure called ExecutorDataMap to record the resource status of each Executor on the computing nodes.

The ExecutorDataMap here is a HashMap, where the Key is a string that identifies the Executor, and the Value is a data structure called ExecutorData. ExecutorData encapsulates the resource status of the Executor, such as its RPC address, host address, number of available CPU cores, and total number of CPU cores. It is like a “resource portrait” of the Executor.

Image

With this “HR handbook”, SchedulerBackend can create “resource portraits” for executors internally. Externally, SchedulerBackend provides computing resources with WorkerOffers as granularity. WorkerOffer encapsulates the Executor ID, host address, and number of CPU cores, and it represents an idle resource available for task scheduling.

Clearly, based on the executor resource portraits, SchedulerBackend can provide multiple WorkerOffers for distributed task scheduling. The name WorkerOffer is quite appropriate. “Offer” literally means the job opportunity provided by the company, but in the context of the Spark scheduling system, it becomes an opportunity to use hardware resources.
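The relationship between the “HR handbook” and the offers it produces can be sketched as follows. This is a simplified Python model, not Spark's code: the class and field names are adapted from the description above, and `make_offers` and the sample Executors are assumptions.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ExecutorData:
    rpc_address: str
    host: str
    free_cores: int   # CPU cores currently available
    total_cores: int  # CPU cores the Executor was configured with


@dataclass(frozen=True)
class WorkerOffer:
    executor_id: str
    host: str
    cores: int


def make_offers(executor_data_map: Dict[str, ExecutorData]) -> List[WorkerOffer]:
    """Turn every Executor that still has idle cores into a WorkerOffer."""
    return [
        WorkerOffer(executor_id, data.host, data.free_cores)
        for executor_id, data in executor_data_map.items()
        if data.free_cores > 0
    ]


# Made-up cluster state: exec-1 is fully busy, so it produces no offer.
executor_data_map = {
    "exec-0": ExecutorData("node0:40001", "node0", free_cores=2, total_cores=4),
    "exec-1": ExecutorData("node1:40001", "node1", free_cores=0, total_cores=4),
}
offers = make_offers(executor_data_map)
print(offers)
```

Internally the map keeps the full portrait of each Executor, while externally only the idle part of it is exposed as an offer.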

Image

You may wonder how Backend, who sits in the headquarters, manages to have a comprehensive understanding of the entire group’s human resources without leaving the office. As the saying goes, “A fence has three stakes; a hero has three helpers.” Backend’s individual efforts alone would not be enough. The real contributors behind the scenes are the minions stationed in the branch offices: ExecutorBackend.

SchedulerBackend maintains periodic communication with the ExecutorBackend in every Executor in the cluster. The two sides exchange information about available computing resources through messages such as LaunchedExecutor, RemoveExecutor, and StatusUpdate. Backend constantly updates the handbook in his hands through these “letters” sent by the minions, and thus keeps a comprehensive picture of the group’s human resources.
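As a rough illustration of how such messages could keep the handbook up to date, consider the Python sketch below. The message names come from the text, but the message fields and the exact effect assigned to each message are assumptions made for this example and should not be read as Spark's actual protocol.

```python
from typing import Dict


def handle_message(executor_data_map: Dict[str, dict], message: dict) -> None:
    """Update the resource map according to one incoming message (illustrative)."""
    kind = message["type"]
    executor_id = message["executor_id"]
    if kind == "LaunchedExecutor":
        # Assumption: a freshly launched Executor has all of its cores idle.
        executor_data_map[executor_id] = {
            "host": message["host"],
            "free_cores": message["cores"],
            "total_cores": message["cores"],
        }
    elif kind == "RemoveExecutor":
        executor_data_map.pop(executor_id, None)
    elif kind == "StatusUpdate":
        # Assumption: a finished task gives its cores back to the Executor.
        data = executor_data_map.get(executor_id)
        if data is not None and message["finished"]:
            data["free_cores"] = min(
                data["total_cores"], data["free_cores"] + message["cores_released"]
            )
    else:
        raise ValueError(f"Unknown message type: {kind}")


handbook: Dict[str, dict] = {}
handle_message(handbook, {"type": "LaunchedExecutor", "executor_id": "exec-0",
                          "host": "node0", "cores": 4})
handbook["exec-0"]["free_cores"] -= 1  # pretend one task was launched on exec-0
handle_message(handbook, {"type": "StatusUpdate", "executor_id": "exec-0",
                          "finished": True, "cores_released": 1})
print(handbook)
```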

Image

Task: TaskScheduler #

After the “boss” Dag has broken down the “work” and the “HR” Backend has provided the “human resources”, it’s finally time for the matchmaker Task to step in. As the construction manager, Task’s core responsibility is to pick, for the “human resources” provided by Backend, the most suitable “work” and assign it accordingly. The selection process is the core of task scheduling, as shown in step 3 of the figure below:

Image

Now the question is, based on the WorkerOffers provided by SchedulerBackend, what criteria does TaskScheduler use to select Tasks?

In a nutshell, for a given WorkerOffer, TaskScheduler selects the Tasks suitable for scheduling from the TaskSets based on the tasks’ locality preferences. What does this mean? It sounds a bit abstract. Let’s start with the TaskSet created by DAGScheduler within a Stage.

As we just mentioned, tasks correspond one-to-one with the partitions of RDD. During the task creation process, DAGScheduler sets the locs attribute for each task based on the physical addresses of data partitions. The locs attribute records the computing nodes where the data partitions are located, and even the Executor process ID.

For example, when we call the textFile API to read source files from the HDFS file system, Spark retrieves the storage addresses of data partitions based on the metadata recorded in the HDFS NameNode, such as node0:/rootPath/partition0-replica0, node1:/rootPath/partition0-replica1, and node2:/rootPath/partition0-replica2.

Then, when DAGScheduler creates Task0 for this data partition, it records the computing nodes from these addresses in the locs attribute of Task0.

As a result, when TaskScheduler needs to schedule Task0, based on the locs attribute of Task0, it knows: “Task0’s data partitions are stored as replicas on nodes node0, node1, and node2. Therefore, if the WorkerOffer comes from computing resources on these three nodes, it is a good fit for Task0.” From this example, we can better understand that each task comes with locality inclination, in other words, each task has its own “scheduling preference”.

Returning to the analogy of the Spark Construction Group, it’s like a certain “job” that not everyone can do equally well; it is better done by certain people because they are more professional. For example, bricklaying is best done by bricklayers with more than 3 years of experience, whereas ceiling installation is best done by carpenters with more than 5 years of experience, and so on.

In Spark’s terminology, a locality preference at the granularity of computing nodes is called NODE_LOCAL. Besides nodes, a task’s preference can also be directed at processes (Executors), racks, or arbitrary addresses; the corresponding terms are PROCESS_LOCAL, RACK_LOCAL, and ANY, respectively.

For tasks with a preference for PROCESS_LOCAL, it requires that the corresponding data partitions have replicas stored in a certain process (Executor); while for tasks with a preference for RACK_LOCAL, it only requires that the corresponding data partitions exist on the same rack. ANY is equivalent to no preference, meaning that the task does not have a preference for the destination of distribution and can be scheduled anywhere.
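The four levels can be made concrete with a small Python sketch that classifies how well a given offer matches a task's preferred locations. The names `TaskPrefs`, `locality_level`, and the `rack_of` lookup table are hypothetical, and the cluster layout is made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class TaskPrefs:
    executors: Set[str] = field(default_factory=set)  # Executors holding the data
    hosts: Set[str] = field(default_factory=set)      # nodes holding the data


def locality_level(prefs: TaskPrefs, offer_executor: str, offer_host: str,
                   rack_of: Dict[str, str]) -> str:
    """Return the strictest locality level this offer can satisfy for the task."""
    if offer_executor in prefs.executors:
        return "PROCESS_LOCAL"
    if offer_host in prefs.hosts:
        return "NODE_LOCAL"
    offer_rack = rack_of.get(offer_host)
    if offer_rack is not None and any(rack_of.get(h) == offer_rack for h in prefs.hosts):
        return "RACK_LOCAL"
    return "ANY"


# Task0's partition has replicas on node0, node1, and node2; exec-7 caches it.
task0 = TaskPrefs(executors={"exec-7"}, hosts={"node0", "node1", "node2"})
rack_of = {"node0": "rackA", "node1": "rackA", "node2": "rackB",
           "node3": "rackB", "node9": "rackC"}

for executor, host in [("exec-7", "node0"), ("exec-1", "node1"),
                       ("exec-2", "node3"), ("exec-3", "node9")]:
    print(executor, host, locality_level(task0, executor, host, rack_of))
```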

The following diagram shows the running logic of task scheduling by the TaskScheduler based on locality preference:

Image

It is not difficult to see that from PROCESS_LOCAL, NODE_LOCAL, to RACK_LOCAL, and finally to ANY, the locality preference of tasks gradually becomes more lenient. After the TaskScheduler receives a WorkerOffer, it traverses the Tasks in the TaskSet in this order, scheduling tasks with a preference for PROCESS_LOCAL first, followed by NODE_LOCAL, then RACK_LOCAL, and finally ANY.
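The traversal order can be sketched in Python as follows. It is a deliberately simplified model: each task is assumed to need one CPU core, the function name `pick_tasks` is hypothetical, and real-world refinements such as waiting a while for a better offer before relaxing the locality level are left out.

```python
from typing import List, Tuple

LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def pick_tasks(offer_cores: int, pending: List[Tuple[str, str]]) -> List[str]:
    """Choose tasks for one WorkerOffer, strictest locality level first.

    pending holds (task_name, locality_level_for_this_offer) pairs.
    """
    chosen: List[str] = []
    for level in LEVELS:
        for name, task_level in pending:
            if len(chosen) == offer_cores:
                return chosen  # the offer has no idle cores left
            if task_level == level:
                chosen.append(name)
    return chosen


pending_tasks = [("t0", "ANY"), ("t1", "NODE_LOCAL"),
                 ("t2", "PROCESS_LOCAL"), ("t3", "RACK_LOCAL")]
picked = pick_tasks(offer_cores=2, pending=pending_tasks)
print(picked)
```

With only two cores on offer, the PROCESS_LOCAL and NODE_LOCAL tasks are chosen, and the more lenient ones wait for a later offer.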

You may ask, “Why does Spark differentiate between different locality preferences, and what is the purpose?” In fact, different locality preferences essentially describe different relationships between computation (code) and data.

The core idea of the Spark scheduling system is “data immobile, code mobile.” That is to say, in the process of task scheduling, Spark tends to leave the data where it is and distribute the computational tasks (code) to where the data is located, avoiding the performance risks introduced by moving data around. After all, distributing code is much lighter than distributing data.

A locality preference states where the code and the data should “meet”: PROCESS_LOCAL means within the same JVM process, NODE_LOCAL means within the same node, RACK_LOCAL means within the same physical rack, and ANY means “anywhere will do”.

Image

Okay, up to this point, by combining WorkerOffers with the locality preferences of tasks, TaskScheduler has selected the suitable “jobs” to schedule: Tasks. Next, TaskScheduler sends these Tasks to his good friend SchedulerBackend through the LaunchTask message. After receiving these tasks, SchedulerBackend, the human resources director, also uses the LaunchTask message to distribute them further to his subordinates in the branch offices: ExecutorBackend.

So, how does ExecutorBackend, the subordinate, work after receiving the tasks? Let’s continue reading!

Put into Execution: ExecutorBackend #

As the human resources manager of the branch office, once ExecutorBackend receives the “jobs”, it assigns them to the construction workers of the branch office. These workers are the threads in the Executor’s thread pool, and each thread is responsible for handling one task.

Whenever a task is completed, these threads send a StatusUpdate event to the SchedulerBackend on the Driver side through ExecutorBackend, informing the status of the task execution. Next, TaskScheduler and SchedulerBackend relay the status to DAGScheduler, as shown in steps 7, 8, and 9 in the figure.
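A toy version of this execution step can be written with Python's standard thread pool. The function `run_tasks` and the shape of the status message are assumptions for illustration; the point is only that each task runs on a pool thread and reports back when it finishes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict


def run_tasks(tasks: Dict[int, Callable[[], int]], num_threads: int,
              send_status_update: Callable[[dict], None]) -> Dict[int, int]:
    """Run every task on a thread pool and report each completion."""

    def run_one(task_id: int, func: Callable[[], int]) -> int:
        result = func()
        # Tell the Driver side that this task is done.
        send_status_update({"task_id": task_id, "state": "FINISHED"})
        return result

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        futures = {task_id: pool.submit(run_one, task_id, func)
                   for task_id, func in tasks.items()}
        return {task_id: future.result() for task_id, future in futures.items()}


# Four made-up tasks; each one squares its own task ID.
demo_tasks = {i: (lambda i=i: i * i) for i in range(4)}
status_updates = []
results = run_tasks(demo_tasks, num_threads=2,
                    send_status_update=status_updates.append)
print(results)
```

The order in which status updates arrive depends on thread timing, so only their content, not their order, should be relied upon.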

Image

For tasks within the same TaskSet, when they have completed both the task scheduling and task execution, that is, the computational process in steps 1 to 9 in the diagram, the Spark scheduling system completes the task scheduling of a certain stage in the DAG.

However, the story does not end here. We know that a DAG contains multiple stages, and the end of one stage signifies the beginning of the next stage, which is the meaning of dividing the DAG into stages in the first place. Only when all stages are scheduled and executed can a complete Spark job be considered finished.

As time went on, after cooperating on one construction project after another, the parachuted leader Dag finally won the trust and recognition of the veterans Task and Backend, firmly securing the top position in the Spark Construction Group. The future looks promising, and Dag’s prospects are bright.

Key Review #

In today’s lecture, we introduced the working principle of the Spark scheduling system using the story of the Spark Construction Group. To understand the workflow of the scheduling system, you need to master the 5 key steps in the table below:

Image

Specifically, task scheduling consists of the following 5 steps:

  1. The DAGScheduler divides the computation graph (DAG) designed by developers into multiple execution stages (Stages) at Shuffle boundaries, and creates a set of tasks (TaskSet) for each Stage.

  2. The SchedulerBackend interacts with the ExecutorBackend in the Executors to obtain real-time information about available computing resources in the cluster, and records this information in the ExecutorDataMap data structure.

  3. At the same time, SchedulerBackend creates WorkerOffers based on the available resources in the ExecutorDataMap, providing computing resources at the granularity of WorkerOffers.

  4. For each WorkerOffer, the TaskScheduler traverses the tasks in the TaskSet according to their locality preferences, in the order of PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, and ANY. It prioritizes scheduling tasks with stricter locality requirements.

  5. The selected tasks are passed by the TaskScheduler to the SchedulerBackend, and then dispatched to the ExecutorBackend in the Executors. Once the Executors receive the tasks, they invoke the local thread pool to execute the distributed tasks.

That’s all for today. The scheduling system is the core of distributed computing systems. By understanding the ins and outs of Spark task scheduling, you will grasp the essence of the Spark distributed computing engine, which will lay a solid foundation for developing high-performance Spark distributed applications.

Practice for Each Lesson #

At the end of the lesson, I’m going to leave you with a practice question. Please think about how the DAGScheduler in Spark determines that all tasks in a stage have been scheduled and executed before deciding to schedule the next stage in the DAG.

Feel free to answer this question in the comments. If you found this lesson helpful, I also encourage you to share it with more friends and colleagues. I’ll be waiting for you in the comments. See you in the next lesson!