15 Memory Perspective I - How to Maximize Memory Usage Efficiency #

Hello, I am Wu Lei.

In the previous lecture, we saw that improving CPU utilization depends on allocating execution memory properly. However, execution memory is only one part of Spark’s memory layout. To size execution memory properly, we must first divide all of Spark’s memory areas properly.

In practical application development, many colleagues around me have complained: “I understand the principles of dividing different memory areas in Spark, but I don’t know how to set the sizes of different memory areas. I’m always in a dilemma. In the end, I just kept the default values for all memory-related configurations.”

This gap between principle and practice is very common. So in today’s lecture, starting from the familiar example of Label Encoding, I will walk you step by step through how to size each memory area in different situations, and help you summarize the general steps for maximizing memory utilization. This way, when you tune memory, you can start from the needs of the application and proceed systematically.

Starting from an example #

Let’s start by reviewing the Label Encoding discussed in Lesson 5. In the business scenario of Label Encoding, we need to encode user interest features. Based on the interest strings and their index positions in the template, our task is to convert the user interests in billions of samples into corresponding index values. The content of the template file is as follows.

// Template file
// User Interests
Sports-Basketball-NBA-Lakers
Military-Weapons-Rifle-AK47

The code implementation is as follows. Please note that this code is an optimized version from Lesson 5.

import scala.io.Source

/**
 * Input parameters: template file path, user interest string
 * Return value: index value corresponding to the user interest string
 */
// Function definition
val findIndex: (String) => (String) => Int = {
  (filePath) =>
    // Read the template file once and build the lookup Map
    val source = Source.fromFile(filePath, "UTF-8")
    val lines = source.getLines().toArray
    source.close()
    val searchMap = lines.zipWithIndex.toMap
    // Return a function that closes over searchMap
    (interest) => searchMap.getOrElse(interest, -1)
}
// filePath is the path of the template file, defined elsewhere
val partFunc = findIndex(filePath)

// Function call in Dataset
partFunc("Sports-Basketball-NBA-Lakers")

Now, let’s review the implementation logic of the code, analyze the performance bottlenecks it currently faces, and discuss methods to optimize it.

First, the main logic of the findIndex function is simple: read the template file, build the mapping as a Map, then look up the user interest and return its index. However, findIndex is defined as a higher-order function. When we call it with the template file path as the argument, we get back a scalar function named partFunc that carries an embedded Map lookup dictionary. Finally, we call partFunc on billions of samples to perform the data conversion. By using a higher-order function, we avoid the inefficient approach of having each Task in the Executor read the template file and build the Map dictionary from scratch.
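The effect of the higher-order function can be checked with a minimal sketch. It is an illustration only: to stay self-contained it takes the template lines directly instead of a file path, and the names HigherOrderLookup, findIndexFromLines and buildCount are hypothetical helpers that do not appear in the lecture code.

```scala
object HigherOrderLookup {
  // Counts how many times the Map is built (illustration only)
  var buildCount = 0

  // Same shape as findIndex, but takes the template lines instead of a file path
  val findIndexFromLines: Seq[String] => String => Int = { lines =>
    buildCount += 1
    val searchMap = lines.zipWithIndex.toMap
    // The returned function closes over searchMap
    interest => searchMap.getOrElse(interest, -1)
  }

  def main(args: Array[String]): Unit = {
    val template = Seq("Sports-Basketball-NBA-Lakers", "Military-Weapons-Rifle-AK47")
    val partFunc = findIndexFromLines(template)
    println(partFunc("Military-Weapons-Rifle-AK47")) // index of the second line
    println(partFunc("Unknown-Interest"))            // falls back to -1
    println(s"Map built $buildCount time(s)")
  }
}
```

However many times partFunc is called, the Map is built only when findIndexFromLines itself is applied, which is the property the lecture relies on.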

At runtime, this function will be encapsulated into Task after Task in the Driver, and then the Driver will distribute these Tasks to the Executor. Upon receiving a task, the Executor dispatches it to the thread pool for execution (the content of the scheduling system can be reviewed in Lesson 5). At this point, each Task is like an airplane carrying the code “passengers” and data “luggage”, flying from the Driver to the Executor. After landing at the Executor airport, the code “passengers” take taxis or airport shuttles to the JVM stack, while the data “luggage” is stored in the JVM Heap, which is commonly referred to as heap memory.

Looking at the findIndex function in Label Encoding again, it is easy to see that most of it is code “passengers”, and the only data “luggage” is the Map dictionary named searchMap. A user-defined data structure like this consumes memory in the User Memory area of the heap (the Spark memory region division can be reviewed in Lesson 7).

Performance bottlenecks of User Memory #

At this point, do you think the findIndex function has any performance bottlenecks? Please think about it first and then read my analysis below.

The answer is, of course, “yes.” First, each airplane carries a large piece of data “luggage” and therefore needs more “fuel”, that is, more network overhead during Task distribution. Second, every airplane that lands stores the same data “luggage” in User Memory, the Executor’s “passenger luggage area”. User Memory must therefore have enough space for the luggage of all passengers, most of which is duplicated data.

How much memory space does User Memory need to prepare, then? The calculation is not difficult: we only need to multiply the number of flights by the luggage size.

User-defined data structures are usually there to help functions complete their computation, so once the function finishes executing, the lifecycle of the data structure it carries also ends. When counting Tasks, then, we don’t need to care how many Tasks an Executor processes in total. We only need the number of Tasks it can run in parallel at the same time, which is the size of the Executor’s thread pool.

As mentioned before, the size of the Executor’s thread pool is determined by the quotient of spark.executor.cores and spark.task.cpus (spark.executor.cores/spark.task.cpus). Let’s temporarily denote this quotient as #threads.

Next, we need to estimate the size of the data “luggage”. Since searchMap is not a distributed dataset, we don’t need the approach of caching it first and then reading its size from the Spark execution plan statistics. For ordinary Java data structures like this one, we can estimate the storage size with regular Java methods in the REPL. Let’s denote the estimated size of searchMap as #size.
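One quick way to get a rough number in the REPL is to serialize the object and count the bytes. The sketch below is an assumption of mine, not the lecture's method: serializedSize is a hypothetical helper, and the serialized size is only a rough proxy that can differ noticeably from the real heap footprint. If Spark is on the classpath, org.apache.spark.util.SizeEstimator.estimate is likely a closer match for the in-heap size.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SizeProxy {
  // Rough proxy for #size: bytes produced by Java serialization.
  // This is NOT the exact heap footprint; treat it as an order-of-magnitude figure.
  def serializedSize(obj: AnyRef): Long = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.size().toLong
  }

  def main(args: Array[String]): Unit = {
    // The two template lines shown earlier; a real template would be far larger
    val lines = Array("Sports-Basketball-NBA-Lakers", "Military-Weapons-Rifle-AK47")
    val searchMap = lines.zipWithIndex.toMap
    println(s"approximate #size: ${serializedSize(searchMap)} bytes")
  }
}
```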

Alright! Now we can calculate that User Memory needs at least #threads * #size of memory space to support distributed task computation. However, spending #threads * #size of User Memory on redundant copies of the same data lowers memory utilization in itself. So how can we reduce this #threads * #size memory consumption?
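The arithmetic can be written down as a small sketch. The core count and the Map size used in main are made-up numbers for illustration, and UserMemoryEstimate is a hypothetical helper name.

```scala
object UserMemoryEstimate {
  // #threads = spark.executor.cores / spark.task.cpus
  def threads(executorCores: Int, taskCpus: Int): Int = executorCores / taskCpus

  // Lower bound on User Memory when every running Task holds its own copy
  def userMemoryBytes(executorCores: Int, taskCpus: Int, sizeBytes: Long): Long =
    threads(executorCores, taskCpus).toLong * sizeBytes

  def main(args: Array[String]): Unit = {
    // Hypothetical numbers: 8 cores, 1 CPU per task, a 200 MB searchMap
    val sizeBytes = 200L * 1024 * 1024
    val totalMb = userMemoryBytes(8, 1, sizeBytes) / (1024 * 1024)
    println(s"User Memory needed: $totalMb MB") // 8 * 200 MB = 1600 MB
  }
}
```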

Performance Optimization #

After learning about broadcast variables, I’m sure you have some ideas in mind. Yes, we can try to use broadcast variables to optimize the code in the example.

If we take a closer look at the findIndex function, we can see that there are two key calculation steps. First, reading the template file and creating a mapping dictionary; second, searching the dictionary with the given string and returning the search result. Obviously, the core requirement of converting billions of samples lies in the second step. In that case, we can encapsulate the created map dictionary into a broadcast variable and distribute it to various Executors.

With the help of broadcast variables, Task airplanes going to the same Executor no longer need to carry their own data “luggage”. These large items of luggage are sent to the Executors by the “Federated Broadcast Express” cargo planes instead, over a dedicated cargo route between the Driver and each Executor. Now let’s take a look at the optimized code:

import scala.io.Source

/**
 * Broadcast variable implementation
 */
// Define the broadcast variable
// (filePath and sparkSession are defined elsewhere)
val source = Source.fromFile(filePath, "UTF-8")
val lines = source.getLines().toArray
source.close()
val searchMap = lines.zipWithIndex.toMap
val bcSearchMap = sparkSession.sparkContext.broadcast(searchMap)

// Accessing the broadcast variable in the Dataset
bcSearchMap.value.getOrElse("Sports-Basketball-NBA-Lakers", -1)

The implementation of the above code is simple: the first step is still to read the template file and create a map dictionary; the second step is to encapsulate the map dictionary into a broadcast variable. With this approach, when converting billions of samples, we directly access the content of the broadcast variable through bcSearchMap.value, and then use the getOrElse method of the map dictionary to obtain the index value corresponding to the user’s interest.

Compared to the initial implementation, the changes in the second implementation are relatively small. So, how does this improved code affect memory consumption?

We can see that the “passengers” on the Task airplanes have been changed! Previously, the Task airplanes needed to carry the findIndex function, but now they have been replaced with an “anonymous passenger”: an anonymous function that reads the broadcast variable and calls its getOrElse method. Since this “anonymous passenger” entrusts the large luggage to the “Federated Broadcast Express” special cargo plane, after the Task airplanes land, there is no “luggage” to be stored in User Memory. In other words, the optimized version does not occupy the User Memory area, so the memory consumption of #threads * #size in the first implementation can be avoided.

Storage Memory Planning #

In this way, the original memory consumption is transferred to the broadcast variable. However, broadcast variables also consume memory. Will this bring new performance issues? Let’s take a look at which memory area the broadcast variable consumes.

In the lecture on storage systems, we mentioned that the Spark storage system mainly consists of three service objects: Shuffle intermediate files, RDD cache, and broadcast variables. They are managed by the BlockManager on the Executor, and the storage of data in memory and disk is abstracted and encapsulated by the MemoryStore and DiskStore of the BlockManager.

The data carried by a broadcast variable is therefore materialized in the MemoryStore, which provides a single copy of the data, at the granularity of the Executor, for all Tasks to share. Memory consumed by the MemoryStore is charged to Storage Memory, so the memory consumed by the broadcast variable is Storage Memory.

Next, let’s calculate how much memory the second implementation will consume. Since the distribution and storage of broadcast variables are done on a per-Executor basis, the amount of memory consumed by each Executor is the size of one copy of the searchMap data. We just calculated the size of searchMap, which is #size.

Now that we have determined how much of the Storage Memory area is consumed, we can adjust the relevant memory configuration settings. The size of the Storage Memory area is given by the formula: (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction.
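As a quick sanity check, the formula can be evaluated for a candidate configuration and compared with #size. The sketch below is illustrative: StorageMemoryCheck and its methods are hypothetical names, the 4096 MB executor is a made-up value, and 0.6 and 0.5 are the Spark defaults for spark.memory.fraction and spark.memory.storageFraction.

```scala
object StorageMemoryCheck {
  val ReservedMb = 300.0 // Reserved Memory is fixed at 300MB

  // (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
  def storageMemoryMb(executorMemoryMb: Double,
                      memoryFraction: Double,
                      storageFraction: Double): Double =
    (executorMemoryMb - ReservedMb) * memoryFraction * storageFraction

  // Does one copy of the broadcast data fit into Storage Memory?
  def fits(broadcastMb: Double,
           executorMemoryMb: Double,
           memoryFraction: Double,
           storageFraction: Double): Boolean =
    broadcastMb <= storageMemoryMb(executorMemoryMb, memoryFraction, storageFraction)

  def main(args: Array[String]): Unit = {
    // Hypothetical 4096 MB executor with default fractions
    val storageMb = storageMemoryMb(4096, 0.6, 0.5)
    println(s"Storage Memory: $storageMb MB") // about 1138.8 MB
    println(s"200 MB broadcast fits: ${fits(200, 4096, 0.6, 0.5)}")
  }
}
```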

Two steps of memory planning #

We have now quantitatively analyzed how the two code implementations consume different memory areas. With a clear picture of this consumption, we can adjust the relevant configuration parameters to maximize memory utilization. The approach boils down to two steps:

  • Estimate memory consumption
  • Adjust memory configuration options

Let’s take heap memory as an example to explain how to carry out the two steps of memory planning. As we all know, heap memory is divided into four regions: Reserved Memory, User Memory, Storage Memory, and Execution Memory. Reserved Memory is fixed at 300MB, so we don’t need to consider it. We need to plan for the other three regions.

Estimate memory consumption #

Firstly, let’s talk about the estimation of memory consumption, which mainly consists of three steps.

Step 1: Calculate the memory consumption of User Memory. We first consolidate the custom data structures in the application and estimate the total size of these objects as #size. Then, we multiply #size by the size of the Executor’s thread pool to get the memory consumption of User Memory, which is denoted as #User.

Step 2: Calculate the memory consumption of Storage Memory. We consolidate the broadcast variables and distributed dataset caches involved in the application and estimate the total size of these two types of objects as #bc and #cache respectively. Additionally, we define the total number of Executors in the cluster as #E. Thus, the formula to calculate the memory consumption of Storage Memory in each Executor is #Storage = #bc + #cache / #E.

Step 3: Calculate the memory consumption of Execution Memory. Based on the knowledge acquired in the previous lesson, we know that the memory consumption of Execution Memory is influenced by multiple factors. The first factor is the size of the Executor’s thread pool, denoted as #threads. The second factor is the size of data partitions, which depends on the size of the dataset, #dataset, and the parallelism, #N. Therefore, the formula to calculate the memory consumption of Execution Memory in each Executor is #Execution = #threads * #dataset / #N.
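The three estimation steps can be collected into one small sketch. All input numbers in main are hypothetical and only there to make the arithmetic concrete. MemoryEstimate and its field names are illustrative, not part of any Spark API.

```scala
object MemoryEstimate {
  // Estimated consumption of the three regions, in MB
  final case class Estimate(userMb: Double, storageMb: Double, executionMb: Double)

  def estimate(sizeMb: Double,      // #size: custom data structures
               threads: Int,        // #threads: Executor thread pool size
               bcMb: Double,        // #bc: broadcast variables
               cacheMb: Double,     // #cache: distributed dataset caches
               executors: Int,      // #E: number of Executors
               datasetMb: Double,   // #dataset: size of the dataset
               parallelism: Int     // #N: parallelism
              ): Estimate =
    Estimate(
      userMb = sizeMb * threads,                    // #User = #size * #threads
      storageMb = bcMb + cacheMb / executors,       // #Storage = #bc + #cache / #E
      executionMb = threads * datasetMb / parallelism // #Execution = #threads * #dataset / #N
    )

  def main(args: Array[String]): Unit = {
    // Hypothetical workload
    val e = estimate(sizeMb = 100, threads = 4, bcMb = 200, cacheMb = 20000,
                     executors = 10, datasetMb = 100000, parallelism = 1000)
    println(e) // user 400 MB, storage 2200 MB, execution 400 MB
  }
}
```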

Adjust memory configuration options #

After obtaining the estimated sizes of these three memory regions, #User, #Storage, and #Execution, adjusting the corresponding memory configuration options becomes straightforward (as can be seen from the formula (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction). The adjustment can be divided into three steps.

Firstly, according to its definition, spark.memory.fraction can be calculated by the formula (#Storage + #Execution) / (#User + #Storage + #Execution).

Similarly, spark.memory.storageFraction can be set to (#Storage) / (#Storage + #Execution).

Lastly, for the setting of the total heap memory size of the Executor, spark.executor.memory, we naturally need to consider the total consumption of the four memory regions, which is 300MB + #User + #Storage + #Execution. However, we need to note that the premise for calculating this formula is that the proportion of different memory regions is consistent with the consumption of different types of data.
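The three adjustment formulas can be sketched as follows. The region sizes in main (400 MB, 2200 MB, 400 MB) are hypothetical estimates, and MemoryConfig with its field names is illustrative only. The three printed keys are the configuration options discussed in the text.

```scala
import java.util.Locale

object MemoryConfig {
  final case class Config(memoryFraction: Double, storageFraction: Double, executorMemoryMb: Double)

  def fromEstimates(userMb: Double, storageMb: Double, executionMb: Double): Config =
    Config(
      // spark.memory.fraction = (#Storage + #Execution) / (#User + #Storage + #Execution)
      memoryFraction = (storageMb + executionMb) / (userMb + storageMb + executionMb),
      // spark.memory.storageFraction = #Storage / (#Storage + #Execution)
      storageFraction = storageMb / (storageMb + executionMb),
      // spark.executor.memory = 300MB + #User + #Storage + #Execution
      executorMemoryMb = 300.0 + userMb + storageMb + executionMb
    )

  def main(args: Array[String]): Unit = {
    // Hypothetical estimates, in MB
    val c = fromEstimates(userMb = 400, storageMb = 2200, executionMb = 400)
    println("spark.memory.fraction=" + "%.2f".formatLocal(Locale.ROOT, c.memoryFraction))
    println("spark.memory.storageFraction=" + "%.2f".formatLocal(Locale.ROOT, c.storageFraction))
    println(s"spark.executor.memory=${c.executorMemoryMb.toLong}m") // 300 + 400 + 2200 + 400 = 3300
  }
}
```

In practice the computed values are a starting point, and it is reasonable to round spark.executor.memory up to leave some headroom for estimation error.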

In summary, in the two steps of memory planning, the first step of estimating the memory proportions of different regions is particularly crucial because the adjustment of parameters in the second step completely depends on the estimation results of the first step. If you follow these two steps to set the relevant memory configuration options, I believe your application will be able to fully utilize different memory regions at runtime, avoiding memory wastage caused by improper parameter settings, thereby improving the overall memory utilization.

Summary #

Reasonably partitioning all memory regions of Spark is the foundation for simultaneously improving CPU and memory utilization. Therefore, it is important to master memory planning. In today’s lecture, we summarize memory planning into two steps.

The first step is estimating memory usage.

  • Calculate the memory consumption of the User Memory region using the formula: #User = #size * #threads, where #threads is the size of the Executor thread pool.
  • Calculate the memory consumption of the Storage Memory region in each Executor using the formula: #Storage = #bc + #cache / #E.
  • Calculate the memory consumption of the Execution Memory region using the formula: #Execution = #threads * #dataset / #N.

The second step is adjusting memory configuration: Based on the estimated sizes of the three memory regions, #User, #Storage, and #Execution, adjust all configuration items involved in the formula (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction.

  • The value of spark.memory.fraction can be calculated using the formula (#Storage + #Execution) / (#User + #Storage + #Execution).
  • The value of spark.memory.storageFraction can be set to (#Storage) / (#Storage + #Execution).
  • The setting of spark.executor.memory can be obtained using the formula 300MB + #User + #Storage + #Execution.

Here, I would like to add a few more words. The two-step memory planning process is ultimately a means to ensure that the proportions of different memory regions align with the consumption of different types of data, thus maximizing memory utilization.

Daily Practice #

  1. Do you know what methods there are to estimate the storage size of Java objects? What are the advantages and disadvantages of different methods?
  2. For the first step in memory planning, it is time-consuming and laborious to accurately estimate the memory consumption of each region at runtime, and the cost of tuning is high. If we want to skip the accurate calculation of the first step, do you know any methods to roughly and quickly estimate the consumption proportion of different memory regions?

Looking forward to seeing your thoughts and answers in the comments section, see you in the next lecture!