09 Optimization at a Glance Quick Reference Manual to Improve Efficiency by Halving the Effort Part 1

09 Optimization at a Glance - Quick Reference Manual to Improve Efficiency by Halving the Effort Part 1 #

Hello, I’m Wu Lei.

When it comes to performance tuning in Spark, application development and configuration settings are the two most important and commonly used entry points. However, in our day-to-day tuning work, whenever we need to start from configuration options to find tuning ideas, opening the Configuration page on the Spark official website presents us with hundreds of options. Some of them require setting True or False, while others need specific values to be used. This often leaves us confused and unsure of where to start.

So I often think, if there could be a Spark configuration options manual that categorically records the options relevant to performance tuning, it would definitely save us a lot of trouble.

Therefore, in the next two lectures, let’s gather together to compile this manual. This manual will allow you to quickly locate the configuration options that you may need when searching for tuning ideas. It not only provides a clear structure but also ensures that nothing is missed, truly saving time and effort!

Classification of Configuration Items #

In fact, there are only a few configuration items that can significantly affect execution performance, not to mention that in the Spark distributed computing environment, the computation workload is mainly handled by Executors, and the Driver is mainly responsible for distributed scheduling. With limited optimization space, we do not consider the configuration items on the Driver side. The configuration items we want to summarize revolve around Executors. With previous practical experience and a comprehensive review of the official website’s complete set of configuration items, I divided them into three categories: hardware resource category, Shuffle category, and Spark SQL category.

Why do we divide them like this? Let’s explain them one by one.

Firstly, the hardware resource category includes configuration items related to CPU, memory, and disk. As we mentioned before, the entry point for optimization is the bottleneck. One effective method to locate the bottleneck is to observe the load and consumption of a certain type of hardware resource from a hardware perspective. If the load and consumption of a certain type of hardware significantly exceed others, the optimization process tends to converge to a state where all hardware resources are balanced and there are no bottlenecks. Therefore, mastering the configuration items related to hardware resources is vital. Proper configuration of these items determines whether the application can break through bottlenecks and achieve balanced utilization of different hardware resources.

Secondly, the Shuffle category is specifically for Shuffle operations. In the vast majority of scenarios, Shuffle is a performance bottleneck. Therefore, we need to summarize the configuration items that affect the Shuffle computation process. At the same time, optimizing Shuffle is the most difficult part, so summarizing Shuffle configuration items can help us narrow down the search range during optimization, saving time.

Finally, Spark SQL has evolved into a new generation of underlying optimization engines. Whether in sub-frameworks such as Streaming, Mllib, Graph, or in PySpark, as long as you use the DataFrame API, Spark will use Spark SQL for unified optimization at runtime. Therefore, we need to identify a category of configuration items to fully leverage the inherent performance advantages of Spark SQL.

We have repeatedly emphasized that balancing hardware resources is the key to performance optimization. So in today’s lecture, let’s start with the hardware resource category and summarize the configuration items that should be set. In this process, I will help you understand the definition and purpose of these configuration items, as well as what problems their settings can solve, laying a foundation for resource balancing. In the next lecture, we will discuss the Shuffle category and the Spark SQL category.

First, let’s talk about the configuration options related to CPU, which mainly include spark.cores.max, spark.executor.cores, and spark.task.cpus. They specify the number of CPUs allocated for computation at different granularities: cluster, executor, and task. Developers can use these parameters to determine how many CPU resources are allocated to Spark for distributed computing.

To fully utilize each CPU allocated to the Spark cluster, specifically each CPU core, you need to set the matching parallelism. The parallelism can be set using the parameters spark.default.parallelism and spark.sql.shuffle.partitions. For RDDs without explicit partitioning rules, we use spark.default.parallelism to define their parallelism, while spark.sql.shuffle.partitions is used to specify the number of partitions for data association or aggregation operations.

When talking about parallelism, we cannot ignore parallel computing tasks. These two concepts are closely related but have different meanings, and many students often confuse them.

Parallelism refers to how distributed datasets are divided, thus used for distributed computing. In other words, the starting point of parallelism is data, and it specifies the granularity of data distribution. The higher the parallelism, the finer the granularity of the data, the more data shards, and the more scattered the data. Therefore, concepts such as partition number, shard number, and partitions are synonyms for parallelism.

Parallel computing tasks, on the other hand, refer to the number of tasks that can be computed simultaneously in the entire cluster at any given moment. In other words, its starting point is the computational task, or the CPU, and it is determined by three parameters related to the CPU. Specifically, the upper limit of parallel computing tasks in each Executor is the quotient of spark.executor.cores and spark.task.cpus, denoted as #Executor-tasks. The total number of parallel computing tasks in the entire cluster is then the product of #Executor-tasks and the number of Executors in the cluster, denoted as #Executors. Therefore, the final value is: #Executor-tasks * #Executors.

As we can see, parallelism determines data granularity, data granularity determines partition size, and partition size determines the memory consumption of each computational task. In the same Executor, multiple concurrently running computational tasks “basically” share the available memory equally, and each computational task has an upper limit on the memory it can obtain. Thus, the number of parallel computing tasks will in turn restrict the setting of parallelism. You see, these two are truly a pair of antagonistic enemies!

As for how to balance the relationship between parallelism and parallel computing tasks, we will discuss it in later courses. For now, let’s just remember the meanings, differences, and roles of configuration options related to CPU settings.

After talking about CPU, let’s move on to the configuration options related to memory management. We know that in terms of memory management, Spark is divided into on-heap memory and off-heap memory.

Off-heap memory is further divided into two areas: Execution Memory and Storage Memory. In order to enable off-heap memory, we need to set the parameter “spark.memory.offHeap.enabled” to true and specify the size of off-heap memory using “spark.memory.offHeap.size”. On-heap memory is also divided into four areas: Reserved Memory, User Memory, Execution Memory, and Storage Memory.

There are five basic configuration options related to memory, and their meanings are summarized in the following table:

Configuration Option Meaning
spark.driver.memory Amount of memory to be allocated for the driver
spark.executor.memory Amount of memory to be allocated per executor
spark.memory.fraction Fraction of JVM heap space to be used for Spark
spark.memory.storageFraction Fraction of JVM heap space to be used for cached data
spark.memory.offHeap.size Amount of off-heap memory to be allocated

Simply put, these configuration options determine the size of the memory areas mentioned earlier, which is easy to understand. However, many students have various doubts when it comes to actually setting the size of memory areas, such as:

  • How much memory should be allocated to on-heap and how much should be allocated to off-heap?
  • Within the on-heap memory, how should we balance the User Memory and the memory used by Spark for computation?
  • Under the unified memory management mode, how should we balance the Execution Memory and Storage Memory?

Don’t worry, we’ll solve these questions one by one.

Balancing on-heap and off-heap memory #

Compared to on-heap memory in the JVM, off-heap memory has many advantages, such as more accurate memory usage statistics, no need for garbage collection, and no need for serialization and deserialization. You might say, “Since off-heap memory is so good, why not allocate all the memory to it?” But let’s not jump to conclusions just yet, let’s look at an example together.

Let’s say we have a user table that stores user data, with each data entry consisting of four fields: an integer user ID, a string name, an integer age, and a character gender. If you were asked to store each user record using a byte array, how would you do it?

Together, let’s do it step by step. First, except for the name field, all the other three fields are fixed-length data types, so they can be directly inserted into the byte array. For variable-length data types like String, since we do not know the length of each user’s name in advance, we can only resort to a workaround: first record the offset of the name field within the entire byte array, then record its length, and finally insert the complete name string at the end of the byte array, as shown in the following diagram.

Therefore, when the data schema becomes complex, the cost of Spark directly managing off-heap memory will be very high.

So, how do we balance the division of JVM heap memory and off-heap memory with limited memory resources? I believe you already have the answer. For data sets that have a relatively flat data schema with mostly fixed-length data types, it is better to use off-heap memory more. On the other hand, if the data schema is complex with nested structures or many variable-length fields, it is more appropriate to use JVM heap memory.

How is User Memory allocated with available Spark memory? #

Next, let’s talk about User Memory. We all know that the spark.memory.fraction parameter determines the percentage of Spark’s available memory within the entire heap. In other words, it determines how much memory is available for Spark’s consumption within the heap. Accordingly, 1 - spark.memory.fraction is the percentage of User Memory within the heap.

Therefore, the spark.memory.fraction parameter determines how heap memory is divided between the two. The larger the coefficient of spark.memory.fraction, the more memory Spark has available, and the smaller the proportion of User Memory. The default value of spark.memory.fraction is 0.6, which means 60% of the JVM heap space is allocated to Spark, leaving 40% for User Memory.

So, what is User Memory used for? Is it necessary to reserve such a large space? Simply put, User Memory mainly stores developer-defined data structures, which are often used to assist in processing distributed datasets.

For example, do you remember the Label Encoding example in the chapter on task scheduling?

/**
Method 2
Input parameters: template file path, user interest string
Return value: index value corresponding to the user interest string
*/

// Function definition
val findIndex: (String) => (String) => Int = {
(filePath) =>
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zip(0 until lines.size).toMap
(interest) => searchMap.getOrElse(interest, -1)
}
val partFunc = findIndex(filePath)

// Function call in the dataset
partFunc("Sports-Basketball-NBA-Lakers")

In this example, we first read the template file containing user interests, then construct a mapping dictionary from interests to indices based on the template content. When performing Label Encoding on billions of samples, this dictionary can quickly look up interest strings and return the corresponding index to assist in data processing. Such mapping dictionaries are referred to as custom data structures, and this data is stored in the User Memory area.

Therefore, when balancing Spark’s available memory and User Memory within the JVM, you need to consider whether your application has a significant proportion of such custom data structures. Then, adjust the relative proportions of the two memory areas accordingly. If there are only a few custom data structures in your application, you can increase the spark.memory.fraction configuration to allow Spark to have more memory space for distributed computing and caching distributed datasets.

How to balance Execution Memory and Storage Memory? #

Lastly, let’s talk about balancing Execution Memory and Storage Memory. In the lecture on memory management, I told you a story about a landlord named Huang Silang and his rental agreements, and I used this story to illustrate the competitive relationship between execution memory and cached memory. Execution tasks and RDD caching share the available memory in Spark, but execution tasks have higher priority when it comes to preemption.

Therefore, generally speaking, in the unified memory management mode, the setting of spark.memory.storageFraction is not so important because regardless of the value of this parameter, execution tasks still have the chance to preempt the cached memory. And once the preemption is completed, the memory will not be released until the task execution finishes.

However, there is an exception. If your application is “cache-intensive,” such as machine learning training tasks, it is necessary to adjust this parameter to ensure the full caching of data. This type of computation often requires repeated traversal of the same distributed dataset, and whether the data is cached or not plays a decisive role in the execution efficiency of the task. In such cases, you can increase the value of the spark.memory.storageFraction parameter and consciously fill the cache at the beginning of the application, and then perform the calculation based on the cached data to implement the business logic of the computation.

But in this process, you need to pay special attention to balancing RDD caching and execution efficiency. Why is that?

Firstly, when the memory space occupied by RDD cache increases, the memory space available for executing distributed computing tasks in Spark naturally decreases. Moreover, common operations in data analysis scenarios, such as join, sort, and aggregation, consume execution memory. When the execution memory space decreases, it naturally affects the efficiency of these types of computations.

Secondly, the introduction of a large amount of cache increases the burden of garbage collection (GC), which is a huge hidden danger to execution efficiency.

Do you still remember the land rented by Huang Silang, which was divided into managed fields and self-managed fields? The managed fields were taken care of by Huang Silang’s appointed people, who were responsible for turning the soil and leveling it after the autumn harvest, in preparation for the next year’s crops. The garbage collection in the heap memory works in a similar way. The JVM generally divides the heap memory into the young generation and the old generation. The young generation stores objects with short lifespans and low reference counts, while the old generation stores objects with long lifespans and high reference counts. Therefore, data like RDD cache that is continuously stored in memory will be arranged by the JVM in the old generation.

Garbage collection in the young generation is called Young GC, and garbage collection in the old generation is called Full GC. Whenever the available memory space in the old generation is insufficient, the JVM triggers Full GC. During Full GC, the JVM preempts the application’s execution threads and forcibly takes all the CPU threads in the compute node, so to speak, “concentrating efforts to accomplish great things.” When all CPU threads are used for garbage collection, the execution of the application can only be temporarily suspended. Only after Full GC is finished and the CPU threads are released, can the application continue to execute. This phenomenon, in which the execution of the application is paused because CPU threads are preempted by Full GC, is called “Stop the World.”

Therefore, Full GC is far more detrimental to the application than Young GC, and the efficiency of GC is inversely proportional to the number of objects. The more objects there are, the worse the GC efficiency becomes. At this point, for data cached in the old generation like RDD, it is easy to introduce Full GC problems.

Generally speaking, in order to improve the access efficiency of RDD cache, many developers choose to cache data in memory using object values because storing data as object values avoids the computational overhead of serialization and deserialization during data access. By default, when we call the cache method on RDD, DataFrame, or Dataset, this type of storage method is used.

However, caching data using object values, whether it’s RDD, DataFrame, or Dataset, means that each data sample constitutes an object, either a user-defined case class or a Row object. In other words, the number of objects stored in the old generation is basically equal to the number of your samples. Therefore, when the number of your samples reaches a certain scale, you need to consider the Full GC problems that may be introduced by a large amount of RDD cache.

Based on the analysis above, we can see that before deciding to use a large amount of memory space for RDD cache, you need to weigh the impact it may have on execution efficiency.

You may ask, “My application is cache-intensive, and I really need to cache data. Is there a way to balance execution efficiency?” There are indeed some methods.

Firstly, you can abandon the object value caching method and use the serialization caching method, which converts multiple objects into a byte array. This way, the problem of the number of objects can be initially alleviated.

Secondly, we can adjust the spark.rdd.compress parameter. By default, RDD cache is not compressed, but enabling compression can significantly improve the storage efficiency of the cache, effectively saving the memory occupied by the cache and leaving more memory space for distributed task execution.

Through these two types of adjustments, developers can enjoy the RDD data access efficiency while effectively considering the overall execution efficiency of the application, achieving the best of both worlds. However, nothing comes without cost. Although these two types of adjustments optimize the usage efficiency of memory, they come at the expense of introducing additional computational overhead and sacrificing CPU. This is why we have always emphasized that the process of performance tuning is essentially a process of constantly balancing the consumption of different hardware resources.

In the lecture on storage systems, we briefly mentioned the configuration parameter spark.local.dir. This parameter allows developers to set the disk directory used to store RDD cache data blocks and intermediate files in Shuffle.

Usually, spark.local.dir is configured to a file system with ample capacity in the local disk because a large number of temporary files will be stored in this directory. Sufficient storage capacity is needed to ensure the stability of distributed task computation. However, if you have sufficient funding and the ability to provide an adequate amount of SSD storage or even more memory resources in the compute nodes, you can add the file system directory on SSD or the memory file system to the spark.local.dir configuration item to improve I/O performance.

Summary #

Mastering the configuration options for hardware resources is our secret weapon to break performance bottlenecks and balance the utilization of different hardware resources. Specifically, we can divide it into two steps.

Step 1: Understand the performance configuration options for CPU, memory, and disk, as well as their meanings. Therefore, I have summarized the meanings of the configuration options for hardware resources in a table, which is convenient for you to check at any time. With this manual, you will be able to optimize the configuration of hardware resources without missing anything.

Step 2: Focus on understanding the functions of these configuration options and the problems they can solve.

First, for CPU configuration options, we need to understand the difference between parallelism and the number of parallel computing tasks. Parallelism starts from the perspective of data and clarifies the granularity of data partitioning. The higher the parallelism, the finer the data granularity, the more scattered the data, and the more fully the CPU resources are utilized. However, we should also be aware of the scheduling system overhead caused by excessively fine data granularity.

On the other hand, the number of parallel computing tasks is different. It starts from the perspective of computation and emphasizes the ability and capacity of distributed clusters to process in parallel at any given time. Parallelism and the number of parallel computing tasks affect and restrict each other.

Second, for memory configuration options, we need to know how to set them to balance different memory regions. Here, we mainly need to understand three questions:

  1. When balancing off-heap and on-heap memory, we need to focus on the data pattern. If the data pattern is relatively flat and there are many fixed-length fields, we should use more off-heap memory. On the contrary, if the data pattern is more complex, we should use more on-heap memory.
  2. When balancing the manageable memory and User memory, we need to focus on the custom data structures in the application. If there are many data structures, we should reserve enough User memory space. On the contrary, if there are fewer data structures, we should allocate more available memory resources to Spark.
  3. When balancing Execution memory and Storage memory, if RDD caching is a must, we increase the value of spark.memory.storageFraction and prioritize caching the data, and then apply the calculation logic to the cached data. In addition, we can also adjust spark.rdd.compress and spark.memory.storageFraction simultaneously to mitigate the impact of Full GC.

Daily Practice #

  1. What are the disadvantages of setting parallelism too high?
  2. What built-in data structures in Spark can make full use of off-heap memory resources during the shuffle process?
  3. Can you think of any other factors that need to be considered when choosing between off-heap and on-heap memory?
  4. If there are sufficient memory resources, what ways can be used to create a memory file system for configuring the spark.local.dir parameter?

Looking forward to seeing your thoughts and answers in the comments. Feel free to share this hardware resource configuration manual with more friends. See you in the next lecture!