09 Common Rdd Operators Iii Data Checkpointing and Repartitioning Persistency

09 Common RDD Operators III - Data Checkpointing and Repartitioning Persistency #

Hello, I’m Wu Lei.

In the previous two lectures on common RDD operators, we introduced many operators for RDD internal transformations and aggregations. In today’s lecture, we will continue to introduce the remaining operators in the table.

As usual, we will not introduce all the operators in the table, but only select the most commonly used and representative ones for explanation. For the operators we are going to discuss today, I have highlighted them in bold font. You can take a quick look to get an idea.

Image

You may feel that these highlighted operators don’t seem to be related at first glance. However, if we approach them from the perspective of the data lifecycle and classify them, it is easy to find out that these operators belong to a certain stage of the lifecycle.

Image

With the help of the above diagram, let’s take a look at the lifecycle and functionality of each operator.

First, in the data preparation stage, union and sample are used to merge and split data from different sources.

Moving from left to right, we come to the data preprocessing stage. A well-balanced data distribution helps improve CPU utilization in the later data processing stage and overall execution efficiency. How can we achieve such balance? That’s right, it’s time for coalesce and repartition to take the stage. Their role is to rearrange the data distribution of the RDD.

After the data processing and computation are completed, we naturally need to collect the computation results. Spark provides two types of result collection operators. One type, such as take, first, and collect, directly collects the results to the Driver side. The other type directly persists the computation results to a (distributed) file system, such as saveAsTextFile mentioned in this lecture.

Alright, now that we have a clear idea about which operators we will cover today and their general positioning and functions, let’s dive into the specific usage of these operators.

Data Preparation #

First, let’s talk about the union and sample operations in the data preparation stage.

union #

In our daily development, union is very common. It is often used to merge two RDDs of the same type but from different sources to form a unified, larger distributed dataset. For example, in a data analysis scenario, one data source comes from a remote database, and another data source comes from a local file system. To merge the two sources, we need to use the union operation.

How to use it? Let me give you an example. Given two RDDs: rdd1 and rdd2, calling rdd1.union(rdd2) or rdd1 union rdd2 will result in the union of the two RDDs. The code looks like this:

// T: data type
val rdd1: RDD[T] = _
val rdd2: RDD[T] = _
val rdd = rdd1.union(rdd2)
// or rdd1 union rdd2

It should be emphasized that the prerequisite for the union operation to work is that the two RDDs being merged must have the same type. In other words, RDD[String] can only be merged with RDD[String], and cannot be merged with any other RDD type (such as RDD[Int] or even RDD[UserDefinedClass]).

For multiple RDDs of the same type, we can merge them all together by using the union operation repeatedly. For example, given three RDDs of the same type: rdd1, rdd2, and rdd3, we can use the following code to merge them:

// T: data type
val rdd1: RDD[T] = _
val rdd2: RDD[T] = _
val rdd3: RDD[T] = _

val rdd = (rdd1.union(rdd2)).union(rdd3)
// or val rdd = rdd1 union rdd2 union rdd3

It is not difficult to see that the typical use case of union is to merge multiple “small data” into one “big data”, so as to fully utilize the parallel computing advantages of the Spark distributed engine.

On the contrary, in general data exploration scenarios, we often only need a basic understanding of a subset of the data. For example, for a dataset with a size of TB, we may only want to randomly extract a portion of the data and calculate statistical values (mean, variance, etc.) for this subset.

So, how does Spark support such calculations that transform “big data” into “small data”? This brings us to the sample operation of RDD.

sample #

The sample operation of RDD is used to perform random sampling on RDD, thereby transforming a larger dataset into a “small data” subset. Compared with other operations, sample has more parameters, namely withReplacement, fraction, and seed. Therefore, to perform data sampling on RDD, you need to use the following syntax to call the sample operation: sample(withReplacement, fraction, seed).

Among them, withReplacement is of type Boolean, which indicates whether the sampling is done with replacement. If the value of this parameter is true, the sampling result may contain duplicate records. On the other hand, if the value is false, the sampling result does not contain duplicate records.

The fraction parameter is easy to understand. It is of type Double with a value range of 0 to 1, indicating the sampling ratio, i.e., the ratio of the size of the result set to the original dataset. The seed parameter is optional. It is of type Long and is used to control whether the sampling results are consistent for each sampling. Let’s look at some examples to better understand the usage of the sample operation.

// Generate an array of integers from 0 to 99
val arr = (0 until 100).toArray
// Generate RDD using parallelize
val rdd = sc.parallelize(arr)

// Without seed, the return results of each sample call are different
rdd.sample(false, 0.1).collect
// Result: Array(11, 13, 14, 39, 43, 63, 73, 78, 83, 88, 89, 90)
rdd.sample(false, 0.1).collect
// Result: Array(6, 9, 10, 11, 17, 36, 44, 53, 73, 74, 79, 97, 99)

// With seed, the return results of each sample call are the same
rdd.sample(false, 0.1, 123).collect
// Result: Array(3, 11, 26, 59, 82, 89, 96, 99)
rdd.sample(false, 0.1, 123).collect
// Result: Array(3, 11, 26, 59, 82, 89, 96, 99)

// Sampling with replacement, the result may contain duplicate values
rdd.sample(true, 0.1, 456).collect
// Result: Array(7, 11, 11, 23, 26, 26, 33, 41, 57, 74, 96)
rdd.sample(true, 0.1, 456).collect
// Result: Array(7, 11, 11, 23, 26, 26, 33, 41, 57, 74, 96)

Our experiment is divided into 3 groups. The first two groups are used to compare the differences between adding or not adding the seed parameter, and the last group is used to illustrate the effect of the withReplacement parameter.

It is not difficult to see that without the seed parameter, the return results after each call to sample are different. However, when we use the same seed to call the operation, no matter how many times we call sample, the return results are consistent. In addition, if you carefully observe the third group of experiments, you will find that there are duplicate records in the result set. This is because withReplacement is set to true, and the sampling process is done with replacement.

Okay, so far we have covered the two commonly used operations in the data preparation stage. With union and sample, you can easily adjust the size of the distributed dataset and truly have control over your data.

Data Preprocessing #

Next, in the data preprocessing stage, let’s talk about two operators responsible for data redistribution: repartition and coalesce.

Before understanding these two operators, you need to understand the concept of parallelism. Parallelism refers to the number of partitions in an RDD. Remember? The partitions attribute of the RDD records all the data partitions of the RDD. Therefore, the parallelism of the RDD is consistent with its partitions attribute.

Developers can use the repartition operator to adjust (increase or decrease) the parallelism of the RDD as they wish, while the coalesce operator can only be used to decrease the parallelism of the RDD. Obviously, in terms of data distribution adjustment, repartition is more flexible and has more application scenarios. Let’s introduce repartition first, and then look at the usefulness of coalesce.

repartition #

Once an RDD is given, we can adjust the parallelism of the RDD by calling repartition(n). The parameter n is of type Int, which means it is an integer, so we can pass any integer to repartition. As usual, let’s familiarize ourselves with the usage of repartition by using an example.

// Generate an array of integers from 0 to 99
val arr = (0 until 100).toArray
// Create an RDD using parallelize
val rdd = sc.parallelize(arr)

rdd.partitions.length
// 4

val rdd1 = rdd.repartition(2)
rdd1.partitions.length
// 2

val rdd2 = rdd.repartition(8)
rdd2.partitions.length
// 8

First, we create an RDD for experimentation using an array. From this code snippet, we can see that the default parallelism of this RDD is 4. After adjusting the parallelism of the RDD to 2 and 8 respectively, by calculating the length of the RDD partitions attribute, we find that the parallelism of the new RDD is adjusted accordingly to 2 and 8.

At this point, you may still have questions like “Why do we need to adjust the parallelism of the RDD? There doesn’t seem to be any substantial difference between 2 and 8”.

In the lecture on RDD ([Lecture 2]), we mentioned that each data partition of an RDD corresponds to a distributed task, and each task requires a CPU thread to execute.

Therefore, the parallelism of the RDD largely determines the CPU utilization in a distributed system, which in turn affects the execution efficiency of parallel computing in a distributed system. Too high or too low parallelism will reduce CPU utilization, thus wasting valuable distributed computing resources. Therefore, setting the parallelism of the RDD reasonably and effectively is crucial.

You may ask, “If that’s the case, how should I set the parallelism of the RDD reasonably?” To be honest, there is no fixed answer to this question. It depends on the available system resources, the size of the distributed data set, and even the amount of available memory.

However, based on experience, setting the parallelism to 2 to 3 times the number of available CPUs is often a good start. For example, if the number of Executors that can be allocated to the Spark job is N, and the number of CPUs configured for each Executor is C, then the recommended parallelism is between N_C_2 and N_C_3.

Although repartition is very flexible and allows you to adjust the parallelism of the RDD at will, you also need to be aware that this operator has a fatal drawback, which is that it introduces Shuffle.

As we know ([Lecture 6] explained in detail), Shuffle consumes all types of hardware resources during calculation, especially disk I/O and network I/O. Therefore, Shuffle is often the bottleneck of job execution efficiency. It is for this reason that when developing applications, we should try our best to avoid introducing Shuffle.

But you might say, “If data redistribution is a must and repartition will inevitably introduce Shuffle, what should I do?” If you want to increase the parallelism, then we really have to rely on repartition, and the problem of Shuffle cannot be avoided. However, if your requirement is to decrease the parallelism, then we can turn our attention to the twin brother of repartition: coalesce.

coalesce #

In terms of usage, coalesce is similar to repartition. Both of them adjust the parallelism of an RDD by specifying an integer parameter, i.e., coalesce(n). So what is the difference between them? Let’s compare coalesce and repartition with the code example provided.

// Generate an array of integers from 0 to 99
val arr = (0 until 100).toArray
// Create an RDD using parallelize
val rdd = sc.parallelize(arr)

rdd.partitions.length
// 4

val rdd1 = rdd.repartition(2)
rdd1.partitions.length
// 2

val rdd2 = rdd.coalesce(2)
rdd2.partitions.length
// 2

As we can see, in terms of usage, coalesce and repartition can be used interchangeably, and they have the same effect. However, if we observe the Directed Acyclic Graph (DAG) of both operations, we will find that they have different execution plans even though the computation logic is the same.

Image

By calling toDebugString on an RDD, Spark can print out the DAG of the current RDD. Although the printed text in the image may look messy, you only need to grasp one key point.

The key point is that in the output text of toDebugString, each numbered parentheses, such as “(2)” and “(4)” in rdd1, represents a stage in the DAG. Different stages are distinguished by indentation using tabs. In the image, “(4)” is indented further than “(2)”.

For the interpretation of toDebugString, knowing this much is enough. After studying the scheduling system, we already know that the boundaries between different stages in the same DAG are shuffles. Therefore, by observing the printed text above, we can clearly see that repartition introduces a shuffle, while coalesce does not.

So, why does repartition introduce a shuffle, while coalesce does not? The reason is that the two operations have fundamentally different working principles.

When adjusting the parallelism of an RDD using repartition, whether it increases or decreases, repartition has an indiscriminate data distribution effect on each data record in the RDD.

Specifically, for any given data record, the computation process of repartition first hashes and then takes the modulus, resulting in the target partition index for that data record. For the majority of data records, the target partition is often located on another executor or even another node, making shuffle unavoidable.

On the other hand, coalesce is different. When decreasing the parallelism, coalesce merges different data partitions within the same executor. As a result, data does not need to be distributed across executors or nodes, and therefore, no shuffle is introduced.

Here, I have prepared an illustrative diagram to provide a more intuitive demonstration of the computation process of repartition and coalesce. This diagram, combined with textual descriptions, will help you better understand the differences and connections between repartition and coalesce.

Image

That’s it for now. With the two operators, repartition and coalesce, used for redistributing RDDs in the data preprocessing stage, you are now able to adjust the parallelism of RDDs freely, based on the dataset size and available cluster resources. By doing so, you can improve CPU utilization and the execution performance of your jobs.

Result Collection #

After preprocessing, the next stage in the data lifecycle is data processing. In this stage, you can use various RDD common operators (Part II) [from the previous lesson] to perform different types of data processing, such as data transformation, data filtering, data aggregation, etc. After the processing is complete, we naturally need to collect the computation results.

In terms of result collection, Spark provides us with a rich set of operators. These operators can be mainly divided into two categories based on the collection path. The first category is to collect the computation results from various Executors to the Driver, and the second category is to directly persist the computation results to the file system through Executors. In the field of big data processing, the file system often refers to distributed file systems like HDFS or S3.

first, take, and collect #

The first category of operators we are going to introduce today includes first, take, and collect. Their usage is very simple, and as usual, we will explain them using code examples. Here, we will combine the Word Count from Lesson 1 and use these three operators to explore different stages of RDD.

import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read file contents
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

lineRDD.first
// res1: String = Apache Spark

// Tokenize by line
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

cleanWordRDD.take(3)
// res2: Array[String] = Array(Apache, Spark, From)

// Convert RDD elements to (Key, Value) format
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// Group and count by word
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

wordCounts.collect
// res3: Array[(String, Int)] = Array((Because,1), (Open,1), (impl...

In this code, first is used to collect any data record from the RDD data set, while take(n: Int) is used to collect multiple records, with the number of records specified by the n parameter of type Int.

It is not difficult to observe that the main purpose of first and take is data exploration. At each step of RDD transformation, such as from text lines to words, or from words to key-value conversion in Word Count, we can use first or take to get a few computation results to ensure that the transformation logic matches the expected output.

In contrast, collect does not obtain partial results, but rather the entire data set. It collects the computation results of the RDD in their entirety to the Driver. In the example of Word Count above, you can see that due to the large size of the entire result set, the screen printout is truncated.

To help you better understand the working principle of the collect operator, I have included its computation process in the diagram below.

Image

Based on the diagram, it is not difficult to see that the collect operator has two performance pitfalls: network overhead introduced during data fetching and Driver’s Out of Memory (OOM).

The network overhead is easy to understand. Since data fetching and transportation are done across processes and nodes, similar to Shuffle, this process will inevitably introduce network overhead.

Moreover, generally speaking, the preset memory of the Driver is often in the order of GB, while the size of RDD is usually tens of GB or even hundreds of GB. Therefore, the risk of OOM is apparent. When the result set size exceeds the pre-set memory size of the Driver, Spark naturally throws an OOM exception.

It is precisely for these reasons that we must be cautious when using the collect operator. However, you might ask, “If the business logic requires collecting the entire result set and the collect operator is not suitable, what should I do?” Don’t worry, let’s continue reading.

saveAsTextFile #

For the entire result set, we can also directly persist them to disk using the second category of operators. Among these operators, the most representative one is saveAsTextFile. Its usage is straightforward. Given an RDD, we simply call saveAsTextFile(path: String). Here, path represents the target directory in the file system, which can be a local file system or distributed file systems like HDFS or Amazon S3.

To help you understand the second category of operators better, I have also summarized their working principles in the diagram below. As represented by saveAsTextFile, these operators directly materialize RDD data partitions to the file system through Executors, without any interaction with the Driver.

Image

Since the persistence of data is independent of the Driver, these operators naturally avoid the two performance pitfalls caused by the collect operator.

Well, that’s it. We have finished introducing the operators for result collection. With a good understanding of operators like first, take, collect, and saveAsTextFile, you can use first and take operators to verify the correctness of your computation logic and then use the saveAsTextFile operator to persist the entire result set to disk for future use.

Key Review #

In today’s lecture, we introduced and explained many RDD operators, which can be classified into different stages of the data lifecycle. The correspondence between operators and stages is shown in the following diagram.

Picture

In the data preparation stage, you can use union and sample to expand or shrink the distributed dataset. It is important to note that the multiple RDDs participating in union must be consistent in type.

In the data preprocessing stage, you can use repartition and coalesce to adjust the parallelism of the RDD. The parallelism of the RDD is crucial for CPU utilization and largely determines the efficiency of parallel computation. Generally, given the number of Executors N and the number of CPU/Executor configurations C, I would recommend setting the parallelism of the RDD between N_C_2 and N_C_3.

Finally, in the result collection stage, you can use first, take, collect, and other operators to explore the data. These operators can be used to verify whether the transformation logic in the computation process matches the expectations. Once you have confirmed the accuracy of the computation logic, you can use saveAsTextFile and other operators to persist the full result set to a (distributed) file system.

So far, we have covered most of the operators in the RDD development API in three lectures. By flexibly using these operators, you can easily meet most of the business requirements in daily development. For your convenience in reviewing and referencing, I have summarized the operators we have learned together in the table below. I hope it will be helpful to you.

Picture

Practice Questions #

  1. Given three RDDs, besides using rdd1 union rdd2 union rdd3 to merge them together, do you think there is a more elegant way? (Hint: reduce)

  2. What potential risks does coalesce have compared to repartition? (Hint: data distribution)

Feel free to leave a comment to interact with me. I also recommend sharing this lesson with more colleagues and friends to help them understand the commonly used operators of RDD.