14 From the CPU Perspective - How to Efficiently Utilize the CPU #

Hello, I’m Wu Lei.

In daily development and optimization work, some colleagues often complain to me, “Why is the CPU utilization of my application so low? In such a large cluster, the CPU utilization is only 10%!” Indeed, low CPU utilization is not only a waste of valuable hardware resources, but also makes it difficult to achieve satisfactory end-to-end performance of the application. So, in the development of distributed applications, how can we efficiently utilize the CPU?

As we mentioned before, the ultimate goal of performance tuning is to seek synergy and balance among all participating hardware resources, so that they reach a balanced, bottleneck-free state. For the CPU, the hardware resource that most requires synergy and balance is memory. There are two main reasons for this: on the one hand, in terms of processing latency, memory is the only resource that comes anywhere close to the CPU; on the other hand, memory supplies data directly to the CPU registers through the data bus on the motherboard. Therefore, clarifying the relationship between the two lays a better foundation for performance tuning.

So, in today’s lecture, let’s start from the perspective of hardware resource balance and analyze how the CPU and memory should collaborate.

What is the essence of the balance between CPU and memory? #

We know that Spark divides memory into two categories: Execution Memory and Storage Memory, which are used for distributed task execution and RDD caching, respectively. Although RDD caching ultimately occupies Storage Memory, the calculation tasks still consume Execution Memory before RDD unrolling. Therefore, the balance between CPU and memory in Spark is essentially the coordination and allocation between CPU and execution memory.

In order to achieve a balance between CPU and execution memory, we need three types of configuration parameters, which respectively control parallelism, execution memory size, and the parallel computing capability of the cluster. Only when these parameters are set properly can both CPU and execution memory be fully utilized. Otherwise, the balance between them is disrupted, resulting in either an underutilized CPU or out-of-memory (OOM) errors.

To understand which specific configuration options are included in these three types of parameters, and how they are configured, we need to first understand some basic knowledge about how the execution memory is divided among the threads of parallel computation. To help you understand, let me tell you a story.

Huang Xiaoyi’s Plan: How do parallel computing threads divide up execution memory? #

Do you remember the story of the landlord renting out the land? After signing a land occupancy agreement with Zhang Mazi, Huang Xiaoyi began to calculate how to make the most profitable use of the land he obtained.

He thought to himself, “This piece of land is too big for me to cultivate on my own. First, I’m not as hardworking as Zhang Mazi. Second, whether it’s planting cotton or coffee, it takes a lot of time and effort. I can’t afford to spend so much time and effort working with my back facing the sky and my face towards the yellow earth! It’s better to transfer the land to someone else to cultivate, and I will focus on purchasing and selling, earning the price difference will be enough for me to live comfortably!”

So, he made up his mind and posted a notice.

Within three days of posting the notice, people from ten miles around came to seek land transfer. Most of them were hardworking farmers who wanted to improve their lives through this opportunity, so each person wished they could acquire all 500 hectares of land.

Seeing this, Huang Xiaoyi was overjoyed. He believed that not only would his land be quickly cultivated, but he could also rightfully occupy Zhang Mazi’s land. However, it was not just about the scale, he also needed to ensure the quality of cotton and coffee production, and more importantly, find a way to make this operating model sustainable.

Therefore, Huang Xiaoyi added a supplementary clause: “In view of the enthusiasm of the participating villagers, and for the sake of fairness, I have formulated new rules for land transfer. First, each villager will have a minimum and maximum area of land that they can obtain. The specific values will be determined by the total cultivable land area and the number of applicants. Second, the validity period of land transfer rights will be consistent with the crop growth cycle. Once the crops are harvested, the transferee must return the land, and if they wish to cultivate again, they need to reapply.”

For example, at the current stage, the total cultivable land area has expanded from 500 hectares to 800 hectares (after Huang Xiaoyi occupied Zhang Mazi’s land). If 400 villagers apply for land transfer rights, then each villager can obtain a maximum of 2 hectares (800/400) of land, and a minimum of 1 hectare (800/400/2) of land. In other words, if the number of villagers is N, the share of the total land that each villager can obtain fluctuates between 1/N/2 and 1/N.

Everyone agreed to these rules, and before long, all 800 hectares of land were transferred. This win-win deal allowed everyone to get what they needed, and even the ever-cunning Huang Siliang couldn’t help but exclaim in admiration, “The new generation always surpasses the old!”

Alright, the story pauses here for now. But what does Huang Xiaoyi’s plan have to do with what we are going to talk about today?

We have mentioned before that the land leased by Huang Xiaoyi is analogous to the Execution Memory in the memory area. In today’s story, the cotton and coffee farmers recruited by Huang Xiaoyi correspond to the threads in the Executor thread pool, each thread executing distributed tasks. The rules for land transfer correspond to the basic logic that multiple threads need to follow when competing for memory resources during concurrent task execution.

Therefore, the rules for memory resource competition during execution are as follows: In the same Executor, when multiple threads (denoted as N) attempt to compete for execution memory, they need to follow two basic principles:

  • The total size of execution memory (denoted as M) is the sum of two parts, one is the initial size of Execution Memory, and the other is the remaining space of Storage Memory.
  • Each thread is allocated a certain amount of available memory, with a lower limit of M/N/2 and an upper limit of M/N, that is, an even share of the total.
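The second rule can be written down as a small calculation. The following is a minimal sketch; the function name `per_thread_bounds` is made up for illustration, and the units are arbitrary (hectares in the story, bytes in an Executor).

```python
def per_thread_bounds(total_memory, num_threads):
    """Return the (lower, upper) memory limits for a single thread.

    total_memory is M and num_threads is N, as defined in the rules above.
    """
    if num_threads <= 0:
        raise ValueError("num_threads must be positive")
    upper = total_memory / num_threads  # M/N: an even share of the total
    lower = upper / 2                   # M/N/2: half of an even share
    return lower, upper


# Huang Xiaoyi's example: 800 hectares shared by 400 villagers.
lower, upper = per_thread_bounds(800, 400)
print(lower, upper)  # 1.0 2.0
```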

Three-legged Stool: Parallelism, Concurrency, and Execution Memory #

Now that we have clarified the relationship between threads and execution memory, let’s talk about the three types of configuration settings corresponding to parallelism, concurrency, and execution memory, as well as how they affect the balance between CPU and execution memory.

Three types of configuration settings #

We have talked about parallelism, which refers to the number of partitions into which a distributed dataset is divided in order to achieve distributed computing. Parallelism determines the granularity of data partitioning: the higher the parallelism, the finer the granularity of data, the more data partitions, and the more dispersed the data.

Parallelism can be set using two parameters: spark.default.parallelism and spark.sql.shuffle.partitions. The former sets the default parallelism for RDDs, while the latter specifies the default parallelism for the Shuffle Reduce phase in the Spark SQL development framework.

So what is concurrency? As we mentioned in the configuration settings lecture, the size of the Executor’s thread pool is determined by the parameter spark.executor.cores, and the number of threads consumed by each task during execution is given by spark.task.cpus. The ratio of the two is the concurrency, which is the maximum number of tasks that can run simultaneously within an Executor. Since spark.task.cpus defaults to 1 and usually does not need to be adjusted, concurrency is basically determined by spark.executor.cores.
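As a quick illustration of this ratio, here is a minimal sketch. The function name `executor_concurrency` is hypothetical; its two arguments simply mirror spark.executor.cores and spark.task.cpus, and rounding down with integer division is an assumption of this sketch.

```python
def executor_concurrency(executor_cores, task_cpus=1):
    """Maximum number of tasks one Executor can run at the same time."""
    if executor_cores <= 0 or task_cpus <= 0:
        raise ValueError("both settings must be positive")
    # Assumption: a task that cannot get all of its threads cannot start,
    # so any remainder is dropped.
    return executor_cores // task_cpus


print(executor_concurrency(8))     # 8
print(executor_concurrency(8, 2))  # 4
```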

As for the Executor’s thread pool, although threads themselves can be reused, each thread can only compute one task at a time, and each task is responsible for processing a data partition. Therefore, at runtime, there is a one-to-one correspondence between threads, tasks, and partitions.

Distributed tasks are distributed from the Driver to the Executor, where the Executor encapsulates the Task as a TaskRunner and then hands it over to the recyclable thread pool (newCachedThreadPool). When a thread in the thread pool receives a TaskRunner, it requests memory from the Execution Memory and then starts executing the task.

If we liken cotton farmers and coffee farmers to CPU threads, then the TaskRunner can be understood as farming tools, and the data partitions to be processed by the Task can be understood as crop seeds. After getting the farming tools and seeds, the farmers need to go to Mr. Huang’s place to apply for a piece of land before they can start farming.

Finally, let’s talk about execution memory. Mr. Huang’s land represents execution memory. The initial value of execution memory in the heap is determined by multiple parameters, and the specific calculation formula is: spark.executor.memory * spark.memory.fraction * (1 - spark.memory.storageFraction). In comparison, the calculation of off-heap execution memory is slightly simpler: spark.memory.offHeap.size * (1 - spark.memory.storageFraction).
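To make the two formulas concrete, here is a small sketch that evaluates them. The function names are made up for illustration, and the default values 0.6 and 0.5 are the usual defaults of spark.memory.fraction and spark.memory.storageFraction. The sketch follows the formulas exactly as written above and ignores any memory that Spark sets aside for itself before applying the fractions.

```python
def initial_onheap_execution_memory(executor_memory_mb,
                                    memory_fraction=0.6,
                                    storage_fraction=0.5):
    """spark.executor.memory * spark.memory.fraction * (1 - spark.memory.storageFraction)."""
    return executor_memory_mb * memory_fraction * (1 - storage_fraction)


def initial_offheap_execution_memory(offheap_size_mb, storage_fraction=0.5):
    """spark.memory.offHeap.size * (1 - spark.memory.storageFraction)."""
    return offheap_size_mb * (1 - storage_fraction)


# Example: a 10 GB heap and a 4 GB off-heap region, both with default fractions.
print(f"{initial_onheap_execution_memory(10240):.0f} MB")   # 3072 MB
print(f"{initial_offheap_execution_memory(4096):.0f} MB")   # 2048 MB
```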

In addition, in the unified memory management mode, if the Storage Memory is not fully occupied by RDD caching, the executing tasks can dynamically preempt the Storage Memory. Therefore, when calculating the total amount of memory that can be allocated to executing tasks, this portion of memory space that is likely to be preempted should also be taken into account. This is why the total cultivable land area owned by Mr. Huang gradually expands from the initial 500 hectares to 800 hectares.

It can be seen that the total amount of execution memory available for allocation changes dynamically as RDD caching and executing tasks grow and shrink. However, no matter how it changes, the total available execution memory will not fall below the initial value set by the configuration.

Now that we understand the concepts of parallelism, concurrency, and execution memory, as well as their respective configuration settings, let’s look at how they affect the balance between CPU and execution memory through two common causes of low CPU utilization.

Reason 1 for inefficient CPU utilization: Thread suspension #

Given the total amount of execution memory M and the total number of threads N, in order to ensure that each thread has a chance to obtain a sufficient amount of memory to process data, Spark uses a HashMap data structure to record the amount of memory consumed by each thread in the form of (Key, Value), and ensures that all Value values do not exceed M/N. In extreme cases, some threads may not be able to obtain the required amount of memory, and the total memory they can obtain is less than M/N/2. At this time, Spark will suspend these threads until other threads release enough memory space.
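The bookkeeping described above can be modelled with a few lines of code. The following is a deliberately simplified, single-threaded toy, not Spark's actual implementation: the class `ToyExecutionPool` and its methods are invented for illustration, and returning `None` stands in for "the thread is suspended until memory is released".

```python
class ToyExecutionPool:
    """Toy model: a dict records how much memory each active thread holds."""

    def __init__(self, total_memory):
        self.total = float(total_memory)  # M
        self.used = {}                    # thread name -> memory currently held

    def register(self, thread):
        """Mark a thread as active, so that it counts toward N."""
        self.used.setdefault(thread, 0.0)

    def request(self, thread, amount):
        """Return the amount granted, or None if the thread would have to wait."""
        amount = float(amount)
        self.register(thread)
        n = len(self.used)                # number of active threads
        upper = self.total / n            # M/N: no thread may hold more
        lower = upper / 2                 # M/N/2: guaranteed minimum
        held = self.used[thread]
        free = self.total - sum(self.used.values())
        grant = max(0.0, min(amount, upper - held, free))
        # Wait when the request cannot be met in full and the thread would
        # still hold less than the guaranteed minimum.
        if grant < amount and held + grant < lower:
            return None
        self.used[thread] = held + grant
        return grant

    def release(self, thread):
        """The thread finished its task and gives all of its memory back."""
        self.used.pop(thread, None)


pool = ToyExecutionPool(800)
print(pool.request("t1", 800))  # 800.0 (t1 is the only active thread)
print(pool.request("t2", 100))  # None  (nothing free, t2 is below M/N/2)
pool.release("t1")
print(pool.request("t2", 100))  # 100.0
```

In this toy, a thread that arrives while another one already holds everything gets nothing and has to wait, which is the suspension scenario discussed in this section.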

You may ask, “Since the maximum amount of memory each thread can obtain is M/N, an even split of the total memory across the threads, why are there threads that can’t even get M/N/2? This doesn’t make mathematical sense!” This is a good question. The situation arises from three factors and their interactions:

  • Dynamically changing total amount of execution memory M
  • Dynamically changing concurrency N
  • Data distribution of the distributed dataset

First, let’s talk about the dynamically changing total execution memory M. The lower limit of M is the initial value of Execution Memory, and the upper limit is determined by spark.executor.memory * spark.memory.fraction and encompasses all memory regions. When the application starts executing, the value of M is equal to this upper limit, but as RDD caching gradually fills the Storage Memory, the value of M will decrease.

Additionally, up to this point, when calculating the upper and lower limits (M/N/2, M/N), we have used the total number of threads N, which is fixed. The value of N represents the maximum concurrency within an Executor; strictly speaking, it is spark.executor.cores divided by spark.task.cpus. However, in practice, the limits are calculated with N~, not N. N~ is the current concurrency within the Executor, i.e., the number of tasks that are currently running in parallel in the Executor. Obviously, N~ <= N.

In other words, although there are N CPU threads in an Executor, not all of these threads are necessarily active. During Spark task scheduling, these N threads might not all be assigned distributed tasks at the same time, so the threads that are assigned tasks first have the opportunity to request more memory. In some extreme cases, the threads that are assigned tasks later may not be able to obtain any memory at all. However, as tasks are executed and task scheduling progresses, N~ quickly approaches N, and the problems of CPU thread suspension and memory allocation gradually ease.

Just like the example mentioned in the additional clause, when the total area of arable land is 800 hectares and there are 400 villagers applying for land transfer rights, each villager can at most obtain 800/400 = 2 hectares of land, and at least obtain 800/400 / 2 = 1 hectare of land.

But if these 400 villagers do not arrive at the same time, but in two groups of 200 people, there will be a problem. According to his rules, the first group of 200 villagers can obtain a maximum of 800/200 = 4 hectares of land per person. As we mentioned before, each villager who applies wants to make some money through this opportunity, so all 200 of these villagers apply for 4 hectares of land, and Huang Xiaoyi’s land is quickly divided up! The second group of 200 villagers who arrive later have no land to cultivate, and they can only wait until the first group of villagers have a successful harvest of cotton and coffee, and then apply for land transfer rights together.

Assuming that the first group of villagers has a successful harvest at the same time, according to Huang Xiaoyi’s transfer rules, they have to give up the land use rights and if they want to continue farming, they have to reapply with the second group of villagers. In this case, the calculation of the upper and lower limits is the same as the algorithm Huang Xiaoyi initially mentioned.

The third factor that affects task concurrency and memory allocation is the data distribution of the distributed dataset. In the previous example, if each person in the first group of villagers only applies for 2 hectares of land, then there will still be land available for the second group of villagers. The amount of land each individual applies for depends on the amount of crop seeds they have. We previously compared the data partition that each task needs to process to crop seeds. Therefore, the amount of data in a partition determines how much memory is required to execute the task. If the parallelism of the distributed dataset is set properly, the thread suspension problem caused by task scheduling lag will be alleviated.
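The two-group scenario from the story can be replayed with a short calculation. This is only a sketch of the story's arithmetic under simplifying assumptions (everyone in a group asks for the same amount, and the first group is fully served before the second arrives); the function name `two_wave_allocation` is made up for illustration.

```python
def two_wave_allocation(total, first, second, request_each):
    """Return (land per first-wave villager, land per second-wave villager)."""
    if first <= 0 or second <= 0:
        raise ValueError("both groups must be non-empty")
    # Wave 1 arrives alone, so its upper limit is total / first.
    first_grant = float(min(request_each, total / first))
    leftover = total - first_grant * first
    # Wave 2 arrives later; the upper limit is now total / (first + second).
    cap = total / (first + second)
    second_grant = float(min(request_each, cap, leftover / second))
    # Below half of the cap, a latecomer has to wait instead of starting.
    if second_grant < request_each and second_grant < cap / 2:
        second_grant = 0.0
    return first_grant, second_grant


# Everyone in the first group grabs the maximum: latecomers get nothing.
print(two_wave_allocation(800, 200, 200, 4))  # (4.0, 0.0)
# Everyone only needs 2 hectares: there is enough land for both groups.
print(two_wave_allocation(800, 200, 200, 2))  # (2.0, 2.0)
```

The only difference between the two calls is how much each villager asks for, which is exactly the role that partition size plays for tasks.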

Reason 2 for inefficient CPU utilization: Scheduling overhead #

When the problem of thread suspension is alleviated, CPU utilization improves. In that case, should we maximize the parallelism and make each data partition small enough that every CPU thread can obtain memory, eliminating thread suspension altogether? Of course not. A high enough degree of parallelism does make the data partitions more scattered and the data granularity finer, so that each executing task requires less memory. However, excessive data scattering has a serious side effect: increased scheduling overhead.

For each distributed task, the Driver encapsulates it into a TaskDescription and then distributes it to the Executors. The TaskDescription contains all the information related to task execution, such as the task ID, the attempt ID, the ID of the data partition to be processed, locally added files and JAR packages, task properties, the serialized task code, and so on. After receiving the TaskDescription, the Executor first needs to deserialize it to read the task information, then deserialize the task code to obtain executable code, and finally create a TaskRunner by combining it with the other task information.

Therefore, as you can see, scheduling and executing each task requires the Executor to spend CPU on the series of operations mentioned above. Data partitions correspond one-to-one with threads and executing tasks, so when the data is excessively scattered, the number of distributed tasks increases significantly while the amount of data each task needs to process becomes very small. In terms of CPU consumption, the overhead of task scheduling then becomes comparable to the cost of the data processing itself. Obviously, in this case, the effective utilization of the CPU is also very low.
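A back-of-the-envelope model helps to see the effect. The sketch below is purely illustrative: the function `useful_cpu_fraction` and both cost constants (10 ms of scheduling work per task, 1 ms of processing per MB) are invented numbers rather than measurements, and real overheads depend on the job and the cluster.

```python
def useful_cpu_fraction(total_data_mb, parallelism,
                        overhead_ms_per_task=10.0,
                        processing_ms_per_mb=1.0):
    """Share of CPU time spent on data processing rather than on scheduling."""
    if parallelism <= 0:
        raise ValueError("parallelism must be positive")
    processing_ms = total_data_mb * processing_ms_per_mb  # independent of parallelism
    scheduling_ms = parallelism * overhead_ms_per_task    # grows with the task count
    return processing_ms / (processing_ms + scheduling_ms)


# The same 10,000 MB of data, cut into more and more partitions.
for p in (100, 1_000, 10_000, 100_000):
    print(p, round(useful_cpu_fraction(10_000, p), 3))
```

With these made-up constants, the useful share of CPU time drops steadily as the partition count grows, because the processing work stays constant while the per-task overhead is paid once for every task.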

How to optimize CPU utilization? #

You may say, “This is awkward. Low parallelism does not work because CPU threads get suspended; high parallelism does not work because the scheduling overhead is too high and CPU utilization is low as well. Neither high nor low is good. What should I do then?”

Let’s do some calculations, using Huang Xiaoyi’s plan as an example. If 400 villagers apply for his 800 hectares of land at the same time, then each villager can get about 1 to 2 hectares of land. Correspondingly, each villager needs to buy just enough seeds to plant 1 to 2 hectares, because seeds bought in excess cannot be planted, and buying too few means losing money. Assuming that the total amount of seeds in the Wazi Village agricultural product trading market is just enough to plant 1000 hectares of land, then from the sellers’ perspective, these seeds should be split into 1000/2 = 500 to 1000/1 = 1000 portions to make the most money.

Therefore, given an Executor thread pool and the size of the execution memory, we can use the same calculation to find a degree of parallelism for which the average size of the data partitions falls within (M/N/2, M/N). This is often a good choice.
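The seed calculation carries over directly to parallelism. Here is a minimal sketch; the function name `parallelism_range` is hypothetical, D, M and N must use consistent units, and the end points of the range are treated as inclusive for simplicity.

```python
import math


def parallelism_range(total_data, exec_memory, concurrency):
    """Range of parallelism P for which D/P lies between M/N/2 and M/N."""
    if total_data <= 0 or exec_memory <= 0 or concurrency <= 0:
        raise ValueError("all arguments must be positive")
    upper_share = exec_memory / concurrency      # M/N
    lower_share = upper_share / 2                # M/N/2
    p_min = math.ceil(total_data / upper_share)  # the largest partitions still fit
    p_max = math.floor(total_data / lower_share) # the smallest partitions worth scheduling
    return p_min, p_max


# The seed market: seeds for 1000 hectares, 800 hectares of land, 400 villagers.
print(parallelism_range(1000, 800, 400))  # (500, 1000)
```

Any value in the returned range keeps the average partition size between the lower and upper memory limits of a single thread.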

In summary, for CPU utilization, the relationship between parallelism, concurrency, and execution memory is like a three-legged cauldron filled with boiling water. If all three legs are balanced, everything is fine. If any one leg is weakened, the boiling water spills out and harms innocent bystanders.

Summary #

In today’s lecture, we discussed methods to effectively improve CPU utilization by balancing CPU and execution memory. We summarized the relationship between Executor parallelism, concurrency, and execution memory, as well as their impact on CPU utilization.

Firstly, the proportion of execution memory that each CPU thread in an Executor can obtain has upper and lower limits: at most 1/N and at least 1/N/2, where N represents the thread pool size.

Secondly, for a given thread pool size and execution memory, low parallelism and oversized data partitions can lead to CPU thread suspension, which hurts CPU utilization. On the other hand, excessive parallelism and overly scattered data increase scheduling overhead, which also hurts CPU utilization.

Lastly, to effectively improve CPU utilization given an execution memory M, thread pool size N, and total data amount D, we need to calculate a suitable parallelism P. The method is to ensure that the average size of the data partitions, D/P, falls within the range (M/N/2, M/N). With this approach, CPU utilization at runtime is usually not too poor.

Daily Exercise #

  1. From the perspective of Executor concurrency, execution memory size, and distributed task parallelism, when do you think OOM (Out of Memory) problems may occur?
  2. Since the total amount of execution memory M is dynamically changing and the number of concurrent tasks N~ is also dynamically changing, the upper and lower limits of memory allocation per thread are adjusted dynamically. Do you know what is considered as the adjustment period?

Looking forward to seeing your thoughts and answers in the comments. If your friends are also struggling to improve CPU utilization, please feel free to forward this lecture to them. See you in the next lecture!