07 Memory Management Fundamentals How Spark Efficiently Utilizes Limited Memory Space

07 Memory Management Fundamentals - How Spark Efficiently Utilizes Limited Memory Space #

Hello, I am Wu Lei.

For memory computing engines like Spark, memory management and utilization are crucial. Only by fully utilizing memory can the execution performance reach its optimal level.

So, do you know how Spark uses memory? What is the relationship between different memory regions, and how are they divided? In today’s lecture, I will combine an interesting story to discuss the basic knowledge of Spark memory management in depth.

Memory Management Modes #

In terms of memory management, Spark distinguishes between On-heap Memory and Off-heap Memory. Here, “heap” refers to the JVM Heap, so On-heap Memory is actually the heap memory of the Executor JVM, while Off-heap Memory refers to memory space that is directly allocated and deallocated from the operating system using the Java Unsafe API, similar to C++.

In the case of On-heap Memory, the allocation and deallocation are handled by the JVM. For example, when Spark needs memory to instantiate objects, the JVM is responsible for allocating space from the heap and creating the objects, then returning the references to the objects. Spark will store the references while recording the memory consumption. Similarly, when Spark requests to delete objects, the available memory will be recorded, and the JVM will mark such objects for deletion. Then, through the Garbage Collection (GC) mechanism, the objects will be cleared and the memory will be released.

In this management mode, Spark’s memory release has some delay. Therefore, when Spark tries to estimate the currently available memory, it is likely to overestimate the available memory in the heap.

On the other hand, Off-heap Memory is different. Spark directly allocates and deallocates memory space in the operating system memory by invoking the allocateMemory and freeMemory methods in Unsafe, which is now more similar to how memory is managed in C++. This memory management mode naturally does not require a garbage collection mechanism, eliminating the frequent scanning and performance overhead introduced by it. More importantly, the allocation and deallocation of space can be accurately calculated. Therefore, Spark’s estimation of the available off-heap memory will be more accurate, and the utilization of memory will be more reliable.

To help you understand this process more easily, let me tell you a little story.

Renting Land by Landlord (Part 1): Land Division #

A long time ago, at the foot of the Yan Mountains, there was a small village. In the village, there was a landlord named Huang Silang, who owned thousands of acres of fertile land, which stretched for hundreds of miles. Huang Silang lived an idle and comfortable life and naturally would not personally go to work in the fields. However, he couldn’t just leave so much land barren. So, he came up with a plan to make money without getting his hands dirty: renting!

Although Huang Silang was lazy, he was quite good at management. He divided the land into two parts: “Managed Land” and “Self-managed Land”.

We know that after a bountiful harvest, the land needs to be plowed, leveled, and dried, so that it can be planted with crops the following year. Therefore, the Managed Land refers to the land where Huang Silang sends someone to take care of the tedious tasks like plowing and leveling after a harvest, so the tenants don’t have to worry about it. On the other hand, Self-managed Land means that the tenants have to grow their crops by themselves and take care of the land after the autumn harvest.

Undoubtedly, for tenants, the Managed Land is less troublesome, while the Self-managed Land is more troublesome. Of course, compared to Self-managed Land, the rent for Managed Land is naturally higher.

In this story, Huang Silang’s Managed Land represents the On-heap Memory in memory management, while the Self-managed Land represents the Off-heap Memory. The tasks of plowing and leveling the land in the story are actually equivalent to the GC in the JVM. Doesn’t this analogy make it easier to understand?

Memory Partitioning #

Now that we have some background information, let’s get back to the topic of memory management in Spark. We now know that Spark has two memory management modes: on-heap and off-heap. How does Spark partition memory regions?

Let’s start with off-heap memory. Spark divides off-heap memory into two areas: one for executing distributed tasks such as Shuffle, Sort, and Aggregate operations, called Execution Memory; and one for caching RDDs and broadcast variables, called Storage Memory.

The partitioning of on-heap memory is similar to off-heap memory. Spark also divides on-heap memory into two parts for execution and caching. In addition, Spark also partitions a portion of the on-heap memory called User Memory for storing custom data structures.

In addition, Spark also reserves a small amount of on-heap memory called Reserved Memory for storing various internal objects of Spark, such as BlockManager and DiskBlockManager in the storage system.

In terms of performance tuning, we have more room for improving the utilization of the first three memory partitions, as they are mainly consumed by business applications, namely Execution Memory, Storage Memory, and User Memory. On the other hand, we cannot touch the reserved memory because it only serves Spark’s internal objects and is not used by business applications.

Okay, I have summarized the partitioning and calculation of different memory regions in the table below for your reference.

Memory Partition Purpose
Execution Memory Used for executing distributed tasks
Storage Memory Used for caching RDDs and broadcast variables
User Memory Used for storing custom data structures
Reserved Memory Used for storing internal Spark objects

Execution and Storage Memory #

Among all memory regions, the most important ones are undoubtedly Storage Memory and Execution Memory. The two layers of memory computing refer to dataset caching and pipelined computation within a stage, corresponding to Storage Memory and Execution Memory, respectively.

Before Spark 1.6, the space allocation for Execution Memory and Storage Memory was static, meaning that once the space was allocated, the purposes of the different memory regions were fixed. This means that even if you didn’t cache any RDDs or broadcast variables, the idle memory in the Storage Memory region couldn’t be used for the mapping, sorting, or aggregation operations in Shuffle. As a result, valuable memory resources were wasted.

To address the potential waste of static memory allocation, Spark introduced the Unified Memory Management mode after version 1.6. Unified Memory Management means that the Execution Memory and Storage Memory can be converted between each other. Although the initial sizes of the two regions are determined by the configuration option spark.memory.storageFraction, at runtime, the Storage Memory region may be used for task execution (e.g., Shuffle), and Execution Memory may also store RDD caches.

However, as we all know, execution tasks have a higher priority in terms of memory contention compared to caching tasks. Have you ever wondered why? Next, let’s explore the deeper reasons behind this with an investigative mindset.

First of all, execution tasks can be divided into two categories: one involves data transformation, mapping, sorting, aggregation, and merging during the Shuffle Map stage, and the other involves data sorting and aggregation during the Shuffle Reduce stage. These tasks involve data structures that consume execution memory.

Let’s assume that execution tasks and caching tasks follow the principles of “justice, fairness, and openness” in terms of memory contention. This means that whenever one side occupies the memory of the other, it will immediately release it when the other side needs it. For example, if the preset ratio is initially 50:50, but because caching tasks are located later in the application, execution tasks occupy 80% of the memory space first. When caching tasks catch up, execution tasks need to free up 30% of the memory space for caching tasks.

What happens in this situation? Let’s assume that there are a total of 80 CPUs in the cluster, meaning that the parallel computing capacity of the cluster at any given time is 80 distributed tasks. In the case where 80% of the memory is occupied, the 80 CPUs can be fully utilized, and the computational load of each CPU is relatively full. After completing one task, it can move on to the next task.

However, because 30% of the memory needs to be returned to caching tasks, it means that there are 30 parallel execution tasks that have no memory available. In other words, 30 CPUs are constantly in I/O wait state and unable to work! Valuable CPU computing resources are wasted, which is a great loss. Therefore, compared to caching tasks, the execution tasks must have a higher preemption priority. After saying so much, why do we need to understand the reasons behind it? I believe that only by understanding the logic behind preemption priority can we understand why we need to adjust the CPU and memory configurations at the same time, and only then can we achieve synergy and balance between different hardware resources. This is also the ultimate goal we want to achieve in performance tuning.

However, even if the execution tasks have a higher preemption priority, they must still follow certain rules when preempting memory. So, what are these rules specifically? Next, let’s continue with the story of the landlord renting out the land to talk about the interesting rules between Execution memory and Storage memory.

Landlord Renting the Land (Part 2): Rental Agreement #

Not long after Huang Silang put up the rental notice, two young and strong men in the village came to rent the farmland. One is named Huang Xiaoyi, a distant relative of Huang Silang, who recently came to seek refuge with Huang Silang. The other is named Zhang Mazhi, although he comes from a poor farming family, his life has been improving. Zhang Mazhi plans to rent the land to grow crops such as wheat and corn. Huang Xiaoyi has different plans, he is quite business-minded, and he wants to rent the land to grow cash crops such as cotton and coffee.

Both men are eager to make a career and want to rent all of Huang Silang’s land! With no worries about finding tenants, Huang Silang is naturally full of joy, but worries also follow suit: “I have to take care of Xiaoyi, but I can’t discourage Mazhi’s enthusiasm. I have to come up with a foolproof plan.”

So, he thought of a plan: “In theory, after we measure the land, we should draw a solid line in the middle to distinguish the ownership of the land. However, since Mazhi is from this village and Xiaoyi came from afar, he should be treated with a little more care than Mazhi.” Zhang Mazhi frowned: “How should we take care of him?”

Huang Silang continued, “It’s very simple, we can change the solid line to a dotted line, and the hardworking one gets more. Originally, you should each work in the piece of land defined by the boundary line. However, your progress is different, so the diligent one, after filling up his own land, can cross the boundary line and occupy the vacant land of the other person.”

Huang Xiaoyi asked puzzledly, “Uncle, isn’t this just a competition on who can grow faster? I don’t see any special care for me!” Zhang Mazhi also had a knot in his eyebrows, “What should we do if the person who grows slower catches up and wants to take back the occupied land?”

Huang Silang proudly said, “I just said it, we have to take care of Xiaoyi. So if Mazhi works hard, takes over Xiaoyi’s land first, and grows wheat and corn, if Xiaoyi catches up later and wants to take back his land, then of course Mazhi has to return the extra land. Whether the crops are ripe or not, Mazhi has to level the land and give it back to Xiaoyi to grow cotton and coffee.”

Huang Silang glanced at the reactions of the two men, and continued, “On the other hand, if Xiaoyi works harder, occupies Mazhi’s land first, and Mazhi catches up later and wants to take it back, then we have to take care of Xiaoyi. Xiaoyi has the right to continue occupying Mazhi’s land until the cotton and coffee on the land are harvested, and then return the extra land. What do you guys think?”

Huang Xiaoyi was delighted after listening. Although Zhang Mazhi was not happy in his heart, he understood the family relationship between Huang Silang and Huang Xiaoyi and didn’t say much. He thought, “Anyway, I just need to work harder and fill up the land first.” So, the three of them shook hands and made an agreement.

Okay, the story of the landlord renting out the land ends here. It is not difficult to see that Huang Xiaoyi’s land is analogous to Execution Memory, and Zhang Mazhi’s land is actually Storage Memory. The agreement between them is actually the preemption rules between Execution Memory and Storage Memory, which can be summarized as follows:

  • If the opponent’s memory space is idle, both parties can preempt it;
  • For the Execution Memory preempted by the RDD caching task, when the execution task needs memory, the RDD caching task must immediately return the preempted memory, and the involved RDD caching data either needs to be written to disk or cleared;
  • For the Storage Memory space preempted by the distributed computing task, even if the RDD caching task needs to reclaim memory, it can only be released after the task is completed.

At the same time, I have summarized the key content in this example and its corresponding relationship with Spark below, hoping to help you deepen your understanding.

Analyzing Memory Consumption from Code #

Now that we’ve discussed the theory, let’s move on to practical examples to intuitively understand the different memory areas that are consumed by different parts of the code.

The sample code is very simple. Its purpose is to read the words.csv file and count the specified words.

val dict: List[String] = List("spark", "scala")
val words: RDD[String] = sparkContext.textFile("~/words.csv")
val keywords: RDD[String] = words.filter(word => dict.contains(word))
keywords.cache
keywords.count
keywords.map((_, 1)).reduceByKey(_ + _).collect

This code snippet contains 6 lines of code, and we will analyze them line by line.

Firstly, the first line defines a dictionary called dict. This dictionary is generated in the driver program and will be distributed to the executor program along with the subsequent RDD calls. The second line reads the words.csv file and generates the words RDD. The third line is crucial. It filters the words RDD using the dict dictionary. At this point, dict has been distributed to the executor program, and the executor will store it in the heap memory to filter the strings in the data partitions of words. The dict dictionary belongs to a user-defined data structure, so the executor will store it in the User Memory area.

Next, the fourth and fifth lines cache and count the keywords RDD for frequent access later. The cached distributed dataset occupies the Storage Memory area. In the last line of code, we call reduceByKey on keywords to count the words individually. We know that the reduceByKey operator introduces a Shuffle, and the internal data structures involved in the Shuffle process, such as mapping, sorting, and aggregation operations relying on buffers, arrays, and hash maps, consume memory in the Execution Memory area.

I have summarized the memory areas consumed by different parts of the code in the table below for your convenience.

Memory Consumption by Different Code

Summary #

Understanding the mechanisms of memory management helps us fully utilize the memory of applications and improve their execution performance. Today, we focused on learning the fundamentals of memory management.

Firstly, let’s look at the memory management methods. Spark distinguishes between on-heap memory and off-heap memory. For off-heap memory, Spark directly requests and releases memory space in the operating system memory by calling Java Unsafe’s allocateMemory and freeMemory methods, which incurs higher management costs. For on-heap memory, Spark delegates the management to the JVM. However, frequent JVM garbage collection poses a potential risk to execution performance. In addition, Spark’s estimation of on-heap memory usage is often not precise enough, and overestimating available memory can lead to potential out-of-memory errors.

Secondly, we discuss unified memory management and the preemption rules between Execution Memory and Storage Memory. They are like Huang Xiaoyi and Zhang Mazis in the story of renting land, and the preemption rules are like the land sharing agreement between them. There are mainly three rules:

  • If the other party’s memory space is free, both parties can preempt.
  • The RDD caching task must immediately return the preempted execution memory when there is a memory requirement for the executing task. The RDD caching data involved should either be written to disk or cleared.
  • Even if the RDD caching task needs to reclaim memory, the Storage Memory space preempted by distributed computing tasks cannot be released until the tasks are completed.

Finally, let’s discuss the memory consumption of different code in different memory regions. The memory regions include Reserved Memory, User Memory, Execution Memory, and Storage Memory. Reserved Memory is used to store Spark internal objects, User Memory is used to store user-defined data structures, Execution Memory is used for distributed task execution, and Storage Memory is used to store RDD caching and broadcast variables.

Well, these are the fundamentals of memory management. Of course, there are still many topics related to memory, such as memory overflow, RDD caching, memory utilization rate, parallel computation of execution memory, and more. In the performance optimization section, I will continue to discuss these topics with you from a memory perspective.

Daily Practice #

  1. Do you know which calculation steps in Spark can make use of off-heap memory after it is enabled? Could you provide some examples?
  2. Compared to on-heap memory, why is Spark’s estimation of memory usage more accurate in off-heap memory?
  3. Considering the configuration parameters given below, could you calculate the specific sizes of different memory areas (Reserved, User, Execution, Storage)?

Looking forward to seeing your thoughts and answers in the comments section. See you in the next lecture!