07 Common RDD Operators II - How Spark Achieves Data Aggregation #

Hello, I am Wu Lei.

With some theoretical groundwork now in place, today we continue with the common RDD operators. In [Part 1 of Common RDD Operators], we discussed four operators: map, mapPartitions, flatMap, and filter. We also left you with a question to think about: “What do these operators have in common?”

Here is the answer. First, in terms of functionality, all four operators transform data within an RDD. Second, now that we know how Shuffle works, it is easy to see that none of them introduces Shuffle computation.

However, the operators we are going to learn today are the opposite; they all introduce heavy Shuffle computation. These operators are groupByKey, reduceByKey, aggregateByKey, and sortByKey, which are the bolded parts in the table.

In data analysis, the typical kinds of computation are grouping, aggregation, and sorting, and groupByKey, reduceByKey, aggregateByKey, and sortByKey exist precisely to implement them.

Image

These operators may seem to have a narrower range of application than others, because they can only be applied to Paired RDDs, that is, RDDs whose elements are (Key, Value) pairs.

In terms of functionality, however, they carry most of the workload in data analysis scenarios. Mastering them is therefore an important foundation for developing data analysis applications proficiently. Next, we will get to know these operators through several examples.

Let’s start with groupByKey. Honestly, compared to the other three operators, groupByKey does not have a high “appearance rate” in our daily development. The reason for introducing it first is mainly to pave the way for the two important operators: reduceByKey and aggregateByKey that will be discussed later.

groupByKey: Grouping and Collecting #

The literal meaning of groupByKey is “grouping by key”, but in reality, the groupByKey operator consists of two steps: grouping and collecting.

Specifically, for a Paired RDD with elements of the type (Key, Value), the groupByKey function groups elements with the same key and collects the corresponding values into a collection. In other words, groupByKey transforms the type of RDD from RDD[(Key, Value)] to RDD[(Key, Collection of Values)].

To better illustrate the usage of groupByKey, let’s use a small example. We will continue with the well-known Word Count problem, but instead of counting the occurrences of each word, we only want to gather identical words together. So how do we do that? As usual, let’s start with the implementation:

import org.apache.spark.rdd.RDD

// Tokenize the lines
val cleanWordRDD: RDD[String] = ??? // Placeholder: see the Word Count in Lecture 1 for the complete code
// Map the regular RDD to a Paired RDD
val kvRDD: RDD[(String, String)] = cleanWordRDD.map(word => (word, word))

// Group and collect the words
val words: RDD[(String, Iterable[String])] = kvRDD.groupByKey()

Looking at the code above, we only need to make two small changes to the previous Word Count implementation to achieve the new logic. The first change is in the mapping function f of the map operator: instead of word => (word, 1), we use word => (word, word), so that the key and the value of every element in kvRDD are the same word.

The second change is that we replace reduceByKey with groupByKey. Compared to reduceByKey, groupByKey is much simpler to use: it needs no aggregation function, so to group and collect a Paired RDD we only need to call groupByKey() on it.
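To make the change from (Key, Value) to (Key, collection of Values) concrete, here is a minimal sketch that mimics the same two steps on a local Scala collection instead of an RDD. The sample words and the object name are assumptions made for illustration only, and no Shuffle is involved because everything lives in one process.

```scala
object GroupByKeySketch {
  // Hypothetical sample input standing in for cleanWordRDD
  val words: Seq[String] = Seq("spark", "is", "cool", "spark", "is", "fast")

  // Same mapping as in the lecture code: key and value are both the word
  val kv: Seq[(String, String)] = words.map(word => (word, word))

  // Step 1: group the pairs by key. Step 2: collect the values of each group.
  val grouped: Map[String, Seq[String]] =
    kv.groupBy(_._1).map { case (key, pairs) => (key, pairs.map(_._2)) }

  def main(args: Array[String]): Unit =
    grouped.foreach { case (key, values) => println(s"$key -> $values") }
}
```

Each key ends up with all of its values, which is the local counterpart of the RDD[(String, Iterable[String])] returned by groupByKey.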

Although the usage of groupByKey is simple, its calculation process deserves special attention. I will use a diagram to explain the calculation process of the code above, so that you can have a more intuitive understanding of the potential performance issues of groupByKey.

Image

From the diagram, we can see that in order to complete the grouping and collecting process, Spark must perform a Shuffle that moves records sharing a key, originally scattered across different data partitions, into the same partition. As we mentioned in Lecture 6, Shuffle is a resource-intensive computation. For RDDs with millions or even billions of data records, such Shuffle computations can incur a large amount of disk I/O and network I/O overhead, seriously impacting the execution performance of the job.
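How can records with the same key, sitting in different partitions, all arrive at the same place? One common scheme is to derive the target partition from the key's hash code. The sketch below assumes that scheme; the helper partitionFor, the sample records, and the choice of 3 target partitions are hypothetical and only illustrate the routing idea.

```scala
object ShuffleRoutingSketch {
  // Hypothetical helper: choose a target partition from the key's hash code,
  // keeping the result non-negative even when hashCode is negative
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Hypothetical records as they might sit in two source partitions
  val sourcePartitions: Seq[Seq[(String, String)]] = Seq(
    Seq(("Spark", "Spark"), ("Streaming", "Streaming")),
    Seq(("Streaming", "Streaming"), ("Spark", "Spark"))
  )

  // Each record is routed on its own, yet equal keys always pick the same target
  val routed: Map[Int, Seq[(String, String)]] =
    sourcePartitions.flatten.groupBy { case (key, _) => partitionFor(key, 3) }
}
```

Because the target depends only on the key, every record must be inspected and possibly moved, which is where the disk and network cost of Shuffle comes from.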

groupByKey is relatively inefficient, but fortunately it is not needed very often in application development. The reason is simple: in data analysis, use cases for grouping and collecting are rare; what statistical analysis really needs is grouped aggregation.

To meet the diverse calculation needs of group aggregation, Spark provides three RDD operators that allow developers to implement the calculation logic flexibly. These operators are reduceByKey, aggregateByKey, and combineByKey.

We are already familiar with reduceByKey; we used it in the Word Count implementation in Lecture 1. aggregateByKey is an “upgraded version” of reduceByKey: it is more flexible to use and covers more scenarios.

Next, let’s review reduceByKey and then move on to aggregateByKey. combineByKey differs from aggregateByKey only in how the initial value is supplied, so I’ll leave it to you as homework to explore.

reduceByKey: Grouping Aggregation #

The literal meaning of reduceByKey is “aggregation based on Key values”. Its calculation logic is to aggregate multiple elements with the same Key value into one element based on the aggregation function f.

In the implementation of Word Count in [Lesson 1], we used reduceByKey to achieve grouping and counting:

// Convert RDD elements to the form of (Key, Value)
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// Group and count by word
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x: Int, y: Int) => x + y)

Reviewing the above code, do you notice any similarities between reduceByKey and the previous map, filter operators? Yes, given the processing function f, their usage is all “operator(f)”. However, for map, we call f a mapping function, for filter, we call f a predicate function, and for reduceByKey, we call f an aggregation function.

In the code example above, the aggregation function of reduceByKey is an anonymous function: (x, y) => x + y. Just like the usage of map, filter, etc., you can also explicitly define a named function f and then use reduceByKey(f) to achieve the same calculation logic.

It should be emphasized that given RDD[(Key type, Value type)], the type of the aggregation function f must be (Value type, Value type) => (Value type). In other words, the parameters of function f must be two values, and the types of the values must be the same as the type of Value. The return value of f must also be a value of type Value.
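The type rule can be made tangible with a small local model. The sketch below is not Spark code: reduceByKeyLocal is a hypothetical stand-in that enforces the same (Value type, Value type) => (Value type) shape on an ordinary Seq, and the sample scores are made up. It shows that when the result needs extra information, such as a count next to a sum, the Value type itself has to change first.

```scala
object ReduceByKeyTypeSketch {
  // Hypothetical local stand-in for reduceByKey: f must be (V, V) => V
  def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2).reduce(f)) }

  // Hypothetical sample data: (key, score)
  val scores: Seq[(String, Int)] = Seq(("a", 90), ("b", 70), ("a", 80))

  // Value type is Int, so f is (Int, Int) => Int
  val sums: Map[String, Int] = reduceByKeyLocal(scores)((x, y) => x + y)

  // To carry a count along, first change the Value type to (sum, count);
  // f then becomes ((Int, Int), (Int, Int)) => (Int, Int)
  val sumAndCount: Map[String, (Int, Int)] =
    reduceByKeyLocal(scores.map { case (k, v) => (k, (v, 1)) }) {
      case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2)
    }

  // The average is derived afterwards, outside the aggregation function
  val averages: Map[String, Double] =
    sumAndCount.map { case (k, (sum, count)) => (k, sum.toDouble / count) }
}
```

In both calls the two parameters and the result of f share one type, which is the constraint described above.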

Let’s take another example to deepen your understanding of the reduceByKey operator.

Next, let’s change the calculation logic of Word Count to randomly assign values and extract the maximum value for the same Key. In the process of generating kvRDD, instead of using the mapping function word => (word, 1), we will change it to word => (word, random number), and then use the reduceByKey operator to calculate the maximum random number for each word.

You can take a moment to think about how to implement this logic and then refer to the code below:

import scala.util.Random._

// Convert RDD elements to the form of (Key, Value)
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, nextInt(100)))

// Explicitly define the aggregation function f to extract the maximum value
def f(x: Int, y: Int): Int = math.max(x, y)

// Extract the maximum value for each word
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(f)

Looking at the code snippet above, you can see that the reduceByKey operator is quite simple to use: you just define the aggregation function f and pass it to reduceByKey. So, what does the calculation process of this code look like at runtime?

I have abstracted the calculation process of reduceByKey into the following figure:

Image

From the figure, you can see that reduceByKey also introduces Shuffle. The difference is that groupByKey writes every original record to disk and sends it over the network, whereas reduceByKey performs a preliminary aggregation in the Map phase of Shuffle, before the data is persisted and distributed.

For example, when processing data partition 0 in the Map phase, reduceByKey aggregates the two records that share the key “Streaming” into a single record. The aggregation logic is defined by the function f, which keeps the larger of the two values. We call this process “Map-side Aggregation”. Correspondingly, the computation performed in the Reduce phase, after the data has been distributed over the network, is called “Reduce-side Aggregation”.

You might say, “What’s the big deal with Map-side Aggregation? The performance improvement of reduceByKey over groupByKey is not significant!” Indeed, from the schematic diagram alone it is hard to perceive the improvement. However, on industrial-scale data, reduceByKey can often at least double execution efficiency, because it greatly reduces the amount of data that must be persisted and distributed in the Map phase.
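The effect of Map-side Aggregation can be counted directly in a toy model. The following sketch uses plain Scala collections, with each inner Seq playing the role of one data partition; the partition contents and all names are hypothetical. It compares how many records would cross the Shuffle with and without a preliminary aggregation, using max as the aggregation function f.

```scala
object MapSideAggregationSketch {
  // Aggregation function f: keep the larger value
  val f: (Int, Int) => Int = (x, y) => math.max(x, y)

  // Hypothetical contents of two data partitions
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("Streaming", 3), ("Streaming", 7), ("Spark", 5)),
    Seq(("Spark", 2), ("Streaming", 4), ("Spark", 9))
  )

  // Aggregate all records that share a key with f
  def aggregate(records: Seq[(String, Int)]): Map[String, Int] =
    records.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2).reduce(f)) }

  // groupByKey style: every original record crosses the Shuffle
  val shuffledWithoutMapSide: Int = partitions.map(_.size).sum

  // reduceByKey style: aggregate inside each partition first (Map side)
  val mapSide: Seq[Map[String, Int]] = partitions.map(aggregate)
  val shuffledWithMapSide: Int = mapSide.map(_.size).sum

  // Reduce side: apply the same f again to the partial results
  val result: Map[String, Int] = aggregate(mapSide.flatMap(_.toSeq))
}
```

In this tiny input the saving is only 6 records versus 4, but the ratio grows with the number of records per key in each partition, since at most one record per key leaves a partition.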

For most grouping and aggregation requirements, as long as you design the aggregation function f properly, reduceByKey is enough to implement the calculation logic. Still, every operator has its limits. The limitation of reduceByKey is that its calculation logic, which is defined by the aggregation function f, must be the same in the Map phase and the Reduce phase. When a scenario requires different logic in the two phases, reduceByKey can no longer help.

For example, let’s still use the Word Count example from Lesson 1, and adjust the calculation logic of word counting as follows:

  • In the Map phase, count the occurrences of each word within each data partition;
  • In the Reduce phase, for the same word, take the maximum of these per-partition counts.

Clearly, the calculation logic in the Map phase is sum, while the logic in the Reduce phase is max. reduceByKey cannot express this requirement, and this is exactly where the aggregateByKey operator shines.

aggregateByKey: A More Flexible Aggregation Operator #

As usual, let’s start with the usage of the operator. Compared to other operators, the aggregateByKey operator has more parameters. To use aggregateByKey on a Paired RDD, you need to provide an initial value, a Map-side aggregation function f1, and a Reduce-side aggregation function f2. The calling form of aggregateByKey is as follows:

val rdd: RDD[(Key type, Value type)] = _
rdd.aggregateByKey(initial value)(f1, f2)

The initial value is not limited to numbers or strings; it can be of any type, and that type determines the type of the aggregated result. The two aggregation functions are familiar: each is an ordinary function with two parameters and one result. What is more confusing is that the types of these three arguments must be consistent with one another. Specifically:

  • The type of the initial value must be consistent with the result type of f1 and of f2.
  • f1 takes two parameters: the first has the type of the initial value (the running result), and the second has the Value type of the Paired RDD.
  • Both parameters of f2 must have the result type of f1.

Type consistency is hard to convey in words, so let’s use a diagram to make it concrete:

Image
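The type relationships are easiest to see when the result type differs from the Value type. In Spark's API the operator is declared as aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U), where V is the Value type of the Paired RDD. The sketch below mirrors that shape on local collections; aggregateByKeyLocal, the sample partitions, and the choice of Set[Int] as the result type are assumptions for illustration, not Spark code.

```scala
object AggregateByKeyTypesSketch {
  // Hypothetical local stand-in for aggregateByKey over explicit partitions.
  // K: key type, V: Value type of the pairs, U: type of the aggregated result.
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]])(zero: U)(
      f1: (U, V) => U, // Map side: fold one value into the running result
      f2: (U, U) => U  // Reduce side: merge two partial results
  ): Map[K, U] = {
    // Map side: start from the initial value for each key in each partition
    val partials: Seq[(K, U)] = partitions.flatMap { partition =>
      partition.groupBy(_._1).map { case (key, kvs) =>
        (key, kvs.map(_._2).foldLeft(zero)(f1))
      }.toSeq
    }
    // Reduce side: merge the per-partition results of each key
    partials.groupBy(_._1).map { case (key, kus) => (key, kus.map(_._2).reduce(f2)) }
  }

  // Hypothetical sample data: V is Int, while U is Set[Int]
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 1), ("a", 1), ("b", 2)),
    Seq(("a", 3), ("b", 2))
  )

  // Collect the distinct values of each key
  val distinctValues: Map[String, Set[Int]] =
    aggregateByKeyLocal(partitions)(Set.empty[Int])(
      (set, v) => set + v, // f1: (Set[Int], Int) => Set[Int]
      (s1, s2) => s1 ++ s2 // f2: (Set[Int], Set[Int]) => Set[Int]
    )
}
```

Here the initial value, the result of f1, and both parameters and the result of f2 all share the type Set[Int], while only the second parameter of f1 has the Value type Int.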

Now that we are familiar with the usage of aggregateByKey, let’s use it to implement the “sum first, then take the maximum” logic. The code is as follows:

// Convert RDD elements into (Key, Value) format
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// Explicitly define the Map-side aggregation function f1
def f1(x: Int, y: Int): Int = x + y

// Explicitly define the Reduce-side aggregation function f2
def f2(x: Int, y: Int): Int = math.max(x, y)

// Call aggregateByKey to sum first and then take the maximum
val wordCounts: RDD[(String, Int)] = kvRDD.aggregateByKey(0)(f1, f2)

How’s that? It’s easy, right? According to the calculation logic requirements, we only need to define the two aggregation functions in advance, while ensuring the consistency of the parameter types. Then, pass the initial value and aggregation functions into the aggregateByKey operator. As usual, let’s use the runtime calculation process of aggregateByKey to help you understand the working principle of the operator:

Image

It is not difficult to find that the execution process of aggregateByKey is no different from reduceByKey at runtime. The main difference lies in whether the aggregation logic of the Map-side and Reduce-side is consistent. It is worth mentioning that, just like reduceByKey, aggregateByKey can also greatly reduce the amount of data through preliminary aggregation on the Map-side. This can reduce disk and network overhead while improving the execution performance of the Shuffle phase.
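To trace the two phases by hand, the following sketch replays the “sum on the Map side, max on the Reduce side” logic on plain Scala collections. The two partitions and their contents are hypothetical, and the code only models the data flow; it does not call Spark.

```scala
object SumThenMaxSketch {
  // Map-side aggregation function: sum
  val f1: (Int, Int) => Int = (x, y) => x + y
  // Reduce-side aggregation function: max
  val f2: (Int, Int) => Int = (x, y) => math.max(x, y)

  // Hypothetical (word, 1) pairs spread over two data partitions
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("Spark", 1), ("Spark", 1), ("Streaming", 1)),
    Seq(("Spark", 1), ("Streaming", 1), ("Streaming", 1), ("Streaming", 1))
  )

  // Map side: starting from the initial value 0, sum per word within each partition
  val mapSide: Seq[Map[String, Int]] = partitions.map { partition =>
    partition.groupBy(_._1).map { case (word, kvs) =>
      (word, kvs.map(_._2).foldLeft(0)(f1))
    }
  }

  // Reduce side: for each word, keep the largest per-partition sum
  val result: Map[String, Int] =
    mapSide.flatMap(_.toSeq).groupBy(_._1).map { case (word, partials) =>
      (word, partials.map(_._2).reduce(f2))
    }
}
```

With this input the Map side yields Spark -> 2 and Streaming -> 1 for the first partition and Spark -> 1 and Streaming -> 3 for the second, so the final result is Spark -> 2 and Streaming -> 3. Note that, unlike a plain count, this result depends on how the records happen to be distributed across partitions.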

sortByKey: Sorting #

In this last section, let’s talk about the sortByKey operator. As the name suggests, it sorts by Key: given a Paired RDD of (Key, Value) pairs, sortByKey sorts the RDD by Key. The operator is quite simple to use; just call sortByKey() on the RDD:

val rdd: RDD[(Key type, Value type)] = _
rdd.sortByKey()

By default, sortByKey sorts the RDD in ascending order based on the Key. If you want to sort the RDD in descending order, you need to pass false to sortByKey. In summary, here are the rules for sorting:

  • Ascending order: Call sortByKey() or sortByKey(true).
  • Descending order: Call sortByKey(false).
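The two sort orders can be previewed on a local collection. This is a sketch with hypothetical sample pairs; it uses Scala's sortBy on a Seq as a stand-in for the effect of sortByKey on a Paired RDD.

```scala
object SortByKeySketch {
  // Hypothetical (Key, Value) pairs
  val pairs: Seq[(String, Int)] = Seq(("spark", 3), ("is", 1), ("fast", 2))

  // Local counterpart of sortByKey() or sortByKey(true): ascending by key
  val ascending: Seq[(String, Int)] = pairs.sortBy(_._1)

  // Local counterpart of sortByKey(false): descending by key
  val descending: Seq[(String, Int)] = pairs.sortBy(_._1)(Ordering[String].reverse)
}
```

Only the key takes part in the comparison; each value simply travels along with its key.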

Key Highlights #

In today’s lecture, we introduced four commonly used operators in data analysis scenarios: groupByKey, reduceByKey, aggregateByKey, and sortByKey. Understanding the usage and principles of these operators will lay a solid foundation for developing data analysis applications with ease.

For these operators, you first need to understand what they have in common. First, all four apply only to Paired RDDs. Second, they all trigger Shuffle during computation. Shuffle often becomes the bottleneck of Spark job performance, so when using these operators, be aware of the performance issues they may bring.

Furthermore, you need to grasp the specific usage and workings of each operator. groupByKey needs no aggregation function; you only need to call groupByKey() on the RDD to group and collect the dataset. Note, however, that it writes all of the original data records to disk and distributes them over the network, which results in significant performance overhead. Therefore, unless it is really necessary, you should avoid the groupByKey operator.

Using an aggregation function f, reduceByKey can perform preliminary aggregation on the map side, greatly reducing the amount of data that needs to be written to disk and distributed. This, in turn, can significantly improve the execution efficiency of Shuffle. For the majority of grouping and aggregation computation needs, as long as the aggregation function f is well-designed, reduceByKey can handle the business logic. However, reduceByKey does have its limitations, namely that the computation logic in the map and reduce stages must remain consistent.

For cases where the computation logic for map-side and reduce-side aggregation is inconsistent, aggregateByKey is suitable. The usage of aggregateByKey is aggregateByKey(initial value)(map-side aggregation function, reduce-side aggregation function). For the three parameters of aggregateByKey, you need to ensure their consistency in types. Once type consistency is satisfied, you can flexibly define two aggregation functions to perform various types of data analysis.

Lastly, for sorting needs, you can use sortByKey, which supports two sort orders. By default, sortByKey() sorts in ascending order by key, which is the same as sortByKey(true). To sort in descending order, call sortByKey(false).

So far, we have covered the first two categories of commonly used RDD operators, namely data transformation and data aggregation. It can be said that in daily development work, the majority of business requirements can be implemented using these operators.

Therefore, congratulations! It is not an exaggeration to say that by learning up to this point, you have already set foot in the doorway of Spark distributed application development. However, we should not be complacent. There is still some distance between “learning” and “mastery”. In the coming days, I look forward to continuing to work together with you to truly understand Spark and master Spark!

Practice for Each Lesson #

This lesson is coming to an end, and today’s practice question is as follows:

After learning reduceByKey and aggregateByKey, can you explain the relationship between the two? Can you use aggregateByKey to achieve the same functionality as reduceByKey?

You are welcome to share your answers. If this lesson has been helpful to you, please feel free to share it with your friends and discuss the practice question of this lesson together. See you in the next lesson.