10 Broadcast Variables Accumulators, Shared Variables for What Purpose

10 Broadcast Variables - Accumulators, Shared Variables for What Purpose #

Hello, I am Wu Lei.

Today is the first day of the National Day, so first of all, I wish you a happy holiday. Since the launch of this column, many students have left comments saying that they are looking forward to the follow-up content. Therefore, during the National Day, we will continue to update the main content so that we can wrap up the basic knowledge module together.

After learning about the commonly used operators of RDD, when we review these operators, we will find that they all take effect (apply) on RDDs. The computation of RDDs is based on data partitioning. According to the logic of the operators, Executors complete the calculation and transformation of different data partitions independently in a mutually exclusive manner.

It is not difficult to see that for Executors, the data in each partition is local data. In other words, at the same time, the data partition belonging to a specific Executor is not visible to other Executors.

However, in application development, there will always be some calculation logic that needs to access “global variables”, such as global counters, and these global variables are visible and shared by all Executors at any time. So, how does Spark support such global variables, or shared variables?

Today, I am going to talk to you about Spark’s shared variables. According to the different creation and usage methods, Spark provides two types of shared variables, which are broadcast variables and accumulators. Next, let’s formally enter today’s study and delve into the usage and respective application scenarios of these two shared variables.

Broadcast Variables #

First, let’s talk about broadcast variables. The usage of broadcast variables is simple. Given a regular variable x, we can create a broadcast variable by calling the broadcast API under the SparkContext. Let’s see an example with some code.

val list: List[String] = List("Apache", "Spark")

// sc is an instance of SparkContext
val bc = sc.broadcast(list)

In the code example above, we first define a string list called list, which contains the words “Apache” and “Spark”. Then, we use the broadcast function to create a broadcast variable bc, which encapsulates the content of the list list.

// Access the content of the broadcast variable
bc.value
// List[String] = List(Apache, Spark)

// Access the list content directly
list
// List[String] = List(Apache, Spark)

After creating the broadcast variable using the broadcast API, we can access the encapsulated data content by calling its value function. As you can see, the effect of calling bc.value is exactly the same as directly accessing the string list list.

At this point, you may ask, “Why do we need to encapsulate the string list in a broadcast variable and then access it through its value function, when we can simply access the list variable directly to get the string list?” This is indeed a very good question. To answer this question, let’s make a deduction and see what drawbacks we might encounter if we directly access the list variable.

In the previous few lessons, we varied the Word Count calculation logic in different ways. Although we have “overused” the Word Count example, consistently using the same instance helps us quickly grasp new knowledge and skills by comparing them. Therefore, to quickly grasp the “essence” of broadcast variables, let’s “reuse” our previous skills and continue working on the Word Count example.

The Pain Points of Regular Variables #

This time, to compare the differences before and after using broadcast variables, we will change the Word Count into “Selective Counting”.

Selective counting means that we only count certain words, such as “Apache” and “Spark” in the file wikiOfSpark.txt, while ignoring other words. Combining the complete code of the Word Count shown in Lesson 1, we can easily implement this calculation logic as shown in the table below.

import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read the file content
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// Tokenize the lines
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))

// Create a word list
val list: List[String] = List("Apache", "Spark")
// Filter the RDD using the list
val cleanWordRDD: RDD[String] = wordRDD.filter(word => list.contains(word))
// Convert RDD elements to (Key, Value) pairs
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// Group and count the words
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// Get the calculation result
  wordCounts.collect
  // Array[(String, Int)] = Array((Apache, 34), (Spark, 63))

When the above code is run in the spark-shell, we can quickly calculate that the word “Apache” appears 34 times in the file wikiOfSpark.txt, while “Spark” appears 63 times. Although it is easy to obtain the calculation result, it is important to understand how this code works at runtime.

Image

As shown in the above figure, the list variable itself is created in the Driver, and it is not part of the distributed dataset (such as lineRDD and wordRDD). Therefore, in the process of distributed computing, Spark needs to distribute the list variable to each distributed task (Task) to filter the contents of different data partitions.

In this working mechanism, if the RDD has a high parallelism or the size of the variable is large, the repetitive content distribution will introduce a large amount of network and storage overhead, which will significantly weaken the performance of the job. Why is this?

You see, the distribution of the Driver variable is based on the granularity of the Task. The more Tasks there are in the system, the more times the variable needs to be distributed in the network. What’s more, after receiving the variable, each Task needs to temporarily store it in memory for subsequent filtering. In other words, within the same Executor, multiple different Tasks repeatedly cache the same content copy. Undoubtedly, this is a huge waste of precious memory resources.

If the RDD has a high parallelism, it means that the number of data partitions of the RDD is large, and the number of Tasks is the same as the number of partitions, which means that there are a large number of distributed tasks to be executed in the system. If the size of the variable itself is large, the network and memory overhead introduced by a large number of distributed tasks will be further upgraded. In industrial-level applications, the parallelism of RDD is often in the order of thousands or tens of thousands. In this case, variables such as list will be distributed in the network tens of thousands of times, and the overall execution efficiency of the job will naturally be poor.

Faced with this predicament, do we have any way to avoid the duplication of distribution and storage of the same variable? The answer is definitely yes. At this time, we can use broadcast variables as the “trump card”.

Advantages of Broadcast Variables #

To understand the advantages of broadcast variables, let’s rewrite the previous code implementation using broadcast variables, and then make a comparison. We can easily find out why broadcast variables can solve the pain points of ordinary variables.

import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read file content
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// Split by lines
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))

// Create a word list
val list: List[String] = List("Apache", "Spark")
// Create a broadcast variable bc
val bc = sc.broadcast(list)
// Filter the RDD using bc.value
val cleanWordRDD: RDD[String] = wordRDD.filter(word => bc.value.contains(word))
// Convert the RDD elements into (Key, Value) pairs
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// Count the words by grouping
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// Get the calculation result
wordCounts.collect
// Array[(String, Int)] = Array((Apache,34), (Spark,63))

As you can see, the modification of the code is very simple. We first use the broadcast function to wrap the list variable, and then call bc.value to access the content of the list variable when filtering the RDD. Although the modification of the code is minimal and almost negligible, the entire computation process has undergone a radical change during runtime.

Image

Before using the broadcast variable, the distribution of the list variable is based on the granularity of the Task. After using the broadcast variable, the granularity of variable distribution becomes the Executors. Multiple different Tasks within the same Executor only need to access the same copy of the data. In other words, the number of times the variable is distributed and stored in the network is reduced from the number of RDD partitions to the number of Executors in the cluster.

You should know that in industrial-level systems, the number of Executors usually differs from the parallelism of RDD by at least two orders of magnitude. In such a scale, the savings in network and memory overhead brought by broadcast variables can be very significant. By eliminating these overheads, the overall execution performance of the job is naturally improved.

Alright, up to now, we have explained the usage and working principle of broadcast variables, as well as their advantages. In your daily development work, when you need multiple Tasks to share the same large variable (such as lists, arrays, mappings, and other data structures), you can consider using broadcast variables to optimize your Spark job. Next, we will continue to talk about the second type of shared variables supported by Spark: accumulators.

Accumulators #

As the name suggests, accumulators are mainly used for global counting in a distributed system. Unlike in a single-machine system, we cannot rely on simple ordinary variables to achieve global counting in a distributed system. Instead, we must rely on special data structures like accumulators to achieve this goal.

Similar to broadcast variables, accumulators are also defined on the Driver side, but their updates are done by calling the add function in RDD operators. After the application finishes running, developers can call the value function of the accumulator on the Driver side to obtain the global counting result. As usual, let’s familiarize ourselves with the usage of accumulators through code examples.

Smart as you are, you may have guessed that we are once again going to “work on” the Word Count. In the Word Count in Lesson 1, we filtered out empty strings and then counted the occurrences of all words in the file wikiOfSpark.txt.

However, this time, we not only want to filter out empty strings, but also want to know how many empty strings there are in the file, so that we have a clear understanding of the “dirty data” in the file.

Note that the counting of empty strings here is not the main logic of the code, and its calculation result will not be written into the final statistical result of the Word Count. So simply removing the filter step cannot achieve the counting of empty strings.

So you may naturally ask, “If we don’t remove the filter step, how can we count empty strings?” Don’t worry, this kind of calculation requirement is exactly where accumulators can come in handy. You can take a glance at the code implementation below, and then we’ll get familiar with the usage of accumulators together.

import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// Read the contents of the file
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// Split the lines into words
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))

// Define a long accumulator
val ac = sc.longAccumulator("Empty string")

// Define the judging function f for the filter operator, note that the return type of f must be Boolean
def f(x: String): Boolean = {
    if(x.equals("")) {
        // Increment the accumulator by 1 when an empty string is encountered
        ac.add(1)
        return false
    } else {
        return true
    }
}

// Use f to filter the RDD
val cleanWordRDD: RDD[String] = wordRDD.filter(f)
// Convert RDD elements into (Key, Value) format
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// Count by grouping the words
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// Collect the counting results
wordCounts.collect

// After the job is finished, call the value function to obtain the accumulator result
ac.value
// Long = 79

Compared with the Word Count in Lesson 1, there are four main changes in the code above:

  • Use longAccumulator under SparkContext to define a long type of accumulator.
  • Define the judging function f for the filter operator, and increment the accumulator by 1 when an empty string is encountered.
  • Use the filter operator with f as the argument to filter the RDD.
  • After the job is finished, call the value function of the accumulator to obtain the global counting result.

You may enter the above code into the spark-shell to directly experience the usage and effect of accumulators. The result of ac.value is 79, which means that after splitting the source file wikiOfSpark.txt with a space as the delimiter, there are 79 empty strings left.

In addition, you can also verify the RDD wordCounts, which contains the counting results of all words. However, you will find that its elements do not include empty strings, which is consistent with our expected logical calculation.

In addition to the longAccumulator used in the code above, SparkContext also provides doubleAccumulator and collectionAccumulator, two different types of accumulators used to meet different calculation needs. If you are interested, you can try them out yourself.

Among these three types of accumulators, although the types are different, their usage is exactly the same: first define the accumulator variable, then update the accumulator state by calling the add function in RDD operators, and finally obtain the final result of the accumulator by calling the value function.

Well, we have finished explaining the usage of accumulators here. In your daily development, when you encounter scenarios that require global counting, don’t forget to use this practical tool called accumulators.

Key Review #

Today’s content is finished, let’s summarize together. In today’s lecture, we focused on the usage and applicable scenarios of broadcast variables and accumulators.

Broadcast variables are defined and initialized by the Driver, and Executors access the data carried by broadcast variables in read-only mode. Similarly, accumulators are also defined by the Driver, but the Driver does not write any data into accumulators. The content of the accumulator is updated only by Executors in write-only mode, and the Driver only accesses the updated content in read-only mode.

Regarding broadcast variables, you first need to understand their basic usage. You can use the broadcast API under SparkContext to create broadcast variables with any type of ordinary variable. Next, during the transformation and computation of RDD, you can access the encapsulated data content by calling the value function of the broadcast variable, thereby assisting in the data processing of RDD.

It should be noted that the distribution and storage of ordinary variables between the Driver and the Executors are based on the granularity of the Task. Therefore, the network and memory overhead introduced by it can become a major hidden danger to the performance of job execution. In the case of using broadcast variables, the distribution granularity of data content becomes Executors. Compared with the former, the advantages of broadcast variables are obvious. It can greatly eliminate the network and memory overhead introduced by the former, thereby improving the overall execution efficiency of the job.

Regarding accumulators, first you need to understand their applicable scenarios. When you need to do global counting, accumulators will be a good helper. Secondly, you need to master the specific usage of accumulators, which can be divided into the following 3 steps:

  1. Use [long | double | collection]Accumulator under SparkContext to define accumulators;
  2. During the transformation process of RDD, use the add function to update the accumulator state;
  3. After the job is completed, call the value function to retrieve the global result of the accumulator.

Practice for Each Lesson #

  1. In the code that uses an accumulator to count empty strings globally, please try replacing the accumulator with a regular variable to see if you can achieve the expected calculation result without using the accumulator.

  2. The accumulator provides support for three types: Long, Double, and Collection. Are there any restrictions on the types supported by the broadcast variable? Apart from regular types and collection types, does the broadcast variable support any other types? For example, does Spark support creating a broadcast variable on RDD?

Feel free to interact with me in the comments section and share this lesson with your friends around you. It might help them solve a difficult problem.