03 Common RDD Operators - Data Transformations Within RDDs #

Hello, I am Wu Lei.

In the last lecture, we organized the RDD operators provided by the Spark official website in a table. Understanding and mastering these operators is undoubtedly crucial for quickly implementing business logic on Spark.

Therefore, in the next few lectures, I will guide you through the usage and purpose of these common operators. Different operators are like the ladles, spatulas, knives, and various pots and pans in a kitchen: only when you are familiar with how to use these “kitchen utensils” can you quickly turn out a table of delicious dishes.

In today’s lecture, let’s first learn about the data transformation within the same RDD. Mastering commonly used RDD operators is the foundation for developing Spark applications, and data transformation operators are the foundation of this foundation. Therefore, we will prioritize learning this type of RDD operator.

Among these operators, we will focus on map, mapPartitions, flatMap, and filter. These four operators cover almost 99% of the data transformation scenarios in daily development. As for the remaining one, mapPartitionsWithIndex, I will leave it as an exercise for you to explore.


As the saying goes, even the cleverest cook cannot prepare a meal without rice. To put the kitchen utensils to work, we need ingredients such as rice, noodles, and oil. Learning RDD operators is no different: to practice with these operators, we first need an RDD to work on.

Therefore, let’s take a look at how RDD is created next.

Creating RDDs #

In Spark, there are two typical ways to create RDDs:

  • Creating RDDs on internal data using SparkContext.parallelize;
  • Creating RDDs from external data using APIs like SparkContext.textFile.

Here, “internal” and “external” are in relation to the application itself. Various custom data structures like arrays, lists, and maps defined by developers in Spark applications belong to “internal data”. On the other hand, “external data” refers to all data outside the Spark system, such as data in local or distributed file systems, or data from other big data components (such as Hive, HBase, RDBMS).

The first method of creating RDDs is very simple. You just need to use the parallelize function to encapsulate internal data, as shown in the example below:

import org.apache.spark.rdd.RDD
val words: Array[String] = Array("Spark", "is", "cool")
val rdd: RDD[String] = sc.parallelize(words)

You can enter the above code in spark-shell to get an intuitive feel for how parallelize creates an RDD. That said, it is usually not recommended to define very large data sets inside a Spark application, because such a data set is created entirely on the driver and, once created, has to be distributed to the executors across processes and the network, which often causes performance problems. Therefore, the typical use case for the parallelize API is creating RDDs from “small data”.
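
For example, once the RDD has been created, a few quick checks in spark-shell, using nothing beyond standard RDD methods, can confirm what you just built:

// Inspect the freshly created RDD in spark-shell
rdd.count()          // number of elements, here 3
rdd.take(3)          // peek at the first few elements
rdd.getNumPartitions // how many partitions the data was split into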

To create RDDs on real “big data”, we need to rely on the second method, which is creating RDDs from external data using APIs like SparkContext.textFile. Since the textFile API is relatively simple and occurs frequently in daily development, we will use the textFile API to create RDDs throughout the explanation of different RDD transformations.

To maintain continuity in the explanation, we will continue to use the input file wikiOfSpark.txt from the first lesson to create RDDs. The code implementation is as follows:

import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read the file content
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

Great! Now that we have created the RDD, we have the ingredients ready for cooking. Next, let’s go into the kitchen with our ladles and stir-fry.

Data Transformation in RDDs #

First, let’s get to know the map operator in RDDs. Of all the RDD operators, map is the one you will encounter most often, so we must master its usage and the points to watch out for.

map: Data transformation at the element level #

Let’s first talk about the usage of the map operator: Given a mapping function f, map(f) performs data transformation on the RDD at the element level. The parameter f can be a named function with an explicit signature, or it can be an anonymous function. Its parameter type must be consistent with the element type of the RDD, while the output type can be decided by the developer.

This straightforward introduction may be a bit confusing, but don’t worry, next we will use some small examples to demonstrate the usage of map more intuitively.

In the Word Count example in Lesson 1, we used the following code to transform the RDD of words into an RDD of key-value pairs, commonly referred to as a Paired RDD.

// Convert a regular RDD to a Paired RDD
val cleanWordRDD: RDD[String] = _ // Please refer to Lesson 1 for the complete code
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

In the above code implementation, the parameter passed to the map operator, i.e., word => (word, 1), is the mapping function f we mentioned earlier. However, here f is defined in the form of an anonymous function, where the word on the left represents the input parameter of the anonymous function f, and (word, 1) on the right represents the output of the function f.

If we convert the anonymous function into a named function, you may find it clearer. Here is a piece of code that redefines the named mapping function f.

// Convert RDD elements into (Key, Value) format

// Define the mapping function f
def f(word: String): (String, Int) = {
  return (word, 1)
}

val cleanWordRDD: RDD[String] = _ // Please refer to Lesson 1 for the complete code
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

As you can see, we use the def syntax in Scala to explicitly define the named mapping function f, whose computational logic is consistent with the anonymous function just now. When doing RDD data transformation, we only need to pass the function f to the map operator. Regardless of whether f is an anonymous function or a named function, the transformation logic of the map operator is the same. You may want to enter the above two implementations separately into the spark-shell to verify the consistency of the execution results.

So far, we have mastered the basic usage of the map operator. Now you can define arbitrarily complex mapping functions f and then use map(f) on RDDs to perform various data transformations.

For example, by defining the following mapping function f, we can rewrite the counting logic of Word Count, which is to double the counting weight of the word “Spark”:

// Convert RDD elements into (Key, Value) format

// Define the mapping function f
def f(word: String): (String, Int) = {
  if (word.equals("Spark")) { return (word, 2) }
  return (word, 1)
}

val cleanWordRDD: RDD[String] = _ // Please refer to Lesson 1 for the complete code
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(f)

Although the map operator is flexible enough to allow developers to freely define transformation logic, as we just mentioned, map(f) performs data transformation on RDDs at the element level. In certain computational scenarios, this characteristic can seriously affect execution efficiency. Why is that? Let’s look at a specific example.

For example, if we change the counting requirement of Word Count from counting words to counting the hash values of words, what changes do we need to make in our code implementation? Let me demonstrate with an example:

// Convert a regular RDD to a Paired RDD

import java.security.MessageDigest

val cleanWordRDD: RDD[String] = _ // Please refer to Lesson 1 for the complete code

val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  // Get an instance of MD5 object
  val md5 = MessageDigest.getInstance("MD5")
  // Calculate the hash value using MD5
  val hash = md5.digest(word.getBytes).mkString
  // Return the Pair of hash value and number 1
  (hash, 1)
}

Since map(f) performs transformation at the element level, for every data record in the RDD, we need to instantiate a MessageDigest object to calculate the hash value of this element.

In industrial production systems, an RDD often contains millions or even billions of data records. If a MessageDigest object has to be created for each record, the cost of instantiating these objects accumulates and, before we realize it, becomes the culprit that drags down execution efficiency.

So the question is, is there any way to allow Spark to process data at a coarser granularity? Indeed, mapPartitions and mapPartitionsWithIndex, the “twin brothers”, are designed to solve similar problems. Compared to mapPartitions, mapPartitionsWithIndex only has an additional data partition index, so we will focus on mapPartitions next.

mapPartitions: Data transformation at the data partition level #

Following the convention of introducing operators, let’s first talk about the usage of mapPartitions. As the name suggests, mapPartitions uses a mapping function f to transform an RDD at the data-partition level. Using the word-hash counting example, let’s look at the code below to see how mapPartitions improves execution performance.

// Convert a regular RDD to a Paired RDD

import java.security.MessageDigest

val cleanWordRDD: RDD[String] = _ // Please refer to Lesson 1 for the complete code

val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
  // Note! Here we are getting an instance of MD5 object at the data partition level
  val md5 = MessageDigest.getInstance("MD5")
  val newPartition = partition.map(word => {
    // When processing each data record,
    // you can reuse the MD5 object within the same Partition
    (md5.digest(word.getBytes()).mkString, 1)
  })
  newPartition
})

The improved code above shows that mapPartitions transforms the RDD at the level of data partitions: the anonymous function’s parameter, partition, represents one data partition, and the record-level processing is then completed by calling map on that partition. You may object: “partition.map(f) still maps at the element level! What is the fundamental difference from the previous version?”

If you observe carefully, you will find that compared to the previous version, we move the instantiation of the MD5 object outside the map operator. In this way, the operation of instantiating objects only needs to be performed once at the level of data partitions, and all data records within the same data partition can share the MD5 object to complete the conversion from words to hash values.

By comparing the following intuitive diagram, you will find that mapPartitions only needs to instantiate the MD5 object once per data partition, while the map operator instantiates it once for every data record, so the number of instantiations within a partition equals the number of records in that partition.

![Image](../images/c76be8ff89f1c37e52e9f17b66bf398d.jpg)

For an RDD with millions of records, the number of data partitions is often only in the hundreds. Therefore, compared to the map operator, mapPartitions can significantly reduce the computational overhead of object instantiation, which is undoubtedly very friendly to the end-to-end performance of Spark jobs.

In fact, beyond computing hash values, any per-record operation whose setup can be shared across records can be optimized with the mapPartitions operator. There are many such shareable setups: connection objects for remote databases, file system handles for Amazon S3, machine learning models for online inference, and so on. Feel free to share in the comments the shareable operations you encounter in your own work.
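
To make the pattern concrete, here is a minimal sketch of how such a shared object is typically initialized once per partition. Note that createConnection and conn.lookup are hypothetical placeholders standing in for whatever client library you actually use:

// A sketch of the per-partition shared-object pattern;
// createConnection() and conn.lookup() are hypothetical placeholders
val enrichedRDD = cleanWordRDD.mapPartitions( partition => {
  // Initialize the shared object once per data partition
  val conn = createConnection()
  val newPartition = partition.map(word => {
    // Every record in the partition reuses the same connection
    (word, conn.lookup(word))
  })
  // In real code, also arrange to close conn once the iterator is consumed
  newPartition
})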

Compared to mapPartitions, mapPartitionsWithIndex merely adds a data partition index, which lets you access the partition number inside your mapping logic. Consider this operator whenever your business logic needs to know which partition a record belongs to; in every other respect, mapPartitionsWithIndex behaves exactly like mapPartitions.
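
As a quick illustration, here is a minimal sketch, reusing cleanWordRDD from above, that tags every word with the index of the partition it lives in:

// Tag every word with the index of the data partition it belongs to
val indexedWordRDD: RDD[(Int, String)] = cleanWordRDD.mapPartitionsWithIndex( (index, partition) => {
  partition.map(word => (index, word))
})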

After introducing the map and mapPartitions operators, let’s look at another operator that is similar in functionality to the previous two: flatMap.

flatMap: From element to collection, and from collection to element #

In terms of functionality, flatMap is similar to map and mapPartitions: it is also used for data mapping. In terms of implementation, for a given mapping function f, flatMap(f) transforms the RDD at the element level.

However, compared to the previous two, the mapping function f of flatMap has a significant difference. For map and mapPartitions, the type of the mapping function f is (element) => (element), that is, from element to element. The type of the flatMap mapping function f, on the other hand, is (element) => (collection), that is, from element to collection (such as an array, list, etc.). Therefore, the mapping process of flatMap can be logically divided into two steps:

  • Creating a collection at the element level;
  • Removing the “wrapper” of the collection and extracting the collection elements.

This explanation may be a bit abstract, so let's use an example for clarification. Suppose we once again change the Word Count calculation logic from counting individual words to counting the co-occurrence of adjacent words, as shown in the figure below:

![Image](../images/b9feyy652bb60d7d30a25e3e122a9f6a.jpg)

How do we use flatMap to implement this calculation logic? Here's the code implementation first, then we'll analyze the flatMap mapping process in stages:

// Read the file content
val lineRDD: RDD[String] = _ // Please refer to the complete code in the first lecture
// Extract adjacent word pairs at the line level
val wordPairRDD: RDD[String] = lineRDD.flatMap(line => {
  // Convert the line into a word array
  val words: Array[String] = line.split(" ")
  // Convert the array of individual words into a collection of adjacent word pairs
  for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)
})

In the above code, we provide the mapping function f as an anonymous function. Here, the parameter of f is a string, namely one line of text from the source file, and the return value of f is a collection of strings. In the body of f, we first use split to break the line into an array of words, and then use a for loop combined with yield to turn every pair of neighboring words into an adjacent word pair.

Note that the for-yield expression returns a collection of strings, where each element is an adjacent word pair. Therefore, the type of the function f is (String) => (collection of String), which is exactly the first step we mentioned earlier: from element to collection. However, if we look at the two RDDs before and after the transformation, namely lineRDD and wordPairRDD, we find that their types are both RDD[String], which means their element types are both strings.

Looking back at the map and mapPartitions operators, we can see that the element types of the RDD before and after the transformation are consistent with the types of the mapping function f. However, in flatMap, we have a situation where the element type of RDD is not consistent with the function type. What’s going on here? In fact, this is the “mystery” of flatMap. In order to help you understand the mapping process of flatMap intuitively, I drew a diagram as shown below:

(Figure: the flatMap mapping process, steps 1–3)

It is not difficult to see that the calculation process of the mapping function f corresponds to steps 1 and 2 in the diagram, where each line of text is transformed into an array containing adjacent word pairs. Then, flatMap removes the “packaging” of each array, extracts the string type word pairs in the array, and then builds a new data partition based on the word pairs, as shown in step 3 of the diagram. This is the second step of the flatMap mapping process: remove the “packaging” of the collection and extract the collection elements.
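
If the two-step process still feels abstract, the same mechanics can be observed on a plain Scala collection, completely outside Spark; this tiny example uses a one-element list:

// A minimal local-collection analogy (plain Scala, no Spark involved):
// flatMap first maps each element to a collection, then flattens the results
val lines = List("Spark is cool")
val pairs = lines.flatMap(line => {
  val words = line.split(" ")
  for (i <- 0 until words.length - 1) yield words(i) + "-" + words(i + 1)
})
// pairs: List("Spark-is", "is-cool")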

After obtaining the wordPairRDD containing the word pair elements, we can continue to use the subsequent logic of Word Count to calculate the co-occurrence frequency of adjacent words. You may try to implement the complete version of “Adjacent Word Count” by combining the code in the document with the Word Count code in the first lecture.
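
For reference, one possible way to stitch the pieces together, reusing the wordPairRDD built above along with the counting pattern from Lesson 1, might look like this sketch:

// A possible end-to-end sketch of "Adjacent Word Count";
// the filter step introduced in the next section can be slotted in
// before the map to drop pairs containing special characters
val pairCountRDD: RDD[(String, Int)] = wordPairRDD
  .map(pair => (pair, 1))
  .reduceByKey(_ + _)

// Peek at a few co-occurrence counts
pairCountRDD.take(5).foreach(println)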

filter: Filtering RDD #

Finally, let’s learn another commonly used operator, filter, which is similar to map. As the name suggests, the filter operator is used to filter an RDD. Just like the map operator relies on its mapping function, the filter operator also needs a predicate function f to achieve the filtering transformation of the RDD.

The so-called predicate function refers to a function of type (RDD element type) => (Boolean). As you can see, the parameter type of the predicate function f must be consistent with the element type of the RDD, and the return value of f can only be true or false. Calling filter(f) on an RDD keeps the data elements that satisfy f (i.e., for which f returns true) and filters out the data elements that do not satisfy f (i.e., for which f returns false).

As usual, we will use an example to explain the filter operator and the predicate function f.

After the last example of flatMap, we obtained the wordPairRDD with elements as adjacent word pairs, which includes strings like “Spark-is” and “is-cool”. In order to retain meaningful word pair elements, we want to filter the wordPairRDD based on a list of punctuation marks. For example, we want to filter out word pairs like “Spark-&” and “|-data”.

Once you have learned how to use the filter operator, you will be able to implement the filtering logic like this quickly:

// Define a list of special characters
val list: List[String] = List("&", "|", "#", "^", "@")

// Define the predicate function f
def f(s: String): Boolean = {
  val words: Array[String] = s.split("-")
  val b1: Boolean = list.contains(words(0))
  val b2: Boolean = list.contains(words(1))
  !b1 && !b2 // Keep the pair only if neither word is in the special-character list
}

// Use filter(f) to filter the RDD
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(f)

Once you have mastered the usage of the filter operator, you can define arbitrarily complex predicate functions f and then call filter(f) on the RDD to perform all kinds of data filtering to meet different business requirements.
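
For instance, the filtering logic above can just as well be written inline with an anonymous predicate, which is often convenient for one-off conditions; this version is equivalent to the named-function implementation:

// The same filtering logic written inline with an anonymous predicate
val cleanedPairRDD: RDD[String] = wordPairRDD.filter(pair => {
  val words: Array[String] = pair.split("-")
  !list.contains(words(0)) && !list.contains(words(1))
})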

Key Review #

Alright, that’s it for RDD data transformation operators. Let’s summarize what we’ve learned. In today’s lesson, you need to understand the purposes and specific usage of map, mapPartitions, flatMap, and filter.

Firstly, we talked about the usage of the map operator. It allows developers to freely transform RDD data in various ways. Given a mapping function f, map(f) performs data transformation on the RDD at the element level. The function f can be a named function or an anonymous function, and its parameter type must match the element type of the RDD, while the output type is determined by the developer.

To improve the efficiency of data transformation, Spark provides the mapPartitions operator, which operates at the data-partition level. The parameter of mapPartitions represents a data partition, and the actual transformation is completed by calling map(f) on that partition. Compared to map, the advantage of mapPartitions is the ability to initialize shared objects once per data partition. Such shared objects are common in daily development: database connection objects, S3 file handles, machine learning models, and so on.

Next, we introduced the flatMap operator. The mapping function f of flatMap is special. Its function type is (element) => (collection), where the collection refers to data structures like arrays and lists. Therefore, the mapping process of flatMap is logically divided into two steps, and you should pay special attention to this:

  • Create collections at the element level;
  • Remove the “wrapper” of the collection and extract the collection elements.

Finally, we learned the filter operator. Its usage is similar to that of map: it requires a predicate function f to filter the data of the RDD. The type of the predicate function must be (RDD element type) => (Boolean), which means its parameter type must match the element type of the RDD and its return type must be Boolean. Whether an element of the RDD is retained depends on whether the predicate function f returns true or false.

Although we only learned 4 operators today, these 4 operators are very commonly used in daily development. With a good understanding of these simple RDD operators, you can handle almost 90% of data transformation scenarios in RDD. I hope you practice these operators more so that you can apply them in your daily development work.

Practice after Each Lesson #

After finishing the main lesson, I have 3 questions for you to think about:

  1. Please learn about the mapPartitionsWithIndex transformation operator based on the official documentation. Can you discuss in which scenarios this operator might be used?

  2. For the four operators we learned today, along with mapPartitionsWithIndex, can you identify any similarities or common characteristics among them?

  3. Can you mention any shared objects that can be initialized within the mapPartitions function in your daily work?

Feel free to answer these practice questions in the comment section. You can also share this lesson with more friends or colleagues and discuss together. Communication is a catalyst for learning. I’ll be waiting for you in the comments.