In-Depth Interpretation of Spark Job on Spark UI - Part 2 #

Hello, I am Wu Lei.

In the previous lecture, we discussed the primary entry points of Spark UI. The Executors, Environment, and Storage sections are detailed pages that allow developers to quickly understand the overall computational load, runtime environment, and detailed information about dataset caching in the cluster. On the other hand, the SQL, Jobs, and Stages sections provide more of a list-style display, and to delve into the details, we need to go into the secondary entry points.

Continuing with the previous analogy, a developer acting as a “doctor” needs a range of indicators in order to locate the “lesions” quickly and from experience. The secondary entry points we are going to talk about today are more comprehensive and detailed than the primary entry points. To become a “doctor” with rich clinical experience, we first need to be proficient in interpreting these metrics.

Image

The term “secondary entry points” refers to pages that can only be accessed through hyperlinks. For the SQL, Jobs, and Stages entry points, the secondary entry points often provide sufficient information, covering the entire content of the “medical reports”. Therefore, although Spark UI also provides a small number of tertiary entry points (pages that require two clicks to access), these tertiary entry points hidden in the corners do not typically require special attention from developers.

Next, we will visit the secondary entry points for SQL, Jobs, and Stages in turn, to gain a deeper understanding of the global DAG, the jobs, and the execution stages.

SQL Details Page #

In the SQL tab at the primary entry point, we see three items: count (counting the application numbers), count (counting the successful application numbers), and save. The first two items only read the data sources, cache the data, and trigger materialization of the caches, so their calculation processes are relatively simple. Therefore, we will focus on the save item.

Image

By clicking on “save at :27” in the image, we can access the execution plan page for that job, as shown in the following image.

Image

To focus on the key points, we have only captured a portion of the execution plan here. If you want to obtain the complete execution plan, you can access it here. For your convenience, I have hand-drawn a schematic diagram of the execution plan below for your reference.

Image

As can be seen, the calculation process of the “rate calculation” application is very representative, covering most operations in data analysis scenarios: filtering, projection, joining, grouping and aggregation, and sorting. In the diagram, the red parts are Exchange (the Shuffle operation), the blue parts are Sort, and the green parts are Aggregate (local and global data aggregation).

Undoubtedly, these three parts are the main consumers of hardware resources. Moreover, for these three types of operations, Spark UI provides detailed metrics to depict the corresponding hardware resource consumption. Next, let’s focus on studying the metrics for these three types of operations.

Exchange

The two Exchanges shown side by side in the following image correspond to the two Exchanges before the SortMergeJoin in the schematic diagram. Their purpose is to Shuffle the application number data and the successful application number data in preparation for the join.

Image

As can be seen, for each Exchange, Spark UI provides rich metrics to depict the Shuffle calculation process. From Shuffle Write to Shuffle Read, from data volume to processing time, everything is covered. To facilitate explanation, I have compiled the meaning of each metric in tabular form for you to refer to at any time.

Image

By combining this “physical examination report” of Shuffle, we can quantitatively grasp the calculation details of the Shuffle process, thereby providing more insights and ideas for optimization.

To give you an intuitive understanding, let me give you an example. Suppose we observe that the filtered successful application number data is smaller than 10MB (it is 7.4MB). Our first thought should be that it is unreasonable for Spark SQL to choose the SortMergeJoin strategy when joining a large table with such a small table.

Based on this judgment, we can make Spark SQL choose the BroadcastHashJoin strategy to get better execution performance. As for the specific methods, you probably already know them: either force the broadcast explicitly, or use the AQE feature provided in version 3.x.
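The size check behind this judgment can be sketched in a few lines of plain Python. This is only an illustration and does not need a Spark session: the helper names `qualifies_for_broadcast` and `aqe_broadcast_settings` are hypothetical, while the two configuration keys are real Spark SQL settings and 10MB is the default value of `spark.sql.autoBroadcastJoinThreshold`.

```python
# Sketch only: plain Python, no Spark session required.
# Helper names are hypothetical; the configuration keys are real Spark SQL settings.

# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024


def qualifies_for_broadcast(size_bytes, threshold_bytes=DEFAULT_BROADCAST_THRESHOLD_BYTES):
    """Return True when a table of size_bytes is at or under the broadcast threshold."""
    return 0 <= size_bytes <= threshold_bytes


def aqe_broadcast_settings(threshold_bytes=DEFAULT_BROADCAST_THRESHOLD_BYTES):
    """Settings that let AQE (Spark 3.x) re-plan the join from runtime statistics."""
    return {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.autoBroadcastJoinThreshold": str(threshold_bytes),
    }


# 7.4 MB is the filtered data size read from the Exchange metrics in this lecture.
filtered_size_bytes = int(7.4 * 1024 * 1024)

print(qualifies_for_broadcast(filtered_size_bytes))  # True
print(aqe_broadcast_settings())
```

The settings returned by `aqe_broadcast_settings` could be passed to the application when it is submitted. Forced broadcasting, by contrast, is expressed in the query itself with a broadcast hint on the small table.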

You can try to optimize the transformation from SortMergeJoin to BroadcastHashJoin based on the code provided at the beginning of this lecture, and I look forward to you sharing your optimization results in the comments section.

Sort

Next, let’s talk about Sort. Compared with Exchange, Sort has fewer metrics, but they are enough for us to see the memory consumption of Sort at runtime, as shown in the following figure.

Image

As usual, let’s organize these metrics into a table for later reference.

Image

It can be seen that the two values, “Peak memory total” and “Spill size total,” are sufficient to guide us in setting spark.executor.memory, spark.memory.fraction, and spark.memory.storageFraction in a more targeted manner, thus ensuring sufficient protection for the Execution Memory region.

Taking the above figure as an example, the peak consumption of 18.8GB together with the 12.5GB spilled to disk tells us that the current 3GB Executor Memory is far from enough. Therefore, we naturally need to adjust the above three parameters to speed up the Sort.
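To see how the three parameters interact, here is a minimal sketch of the arithmetic under Spark's unified memory model. It assumes the default values `spark.memory.fraction = 0.6` and `spark.memory.storageFraction = 0.5` and the 300MB that Spark reserves for itself; the function names are hypothetical and only illustrate the formula.

```python
# Sketch only: arithmetic for Spark's unified memory model.
# Function names are hypothetical; defaults mirror Spark's documented defaults.

RESERVED_MB = 300  # memory Spark sets aside before splitting the heap


def unified_memory_mb(executor_memory_mb, memory_fraction=0.6):
    """Memory shared by Execution and Storage: (heap - reserved) * spark.memory.fraction."""
    return (executor_memory_mb - RESERVED_MB) * memory_fraction


def execution_memory_floor_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Part of the unified region that cached blocks cannot hold on to.

    Execution may use more than this when Storage is not using its share.
    """
    unified = unified_memory_mb(executor_memory_mb, memory_fraction)
    return unified * (1 - storage_fraction)


executor_memory_mb = 3 * 1024  # the 3GB Executor Memory used in this lecture

print(round(unified_memory_mb(executor_memory_mb), 1))          # 1663.2
print(round(execution_memory_floor_mb(executor_memory_mb), 1))  # 831.6
```

With the defaults, a 3GB executor leaves well under 2GB for Execution Memory. Note that “Peak memory total” is accumulated across tasks, so this is a rough comparison rather than an exact sizing rule.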

Aggregate

Similar to Sort, the metrics for measuring Aggregate also mainly record the memory consumption of the operation, as shown in the figure.

Image

It can be seen that for the Aggregate operation, Spark UI also records the spill size and the peak memory consumption, that is, Spill size and Peak memory total. These two values also provide a basis for memory adjustment. Taking the above figure as an example, zero spillage together with a peak consumption of 3.2GB (a total accumulated across tasks, not the footprint of a single Executor) shows that the current 3GB Executor Memory setting is more than enough for Aggregate calculations.

So far, we have introduced the metrics for Exchange, Sort, and Aggregate respectively. Combined with the example of “rate calculation,” we have conducted a simple optimization analysis.

Looking at the complete DAG of “rate calculation,” we can see that it includes several Exchanges, Sorts, Aggregates, as well as Filters and Projects. Combining the aforementioned metrics of various types, we need to observe and gain insights into the execution plan in a coordinated manner, from specific to overall.

Jobs Details Page #

Next, let’s talk about the Jobs details page. The Jobs details page is very simple and intuitive. It lists all the stages belonging to the current job. To access the execution details of each stage, we still need to click on the “Description” hyperlink.

Image

Stage Details #

In fact, there is another way to access the Stage details, which is to directly enter from the Stages entry point and then complete the redirection. Therefore, the Stage details page is also classified as a secondary entry point. Next, let’s take Stage with Id 10 as an example to see what key information is recorded on the details page.

Among all the secondary entry points, the Stage details page can be said to have the most information. By clicking into the Stage details page, you can see that it mainly contains three categories of information, namely Stage DAG, Event Timeline, and Task Metrics.

Among them, Task Metrics is divided into a “Summary Metrics” section and a “Tasks” section, which present the information at different levels of detail. The categories of metrics recorded in Task Metrics can also be expanded through the “Show Additional Metrics” option.

Image

Stage DAG

Next, let’s talk about the content included in these pages in the order of “Stage DAG -> Event Timeline -> Task Metrics”.

First, let’s start with the simplest Stage DAG. By clicking the blue “DAG Visualization” button, we can get the DAG of the current stage, as shown in the following figure.

Image

The Stage DAG is simple because we have already explained the DAG in detail at the SQL entry point. The DAG on the SQL page describes the whole job, and the Stage DAG is only a subset of it. Therefore, as long as you understand the DAG of the job, you naturally understand the DAG of each stage.

Event Timeline

Parallel to “DAG Visualization”, above “Summary Metrics”, there is an “Event Timeline” button. By clicking it, we can obtain the following visual information.

Image

The Event Timeline records where the time goes in the different phases of distributed task scheduling and execution. Each strip in the figure represents one distributed task and is made up of rectangles of different colors, each of which represents the time spent in one phase.

To facilitate the description, I have organized the meanings and functions of these phases in table form for you to refer to at any time.

Image

Ideally, most of the strips should be green (as shown in the figure), indicating that the time consumption of the tasks is mainly in the execution time. However, the actual situation is not always like this. For example, sometimes the blue parts comprise a large proportion, or the orange parts are relatively large.

In these cases, we can use the Event Timeline to determine whether the job has excessive scheduling overhead or a heavy Shuffle load, and then optimize the corresponding phase accordingly.

For example, if there are a lot of deep blue parts (Scheduler Delay) in the strip, it means that the scheduling overhead of the tasks is heavy. At this time, we need to refer to the “Three-legged Approach” optimization technique to adjust the CPU, memory, and parallelism accordingly, in order to reduce the scheduling overhead of the tasks.

Another example is that if the area of yellow (Shuffle Write Time) and orange (Shuffle Read Time) parts in the strip is large, it means that the Shuffle load of the tasks is heavy. In this case, we need to consider whether it is possible to eliminate the Shuffle by using Broadcast Join, so as to alleviate the Shuffle burden of the tasks.
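The way we read a strip can be expressed as a small calculation: add up the time spent in each phase and see which phase dominates. The sketch below uses made-up timings for a single task (they are not taken from the figure), and the function names are hypothetical.

```python
# Sketch only: which phase dominates one task's strip in the Event Timeline?
# The timings below are hypothetical, in milliseconds.


def phase_shares(task_ms):
    """Return each phase's share of the task's total time (0.0 to 1.0)."""
    total = sum(task_ms.values())
    if total == 0:
        return {phase: 0.0 for phase in task_ms}
    return {phase: ms / total for phase, ms in task_ms.items()}


def dominant_phase(task_ms):
    """Return the name of the phase that takes the most time."""
    return max(task_ms, key=task_ms.get)


task = {
    "Scheduler Delay": 40,
    "Task Deserialization Time": 10,
    "Shuffle Read Time": 600,
    "Executor Computing Time": 300,
    "Shuffle Write Time": 50,
    "Result Serialization Time": 0,
    "Getting Result Time": 0,
}

print(dominant_phase(task))                     # Shuffle Read Time
print(phase_shares(task)["Shuffle Read Time"])  # 0.6
```

A task like this one, where Shuffle Read Time takes more than half of the strip, is the kind of case where eliminating the Shuffle is worth considering.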

Task Metrics

After discussing Stage DAG and Event Timeline, let’s talk about the highlight of the Stage Details page: Task Metrics.

Task Metrics provides detailed quantitative indicators at different granularities. “Tasks” records the execution details of each distributed task, while “Summary Metrics” provides statistical summaries of all task execution details. Let’s start with the coarse-grained “Summary Metrics” and then dive into the fine-grained “Tasks”.

Summary Metrics

First, we click the “Show Additional Metrics” button and select “Select All” to enable all metric indicators, as shown in the following image. The purpose of doing this is to obtain the most detailed task execution information.

Image

After selecting “Select All”, Spark UI prints out all the execution details. As usual, for ease of narration, I will organize these metrics into a table for your reference. Task Deserialization Time, Result Serialization Time, Getting Result Time, Scheduler Delay have the same meaning as in the previous table, so I won’t repeat them here. We will only focus on the new Task Metrics.

Image

Importantly, for these detailed Task Metrics, Spark UI shows the statistical distribution of each metric: the minimum and maximum (min, max) and the percentiles (25th percentile, 50th percentile, 75th percentile). This is crucial because these distributions allow us to clearly quantify how the workload is distributed among tasks.

In other words, based on the statistical distribution of the different metrics, we can easily determine whether the load among the tasks of the current job is relatively balanced or severely skewed. If the computational load is skewed, we need to use techniques such as “manual skew handling” or the automatic skew handling of Adaptive Query Execution (AQE) to eliminate the imbalance between tasks and improve job performance.
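One simple way to turn this reading into a number is to compare the maximum with the median. The sketch below is only a rule of thumb: the sample values are made up, the function names are hypothetical, and the factor of 5 is an assumption chosen for illustration, not a threshold defined by Spark UI.

```python
# Sketch only: flag skew from the min / percentile / max row of Summary Metrics.
# Sample values are hypothetical; the factor of 5 is an illustrative assumption.


def skew_ratio(summary):
    """Ratio of the largest task to the median task for one metric."""
    median = summary["median"]
    if median <= 0:
        return float("inf") if summary["max"] > 0 else 1.0
    return summary["max"] / median


def looks_skewed(summary, factor=5.0):
    """True when the largest task exceeds factor times the median task."""
    return skew_ratio(summary) > factor


# Shuffle Read Size in MB, as read from a Summary Metrics row.
balanced = {"min": 10, "p25": 12, "median": 13, "p75": 15, "max": 18}
skewed = {"min": 10, "p25": 12, "median": 13, "p75": 15, "max": 410}

print(looks_skewed(balanced))  # False
print(looks_skewed(skewed))    # True
```

The same check can be applied to Duration, Shuffle Read Size / Records, or any other metric in the table, since they all share the same min, percentile, and max layout.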

In the table above, half of the metrics are directly related to Shuffle, such as Shuffle Read Size / Records, Shuffle Remote Reads, and so on.

We have discussed these metrics in detail when introducing the SQL details page. Additionally, Duration, GC Time, and Peak Execution Memory have either been explained already or are simple enough to need no further explanation. Therefore, we won’t spend more ink on these three metrics.

What’s worth noting here are the Spill (Memory) and Spill (Disk) metrics. Spill refers to the data that is moved out due to the limited space of memory data structures (e.g., PartitionedPairBuffer, AppendOnlyMap, etc.). Spill (Memory) represents the size of this data stored in memory, while Spill (Disk) represents the size of this data on disk.

Therefore, by dividing Spill (Memory) by Spill (Disk), we can obtain an approximate value for the “data explosion coefficient,” which we will call the Explosion ratio. With the Explosion ratio, we can estimate how much memory a given amount of on-disk data will occupy once it is loaded, and thus get a much better picture of the memory consumption of our data.
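The calculation itself is a single division. Here is a minimal sketch with made-up Spill values (they are not taken from the figures in this lecture); the function names are hypothetical.

```python
# Sketch only: Explosion ratio = Spill (Memory) / Spill (Disk).
# The sample sizes are hypothetical.

GIB = 1024 ** 3


def explosion_ratio(spill_memory_bytes, spill_disk_bytes):
    """Approximate how much larger data is in memory than on disk."""
    if spill_disk_bytes <= 0:
        raise ValueError("Spill (Disk) must be positive to compute a ratio")
    return spill_memory_bytes / spill_disk_bytes


def estimated_memory_bytes(on_disk_bytes, ratio):
    """Estimate the in-memory size of data that occupies on_disk_bytes on disk."""
    return on_disk_bytes * ratio


ratio = explosion_ratio(spill_memory_bytes=4 * GIB, spill_disk_bytes=1 * GIB)
print(ratio)                                         # 4.0
print(estimated_memory_bytes(2 * GIB, ratio) / GIB)  # 8.0
```

In this made-up case, 2GB of data on disk would be expected to need roughly 8GB once it is loaded into memory.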

Tasks

After introducing the coarse-grained Summary Metrics, let’s now talk about the fine-grained “Tasks”. In fact, many of the metrics in Tasks highly overlap with the Summary, as shown in the following image. Similarly, we won’t repeat those overlapping metrics here, and you can refer to the Summary section to understand these metrics. The only difference is that these metrics are measured for each individual task.

Image

As usual, we will organize those new metrics in Tasks into a table for future reference.

Image

As you can see, there are not many new metrics. The most important one here is the Locality level, which represents the locality preference of each task. As we have discussed in the scheduling system, each task has its own locality preference. Combining the locality preference, the scheduling system schedules tasks to suitable executors or compute nodes, ensuring “data locality” as much as possible while executing the code.

Logs and Errors are tertiary entry points in Spark UI. They are the execution logs of tasks and record in detail the runtime status of each task during execution. Generally, we don’t need to dive into the tertiary entry points for debugging. The error messages provided in the Errors column are often enough to quickly locate the problem.

Key Review #

Alright, that is the end of today’s lesson. In today’s lecture, we went through the secondary entry points for SQL, Jobs, and Stages one by one. Each of them is very rich in content. Knowing in advance what information they cover is very helpful when we search for and explore performance optimization ideas.

So far, all the content about Spark UI has been covered. The metrics involved in Spark UI are numerous and complicated. Skimming through them once with me is only the first step. Afterwards, you need to combine them with your daily development work, and explore and experience more. Keep it up!

You are also welcome to share your experiences and insights in using Spark UI in the comments section after class. Let’s discuss and progress together!

Lastly, I would like to remind you that since our application is submitted through spark-shell, the Spark UI on port 8080 of the node keeps displaying the application’s “health report” for as long as the shell is running. Once we exit spark-shell, the page on port 8080 disappears immediately (404 Page not found).

To view the “health report” of the application again, you need to go to port 18080 of the node. This is the territory of the Spark History Server, which collects the “health reports” of all completed applications and displays them in the same form as Spark UI. Keep this in mind.

Practice for Each Lesson #

Today’s thinking question requires you to think creatively. After studying Spark UI, please tell me: in what ways can we locate data skew problems?

You are welcome to communicate and discuss with me in the comment section. I also welcome you to recommend this lecture to friends and colleagues who may need it.