17 Memory Perspective III - Whose Fault are OOMs and How to Break Them #

Hello, I’m Wu Lei.

Whether it’s batch processing, stream computing, data analysis, or machine learning, Spark jobs keep running into OOM (Out Of Memory) errors. Once an OOM occurs, the job is interrupted and the application’s business logic cannot run. Handling OOM issues promptly is therefore an important part of our daily development.

However, the OOM problems reported by Spark can be quite diverse, which often makes the cause hard to pin down. For example, we often encounter situations where, based on its estimated size, the data set should fit completely into memory, yet Spark still reports an OOM exception. At this point, many students follow advice found online and keep increasing the spark.executor.memory parameter until they are at their wits’ end, only to find that it doesn’t help, and eventually give up.

So, what should we do when we encounter this “hot potato” called OOM? The first thing we should clarify is “where exactly is the OOM happening”. Only by accurately identifying the specific area where the problem occurs can we optimize effectively. Specifically, this “location” can be viewed from at least three aspects.

  • The location (Line of Code) where the OOM occurs.
  • Whether the OOM occurs on the Driver side or on the Executor side.
  • If it occurs on the Executor side, which memory region the OOM occurs in.

Locating the offending line of code is crucial but also very simple: the Stack Trace quickly points to the LOC that throws the error. What’s more critical, therefore, is to determine whether the problem occurs on the Driver side or the Executor side, and which memory region it happens in. The OOM issues on the Driver and Executor sides are different, so we naturally need to treat them differently.

In today’s lecture, we will first talk about the OOM problems on the Driver side and the corresponding solutions. Because the memory on the Executor side is divided into different regions, we will then work through the various Executor-side OOM problems with case studies. Finally, I will guide you to summarize a “secret manual” for dealing with OOM, so that when you face one, you can respond appropriately and effectively!

OOM on the Driver side #

Let’s start by talking about OOM (Out-of-Memory) issues on the Driver side. The main responsibility of the Driver is task scheduling, while also participating in a very small amount of task computation. Therefore, the memory configuration for the Driver is generally low, and there are no more finely divided memory areas.

Because the Driver’s memory is just one piece, the OOM problem on the Driver side naturally is not a fault of the scheduling system, but rather can only come from the computation tasks it is involved in. There are mainly two types:

  • Creating small-scale distributed datasets: using APIs such as parallelize, createDataFrame, etc. to create datasets
  • Collecting computation results: using operators such as take, show, collect, etc. to collect results on the Driver side

Therefore, there are two main causes for OOM on the Driver side:

  • The created dataset exceeds the memory limit
  • The collected result set exceeds the memory limit

The first cause is self-evident, so we won’t go into detail. When it comes to the second cause, I’m sure the first thing you think of is the notorious collect operator. Indeed, when talking about OOM, it is inevitable to mention collect. The collect operator will retrieve the full data set from the Executors to the Driver side. Therefore, if the size of the result set exceeds the memory limit of the Driver, it will naturally lead to OOM.

The OOM problem triggered by direct invocation of the collect operator by developers is actually easy to locate, but the more difficult part is identifying the OOM caused by indirect invocation of collect. So, what does it mean by indirect invocation of collect? Do you still remember the working principle of broadcast variables?

During the creation of broadcast variables, the data distributed across all Executors needs to be pulled back to the Driver side and then the broadcast variable is constructed on the Driver side. Finally, the encapsulated broadcast variable is distributed back to the Executors by the Driver side. The data retrieval of the first step is actually implemented using collect. If the total size of the data shards in the Executors exceeds the memory limit of the Driver side, it will also result in OOM. In your everyday optimization work, you may encounter the following manifestations and symptoms:

java.lang.OutOfMemoryError: Not enough memory to build and broadcast

But the underlying cause is that the Driver’s memory is limited and cannot accommodate the retrieved result set. Once we identify the cause, dealing with the OOM on the Driver side becomes simple. All we need to do is estimate the size of the result set and then increase the memory configuration on the Driver side accordingly. To adjust the memory size on the Driver side, we can use the spark.driver.memory configuration item. We can estimate the size of the dataset using the “Cache first, then view the execution plan” method. Here is some example code:

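import org.apache.spark.sql.DataFrame

// Placeholder: replace ??? with the dataset whose size you want to estimate
val df: DataFrame = ???
// Cache the dataset and trigger an action so that it is materialized in memory
df.cache.count
// Re-run planning on the logical plan; with the data cached,
// the plan statistics are based on the cached data
val plan = df.queryExecution.logical
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes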
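
Once you have an estimate, turning it into a Driver memory setting is simple arithmetic. The following is a minimal sketch in plain Scala (no Spark dependency). The object DriverSizing, the helper suggestedDriverMemoryMb, and the safety factor of 2.0 are all hypothetical: the factor is an arbitrary cushion chosen for illustration, not a rule from Spark.

```scala
object DriverSizing {
  private val BytesPerMb = BigDecimal(1024L * 1024L)

  // Hypothetical helper: pads the estimated result-set size by an assumed
  // safety factor and rounds up to whole megabytes.
  def suggestedDriverMemoryMb(estimatedBytes: BigInt, safetyFactor: Double = 2.0): Long = {
    val paddedBytes = BigDecimal(estimatedBytes) * BigDecimal(safetyFactor)
    (paddedBytes / BytesPerMb).setScale(0, BigDecimal.RoundingMode.CEILING).toLong
  }
}
```

For example, an estimate of 512MB with the assumed factor of 2.0 yields 1024, which could then be supplied as spark.driver.memory=1024m.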

Executor OOM #

Let’s talk about the OOM issue on the Executor side. As we know, the Executor’s memory is divided into four regions: Reserved Memory, User Memory, Storage Memory, and Execution Memory. Among these four regions, which ones are susceptible to OOM exceptions? And which ones are not likely to have OOM issues?

In Executors, only the memory regions related to task execution are susceptible to OOM. The Reserved Memory size is fixed at 300MB and cannot be controlled by the user because it is hardcoded in the source code. For Storage Memory, even if the dataset cannot be fully cached in the MemoryStore, Spark does not throw OOM exceptions. The additional data will either be spilled to disk (MEMORY_AND_DISK) or simply discarded (MEMORY_ONLY).

Therefore, when Executors encounter OOM problems, we can exclude Reserved Memory and Storage Memory and focus on Execution Memory and User Memory to identify the issue.
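
To get a feel for how large each region is, here is a rough sketch in plain Scala that splits an Executor heap into the four regions. The object ExecutorMemoryLayout and its names are hypothetical. The default values 0.6 and 0.5 are assumptions based on Spark’s usual defaults for spark.memory.fraction and spark.memory.storageFraction, so check them against your own configuration.

```scala
object ExecutorMemoryLayout {
  // Reserved Memory is fixed at 300MB, as described above.
  val ReservedMb = 300L

  final case class Regions(reservedMb: Long, userMb: Long, storageMb: Long, executionMb: Long)

  // Assumed defaults: spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5.
  def regions(executorMemoryMb: Long,
              memoryFraction: Double = 0.6,
              storageFraction: Double = 0.5): Regions = {
    val usableMb  = math.max(executorMemoryMb - ReservedMb, 0L)
    val unifiedMb = math.round(usableMb * memoryFraction)   // Storage + Execution
    val storageMb = math.round(unifiedMb * storageFraction) // initial Storage share
    Regions(ReservedMb, usableMb - unifiedMb, storageMb, unifiedMb - storageMb)
  }
}
```

With a 900MB heap, regions(900) gives 300MB reserved, 240MB of User Memory, and 180MB each for Storage Memory and Execution Memory. Keep in mind that the boundary between Storage and Execution is soft: when nothing is cached, Execution Memory can grow into the Storage share, up to 360MB in this case.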

OOM in User Memory #

In the memory management section, we mentioned that User Memory is used to store user-defined data structures such as arrays, lists, dictionaries, etc. If the total size of these data structures exceeds the limit of the User Memory region, you may see error messages like the ones shown in the examples below.

java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf

java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

If your data structures are used for distributed data transformations, you need to consider the size of the Executor’s thread pool when calculating the memory consumption in User Memory. Do you remember this example?

val dict = List("spark", "tune")
val words = spark.sparkContext.textFile("~/words.csv")
val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

The custom list dict is distributed to all the Executors with the tasks. Therefore, multiple dict instances in different tasks will consume User Memory repeatedly. If we denote the size of dict as size and the Executor’s thread pool size as threads, then the total memory consumption of dict in User Memory is: size * threads. If the total consumption exceeds the limit of User Memory, an OOM problem will occur.

To solve the OOM issue in User Memory, the approach is similar to that of the Driver. First, estimate the memory consumption of the data structures, and then increase the size of User Memory accordingly. However, compared to the Driver, more factors influence the upper limit of User Memory. Its total size is calculated as (spark.executor.memory - 300MB of Reserved Memory) * (1 - spark.memory.fraction).
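
The size * threads rule is easy to check by hand, but a small sketch may help. The object UserMemoryCheck and its helpers are hypothetical names, and the sizes in the usage note are made-up numbers for illustration only.

```scala
object UserMemoryCheck {
  // Total User Memory consumed when every task thread holds its own copy.
  def totalFootprintMb(structureMb: Double, threads: Int): Double =
    structureMb * threads

  // True when all copies together stay within the User Memory limit.
  def fitsInUserMemory(structureMb: Double, threads: Int, userMemoryMb: Double): Boolean =
    totalFootprintMb(structureMb, threads) <= userMemoryMb
}
```

Suppose User Memory is 240MB and the thread pool size is 3. A 50MB structure consumes 150MB in total and fits, while a 100MB structure consumes 300MB and does not, even though a single copy is well below the limit.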

OOM in Execution Memory #

When it comes to OOM hotspots, Execution Memory is the most susceptible region. It bears the brunt of distributed task execution, so errors are more likely to surface there than in any other memory region. Regarding OOM in Execution Memory, I have noticed a common misunderstanding among many students: as long as the data size is smaller than the Execution Memory, there won’t be an OOM issue; otherwise, there is a certain probability of triggering one.

In reality, data size is not the key factor that determines whether an OOM issue will occur. What matters is whether the data distribution matches the runtime planning of Execution Memory. This may sound abstract, but do you remember Huang Xiaoyi’s clever calculation? To keep the villagers motivated to farm, he drew up a land transfer agreement under which the land each villager can claim always falls between 1/(2N) and 1/N of the total. Therefore, if a greedy villager buys far more seeds than the 1/N upper limit can hold, the excess seeds are wasted.

Similarly, we can consider Execution Memory as the land and the distributed dataset as the seeds. Once the memory request of a distributed task exceeds the upper limit of 1/N, Execution Memory will encounter OOM problems. Moreover, compared to OOM issues in other scenarios, OOM in Execution Memory is much more complex. It not only depends on the memory space size and data distribution but also relies on the Executor’s thread pool and runtime task scheduling.
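
The land agreement translates into a pair of bounds per task. The sketch below is plain Scala with hypothetical names (TaskMemoryBounds, bounds). It assumes N is the number of tasks running concurrently in the Executor, which is at most the thread pool size.

```scala
object TaskMemoryBounds {
  // Returns (lower, upper) = (pool / (2N), pool / N) in MB for N concurrent tasks.
  def bounds(executionMemoryMb: Double, activeTasks: Int): (Double, Double) = {
    require(activeTasks > 0, "activeTasks must be positive")
    (executionMemoryMb / (2.0 * activeTasks), executionMemoryMb / activeTasks)
  }
}
```

With 360MB of Execution Memory, three concurrent tasks are each entitled to somewhere between 60MB and 120MB, whereas a task running alone can claim up to the full 360MB. This is why the same data can succeed or fail depending on how many tasks happen to be running at the time.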

Having identified the core reason for OOM problems, we can confidently handle the various manifestations of Execution Memory OOM. Now, let’s take a look at two common examples in development: data skew and data inflation. For illustrative purposes, assume the computing nodes in both examples share the same hardware configuration: 2 CPU cores with 2 threads each and 1GB of memory, with spark.executor.cores set to 3 and spark.executor.memory set to 900MB.

Example 1: Data Skew #

Let’s start with the data skew example. In the Reduce phase, the node fetches data shards. The data shard sizes for the three Reduce tasks are 100MB, 100MB, and 300MB respectively. Obviously, there is a slight data skew in the third data shard. Since the Executor thread pool size is 3, each Reduce task can obtain at most 120MB of the 360MB of Execution Memory (360MB * 1/3 = 120MB). The memory space allocated to Task 1 and Task 2 is enough to accommodate Shard 1 and Shard 2, so they can complete their work smoothly.

The data shard size for Task 3 far exceeds the memory limit. Even if Spark supports Spill and Sort during the Reduce phase, 120MB of memory space is not enough to meet the basic computing needs of processing 300MB of data, such as the memory consumption of data structures like PairBuffer and AppendOnlyMap, as well as temporary memory consumption for data sorting, and so on.
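
The reasoning in this example can be written down as a quick check. The sketch below uses hypothetical names (SkewCheck, tasksOverLimit) and a deliberate simplification: it flags any task whose shard is larger than its 1/N share, and ignores the fact that spilling lets Spark survive a moderate overshoot.

```scala
object SkewCheck {
  // Returns the 1-based numbers of the tasks whose shard exceeds the per-task limit.
  def tasksOverLimit(shardSizesMb: Seq[Double],
                     executionMemoryMb: Double,
                     threads: Int): Seq[Int] = {
    val perTaskLimitMb = executionMemoryMb / threads
    shardSizesMb.zipWithIndex.collect {
      case (sizeMb, idx) if sizeMb > perTaskLimitMb => idx + 1
    }
  }
}
```

For shards of 100MB, 100MB, and 300MB, with 360MB of Execution Memory and 3 threads, only Task 3 is flagged.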

The appearance of this example is an OOM caused by data skew, but in essence, it is the memory request of Task 3 exceeding the 1/N limit. Therefore, for data skew problems represented by this example, we have at least two optimization strategies:

  • Eliminate data skew by ensuring that all data shard sizes are not larger than 100MB
  • Adjust Executor thread pool, memory, and parallelism-related configurations to increase the 1/N limit to 300MB

Each of these strategies can be further refined into various methods. Taking the second strategy as an example, to meet the 1/N limit, the simplest approach would be to set spark.executor.cores to 1, that is, the Executor thread pool only works in a “parallel” manner with a single thread. At this time, the memory limit for each task becomes 360MB, which is sufficient to accommodate 300MB data shards.

Of course, setting the thread pool size to 1 is not practical; it only serves to illustrate the flexibility of optimization. Continuing with the second strategy, you need to balance multiple configuration options to solve the OOM problem while fully utilizing the CPU. For example:

  • Keep the concurrency and parallelism unchanged, increase the execution memory setting, and raise the 1/N limit to 300MB
  • Keep the concurrency and execution memory unchanged, use related configuration options to increase the parallelism and shuffle the data, making all data shard sizes smaller than 100MB
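
The trade-off between thread pool size and Execution Memory can be sketched with two small helpers. The names SkewTuning, maxThreadsFor, and executionMemoryNeededMb are hypothetical, and the sketch assumes the largest shard must fit entirely within a task’s 1/N share.

```scala
object SkewTuning {
  // Largest thread pool size for which pool / threads still covers the largest shard.
  def maxThreadsFor(largestShardMb: Double, executionMemoryMb: Double): Int =
    math.floor(executionMemoryMb / largestShardMb).toInt

  // Execution Memory needed so that pool / threads covers the largest shard.
  def executionMemoryNeededMb(largestShardMb: Double, threads: Int): Double =
    largestShardMb * threads
}
```

With 360MB of Execution Memory and a 300MB shard, the thread pool can hold only 1 thread. Keeping 3 threads instead requires 900MB of Execution Memory, which is why raising the 1/N limit usually means adjusting memory and concurrency together.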

I have provided detailed explanations on balancing and setting thread pool size, memory, and parallelism from a CPU perspective in a previous lecture. You can review it. As for how to eliminate data skew, you can think about it and share your ideas.

Example 2: Data Inflation #

Now let’s look at the second example of data inflation. In the Map phase, the node fetches HDFS data shards. The data shard sizes for the three Map tasks are all 100MB. According to the previous calculation, each Map task can obtain a maximum of 120MB of execution memory, so there should be no OOM problem.

The awkward part is that data on disk expands once it is loaded into the JVM. In our example, each data shard triples in size after loading into the JVM heap, turning the original 100MB into 300MB. Therefore, OOM becomes inevitable.

In this example, the appearance is an OOM caused by data inflation, but fundamentally, it is still the memory request of Task 2 and Task 3 exceeding the 1/N limit. Therefore, for data inflation problems represented by this example, we also have at least two optimization strategies:

  • Distribute the data, increase the number of data shards, reduce the data granularity, and reduce the inflated data size to around 100MB
  • Increase memory configuration, adjust in combination with Executor thread pool, and increase the 1/N limit to 300MB
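
The first strategy boils down to working out how finely the data must be split. The sketch below uses hypothetical names (InflationPlanning, inMemorySizeMb, splitsNeeded) and assumes the inflation factor is uniform and that each shard can be split evenly, which real data will only approximate.

```scala
object InflationPlanning {
  // In-memory size of a shard after it is loaded into the JVM heap.
  def inMemorySizeMb(onDiskMb: Double, inflationFactor: Double): Double =
    onDiskMb * inflationFactor

  // Smallest number of equal pieces per shard so that each inflated piece
  // stays within the per-task limit.
  def splitsNeeded(onDiskMb: Double, inflationFactor: Double, perTaskLimitMb: Double): Int =
    math.ceil(inMemorySizeMb(onDiskMb, inflationFactor) / perTaskLimitMb).toInt
}
```

A 100MB shard that triples in memory becomes 300MB. Against a 120MB per-task limit it needs to be split into 3 pieces, each of which inflates to 100MB.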

Summary #

To efficiently solve various OOM (Out of Memory) problems, the most important thing is to accurately locate the areas where the problems occur. This way, our optimization can be targeted. I suggest you follow two steps.

First, locate the code where the OOM occurs. You can quickly find the answer through the Stack Trace.

Second, determine whether the OOM occurs on the Driver side or the Executor side. If it occurs on the Executor side, then locate the specific memory region where it occurs.

OOM on the Driver side can be divided into two categories:

  • Creating a dataset that exceeds the memory limit
  • Collecting a result set that exceeds the memory limit

The general method to deal with OOM on the Driver side is to first estimate the size of the result set appropriately, and then increase the memory configuration on the Driver side accordingly.

OOM on the Executor side is only related to User Memory and Execution Memory areas, as they are both related to task execution. The causes and solutions for OOM in the User Memory area are no different from those of the Driver side, so you can refer to them directly.

The cause of OOM in the Execution Memory area is that the data distribution does not match the runtime planning of Execution Memory, meaning the memory request of the distributed task exceeds the 1/N limit. The approach to solving the OOM problem in the Execution Memory area can generally be divided into three categories:

  • Eliminate data skewness so that the size of all data shards is less than the 1/N limit
  • Scatter the data to increase the number of data partitions and reduce the data granularity, so that the inflated data volume is less than 1/N
  • Increase memory configuration, adjust the Executor thread pool, and raise the 1/N limit

Daily Practice #

  1. In the example where data inflation leads to OOM, why can Task 1 obtain 300MB of memory space? (Hint: You can review the lecture on the CPU perspective to find the answer.)
  2. In daily development, what other OOM manifestations have you encountered? Can you classify them according to what we discussed today?

Looking forward to seeing your thoughts and sharing in the comment section. See you in the next lecture!