12 Broadcast Variables - How to Counter Shuffle in One Move #

Hello, I am Wu Lei.

In the field of data analysis, data association (Joins) and Shuffle operations go hand in hand: where there are Joins, there is Shuffle.

As we have mentioned before, when facing Shuffle, developers should “minimize and delay” it as much as possible. We have already discussed how to delay it: push the operations that introduce Shuffle to the latest possible computational steps. So how, specifically, can we minimize it?

In data association scenarios, broadcast variables can eliminate Shuffle with little effort. So in today’s lecture, let’s first talk about the meaning and purpose of broadcast variables, and then discuss how they help developers avoid Shuffle operations.

How to Understand Broadcast Variables? #

Next, let’s use a small example to explain the meaning and purpose of broadcast variables. The example is a variant of Word Count, the “Hello World” of distributed programming. Word Count counts the occurrences of every word in a file, which you are probably very familiar with. In our example, we make the requirement slightly more complex: count only the words that appear in a specified list.

val dict = List("spark", "tune")
val words = spark.sparkContext.textFile("~/words.csv")
val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

Following this requirement, our colleague Xiao A wrote the 4 lines of code above. Let’s go through them one by one. The first line defines dict, the list of words to look for, on the Driver. The second line uses the textFile API to read a distributed file that contains a single column of words. The third line filters the file content, keeping only the words in dict. The fourth line uses reduceByKey to count the occurrences of each word.

After learning about the scheduling system, we know that the list dict defined in the first line, along with the following 3 lines of code, will be packaged into tasks. At this point, the tasks are like small planes carrying this “luggage” and flying to different Executors in the cluster. For this “luggage”, the “weight” of the code is relatively light and can be ignored, while the “weight” of the data becomes the main burden.

You might say, “Well, the dict list is not that big, so it’s not a big deal.” However, if we assume that the parallelism in this example is 10000, then the Driver needs to distribute 10000 copies of the dict. At this point, all Executors in the cluster will consume a large amount of memory to store these 10000 copies. For valuable network and memory resources, this is already a considerable waste. Moreover, if we replace it with a larger data structure, the network and memory overhead introduced by task distribution will be even more significant.

In other words, before the business logic of counting starts to run, Spark already consumes a large amount of network and storage resources, which is simply unreasonable. Therefore, we need to optimize the code in the example to overcome this predicament.

However, before we start optimizing, let’s think about what the current problem is and what our goal is. From the analysis above, the core pain point of this Word Count is that the distribution and storage of the data structure are tied to the parallelism: both happen once per task, which is far too often. Once the pain point is clear, so is the goal of the optimization: reduce how often the data structure is distributed.

To achieve this goal, the first thing that comes to mind is to reduce the parallelism. However, once we adjust the parallelism, other configurations related to CPU and memory need to be adapted as well, which inevitably complicates the optimization. In fact, to reduce the frequency of data structure distribution, we can also consider using broadcast variables.

Broadcast variables are a distribution mechanism that encapsulates the target data structure and distributes it at the granularity of Executors. In other words, under this mechanism the number of copies distributed equals the number of Executors in the cluster. The number of Executors is generally much smaller than the number of tasks, often by orders of magnitude. So, what changes if we use a broadcast variable in the initial Word Count implementation?

The code changes are simple. There are two modifications: first, wrap the dict list in a broadcast variable; second, read the list through bc.value instead of accessing dict directly.

val dict = List("spark", "tune")
val bc = spark.sparkContext.broadcast(dict)
val words = spark.sparkContext.textFile("~/words.csv")
val keywords = words.filter(word => bc.value.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

You might say, “This modification doesn’t seem to do much!” Well, let’s analyze the changes in the code after the modification at runtime.

Under the mechanism of broadcast variables, the data encapsulated in the broadcast variables is distributed by the Driver at the granularity of Executors, and each Executor receives the broadcast variable and hands it over to the BlockManager for management. Since the data carried by broadcast variables has been stored in the BlockManager through a special channel, the tasks distributed to Executors do not need to carry the same data.
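To try this out without a cluster or an input file, the same logic can be run against a local session. The following is a minimal sketch under stated assumptions: the object name `BroadcastWordCount`, the helper `count`, the in-memory word list, and the `local[*]` master are all made up for illustration. In local mode everything runs in a single process, so the saving in distributed copies is not visible, only the API usage is.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastWordCount {
  // Counts only the words listed in dict, reading dict through a broadcast variable.
  def count(spark: SparkSession, input: Seq[String], dict: List[String]): Map[String, Int] = {
    val sc = spark.sparkContext
    val words = sc.parallelize(input) // in-memory stand-in for the words file
    val bc = sc.broadcast(dict)       // distributed once per Executor, not once per task

    // The closure captures only the lightweight handle bc; tasks read the list via bc.value.
    val counts = words
      .filter(word => bc.value.contains(word))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .toMap

    bc.destroy() // free the broadcast data once the job is done
    counts
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used here purely for illustration.
    val spark = SparkSession.builder().master("local[*]").appName("BroadcastWordCount").getOrCreate()
    println(count(spark, Seq("spark", "tune", "shuffle", "spark", "join"), List("spark", "tune")))
    spark.stop()
  }
}
```

Calling `destroy()` is optional in a short-lived job; it is included only to show that the cached copies can be released explicitly.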

At this point, you can picture tasks as small planes and broadcast variables as dedicated cargo aircraft that carry the “large baggage” on the tasks’ behalf. A dedicated cargo route runs between the Driver and each Executor and transports the heavy data baggage in bulk. Thanks to the cargo aircraft, tasks only need to carry the relatively lightweight code. When the tasks land on an Executor, they pick up their “large baggage” from the BlockManager, the Executor’s shared warehouse.

In short, under the mechanism of broadcast variables, the number of times the dict list needs to be distributed and stored is greatly reduced. Let’s assume that there are 20 Executors in the cluster, and the task parallelism is still 10000. In this case, the number of copies of the dict list that the Driver needs to distribute through the network will be reduced from 10000 to 20. Likewise, the number of copies of the dict that all Executors in the cluster need to store will be reduced from 10000 to 20. At this point, the overhead introduced by using broadcast variables is only 1/500 of the original task distribution!
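The arithmetic behind the 1/500 figure can be checked with a few lines of plain Scala. This is only a back-of-the-envelope sketch: the task and Executor counts come from the example above, while the object and value names are made up for illustration.

```scala
// Back-of-the-envelope check of the numbers above (plain Scala, no Spark needed).
object BroadcastCopies {
  val parallelism  = 10000 // number of tasks, as assumed in the text
  val numExecutors = 20    // number of Executors, as assumed in the text

  // Without broadcast: one copy of dict travels with every task.
  val copiesPerTaskModel: Int = parallelism
  // With broadcast: one copy of dict per Executor.
  val copiesPerExecutorModel: Int = numExecutors

  // How many times fewer copies the broadcast version distributes and stores.
  val reduction: Int = copiesPerTaskModel / copiesPerExecutorModel

  def main(args: Array[String]): Unit =
    println(s"copies: $copiesPerTaskModel -> $copiesPerExecutorModel, a ${reduction}x reduction")
}
```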

Broadcast Distributed Dataset #

In the previous example code, the broadcast variable encapsulated a normal variable created on the Driver side: a list of strings. Beyond that, broadcast variables can also encapsulate distributed datasets.

Let’s take a look at an example. In the field of e-commerce development, developers often use fact tables to store transactional data and dimension tables to store descriptive data such as items and users. Fact tables are characterized by their large scale, and the amount of data grows rapidly with the development of the business. Dimension tables are much smaller in scale, and the changes in data volume are relatively stable.

Assume that user dimension data is stored in the HDFS file system in Parquet file format. The business department needs us to read the user data and create a broadcast variable for future use. How should we do it? It’s simple, just a few lines of code!

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row}
val userFile: String = "hdfs://ip:port/rootDir/userData"
val df: DataFrame = spark.read.parquet(userFile)
// collect() brings the rows to the Driver; broadcasting df itself would ship only the DataFrame handle, not its data
val bc_rows: Broadcast[Array[Row]] = spark.sparkContext.broadcast(df.collect())

First, we use the Parquet API to read the HDFS distributed data file and generate a DataFrame. Then we collect its rows to the Driver and wrap them with broadcast. From the code perspective, this implementation is not much different from encapsulating normal variables: both call the broadcast API, only with different arguments.

However, if we shift our focus from a development perspective to observing the runtime process of creating broadcast variables, we will find that there are significant differences between distributed datasets and normal variables.

When creating a broadcast variable from a normal variable, the data source is on the driver side. Therefore, the driver only needs to distribute the data to all the executors and let the executors cache the data in the BlockManager.

But creating a broadcast variable from a distributed dataset is much more complicated, as shown in the figure below.

Compared to normal variables, the data source of distributed datasets is not on the driver side, but from all the executors. Each distributed task in the executors is responsible for producing a part of the full dataset, which corresponds to different data partitions in the figure. Therefore, Step 1 is the driver pulling these data partitions from all the executors and then building the full dataset locally. Step 2 is similar to the process of creating a broadcast variable from a normal variable. The driver distributes the aggregated full dataset to all the executors, and the executors cache the received full dataset in the storage system’s BlockManager.

It is not difficult to see that, compared to creating a broadcast variable from a normal variable, creating one from a distributed dataset incurs higher network overhead. There are two main reasons: first, there is an additional step of network communication (the Driver pulling partitions from the Executors); second, a distributed dataset is usually much larger than a normal variable.
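The two steps can be spelled out with public Spark APIs. The following is a minimal sketch rather than Spark's internal implementation: the object name `BroadcastDataset`, the helper `broadcastRows`, and the tiny in-memory user table are assumptions made for illustration, and the approach only works when the whole dataset fits in Driver memory.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object BroadcastDataset {
  // Makes the two steps explicit for a DataFrame small enough to fit on the Driver.
  def broadcastRows(spark: SparkSession, df: DataFrame): Broadcast[Array[Row]] = {
    // Step 1: the Driver pulls every partition from the Executors and assembles the full dataset.
    val allRows: Array[Row] = df.collect()
    // Step 2: the Driver distributes the assembled dataset; each Executor caches one copy.
    spark.sparkContext.broadcast(allRows)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, used here purely for illustration.
    val spark = SparkSession.builder().master("local[*]").appName("BroadcastDataset").getOrCreate()
    import spark.implicits._
    // Hypothetical miniature user table standing in for the Parquet data.
    val userDF = Seq((1, "Alice"), (2, "Bob")).toDF("userID", "name")
    val bc = broadcastRows(spark, userDF)
    println(s"rows available to every task: ${bc.value.length}")
    spark.stop()
  }
}
```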

How to Use Broadcast Variables to Constrain Shuffle? #

You may ask, “Why does the Driver pull the data partitions of a DataFrame from the Executors, assemble them into a complete dataset, and then broadcast it? Given the network overhead, why go through all this trouble?” Good question. The answer is that caching distributed datasets as broadcast variables is the most effective tool for taming Shuffle, the performance killer.

Shuffle Joins #

Why do I say this? Let’s use an e-commerce scenario as an example. After obtaining user data, the business department requires us to associate the transaction table with the user table in order to analyze the shopping habits of different users. This type of data association requirement is quite common in the field of data analysis.

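import org.apache.spark.sql.DataFrame

// Placeholders: load the transaction table and the user table here
val transactionsDF: DataFrame = ???
val userDF: DataFrame = ???
transactionsDF.join(userDF, Seq("userID"), "inner")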

Because the requirement is very clear, our colleague Xiao A immediately calls the Parquet data source API to read the distributed files and create DataFrames for the transaction table and the user table. Then the DataFrame join method is called with userID as the join key to perform an inner join between the two tables.

In a distributed environment, in order to associate the transaction table and the user table using userID as the join key, one premise must hold: each transaction record and its corresponding user information must reside in the same Executor. In other words, if user Huang Xiaoyi’s shopping records are stored in Executor 0 while the same user’s personal attributes are cached in Executor 2, the two pieces of information must first be brought into the same process before the association can be computed.

Without any optimization, Spark defaults to using Shuffle Join to achieve this. The Shuffle Join process mainly consists of two steps.

The first step is to shuffle the left and right tables of the join separately. The shuffle partitioning rule is to compute the hash value of the join key and take it modulo the number of partitions. Because both tables use the same number of partitions, after the shuffle, transaction records and user data with the same userID are guaranteed to land in the same Executor.
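The partitioning rule can be illustrated in plain Scala. This is a simplified sketch, not Spark's actual code: Spark uses its own hash function internally, and the JVM `hashCode` merely stands in for it here. The object name, the helper `partitionOf`, the partition count, and the sample key are all hypothetical.

```scala
// Plain-Scala sketch of "hash the join key, then take it modulo the number of partitions".
object HashPartitionSketch {
  // hashCode can be negative on the JVM, so shift negative remainders into [0, numPartitions).
  def partitionOf(joinKey: Any, numPartitions: Int): Int = {
    val raw = joinKey.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 200 // hypothetical; both tables must use the same value
    val userID = "user-42"  // hypothetical join key

    val txPartition   = partitionOf(userID, numPartitions) // where the transaction record goes
    val userPartition = partitionOf(userID, numPartitions) // where the user record goes
    println(s"transaction -> partition $txPartition, user -> partition $userPartition")
  }
}
```

Because the function depends only on the key and the partition count, records from both tables with the same userID always map to the same partition number, which is what makes the subsequent per-partition join possible.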

After the shuffle is complete, the second step is to perform the join operation on the records with consistent userIDs within the same Executors using a Reduce task. However, since the transaction table is a fact table with an extremely large data volume, the thought of shuffling TB-level data is terrifying! Therefore, the code for directly joining the two DataFrames mentioned above still has a lot of room for optimization. What can we do? In other words, for data association in a distributed environment, is there any other way to ensure that transaction records and corresponding user information are in the same Executors?

Ways to Constrain Shuffle #

Remember when the business department asked us to encapsulate the user table as a broadcast variable for later use? Now it finally comes into play!

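import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// Placeholders: load the transaction table and the user table here
val transactionsDF: DataFrame = ???
val userDF: DataFrame = ???

// Mark userDF for broadcasting so that the join does not shuffle transactionsDF
val bcUserDF = broadcast(userDF)
transactionsDF.join(bcUserDF, Seq("userID"), "inner")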

The Driver collects all data slices belonging to userDF from all Executors, aggregates the user data locally, and then sends a copy of the complete data to each Executor. Since each Executor has the complete data of userDF, at this point, the data partitions of the transaction table remain in place without any movement, making it easy to associate them with consistent user data. In this way, we don’t need to shuffle the transaction table with a huge data volume, but still achieve data association between the two tables in a distributed environment.
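One way to confirm which join strategy Spark picks is to inspect the physical plan. The sketch below assumes a local session and two tiny in-memory tables invented for illustration; the object name `BroadcastJoinPlan` and the helper `plans` are hypothetical as well. Automatic broadcasting is switched off through `spark.sql.autoBroadcastJoinThreshold`, so that only the explicit `broadcast` hint can trigger a Broadcast Join.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinPlan {
  // Returns the physical plans of the plain join and of the broadcast-hinted join as text.
  def plans(spark: SparkSession): (String, String) = {
    import spark.implicits._
    // Disable size-based automatic broadcasting so that only the hint applies.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Hypothetical miniature tables standing in for the real ones.
    val transactionsDF = Seq((1, 9.9), (2, 20.0), (1, 5.5)).toDF("userID", "amount")
    val userDF = Seq((1, "Alice"), (2, "Bob")).toDF("userID", "name")

    val plainJoin = transactionsDF.join(userDF, Seq("userID"), "inner")
    val hintedJoin = transactionsDF.join(broadcast(userDF), Seq("userID"), "inner")

    (plainJoin.queryExecution.executedPlan.toString,
     hintedJoin.queryExecution.executedPlan.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("BroadcastJoinPlan").getOrCreate()
    val (plainPlan, hintedPlan) = plans(spark)
    println(plainPlan)  // should show a shuffle-based join operator
    println(hintedPlan) // should show a broadcast-based join operator
    spark.stop()
  }
}
```

In day-to-day work, calling `explain()` on the joined DataFrame gives the same information without any helper code.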

By using broadcast variables, we have successfully avoided the storage and distribution of massive data within the cluster, saving the disk and network overhead introduced by Shuffle and greatly improving runtime performance. Of course, optimizing with broadcast variables also comes at a cost, as creating and distributing broadcast variables also incurs network overhead. However, compared to the full network distribution of large tables, the network overhead of small tables is almost negligible. This kind of small investment and large output allows us to achieve high performance gains with minimal cost, truly showing the effect of “achieving a lot with a small effort”!
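To get a feel for the difference in network cost, here is a deliberately crude model in plain Scala. Every number in it is hypothetical (a 1 TB transaction table, a 100 MB user table, 20 Executors), and the model ignores compression, data locality, and the fact that Spark can share broadcast blocks between Executors; it only illustrates the order of magnitude.

```scala
// Crude, hypothetical cost model: bytes moved over the network by each join strategy.
object JoinNetworkCost {
  val MB: Long = 1024L * 1024L
  val transactionBytes: Long = 1024L * 1024L * MB // hypothetical 1 TB fact table
  val userBytes: Long = 100L * MB                 // hypothetical 100 MB dimension table
  val numExecutors: Int = 20                      // hypothetical cluster size

  // Shuffle Join, worst case: every record of both tables crosses the network once.
  val shuffleJoinBytes: Long = transactionBytes + userBytes
  // Broadcast Join: the user table goes to the Driver once, then one copy to each Executor.
  val broadcastJoinBytes: Long = userBytes + userBytes * numExecutors

  def main(args: Array[String]): Unit = {
    println(s"shuffle join moves about ${shuffleJoinBytes / MB} MB")
    println(s"broadcast join moves about ${broadcastJoinBytes / MB} MB")
  }
}
```

The same model also shows where the trade-off turns: the broadcast cost grows with both the size of the small table and the number of Executors, while the shuffle cost does not depend on the cluster size.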

Conclusion #

In the context of data association, broadcast variables are a killer move to restrain Shuffle. By mastering it, we can achieve high performance benefits with minimal cost. The key is to grasp the two ways of creating broadcast variables.

The first way is to create a broadcast variable from a regular variable. Under the operation mechanism of broadcast variables, the data stored in regular variables is encapsulated into a broadcast variable, which is distributed by the driver to the executors. After receiving the broadcast variable, each executor assigns it to the BlockManager for management.

The second way is to create a broadcast variable from a distributed dataset, which is a bit more complex than the first method. The first step is for the driver to pull data partitions from all the executors and then construct a complete dataset locally. The second step is for the driver to distribute the aggregated complete dataset to each executor, and the executor caches the received complete dataset in the BlockManager of the storage system.

By combining these two methods, when we are doing data association, we can transform Shuffle Joins into Broadcast Joins. We can use broadcasting of small tables to replace the full distribution of large tables, truly restraining Shuffle.

Daily Exercise #

  1. The current implementation of the Spark broadcast mechanism has potential issues. When dealing with large amounts of data, the Driver may become a bottleneck. Can you think of a better way to re-implement the Spark broadcast mechanism? (Hint: SPARK-17556)

  2. In what situations is it not suitable to convert Shuffle Joins into Broadcast Joins?

Looking forward to reading your thoughts and answers in the comment section. See you in the next lesson!