27 Large Table Joins Small Table - What to Do When Broadcast Variables Cannot Fit Small Table #

Hello, I’m Wu Lei.

In the field of data analysis, it is very common to join a large table with a small table. Size, however, is relative: generally speaking, when the large table is more than three times the size of the small table, we classify the computation as a "large table joins small table" scenario. In other words, joining a large table with a small table simply means that the two tables involved in the join differ significantly in size.

For this scenario of joining a large table with a small table, we should prioritize considering the Broadcast Hash Join (BHJ), as it is the most efficient among the five join strategies supported by Spark. The prerequisite for BHJ to handle joining a large table with a small table is that the broadcast variable can accommodate the full data of the small table. However, what should we do if the data size of the small table exceeds the broadcast threshold?

In today’s lecture, we will discuss three real-world business cases to explore the solutions to this situation. Although these three cases may not cover all situations in the “joining a large table with a small table” scenario, analyzing and summarizing the strategies and solutions from these cases can help broaden our thinking and avoid getting stuck in the dilemma of “not knowing where to start” when optimizing.

Case 1: Join Keys Much Larger than the Payload #

In the first case, we have a 100GB large table and a 10GB small table, both of which exceed the broadcast threshold (10MB by default). Worse, Spark throws an exception and aborts the job whenever it is asked to create a broadcast variable on a dataset larger than 8GB, and the small table already exceeds that hard limit, so Spark simply cannot apply the BHJ (Broadcast Hash Join) mechanism here. So what should we do? Let's not rush; first, let's look at the business requirements of this case.

This case comes from traffic forecasting in the field of computational advertising, where "traffic" refers to the number of visits from different types of users to a system over a certain period of time. There are three key factors here: the system, the time period, and the user type. The time granularity is easy to understand: traffic is counted by the hour. User type refers to the different dimensions used to describe users, such as gender, age, education level, occupation, and geographical location. The system refers to the source of the traffic, such as the platform, website, channel, and media domain.

Under the combination of the system and user dimensions, traffic is divided into millions of different "types": for example, the traffic of college students visiting a certain platform's website, or the traffic of women aged 25-45 coming from a certain media channel, and so on.

We know that traffic forecasting is itself a time series problem, similar to stock price forecasting: both use historical data to predict the future. In our case, in order to predict the hundreds of thousands of different types of traffic, we first need to generate a time series for each traffic type, and then feed these time series to machine learning algorithms for model training.

The data source for the traffic statistics is the online access log, which records which types of users accessed which sites at what time. We need to build the time series at hourly granularity, but because the traffic is segmented so finely, some traffic types may have no visits at all in certain hours, as shown in the diagram below.

As we can see, in the past 24 hours, a certain type of traffic only had data records in the 20-24 time period, with no records in other time periods, that is, zero traffic. In this case, we need to fill in the missing period with “zero”. So how do we fill it in?

Because the business requirement is to fill in the missing values, at the implementation level, we can first build a complete zero traffic table, and then use the statistical traffic data to replace the corresponding positions of “zero traffic” in the complete zero traffic table at the granularity of system, user, and time dimensions. This concept may seem complex when described in words, but it will be more intuitive and easier to understand with the help of diagrams.

Firstly, we generate a complete zero traffic table, as shown on the left side of the diagram, referred to as the “negative sample table”. The primary key of this table consists of various dimensions that divide the types of traffic, such as gender, age, platform, website, hour period, and so on. The payload of the table only has one column, which represents the traffic volume. When generating the “negative sample table”, all values in this column are set to zero.
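
For intuition, here is a hypothetical Spark SQL sketch of how such a zero-traffic table could be generated; the helper tables dims (holding all dimension combinations) and hours (the hourly slots) are purely illustrative assumptions, not tables from the case.

-- Hypothetical sketch: cross-join every dimension combination with every hour
-- slot, and set the traffic column to zero
select d.gender, d.age, d.city, d.platform, d.site, h.hour, 0 as access
from dims d cross join hours h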

Then, we aggregate the access counts from the logs by the same combination of dimensions and obtain the "positive sample table" shown on the right side of the diagram. It is not difficult to see that the schemas of the two tables are identical. To obtain the complete time series, we only need a left outer join, with the negative sample table as the outer (left) table and the positive sample table as the inner (right) table. The specific query statement is as follows:

// Query statement for left join
select t1.gender, t1.age, t1.city, t1.platform, t1.site, t1.hour, coalesce(t2.access, t1.access) as access
from t1 left join t2 on
t1.gender = t2.gender and
t1.age = t2.age and
t1.city = t2.city and
t1.platform = t2.platform and
t1.site = t2.site and
t1.hour = t2.hour

By using the left join, the zero values in the outer table are directly replaced by the access counts from the inner table wherever a match exists, and the join result is exactly the time series we want.

The "positive sample table" comes from the access log and only contains the hours in which traffic actually occurred, while the "negative sample table" is generated to cover all time periods. In terms of data volume, the negative sample table is therefore much larger than the positive sample table, a typical "large table joins small table" scenario. Although the small table (10GB) is an order of magnitude smaller than the large table (100GB), neither table meets the preconditions for BHJ, so Spark has no choice but to fall back to the SMJ (Shuffle Sort Merge Join) implementation.

We know that SMJ introduces a Shuffle, and distributing hundreds of GB of data across the network is hardly a wise choice. So, following the development principle of "save whenever possible," can we "save" this Shuffle? To eliminate the Shuffle, we have only one option: converting SMJ to BHJ. You might say, "We've said it several times already: the size of the small table, 10GB, far exceeds the broadcast threshold. What else can we do?"

Where there's a will, there's a way. Let's first recall the purpose of joining these two tables: to replace the zeros in the outer table with the access counts from the inner table, matched on the combination of dimensions (the Join Keys). So what are the characteristics of these two tables? First, their schemas are completely identical. Second, in both tables the Join Keys are numerous and, taken together, far larger than the Payload. So here is the question: to achieve our goal, do we really need to join on so many, and such long, Join Keys?

The answer is no. In the previous lecture, we introduced the implementation principle of Hash Join: during the Build phase, Hash Join applies a hash function to create a hash table; during the Probe phase, the hash table not only provides O(1) lookup efficiency, but comparing Hash Keys is also far cheaper than comparing the original Join Keys. Inspired by this, why don't we compute the hash value of the Join Keys and use that hash value as the new Join Key?

We can simply derive a brand-new column from the existing Join Keys and call it the "Hash Key". Generating it takes two steps (a Spark SQL sketch follows the list):

  • Concatenate all Join Keys together, from gender, age, all the way to hour, as shown in steps 1 and 3 in the figure.
  • Use a hash algorithm (such as MD5 or SHA256) to hash the concatenated string, obtaining the hash value, which becomes the “Hash Key”, as shown in steps 2 and 4 in the figure.
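
A minimal sketch of these two steps in Spark SQL might look as follows; the '#' separator and the choice of SHA-256 are illustrative assumptions.

-- Derive the Hash Key from the concatenated Join Keys (shown here for t2;
-- t1 is transformed in exactly the same way)
select
  sha2(concat_ws('#', gender, age, city, platform, site, hour), 256) as hash_key,
  access
from t2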

We perform this operation on both tables. In this way, when performing a left join, in order to associate records with matching primary keys together, we no longer need to use the original Join Keys, which are numerous and lengthy. Instead, we can use a single generated column, the Hash Key. Consequently, the SQL query statement also becomes as follows:

// Adjusted left join query statement
select t1.gender, t1.age, t1.city, t1.platform, t1.site, t1.hour, coalesce(t2.access, t1.access) as access
from t1 left join t2 on
t1.hash_key = t2.hash_key

After adding this column, we can eliminate all the Join Keys in the inner table, which is the “positive sample table”, greatly reducing the storage space required for the inner table. Step 5 in the figure demonstrates this process. When the inner table is reduced enough to fit into a broadcast variable, we can convert SMJ to BHJ, thus completely avoiding the Shuffle stage in SMJ.

With this optimization, the storage size of the inner table, after removing the Join Keys, becomes 1.5GB. For this level of storage, we can use configuration options or force broadcasting to perform the transformation from Shuffle Join to Broadcast Join. For the specific transformation method, please refer to the lecture on broadcast variables (Lecture 13).
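
As a quick illustration of the two options mentioned above (the threshold value is an arbitrary choice for this sketch, and the hint simply names the slimmed-down inner table):

-- Option 1: raise the broadcast threshold above the ~1.5GB inner table
SET spark.sql.autoBroadcastJoinThreshold=2GB;

-- Option 2: force broadcasting of the slimmed-down inner table with a join hint
select /*+ broadcast(t2) */
  t1.gender, t1.age, t1.city, t1.platform, t1.site, t1.hour,
  coalesce(t2.access, t1.access) as access
from t1 left join t2 on t1.hash_key = t2.hash_key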

At this point, practically all the problems in Case Study 1 have been resolved. However, there is one small detail here that we need to pay special attention to. The key to the optimization in Case Study 1 is to replace Join Keys with Hash Keys and then remove redundant data from the inner table. Hash Key is actually the hash value obtained by concatenating the Join Keys. Since a hash computation is involved, we must consider the issue of hash collisions.

We are all familiar with hash collisions, which refer to different data sources resulting in the same hash value after the hash computation. In Case Study 1, if we introduce hash computation for optimization and encounter hash collisions, it will destroy the original association relationship. For example, two different Join Keys might be associated together because their hash values happen to be the same. Obviously, this is not the result we want.

There are actually many ways to eliminate the risk of hash collisions, such as "double hashing": using two different hash algorithms to generate two Hash Key columns. The probability that two different records collide under both hash algorithms at the same time is practically zero.
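
Continuing the earlier example, a sketch of what double hashing could look like (the algorithms and separator are illustrative choices); the left join would then match on both generated columns.

-- Generate two independent digests; a false match now requires a simultaneous
-- collision in both MD5 and SHA-256
select
  md5(concat_ws('#', gender, age, city, platform, site, hour))       as hash_key_1,
  sha2(concat_ws('#', gender, age, city, platform, site, hour), 256) as hash_key_2,
  access
from t2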

Case 2: High Selectivity of Filtering Conditions #

In addition to the situation where Join Keys are much larger than Payload, there is another situation where the Selectivity of filtering conditions is high. This case comes from the e-commerce scenario. In a star schema data warehouse, we have two tables: the orders table and the users table. The orders table is the fact table, and the users table is the dimension table.

The business requirement of this case is simple: to calculate the revenue contributed by all head users and sort it in descending order. The schemas of the orders table and the users table are shown in the table below.

// Key fields of the orders table
userId, Int
itemId, Int
price, Float
quantity, Int

// Key fields of the users table
id, Int
name, String
type, String // Enum values, divided into head users and long tail users

Given the above data tables, we only need to perform an inner join on the two tables, followed by grouping, aggregation, and sorting to achieve the business logic. The specific query statement is as follows.

//Query statement
select sum(orders.price * orders.quantity) as revenue, users.name
from orders inner join users on orders.userId = users.id
where users.type = 'Head User'
group by users.name
order by revenue desc

In this case, the storage capacity of the fact table is at the TB level, and the dimension table is around 20GB, both exceeding the broadcast threshold. In fact, such join scenarios are common in e-commerce, computational advertising, and recommendation search fields.

For join scenarios where both tables far exceed the broadcast threshold, if we don’t do any optimization, Spark will choose the Sort-Merge Join (SMJ) strategy. In a distributed environment with 10 C5.4xlarge AWS EC2 instances, SMJ takes nearly 5 hours to complete the join computation of the two tables. We definitely cannot accept such low execution efficiency. So what optimizations can we do? Take a moment to think about it, and then we will analyze it together.

By carefully observing the query statement above, we find that this is a typical star join query, where the fact table is joined with the dimension table and the dimension table has a filtering condition. The filtering condition on the dimension table is users.type = 'Head User', which only selects the head users. Usually, the proportion of head users is very low compared to regular users. In other words, the selectivity of this filtering condition is high, which helps filter out the majority of the dimension table data. In our case, since the proportion of head users is less than one thousandth, the filtered dimension table size is very small and can easily fit into a broadcast variable.

This is where Adaptive Query Execution (AQE) comes in. We know that AQE allows Spark SQL to dynamically adjust the join strategy at runtime. We can leverage this feature to convert the originally selected SMJ strategy to the Broadcast Hash Join (BHJ) strategy (don’t forget, AQE is disabled by default, so we need to enable the spark.sql.adaptive.enabled configuration to use the features it provides).

However, even if the selectivity of the filtering condition is high, around one thousandth, the filtered dimension table still has a size of 20MB, which exceeds the default broadcast threshold of 10MB. Therefore, we need to increase the broadcast threshold, such as to 1GB, so that AQE will downgrade SMJ to BHJ. After making these optimizations, the end-to-end execution time of the job is reduced from the previous 5 hours to 30 minutes, with the same cluster size.
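
In configuration terms, the adjustment boils down to something like the following (the 1GB value mirrors the example above; the exact behavior of these properties assumes a Spark 3.x release):

-- Enable AQE and raise the broadcast threshold so the ~20MB filtered
-- dimension table qualifies for broadcasting at runtime
SET spark.sql.adaptive.enabled=true;
SET spark.sql.autoBroadcastJoinThreshold=1GB;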

Does our optimization end here, after improving the job's execution performance by an order of magnitude? In the lecture on the essence of optimization, we repeatedly emphasized that optimization is an ongoing process: as one bottleneck is resolved, new bottlenecks may emerge. In this process, we need to switch optimization methods dynamically as the bottlenecks shift, until all bottlenecks are eliminated and the workload is balanced.

So, after using AQE's dynamic join strategy adjustment to eliminate the massive data Shuffle introduced by the SMJ strategy, are there any "new" bottlenecks that need to be addressed?

For star joins like the one in our case, we can further reduce the amount of scanning in the fact table and reduce I/O overhead to improve performance by using the Dynamic Partition Pruning (DPP) mechanism. Unlike AQE, DPP does not require any special settings by developers. As long as the conditions are met, the DPP mechanism is triggered automatically.

However, there are three prerequisites that need to be met in order to use DPP optimization:

  • DPP only supports equi-joins and does not support non-equijoin relationships such as greater than or less than.
  • The size of the filtered data set in the dimension table must be smaller than the broadcast threshold, so developers need to pay attention to adjusting the spark.sql.autoBroadcastJoinThreshold configuration item.
  • The fact table must be a partitioned table, and the partitioning column (which can be multiple columns) must include the join key.

We can directly determine that the query satisfies the first two conditions: the first is straightforward, since the join on userId is an equi-join, and the second holds because, after the AQE optimization above, the broadcast threshold is already large enough to hold the filtered dimension table. So, to use the DPP mechanism, we must turn orders into a partitioned table, which involves two things (a sketch follows the list):

  • Create a new orders table called orders_new, with userId specified as the partition key.
  • Transfer all data from the original orders table orders to this new table orders_new.
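
A sketch of these two steps in Spark SQL might look like this; the Parquet format and the exact DDL flavor are assumptions, not details from the case.

-- Step 1: create the new table, partitioned by the join key userId
CREATE TABLE orders_new (itemId INT, price FLOAT, quantity INT, userId INT)
USING parquet
PARTITIONED BY (userId);

-- Step 2: migrate the data from the original orders table
INSERT OVERWRITE TABLE orders_new
SELECT itemId, price, quantity, userId FROM orders;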

With orders_new in place, we simply replace orders with orders_new in the query:

//Query statement
select sum(orders_new.price * orders_new.quantity) as revenue, users.name
from orders_new inner join users on orders_new.userId = users.id
where users.type = 'Head User'
group by users.name
order by revenue desc

After replacing the orders table with orders_new, the query time is further reduced from 30 minutes to 15 minutes in the same distributed environment.

You may say, "Rebuilding and migrating a table just to use DPP takes a significant amount of time! Doesn't this simply shift the running time from the query to table creation and data migration?" You're right. Rebuilding the table structure and migrating data ad hoc just to speed up a single query is often not worth it; it is a stopgap at best. Therefore, to make the most of DPP, developers should design the table structures in advance when planning the data warehouse, including the schema, partition keys, and storage formats, based on common queries and typical scenarios.

Case 3: Even Distribution of Small Table Data #

In the previous two cases, we followed the development principle of "save whenever possible" and did our best to convert Shuffle Joins into Broadcast Joins in order to eliminate Shuffle. However, there are always "stubborn" scenarios where, no matter how we optimize, this conversion is simply not achievable. So what should we do with these stubborn cases?

We know that if a join scenario does not meet the BHJ conditions, Spark SQL prioritizes the SMJ strategy to complete the join. However, as mentioned in the previous lecture, when the sizes of the two tables differ greatly and the small table's data is evenly distributed, Shuffle Hash Join (SHJ) is often more efficient than SMJ. The reason is simple: building a hash table over the small table costs less than sorting both tables.

Let’s take the query from the previous case as an example again, but this time we remove the filter condition for the dimension table and calculate the revenue contributed by all users. In a distributed environment with 10 C5.4xlarge AWS EC2 instances, SMJ without the filter condition took nearly 7 hours to complete the join calculation of the two tables.

-- Query statement
select sum(orders.price * orders.quantity) as revenue, users.name
from orders inner join users on orders.userId = users.id
group by users.name
order by revenue desc

Since the filter condition on the dimension table no longer exists, the prerequisites for the two optimizations in Case 2, the AQE join strategy adjustment and the DPP mechanism, no longer hold. In this case, we can use Join Hints to force Spark SQL to choose the SHJ strategy for the join, and the revised query statement is shown below.

-- Query statement after adding Join hints
select /*+ shuffle_hash(users) */ sum(orders.price * orders.quantity) as revenue, users.name
from orders inner join users on orders.userId = users.id
group by users.name
order by revenue desc

After adjusting the Join strategy to SHJ, in the same cluster size, the end-to-end execution time of the job was reduced from nearly 7 hours to 5 hours. Compared to before the optimization, we achieved an improvement of nearly 30% in performance.

It should be noted that for SHJ to complete successfully without throwing an OOM exception, every data shard of the small table must fit into memory. This is also why we require the small table's data to be evenly distributed: if the small table is skewed, an OOM on the oversized shards becomes highly likely, and the SHJ computation will be interrupted as a result.
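
As a back-of-envelope illustration (the numbers are purely illustrative, not measurements from the case), the number of shuffle partitions is the knob that controls the size of each build-side shard:

-- With the ~20GB users table split across 800 shuffle partitions, each
-- hash-table shard is roughly 20GB / 800 ≈ 25MB, small enough to build in memory
SET spark.sql.shuffle.partitions=800;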

Summary #

In today’s lecture, we explored and unlocked optimization ideas and methods for “joining large tables with small tables” in different scenarios based on three cases.

Firstly, we defined what "joining large tables with small tables" means. Generally speaking, when the large table is more than three times the size of the small table, the join can be considered a "large table joins small table" scenario.

Secondly, we discussed three cases in which Broadcast Hash Join (BHJ) cannot be applied directly.

The first case is when the size of the join keys is much larger than the payload of the data association. In this case, we can use mapping methods (such as hash operations) to replace long join keys with shorter strings, thereby greatly reducing the storage space of the small table. If the reduced small table is small enough to fit into a broadcast variable, we can convert Sort Merge Join (SMJ) to BHJ to eliminate heavy shuffle calculations. It should be noted that the mapping method should be able to effectively avoid the problem of “mapping conflicts” and prevent different join keys from being mapped to the same value.

The second case is when the small table carries filtering conditions and the selectivity of the filtering conditions is high. In this case, we can use the Join strategy adjustment feature of Adaptive Query Execution (AQE) to convert SMJ to BHJ at runtime, thereby greatly improving execution performance. There are two points we need to pay special attention to: first, in order to successfully complete the conversion, we need to ensure that the size of the dimension table after filtering is smaller than the broadcast threshold; second, if the large table itself is partitioned based on the join keys, we can also make full use of Dynamic Partition Pruning (DPP) mechanism to further reduce the I/O overhead of scanning the large table and improve performance.

The third case is when the small table carries no filtering conditions and its size exceeds the broadcast threshold. If the data distribution of the small table is relatively uniform, we can consider using Join Hints to force Spark SQL to select Shuffle Hash Join (SHJ) as the join strategy at runtime. In the "joining large tables with small tables" scenario, SHJ generally executes more efficiently than SMJ. The reason behind this is that building a hash table over the small table costs less than sorting both tables.

Daily Exercise #

  1. For case 1, our core idea is to use hash values to replace long Join Keys. Besides using hash values, do you think there are any other ideas or methods to replace long Join Keys with shorter strings?

  2. For case 2, the key to the AQE join strategy adjustment and the DPP mechanism is to ensure that the filtered dimension table is smaller than the broadcast threshold. Can you discuss methods for estimating the size of the filtered dimension table?

  3. For case 3, assuming the small table with 20GB of data is skewed, converting SMJ to SHJ will throw an OOM exception. In this case, do you think it is still possible to continue optimizing?

Looking forward to seeing your thoughts and answers in the comment section. See you in the next session!