
13 Broadcast Variables II - How to Let Spark SQL Choose Broadcast Joins #

Hello, I’m Wu Lei.

In the previous lesson, we saw that broadcast variables are a powerful tool for avoiding shuffles in data association (Join) scenarios. Using Broadcast Joins instead of Shuffle Joins can greatly improve execution performance. However, many students only know the default behavior of broadcast variables and don’t know how to tune it. So, how can we make sure that Spark prioritizes the Broadcast Join strategy at runtime?

In today’s lesson, I will focus on data association scenarios and present two categories of optimization techniques, one based on configuration items and one based on development APIs, to help you use broadcast variables skillfully.

Forcing Broadcasts Using Configuration #

Let’s start with configuration. When we covered Spark SQL configuration items, we mentioned spark.sql.autoBroadcastJoinThreshold. Its default value is 10MB, and it is a storage size: if either of the two tables in a Join is smaller than 10MB, Spark uses a Broadcast Join at runtime. In addition, AQE (Adaptive Query Execution) relies on this parameter when it adjusts the Join strategy dynamically at runtime: it checks whether a table, after filtering, has become small enough for a Shuffle Join to be converted into a Broadcast Join.

Broadcast Joins

To help you understand, let me give you an example. In a data warehouse, we often see two tables: one is the order fact table, which for convenience, we’ll refer to as Fact; the other is the user dimension table, which we’ll call Dim. The fact table has a large volume at around 100GB, while the dimension table is quite small at around 1GB. The schemas of the two tables are as follows:

// Order fact table schema
orderID: Int
userID: Int
trxDate: Timestamp
productId: Int
price: Float
volume: Int
 
// User dimension table schema
userID: Int
name: String
age: Int
gender: String

When joining the Fact table and the Dim table based on the userID, since both tables have sizes far exceeding the default value of the spark.sql.autoBroadcastJoinThreshold parameter, Spark has no choice but to use Shuffle Joins as the implementation. However, if we adjust the value of this parameter to 2GB, and since the size of the Dim table is smaller than 2GB, Spark will encapsulate the Dim table into a broadcast variable and use Broadcast Joins to establish the data association between the two tables.

Clearly, for the majority of Join scenarios, the default value of the autoBroadcastJoinThreshold parameter, 10MB, is too low, as enterprise data volumes are typically in the TB or even PB range. Therefore, in order to effectively utilize Broadcast Joins, we need to increase the value of this parameter. Generally, choosing around 2GB is a good option.
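As a minimal sketch of raising the threshold, the snippet below sets it to 2GB on a running session. The application name and the local master are placeholders chosen for illustration, and the value is passed in plain bytes so that it does not depend on size-suffix parsing.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; app name and master are placeholders.
val spark = SparkSession.builder()
  .appName("broadcast-threshold-demo")
  .master("local[*]")
  .getOrCreate()

// The threshold is expressed in bytes: 2GB = 2 * 1024^3 bytes.
val twoGB: Long = 2L * 1024 * 1024 * 1024
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", twoGB)

// Read the value back to confirm what the session will use.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
```

Setting the value to -1 disables size-based broadcasting altogether, which can be handy when you want to observe the effect of hints in isolation.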

Now we know that the key to forcing Spark to prioritize Broadcast Joins using the broadcast threshold configuration parameter is to ensure that at least one table has a storage size smaller than the broadcast threshold.

However, when setting the broadcast threshold, many students complain to me, saying, “My data size is clearly smaller than the broadcast threshold set by the autoBroadcastJoinThreshold parameter, so why doesn’t Spark SQL choose Broadcast Joins at runtime?”

After understanding the details, I realized that what these students refer to as data size actually refers to the storage size of the tables on disk, such as the results obtained using system commands like ls or du -sh. It is important to note that the storage size of the same set of data in memory can often be several times or even dozens of times larger than its size on disk. This is mainly due to two reasons.

First, to improve storage and access efficiency, developers generally store data on disk in compressed columnar formats such as Parquet or ORC. Once such highly compressed data is decoded into memory, its size often grows several times over.

Second, limited by the object management mechanism, JVM often requires a larger memory space than the original data size to store objects in heap memory.

Let’s take an example. The string “abcd” theoretically needs only 4 bytes, but the JVM needs a total of 48 bytes to store these 4 characters in heap memory! So at runtime, it is not uncommon for a seemingly small dataset on disk to grow 4 or 5 times, or even more, in memory. Therefore, if you configure the autoBroadcastJoinThreshold broadcast threshold based on the storage size on disk, you will likely run into the same dilemma.
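The arithmetic behind this pitfall can be sketched as follows. All numbers are made up for illustration: a 500MB table on disk, an assumed expansion factor of 5, and a 2GB threshold.

```scala
// Hypothetical numbers, for illustration only.
val diskSizeBytes: Long = 500L * 1024 * 1024        // 500MB of compressed files on disk
val expansionFactor: Double = 5.0                   // assumed disk-to-memory growth
val thresholdBytes: Long = 2L * 1024 * 1024 * 1024  // 2GB broadcast threshold

// Rough in-memory size under the assumed expansion factor.
val estimatedInMemoryBytes: Long = (diskSizeBytes * expansionFactor).toLong

// Judged by its disk size, the table appears to fit under the threshold...
val fitsByDiskSize: Boolean = diskSizeBytes <= thresholdBytes
// ...but judged by its in-memory size (2500MB versus 2048MB), it does not.
val fitsByMemorySize: Boolean = estimatedInMemoryBytes <= thresholdBytes

println(s"fits by disk size: $fitsByDiskSize, fits by in-memory size: $fitsByMemorySize")
```

The expansion factor here is only an assumption; the real value depends on the file format, the compression codec, and the data itself, which is why measuring is preferable to guessing.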

Now the question arises: what methods can accurately estimate the storage size of a table in memory?

First, we need to avoid a pitfall. I have found that many materials recommend using the built-in Spark SizeEstimator to estimate the storage size of distributed datasets. Based on multiple practical experiences and lessons learned, we have to point out that the estimation results of SizeEstimator are not accurate. Therefore, you can skip this method, which will also save you time and effort in optimization.

I believe a more reliable method has two steps. First, cache the table whose size you want to estimate in memory, for example by calling the cache method directly on the DataFrame or Dataset. Second, read the statistics from the Spark SQL execution plan. This works because Spark SQL relies on these very statistics to choose and adjust execution strategies at runtime.

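import org.apache.spark.sql.DataFrame

// Assumes an active SparkSession named spark, as in spark-shell
val df: DataFrame = ??? // placeholder: the table whose size you want to estimate
df.cache.count // count forces the cache to be materialized

// Re-plan the query so that the optimizer sees the cached data and its statistics
val plan = df.queryExecution.logical
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes // estimated in-memory size, in bytes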

You might say, “Although this method is accurate, by caching the data in memory and then calculating its storage size, it is actually optimizing at runtime. It is more accurate, of course, but it also requires more time and effort for optimization.” Yes, using this calculation method does indeed require more time and effort for optimization, but in many cases, especially when the execution efficiency of Shuffle Joins is extremely frustrating, this level of effort is worthwhile.
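To make this measurement reusable, the two steps can be wrapped in small helpers. This is a sketch under stated assumptions: cachedSizeInBytes and fitsBroadcastThreshold are hypothetical names introduced here, the session settings are placeholders, and sessionState is marked unstable in Spark, so its behavior may differ across versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: caches df, materializes the cache, and reads the
// size statistic that the optimizer reports for the cached data.
def cachedSizeInBytes(spark: SparkSession, df: DataFrame): BigInt = {
  df.cache().count() // count() forces the cache to be populated
  spark.sessionState
    .executePlan(df.queryExecution.logical)
    .optimizedPlan
    .stats
    .sizeInBytes
}

// Hypothetical helper: compares the estimate with the session's threshold.
// A negative threshold means size-based broadcasting is disabled.
def fitsBroadcastThreshold(spark: SparkSession, df: DataFrame): Boolean = {
  val threshold: Long = spark.sessionState.conf.autoBroadcastJoinThreshold
  threshold >= 0 && cachedSizeInBytes(spark, df) <= BigInt(threshold)
}

// Usage on a tiny generated table (placeholder session settings).
val spark = SparkSession.builder()
  .appName("size-estimate-demo")
  .master("local[*]")
  .getOrCreate()

val small: DataFrame = spark.range(1000).toDF("id")
val size: BigInt = cachedSizeInBytes(spark, small)
println(s"estimated in-memory size: $size bytes")
println(s"fits under threshold: ${fitsBroadcastThreshold(spark, small)}")
```

Remember to call unpersist on the DataFrame afterwards if the cached copy is not needed for the actual job.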

Forcing Broadcast with API #

Since it’s so complicated to estimate the data size, is there a way to let Spark SQL choose Broadcast Joins without configuring a broadcast threshold? Yes, there are several ways.

Developers can use Join Hints, or the broadcast function from the SQL functions library, to force Spark SQL to use Broadcast Joins for data association at runtime. Let me go through what each of them means, what it is for, and how to use it. Note that the two methods are equivalent in effect; neither is better than the other. Having more options simply lets you develop according to your own preferences and habits.

Forcing Broadcast with Join Hints #

The word “Hints” in Join Hints means “suggestions”. Join Hints are a special syntax that you use during development to tell Spark SQL explicitly which Join strategy to use at runtime. Once you specify a Join Hint, Spark SQL will do its best to respect your choice and use Broadcast Joins to complete the data association, regardless of whether your tables meet the broadcast threshold.

Let’s take an example. Suppose there are two tables, one with an in-memory size on the order of 100GB and the other much smaller, around 2GB. With the broadcast threshold set to 2GB, Broadcast Joins are not triggered, which suggests the small table is actually slightly over the threshold. However, we don’t want to spend time and effort calculating the exact memory footprint of the small table. In this case, a Join Hint gets us the optimization with just a few extra characters.

val table1: DataFrame = spark.read.parquet(path1)
val table2: DataFrame = spark.read.parquet(path2)
table1.createOrReplaceTempView("t1")
table2.createOrReplaceTempView("t2")
 
val query: String = "select /*+ broadcast(t2) */ * from t1 inner join t2 on t1.key = t2.key"
val queryResults: DataFrame = spark.sql(query)

As the code above shows, adding the hint /*+ broadcast(t2) */ to the SQL query is enough to force Spark SQL to broadcast the small table t2 and use Broadcast Joins at runtime. Besides broadcast, the hint also accepts broadcastjoin or mapjoin, which have the same effect.

If you don’t like using SQL structured query statements, especially if you don’t want to frequently register data tables in the Spark SQL context, you can also use Join Hints in the DSL syntax of DataFrame.

table1.join(table2.hint("broadcast"), Seq("key"), "inner")

In the DSL statement above, calling the hint method on table2 with the broadcast keyword achieves the same effect of forcing table2 to be broadcast.

In conclusion, Join Hints allow developers to flexibly choose the Join strategy at runtime. For those who are familiar with the business and understand the data, Join Hints allow developers to leverage their expertise above the optimization engine of Spark SQL and better serve the business.

However, Join Hints also have a small flaw. If a keyword is misspelled, Spark SQL does not throw an exception at runtime; it ignores the misspelled hint as if it did not exist. The compiler cannot catch the mistake either, because the hint is just a string. Therefore, when using Join Hints, we need to check the spelling ourselves during development and confirm from the execution plan that the hint actually took effect.
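One way to guard against this flaw is to inspect the physical plan. The sketch below is illustrative: usesBroadcastJoin is a hypothetical helper, the two tiny tables are made up, and size-based broadcasting is disabled so that only the hint can trigger a Broadcast Join. The string check relies on the operator name BroadcastHashJoin appearing in the plan text.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .appName("hint-check-demo") // placeholder name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Disable size-based broadcasting so that only hints can trigger a Broadcast Join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

// Tiny made-up tables sharing a join column named "key".
val table1 = Seq((1, "a"), (2, "b")).toDF("key", "v1")
val table2 = Seq((1, "x"), (2, "y")).toDF("key", "v2")

// Hypothetical helper: true if the physical plan contains a broadcast hash join.
def usesBroadcastJoin(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.toString.contains("BroadcastHashJoin")

// Correctly spelled hint: Spark SQL plans a Broadcast Join.
val hinted = table1.join(table2.hint("broadcast"), Seq("key"), "inner")
// Misspelled hint: no exception is thrown, and the hint is dropped.
val misspelled = table1.join(table2.hint("broadcats"), Seq("key"), "inner")

println(s"hinted uses broadcast: ${usesBroadcastJoin(hinted)}")
println(s"misspelled uses broadcast: ${usesBroadcastJoin(misspelled)}")
```

Calling explain() on the joined DataFrame shows the same information in human-readable form.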

Forcing Broadcast with the broadcast function #

If you don’t want to wait until runtime to detect problems and want the compiler to help you check similar spelling mistakes, then you can use the second method of forcing broadcast: the broadcast function. This function is in the library org.apache.spark.sql.functions. The usage is very simple and more convenient than Join Hints. Just wrap the data table to be broadcasted with the broadcast function, as shown below.

import org.apache.spark.sql.functions.broadcast
table1.join(broadcast(table2), Seq("key"), "inner")
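For a slightly fuller picture, here is a sketch that applies the broadcast function to the Fact and Dim tables from earlier in the lesson. The rows are made up, the trxDate column is left out for brevity, and the session settings are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .appName("broadcast-function-demo") // placeholder name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Made-up rows following the order fact table schema (trxDate omitted).
val fact = Seq(
  (1, 100, 10, 9.9f, 2),
  (2, 101, 11, 5.0f, 1)
).toDF("orderID", "userID", "productId", "price", "volume")

// Made-up rows following the user dimension table schema.
val dim = Seq(
  (100, "Alice", 30, "F"),
  (101, "Bob", 25, "M")
).toDF("userID", "name", "age", "gender")

// Wrap the small dimension table with broadcast() to request a Broadcast Join.
val joined = fact.join(broadcast(dim), Seq("userID"), "inner")

// Print the physical plan to see which join operator was chosen.
joined.explain()
```

Because broadcast is an ordinary function, a typo such as a misspelled function name fails at compile time instead of being ignored at runtime.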

You may ask, “Since developers can force Spark SQL to choose Broadcast Joins through Join Hints and the broadcast function, can I disregard the broadcast threshold configuration?” Actually, that’s not the case. I believe that the broadcast threshold configuration should be the main consideration, and forcing broadcast should be a supplementary method. This often turns out to be a good choice.

Setting the broadcast threshold allows Spark SQL to have the choice, especially under the mechanism of AQE, to dynamically adjust the Join strategy at runtime. Forcing broadcast allows developers to guide Spark SQL on how to choose runtime strategies based on their expertise. The two methods are complementary and not conflicting. Developers can balance the Spark SQL optimization strategy and expert experience in their applications by using them flexibly.

Broadcast Variables are not a Silver Bullet #

However, although we have been emphasizing that broadcast variables are a killer feature for avoiding Shuffle operations in data association scenarios, they are not a silver bullet.

Some students ask, “As developers, we have so many options, and we can even force Spark to choose Broadcast Joins. Can we use Broadcast Joins for all Join operations?” The answer is definitely no. Broadcast variables cannot solve every data association problem.

First of all, in terms of performance, the Driver needs to pull all data shards of the distributed dataset when creating broadcast variables. In this process, network overhead and Driver memory become performance risks. The larger the size of the broadcast variable, the more additional performance overhead it introduces. Moreover, if the size of the broadcast variable exceeds 8GB, Spark will directly throw an exception and interrupt task execution.

Secondly, in terms of functionality, not all Join types can be converted to Broadcast Joins. For one thing, Broadcast Joins do not support Full Outer Joins. For another, in any Join we cannot broadcast the base table; even if the developer forces the base table to be broadcast, it has no effect. For example, in a Left Outer Join we can only broadcast the right table, and in a Right Outer Join we can only broadcast the left table. In the code below, even though we force the broadcast with the broadcast function, Spark SQL will still choose Shuffle Joins at runtime.

import org.apache.spark.sql.functions.broadcast
broadcast(table1).join(table2, Seq("key"), "left")
table1.join(broadcast(table2), Seq("key"), "right")
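The restriction on the base table can be checked with a sketch like the one below. It assumes two tiny made-up tables and disables size-based broadcasting so that only the broadcast function can trigger a Broadcast Join; usesBroadcastJoin is a hypothetical helper that looks for the BroadcastHashJoin operator in the plan text.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .appName("outer-join-broadcast-demo") // placeholder name
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Disable size-based broadcasting so that only explicit requests count.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

val table1 = Seq((1, "a"), (2, "b")).toDF("key", "v1")
val table2 = Seq((1, "x"), (3, "z")).toDF("key", "v2")

// Hypothetical helper: true if the physical plan contains a broadcast hash join.
def usesBroadcastJoin(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.toString.contains("BroadcastHashJoin")

// Left Outer Join with the base (left) table broadcast: the request has no effect.
val baseTableBroadcast = broadcast(table1).join(table2, Seq("key"), "left")
// Left Outer Join with the right table broadcast: the request is honored.
val rightTableBroadcast = table1.join(broadcast(table2), Seq("key"), "left")

println(s"base table broadcast honored: ${usesBroadcastJoin(baseTableBroadcast)}")
println(s"right table broadcast honored: ${usesBroadcastJoin(rightTableBroadcast)}")
```

Both joins return the same kind of result; only the physical join operator differs.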

Summary #

In this lecture, we covered two ways to make Spark SQL choose the Broadcast Join strategy at runtime: setting configuration options and using the API to force broadcasting.

Firstly, setting the configuration option is mainly done by setting the autoBroadcastJoinThreshold configuration parameter. Developers use this configuration parameter to instruct the Spark SQL optimizer. If the size of one of the two tables participating in the join is smaller than the value set by this parameter, the Broadcast Joins implementation will be used at runtime.

To enable Spark SQL to use Broadcast Joins, developers need to ensure that the size of the data table in memory is smaller than the value set for the autoBroadcastJoinThreshold parameter.

In addition, when setting the broadcast threshold, we need to remember that the storage size often grows several-fold, sometimes far more, when data on disk is loaded into memory. This is a common reason why Spark SQL fails to choose the Broadcast Join strategy. Therefore, when performing data association, we also need to estimate the storage size of a table in memory. One accurate estimation method is to first cache the DataFrame and then read the statistics from the execution plan.

Secondly, there are two methods to force broadcasting using API, which are setting Join Hints and using the broadcast function. The method of setting Join Hints is to add a hint “/*+ broadcast(table) */” in the structured query statement. The broadcast keyword here can also be replaced with broadcastjoin or mapjoin. In addition, you can also use the hint method in the DSL syntax of the DataFrame to specify the broadcast keyword to achieve the same effect. The method of using the broadcast function is very simple, just wrap the data table that needs to be broadcasted with the broadcast function.

In summary, both setting configuration options and using API to force broadcasting have their advantages and disadvantages. Therefore, configuring the broadcast threshold as the main approach with forced broadcasting as a supplement is often a good choice.

Finally, it is worth noting that broadcast variables are not a silver bullet and cannot solve all data association problems. Therefore, in daily development work, you should be careful not to abuse broadcasting.

Daily Practice #

  1. In addition to the broadcast keyword, what other join types and keywords does Join Hints support in Spark 3.0?
  2. Can a DataFrame be broadcasted using the sparkContext.broadcast function? What is the difference between this and the org.apache.spark.sql.functions.broadcast function?

Looking forward to seeing your thoughts and answers in the comment section. See you in the next lesson!