12 Fundamental Configuration Explained Which Parameters Affect Application Stability

12 Fundamental Configuration Explained - Which Parameters Affect Application Stability #

Hello, I am Wu Lei.

The National Day holiday is coming to an end, and we are also wrapping up our basic module. So far, we have learned together about the RDD programming model, Spark distributed deployment, Spark working principles, and common RDD operators. Congratulations! With this knowledge, I believe you can quickly implement most business requirements in Spark distributed application development.

However, quickly implementing various business requirements with code is only the first step. We not only need to make the code run, but also make it run fast and stable.

To achieve this, we still need configuration parameters. If we consider Spark as an F1 racing car, then configuration parameters are the various parameters of the racing car body, such as the number of cylinders, maximum torque, wheelbase, suspension system, vehicle weight, and so on. Only by configuring the car body parameters reasonably can we ensure the stability and performance of the car itself, laying the foundation for outstanding performance by the driver.

In today’s lecture, we will talk about the configuration options available in Spark, as well as the meanings and functions of these options.

Configuration Items #

Open the Configuration page on the Spark official website to find all the Spark configuration items.

However, it is frustrating that there are too many configuration items, with a wide variety of types. Some need to be set to true/false, while others require us to provide a specific value, which makes it overwhelming and difficult to choose from.

image

So the question is, where should we start with so many configuration items? Don’t worry, since our goal is to make the car “run steadily” and “run fast”, we can start by organizing the configuration items that we must know from these two perspectives.

In this lesson, let’s first sort out the configuration items that can make Spark run steadily. When we introduce Spark SQL later, we will focus on the parts related to “run fast”.

Regarding running steadily, you may have this question: “Normal cars can be driven without adjusting any vehicle parameters. Similarly, most Spark configuration items have default values, allowing developers to use the factory settings and save the trouble of tuning. Isn’t that great?” Unfortunately, for most application scenarios, Spark cannot run smoothly with the default parameter settings.

Take the spark.executor.memory configuration item as an example. It is used to specify the Executor memory, which is the upper limit of available memory for Executors. The default value of this parameter is 1GB. Obviously, for industrial-level data that can reach hundreds of GB or even TB, this setting is too low, and distributed tasks are prone to interruption due to Out of Memory (OOM) errors.

You see, in order to make Spark run steadily, we still need to put some effort into it. If you think that setting the memory parameter to hundreds of GB directly can solve the problem once and for all, it would be too rash. Adjusting configuration item parameters based solely on resource supply is a “rough and simple” approach and is not advisable. In fact, the stability of running an application depends on whether the hardware resource supply matches the computational needs.

This is like assembling a racing car. To get a high-performance car, we don’t need every component to meet the “top configuration” requirements, but we need to make the assembled components fit and match each other to achieve the expected horsepower output of the car.

Therefore, we may as well start from the perspective of hardware resources to explore the configuration items that developers must pay attention to. Since we used memory as an example earlier, and we briefly mentioned configuration items related to memory management in the lesson on memory management, you may still have some impression. Next, let’s start with memory and talk about the related configuration items.

Memory #

When it comes to memory, let’s first review the memory allocation of Spark. For a given Executor Memory, Spark divides the JVM Heap into 4 regions: Reserved Memory, User Memory, Execution Memory, and Storage Memory, as shown in the following figure.

The meanings of different memory regions and their calculation formulas were explained in detail in [Lesson 8]. If you don’t remember clearly, you can review it. Here, we will focus on the setting strategies for these memory configuration item values.

image

With the help of the figure, the size of the Reserved Memory is fixed at 300MB, and the sizes of the other 3 regions are determined by 3 configuration items: spark.executor.memory, spark.memory.fraction, and spark.memory.storageFraction.

For the sake of convenience in the following discussion, we will refer to them as M, mf, and sf respectively. M is used to specify the JVM Heap size allocated to the Executor process, which is the Executor Memory. The Executor Memory is divided among Execution Memory, Storage Memory, and User Memory.

The size allocated to Execution Memory and Storage Memory is determined by (M - 300) * mf, while the size of User Memory is determined by (M - 300) * (1 - mf), which is used to store user-defined data structures, such as various instantiated objects or collection types (such as arrays, lists, etc.) contained in RDD operators.

Therefore, if your distributed application does not require a large amount of custom objects or collection data, you should set the value of mf as close to 1 as possible, so that User Memory tends to be 0, and a large amount of available memory can be left for Execution Memory and Storage Memory.

We know that since version 1.6, Spark has introduced a unified dynamic memory management mode. When the resources of one side are not fully utilized, Execution Memory and Storage Memory can compete with each other. However, even so, we still need sf to set a boundary between them to tell Spark explicitly which side we developers prefer to “bias”.

So, how should developers make trade-offs when setting sf? The answer lies in the frequency of data reuse. Let’s take examples in different scenarios.

For ETL (Extract, Transform, Load) type jobs, data is often processed in a predetermined business logic sequence, and most of the data forms are accessed only once, with few cases of repeated reference.

Therefore, in ETL jobs, RDD caching does not necessarily improve execution performance, so we don’t need to use caching. In this case, we should set sf to a lower value, compressing the available space of Storage Memory to make more memory space available for Execution Memory.

On the contrary, if your application scenario involves machine learning or graph computation, these computational tasks often require repeatedly consuming and iterating the same data, which requires a different approach. In this case, we need to make full use of the performance advantages provided by RDD caching, so naturally we need to set sf to a slightly larger value, allowing Storage Memory to have enough memory space to accommodate distributed datasets that need to be accessed frequently. Alright, that’s it. We have interpreted the meanings and general principles of the three configuration options for memory. You need to set these options reasonably according to your application scenario so that the program can run fast and stable. With these, you have basically scored 80 points in the memory configuration. The remaining 20 points need to be obtained from repeated practice in daily development, and we look forward to you summarizing more configuration experience.

In terms of hardware resources, memory serves the CPU. The effective configuration of memory is not only to accommodate data better, but more importantly, to improve CPU utilization. Having discussed memory, let’s now take a look at the CPU.

CPU #

There are only two parameters that we need to pay attention to when it comes to CPU. They are spark.executor.instances and spark.executor.cores. The former specifies the number of Executors in the cluster, while the latter specifies the number of CPU cores available to each Executor.

As we know, a CPU core can only handle one distributed task at a time. Therefore, the product of spark.executor.instances and spark.executor.cores actually determines the concurrency of the cluster. We define this product as the “degree of concurrency”.

Speaking of concurrency, we have to mention another concept: parallelism. Parallelism is a highly related but completely different concept compared to concurrency. Parallelism is used to define the number and granularity of the partitions of a distributed dataset, which directly determines the computational load of the distributed tasks. The higher the parallelism, the finer the data granularity, the more data shards, and the more dispersed the data.

This also explains why parallelism always corresponds to attributes such as the number of partitions and data shard count. For example, in Lecture 9 we mentioned that parallelism corresponds to the number of partitions of an RDD.

There are also two configuration options related to parallelism: spark.default.parallelism and spark.sql.shuffle.partitions. The former defines the default parallelism for RDDs generated by the SparkContext.parallelize API, while the latter is used to define the default parallelism for the Shuffle Read phase (Reduce phase) in the Shuffle process.

In comparison, the starting point of concurrency is computational power. It, together with the execution memory, constitutes the supply level of computational resources. On the other hand, the starting point of parallelism is data. It determines the computational load of each task, corresponding to the demand level of computational resources. One is supply, the other is demand, and the balance between supply and demand directly affects the stability of program execution.

Balancing CPU, Memory, and Data #

Therefore, the so-called balance between supply and demand actually refers to the balance between CPU, memory, and data. So here’s the question: is there a quantitative way to achieve a balanced state between the three? In fact, with a simple formula, we can easily achieve this.

For ease of explanation, let’s denote the CPU cores specified by the spark.executor.cores configuration option as c, and the size of the Execution Memory as m. Remember? The size of m is given by the formula (M - 300) * mf * (1 - sf). It is not difficult to see that c and m together quantify the available computational resources of an Executor.

After quantifying the supply of resources, let’s talk about the data. For a distributed dataset to be computed, let’s denote its storage size as D and its parallelism as P. Given D and P, it is not difficult to deduce that D/P is the granularity of the distributed dataset, which is the storage size of each data shard.

We have learned about scheduling systems before and we know that in the process of Spark distributed computing, a data shard corresponds to a Task, and a Task corresponds to a CPU core. Therefore, to achieve a balance between CPU, memory, and data, we must ensure that each Task has enough memory for the CPU to process its corresponding data shard.

To do this, we need to ensure that the size of each data shard is within the same order of magnitude as the available memory for each Task. Specifically, we can use the following formula to quantify this relationship:

D/P ~ m/c

Here, the tilde symbol means that the expressions on the left and right sides are of the same order of magnitude. The expression on the left side, D/P, represents the size of a data shard, while the expression on the right side, m/c, represents the available memory allocated to each Task. Guided by this formula, combined with the storage size of the distributed dataset, we can purposefully and systematically set or adjust the three types of configuration options mentioned above, which are related to CPU, memory, and parallelism.

Disk #

After discussing CPU and memory, let’s move on to disk. Compared to the previous two, the configuration options for disk are relatively simple. The only option worth our attention is spark.local.dir, for convenience we will refer to it as ld from now on. This option can be set to any local file system directory and its default value is the /tmp directory.

The directory specified by the ld parameter is used to store various temporary data, such as intermediate files in Shuffle, RDD cache (storage level containing “disk”), and so on. These temporary data play a crucial role in the stable operation of the program.

For example, the intermediate files in Shuffle are the foundation and prerequisite for the execution of the Reduce phase tasks. If these intermediate files are lost, Spark will throw a “Shuffle data not found” exception during the Reduce phase, thus interrupting the execution of the application.

Since these temporary data are indispensable, we cannot blindly follow the default option. Instead, it is necessary to first examine the situation of the /tmp directory. Unfortunately, the default /tmp directory has limited storage space and its stability is also a concern. Therefore, in industrial applications, we usually cannot accept using the /tmp directory to set the ld configuration option.

Now that we understand the purpose of the ld configuration option, it naturally comes to mind that we should set it to a file system with sufficient storage space and even better performance, such as a large SSD (Solid State Disk) file system directory.

Alright, that’s it. We have introduced the configuration options related to CPU, memory, and disk, as well as their meanings, effects, and setting techniques. Speaking of this, you may not be able to hold back and ask, “I already understand the importance of these configuration options, but where should I set them?” Next, we will continue to talk about the ways in which developers can set configuration options.

Ways to set configuration items #

To meet different application scenarios, Spark provides developers with three ways to set configuration items: configuration files, command line parameters, and SparkConf objects. These methods all record and set configuration items in the form of (Key, Value) key-value pairs.

The configuration file refers to spark-defaults.conf, which is stored in the conf subdirectory under the Spark installation directory. The parameter settings in this file apply to all applications in the cluster, so its scope of effect is global. For any application, if developers do not set configuration items through other methods, the application will default to using the parameter values in spark-defaults.conf as the basic settings.

To set configuration items in spark-defaults.conf, you only need to separate the name of the configuration item from its setting value with a space. For example, taking the three configuration items spark.executor.cores, spark.executor.memory, and spark.local.dir as examples, we can set their values using the following method.

spark.executor.cores 2
spark.executor.memory 4g
spark.local.dir /ssd_fs/large_dir

However, in daily development work, different applications have different demands for resources: some need more CPU cores, while others need higher parallelism. There are various demands that are hard to harmonize. In this case, relying on spark-defaults.conf alone for global settings is not enough.

Spark provides developers with two application-level setting methods: command line parameters and SparkConf objects. Their scope of effect is limited to the application itself. Let’s take a closer look at how to use these two methods.

Let’s start with command line parameters, which refers to setting configuration items using the –conf keyword after running the spark-shell or spark-submit command. We know that spark-shell is used to start an interactive distributed runtime environment, and spark-submit is used to submit distributed jobs to the Spark computing cluster.

Taking the three configuration items mentioned earlier as examples, if you want to set them using command line parameters, you need to assign the values to the parameters when submitting the spark-shell or spark-submit command in the form of –conf Key=Value.

spark-shell --master local[*] --conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.local.dir=/ssd_fs/large_dir

It is not difficult to see that although this method allows developers to flexibly set configuration items at the application level, the writing format is too cumbersome, and each configuration item needs to be prefixed with –conf. Moreover, the setting of command line parameters is not conducive to code management. Over time, the setting of parameter values is likely to change with changes in data volume or cluster capacity, but this process of change is difficult to record and maintain, which undoubtedly increases the operation and maintenance costs for developers and operators.

In comparison, in terms of isolation and maintainability, the setting method of SparkConf objects is superior. During the code development process, we can define a SparkConf object and use its set method to set configuration items. Once again, let’s use the three configuration items for CPU, memory, and disk as examples.

import org.apache.spark.SparkConf
val conf = new SparkConf()
conf.set("spark.executor.cores", "2")
conf.set("spark.executor.memory", "4g")
conf.set("spark.local.dir", "/ssd_fs/large_dir")

Alright, with that, we have summarized the relevant configuration items for CPU, memory, and disk, and emphasized the balance between CPU, memory, and data supply and demand. Once you have mastered these setting methods and key points, you can try these configuration items yourself. You can use the Word Count example we discussed earlier to practice and reinforce what you have learned today.

Key Recap #

In today’s lecture, we explained several important parameters that affect the stability of application programs from three aspects: CPU, memory, and disk. You need to understand their meanings, functions, and applicable scenarios. To help you remember them, I have organized them in the table below for your reference.

Image

After familiarizing yourself with these key configuration options, you also need to understand how to set them. Spark provides three methods for setting configuration options: spark-defaults.conf configuration file, command line parameters, and SparkConf object. The first method is used for global settings, while the latter two apply to the application itself.

For these three methods, Spark reads the parameter values of configuration options in the order of “SparkConf object -> command line parameters -> configuration file”. For configuration options that are set multiple times, Spark takes the value of the earlier parameter.

Practice for each lesson #

Please briefly go through the Configuration page on the Spark official website and explain which configuration items are suitable for setting in spark-defaults.conf, and which configuration items are better set using the SparkConf object.

Feel free to leave a message in the comments section to communicate with me. If this lecture is helpful to you, I also recommend sharing this lesson with colleagues or friends who need it. See you in the next lesson.