
08 Memory Management - How Spark Uses Memory #

Hello, I am Wu Lei.

In [Lesson 6], we visited a branch of the Spark Construction Group and familiarized ourselves with the office environment and personnel configuration of the branch. We also used the analogy of “bricklaying tasks on a construction site” to explain the working principle of Spark Shuffle.

In this lesson, we will revisit the branch to see which construction projects the Spark company is undertaking and how these projects are being implemented. By understanding the construction process of the projects, we will learn about Spark’s memory management.

Image

Compared to other big data computing engines, when it comes to Spark’s features and advantages, the phrase you have probably heard the most is “in-memory computing”. Proper and efficient utilization of memory resources is one of Spark’s core competitive advantages. Therefore, as developers, it is important for us to understand how Spark uses memory.

Alright, without further ado, please put on your safety helmet and join me in visiting the Spark Construction Group branch again. However, before the official “visit”, there is some preparatory work we need to do, which is to first understand how Spark’s memory regions are divided.

Memory Area Partition in Spark #

For any Executor, Spark divides the memory into four areas: Reserved Memory, User Memory, Execution Memory, and Storage Memory.

image

Among them, Reserved Memory is fixed at 300MB and cannot be controlled by developers. It is a memory area reserved by Spark for storing various internal objects. User Memory is used to store the custom data structures defined by developers, such as arrays, lists, maps, etc., referenced in RDD operations.
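To make the User Memory region a bit more concrete, here is a minimal sketch. The stopWords set and wordRDD names below are hypothetical, with wordRDD assumed to be an RDD[String]: any data structure you define yourself and reference inside an RDD operator lives in User Memory, outside the regions Spark manages.

```scala
// Hypothetical example: a developer-defined Set referenced in an RDD operator.
// The Set itself is allocated in User Memory; Spark neither manages it nor
// accounts for it in Execution Memory or Storage Memory.
val stopWords: Set[String] = Set("the", "a", "of")

// wordRDD is assumed to be an RDD[String]
val filtered = wordRDD.filter(word => !stopWords.contains(word))
```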

Execution Memory is used to execute distributed tasks. The computation of distributed tasks mainly includes data transformation, filtering, mapping, sorting, aggregation, merging, and other steps, and the memory consumption of these computation steps all comes from Execution Memory.

Storage Memory is used to cache distributed datasets, such as RDD Cache and broadcast variables. The details of broadcast variables will be discussed in Lesson 10. RDD Cache refers to the copy of RDD materialized in memory. In a long DAG, if the same RDD is referenced multiple times, caching this RDD in memory can greatly improve the execution performance of the job. We will introduce the specific usage of RDD Cache at the end of this lesson.

It is not difficult to see that the Execution Memory and Storage Memory areas play a crucial role in the execution performance of Spark jobs. Therefore, among all the memory areas, these two are the most important and the ones developers need to pay the most attention to.

Before Spark version 1.6, the space allocation for Execution Memory and Storage Memory was static. Once the space allocation was completed, the purposes and sizes of different memory areas were fixed. In other words, even if you didn’t cache any RDD or broadcast variables, the free memory in the Storage Memory area couldn’t be used to perform mapping, sorting, or aggregation tasks, and valuable memory resources would be wasted.

Considering the drawbacks of static memory partitioning, Spark introduced the Unified Memory Management mode in version 1.6. In this mode, Execution Memory and Storage Memory can borrow space from each other. What does this mean? Next, let’s step into the branch office of the Spark Construction Group and take a look at the logic of conversion between the different memory areas.

Conversion between Different Memory Areas #

As soon as we entered the gate of the branch office, we could see the workers busy on the construction site. When we asked, we found out that they had undertaken a project to convert shipping containers into mobile homes. As the name implies, the specific task of this project is to convert shipping containers into compact and stylish mobile homes.

The production process of mobile homes is not complicated. With a series of simple steps, shipping containers can be transformed into small and unique mobile homes. These steps include cleaning, cutting windows, cutting doors, painting, installing partitions, arranging furniture, and decorating. The production of mobile homes is completed on the construction site, and successfully converted mobile homes are immediately transported away by freight trucks to the logistics distribution center of the group company.

Okay, now that we have introduced the project of converting shipping containers into mobile homes, we must explain the relationship between this project and Spark. After all, no matter how interesting the story is, it is meant to assist us in learning Spark better.

I have organized the analogy between the project’s materials and construction steps and their Spark counterparts in the following table:

Table

From the table, we can see that a shipping container corresponds to an RDD data source, and construction steps such as cutting doors and windows correspond to various RDD operators. The construction site, which provides the workspace, plays the same role as the memory of a computing node: both provide a place for data processing. From this perspective, the project of converting shipping containers into mobile homes can be seen as a Spark job or a Spark application.

Next, let’s take a look at the construction process of this project. As we approached the construction site, we found a red dotted line dividing it in half. On the left side of the line sat several shipping containers covered in mud, while on the right side workers were busy converting containers into mobile homes: some had already started arranging furniture, and others were still cutting doors and windows.

Image

Seeing the red line on the ground, we couldn’t help but ask about it. The foreman explained the reason to us.

Ideally, production materials such as shipping containers and furniture should be stored in temporary warehouses (node hard disks), and the construction site (node memory) should only be used for conversion operations. However, the construction site is some distance away from the temporary warehouse, and transportation back and forth is not very convenient.

In order to improve work efficiency, the construction site is divided into two areas. In the above picture, the left side of the red line is called the storage area (Storage Memory), which is specifically used to temporarily store construction materials. The right side is called the execution area (Execution Memory), which is used by workers to convert shipping containers and produce mobile homes.

The reason for using the dotted line is that the size of the two areas is not fixed. When one area is empty, the other area can occupy it.

For example, suppose there are only two workers (CPU threads) working on converting shipping containers in the execution area. At this time, there is empty space in the execution area that can accommodate two more objects. Therefore, this empty space can be temporarily used to stack construction materials, and the storage area is thus expanded.

Image

However, when there are enough workers to expand production, for example, on the basis of the original two workers, two more workers come in, making a total of four workers who can produce mobile homes simultaneously. Any objects (such as the sofa and double bed in the above picture) between the red dotted line and the blue solid line need to be moved to the temporary warehouse, and the empty area is handed over to the two new workers to convert shipping containers. After all, converting shipping containers and producing mobile homes are the core tasks of the project.

Image

On the other hand, if there are fewer objects in the storage area and more workers available, for example, if there are six workers who can work simultaneously, then the empty space in the storage area will be temporarily used by the execution area. At this time, the execution area is effectively expanded.

Image

When more objects need to be placed in the storage area, the expanded execution area needs to be contracted back to the location of the red dotted line. However, for the area between the red solid line and the red dotted line, we must wait until the workers complete the conversion of the mobile homes before returning this area to the storage area.

Alright, that’s the end of the introduction to the project of mobile homes. It is not difficult to see that the execution area corresponds to Execution Memory, and the storage area is actually Storage Memory. The three rules of memory contention between Execution Memory and Storage Memory can be summarized as follows:

  • If the other party’s memory space is idle, both parties can contend for it;
  • For the part of Execution Memory contested by Storage Memory, when distributed tasks require computation, Storage Memory must immediately return the contended memory, and the involved cached data needs to be either written to disk or cleared;
  • For the part of Storage Memory contested by Execution Memory, even if Storage Memory needs to reclaim memory, it must wait until the distributed task is completed before releasing it.

After explaining the memory contention rules between Execution Memory and Storage Memory, next, let’s take a look at how the initial sizes of different memory areas are set.

Memory Configuration Options #

In general, the division of Executor JVM Heap is determined by three configuration options shown in the graph:

Image

Among them, spark.executor.memory is an absolute value, which specifies the total size of Executor JVM Heap. The other two configuration options, spark.memory.fraction and spark.memory.storageFraction, are fractional values, which specify the proportions of different areas.
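As a quick sketch of how these options are set in code (the 4g / 0.6 / 0.5 values below are hypothetical examples, not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical example values; tune them for your own workload
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")          // M: total Executor JVM Heap
  .set("spark.memory.fraction", "0.6")         // mf: (Execution + Storage) share of (M - 300MB)
  .set("spark.memory.storageFraction", "0.5")  // sf: Storage's share within mf
```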

spark.memory.fraction determines the total amount of memory Spark uses to process distributed datasets. This memory includes both Execution Memory and Storage Memory, shown as the green rectangle in the graph. Writing M for spark.executor.memory (in MB) and mf for spark.memory.fraction, the size of User Memory (the blue area in the graph) is (M – 300) * (1 – mf).

spark.memory.storageFraction (sf for short) further divides this region into the initial sizes of Execution Memory and Storage Memory. As mentioned before, Reserved Memory is fixed at 300MB. The initial size of Storage Memory is (M – 300) * mf * sf, and the initial size of Execution Memory is (M – 300) * mf * (1 – sf).
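To make the arithmetic concrete, here is a minimal worked example in Scala, assuming a hypothetical 4GB Executor with mf = 0.6 and sf = 0.5:

```scala
// Hypothetical example: M = 4096MB, mf = 0.6, sf = 0.5
val M  = 4096.0  // spark.executor.memory, in MB
val mf = 0.6     // spark.memory.fraction
val sf = 0.5     // spark.memory.storageFraction

val reserved  = 300.0                   // Reserved Memory: fixed 300MB
val usable    = M - reserved            // 3796.0MB left for the other regions
val user      = usable * (1 - mf)       // User Memory:      1518.4MB
val execution = usable * mf * (1 - sf)  // Execution Memory: 1138.8MB
val storage   = usable * mf * sf        // Storage Memory:   1138.8MB
```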

By understanding these three configuration options, as developers, we can adjust different memory areas in a targeted manner to improve the efficiency of memory usage. As mentioned earlier, properly using RDD Cache can often greatly improve the execution performance of jobs. Therefore, in the last part of this lecture, let’s learn the specific usage of RDD Cache together.

RDD Cache #

In a Spark job, the calculation graph DAG often contains multiple RDDs. We need to understand when and which RDD should be cached. Blindly overusing cache is not a wise choice. Let’s start with the conclusion that when an RDD is referenced multiple times, it is worth considering caching it to improve job execution efficiency.

Let’s take the Word Count example from Lecture 1 as an illustration. The complete code is as follows:

import org.apache.spark.rdd.RDD

val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"

// Read file content
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

// Split by line
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

// Transform RDD elements into (Key, Value) format
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// Group by word and count
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

// Print the top 5 most frequent words
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

// Save the group counts to a file
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

If you run this code in spark-shell, you will notice that both the take and saveAsTextFile operations execute slowly. The reason is that wordCounts is referenced by two different actions, so by default Spark recomputes the entire lineage from the source file for each of them. At this point, we can consider improving efficiency by adding a cache to the wordCounts RDD.

So, how to add cache? It’s very simple. After defining the wordCounts RDD, you just need to call cache and count on this RDD in sequence, as shown below:

// Group by word and count
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

wordCounts.cache // Use the cache operator to inform Spark to cache the wordCounts RDD
wordCounts.count // Trigger the computation of wordCounts and cache it in memory

// Print the top 5 most frequent words
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

// Save the group counts to a file
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

Since the cache function does not immediately materialize the RDD in memory, we also need to call the count operator to trigger this execution process. After adding these two statements, you will notice that the take and saveAsTextFile operations run much faster. It is highly recommended to compare the running speed before and after adding cache in spark-shell to intuitively feel the performance improvement of RDD cache.

In the above example, we add a cache to the wordCounts RDD by calling the cache function. Behind the scenes, cache simply calls persist(StorageLevel.MEMORY_ONLY) to do the actual work. In other words, the following two statements are equivalent; both materialize the RDD into memory:

import org.apache.spark.storage.StorageLevel

wordCounts.cache
wordCounts.persist(StorageLevel.MEMORY_ONLY)

Compared to the cache operator, persist is more versatile when adding a cache: it lets developers flexibly choose the storage medium, storage format, and replica count by passing one of the storage levels (such as MEMORY_ONLY) offered by Spark.

Spark supports a rich set of storage levels, each having three basic elements.

  • Storage Medium: Whether to cache data in memory or on disk, or both.
  • Storage Format: Whether to store data as object values or as serialized byte arrays. Levels whose names contain “SER” store serialized data; levels without it store object values.
  • Replication Count: The number of copies of data. The integer at the end of the storage level name represents the number of copies. If no number is specified, the default is one copy.

I summarized the available storage levels in Spark in the following table. The checkboxes indicate the storage medium and storage format supported by each storage level. Take a look at it to get a better understanding.

image

After breaking down the storage levels listed in the table, we can see that they are just combinations of these three basic elements: storage medium, storage format, and replication count. The table lists all the storage levels currently supported by Spark, which enables you to quickly compare and find different storage levels to meet different business needs.
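As an illustration of picking storage levels with persist, here is a hedged sketch reusing the wordCounts RDD from the Word Count example; which level is right depends on your data size and access pattern:

```scala
import org.apache.spark.storage.StorageLevel

// Serialized format in memory, spilling to disk when memory runs short
wordCounts.persist(StorageLevel.MEMORY_AND_DISK_SER)
wordCounts.count  // trigger the computation and materialize the cache

// Note: an RDD can only hold one storage level at a time; to switch levels
// (e.g., to MEMORY_ONLY_2, which keeps 2 replicas), unpersist first
wordCounts.unpersist()
wordCounts.persist(StorageLevel.MEMORY_ONLY_2)
```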

Key Points Review #

In today’s lesson, you need to grasp the principles of dividing the Executor JVM Heap and learn how to divide different memory areas through configuration items.

Specifically, Spark divides the Executor memory into four regions: Reserved Memory, User Memory, Execution Memory, and Storage Memory.

By adjusting the configuration items spark.executor.memory, spark.memory.fraction, and spark.memory.storageFraction, you can flexibly adjust the sizes of different memory areas to adapt to the memory requirements of Spark jobs.

Image

Furthermore, in Unified Memory Management mode, Execution Memory and Storage Memory can compete with each other for memory. You need to understand the preemption logic between the two. In summary, there are three rules for memory preemption:

  • If the other side has free memory space, both can preempt each other.
  • For the portion of Execution Memory preempted by Storage Memory, when distributed tasks need to compute, Storage Memory must immediately return the preempted memory, and the involved cached data needs to be either written to disk or cleared.
  • For the portion of Storage Memory preempted by Execution Memory, even if Storage Memory needs to reclaim memory, it must wait until the distributed task completes execution before releasing it.

Finally, we introduced the basic use of RDD Cache. When the reference count of an RDD in the code is greater than 1, you can consider improving job performance by caching the RDD. The specific approach is to call the cache or persist function on the RDD.

Among them, persist is more versatile. You can flexibly choose the storage medium, storage form, and number of replicas of the cache by specifying the storage level, thereby satisfying different business needs.

Practice for Each Lesson #

Alright, that’s all for this lesson. Today’s exercise is as follows:

Given the following configuration settings, please calculate the sizes of different memory areas (Reserved, User, Execution, Storage).

Feel free to share your answers in the comment section. If this lesson has been helpful to you, please feel free to share it with your friends. See you in the next lesson.