22 Spark UI - How to Efficiently Locate Performance Issues (continued) #

Hello, I’m Wu Lei.

In the previous lecture, we looked at the first-level entry points of Spark UI. Executors, Environment, and Storage are detail pages where developers can quickly see the overall computational load, the runtime environment, and the details of dataset caching. SQL, Jobs, and Stages, by contrast, are list-style pages; to see the details, you need to go one level deeper, into the second-level entry points.

Continuing with the previous analogy, as a “doctor,” developers need various metrics to quickly locate the “lesion” based on their experience. The second-level entry points we will discuss today are more detailed and comprehensive compared to the first-level entry points. To become an “experienced clinician,” we first need to be proficient in interpreting these metrics.

Image

The so-called second-level entry points refer to pages that can only be accessed through hyperlinks. For the SQL, Jobs, and Stages entry points, the second-level entry points often provide enough information, covering the entire content of the “medical report.” Therefore, although Spark UI also provides a small number of third-level entry points (pages that require two clicks to reach), these third-level entry points hidden in the “corners and crevices” often do not require developers to pay special attention.

Next, we will follow the sequence SQL -> Jobs -> Stages and visit their second-level entry points one by one, to gain deeper insight into the global DAG, the jobs, and the execution stages.

SQL Details Page #

In the SQL tab, we see three entries: count (counting the application numbers), count (counting the successful numbers), and save. The first two are relatively simple: each reads the data source, caches the data, and triggers materialization of the cache. Therefore, we focus on the save entry.

Image

Clicking on “save at:27” in the image above takes you to the execution plan page of this job, as shown in the following image.

Image

In order to focus on the main points, we have only captured part of the execution plan here. If you want the complete execution plan, you can access it here. For your convenience, I have hand-drawn a diagram of the execution plan for your reference, as shown in the image below.

Image

As you can see, the computation process of the “ratio and success rate analysis” application is very representative. It covers most of the operations found in data analysis scenarios: filtering, projection, join, group-by aggregation, and sorting. In the diagram, the red parts are Exchange (Shuffle operations), the blue parts are Sort, and the green parts are Aggregate (local and global data aggregation).

Undoubtedly, these three parts are the main consumers of hardware resources, and for these three types of operations, Spark UI provides detailed metrics to describe the corresponding hardware resource consumption. Next, let’s focus on studying the metrics for these three types of operations.

Exchange

The two parallel Exchanges in the following image correspond to the two Exchanges before the SortMergeJoin in the diagram. Their purpose is to shuffle the application number data and the successful number data in preparation for the join.

Image

As you can see, for each Exchange, Spark UI provides abundant metrics to describe the Shuffle computation process. From Shuffle Write to Shuffle Read, from data volume to processing time, everything is covered. In order to facilitate explanation, I have compiled the explanation and interpretation of the metrics in tabular form for you to refer to at any time.

Image

By combining this “medical report” on Shuffle, we can understand the calculation details of the Shuffle process in a quantitative way, thereby providing more insights and ideas for optimization.

Let’s look at an example. We observe that the filtered successful (lottery-winning) number data is smaller than 10MB (7.4MB). Seeing this, our first thought should be that it is unreasonable for Spark SQL to choose the SortMergeJoin strategy to join a large table with such a small table.

Based on this judgment, we can make Spark SQL choose the BroadcastHashJoin strategy to improve performance. You probably already know the two ways to do it: either force a broadcast, or take advantage of the AQE feature provided by Spark 3.x.
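The size check behind this judgment can be sketched in a few lines of plain Python. The 10MB threshold mirrors the default of Spark's `spark.sql.autoBroadcastJoinThreshold` setting. The function name `pick_join_strategy` and the size of the large table are hypothetical, and the sketch ignores everything else Spark's planner takes into account.

```python
# Default of spark.sql.autoBroadcastJoinThreshold: 10MB, expressed in bytes.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


def pick_join_strategy(left_bytes, right_bytes, threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Suggest a join strategy from the sizes of the two join inputs.

    Simplified illustration only: if the smaller side fits under the
    broadcast threshold, a broadcast hash join avoids shuffling both sides.
    """
    smaller = min(left_bytes, right_bytes)
    if smaller <= threshold:
        return "BroadcastHashJoin"
    return "SortMergeJoin"


# Size read from the Exchange metrics: 7.4MB after filtering.
filtered_small_table = int(7.4 * 1024 * 1024)
# Hypothetical size for the large side of the join.
large_table = 50 * 1024 * 1024 * 1024

print(pick_join_strategy(large_table, filtered_small_table))
```

In Spark itself, the two routes mentioned above correspond to a broadcast hint on the small table, or to turning on `spark.sql.adaptive.enabled` so that AQE can re-plan the join from runtime sizes.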

Feel free to use the code provided at the beginning of this lecture to optimize the conversion from SortMergeJoin to BroadcastHashJoin, and we look forward to you sharing your optimization results in the comments.

Sort

Next, let’s talk about Sort. Compared to Exchange, Sort doesn’t have as many metrics, but they are enough for us to get a glimpse of the memory consumption during runtime, as shown in the following figure.

Image

As usual, let’s organize these metrics into a table for easy reference.

Image

As can be seen, the two values “Peak memory total” and “Spill size total” are enough to guide us in setting spark.executor.memory, spark.memory.fraction, and spark.memory.storageFraction in a more targeted way, so that the Execution Memory region is adequately protected.

In the figure above, the peak consumption of 18.8GB together with the 12.5GB spilled to disk tells us that the current 3GB of Executor memory is far from enough. Naturally, we need to adjust the three parameters above to speed up Sort.
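To see how the three parameters interact, here is a minimal sketch of how Spark's unified memory manager divides an Executor heap. It assumes the default values of spark.memory.fraction (0.6) and spark.memory.storageFraction (0.5) and the 300MB that Spark reserves on the heap; the function name `memory_regions_mb` is hypothetical, and the result is only the initial split, since Execution Memory can borrow unused Storage Memory at runtime.

```python
RESERVED_MB = 300  # heap memory Spark sets aside before splitting the regions


def memory_regions_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the initial sizes of the unified memory regions of one Executor."""
    unified = (executor_memory_mb - RESERVED_MB) * memory_fraction
    storage = unified * storage_fraction
    execution = unified - storage
    return {"unified": unified, "storage": storage, "execution": execution}


# The 3GB Executor from the example above, with default fractions.
regions = memory_regions_mb(3 * 1024)
for name, size in regions.items():
    print(f"{name}: {size:.1f} MB")
```

With a 3GB heap and the defaults, each Executor starts with roughly 832MB of Execution Memory, which makes it easy to see why a Sort with a peak of 18.8GB spills heavily.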

Aggregate

Similar to Sort, the metrics for measuring Aggregate mainly record the memory consumption of the operation, as shown in the figure.

Image

As can be seen, for the Aggregate operation, Spark UI also records Spill size and Peak memory total, which give us a basis for memory tuning. In the example above, zero spill and a peak consumption of 3.2GB indicate that the current 3GB Executor memory setting is sufficient for the Aggregate computation.

So far, we have introduced the metrics for Exchange, Sort, and Aggregate separately and, using the “ratio and success rate analysis” example, performed some simple optimization analysis.

Looking at the complete DAG of the “ratio and success rate analysis,” we can see that it contains several Exchanges, Sorts, Aggregates, Filters, and Projects. To read and understand the execution plan, we need to combine the metrics above and work from point to line, from local to global.

Jobs Details Page #

Next, let’s talk about the Jobs Details page. The Jobs Details page is very simple and intuitive, listing all the Stages belonging to the current Job. In order to access the execution details of each Stage, we still need to jump through the “Description” hyperlink.

Image

Stage Details Page #

In fact, there is another way to reach the Stage details: go in directly through the Stages entry point and navigate from there. That is why the Stage Details page also counts as a second-level entry point. Next, let’s look at the details of the Stage with Id 10 and see what key information its details page records.

Among all the second-level entry points, the Stage Details page carries the most information. When you open it, you can see that it mainly contains three categories of information: Stage DAG, Event Timeline, and Task Metrics.

Task Metrics is further divided into two sections, “Summary Metrics” and “Tasks”, which summarize the information at different levels of granularity. The metric categories recorded in Task Metrics can also be expanded through the “Show Additional Metrics” option.

Image

Stage DAG

Next, let’s discuss the content contained in these pages in the order of “Stage DAG -> Event Timeline -> Task Metrics”.

First, let’s start with the simplest Stage DAG. By clicking the blue “DAG Visualization” button, we can retrieve the DAG of the current Stage, as shown in the following figure.

Image

We call the Stage DAG simple because we have already explained the DAG in detail in the second-level entry point of SQL. The DAG on the SQL page describes a whole job, and the Stage DAG is just a subset of it. Therefore, as long as we understand the DAG of the job, we naturally understand the DAG of each Stage.

Event Timeline

Beside “DAG Visualization”, above the “Summary Metrics”, there is an “Event Timeline” button. By clicking it, we can obtain the following visual information.

Image

The Event Timeline records where the time goes in the different phases of distributed task scheduling and execution. Each strip in the figure represents one distributed task, and the differently colored rectangles within it represent the time spent in different phases.

To make this easier to explain, I have summarized the meaning and purpose of these phases in a table, which you can save and refer to at any time.

Image

Ideally, most of each strip should be green (as shown in the figure), indicating that the tasks spend most of their time on actual execution. However, this is not always the case in practice. Sometimes the blue part takes up a large proportion, and sometimes the orange part does.

In these cases, we can use the Event Timeline to judge whether the job suffers from excessive scheduling overhead or from a heavy shuffle load, and then optimize the corresponding phase.

For example, if the strips contain a lot of blue (Scheduler Delay), the scheduling overhead of the tasks is heavy. In that case, we should refer to the formula D / P ~ M / C and adjust CPU, memory, and parallelism to reduce the scheduling overhead. Here, D is the size of the dataset, P is the parallelism, M is the Executor memory, and C is the number of CPU cores per Executor. The tilde ~ means that the two sides should be of the same order of magnitude.
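A quick way to apply the formula is to compute both sides and compare them. The sketch below is an illustration under assumptions: it interprets “same order of magnitude” as “differing by less than a factor of 10”, and the function names and all input numbers are hypothetical.

```python
import math


def same_order_of_magnitude(a, b):
    """True when a and b differ by less than a factor of 10 (an assumed reading of ~)."""
    return abs(math.log10(a) - math.log10(b)) < 1


def check_balance(dataset_mb, parallelism, executor_memory_mb, executor_cores):
    """Compare D / P (data per task) with M / C (memory per core)."""
    data_per_task = dataset_mb / parallelism
    memory_per_core = executor_memory_mb / executor_cores
    balanced = same_order_of_magnitude(data_per_task, memory_per_core)
    return data_per_task, memory_per_core, balanced


# Hypothetical setup: 100GB dataset, 3GB Executors with 2 cores each.
print(check_balance(100 * 1024, 200, 3 * 1024, 2))    # 512MB per task vs 1536MB per core
print(check_balance(100 * 1024, 20000, 3 * 1024, 2))  # 5.12MB per task vs 1536MB per core
```

In the second call, the parallelism is so high that each task handles only a few megabytes, which is the kind of setting where scheduling overhead starts to dominate.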

Similarly, if the yellow (Shuffle Write Time) and orange (Shuffle Read Time) parts of the strips are large, the shuffle load of the tasks is heavy. In that case, we should consider whether the Shuffle can be eliminated with a Broadcast Join to lighten that load.
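Reading the timeline amounts to asking which phases take an outsized share of a task's time. The following sketch makes that explicit for one task. The phase names follow the Event Timeline legend, while the timings, the 30% cutoff, and the function names are hypothetical choices for illustration.

```python
def phase_shares(phase_ms):
    """Convert the per-phase times of one task into fractions of the total."""
    total = sum(phase_ms.values())
    return {phase: ms / total for phase, ms in phase_ms.items()}


def dominant_phases(phase_ms, min_share=0.3):
    """List the non-compute phases that take at least min_share of the task time."""
    shares = phase_shares(phase_ms)
    return sorted(
        phase for phase, share in shares.items()
        if phase != "Executor Computing Time" and share >= min_share
    )


# Hypothetical timings (milliseconds) for a single task.
task = {
    "Scheduler Delay": 50,
    "Task Deserialization Time": 20,
    "Shuffle Read Time": 900,
    "Executor Computing Time": 800,
    "Shuffle Write Time": 200,
    "Result Serialization Time": 10,
    "Getting Result Time": 20,
}
print(dominant_phases(task))
```

For this task the only flagged phase is Shuffle Read Time, which points toward the Broadcast Join idea rather than toward tuning parallelism.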

Task Metrics

After discussing Stage DAG and Event Timeline, let’s move on to the highlight of the Stage Details page: Task Metrics.

Task Metrics is an important part of the page because it provides detailed quantitative indicators at different granularities. Among them, “Tasks” records the execution details of each distributed task, while “Summary Metrics” summarizes the execution details of all tasks. Let’s take a look at the coarse-grained “Summary Metrics” first, and then delve into the fine-grained “Tasks”.

Summary Metrics

Firstly, we click on the “Show Additional Metrics” button and select “Select All” to enable all the metrics, as shown in the following screenshot. The purpose of doing this is to obtain the most detailed task execution information.

Image

After selecting “Select All”, Spark UI displays all the execution details. As before, for the convenience of narration, I have organized these metrics into a table for easy reference. Among them, Task Deserialization Time, Result Serialization Time, Getting Result Time, Scheduler Delay have the same meaning as in the previous table, so I won’t repeat them here. Here, we only organize the newly introduced Task Metrics.

Image

For these detailed Task Metrics, Spark UI shows the statistical distribution of each metric: the minimum, the maximum, and the 25th, 50th, and 75th percentiles. This is very important, because these distributions let us quantify the load distribution across tasks very clearly.

In other words, from the distribution of the different metrics we can easily tell whether the tasks of the current job are reasonably balanced or significantly skewed. If the computational load turns out to be skewed, we need to use AQE’s automatic skew handling to remove the imbalance between tasks and improve job performance.
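As a rough illustration of the judgment described above, the sketch below flags tasks whose input is far larger than the median. It borrows the rule that AQE documents for skewed partitions (larger than a factor times the median and larger than an absolute threshold), using the documented defaults of spark.sql.adaptive.skewJoin.skewedPartitionFactor (5) and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256MB). The function name and the per-task sizes are hypothetical.

```python
import statistics

# Defaults of spark.sql.adaptive.skewJoin.skewedPartitionFactor and
# spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256MB).
SKEW_FACTOR = 5
SKEW_THRESHOLD_BYTES = 256 * 1024 * 1024


def skewed_tasks(sizes_bytes, factor=SKEW_FACTOR, threshold=SKEW_THRESHOLD_BYTES):
    """Return the indexes of tasks whose input is far larger than the median."""
    median = statistics.median(sizes_bytes)
    return [
        i for i, size in enumerate(sizes_bytes)
        if size > factor * median and size > threshold
    ]


MB = 1024 * 1024
# Hypothetical Shuffle Read Size per task.
sizes = [60 * MB, 64 * MB, 70 * MB, 66 * MB, 900 * MB]
print(skewed_tasks(sizes))
```

On the Summary Metrics page the same signal shows up as a max value that dwarfs the 50th percentile.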

In the above table, half of the metrics are directly related to Shuffle, such as Shuffle Read Size / Records, Shuffle Remote Reads, etc.

We discussed these metrics in detail when introducing the SQL details page. As for Duration, GC Time, and Peak Execution Memory, either we have already explained their meanings or they are simple enough to need no further explanation, so we won’t dwell on these three metrics.

What deserves special attention here are two metrics: Spill (Memory) and Spill (Disk). Spill refers to data that is spilled to disk because in-memory data structures (PartitionedPairBuffer, AppendOnlyMap, etc.) run out of space. Spill (Memory) is the size of this data while it is held in memory, and Spill (Disk) is the size of the same data on disk.

Therefore, dividing Spill (Memory) by Spill (Disk) gives an approximate “data inflation factor”, which we call the Explosion ratio. With the Explosion ratio, we can estimate how much memory a dataset of known on-disk size will occupy once it is loaded, and so get a much better handle on memory consumption.
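The arithmetic is simple enough to write down directly. In this sketch the function names and the sample values are hypothetical; only the definition of the ratio, Spill (Memory) divided by Spill (Disk), comes from the text above.

```python
def explosion_ratio(spill_memory_bytes, spill_disk_bytes):
    """Spill (Memory) divided by Spill (Disk)."""
    if spill_disk_bytes <= 0:
        raise ValueError("Spill (Disk) must be positive to compute a ratio")
    return spill_memory_bytes / spill_disk_bytes


def estimate_in_memory_bytes(on_disk_bytes, ratio):
    """Rough in-memory footprint of data whose on-disk size is known."""
    return on_disk_bytes * ratio


GB = 1024 ** 3
# Hypothetical values read from Summary Metrics.
ratio = explosion_ratio(12 * GB, 3 * GB)
print(ratio)
print(estimate_in_memory_bytes(10 * GB, ratio) / GB)
```

The estimate is only as good as the assumption that the new data inflates the way the spilled data did, so treat it as a starting point for sizing memory rather than an exact figure.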

Tasks

After introducing the coarse-grained Summary Metrics, let’s move on to the fine-grained “Tasks”. In fact, many of the metrics in Tasks highly overlap with Summary, as shown in the following screenshot. Similarly, we won’t go into detail about these overlapping metrics since you can refer to the Summary section to understand them. The only difference is that these metrics are measured for each individual task.

Image

As usual, we have organized the newly introduced metrics in Tasks into a table for future reference.

Image

As you can see, there are not many new metrics. The most noteworthy one is Locality level, which is the locality preference of each task. As we mentioned when discussing the scheduling system, each task has its own locality preference. The scheduling system takes this preference into account and schedules tasks onto suitable Executors or compute nodes, so that “the data stays put and the code moves”.

Logs and Errors are third-level entry points of Spark UI. They hold the execution logs of tasks and record each task’s runtime status in detail. Generally, we don’t need to go into the third-level entry points for debugging. The error messages in the Errors column are often enough to locate the problem quickly.

Key Review #

That wraps up today’s lesson. In this lecture, we studied the second-level entry points of SQL, Jobs, and Stages. Each of them is rich in content, and knowing in advance what information they cover is a great help when we look for performance optimization ideas.

So far, we have covered all the content related to Spark UI. The metrics in Spark UI are numerous and varied, and it is indeed difficult to remember them all at once. For this lecture, it is enough to know how to reach each level of entry point and what information each metric can give us. Of course, following along and reading through it once is only the first step. Afterwards, you need to explore and practice more in your daily development work. Keep it up!

Lastly, I would like to remind you that since our application is submitted through spark-shell, the Spark UI on port 8080 of the node keeps displaying the application’s “health report” for as long as the shell is running. After we exit spark-shell, the page on port 8080 of the node disappears as well (404 Page not found).

To view the “health report” of the application again, you need to go to port 18080 of the node. This is the territory of the Spark History Server. It collects the “health reports” of all (completed) applications and displays them in the same Spark UI form. Remember, remember!

Practice for Every Lesson #

Today’s thinking exercise requires you to think outside the box. After learning about Spark UI, please discuss the various ways to identify data skewness problems.

Feel free to share your insights and experiences with using Spark UI in the comments section after class. Let’s discuss and progress together! I also encourage you to share this lesson with colleagues and friends.