16 Memory Perspective Ii How to Avoid Cache Saturation Effectively

16 Memory Perspective II - How to Avoid Cache Saturation Effectively #

Hello, I’m Wu Lei.

In Spark application development, effectively utilizing cache can often greatly improve execution performance.

However, one day, a classmate told me that their execution performance became worse after adding cache. After carefully reviewing this classmate’s code, I was shocked. The code was filled with a large number of .cache operations, whether it was RDD or DataFrame. Wherever there was a distributed dataset, there was almost always a .cache afterwards. Obviously, the misuse of cache was the culprit for the decline in execution performance.

In fact, in some scenarios, cache is a panacea, while in other cases, excessive use of cache is a case of trying to catch a falling knife. So when should cache be used, how should it be used, and what are the precautions? In today’s lecture, let’s first review the working principle of cache and then answer these questions.

How Caching Works #

In the lecture on storage systems, we actually introduced the caching process of RDDs, but the perspective at that time was focused on MemoryStore, with the aim of understanding the working principle of the storage system. Today, let’s refocus on caching.

There are three aspects of Spark’s caching mechanism that we need to master:

  • Caching storage levels: These specify the storage medium for data caching, such as memory, disk, etc.
  • Caching computation process: This process involves transforming RDDs into blocks and storing them in memory or on disk.
  • Caching eviction process: This process involves evicting cached data from memory or disk, either actively or passively.

Now, let’s take a look at each of these aspects.

Storage Levels #

Spark’s Cache supports various storage levels, such as MEMORY_AND_DISK_SER_2 and MEMORY_ONLY. How can we remember and distinguish these similar-looking strings? In fact, each storage level has three basic elements:

  • Storage medium: memory, disk, or both.
  • Storage format: object values or serialized byte arrays. Levels with “SER” in their names indicate using serialization, while those without “SER” indicate using object values.
  • Replication factor: The number at the end of the storage level name represents the number of copies. If there is no number, the default is one copy.

When we break down the various storage levels, we find that they are just different combinations of these three basic elements. I have listed all the storage levels supported by Spark in the table above to help you deepen your understanding.

Although there are many cache levels, the most commonly used ones are MEMORY_ONLY and MEMORY_AND_DISK, which are the default storage levels for RDD and DataFrame caching, respectively. In daily development work, when you call the .cache function on RDDs or DataFrames, Spark defaults to using MEMORY_ONLY and MEMORY_AND_DISK.

Caching Computation Process #

In the MEMORY_AND_DISK mode, Spark will try to cache the entire dataset in memory first. If there is not enough memory, the remaining data will be spilled to disk. MEMORY_ONLY, on the other hand, does not consider whether there is enough memory, but simply caches the data in memory. Even if there is not enough memory, it will not spill to disk. It is easy to see that both of these storage levels first attempt to cache the data in memory. The data storage process in memory was explained in Lecture 6, so let’s review it here.

Whether it is an RDD or a DataFrame, their data partitions are stored in the form of iterators. Therefore, to cache the data, we need to first expand the iterator into actual data values, a step called “Unroll” (step 1). The expanded object values are temporarily stored in a data structure called ValuesHolder and then transformed into MemoryEntry. The transformation is implemented using toArray, so it does not generate additional memory overhead, and this step is called “Transfer” (step 2). Finally, the MemoryEntry and its corresponding BlockID are stored in a hash dictionary (LinkedHashMap) in the form of Key-Value pairs (step 3).

When all the data partitions of the distributed dataset are unrolled, transferred, and registered in the hash dictionary, the caching process in memory is completed.

Caching Eviction Process #

However, in many cases, the demand for data caching in applications may exceed the storage space available in the Storage Memory area. Although caching tasks can occupy space in the Execution Memory area, “what goes around, comes around”. As the execution of tasks progresses, the memory space occupied by caching tasks must be “released”. At this time, Spark needs to perform the eviction process for caching. You can think of Storage Memory as a popular internet celebrity restaurant, and the data shards waiting to be cached as customers waiting to be seated. When the demand exceeds the supply, and the number of customers far exceeds the number of seats, Spark naturally needs to establish some rules to “expel” those idle customers and make room for those waiting in line.

So, what rules does Spark use to “expel” customers? Next, I will take the case of caching multiple distributed datasets simultaneously to analyze what happens when memory is limited.

We can use a diagram to illustrate this process. Let’s assume that the MemoryStore contains cached data for 4 RDDs/Data Frames. After these 4 distributed datasets cache some data shards, the Storage Memory area becomes full. When RDD1 tries to cache the 6th shard into the MemoryStore, it finds that there is not enough memory and cannot fit in.

In this case, Spark will clear one by one the “idle customers” MemoryEntry to release memory, in order to get more available space to store new data shards. This process is called eviction, and the Chinese translation is quite vivid, it’s called “驱逐” (expulsion), which means to expel those unfortunate MemoryEntries from the MemoryStore.

So, going back to the previous question, how does Spark select these unlucky ones based on what rules? This rule is called LRU (Least Recently Used). Based on this algorithm, the one with the lowest recent access frequency is the unlucky one. Because LRU is a basic data structure algorithm, it is often tested in written exams and interviews, so I won’t talk much about its concept.

What we need to know is how Spark implements LRU. Here, Spark uses a clever data structure: LinkedHashMap, which naturally supports the LRU algorithm.

LinkedHashMap uses two data structures to maintain data, one is the traditional HashMap, and the other is a doubly linked list. The purpose of HashMap is for fast access. Given a specific BlockId, HashMap returns the MemoryEntry with an efficiency of O(1). The doubly linked list is different. It is mainly used to maintain the access order of elements (i.e., the key-value pairs of BlockId and MemoryEntry). Any element that has been accessed, inserted, read, or updated will be placed at the end of the linked list. Therefore, the elements at the head of the linked list are exactly the “least recently accessed” elements.

In this way, when the available memory space is insufficient and it is necessary to evict cached blocks, Spark only needs to use the LinkedHashMap to sequentially evict the data shards in the cache based on the “least recently accessed” principle.

In addition to this, in the lecture on the storage system, a student asked why MemoryStore uses LinkedHashMap instead of a regular Map to store the key-value pairs of BlockId and MemoryEntry. The answer I just mentioned explains this.

Going back to the example in the diagram, when RDD1 tries to cache the 6th data shard, but there is not enough available memory space, Spark scans the LinkedHashMap from start to finish and records the size of each MemoryEntry. When the total size of the unlucky MemoryEntries exceeds the size of the 6th data shard, Spark stops scanning.

Interestingly, the selection rule for the unlucky ones follows the phrase “兔子不吃窝边草” (rabbits don’t eat the grass near their burrows), MemoryEntries belonging to the same RDD will not be selected. Just as shown in step 4 of the diagram, the first blue MemoryEntry will be skipped, and the two MemoryEntries marked with crosses immediately after it will be selected.

Therefore, to summarize, in the process of clearing the cache, Spark follows two basic principles:

  • LRU: Clear the BlockId and MemoryEntry key-value pairs that have been “least recently accessed” first according to their access order.
  • Rabbits don’t eat the grass near their burrows: MemoryEntries belonging to the same RDD have “amnesty” during the eviction process.

Degradation to MapReduce #

Although there is a stage of cache eviction, the Storage Memory space will always be exhausted, and the MemoryStore will always “expel” those that cannot be removed. At this time, the MEMORY_ONLY mode will abandon the remaining data shards. For example, in the Spark UI, you often see that the cache ratio in the Storage tab is less than 100%. And from the Storage tab, we can also observe that in the MEMORY_AND_DISK mode, the dataset is divided into two parts occupying memory and disk.

This is because for the MEMORY_AND_DISK storage level, when memory is insufficient to hold all RDD data shards, Spark will use the DiskStore to cache the RDD shards that have not been expanded to disk. The working principle of DiskStore was explained in detail in the lecture on the storage system. You can go back and take a look. I recommend that you combine your knowledge of DiskStore to derive the caching process of RDD shards on disk.

Therefore, compared to MEMORY_ONLY, MEMORY_AND_DISK mode ensures that the dataset is physically stored 100% on the storage media. For RDDs or DataFrames with long computation chains, it is worth it to materialize the data on the disk. However, we should not call .cache for every RDD or DataFrame, because in the worst case, Spark’s in-memory computation will degrade to the disk-based computation mode of Hadoop MapReduce.

For example, you are developing an application using the DataFrame API, and the calculation process involves 10 transformations between DataFrames. Each DataFrame calls .cache to cache the data. Since the Storage Memory space is limited, the MemoryStore can only accommodate the data of two DataFrames at most. Therefore, there will be 8 exchanges of DataFrame data at the granularity of DataFrame. In the end, the MemoryStore will store the most frequently accessed DataFrame data shards, and the other data shards will be evicted to the disk. In other words, on average, at least 8 transformations of DataFrame will write the computation results to disk, which is basically the computation mode of Hadoop MapReduce.

Of course, we are considering the worst-case scenario, but this also helps us understand the potential risks and hazards of abusing the cache.

The Proper Use of Cache #

Since overusing Cache can have severe consequences, when is it appropriate to use Cache? I suggest following these two basic principles when making decisions:

  • If the number of references to the RDD/DataFrame/Dataset in the application is 1, Cache should not be used.
  • If the number of references is greater than 1 and the cost of execution exceeds 30%, Cache should be considered.

The first principle is easy to understand, so let’s talk more about the second one. Here we introduce a new concept: cost of execution ratio. It refers to the ratio between the total time consumed by computing a distributed dataset and the total execution time of the job. Let’s take an example. Suppose we have a data analysis application that takes 1 hour to complete end-to-end. There is a DataFrame that is referenced twice in the application. From reading the data source to performing a series of computations, it takes 12 minutes to generate this DataFrame. In this case, the cost of execution ratio of this DataFrame would be calculated as: 12 * 2 / 60 = 40%.

You may ask, “It’s easy to calculate the execution time of the job by checking the Spark UI, but how do we calculate the execution time of the DataFrame?” Here, I will share a small trick. We can extract the DataFrame calculation logic from the existing application and use the Noop provided by Spark 3.0 to precisely measure the execution time of the DataFrame. Let’s assume df is the DataFrame that is referenced twice. We can copy all the code that df depends on into a new job and then trigger the calculation using Noop on df. The purpose of Noop is cleverly designed. It only triggers the computation without involving disk writes and data storage. Therefore, the execution time of the new job is exactly the execution time of the DataFrame.

//Precisely calculate DataFrame execution time using Noop
df.write
.format("noop")
.save()

You might think it’s cumbersome to calculate the cost ratio every time. However, as long as you have sufficient understanding of the data source and a clear understanding of the intermediate process of computing the DataFrame, you don’t need to calculate the cost of execution ratio precisely every time. After a few tries, you will be able to estimate the cost ratio of the distributed dataset with reasonable accuracy.

Notes on Cache #

Now that we have understood when to use Cache, let’s talk about some important points to keep in mind when using Cache.

First of all, we all know that .cache is a lazy operation, so after calling .cache, we need to trigger the materialization of the cache by using an action operator. However, I have noticed that many students are very casual when choosing action operators, and they just use whichever is convenient among ‘first’, ’take’, ‘show’, and ‘count’.

This is definitely wrong. Out of these four operators, only ‘count’ triggers the complete materialization of the cache, while ‘first’, ’take’, and ‘show’ only materialize the data involved. For example, ‘show’ by default only produces 20 results. If we call the ‘show’ operator after .cache, it will only cache the 20 records in the dataset.

Once we have chosen the operator, let’s discuss how to handle the caching. You might think, “Isn’t it simple? Just call .cache after RDD or DataFrame”. Well, it’s not that simple. Let me give you a multiple-choice question to test you. If you have a DataFrame called df with dozens of columns and subsequent data analysis, which caching method should you use from the table below?

val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)

// Caching method 1
val cachedDF = df.cache
// Data analysis
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

// Caching method 2
df.select(col1, col2).filter(col2 > 0).cache
// Data analysis
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)

// Caching method 3
val cachedDF = df.select(col1, col2).cache
// Data analysis
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

As we all know, due to limited Storage Memory space, the Cache should follow the ‘minimum common subset’ principle. This means that developers should only cache the necessary data columns for subsequent operations. According to this principle, method 1 should be excluded, as df is a wide table with dozens of columns.

Let’s take a look at method 2, which caches the data columns col1 and col2, where col2 values are greater than 0. The first analysis statement just swaps the order of filter and select. The second statement has a filter condition that limits col2 values to be greater than 100, which means the result of this statement is a subset of the cached data. So, at first glance, both analysis statements can logically utilize the cached data.

However, unfortunately, both analysis statements will skip the cached data and read the Parquet source file from disk again, and then calculate the projection and filtering logic from scratch. Why is that? The reason behind this is that the Cache Manager requires both Analyzed Logical Plans of the two queries to be completely identical in order to reuse the cache of the DataFrame.

The Analyzed Logical Plan is a relatively basic logical plan, mainly responsible for semantic checks of the AST query syntax tree to ensure the validity of referenced table, column, and other metadata. More intelligent reasoning such as predicate pushdown and column pruning will only take effect in the Optimized Logical Plan. Therefore, even if two queries are the same except for the order of select and filter, they will be considered as different logical plans in the Analyzed Logical Plan stage.

To avoid Cache miss caused by different Analyzed Logical Plans, we should use the third implementation method, assign the data we want to cache to a variable, and all subsequent analysis operations on this variable will completely reuse the cached data. You see, using cache is not as simple as just calling .cache.

In addition, we should also clean up used Cache in a timely manner to free up memory space for other datasets, in order to avoid Eviction as much as possible. Generally, we use .unpersist to clean up deprecated cached data, which is the inverse operation of .cache. The unpersist operation supports both synchronous and asynchronous modes:

  • Asynchronous mode: calling unpersist() or unpersist(False)
  • Synchronous mode: calling unpersist(True)

In asynchronous mode, after the Driver sends the request to clean up the cache to each Executor, it will immediately return and continue executing user code, such as subsequent task scheduling, broadcast variable creation, etc. In synchronous mode, after the Driver sends the request, it will wait for all Executors to give clear results (whether the cache clearing is successful or failed). Each Executor has different efficiency and progress in clearing the cache, and the Driver will continue executing the code on the Driver side only after the last Executor returns the result. Obviously, synchronous mode will impact the efficiency of the Driver’s work. Therefore, in general, when we need to actively clean up Cache, we often use the asynchronous calling method, that is, calling unpersist() or unpersist(False).

Summary #

To effectively avoid the misuse of Cache, we must start with the working principle of Cache and understand its three important mechanisms: storage level, caching computation, and cache eviction process.

Regarding the storage level, the two most commonly used in actual development are MEMORY_ONLY and MEMORY_AND_DISK, which are the default storage levels for RDD and DataFrame caching, respectively.

As for caching computation, it can be divided into three steps. The first step is Unroll, which materializes the iterators of RDD data into object values. The second step is Transfer, which encapsulates the object values into MemoryEntry. The third step is to register BlockId and MemoryEntry key-value pairs to the LinkedHashMap data structure.

In addition, when the data caching demand exceeds the storage memory capacity, Spark uses the characteristics of the LinkedHashMap data structure to follow two basic principles, LRU and “rabbits don’t eat grass near their nest”, to clear memory space:

  • LRU: Clear the “least recently used” BlockId and MemoryEntry key-value pairs according to the access order of elements.
  • “Rabbits don’t eat grass near their nest”: During the eviction process, MemoryEntries belonging to the same RDD have “immunity”.

Furthermore, we need to understand the general principles and considerations for using Cache, which can be summarized as follows:

  • If the RDD/DataFrame/Dataset is referenced only once in the application, we should not use Cache.
  • If the reference count is greater than one and the runtime cost accounts for more than 30%, we should consider enabling Cache (the runtime cost can be calculated using Spark 3.0’s noop feature).
  • The action operator count should be chosen to fully materialize the cached data, and when calling Cache, we should assign the data to a variable. This way, any analysis operation performed on this variable will fully reuse the cached data.

Daily Practice #

  1. Can you combine your knowledge of DiskStore to deduce the process of cache partitioning to disk under the MEMORY_AND_DISK mode for RDDs?
  2. Why do you think the eviction rule should follow the principle of “the rabbit does not eat grass near its burrow”? What are the risks if the same RDD’s MemoryEntry is allowed to be evicted?
  3. For the cache reuse of DataFrames, why didn’t the Cache Manager adopt the method based on the Optimized Logical Plan? What difficulties do you think exist? If you were to implement the Cache Manager, how would you do it?

Looking forward to seeing your thoughts and answers in the comments. If your friends are also struggling with how to use cache, feel free to forward them this lecture. See you in the next lecture!